Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/sixlowpan.py: 68%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

579 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Cesar A. Bernardini <mesarpe@gmail.com> 

5# Intern at INRIA Grand Nancy Est 

6# Copyright (C) Gabriel Potter <gabriel[]potter[]fr> 

7""" 

86LoWPAN Protocol Stack 

9====================== 

10 

11This implementation follows these documents:

12 

13- Transmission of IPv6 Packets over IEEE 802.15.4 Networks: RFC 4944 

14- Compression Format for IPv6 Datagrams in Low Power and Lossy 

15 networks (6LoWPAN): RFC 6282 

16- RFC 4291 

17 

18+----------------------------+-----------------------+ 

19| Application | Application Protocols | 

20+----------------------------+------------+----------+ 

21| Transport | UDP | TCP | 

22+----------------------------+------------+----------+ 

23| Network | IPv6 | 

24+----------------------------+-----------------------+ 

25| | LoWPAN | 

26+----------------------------+-----------------------+ 

27| Data Link Layer | IEEE 802.15.4 MAC | 

28+----------------------------+-----------------------+ 

29| Physical | IEEE 802.15.4 PHY | 

30+----------------------------+-----------------------+ 

31 

32Note that: 

33 

34 - Only IPv6 is supported 

35 - LoWPAN is in the middle between network and data link layer 

36 

37The Internet Control Message protocol v6 (ICMPv6) is used for control 

38messaging. 

39 

40Adaptation between full IPv6 and the LoWPAN format is performed by routers at 

41the edge of 6LoWPAN islands. 

42 

43A LoWPAN supports addressing; a direct mapping between the link-layer address

44and the IPv6 address is used for achieving compression. 

45 

46Known Issues: 

47 * Unimplemented context information 

48 * Unimplemented IPv6 extensions fields 

49""" 

50 

51import socket 

52import struct 

53 

54from scapy.compat import chb, orb, raw 

55from scapy.data import ETHER_TYPES 

56 

57from scapy.packet import Packet, bind_layers, bind_top_down 

58from scapy.fields import ( 

59 BitEnumField, 

60 BitField, 

61 BitLenField, 

62 BitScalingField, 

63 ByteEnumField, 

64 ByteField, 

65 ConditionalField, 

66 FieldLenField, 

67 MultipleTypeField, 

68 PacketField, 

69 PacketListField, 

70 StrFixedLenField, 

71 XBitField, 

72 XLongField, 

73 XShortField, 

74) 

75 

76from scapy.layers.dot15d4 import Dot15d4Data 

77from scapy.layers.inet6 import ( 

78 IP6Field, 

79 IPv6, 

80 _IPv6ExtHdr, 

81 ipv6nh, 

82) 

83from scapy.layers.inet import UDP 

84from scapy.layers.l2 import Ether 

85 

86from scapy.utils import mac2str 

87from scapy.config import conf 

88from scapy.error import warning 

89 

90from scapy.packet import Raw 

91from scapy.pton_ntop import inet_pton, inet_ntop 

92from scapy.volatile import RandShort 

93 

94ETHER_TYPES[0xA0ED] = "6LoWPAN" 

95 

96LINK_LOCAL_PREFIX = b"\xfe\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" # noqa: E501 

97 

98 

99########## 

100# Fields # 

101########## 

102 

103 

class IP6FieldLenField(IP6Field):
    """IPv6 address field carried on a variable number of bytes.

    ``length_of`` is a callable that receives the packet and returns how
    many trailing bytes of the 16-byte address are present on the wire.
    """
    __slots__ = ["length_of"]

    def __init__(self, name, default, length_of=None):
        IP6Field.__init__(self, name, default)
        self.length_of = length_of

    def addfield(self, pkt, s, val):
        """Append the last ``length_of(pkt)`` bytes of the address to ``s``."""
        nbytes = self.length_of(pkt)
        if nbytes == 0:
            # Address fully elided: nothing goes on the wire
            return s
        tail = self.i2m(pkt, val)[-nbytes:]
        return s + struct.pack("!%ds" % nbytes, tail)

    def getfield(self, pkt, s):
        """Consume ``length_of(pkt)`` bytes and left-pad them to 16 bytes."""
        nbytes = self.length_of(pkt)
        assert 0 <= nbytes <= 16
        if nbytes <= 0:
            return s, b""
        padded = b"\x00" * (16 - nbytes) + s[:nbytes]
        return s[nbytes:], self.m2i(pkt, padded)

126 

127 

128################# 

129# Basic 6LoWPAN # 

130################# 

131# https://tools.ietf.org/html/rfc4944 

132 

133 

class LoWPANUncompressedIPv6(Packet):
    """One-byte dispatch header announcing an uncompressed IPv6 packet."""
    name = "6LoWPAN Uncompressed IPv6"
    fields_desc = [BitField("_type", 0x41, 8)]

    def default_payload_class(self, pay):
        # Whatever follows this dispatch byte is dissected as plain IPv6
        return IPv6

142 

143# https://tools.ietf.org/html/rfc4944#section-5.2 

144 

145 

class LoWPANMesh(Packet):
    """6LoWPAN mesh addressing header (RFC 4944, section 5.2).

    The ``v`` flag selects the width of the originator address (``src``)
    and the ``f`` flag selects the width of the final destination address
    (``dst``): 1 means a 16-bit short address, 0 a 64-bit EUI-64.
    """
    name = "6LoWPAN Mesh Packet"
    deprecated_fields = {
        "_v": ("v", "2.4.4"),
        "_f": ("f", "2.4.4"),
        "_sourceAddr": ("src", "2.4.4"),
        "_destinyAddr": ("dst", "2.4.4"),
    }
    fields_desc = [
        BitField("reserved", 0x2, 2),
        BitEnumField("v", 0x0, 1, ["EUI-64", "Short"]),
        BitEnumField("f", 0x0, 1, ["EUI-64", "Short"]),
        BitField("hopsLeft", 0x0, 4),
        # Originator address: width driven by the V bit
        MultipleTypeField(
            [(XShortField("src", 0x0), lambda pkt: pkt.v == 1)],
            XLongField("src", 0x0)
        ),
        # Final destination address: width driven by the F bit (not V)
        MultipleTypeField(
            [(XShortField("dst", 0x0), lambda pkt: pkt.f == 1)],
            XLongField("dst", 0x0)
        )
    ]

168 

169 

170# https://tools.ietf.org/html/rfc4944#section-10.1 

171# This implementation is NOT RECOMMENDED according to RFC 6282 

172 

173 

class LoWPAN_HC2_UDP(Packet):
    """HC_UDP encoding byte used by LoWPAN_HC1 when the next header is UDP.

    ``sc``/``dc``/``lc`` flag whether the UDP source port, destination port
    and length are compressed or carried in-line.
    """
    name = "6LoWPAN HC1 UDP encoding"
    _inline_or_compressed = ["In-line", "Compressed"]
    fields_desc = [
        BitEnumField("sc", 0, 1, _inline_or_compressed),
        BitEnumField("dc", 0, 1, _inline_or_compressed),
        BitEnumField("lc", 0, 1, _inline_or_compressed),
        BitField("res", 0, 5),
    ]

    def default_payload_class(self, payload):
        # Anything left after the encoding byte is treated as padding
        return conf.padding_layer

185 

186 

187def _get_hc1_pad(pkt): 

188 """ 

189 Get LoWPAN_HC1 padding 

190 

191 LoWPAN_HC1 is not recommended for several reasons, one 

192 of them being that padding is a mess (not 8-bit regular) 

193 We therefore add padding bits that are not in the spec to restore 

194 8-bit parity. Wireshark seems to agree 

195 """ 

196 length = 0 # in bits, of the fields that are not //8 

197 if not pkt.tc_fl: 

198 length += 20 

199 if pkt.hc2: 

200 if pkt.nh == 1: 

201 length += pkt.hc2Field.sc * 4 

202 length += pkt.hc2Field.dc * 4 

203 return (-length) % 8 

204 

205 

class LoWPAN_HC1(Packet):
    """LoWPAN_HC1 compressed IPv6 header (RFC 4944, section 10).

    On dissection the compressed fields are expanded into a full IPv6
    (and, when HC_UDP is used, UDP) header which is then dissected as the
    payload. On build, an ``IPv6`` payload is folded back into the
    compressed fields and elided from the output.
    """
    name = "LoWPAN_HC1 Compressed IPv6"
    fields_desc = [
        # https://tools.ietf.org/html/rfc4944#section-10.1
        ByteField("reserved", 0x42),
        BitEnumField("sp", 0, 1, ["In-line", "Compressed"]),
        BitEnumField("si", 0, 1, ["In-line", "Elided"]),
        BitEnumField("dp", 0, 1, ["In-line", "Compressed"]),
        BitEnumField("di", 0, 1, ["In-line", "Elided"]),
        BitEnumField("tc_fl", 0, 1, ["Not compressed", "zero"]),
        BitEnumField("nh", 0, 2, {0: "not compressed",
                                  1: "UDP",
                                  2: "ICMP",
                                  3: "TCP"}),
        BitEnumField("hc2", 0, 1, ["No more header compression bits",
                                   "HC2 Present"]),
        # https://tools.ietf.org/html/rfc4944#section-10.2
        ConditionalField(
            MultipleTypeField(
                [
                    (PacketField("hc2Field", LoWPAN_HC2_UDP(),
                                 LoWPAN_HC2_UDP),
                        lambda pkt: pkt.nh == 1),
                    # TODO: ICMP & TCP not implemented yet for HC1
                    # (PacketField("hc2Field", LoWPAN_HC2_ICMP(),
                    #              LoWPAN_HC2_ICMP),
                    #     lambda pkt: pkt.nh == 2),
                    # (PacketField("hc2Field", LoWPAN_HC2_TCP(),
                    #              LoWPAN_HC2_TCP),
                    #     lambda pkt: pkt.nh == 3),
                ],
                StrFixedLenField("hc2Field", b"", 0),
            ),
            lambda pkt: pkt.hc2
        ),
        # IPv6 header fields
        # https://tools.ietf.org/html/rfc4944#section-10.3.1
        ByteField("hopLimit", 0x0),
        # 8 bytes for the prefix and 8 for the interface id, each of them
        # only present when not compressed/elided
        IP6FieldLenField("src", "::",
                         lambda pkt: (0 if pkt.sp else 8) +
                                     (0 if pkt.si else 8)),
        IP6FieldLenField("dst", "::",
                         lambda pkt: (0 if pkt.dp else 8) +
                                     (0 if pkt.di else 8)),
        ConditionalField(
            ByteField("traffic_class", 0),
            lambda pkt: not pkt.tc_fl
        ),
        ConditionalField(
            BitField("flow_label", 0, 20),
            lambda pkt: not pkt.tc_fl
        ),
        # Other fields
        # https://tools.ietf.org/html/rfc4944#section-10.3.2
        ConditionalField(
            MultipleTypeField(
                [(BitScalingField("udpSourcePort", 0, 4, offset=0xF0B0),
                    lambda pkt: getattr(pkt.hc2Field, "sc", 0))],
                BitField("udpSourcePort", 0, 16)
            ),
            lambda pkt: pkt.nh == 1 and pkt.hc2
        ),
        ConditionalField(
            MultipleTypeField(
                [(BitScalingField("udpDestPort", 0, 4, offset=0xF0B0),
                    lambda pkt: getattr(pkt.hc2Field, "dc", 0))],
                BitField("udpDestPort", 0, 16)
            ),
            lambda pkt: pkt.nh == 1 and pkt.hc2
        ),
        ConditionalField(
            BitField("udpLength", 0, 16),
            lambda pkt: pkt.nh == 1 and pkt.hc2 and not pkt.hc2Field.lc
        ),
        ConditionalField(
            XBitField("udpChecksum", 0, 16),
            lambda pkt: pkt.nh == 1 and pkt.hc2
        ),
        # Out of spec
        BitLenField("pad", 0, _get_hc1_pad)
    ]

    def post_dissect(self, data):
        """Rebuild the uncompressed IPv6 (and UDP) headers in front of data.

        Returns the raw bytes of the reconstructed packet so that the
        payload is dissected as regular IPv6.
        """
        # uncompress payload
        packet = IPv6()
        packet.version = IPHC_DEFAULT_VERSION
        packet.tc = self.traffic_class
        packet.fl = self.flow_label
        # The 2-bit "ICMP" code is carried inside an IPv6 header, so it has
        # to expand to ICMPv6 (58), not to the IPv4 ICMP protocol number.
        nh_match = {
            1: socket.IPPROTO_UDP,
            2: socket.IPPROTO_ICMPV6,
            3: socket.IPPROTO_TCP,
        }
        if self.nh:
            packet.nh = nh_match.get(self.nh)
        packet.hlim = self.hopLimit

        packet.src = self.decompressSourceAddr()
        packet.dst = self.decompressDestAddr()

        if self.hc2 and self.nh == 1:  # UDP
            udp = UDP()
            udp.sport = self.udpSourcePort
            udp.dport = self.udpDestPort
            # A missing/zero length is left for scapy to compute
            udp.len = self.udpLength or None
            udp.chksum = self.udpChecksum
            udp.add_payload(data)
            packet.add_payload(udp)
        else:
            packet.add_payload(data)
        data = raw(packet)
        return Packet.post_dissect(self, data)

    def decompressSourceAddr(self):
        """Expand, store in ``self.src`` and return the source address."""
        if not self.sp and not self.si:
            # Prefix & Interface
            return self.src
        elif not self.si:
            # Only interface
            addr = inet_pton(socket.AF_INET6, self.src)[-8:]
            addr = LINK_LOCAL_PREFIX[:8] + addr
        else:
            # Interface not provided
            addr = _extract_upperaddress(self, source=True)
        self.src = inet_ntop(socket.AF_INET6, addr)
        return self.src

    def decompressDestAddr(self):
        """Expand, store in ``self.dst`` and return the destination address."""
        if not self.dp and not self.di:
            # Prefix & Interface
            return self.dst
        elif not self.di:
            # Only interface
            addr = inet_pton(socket.AF_INET6, self.dst)[-8:]
            addr = LINK_LOCAL_PREFIX[:8] + addr
        else:
            # Interface not provided
            addr = _extract_upperaddress(self, source=False)
        self.dst = inet_ntop(socket.AF_INET6, addr)
        return self.dst

    def do_build(self):
        """Copy the fields of an IPv6 (and UDP) payload into this header."""
        if not isinstance(self.payload, IPv6):
            return Packet.do_build(self)
        # IPv6
        ipv6 = self.payload
        self.src = ipv6.src
        self.dst = ipv6.dst
        self.flow_label = ipv6.fl
        self.traffic_class = ipv6.tc
        self.hopLimit = ipv6.hlim
        if isinstance(ipv6.payload, UDP):
            self.nh = 1
            self.hc2 = 1
            udp = ipv6.payload
            self.udpSourcePort = udp.sport
            self.udpDestPort = udp.dport
            if not udp.len or not udp.chksum:
                # Rebuild the UDP layer so that its automatic fields
                # (length, checksum) get computed
                udp = UDP(raw(udp))
            self.udpLength = udp.len
            self.udpChecksum = udp.chksum
        return Packet.do_build(self)

    def do_build_payload(self):
        """Build the payload without the elided IPv6 / UDP headers."""
        # Elide the IPv6 and UDP payload
        if isinstance(self.payload, IPv6):
            if isinstance(self.payload.payload, UDP):
                return raw(self.payload.payload.payload)
            return raw(self.payload.payload)
        return Packet.do_build_payload(self)

376 

377# https://tools.ietf.org/html/rfc4944#section-5.3 

378 

379 

class LoWPANFragmentationFirst(Packet):
    """Header of the first fragment of a fragmented 6LoWPAN datagram."""
    name = "6LoWPAN First Fragmentation Packet"
    fields_desc = [
        # 5-bit dispatch pattern followed by the 11-bit datagram size
        BitField("reserved", 0x18, 5),
        BitField("datagramSize", 0x0, 11),
        XShortField("datagramTag", 0x0),
    ]

387 

388 

class LoWPANFragmentationSubsequent(Packet):
    """Header of a non-first fragment of a fragmented 6LoWPAN datagram."""
    name = "6LoWPAN Subsequent Fragmentation Packet"
    fields_desc = [
        # 5-bit dispatch pattern followed by the 11-bit datagram size
        BitField("reserved", 0x1C, 5),
        BitField("datagramSize", 0x0, 11),
        XShortField("datagramTag", RandShort()),
        # VALUE PRINTED IN OCTETS, wireshark does in bits
        # (128 bits == 16 octets)
        ByteField("datagramOffset", 0x0),
    ]

397 

398 

399# https://tools.ietf.org/html/rfc4944#section-11.1 

400 

class LoWPANBroadcast(Packet):
    """6LoWPAN broadcast header: dispatch byte plus a sequence number."""
    name = "6LoWPAN Broadcast"
    fields_desc = [
        ByteField("reserved", 0x50),
        ByteField("seq", 0),
    ]

407 

408 

409######################### 

410# LoWPAN_IPHC (RFC6282) # 

411######################### 

412 

413 

414IPHC_DEFAULT_VERSION = 6 

415IPHC_DEFAULT_TF = 0 

416IPHC_DEFAULT_FL = 0 

417 

418 

def source_addr_size(pkt):
    """Source address size

    Return the number of bytes of the source address that are carried
    in-line, according to the ``sac`` and ``sam`` fields of the packet.

    Keyword arguments:
    pkt -- packet object instance
    """
    # sam value -> in-line byte count, one table per context mode
    stateless_sizes = {0x0: 16, 0x1: 8, 0x2: 2, 0x3: 0}
    stateful_sizes = {0x0: 0, 0x1: 8, 0x2: 2, 0x3: 0}
    sizes = stateless_sizes if pkt.sac == 0x0 else stateful_sizes
    return sizes.get(pkt.sam)

446 

447 

def dest_addr_size(pkt):
    """Destination address size

    Return the number of bytes of the destination address that are carried
    in-line, according to the ``m``, ``dac`` and ``dam`` fields of the
    packet. Reserved combinations yield 0.

    Keyword arguments:
    pkt -- packet object instance
    """
    # (m, dac) -> (dam -> in-line byte count, value for any other dam)
    size_tables = {
        (0, 0): ({0x0: 16, 0x1: 8, 0x2: 2}, 0),
        (0, 1): ({0x0: 0, 0x1: 8, 0x2: 2}, 0),
        (1, 0): ({0x0: 16, 0x1: 6, 0x2: 4, 0x3: 1}, None),
        (1, 1): ({0x0: 6, 0x1: 0, 0x2: 0, 0x3: 0}, None),
    }
    entry = size_tables.get((pkt.m, pkt.dac))
    if entry is None:
        return None
    sizes, fallback = entry
    return sizes.get(pkt.dam, fallback)

497 

498 

def _extract_upperaddress(pkt, source=True):
    """Derive the IPv6 address of a 6LoWPAN packet from a lower layer.

    The packet's underlayers are walked until an Ethernet or IEEE 802.15.4
    data layer is found, and a link-local IPv6 address is built from that
    layer's link-layer address.

    params:
     - pkt: the 6LoWPAN packet whose underlayers are inspected
     - source: if True, the address is the source one. Otherwise, it is the
       destination.
    returns: the 16-byte packed IPv6 address. 16 zero bytes are returned
       (and a warning is emitted) when no supported layer or address mode
       is available.
    """
    # https://tools.ietf.org/html/rfc6282#section-3.2.2
    SUPPORTED_LAYERS = (Ether, Dot15d4Data)
    underlayer = pkt.underlayer
    while underlayer and not isinstance(underlayer, SUPPORTED_LAYERS):
        underlayer = underlayer.underlayer
    # Extract and process address
    # isinstance is used (like in the loop above) so that a subclass that
    # stopped the walk is also handled here
    if isinstance(underlayer, Ether):
        addr = mac2str(underlayer.src if source else underlayer.dst)
        # https://tools.ietf.org/html/rfc2464#section-4
        return LINK_LOCAL_PREFIX[:8] + addr[:3] + b"\xff\xfe" + addr[3:]
    if isinstance(underlayer, Dot15d4Data):
        # The addressing modes live in the layer below the data layer;
        # getattr() keeps a missing layer/field from raising
        fcf = underlayer.underlayer
        if source:
            addr = underlayer.src_addr
            addrmode = getattr(fcf, "fcf_srcaddrmode", None)
        else:
            addr = underlayer.dest_addr
            addrmode = getattr(fcf, "fcf_destaddrmode", None)
        if addr is not None and addrmode == 3:  # Extended/long
            packed = struct.pack(">Q", addr)
            # Flip the 0x02 bit of the first interface identifier byte
            flipped = struct.pack("B", orb(packed[0]) ^ 0x2)
            return LINK_LOCAL_PREFIX[0:8] + flipped + packed[1:8]
        if addr is not None and addrmode == 2:  # Short
            packed = struct.pack(">Q", addr)
            return (
                LINK_LOCAL_PREFIX[0:8] +
                b"\x00\x00\x00\xff\xfe\x00" +
                packed[6:]
            )
        # No usable address: fall back instead of returning None, which
        # callers would pass straight to inet_ntop()
        warning(
            'Unimplemented: Unsupported 802.15.4 address mode: %s' % addrmode
        )
        return b"\x00" * 16
    # Most of the times, it's necessary the IEEE 802.15.4 data to extract
    # this address, sometimes another layer.
    warning(
        'Unimplemented: Unsupported upper layer: %s' % type(underlayer)
    )
    return b"\x00" * 16

545 

546 

class LoWPAN_IPHC(Packet):
    """6LoWPAN IPv6 header compressed packets

    It follows the implementation of RFC6282.

    On dissection, the compressed fields are expanded into an ``IPv6``
    header. That header is either prepended to the payload (no next-header
    compression) or stored in ``_ipv6`` for the following ``LoWPAN_NHC``
    layer to complete. On build, an ``IPv6`` payload is folded back into
    the compressed fields and elided from the output.
    """
    __slots__ = ["_ipv6"]
    # the LOWPAN_IPHC encoding utilizes 13 bits, 5 dispatch type
    name = "LoWPAN IP Header Compression Packet"
    _address_modes = ["Unspecified (0)", "1", "16-bits inline (3)",
                      "Compressed (3)"]
    _state_mode = ["Stateless (0)", "Stateful (1)"]
    deprecated_fields = {
        "_nhField": ("nhField", "2.4.4"),
        "_hopLimit": ("hopLimit", "2.4.4"),
        "sourceAddr": ("src", "2.4.4"),
        "destinyAddr": ("dst", "2.4.4"),
        "udpDestinyPort": ("udpDestPort", "2.4.4"),
    }
    fields_desc = [
        # Base Format https://tools.ietf.org/html/rfc6282#section-3.1.2
        BitField("_reserved", 0x03, 3),
        BitField("tf", 0x0, 2),
        BitEnumField("nh", 0x0, 1, ["Inline", "Compressed"]),
        BitEnumField("hlim", 0x0, 2, {0: "Inline",
                                      1: "Compressed/HL1",
                                      2: "Compressed/HL64",
                                      3: "Compressed/HL255"}),
        BitEnumField("cid", 0x0, 1, {1: "Present (1)"}),
        BitEnumField("sac", 0x0, 1, _state_mode),
        BitEnumField("sam", 0x0, 2, _address_modes),
        BitEnumField("m", 0x0, 1, {1: "multicast (1)"}),
        BitEnumField("dac", 0x0, 1, _state_mode),
        BitEnumField("dam", 0x0, 2, _address_modes),
        # https://tools.ietf.org/html/rfc6282#section-3.1.2
        # Context Identifier Extension
        ConditionalField(
            BitField("sci", 0, 4),
            lambda pkt: pkt.cid == 0x1
        ),
        ConditionalField(
            BitField("dci", 0, 4),
            lambda pkt: pkt.cid == 0x1
        ),
        # https://tools.ietf.org/html/rfc6282#section-3.2.1
        ConditionalField(
            BitField("tc_ecn", 0, 2),
            lambda pkt: pkt.tf in [0, 1, 2]
        ),
        ConditionalField(
            BitField("tc_dscp", 0, 6),
            lambda pkt: pkt.tf in [0, 2],
        ),
        ConditionalField(
            MultipleTypeField(
                [(BitField("rsv", 0, 4), lambda pkt: pkt.tf == 0)],
                BitField("rsv", 0, 2),
            ),
            lambda pkt: pkt.tf in [0, 1]
        ),
        ConditionalField(
            BitField("flowlabel", 0, 20),
            lambda pkt: pkt.tf in [0, 1]
        ),
        # Inline fields https://tools.ietf.org/html/rfc6282#section-3.1.1
        ConditionalField(
            ByteEnumField("nhField", 0x0, ipv6nh),
            lambda pkt: pkt.nh == 0x0
        ),
        ConditionalField(
            ByteField("hopLimit", 0x0),
            lambda pkt: pkt.hlim == 0x0
        ),
        # The src and dst fields are filled up or removed in the
        # pre_dissect and post_build, depending on the other options.
        IP6FieldLenField("src", "::", length_of=source_addr_size),
        IP6FieldLenField("dst", "::", length_of=dest_addr_size),  # problem when it's 0  # noqa: E501
    ]

    def post_dissect(self, data):
        """dissect the IPv6 package compressed into this IPHC packet.

        The packet payload needs to be decompressed and depending on the
        arguments, several conversions should be done.
        """

        # uncompress payload
        packet = IPv6()
        packet.tc, packet.fl = self._getTrafficClassAndFlowLabel()
        if not self.nh:
            packet.nh = self.nhField
        # HLIM: Hop Limit
        if self.hlim == 0:
            packet.hlim = self.hopLimit
        elif self.hlim == 0x1:
            packet.hlim = 1
        elif self.hlim == 0x2:
            packet.hlim = 64
        else:
            packet.hlim = 255

        packet.src = self.decompressSourceAddr(packet)
        packet.dst = self.decompressDestAddr(packet)

        pay_cls = self.guess_payload_class(data)
        if pay_cls == IPv6:
            # No compressed next header: prepend the rebuilt IPv6 header
            packet.add_payload(data)
            data = raw(packet)
        elif pay_cls == LoWPAN_NHC:
            # The LoWPAN_NHC layer will finish the reconstruction
            self._ipv6 = packet
        return Packet.post_dissect(self, data)

    def decompressDestAddr(self, packet):
        """Expand, store in ``self.dst`` and return the destination address.

        ``packet`` is accepted for API compatibility and is not used.
        """
        # https://tools.ietf.org/html/rfc6282#section-3.1.1
        try:
            tmp_ip = inet_pton(socket.AF_INET6, self.dst)
        except socket.error:
            tmp_ip = b"\x00" * 16

        if self.m == 0 and self.dac == 0:
            if self.dam == 0:
                # Address fully carried
                pass
            elif self.dam == 1:
                tmp_ip = LINK_LOCAL_PREFIX[0:8] + tmp_ip[-8:]
            elif self.dam == 2:
                tmp_ip = LINK_LOCAL_PREFIX[0:8] + b"\x00\x00\x00\xff\xfe\x00" + tmp_ip[-2:]  # noqa: E501
            elif self.dam == 3:
                tmp_ip = _extract_upperaddress(self, source=False)

        elif self.m == 0 and self.dac == 1:
            if self.dam == 0x3:
                # should use context IID + encapsulating header
                tmp_ip = _extract_upperaddress(self, source=False)
            # dam == 0 is reserved.
            # dam in (1, 2) should use context information: unimplemented
            # https://tools.ietf.org/html/rfc6282#page-9
        elif self.m == 1 and self.dac == 0:
            if self.dam == 0:
                # Address fully carried
                pass
            elif self.dam == 1:
                # First in-line byte is the second byte of the address
                tmp = b"\xff" + chb(tmp_ip[16 - dest_addr_size(self)])
                tmp_ip = tmp + b"\x00" * 9 + tmp_ip[-5:]
            elif self.dam == 2:
                tmp = b"\xff" + chb(tmp_ip[16 - dest_addr_size(self)])
                tmp_ip = tmp + b"\x00" * 11 + tmp_ip[-3:]
            else:  # self.dam == 3:
                tmp_ip = b"\xff\x02" + b"\x00" * 13 + tmp_ip[-1:]
        elif self.m == 1 and self.dac == 1:
            if self.dam == 0x0:
                # https://tools.ietf.org/html/rfc6282#page-10
                # https://github.com/wireshark/wireshark/blob/f54611d1104d85a425e52c7318c522ed249916b6/epan/dissectors/packet-6lowpan.c#L2149-L2166
                # Format: ffXX:XXLL:PPPP:PPPP:PPPP:PPPP:XXXX:XXXX
                # P and L should be retrieved from context
                P = b"\x00" * 16
                L = b"\x00"
                X = tmp_ip[-6:]
                tmp_ip = b"\xff" + X[:2] + L + P[:8] + X[2:6]
            else:  # all the others values: reserved
                pass

        self.dst = inet_ntop(socket.AF_INET6, tmp_ip)
        return self.dst

    def compressSourceAddr(self, ipv6):
        """Store in ``self.src`` the in-line part of ``ipv6.src``.

        The kept bytes are right-aligned in a zero-filled address. Returns
        the new value of ``self.src``.
        """
        # https://tools.ietf.org/html/rfc6282#section-3.1.1
        tmp_ip = inet_pton(socket.AF_INET6, ipv6.src)

        if self.sac == 0:
            if self.sam == 0x0:
                pass
            elif self.sam == 0x1:
                tmp_ip = tmp_ip[8:16]
            elif self.sam == 0x2:
                tmp_ip = tmp_ip[14:16]
            else:  # self.sam == 0x3:
                pass
        else:  # self.sac == 1
            if self.sam == 0x0:
                tmp_ip = b"\x00" * 16
            elif self.sam == 0x1:
                tmp_ip = tmp_ip[8:16]
            elif self.sam == 0x2:
                tmp_ip = tmp_ip[14:16]

        self.src = inet_ntop(socket.AF_INET6, b"\x00" * (16 - len(tmp_ip)) + tmp_ip)  # noqa: E501
        return self.src

    def compressDestAddr(self, ipv6):
        """Store in ``self.dst`` the in-line part of ``ipv6.dst``.

        The kept bytes are right-aligned in a zero-filled address. Returns
        the new value of ``self.dst``, like ``compressSourceAddr`` does for
        the source.
        """
        # https://tools.ietf.org/html/rfc6282#section-3.1.1
        tmp_ip = inet_pton(socket.AF_INET6, ipv6.dst)

        if self.m == 0 and self.dac == 0:
            if self.dam == 0x0:
                pass
            elif self.dam == 0x1:
                tmp_ip = b"\x00" * 8 + tmp_ip[8:16]
            elif self.dam == 0x2:
                tmp_ip = b"\x00" * 14 + tmp_ip[14:16]
        elif self.m == 0 and self.dac == 1:
            if self.dam == 0x1:
                tmp_ip = b"\x00" * 8 + tmp_ip[8:16]
            elif self.dam == 0x2:
                tmp_ip = b"\x00" * 14 + tmp_ip[14:16]
        elif self.m == 1 and self.dac == 0:
            # Multicast: keep the flags/scope byte plus the trailing bytes
            if self.dam == 0x0:
                pass
            elif self.dam == 0x1:
                tmp_ip = b"\x00" * 10 + tmp_ip[1:2] + tmp_ip[11:16]
            elif self.dam == 0x2:
                tmp_ip = b"\x00" * 12 + tmp_ip[1:2] + tmp_ip[13:16]
            elif self.dam == 0x3:
                tmp_ip = b"\x00" * 15 + tmp_ip[15:16]
        elif self.m == 1 and self.dac == 1:
            if self.dam == 0:
                tmp_ip = b"\x00" * 10 + tmp_ip[1:3] + tmp_ip[12:16]

        self.dst = inet_ntop(socket.AF_INET6, tmp_ip)
        return self.dst

    def decompressSourceAddr(self, packet):
        """Expand, store in ``self.src`` and return the source address.

        ``packet`` is accepted for API compatibility and is not used.
        """
        # https://tools.ietf.org/html/rfc6282#section-3.1.1
        try:
            tmp_ip = inet_pton(socket.AF_INET6, self.src)
        except socket.error:
            tmp_ip = b"\x00" * 16

        if self.sac == 0:
            if self.sam == 0x0:
                # Full address is carried in-line
                pass
            elif self.sam == 0x1:
                tmp_ip = LINK_LOCAL_PREFIX[0:8] + tmp_ip[16 - source_addr_size(self):16]  # noqa: E501
            elif self.sam == 0x2:
                tmp = LINK_LOCAL_PREFIX[0:8] + b"\x00\x00\x00\xff\xfe\x00"
                tmp_ip = tmp + tmp_ip[16 - source_addr_size(self):16]
            elif self.sam == 0x3:
                # Taken from encapsulating header
                tmp_ip = _extract_upperaddress(self, source=True)
        else:  # self.sac == 1:
            if self.sam == 0x0:
                # Unspecified address ::
                pass
            elif self.sam == 0x1:
                # should use context IID
                pass
            elif self.sam == 0x2:
                # should use context IID
                tmp = LINK_LOCAL_PREFIX[0:8] + b"\x00\x00\x00\xff\xfe\x00"
                tmp_ip = tmp + tmp_ip[16 - source_addr_size(self):16]
            elif self.sam == 0x3:
                # should use context IID
                tmp_ip = LINK_LOCAL_PREFIX[0:8] + b"\x00" * 8
        self.src = inet_ntop(socket.AF_INET6, tmp_ip)
        return self.src

    def guess_payload_class(self, payload):
        """Pick LoWPAN_NHC, Raw (inside a fragment) or IPv6 as payload."""
        if self.nh:
            return LoWPAN_NHC
        u = self.underlayer
        if u and isinstance(u, (LoWPANFragmentationFirst,
                                LoWPANFragmentationSubsequent)):
            # A fragment does not hold a complete datagram
            return Raw
        return IPv6

    def do_build(self):
        """Fill the compressed fields from the IPv6 payload, then build.

        The IPv6 layer is looked up either directly as payload or right
        after a ``LoWPAN_NHC`` payload.
        """
        _cur = self
        if isinstance(_cur.payload, LoWPAN_NHC):
            _cur = _cur.payload
        if not isinstance(_cur.payload, IPv6):
            return Packet.do_build(self)
        ipv6 = _cur.payload

        self._reserved = 0x03

        # NEW COMPRESSION TECHNIQUE!
        # a ) Compression Techniques

        # 1. Set Traffic Class
        if self.tf == 0x0:
            self.tc_ecn = ipv6.tc >> 6
            self.tc_dscp = ipv6.tc & 0x3F
            self.flowlabel = ipv6.fl
        elif self.tf == 0x1:
            self.tc_ecn = ipv6.tc >> 6
            self.flowlabel = ipv6.fl
        elif self.tf == 0x2:
            self.tc_ecn = ipv6.tc >> 6
            self.tc_dscp = ipv6.tc & 0x3F
        # self.tf == 0x3: no field is set

        # 2. Next Header
        if self.nh == 0x0:
            self.nhField = ipv6.nh
        # nh == 1 is handled in LoWPAN_NHC

        # 3. HLim
        if self.hlim == 0x0:
            self.hopLimit = ipv6.hlim
        # if hlim is 1, 2 or 3, there is nothing to do!

        # 4. Context (which context to use...)
        # TODO: Context Unimplemented yet

        # 5. Compress Source and Destination Addr
        self.compressSourceAddr(ipv6)
        self.compressDestAddr(ipv6)

        return Packet.do_build(self)

    def do_build_payload(self):
        """Build the payload without the elided IPv6 header."""
        # Elide the IPv6 payload
        if isinstance(self.payload, IPv6):
            return raw(self.payload.payload)
        return Packet.do_build_payload(self)

    def _getTrafficClassAndFlowLabel(self):
        """Page 6, draft feb 2011

        Return the ``(traffic class, flow label)`` pair described by the
        ``tf`` mode and the in-line ECN / DSCP / flow label fields.
        """
        # NOTE(review): ECN is placed in the two high-order bits of the
        # traffic class, mirroring do_build(). Confirm this bit order
        # against the IPv6 Traffic Class layout (DSCP high, ECN low).
        if self.tf == 0x0:
            return (self.tc_ecn << 6) + self.tc_dscp, self.flowlabel
        elif self.tf == 0x1:
            return (self.tc_ecn << 6), self.flowlabel
        elif self.tf == 0x2:
            return (self.tc_ecn << 6) + self.tc_dscp, 0
        else:
            return 0, 0

884 

885############## 

886# LOWPAN_NHC # 

887############## 

888 

889# https://tools.ietf.org/html/rfc6282#section-4 

890 

891 

class LoWPAN_NHC_Hdr(Packet):
    """Base class of the compressed next headers found in LoWPAN_NHC."""

    @classmethod
    def get_next_cls(cls, s):
        """Return the NHC header class matching the first byte of ``s``.

        ``None`` is returned when fewer than two bytes are left or when
        the leading bits match no known NHC header.
        """
        if not s or len(s) < 2:
            return None
        first_byte = ord(s[:1])
        if first_byte >> 3 == 0x1e:
            return LoWPAN_NHC_UDP
        if first_byte >> 4 == 0xe:
            return LoWPAN_NHC_IPv6Ext
        return None

    @classmethod
    def dispatch_hook(cls, _pkt=b"", *args, **kargs):
        # Fall back to the generic header when nothing more specific fits
        return LoWPAN_NHC_Hdr.get_next_cls(_pkt) or LoWPAN_NHC_Hdr

    def extract_padding(self, s):
        # Each header only owns its own fields; the rest goes back to the
        # enclosing packet list
        return b"", s

909 

910 

class LoWPAN_NHC_UDP(LoWPAN_NHC_Hdr):
    """LOWPAN_NHC compressed UDP header.

    ``P`` selects how many bits of each port are carried in-line (16, 8
    or 4) and ``C`` tells whether the checksum is elided.
    """
    fields_desc = [
        BitField("res", 0x1e, 5),
        BitField("C", 0, 1),
        BitField("P", 0, 2),
        # Source port: 16 bits for P in (0, 1), 8 for P == 2, 4 for P == 3
        MultipleTypeField(
            [
                (BitField("udpSourcePort", 0, 16),
                 lambda pkt: pkt.P in [0, 1]),
                (BitField("udpSourcePort", 0, 8),
                 lambda pkt: pkt.P == 2),
                (BitField("udpSourcePort", 0, 4),
                 lambda pkt: pkt.P == 3),
            ],
            BitField("udpSourcePort", 0x0, 16),
        ),
        # Dest port: 16 bits for P in (0, 2), 8 for P == 1, 4 for P == 3
        MultipleTypeField(
            [
                (BitField("udpDestPort", 0, 16),
                 lambda pkt: pkt.P in [0, 2]),
                (BitField("udpDestPort", 0, 8),
                 lambda pkt: pkt.P == 1),
                (BitField("udpDestPort", 0, 4),
                 lambda pkt: pkt.P == 3),
            ],
            BitField("udpDestPort", 0x0, 16),
        ),
        # The checksum is only present when it is not elided
        ConditionalField(
            XShortField("udpChecksum", 0x0),
            lambda pkt: pkt.C == 0
        ),
    ]

939 

940 

941_lowpan_nhc_ipv6ext_eid = { 

942 0: "Hop-by-hop Options Header", 

943 1: "IPv6 Routing Header", 

944 2: "IPv6 Fragment Header", 

945 3: "IPv6 Destination Options Header", 

946 4: "IPv6 Mobility Header", 

947 7: "IPv6 Header", 

948} 

949 

950 

class LoWPAN_NHC_IPv6Ext(LoWPAN_NHC_Hdr):
    """LOWPAN_NHC compressed IPv6 extension header.

    ``len`` is the number of bytes of ``data`` that follow the length
    byte; it is computed automatically when left to ``None``.
    """
    fields_desc = [
        BitField("res", 0xe, 4),
        BitEnumField("eid", 0, 3, _lowpan_nhc_ipv6ext_eid),
        BitField("nh", 0, 1),
        ConditionalField(
            ByteField("nhField", 0),
            lambda pkt: pkt.nh == 0
        ),
        FieldLenField("len", None, length_of="data", fmt="B"),
        StrFixedLenField("data", b"", length_from=lambda pkt: pkt.len)
    ]

    def post_build(self, p, pay):
        """Write the length byte when ``len`` was not set explicitly."""
        if self.len is None:
            # The length byte follows the first byte and, when the next
            # header is carried in-line (nh == 0), the nhField byte
            offs = (not self.nh) + 1
            # Only the bytes after the length byte are counted: dissection
            # reads exactly ``len`` bytes of data, so the length byte
            # itself must not be included
            data_len = len(p) - offs - 1
            p = p[:offs] + struct.pack("!B", data_len) + p[offs + 1:]
        return p + pay

969 

970 

971class LoWPAN_NHC(Packet): 

972 name = "LOWPAN_NHC" 

973 fields_desc = [ 

974 PacketListField( 

975 "exts", [], pkt_cls=LoWPAN_NHC_Hdr, 

976 next_cls_cb=lambda *s: LoWPAN_NHC_Hdr.get_next_cls(s[3]) 

977 ) 

978 ] 

979 

980 def post_dissect(self, data): 

981 if not self.underlayer or not hasattr(self.underlayer, "_ipv6"): 

982 return data 

983 if self.guess_payload_class(data) != IPv6: 

984 return data 

985 # Underlayer is LoWPAN_IPHC 

986 packet = self.underlayer._ipv6 

987 try: 

988 ipv6_hdr = next( 

989 x for x in self.exts if isinstance(x, LoWPAN_NHC_IPv6Ext) 

990 ) 

991 except StopIteration: 

992 ipv6_hdr = None 

993 if ipv6_hdr: 

994 # XXX todo: implement: append the IPv6 extension 

995 # packet = packet / ipv6extension 

996 pass 

997 try: 

998 udp_hdr = next( 

999 x for x in self.exts if isinstance(x, LoWPAN_NHC_UDP) 

1000 ) 

1001 except StopIteration: 

1002 udp_hdr = None 

1003 if udp_hdr: 

1004 packet.nh = 0x11 # UDP 

1005 udp = UDP() 

1006 # https://tools.ietf.org/html/rfc6282#section-4.3.3 

1007 if udp_hdr.C == 0: 

1008 udp.chksum = udp_hdr.udpChecksum 

1009 if udp_hdr.P == 0: 

1010 udp.sport = udp_hdr.udpSourcePort 

1011 udp.dport = udp_hdr.udpDestPort 

1012 elif udp_hdr.P == 1: 

1013 udp.sport = udp_hdr.udpSourcePort 

1014 udp.dport = 0xF000 + udp_hdr.udpDestPort 

1015 elif udp_hdr.P == 2: 

1016 udp.sport = 0xF000 + udp_hdr.udpSourcePort 

1017 udp.dport = udp_hdr.udpDestPort 

1018 elif udp_hdr.P == 3: 

1019 udp.sport = 0xF0B0 + udp_hdr.udpSourcePort 

1020 udp.dport = 0xF0B0 + udp_hdr.udpDestPort 

1021 packet.lastlayer().add_payload(udp / data) 

1022 else: 

1023 packet.lastlayer().add_payload(data) 

1024 data = raw(packet) 

1025 return Packet.post_dissect(self, data) 

1026 

def do_build(self):
    """Build this layer, compressing a UDP header of the IPv6 payload.

    When the payload is an ``IPv6`` packet whose upper layer is ``UDP``,
    the ``LoWPAN_NHC_UDP`` entry of ``self.exts`` is filled in from the UDP
    ports and checksum.  If no such entry exists, one is created, its ``P``
    mode is guessed from the port values, and it is inserted at the front
    of ``self.exts``.  The actual serialisation is always delegated to
    ``Packet.do_build``.
    """
    ipv6 = self.payload
    if not isinstance(ipv6, IPv6):
        return Packet.do_build(self)

    upper = ipv6.payload
    # XXX todo: populate a LoWPAN_NHC_IPv6Ext
    # NOTE(review): the loop condition inspects ``upper.payload`` rather
    # than ``upper`` itself; kept as-is, confirm the intended behaviour.
    while upper and isinstance(upper.payload, _IPv6ExtHdr):
        upper = upper.payload

    if not isinstance(upper, UDP):
        return Packet.do_build(self)

    nhc_udp = next(
        (ext for ext in self.exts if isinstance(ext, LoWPAN_NHC_UDP)),
        None,
    )
    if nhc_udp is None:
        nhc_udp = LoWPAN_NHC_UDP()
        # Guess best compression
        if upper.sport >> 4 == 0xf0b and upper.dport >> 4 == 0xf0b:
            nhc_udp.P = 3
        elif upper.sport >> 8 == 0xf0:
            nhc_udp.P = 2
        elif upper.dport >> 8 == 0xf0:
            nhc_udp.P = 1
        self.exts.insert(0, nhc_udp)

    # https://tools.ietf.org/html/rfc6282#section-4.3.3
    mode = nhc_udp.P
    if mode == 0:
        # Both ports carried in full
        nhc_udp.udpSourcePort = upper.sport
        nhc_udp.udpDestPort = upper.dport
    elif mode == 1:
        # Only the low byte of the destination port is kept
        nhc_udp.udpSourcePort = upper.sport
        nhc_udp.udpDestPort = upper.dport & 255
    elif mode == 2:
        # Only the low byte of the source port is kept
        nhc_udp.udpSourcePort = upper.sport & 255
        nhc_udp.udpDestPort = upper.dport
    elif mode == 3:
        # Only the low nibble of each port is kept
        nhc_udp.udpSourcePort = upper.sport & 15
        nhc_udp.udpDestPort = upper.dport & 15

    if nhc_udp.C == 0:
        # Checksum is carried inline: reuse the explicit one, otherwise
        # take the value obtained by rebuilding and re-parsing the UDP layer
        nhc_udp.udpChecksum = upper.chksum or UDP(raw(upper)).chksum

    return Packet.do_build(self)

1068 

def do_build_payload(self):
    """Build the payload bytes, leaving out the IPv6 and UDP layers.

    When the payload is an ``IPv6`` packet, consecutive ``IPv6`` / ``UDP``
    layers at the top of the payload chain are skipped and only what
    follows them is serialised.  Any other payload is built by
    ``Packet.do_build_payload``.
    """
    if not isinstance(self.payload, IPv6):
        return Packet.do_build_payload(self)

    # Walk past the layers that are elided from the built payload
    layer = self.payload
    while layer and isinstance(layer, (IPv6, UDP)):
        layer = layer.payload
    return raw(layer)

1077 

def guess_payload_class(self, payload):
    """Return the class used to dissect what follows this layer.

    ``Raw`` is returned when the layer two levels below is a 6LoWPAN
    fragmentation header (first or subsequent); otherwise ``IPv6``.
    """
    fragment_types = (
        LoWPANFragmentationFirst,
        LoWPANFragmentationSubsequent,
    )
    lower = self.underlayer
    if lower and isinstance(lower.underlayer, fragment_types):
        return Raw
    return IPv6

1085 

1086 

1087###################### 

1088# 6LowPan Dispatcher # 

1089###################### 

1090 

1091# https://tools.ietf.org/html/rfc4944#section-5.1 

1092 

class SixLoWPAN_ESC(Packet):
    """Layer selected by the dispatcher for the 0x7f dispatch byte."""

    name = "SixLoWPAN Dispatcher ESC"
    fields_desc = [
        ByteField("dispatch", 0),
    ]

1096 

1097 

class SixLoWPAN(Packet):
    """Dispatcher layer: picks the concrete 6LoWPAN class from the first byte."""

    name = "SixLoWPAN Dispatcher"

    @classmethod
    def dispatch_hook(cls, _pkt=b"", *args, **kargs):
        """Select the packet class matching the dispatch byte of ``_pkt``.

        Exact dispatch values are looked up first, then the fragmentation,
        mesh and IPHC bit prefixes are tested.  ``cls`` is returned when
        ``_pkt`` is empty or nothing matches.
        """
        if not _pkt:
            return cls

        first_byte = ord(_pkt[:1])

        # Full-byte dispatch values; these must win over the prefix tests
        # below because they all share the 0b01 prefix used for IPHC.
        exact_matches = {
            0x41: LoWPANUncompressedIPv6,
            0x42: LoWPAN_HC1,
            0x50: LoWPANBroadcast,
            0x7f: SixLoWPAN_ESC,
        }
        if first_byte in exact_matches:
            return exact_matches[first_byte]

        top_five_bits = first_byte >> 3
        if top_five_bits == 0x18:
            return LoWPANFragmentationFirst
        if top_five_bits == 0x1C:
            return LoWPANFragmentationSubsequent

        top_two_bits = first_byte >> 6
        if top_two_bits == 0x02:
            return LoWPANMesh
        if top_two_bits == 0x01:
            return LoWPAN_IPHC
        return cls

1123 

1124 

1125################# 

1126# Fragmentation # 

1127################# 

1128 

# Chunk size, in bytes, used by sixlowpan_fragment() when splitting an
# IPv6 packet into fragments
MAX_SIZE = 96

1131 

1132 

def sixlowpan_fragment(packet, datagram_tag=1):
    """Split an IPv6 packet into 6LoWPAN fragments.

    The raw bytes of the IPv6 layer are cut into chunks of ``MAX_SIZE``
    bytes.  The first chunk is wrapped in a ``LoWPANFragmentationFirst``
    header and every following chunk in a
    ``LoWPANFragmentationSubsequent`` header whose offset is expressed in
    units of 8 bytes.  A packet that already fits in ``MAX_SIZE`` bytes is
    returned unchanged, as a one-element list.

    Usage example::

      >>> ipv6 = ..... (very big packet)
      >>> pkts = sixlowpan_fragment(ipv6, datagram_tag=0x17)
      >>> send = [Dot15d4()/Dot15d4Data()/x for x in pkts]
      >>> wireshark(send)

    :param packet: a packet containing an IPv6 layer
    :param datagram_tag: tag written in every fragment header
    :return: list of packets to transmit
    :raises Exception: if ``packet`` has no IPv6 layer
    """
    if not packet.haslayer(IPv6):
        raise Exception("SixLoWPAN only fragments IPv6 packets !")

    payload = raw(packet[IPv6])
    total_size = len(payload)
    if total_size <= MAX_SIZE:
        return [packet]

    pieces = [
        payload[start:start + MAX_SIZE]
        for start in range(0, total_size, MAX_SIZE)
    ]

    first_header = LoWPANFragmentationFirst(
        datagramTag=datagram_tag,
        datagramSize=total_size,
    )
    fragments = [first_header / pieces[0]]
    for index, piece in enumerate(pieces[1:], 1):
        header = LoWPANFragmentationSubsequent(
            datagramTag=datagram_tag,
            datagramSize=total_size,
            # Offset is counted in 8-byte units
            datagramOffset=MAX_SIZE // 8 * index,
        )
        fragments.append(header / piece)
    return fragments

1162 

1163 

def sixlowpan_defragment(packet_list):
    """Reassemble 6LoWPAN fragments into complete datagrams.

    Fragments are grouped by their ``datagramTag`` and ordered by their
    offset before being concatenated, so a capture in which fragments
    arrived out of order is still reassembled correctly.  The first
    fragment is placed at offset 0; subsequent fragments use their
    ``datagramOffset`` field.  Fragments sharing the same offset keep their
    arrival order.  Packets without a fragmentation header are ignored.

    :param packet_list: iterable of packets, possibly containing
        ``LoWPANFragmentationFirst`` / ``LoWPANFragmentationSubsequent``
        layers
    :return: dict mapping each datagram tag to the reassembled bytes
        dissected as ``SixLoWPAN``
    """
    pieces_by_tag = {}
    for p in packet_list:
        if LoWPANFragmentationFirst in p:
            frag = p[LoWPANFragmentationFirst]
            offset = 0
        elif LoWPANFragmentationSubsequent in p:
            frag = p[LoWPANFragmentationSubsequent]
            # An unset offset is treated as 0 so sorting never compares
            # None with an int
            offset = frag.datagramOffset or 0
        else:
            continue
        pieces_by_tag.setdefault(frag.datagramTag, []).append(
            (offset, frag.payload.load)
        )

    reassembled = {}
    for tag, pieces in pieces_by_tag.items():
        # list.sort is stable: equal offsets stay in arrival order
        pieces.sort(key=lambda piece: piece[0])
        reassembled[tag] = SixLoWPAN(b"".join(load for _, load in pieces))
    return reassembled

1176 

1177############ 

1178# Bindings # 

1179############ 

1180 

1181 

# HC1-compressed frames carry an IPv6 packet
bind_layers(LoWPAN_HC1, IPv6)

# When building, an NHC payload sets nh=1 on the IPHC header
bind_top_down(LoWPAN_IPHC, LoWPAN_NHC, nh=1)

# These headers are followed by another dispatch byte, hence SixLoWPAN
bind_layers(LoWPANFragmentationFirst, SixLoWPAN)
bind_layers(LoWPANMesh, SixLoWPAN)
bind_layers(LoWPANBroadcast, SixLoWPAN)

# NOTE(review): 0xA0ED is presumably the LoWPAN-encapsulation EtherType
# (RFC 7973) - confirm
bind_layers(Ether, SixLoWPAN, type=0xA0ED)