Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/sixlowpan.py: 68%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

575 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Cesar A. Bernardini <mesarpe@gmail.com> 

5# Intern at INRIA Grand Nancy Est 

6# Copyright (C) Gabriel Potter <gabriel[]potter[]fr> 

7""" 

86LoWPAN Protocol Stack 

9====================== 

10 

11This implementation follows these documents: 

12 

13- Transmission of IPv6 Packets over IEEE 802.15.4 Networks: RFC 4944 

14- Compression Format for IPv6 Datagrams in Low Power and Lossy 

15 networks (6LoWPAN): RFC 6282 

16- RFC 4291 

17 

18+----------------------------+-----------------------+ 

19| Application | Application Protocols | 

20+----------------------------+------------+----------+ 

21| Transport | UDP | TCP | 

22+----------------------------+------------+----------+ 

23| Network | IPv6 | 

24+----------------------------+-----------------------+ 

25| | LoWPAN | 

26+----------------------------+-----------------------+ 

27| Data Link Layer | IEEE 802.15.4 MAC | 

28+----------------------------+-----------------------+ 

29| Physical | IEEE 802.15.4 PHY | 

30+----------------------------+-----------------------+ 

31 

32Note that: 

33 

34 - Only IPv6 is supported 

35 - LoWPAN is in the middle between network and data link layer 

36 

37The Internet Control Message protocol v6 (ICMPv6) is used for control 

38messaging. 

39 

40Adaptation between full IPv6 and the LoWPAN format is performed by routers at 

41the edge of 6LoWPAN islands. 

42 

43LoWPAN supports addressing; a direct mapping between the link-layer address 

44and the IPv6 address is used for achieving compression. 

45 

46Known Issues: 

47 * Unimplemented context information 

48 * Unimplemented IPv6 extensions fields 

49""" 

50 

51import socket 

52import struct 

53 

54from scapy.compat import chb, orb, raw 

55from scapy.data import ETHER_TYPES 

56 

57from scapy.packet import Packet, bind_layers, bind_top_down 

58from scapy.fields import ( 

59 BitEnumField, 

60 BitField, 

61 BitLenField, 

62 BitScalingField, 

63 ByteEnumField, 

64 ByteField, 

65 ConditionalField, 

66 FieldLenField, 

67 MultipleTypeField, 

68 PacketField, 

69 PacketListField, 

70 StrFixedLenField, 

71 XBitField, 

72 XLongField, 

73 XShortField, 

74) 

75 

76from scapy.layers.dot15d4 import Dot15d4Data 

77from scapy.layers.inet6 import ( 

78 IP6Field, 

79 IPv6, 

80 _IPv6ExtHdr, 

81 ipv6nh, 

82) 

83from scapy.layers.inet import UDP 

84from scapy.layers.l2 import Ether 

85 

86from scapy.utils import mac2str 

87from scapy.config import conf 

88from scapy.error import warning 

89 

90from scapy.packet import Raw 

91from scapy.pton_ntop import inet_pton, inet_ntop 

92from scapy.volatile import RandShort 

93 

# Map EtherType 0xA0ED to the name "6LoWPAN" in Scapy's EtherType table.
ETHER_TYPES[0xA0ED] = "6LoWPAN"

# The address fe80:: as 16 raw bytes. Code in this file takes its first
# 8 bytes as the link-local prefix and appends an interface identifier.
LINK_LOCAL_PREFIX = b"\xfe\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"  # noqa: E501

97 

98 

99########## 

100# Fields # 

101########## 

102 

103 

class IP6FieldLenField(IP6Field):
    """IPv6 address field whose on-wire size depends on the packet.

    ``length_of`` is a callable that receives the packet and returns how
    many trailing bytes of the address (0 to 16) are carried in-line.
    """
    __slots__ = ["length_of"]

    def __init__(self, name, default, length_of=None):
        IP6Field.__init__(self, name, default)
        self.length_of = length_of

    def addfield(self, pkt, s, val):
        """Append the in-line part of the address to ``s``"""
        nbytes = self.length_of(pkt)
        if nbytes == 0:
            # Address fully elided: nothing goes on the wire.
            return s
        # Only the last ``nbytes`` bytes of the 16-byte address are sent.
        tail = self.i2m(pkt, val)[-nbytes:]
        return s + struct.pack("!%ds" % nbytes, tail)

    def getfield(self, pkt, s):
        """Read the in-line bytes and left-pad them with zeros to 16 bytes.

        Returns ``(remaining, value)``; the value is ``b""`` when no byte
        is carried in-line.
        """
        nbytes = self.length_of(pkt)
        assert 0 <= nbytes <= 16
        if nbytes <= 0:
            return s, b""
        padded = b"\x00" * (16 - nbytes) + s[:nbytes]
        return s[nbytes:], self.m2i(pkt, padded)

126 

127 

128################# 

129# Basic 6LoWPAN # 

130################# 

131# https://tools.ietf.org/html/rfc4944 

132 

133 

class LoWPANUncompressedIPv6(Packet):
    """One-byte header placed in front of an uncompressed IPv6 packet."""
    name = "6LoWPAN Uncompressed IPv6"
    fields_desc = [
        # Single 8-bit field, 0x41 by default.
        BitField("_type", 0x41, 8)
    ]

    def default_payload_class(self, pay):
        # Whatever follows this header is dissected as IPv6.
        return IPv6

142 

143# https://tools.ietf.org/html/rfc4944#section-5.2 

144 

145 

class LoWPANMesh(Packet):
    """6LoWPAN mesh addressing header (RFC 4944, section 5.2).

    ``v`` describes the originator address (``src``) and ``f`` the final
    destination address (``dst``): 0 selects a 64-bit EUI-64 address,
    1 a 16-bit short address. Each address is therefore sized by its own
    flag.
    """
    name = "6LoWPAN Mesh Packet"
    deprecated_fields = {
        "_v": ("v", "2.4.4"),
        "_f": ("f", "2.4.4"),
        "_sourceAddr": ("src", "2.4.4"),
        "_destinyAddr": ("dst", "2.4.4"),
    }
    fields_desc = [
        BitField("reserved", 0x2, 2),
        BitEnumField("v", 0x0, 1, ["EUI-64", "Short"]),
        BitEnumField("f", 0x0, 1, ["EUI-64", "Short"]),
        BitField("hopsLeft", 0x0, 4),
        # Originator address: width selected by the V bit.
        MultipleTypeField(
            [(XShortField("src", 0x0), lambda pkt: pkt.v == 1)],
            XLongField("src", 0x0)
        ),
        # Final destination address: width selected by the F bit
        # (it used to be sized by V, which mis-parsed mixed-width headers).
        MultipleTypeField(
            [(XShortField("dst", 0x0), lambda pkt: pkt.f == 1)],
            XLongField("dst", 0x0)
        )
    ]

168 

169 

170# https://tools.ietf.org/html/rfc4944#section-10.1 

171# This implementation is NOT RECOMMENDED according to RFC 6282 

172 

173 

class LoWPAN_HC2_UDP(Packet):
    """One-byte UDP compression flags used as ``hc2Field`` of LoWPAN_HC1.

    ``sc``, ``dc`` and ``lc`` are one-bit flags shown as "In-line" (0) or
    "Compressed" (1). LoWPAN_HC1 reads them to size its UDP source port,
    UDP destination port and UDP length fields respectively. The last
    five bits are ``res``.
    """
    name = "6LoWPAN HC1 UDP encoding"
    fields_desc = [
        BitEnumField("sc", 0, 1, ["In-line", "Compressed"]),
        BitEnumField("dc", 0, 1, ["In-line", "Compressed"]),
        BitEnumField("lc", 0, 1, ["In-line", "Compressed"]),
        BitField("res", 0, 5),
    ]

    def default_payload_class(self, payload):
        # Anything after the flags byte is given to the padding layer.
        return conf.padding_layer

185 

186 

187def _get_hc1_pad(pkt): 

188 """ 

189 Get LoWPAN_HC1 padding 

190 

191 LoWPAN_HC1 is not recommended for several reasons, one 

192 of them being that padding is a mess (not 8-bit regular) 

193 We therefore add padding bits that are not in the spec to restore 

194 8-bit parity. Wireshark seems to agree 

195 """ 

196 length = 0 # in bits, of the fields that are not //8 

197 if not pkt.tc_fl: 

198 length += 20 

199 if pkt.hc2: 

200 if pkt.nh == 1: 

201 length += pkt.hc2Field.sc * 4 

202 length += pkt.hc2Field.dc * 4 

203 return (-length) % 8 

204 

205 

class LoWPAN_HC1(Packet):
    """LoWPAN_HC1 compressed IPv6 header (RFC 4944, section 10).

    On dissection the compressed fields are expanded into a full IPv6
    (and, when HC_UDP is used, UDP) header that is dissected as payload.
    On build, an IPv6 payload is used to fill in the compressed fields and
    is then elided from the output.
    """
    name = "LoWPAN_HC1 Compressed IPv6"
    fields_desc = [
        # https://tools.ietf.org/html/rfc4944#section-10.1
        ByteField("reserved", 0x42),
        BitEnumField("sp", 0, 1, ["In-line", "Compressed"]),
        BitEnumField("si", 0, 1, ["In-line", "Elided"]),
        BitEnumField("dp", 0, 1, ["In-line", "Compressed"]),
        BitEnumField("di", 0, 1, ["In-line", "Elided"]),
        BitEnumField("tc_fl", 0, 1, ["Not compressed", "zero"]),
        BitEnumField("nh", 0, 2, {0: "not compressed",
                                  1: "UDP",
                                  2: "ICMP",
                                  3: "TCP"}),
        BitEnumField("hc2", 0, 1, ["No more header compression bits",
                                   "HC2 Present"]),
        # https://tools.ietf.org/html/rfc4944#section-10.2
        ConditionalField(
            MultipleTypeField(
                [
                    (PacketField("hc2Field", LoWPAN_HC2_UDP(),
                                 LoWPAN_HC2_UDP),
                        lambda pkt: pkt.nh == 1),
                    # TODO: ICMP & TCP not implemented yet for HC1
                    # (PacketField("hc2Field", LoWPAN_HC2_ICMP(),
                    #              LoWPAN_HC2_ICMP),
                    #     lambda pkt: pkt.nh == 2),
                    # (PacketField("hc2Field", LoWPAN_HC2_TCP(),
                    #              LoWPAN_HC2_TCP),
                    #     lambda pkt: pkt.nh == 3),
                ],
                StrFixedLenField("hc2Field", b"", 0),
            ),
            lambda pkt: pkt.hc2
        ),
        # IPv6 header fields
        # https://tools.ietf.org/html/rfc4944#section-10.3.1
        ByteField("hopLimit", 0x0),
        # Each of prefix / interface identifier contributes 8 in-line
        # bytes unless its flag marks it as compressed / elided.
        IP6FieldLenField("src", "::",
                         lambda pkt: (0 if pkt.sp else 8) +
                                     (0 if pkt.si else 8)),
        IP6FieldLenField("dst", "::",
                         lambda pkt: (0 if pkt.dp else 8) +
                                     (0 if pkt.di else 8)),
        ConditionalField(
            ByteField("traffic_class", 0),
            lambda pkt: not pkt.tc_fl
        ),
        ConditionalField(
            BitField("flow_label", 0, 20),
            lambda pkt: not pkt.tc_fl
        ),
        # Other fields
        # https://tools.ietf.org/html/rfc4944#section-10.3.2
        ConditionalField(
            MultipleTypeField(
                [(BitScalingField("udpSourcePort", 0, 4, offset=0xF0B0),
                    lambda pkt: getattr(pkt.hc2Field, "sc", 0))],
                BitField("udpSourcePort", 0, 16)
            ),
            lambda pkt: pkt.nh == 1 and pkt.hc2
        ),
        ConditionalField(
            MultipleTypeField(
                [(BitScalingField("udpDestPort", 0, 4, offset=0xF0B0),
                    lambda pkt: getattr(pkt.hc2Field, "dc", 0))],
                BitField("udpDestPort", 0, 16)
            ),
            lambda pkt: pkt.nh == 1 and pkt.hc2
        ),
        ConditionalField(
            BitField("udpLength", 0, 16),
            lambda pkt: pkt.nh == 1 and pkt.hc2 and not pkt.hc2Field.lc
        ),
        ConditionalField(
            XBitField("udpChecksum", 0, 16),
            lambda pkt: pkt.nh == 1 and pkt.hc2
        ),
        # Out of spec
        BitLenField("pad", 0, _get_hc1_pad)
    ]

    def post_dissect(self, data):
        """Rebuild the IPv6 (and UDP) header and prepend it to ``data``.

        Returns the raw bytes of the reconstructed packet so that the
        payload of this layer is dissected as a regular IPv6 packet.
        """
        # uncompress payload
        packet = IPv6()
        packet.version = IPHC_DEFAULT_VERSION
        packet.tc = self.traffic_class
        packet.fl = self.flow_label
        nh_match = {
            1: socket.IPPROTO_UDP,
            # The rebuilt header is IPv6, so the "ICMP" code stands for
            # ICMPv6 (58); IPPROTO_ICMP (1) is ICMPv4 and would make the
            # payload undissectable.
            2: socket.IPPROTO_ICMPV6,
            3: socket.IPPROTO_TCP
        }
        if self.nh:
            packet.nh = nh_match.get(self.nh)
        packet.hlim = self.hopLimit

        packet.src = self.decompressSourceAddr()
        packet.dst = self.decompressDestAddr()

        if self.hc2 and self.nh == 1:  # UDP
            udp = UDP()
            udp.sport = self.udpSourcePort
            udp.dport = self.udpDestPort
            # A missing/zero length is left to Scapy to compute.
            udp.len = self.udpLength or None
            udp.chksum = self.udpChecksum
            udp.add_payload(data)
            packet.add_payload(udp)
        else:
            packet.add_payload(data)
        data = raw(packet)
        return Packet.post_dissect(self, data)

    def decompressSourceAddr(self):
        """Expand ``src`` to a full IPv6 address, store and return it."""
        if not self.sp and not self.si:
            # Prefix & Interface
            return self.src
        elif not self.si:
            # Only interface
            addr = inet_pton(socket.AF_INET6, self.src)[-8:]
            addr = LINK_LOCAL_PREFIX[:8] + addr
        else:
            # Interface not provided
            addr = _extract_upperaddress(self, source=True)
        self.src = inet_ntop(socket.AF_INET6, addr)
        return self.src

    def decompressDestAddr(self):
        """Expand ``dst`` to a full IPv6 address, store and return it."""
        if not self.dp and not self.di:
            # Prefix & Interface
            return self.dst
        elif not self.di:
            # Only interface
            addr = inet_pton(socket.AF_INET6, self.dst)[-8:]
            addr = LINK_LOCAL_PREFIX[:8] + addr
        else:
            # Interface not provided
            addr = _extract_upperaddress(self, source=False)
        self.dst = inet_ntop(socket.AF_INET6, addr)
        return self.dst

    def do_build(self):
        """Fill the compressed fields from an IPv6 payload, then build."""
        if not isinstance(self.payload, IPv6):
            return Packet.do_build(self)
        # IPv6
        ipv6 = self.payload
        self.src = ipv6.src
        self.dst = ipv6.dst
        self.flow_label = ipv6.fl
        self.traffic_class = ipv6.tc
        self.hopLimit = ipv6.hlim
        if isinstance(ipv6.payload, UDP):
            self.nh = 1
            self.hc2 = 1
            udp = ipv6.payload
            self.udpSourcePort = udp.sport
            self.udpDestPort = udp.dport
            if not udp.len or not udp.chksum:
                # Build and re-dissect the UDP layer to obtain the values
                # Scapy computes for the unset length / checksum.
                udp = UDP(raw(udp))
            self.udpLength = udp.len
            self.udpChecksum = udp.chksum
        return Packet.do_build(self)

    def do_build_payload(self):
        """Build the payload without the headers carried in compressed form."""
        # Elide the IPv6 and UDP payload
        if isinstance(self.payload, IPv6):
            if isinstance(self.payload.payload, UDP):
                return raw(self.payload.payload.payload)
            return raw(self.payload.payload)
        return Packet.do_build_payload(self)

376 

377# https://tools.ietf.org/html/rfc4944#section-5.3 

378 

379 

class LoWPANFragmentationFirst(Packet):
    """Header of the first fragment of a fragmented 6LoWPAN datagram.

    Four bytes: a 5-bit pattern (0x18 by default), an 11-bit
    ``datagramSize`` and a 16-bit ``datagramTag``.
    """
    name = "6LoWPAN First Fragmentation Packet"
    fields_desc = [
        BitField("reserved", 0x18, 5),
        BitField("datagramSize", 0x0, 11),
        XShortField("datagramTag", 0x0),
    ]

387 

388 

class LoWPANFragmentationSubsequent(Packet):
    """Header of every fragment after the first one.

    Same layout as the first-fragment header (with 0x1C as default 5-bit
    pattern) followed by a one-byte ``datagramOffset``. ``datagramTag``
    defaults to a random 16-bit value.
    """
    name = "6LoWPAN Subsequent Fragmentation Packet"
    fields_desc = [
        BitField("reserved", 0x1C, 5),
        BitField("datagramSize", 0x0, 11),
        XShortField("datagramTag", RandShort()),
        # VALUE PRINTED IN OCTETS, wireshark does in bits
        # (128 bits == 16 octets)
        ByteField("datagramOffset", 0x0),
    ]

397 

398 

399# https://tools.ietf.org/html/rfc4944#section-11.1 

400 

class LoWPANBroadcast(Packet):
    """Two-byte broadcast header: a byte set to 0x50 by default, then a
    one-byte sequence number ``seq``."""
    name = "6LoWPAN Broadcast"
    fields_desc = [
        ByteField("reserved", 0x50),
        ByteField("seq", 0)
    ]

407 

408 

409######################### 

410# LoWPAN_IPHC (RFC6282) # 

411######################### 

412 

413 

# IP version written into the IPv6 header that LoWPAN_HC1 rebuilds.
IPHC_DEFAULT_VERSION = 6
# NOTE(review): the two constants below are not referenced in the part of
# the file that was reviewed - confirm they are still needed.
IPHC_DEFAULT_TF = 0
IPHC_DEFAULT_FL = 0

417 

418 

def source_addr_size(pkt):
    """Source address size

    Return the number of bytes of the source address that are carried
    in-line, as selected by the ``sac`` and ``sam`` values of the packet.
    ``None`` is returned for a ``sam`` value outside 0..3.

    Keyword arguments:
    pkt -- packet object instance
    """
    if pkt.sac == 0x0:
        # Stateless: full address, 64 bits, 16 bits or fully elided.
        inline_bytes = {0x0: 16, 0x1: 8, 0x2: 2, 0x3: 0}
    else:
        # Context based: nothing is carried in-line for sam == 0.
        inline_bytes = {0x0: 0, 0x1: 8, 0x2: 2, 0x3: 0}
    return inline_bytes.get(pkt.sam)

446 

447 

def dest_addr_size(pkt):
    """Destination address size

    Return the number of bytes of the destination address that are
    carried in-line, as selected by the ``m``, ``dac`` and ``dam`` values
    of the packet. Reserved combinations give 0.

    Keyword arguments:
    pkt -- packet object instance
    """
    # (m, dac) -> (sizes indexed by dam, value for any other dam)
    layouts = {
        (0, 0): ({0x0: 16, 0x1: 8, 0x2: 2}, 0),
        (0, 1): ({0x0: 0, 0x1: 8, 0x2: 2}, 0),
        (1, 0): ({0x0: 16, 0x1: 6, 0x2: 4, 0x3: 1}, None),
        (1, 1): ({0x0: 6, 0x1: 0, 0x2: 0, 0x3: 0}, None),
    }
    layout = layouts.get((pkt.m, pkt.dac))
    if layout is None:
        return None
    sizes, fallback = layout
    return sizes.get(pkt.dam, fallback)

497 

498 

def _extract_upperaddress(pkt, source=True):
    """This function derives the source/destination IPv6 address of a
    6LoWPAN packet from the link-layer header found below it.

    (That layer could be 802.15.4 data, Ethernet...)

    params:
     - pkt: the 6LoWPAN packet whose underlayers are searched.
     - source: if True, the address is the source one. Otherwise, it is the
               destination.
    returns: the 16-byte IPv6 address, as bytes. When no address can be
             derived, a warning is emitted and 16 zero bytes are returned.
    """
    # https://tools.ietf.org/html/rfc6282#section-3.2.2
    SUPPORTED_LAYERS = (Ether, Dot15d4Data)
    underlayer = pkt.underlayer
    while underlayer and not isinstance(underlayer, SUPPORTED_LAYERS):
        underlayer = underlayer.underlayer
    # Extract and process address
    # (isinstance, like the loop above, so that subclasses are accepted)
    if isinstance(underlayer, Ether):
        addr = mac2str(underlayer.src if source else underlayer.dst)
        # https://tools.ietf.org/html/rfc2464#section-4
        return LINK_LOCAL_PREFIX[:8] + addr[:3] + b"\xff\xfe" + addr[3:]
    if isinstance(underlayer, Dot15d4Data):
        mac_hdr = underlayer.underlayer
        # Each address has its own addressing mode in the frame control
        # field: a source address must not be read with the destination
        # mode (and vice versa).
        if source:
            addr = underlayer.src_addr
            mode = getattr(mac_hdr, "fcf_srcaddrmode", None)
        else:
            addr = underlayer.dest_addr
            mode = getattr(mac_hdr, "fcf_destaddrmode", None)
        if mode == 3:  # Extended/long
            eui64 = struct.pack(">Q", addr)
            # Flip the universal/local bit of the first byte.
            first = struct.pack("B", orb(eui64[0]) ^ 0x2)
            return LINK_LOCAL_PREFIX[0:8] + first + eui64[1:]
        if mode == 2:  # Short
            return (
                LINK_LOCAL_PREFIX[0:8] +
                b"\x00\x00\x00\xff\xfe\x00" +
                struct.pack(">Q", addr)[6:]
            )
        # No address / reserved mode: previously None was returned, which
        # made the callers crash in inet_ntop.
        warning(
            'Unimplemented: Unsupported 802.15.4 addressing mode: %s' % mode
        )
        return b"\x00" * 16
    # Most of the times, it's necessary the IEEE 802.15.4 data to extract
    # this address, sometimes another layer.
    warning(
        'Unimplemented: Unsupported upper layer: %s' % type(underlayer)
    )
    return b"\x00" * 16

540 

541 

class LoWPAN_IPHC(Packet):
    """6LoWPAN IPv6 header compressed packets

    It follows the implementation of RFC6282

    On dissection, the compressed fields are expanded into an IPv6 header
    which is either prepended to the payload (no next-header compression)
    or stored in ``_ipv6`` for the following LoWPAN_NHC layer. On build,
    an IPv6 payload is used to fill in the compressed fields and is then
    elided from the output. Context based compression is not implemented.
    """
    __slots__ = ["_ipv6"]
    # the LOWPAN_IPHC encoding utilizes 13 bits, 5 dispatch type
    name = "LoWPAN IP Header Compression Packet"
    _address_modes = ["Unspecified (0)", "1", "16-bits inline (3)",
                      "Compressed (3)"]
    _state_mode = ["Stateless (0)", "Stateful (1)"]
    deprecated_fields = {
        "_nhField": ("nhField", "2.4.4"),
        "_hopLimit": ("hopLimit", "2.4.4"),
        "sourceAddr": ("src", "2.4.4"),
        "destinyAddr": ("dst", "2.4.4"),
        "udpDestinyPort": ("udpDestPort", "2.4.4"),
    }
    fields_desc = [
        # Base Format https://tools.ietf.org/html/rfc6282#section-3.1.2
        BitField("_reserved", 0x03, 3),
        BitField("tf", 0x0, 2),
        BitEnumField("nh", 0x0, 1, ["Inline", "Compressed"]),
        BitEnumField("hlim", 0x0, 2, {0: "Inline",
                                      1: "Compressed/HL1",
                                      2: "Compressed/HL64",
                                      3: "Compressed/HL255"}),
        BitEnumField("cid", 0x0, 1, {1: "Present (1)"}),
        BitEnumField("sac", 0x0, 1, _state_mode),
        BitEnumField("sam", 0x0, 2, _address_modes),
        BitEnumField("m", 0x0, 1, {1: "multicast (1)"}),
        BitEnumField("dac", 0x0, 1, _state_mode),
        BitEnumField("dam", 0x0, 2, _address_modes),
        # https://tools.ietf.org/html/rfc6282#section-3.1.2
        # Context Identifier Extension
        ConditionalField(
            BitField("sci", 0, 4),
            lambda pkt: pkt.cid == 0x1
        ),
        ConditionalField(
            BitField("dci", 0, 4),
            lambda pkt: pkt.cid == 0x1
        ),
        # https://tools.ietf.org/html/rfc6282#section-3.2.1
        ConditionalField(
            BitField("tc_ecn", 0, 2),
            lambda pkt: pkt.tf in [0, 1, 2]
        ),
        ConditionalField(
            BitField("tc_dscp", 0, 6),
            lambda pkt: pkt.tf in [0, 2],
        ),
        ConditionalField(
            MultipleTypeField(
                [(BitField("rsv", 0, 4), lambda pkt: pkt.tf == 0)],
                BitField("rsv", 0, 2),
            ),
            lambda pkt: pkt.tf in [0, 1]
        ),
        ConditionalField(
            BitField("flowlabel", 0, 20),
            lambda pkt: pkt.tf in [0, 1]
        ),
        # Inline fields https://tools.ietf.org/html/rfc6282#section-3.1.1
        ConditionalField(
            ByteEnumField("nhField", 0x0, ipv6nh),
            lambda pkt: pkt.nh == 0x0
        ),
        ConditionalField(
            ByteField("hopLimit", 0x0),
            lambda pkt: pkt.hlim == 0x0
        ),
        # The src and dst fields are filled up or removed in the
        # pre_dissect and post_build, depending on the other options.
        IP6FieldLenField("src", "::", length_of=source_addr_size),
        IP6FieldLenField("dst", "::", length_of=dest_addr_size),  # problem when it's 0  # noqa: E501
    ]

    def post_dissect(self, data):
        """dissect the IPv6 package compressed into this IPHC packet.

        The packet payload needs to be decompressed and depending on the
        arguments, several conversions should be done.
        """

        # uncompress payload
        packet = IPv6()
        packet.tc, packet.fl = self._getTrafficClassAndFlowLabel()
        if not self.nh:
            packet.nh = self.nhField
        # HLIM: Hop Limit
        if self.hlim == 0:
            packet.hlim = self.hopLimit
        elif self.hlim == 0x1:
            packet.hlim = 1
        elif self.hlim == 0x2:
            packet.hlim = 64
        else:
            packet.hlim = 255

        packet.src = self.decompressSourceAddr(packet)
        packet.dst = self.decompressDestAddr(packet)

        pay_cls = self.guess_payload_class(data)
        if pay_cls == IPv6:
            # No next-header compression: the rebuilt header is simply
            # put back in front of the payload.
            packet.add_payload(data)
            data = raw(packet)
        elif pay_cls == LoWPAN_NHC:
            # The LoWPAN_NHC layer completes and emits the header.
            self._ipv6 = packet
        return Packet.post_dissect(self, data)

    def decompressDestAddr(self, packet):
        """Expand ``dst`` to a full IPv6 address, store and return it.

        ``packet`` is accepted for API compatibility and is not used.
        """
        # https://tools.ietf.org/html/rfc6282#section-3.1.1
        try:
            tmp_ip = inet_pton(socket.AF_INET6, self.dst)
        except socket.error:
            tmp_ip = b"\x00" * 16

        if self.m == 0 and self.dac == 0:
            if self.dam == 0:
                # Address fully carried
                pass
            elif self.dam == 1:
                tmp_ip = LINK_LOCAL_PREFIX[0:8] + tmp_ip[-8:]
            elif self.dam == 2:
                tmp_ip = LINK_LOCAL_PREFIX[0:8] + b"\x00\x00\x00\xff\xfe\x00" + tmp_ip[-2:]  # noqa: E501
            elif self.dam == 3:
                tmp_ip = _extract_upperaddress(self, source=False)

        elif self.m == 0 and self.dac == 1:
            if self.dam == 0x3:
                # should use context IID + encapsulating header
                tmp_ip = _extract_upperaddress(self, source=False)
            # dam == 0 is reserved. dam == 1 and dam == 2 should use
            # context information (unimplemented): the in-line bits are
            # kept as they are.
            # https://tools.ietf.org/html/rfc6282#page-9
        elif self.m == 1 and self.dac == 0:
            if self.dam == 0:
                # Address fully carried
                pass
            elif self.dam == 1:
                tmp = b"\xff" + chb(tmp_ip[16 - dest_addr_size(self)])
                tmp_ip = tmp + b"\x00" * 9 + tmp_ip[-5:]
            elif self.dam == 2:
                tmp = b"\xff" + chb(tmp_ip[16 - dest_addr_size(self)])
                tmp_ip = tmp + b"\x00" * 11 + tmp_ip[-3:]
            else:  # self.dam == 3:
                tmp_ip = b"\xff\x02" + b"\x00" * 13 + tmp_ip[-1:]
        elif self.m == 1 and self.dac == 1:
            if self.dam == 0x0:
                # https://tools.ietf.org/html/rfc6282#page-10
                # https://github.com/wireshark/wireshark/blob/f54611d1104d85a425e52c7318c522ed249916b6/epan/dissectors/packet-6lowpan.c#L2149-L2166
                # Format: ffXX:XXLL:PPPP:PPPP:PPPP:PPPP:XXXX:XXXX
                # P and L should be retrieved from context
                P = b"\x00" * 16
                L = b"\x00"
                X = tmp_ip[-6:]
                tmp_ip = b"\xff" + X[:2] + L + P[:8] + X[2:6]
            else:  # all the others values: reserved
                pass

        self.dst = inet_ntop(socket.AF_INET6, tmp_ip)
        return self.dst

    def compressSourceAddr(self, ipv6):
        """Store in ``src`` the part of ``ipv6.src`` that is sent in-line.

        The kept bytes are right-aligned in a zero-filled address, which
        is stored in ``self.src`` and returned.
        """
        # https://tools.ietf.org/html/rfc6282#section-3.1.1
        tmp_ip = inet_pton(socket.AF_INET6, ipv6.src)

        if self.sac == 0:
            if self.sam == 0x0:
                pass
            elif self.sam == 0x1:
                tmp_ip = tmp_ip[8:16]
            elif self.sam == 0x2:
                tmp_ip = tmp_ip[14:16]
            else:  # self.sam == 0x3:
                pass
        else:  # self.sac == 1
            if self.sam == 0x0:
                tmp_ip = b"\x00" * 16
            elif self.sam == 0x1:
                tmp_ip = tmp_ip[8:16]
            elif self.sam == 0x2:
                tmp_ip = tmp_ip[14:16]

        self.src = inet_ntop(socket.AF_INET6, b"\x00" * (16 - len(tmp_ip)) + tmp_ip)  # noqa: E501
        return self.src

    def compressDestAddr(self, ipv6):
        """Store in ``dst`` the part of ``ipv6.dst`` that is sent in-line.

        The kept bytes are right-aligned in a zero-filled address, which
        is stored in ``self.dst`` and returned (like compressSourceAddr).
        """
        # https://tools.ietf.org/html/rfc6282#section-3.1.1
        tmp_ip = inet_pton(socket.AF_INET6, ipv6.dst)

        if self.m == 0 and self.dac == 0:
            if self.dam == 0x0:
                pass
            elif self.dam == 0x1:
                tmp_ip = b"\x00" * 8 + tmp_ip[8:16]
            elif self.dam == 0x2:
                tmp_ip = b"\x00" * 14 + tmp_ip[14:16]
        elif self.m == 0 and self.dac == 1:
            if self.dam == 0x1:
                tmp_ip = b"\x00" * 8 + tmp_ip[8:16]
            elif self.dam == 0x2:
                tmp_ip = b"\x00" * 14 + tmp_ip[14:16]
        elif self.m == 1 and self.dac == 0:
            if self.dam == 0x0:
                pass
            elif self.dam == 0x1:
                tmp_ip = b"\x00" * 10 + tmp_ip[1:2] + tmp_ip[11:16]
            elif self.dam == 0x2:
                tmp_ip = b"\x00" * 12 + tmp_ip[1:2] + tmp_ip[13:16]
            elif self.dam == 0x3:
                tmp_ip = b"\x00" * 15 + tmp_ip[15:16]
        elif self.m == 1 and self.dac == 1:
            if self.dam == 0:
                tmp_ip = b"\x00" * 10 + tmp_ip[1:3] + tmp_ip[12:16]

        self.dst = inet_ntop(socket.AF_INET6, tmp_ip)
        return self.dst

    def decompressSourceAddr(self, packet):
        """Expand ``src`` to a full IPv6 address, store and return it.

        ``packet`` is accepted for API compatibility and is not used.
        """
        # https://tools.ietf.org/html/rfc6282#section-3.1.1
        try:
            tmp_ip = inet_pton(socket.AF_INET6, self.src)
        except socket.error:
            tmp_ip = b"\x00" * 16

        if self.sac == 0:
            if self.sam == 0x0:
                # Full address is carried in-line
                pass
            elif self.sam == 0x1:
                tmp_ip = LINK_LOCAL_PREFIX[0:8] + tmp_ip[16 - source_addr_size(self):16]  # noqa: E501
            elif self.sam == 0x2:
                tmp = LINK_LOCAL_PREFIX[0:8] + b"\x00\x00\x00\xff\xfe\x00"
                tmp_ip = tmp + tmp_ip[16 - source_addr_size(self):16]
            elif self.sam == 0x3:
                # Taken from encapsulating header
                tmp_ip = _extract_upperaddress(self, source=True)
        else:  # self.sac == 1:
            if self.sam == 0x0:
                # Unspecified address ::
                pass
            elif self.sam == 0x1:
                # should use context IID
                pass
            elif self.sam == 0x2:
                # should use context IID
                tmp = LINK_LOCAL_PREFIX[0:8] + b"\x00\x00\x00\xff\xfe\x00"
                tmp_ip = tmp + tmp_ip[16 - source_addr_size(self):16]
            elif self.sam == 0x3:
                # should use context IID
                tmp_ip = LINK_LOCAL_PREFIX[0:8] + b"\x00" * 8
        self.src = inet_ntop(socket.AF_INET6, tmp_ip)
        return self.src

    def guess_payload_class(self, payload):
        """Select LoWPAN_NHC, Raw (inside a fragment) or IPv6."""
        if self.nh:
            return LoWPAN_NHC
        u = self.underlayer
        if u and isinstance(u, (LoWPANFragmentationFirst,
                                LoWPANFragmentationSubsequent)):
            return Raw
        return IPv6

    def do_build(self):
        """Fill the in-line fields from the IPv6 payload, then build.

        The IPv6 layer is looked for directly above this layer, or above
        an intermediate LoWPAN_NHC layer.
        """
        _cur = self
        if isinstance(_cur.payload, LoWPAN_NHC):
            _cur = _cur.payload
        if not isinstance(_cur.payload, IPv6):
            return Packet.do_build(self)
        ipv6 = _cur.payload

        self._reserved = 0x03

        # NEW COMPRESSION TECHNIQUE!
        # a ) Compression Techniques

        # 1. Set Traffic Class
        # The IPv6 traffic class byte holds the DSCP in its 6 upper bits
        # and the ECN in its 2 lower bits; IPHC sends ECN first.
        if self.tf == 0x0:
            self.tc_ecn = ipv6.tc & 0x3
            self.tc_dscp = ipv6.tc >> 2
            self.flowlabel = ipv6.fl
        elif self.tf == 0x1:
            self.tc_ecn = ipv6.tc & 0x3
            self.flowlabel = ipv6.fl
        elif self.tf == 0x2:
            self.tc_ecn = ipv6.tc & 0x3
            self.tc_dscp = ipv6.tc >> 2
        # self.tf == 0x3: no field is set

        # 2. Next Header
        if self.nh == 0x0:
            self.nhField = ipv6.nh
        # self.nh == 1: this will be handled in LoWPAN_NHC

        # 3. HLim
        if self.hlim == 0x0:
            self.hopLimit = ipv6.hlim
        # if hlim is 1, 2 or 3, there is nothing to do!

        # 4. Context (which context to use...)
        # TODO: Context Unimplemented yet

        # 5. Compress Source Addr
        self.compressSourceAddr(ipv6)
        self.compressDestAddr(ipv6)

        return Packet.do_build(self)

    def do_build_payload(self):
        """Build the payload, leaving out a directly attached IPv6 header."""
        # Elide the IPv6 payload
        if isinstance(self.payload, IPv6):
            return raw(self.payload.payload)
        return Packet.do_build_payload(self)

    def _getTrafficClassAndFlowLabel(self):
        """Page 6, draft feb 2011

        Return ``(traffic_class, flow_label)`` for the rebuilt IPv6
        header. Elided parts are zero. The traffic class is assembled as
        DSCP (6 upper bits) followed by ECN (2 lower bits).
        """
        if self.tf == 0x0:
            return (self.tc_dscp << 2) | self.tc_ecn, self.flowlabel
        elif self.tf == 0x1:
            return self.tc_ecn, self.flowlabel
        elif self.tf == 0x2:
            return (self.tc_dscp << 2) | self.tc_ecn, 0
        else:
            return 0, 0

879 

880############## 

881# LOWPAN_NHC # 

882############## 

883 

884# https://tools.ietf.org/html/rfc6282#section-4 

885 

886 

class LoWPAN_NHC_Hdr(Packet):
    """Base class of the LOWPAN_NHC headers; picks the concrete header
    class from the first byte."""

    @classmethod
    def get_next_cls(cls, s):
        """Return the header class matching the bytes ``s``, or None.

        At least two bytes are required for a match.
        """
        if not s or len(s) < 2:
            return None
        dispatch = ord(s[:1])
        # 11110xxx: UDP header compression
        if dispatch >> 3 == 0x1e:
            return LoWPAN_NHC_UDP
        # 1110xxxx: IPv6 extension header compression
        if dispatch >> 4 == 0xe:
            return LoWPAN_NHC_IPv6Ext
        return None

    @classmethod
    def dispatch_hook(cls, _pkt=b"", *args, **kargs):
        matched = LoWPAN_NHC_Hdr.get_next_cls(_pkt)
        return matched if matched else LoWPAN_NHC_Hdr

    def extract_padding(self, s):
        # Everything left is handed back to the enclosing list field.
        return b"", s

904 

905 

class LoWPAN_NHC_UDP(LoWPAN_NHC_Hdr):
    """LOWPAN_NHC compressed UDP header.

    ``P`` selects the width of the in-line port fields and ``C`` tells
    whether the checksum is present (``C == 0``) or elided.
    """
    fields_desc = [
        BitField("res", 0x1e, 5),
        BitField("C", 0, 1),
        BitField("P", 0, 2),
        # Source port: 16 bits for P in (0, 1), 8 bits for P == 2,
        # 4 bits for P == 3.
        MultipleTypeField(
            [(BitField("udpSourcePort", 0, 16),
              lambda pkt: pkt.P in [0, 1]),
             (BitField("udpSourcePort", 0, 8),
              lambda pkt: pkt.P == 2),
             (BitField("udpSourcePort", 0, 4),
              lambda pkt: pkt.P == 3)],
            BitField("udpSourcePort", 0x0, 16),
        ),
        # Destination port: 16 bits for P in (0, 2), 8 bits for P == 1,
        # 4 bits for P == 3.
        MultipleTypeField(
            [(BitField("udpDestPort", 0, 16),
              lambda pkt: pkt.P in [0, 2]),
             (BitField("udpDestPort", 0, 8),
              lambda pkt: pkt.P == 1),
             (BitField("udpDestPort", 0, 4),
              lambda pkt: pkt.P == 3)],
            BitField("udpDestPort", 0x0, 16),
        ),
        ConditionalField(
            XShortField("udpChecksum", 0x0),
            lambda pkt: pkt.C == 0
        ),
    ]

934 

935 

# Display names for the values of the 3-bit "eid" field of
# LoWPAN_NHC_IPv6Ext. Values 5 and 6 have no name here.
_lowpan_nhc_ipv6ext_eid = {
    0: "Hop-by-hop Options Header",
    1: "IPv6 Routing Header",
    2: "IPv6 Fragment Header",
    3: "IPv6 Destination Options Header",
    4: "IPv6 Mobility Header",
    7: "IPv6 Header",
}

944 

945 

class LoWPAN_NHC_IPv6Ext(LoWPAN_NHC_Hdr):
    """LOWPAN_NHC compressed IPv6 extension header (RFC 6282, section 4.2).

    ``len`` is the number of bytes of ``data``, i.e. the bytes that follow
    the length byte. ``nhField`` is only present when ``nh`` is 0.
    """
    fields_desc = [
        BitField("res", 0xe, 4),
        BitEnumField("eid", 0, 3, _lowpan_nhc_ipv6ext_eid),
        BitField("nh", 0, 1),
        ConditionalField(
            ByteField("nhField", 0),
            lambda pkt: pkt.nh == 0
        ),
        FieldLenField("len", None, length_of="data", fmt="B"),
        StrFixedLenField("data", b"", length_from=lambda pkt: pkt.len)
    ]

    def post_build(self, p, pay):
        """Write the length byte when ``len`` was left unset."""
        if self.len is None:
            # Index of the length byte: after the first byte and, when
            # present, after nhField.
            offs = (not self.nh) + 1
            # Only the bytes after the length byte are counted; counting
            # the length byte itself made the receiver read one byte of
            # the following data as part of this header.
            data_len = len(p) - offs - 1
            p = p[:offs] + struct.pack("!B", data_len) + p[offs + 1:]
        return p + pay

964 

965 

966class LoWPAN_NHC(Packet): 

967 name = "LOWPAN_NHC" 

968 fields_desc = [ 

969 PacketListField( 

970 "exts", [], pkt_cls=LoWPAN_NHC_Hdr, 

971 next_cls_cb=lambda *s: LoWPAN_NHC_Hdr.get_next_cls(s[3]) 

972 ) 

973 ] 

974 

975 def post_dissect(self, data): 

976 if not self.underlayer or not hasattr(self.underlayer, "_ipv6"): 

977 return data 

978 if self.guess_payload_class(data) != IPv6: 

979 return data 

980 # Underlayer is LoWPAN_IPHC 

981 packet = self.underlayer._ipv6 

982 try: 

983 ipv6_hdr = next( 

984 x for x in self.exts if isinstance(x, LoWPAN_NHC_IPv6Ext) 

985 ) 

986 except StopIteration: 

987 ipv6_hdr = None 

988 if ipv6_hdr: 

989 # XXX todo: implement: append the IPv6 extension 

990 # packet = packet / ipv6extension 

991 pass 

992 try: 

993 udp_hdr = next( 

994 x for x in self.exts if isinstance(x, LoWPAN_NHC_UDP) 

995 ) 

996 except StopIteration: 

997 udp_hdr = None 

998 if udp_hdr: 

999 packet.nh = 0x11 # UDP 

1000 udp = UDP() 

1001 # https://tools.ietf.org/html/rfc6282#section-4.3.3 

1002 if udp_hdr.C == 0: 

1003 udp.chksum = udp_hdr.udpChecksum 

1004 if udp_hdr.P == 0: 

1005 udp.sport = udp_hdr.udpSourcePort 

1006 udp.dport = udp_hdr.udpDestPort 

1007 elif udp_hdr.P == 1: 

1008 udp.sport = udp_hdr.udpSourcePort 

1009 udp.dport = 0xF000 + udp_hdr.udpDestPort 

1010 elif udp_hdr.P == 2: 

1011 udp.sport = 0xF000 + udp_hdr.udpSourcePort 

1012 udp.dport = udp_hdr.udpDestPort 

1013 elif udp_hdr.P == 3: 

1014 udp.sport = 0xF0B0 + udp_hdr.udpSourcePort 

1015 udp.dport = 0xF0B0 + udp_hdr.udpDestPort 

1016 packet.lastlayer().add_payload(udp / data) 

1017 else: 

1018 packet.lastlayer().add_payload(data) 

1019 data = raw(packet) 

1020 return Packet.post_dissect(self, data) 

1021 

def do_build(self):
    """Build this layer, deriving the UDP NHC header from the payload.

    When the payload is IPv6 carrying UDP, the ``LoWPAN_NHC_UDP`` entry of
    ``self.exts`` is filled in from the UDP layer (ports and, unless the C
    flag is set, checksum). If no such entry exists one is created, a port
    compression mode is guessed for it, and it is inserted at the front of
    ``self.exts``.

    :return: the result of ``Packet.do_build``
    """
    if not isinstance(self.payload, IPv6):
        return Packet.do_build(self)

    layer = self.payload.payload
    # NOTE(review): the condition inspects ``layer.payload``, so the loop
    # stops *on* the last extension header rather than past it; a UDP layer
    # sitting behind extension headers is then not picked up below. Confirm
    # whether this is intended while extension compression is unimplemented.
    while layer and isinstance(layer.payload, _IPv6ExtHdr):
        # XXX todo: populate a LoWPAN_NHC_IPv6Ext
        layer = layer.payload

    if not isinstance(layer, UDP):
        return Packet.do_build(self)

    nhc_udp = next(
        (e for e in self.exts if isinstance(e, LoWPAN_NHC_UDP)),
        None,
    )
    if nhc_udp is None:
        nhc_udp = LoWPAN_NHC_UDP()
        # Guess best compression
        if layer.sport >> 4 == 0xf0b and layer.dport >> 4 == 0xf0b:
            nhc_udp.P = 3
        elif layer.sport >> 8 == 0xf0:
            nhc_udp.P = 2
        elif layer.dport >> 8 == 0xf0:
            nhc_udp.P = 1
        self.exts.insert(0, nhc_udp)

    # https://tools.ietf.org/html/rfc6282#section-4.3.3
    # (source mask, destination mask) per P value; None keeps the full port
    masks = {
        0: (None, None),
        1: (None, 255),
        2: (255, None),
        3: (15, 15),
    }.get(nhc_udp.P)
    if masks is not None:
        src_mask, dst_mask = masks
        if src_mask is None:
            nhc_udp.udpSourcePort = layer.sport
        else:
            nhc_udp.udpSourcePort = layer.sport & src_mask
        if dst_mask is None:
            nhc_udp.udpDestPort = layer.dport
        else:
            nhc_udp.udpDestPort = layer.dport & dst_mask

    if nhc_udp.C == 0:
        # Use the explicit checksum when set, otherwise the one obtained by
        # building the UDP layer and parsing it back
        nhc_udp.udpChecksum = layer.chksum or UDP(raw(layer)).chksum

    return Packet.do_build(self)

1063 

def do_build_payload(self):
    """Build the payload, leaving out the IPv6 and UDP layers.

    When the payload is IPv6, every leading ``IPv6`` or ``UDP`` layer is
    skipped and only what follows is serialized. Any other payload is built
    by ``Packet.do_build_payload``.

    :return: the raw payload bytes
    """
    if not isinstance(self.payload, IPv6):
        return Packet.do_build_payload(self)

    # Elide IPv6 payload, extensions and UDP
    inner = self.payload
    while inner and isinstance(inner, (IPv6, UDP)):
        inner = inner.payload
    return raw(inner)

1072 

def guess_payload_class(self, payload):
    """Select the class used to dissect what follows this layer.

    :param payload: the remaining bytes (not inspected)
    :return: ``Raw`` when the layer two levels below is a 6LoWPAN
        fragmentation header, ``IPv6`` otherwise
    """
    parent = self.underlayer
    grandparent = parent.underlayer if parent else None
    if isinstance(
        grandparent,
        (LoWPANFragmentationFirst, LoWPANFragmentationSubsequent),
    ):
        return Raw
    return IPv6

1080 

1081 

1082###################### 

1083# 6LowPan Dispatcher # 

1084###################### 

1085 

1086# https://tools.ietf.org/html/rfc4944#section-5.1 

1087 

class SixLoWPAN_ESC(Packet):
    """Layer holding a single ``dispatch`` byte."""

    name = "SixLoWPAN Dispatcher ESC"
    fields_desc = [
        ByteField("dispatch", 0),
    ]

1091 

1092 

class SixLoWPAN(Packet):
    """Entry point layer that dispatches to the matching 6LoWPAN class."""

    name = "SixLoWPAN Dispatcher"

    @classmethod
    def dispatch_hook(cls, _pkt=b"", *args, **kargs):
        """Choose the layer class from the first byte of ``_pkt``.

        :param _pkt: raw bytes about to be dissected
        :return: the class matching the first byte, or ``cls`` when
            ``_pkt`` is empty or no rule matches
        """
        if not _pkt or len(_pkt) < 1:
            return cls

        first = ord(_pkt[:1])
        # (right shift, expected value, layer) - evaluated in order, so the
        # exact-byte rules win over the prefix rules below them
        rules = (
            (0, 0x41, LoWPANUncompressedIPv6),
            (0, 0x42, LoWPAN_HC1),
            (0, 0x50, LoWPANBroadcast),
            (0, 0x7f, SixLoWPAN_ESC),
            (3, 0x18, LoWPANFragmentationFirst),
            (3, 0x1C, LoWPANFragmentationSubsequent),
            (6, 0x02, LoWPANMesh),
            (6, 0x01, LoWPAN_IPHC),
        )
        for shift, expected, layer in rules:
            if first >> shift == expected:
                return layer
        return cls

1118 

1119 

1120################# 

1121# Fragmentation # 

1122################# 

1123 

# Maximum number of bytes of the raw IPv6 packet carried by each fragment
# produced by sixlowpan_fragment(); packets no longer than this are not
# fragmented.
MAX_SIZE = 96

1126 

1127 

def sixlowpan_fragment(packet, datagram_tag=1):
    """Split an IPv6 packet into a list of 6LoWPAN fragments.

    The raw bytes of the IPv6 layer are cut into pieces of at most
    ``MAX_SIZE`` bytes. The first piece is prefixed with a
    ``LoWPANFragmentationFirst`` header and every following piece with a
    ``LoWPANFragmentationSubsequent`` header whose offset is expressed in
    units of 8 bytes.

    Usage example::

      >>> ipv6 = ..... (very big packet)
      >>> pkts = sixlowpan_fragment(ipv6, datagram_tag=0x17)
      >>> send = [Dot15d4()/Dot15d4Data()/x for x in pkts]
      >>> wireshark(send)

    :param packet: a packet containing an IPv6 layer
    :param datagram_tag: tag written into every fragment header
    :return: ``[packet]`` unchanged when the IPv6 bytes fit in ``MAX_SIZE``,
        otherwise the list of fragments
    :raises Exception: if ``packet`` has no IPv6 layer
    """
    if not packet.haslayer(IPv6):
        raise Exception("SixLoWPAN only fragments IPv6 packets !")

    ipv6_bytes = raw(packet[IPv6])
    total = len(ipv6_bytes)
    if total <= MAX_SIZE:
        return [packet]

    fragments = []
    for index, start in enumerate(range(0, total, MAX_SIZE)):
        piece = ipv6_bytes[start:start + MAX_SIZE]
        if index == 0:
            header = LoWPANFragmentationFirst(
                datagramTag=datagram_tag,
                datagramSize=total,
            )
        else:
            header = LoWPANFragmentationSubsequent(
                datagramTag=datagram_tag,
                datagramSize=total,
                datagramOffset=MAX_SIZE // 8 * index,
            )
        fragments.append(header / piece)
    return fragments

1157 

1158 

def sixlowpan_defragment(packet_list):
    """Reassemble 6LoWPAN fragments, grouped by datagram tag.

    Packets without a ``LoWPANFragmentationFirst`` or
    ``LoWPANFragmentationSubsequent`` layer are ignored. For each datagram
    tag the fragment payloads are ordered by their offset (a first fragment
    counts as offset 0) before being concatenated, so fragments listed out
    of order are still reassembled correctly. Fragments sharing the same
    offset keep their order of appearance.

    :param packet_list: iterable of packets, possibly containing fragments
    :return: dict mapping each datagram tag to a ``SixLoWPAN`` packet
        dissected from the reassembled bytes
    """
    pieces = {}
    for pkt in packet_list:
        if LoWPANFragmentationFirst in pkt:
            frag = pkt[LoWPANFragmentationFirst]
            # The first-fragment header carries no offset field
            offset = 0
        elif LoWPANFragmentationSubsequent in pkt:
            frag = pkt[LoWPANFragmentationSubsequent]
            offset = frag.datagramOffset or 0
        else:
            continue
        pieces.setdefault(frag.datagramTag, []).append(
            (offset, frag.payload.load)
        )

    reassembled = {}
    for tag, parts in pieces.items():
        # list.sort is stable: equal offsets stay in arrival order
        parts.sort(key=lambda part: part[0])
        reassembled[tag] = SixLoWPAN(b"".join(load for _, load in parts))
    return reassembled

1171 

1172############ 

1173# Bindings # 

1174############ 

1175 

1176 

# An HC1 header is followed by IPv6
bind_layers(LoWPAN_HC1, IPv6)

# Top-down association only: LoWPAN_IPHC with nh=1 is paired with LoWPAN_NHC
bind_top_down(LoWPAN_IPHC, LoWPAN_NHC, nh=1)
# What follows a first-fragment, mesh or broadcast header goes back through
# the SixLoWPAN dispatcher
bind_layers(LoWPANFragmentationFirst, SixLoWPAN)
bind_layers(LoWPANMesh, SixLoWPAN)
bind_layers(LoWPANBroadcast, SixLoWPAN)

# Ethernet frames whose type field is 0xA0ED carry SixLoWPAN
bind_layers(Ether, SixLoWPAN, type=0xA0ED)