1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Cesar A. Bernardini <mesarpe@gmail.com>
5# Intern at INRIA Grand Nancy Est
6# Copyright (C) Gabriel Potter <gabriel[]potter[]fr>
7"""
86LoWPAN Protocol Stack
9======================
10
This implementation follows these documents:
12
13- Transmission of IPv6 Packets over IEEE 802.15.4 Networks: RFC 4944
14- Compression Format for IPv6 Datagrams in Low Power and Lossy
15 networks (6LoWPAN): RFC 6282
16- RFC 4291
17
18+----------------------------+-----------------------+
19| Application | Application Protocols |
20+----------------------------+------------+----------+
21| Transport | UDP | TCP |
22+----------------------------+------------+----------+
23| Network | IPv6 |
24+----------------------------+-----------------------+
25| | LoWPAN |
26+----------------------------+-----------------------+
27| Data Link Layer | IEEE 802.15.4 MAC |
28+----------------------------+-----------------------+
29| Physical | IEEE 802.15.4 PHY |
30+----------------------------+-----------------------+
31
32Note that:
33
34 - Only IPv6 is supported
35 - LoWPAN is in the middle between network and data link layer
36
37The Internet Control Message protocol v6 (ICMPv6) is used for control
38messaging.
39
40Adaptation between full IPv6 and the LoWPAN format is performed by routers at
41the edge of 6LoWPAN islands.
42
LoWPAN supports addressing; a direct mapping between the link-layer address
and the IPv6 address is used to achieve compression.
45
46Known Issues:
47 * Unimplemented context information
48 * Unimplemented IPv6 extensions fields
49"""
50
51import socket
52import struct
53
54from scapy.compat import chb, orb, raw
55from scapy.data import ETHER_TYPES
56
57from scapy.packet import Packet, bind_layers, bind_top_down
58from scapy.fields import (
59 BitEnumField,
60 BitField,
61 BitLenField,
62 BitScalingField,
63 ByteEnumField,
64 ByteField,
65 ConditionalField,
66 FieldLenField,
67 MultipleTypeField,
68 PacketField,
69 PacketListField,
70 StrFixedLenField,
71 XBitField,
72 XLongField,
73 XShortField,
74)
75
76from scapy.layers.dot15d4 import Dot15d4Data
77from scapy.layers.inet6 import (
78 IP6Field,
79 IPv6,
80 _IPv6ExtHdr,
81 ipv6nh,
82)
83from scapy.layers.inet import UDP
84from scapy.layers.l2 import Ether
85
86from scapy.utils import mac2str
87from scapy.config import conf
88from scapy.error import warning
89
90from scapy.packet import Raw
91from scapy.pton_ntop import inet_pton, inet_ntop
92from scapy.volatile import RandShort
93
# Give EtherType 0xA0ED a readable name in Scapy's EtherType table.
ETHER_TYPES[0xA0ED] = "6LoWPAN"

# fe80:: as 16 raw bytes; users of this constant slice off the first 8 bytes
# to obtain the link-local prefix.
LINK_LOCAL_PREFIX = b"\xfe\x80" + b"\x00" * 14
97
98
99##########
100# Fields #
101##########
102
103
class IP6FieldLenField(IP6Field):
    """IPv6 address field of which only the trailing bytes are on the wire.

    ``length_of`` is a callable that receives the packet and returns how many
    bytes of the address are carried in-line.
    """
    __slots__ = ["length_of"]

    def __init__(self, name, default, length_of=None):
        IP6Field.__init__(self, name, default)
        self.length_of = length_of

    def addfield(self, pkt, s, val):
        """Append the last ``length_of(pkt)`` bytes of the address to ``s``."""
        nbytes = self.length_of(pkt)
        if nbytes == 0:
            return s
        tail = self.i2m(pkt, val)[-nbytes:]
        return s + struct.pack("!%ds" % nbytes, tail)

    def getfield(self, pkt, s):
        """Consume ``length_of(pkt)`` bytes and zero-pad them on the left.

        Returns ``(remaining_bytes, value)``. When nothing is carried in-line
        the value is ``b""`` and ``s`` is returned untouched.
        """
        nbytes = self.length_of(pkt)
        assert 0 <= nbytes <= 16
        if nbytes <= 0:
            return s, b""
        carried, remain = s[:nbytes], s[nbytes:]
        padded = b"\x00" * (16 - nbytes) + carried
        return remain, self.m2i(pkt, padded)
126
127
128#################
129# Basic 6LoWPAN #
130#################
131# https://tools.ietf.org/html/rfc4944
132
133
class LoWPANUncompressedIPv6(Packet):
    """One-byte dispatch header placed in front of an uncompressed IPv6 packet.

    SixLoWPAN.dispatch_hook selects this class when the first byte is 0x41.
    """
    name = "6LoWPAN Uncompressed IPv6"
    fields_desc = [BitField("_type", 0x41, 8)]

    def default_payload_class(self, pay):
        """Everything after the dispatch byte is dissected as IPv6."""
        return IPv6
142
143# https://tools.ietf.org/html/rfc4944#section-5.2
144
145
class LoWPANMesh(Packet):
    """6LoWPAN mesh addressing header (RFC 4944, section 5.2).

    The ``v`` bit selects the width of the originator address (``src``) and
    the ``f`` bit selects the width of the final destination address
    (``dst``): 0 means a 64-bit EUI-64, 1 means a 16-bit short address.
    """
    name = "6LoWPAN Mesh Packet"
    # Old field names still accepted, with the version that deprecated them.
    deprecated_fields = {
        "_v": ("v", "2.4.4"),
        "_f": ("f", "2.4.4"),
        "_sourceAddr": ("src", "2.4.4"),
        "_destinyAddr": ("dst", "2.4.4"),
    }
    fields_desc = [
        BitField("reserved", 0x2, 2),
        BitEnumField("v", 0x0, 1, ["EUI-64", "Short"]),
        BitEnumField("f", 0x0, 1, ["EUI-64", "Short"]),
        BitField("hopsLeft", 0x0, 4),
        # Originator address: width driven by the V bit.
        MultipleTypeField(
            [(XShortField("src", 0x0), lambda pkt: pkt.v == 1)],
            XLongField("src", 0x0)
        ),
        # Final destination address: width driven by the F bit (not V), so
        # that frames mixing a short and an extended address parse correctly.
        MultipleTypeField(
            [(XShortField("dst", 0x0), lambda pkt: pkt.f == 1)],
            XLongField("dst", 0x0)
        )
    ]
168
169
170# https://tools.ietf.org/html/rfc4944#section-10.1
171# This implementation is NOT RECOMMENDED according to RFC 6282
172
173
class LoWPAN_HC2_UDP(Packet):
    """One-byte UDP header-compression flags used together with LoWPAN_HC1.

    LoWPAN_HC1 embeds this packet as its ``hc2Field`` when its ``nh`` value
    is 1 (UDP). Each of ``sc`` (source port), ``dc`` (destination port) and
    ``lc`` (length) tells whether the matching UDP value is carried in-line
    or compressed; the remaining 5 bits are reserved.
    """
    name = "6LoWPAN HC1 UDP encoding"
    fields_desc = [
        BitEnumField("sc", 0, 1, ["In-line", "Compressed"]),
        BitEnumField("dc", 0, 1, ["In-line", "Compressed"]),
        BitEnumField("lc", 0, 1, ["In-line", "Compressed"]),
        BitField("res", 0, 5),
    ]

    def default_payload_class(self, payload):
        # Anything following the flag byte is handed to the padding layer so
        # that it is given back to the enclosing packet.
        return conf.padding_layer
185
186
def _get_hc1_pad(pkt):
    """
    Return how many filler bits must follow the LoWPAN_HC1 fields.

    Some HC1 fields are not a multiple of 8 bits wide: the 20-bit flow label
    (present when ``tc_fl`` is unset) and the 4-bit compressed UDP ports.
    The spec does not define any padding for them, so extra bits are added
    to get back on a byte boundary (Wireshark appears to do the same).
    """
    # Only the fields whose width is not a multiple of 8 are counted.
    bits = 0 if pkt.tc_fl else 20
    if pkt.hc2 and pkt.nh == 1:
        hc2 = pkt.hc2Field
        bits += 4 * hc2.sc
        bits += 4 * hc2.dc
    return (-bits) % 8
204
205
class LoWPAN_HC1(Packet):
    """LOWPAN_HC1 compressed IPv6 header (RFC 4944, section 10).

    On dissection the compressed fields are expanded into a regular IPv6
    header (plus a UDP header when the HC_UDP encoding is present) which is
    serialized in front of the remaining data. On build, an IPv6 payload is
    folded into the compressed fields and only its upper-layer payload is
    emitted.
    """
    name = "LoWPAN_HC1 Compressed IPv6"
    fields_desc = [
        # https://tools.ietf.org/html/rfc4944#section-10.1
        ByteField("reserved", 0x42),
        BitEnumField("sp", 0, 1, ["In-line", "Compressed"]),
        BitEnumField("si", 0, 1, ["In-line", "Elided"]),
        BitEnumField("dp", 0, 1, ["In-line", "Compressed"]),
        BitEnumField("di", 0, 1, ["In-line", "Elided"]),
        BitEnumField("tc_fl", 0, 1, ["Not compressed", "zero"]),
        BitEnumField("nh", 0, 2, {0: "not compressed",
                                  1: "UDP",
                                  2: "ICMP",
                                  3: "TCP"}),
        BitEnumField("hc2", 0, 1, ["No more header compression bits",
                                   "HC2 Present"]),
        # https://tools.ietf.org/html/rfc4944#section-10.2
        ConditionalField(
            MultipleTypeField(
                [
                    (PacketField("hc2Field", LoWPAN_HC2_UDP(),
                                 LoWPAN_HC2_UDP),
                        lambda pkt: pkt.nh == 1),
                    # TODO: ICMP & TCP not implemented yet for HC1
                    # (PacketField("hc2Field", LoWPAN_HC2_ICMP(),
                    #              LoWPAN_HC2_ICMP),
                    #     lambda pkt: pkt.nh == 2),
                    # (PacketField("hc2Field", LoWPAN_HC2_TCP(),
                    #              LoWPAN_HC2_TCP),
                    #     lambda pkt: pkt.nh == 3),
                ],
                StrFixedLenField("hc2Field", b"", 0),
            ),
            lambda pkt: pkt.hc2
        ),
        # IPv6 header fields
        # https://tools.ietf.org/html/rfc4944#section-10.3.1
        ByteField("hopLimit", 0x0),
        # 8 bytes for an in-line prefix plus 8 bytes for an in-line
        # interface identifier.
        IP6FieldLenField("src", "::",
                         lambda pkt: (0 if pkt.sp else 8) +
                                     (0 if pkt.si else 8)),
        IP6FieldLenField("dst", "::",
                         lambda pkt: (0 if pkt.dp else 8) +
                                     (0 if pkt.di else 8)),
        ConditionalField(
            ByteField("traffic_class", 0),
            lambda pkt: not pkt.tc_fl
        ),
        ConditionalField(
            BitField("flow_label", 0, 20),
            lambda pkt: not pkt.tc_fl
        ),
        # Other fields
        # https://tools.ietf.org/html/rfc4944#section-10.3.2
        ConditionalField(
            MultipleTypeField(
                [(BitScalingField("udpSourcePort", 0, 4, offset=0xF0B0),
                    lambda pkt: getattr(pkt.hc2Field, "sc", 0))],
                BitField("udpSourcePort", 0, 16)
            ),
            lambda pkt: pkt.nh == 1 and pkt.hc2
        ),
        ConditionalField(
            MultipleTypeField(
                [(BitScalingField("udpDestPort", 0, 4, offset=0xF0B0),
                    lambda pkt: getattr(pkt.hc2Field, "dc", 0))],
                BitField("udpDestPort", 0, 16)
            ),
            lambda pkt: pkt.nh == 1 and pkt.hc2
        ),
        ConditionalField(
            BitField("udpLength", 0, 16),
            lambda pkt: pkt.nh == 1 and pkt.hc2 and not pkt.hc2Field.lc
        ),
        ConditionalField(
            XBitField("udpChecksum", 0, 16),
            lambda pkt: pkt.nh == 1 and pkt.hc2
        ),
        # Out of spec
        BitLenField("pad", 0, _get_hc1_pad)
    ]

    def post_dissect(self, data):
        """Rebuild the IPv6 (and UDP) headers and prepend them to ``data``.

        Returns the bytes handed to the payload dissector: a full,
        uncompressed IPv6 packet.
        """
        # uncompress payload
        packet = IPv6()
        packet.version = IPHC_DEFAULT_VERSION
        packet.tc = self.traffic_class
        packet.fl = self.flow_label
        # The rebuilt header is an IPv6 header, so the "ICMP" code has to be
        # expanded to the ICMPv6 next-header value (58), not ICMPv4 (1).
        nh_match = {
            1: socket.IPPROTO_UDP,
            2: socket.IPPROTO_ICMPV6,
            3: socket.IPPROTO_TCP
        }
        if self.nh:
            packet.nh = nh_match.get(self.nh)
        packet.hlim = self.hopLimit

        packet.src = self.decompressSourceAddr()
        packet.dst = self.decompressDestAddr()

        if self.hc2 and self.nh == 1:  # UDP
            udp = UDP()
            udp.sport = self.udpSourcePort
            udp.dport = self.udpDestPort
            # A falsy length (e.g. elided) lets UDP compute it on build.
            udp.len = self.udpLength or None
            udp.chksum = self.udpChecksum
            udp.add_payload(data)
            packet.add_payload(udp)
        else:
            packet.add_payload(data)
        data = raw(packet)
        return Packet.post_dissect(self, data)

    def decompressSourceAddr(self):
        """Expand ``src`` to a full address, store it in ``src`` and return it."""
        if not self.sp and not self.si:
            # Prefix & Interface
            return self.src
        elif not self.si:
            # Only interface
            addr = inet_pton(socket.AF_INET6, self.src)[-8:]
            addr = LINK_LOCAL_PREFIX[:8] + addr
        else:
            # Interface not provided
            # NOTE(review): when sp is in-line but si is elided, the in-line
            # prefix held in self.src is not used here - confirm intent.
            addr = _extract_upperaddress(self, source=True)
        self.src = inet_ntop(socket.AF_INET6, addr)
        return self.src

    def decompressDestAddr(self):
        """Expand ``dst`` to a full address, store it in ``dst`` and return it."""
        if not self.dp and not self.di:
            # Prefix & Interface
            return self.dst
        elif not self.di:
            # Only interface
            addr = inet_pton(socket.AF_INET6, self.dst)[-8:]
            addr = LINK_LOCAL_PREFIX[:8] + addr
        else:
            # Interface not provided
            # NOTE(review): same remark as in decompressSourceAddr for an
            # in-line prefix combined with an elided interface identifier.
            addr = _extract_upperaddress(self, source=False)
        self.dst = inet_ntop(socket.AF_INET6, addr)
        return self.dst

    def do_build(self):
        """Fill the compressed fields from an IPv6 payload, then build.

        Without an IPv6 payload the fields are built as they are.
        """
        if not isinstance(self.payload, IPv6):
            return Packet.do_build(self)
        # IPv6
        ipv6 = self.payload
        self.src = ipv6.src
        self.dst = ipv6.dst
        self.flow_label = ipv6.fl
        self.traffic_class = ipv6.tc
        self.hopLimit = ipv6.hlim
        if isinstance(ipv6.payload, UDP):
            self.nh = 1
            self.hc2 = 1
            udp = ipv6.payload
            self.udpSourcePort = udp.sport
            self.udpDestPort = udp.dport
            if not udp.len or not udp.chksum:
                # Re-dissect the built UDP layer to obtain the computed
                # length and checksum.
                udp = UDP(raw(udp))
            self.udpLength = udp.len
            self.udpChecksum = udp.chksum
        return Packet.do_build(self)

    def do_build_payload(self):
        """Emit only what follows the IPv6 (and UDP) headers."""
        # Elide the IPv6 and UDP payload
        if isinstance(self.payload, IPv6):
            if isinstance(self.payload.payload, UDP):
                return raw(self.payload.payload.payload)
            return raw(self.payload.payload)
        return Packet.do_build_payload(self)
376
377# https://tools.ietf.org/html/rfc4944#section-5.3
378
379
class LoWPANFragmentationFirst(Packet):
    """Header of the first fragment of a fragmented datagram.

    Carries the datagram size and tag; unlike LoWPANFragmentationSubsequent
    it has no offset field.
    """
    name = "6LoWPAN First Fragmentation Packet"
    fields_desc = [
        # 5-bit dispatch pattern; SixLoWPAN.dispatch_hook matches it with
        # ``first_byte >> 3 == 0x18``.
        BitField("reserved", 0x18, 5),
        BitField("datagramSize", 0x0, 11),
        XShortField("datagramTag", 0x0),
    ]
387
388
class LoWPANFragmentationSubsequent(Packet):
    """Header of every fragment after the first one.

    Same layout as LoWPANFragmentationFirst plus a one-byte offset.
    """
    name = "6LoWPAN Subsequent Fragmentation Packet"
    fields_desc = [
        # 5-bit dispatch pattern; SixLoWPAN.dispatch_hook matches it with
        # ``first_byte >> 3 == 0x1C``.
        BitField("reserved", 0x1C, 5),
        BitField("datagramSize", 0x0, 11),
        # NOTE(review): the default tag is random here but 0 in
        # LoWPANFragmentationFirst - confirm this asymmetry is intended.
        XShortField("datagramTag", RandShort()),
        ByteField("datagramOffset", 0x0),  # VALUE PRINTED IN OCTETS, wireshark does in bits (128 bits == 16 octets)  # noqa: E501
    ]
397
398
399# https://tools.ietf.org/html/rfc4944#section-11.1
400
class LoWPANBroadcast(Packet):
    """Broadcast header: a dispatch byte followed by a sequence number.

    SixLoWPAN.dispatch_hook selects this class when the first byte is 0x50.
    """
    name = "6LoWPAN Broadcast"
    fields_desc = [
        ByteField("reserved", 0x50),
        ByteField("seq", 0)
    ]
407
408
409#########################
410# LoWPAN_IPHC (RFC6282) #
411#########################
412
413
# IP version written into the IPv6 header rebuilt by LoWPAN_HC1.post_dissect.
IPHC_DEFAULT_VERSION = 6
# Default traffic-class and flow-label values.
# NOTE(review): neither constant is referenced in the visible part of this
# file - presumably kept for external users; confirm before removing.
IPHC_DEFAULT_TF = 0
IPHC_DEFAULT_FL = 0
417
418
def source_addr_size(pkt):
    """Source address size

    Return the number of bytes of the source address that are carried
    in-line, as selected by the ``sac`` and ``sam`` values of the packet.
    ``None`` is returned when ``sam`` is not one of 0, 1, 2 or 3.

    Keyword arguments:
    pkt -- packet object instance
    """
    if pkt.sac == 0x0:
        # Stateless: mode 0 carries the full 16-byte address.
        sizes = {0x0: 16, 0x1: 8, 0x2: 2, 0x3: 0}
    else:
        # Stateful: mode 0 carries nothing in-line.
        sizes = {0x0: 0, 0x1: 8, 0x2: 2, 0x3: 0}
    return sizes.get(pkt.sam)
446
447
def dest_addr_size(pkt):
    """Destination address size

    Return the number of bytes of the destination address that are carried
    in-line, as selected by the ``m``, ``dac`` and ``dam`` values of the
    packet. Reserved combinations yield 0. ``None`` is returned for values
    outside the handled ranges (``dam`` above 3 when ``m`` is 1, or ``m`` /
    ``dac`` other than 0 or 1).

    Keyword arguments:
    pkt -- packet object instance
    """
    if pkt.m == 0 and pkt.dac == 0:
        # Unicast, stateless
        return {0x0: 16, 0x1: 8, 0x2: 2}.get(pkt.dam, 0)
    if pkt.m == 0 and pkt.dac == 1:
        # Unicast, stateful (mode 0 is reserved)
        return {0x1: 8, 0x2: 2}.get(pkt.dam, 0)
    if pkt.m == 1 and pkt.dac == 0:
        # Multicast, stateless
        return {0x0: 16, 0x1: 6, 0x2: 4, 0x3: 1}.get(pkt.dam)
    if pkt.m == 1 and pkt.dac == 1:
        # Multicast, stateful (modes 1 to 3 are reserved)
        return {0x0: 6, 0x1: 0, 0x2: 0, 0x3: 0}.get(pkt.dam)
    return None
497
498
def _extract_upperaddress(pkt, source=True):
    """Derive a link-local IPv6 address from the layer carrying a 6LoWPAN
    packet.

    The underlayers of ``pkt`` are walked until an Ethernet or IEEE 802.15.4
    data layer is found, and its link-layer address is turned into an
    interface identifier appended to the fe80::/64 prefix.

    params:
     - pkt: the 6LoWPAN packet whose underlayers are inspected
     - source: if True, the address is the source one. Otherwise, it is the
               destination.
    returns: the IPv6 address as 16 raw bytes. All zeroes (after a warning)
             when no supported layer or addressing mode is found.
    """
    # https://tools.ietf.org/html/rfc6282#section-3.2.2
    SUPPORTED_LAYERS = (Ether, Dot15d4Data)
    underlayer = pkt.underlayer
    while underlayer and not isinstance(underlayer, SUPPORTED_LAYERS):
        underlayer = underlayer.underlayer
    # Extract and process address. isinstance() is used, like in the loop
    # above, so that subclasses of the supported layers are accepted too.
    if isinstance(underlayer, Ether):
        addr = mac2str(underlayer.src if source else underlayer.dst)
        # https://tools.ietf.org/html/rfc2464#section-4
        # NOTE(review): RFC 2464 also complements the universal/local bit;
        # this is not done here - confirm whether it should be.
        return LINK_LOCAL_PREFIX[:8] + addr[:3] + b"\xff\xfe" + addr[3:]
    if isinstance(underlayer, Dot15d4Data):
        mac_hdr = underlayer.underlayer
        # Source and destination can use different addressing modes: read
        # the mode that matches the requested address.
        if source:
            addr = underlayer.src_addr
            addr_mode = getattr(mac_hdr, "fcf_srcaddrmode", None)
        else:
            addr = underlayer.dest_addr
            addr_mode = getattr(mac_hdr, "fcf_destaddrmode", None)
        if addr_mode == 3:  # Extended/long
            tmp_ip = LINK_LOCAL_PREFIX[0:8] + struct.pack(">Q", addr)
            # Flip the universal/local bit (0x02) of the first IID byte.
            return tmp_ip[0:8] + struct.pack("B", (orb(tmp_ip[8]) ^ 0x2)) + tmp_ip[9:16]  # noqa: E501
        if addr_mode == 2:  # Short
            return (
                LINK_LOCAL_PREFIX[0:8] +
                b"\x00\x00\x00\xff\xfe\x00" +
                struct.pack(">Q", addr)[6:]
            )
        # No usable address of that kind in the 802.15.4 header
        warning(
            'Unimplemented: Unsupported 802.15.4 addressing mode: %s' %
            addr_mode
        )
        return b"\x00" * 16
    # Most of the times, it's necessary the IEEE 802.15.4 data to extract
    # this address, sometimes another layer.
    warning(
        'Unimplemented: Unsupported upper layer: %s' % type(underlayer)
    )
    return b"\x00" * 16
540
541
class LoWPAN_IPHC(Packet):
    """6LoWPAN IPv6 header compressed packets

    It follows the implementation of RFC6282.

    Dissection rebuilds an IPv6 header from the compressed fields; building
    fills the compressed fields from an IPv6 payload. No context table is
    consulted, so context-based (stateful) modes are only approximated.
    """
    # Holds the rebuilt IPv6 header for LoWPAN_NHC.post_dissect.
    __slots__ = ["_ipv6"]
    # the LOWPAN_IPHC encoding utilizes 13 bits, 5 dispatch type
    name = "LoWPAN IP Header Compression Packet"
    # Display labels, indexed by the 2-bit SAM/DAM value.
    _address_modes = ["Unspecified (0)", "1", "16-bits inline (2)",
                      "Compressed (3)"]
    # Display labels, indexed by the SAC/DAC bit.
    _state_mode = ["Stateless (0)", "Stateful (1)"]
    deprecated_fields = {
        "_nhField": ("nhField", "2.4.4"),
        "_hopLimit": ("hopLimit", "2.4.4"),
        "sourceAddr": ("src", "2.4.4"),
        "destinyAddr": ("dst", "2.4.4"),
        "udpDestinyPort": ("udpDestPort", "2.4.4"),
    }
    fields_desc = [
        # Base Format https://tools.ietf.org/html/rfc6282#section-3.1.2
        BitField("_reserved", 0x03, 3),
        BitField("tf", 0x0, 2),
        BitEnumField("nh", 0x0, 1, ["Inline", "Compressed"]),
        BitEnumField("hlim", 0x0, 2, {0: "Inline",
                                      1: "Compressed/HL1",
                                      2: "Compressed/HL64",
                                      3: "Compressed/HL255"}),
        BitEnumField("cid", 0x0, 1, {1: "Present (1)"}),
        BitEnumField("sac", 0x0, 1, _state_mode),
        BitEnumField("sam", 0x0, 2, _address_modes),
        BitEnumField("m", 0x0, 1, {1: "multicast (1)"}),
        BitEnumField("dac", 0x0, 1, _state_mode),
        BitEnumField("dam", 0x0, 2, _address_modes),
        # https://tools.ietf.org/html/rfc6282#section-3.1.2
        # Context Identifier Extension
        ConditionalField(
            BitField("sci", 0, 4),
            lambda pkt: pkt.cid == 0x1
        ),
        ConditionalField(
            BitField("dci", 0, 4),
            lambda pkt: pkt.cid == 0x1
        ),
        # https://tools.ietf.org/html/rfc6282#section-3.2.1
        ConditionalField(
            BitField("tc_ecn", 0, 2),
            lambda pkt: pkt.tf in [0, 1, 2]
        ),
        ConditionalField(
            BitField("tc_dscp", 0, 6),
            lambda pkt: pkt.tf in [0, 2],
        ),
        # Filler bits in front of the flow label: 4 when DSCP is present
        # (tf == 0), 2 otherwise (tf == 1).
        ConditionalField(
            MultipleTypeField(
                [(BitField("rsv", 0, 4), lambda pkt: pkt.tf == 0)],
                BitField("rsv", 0, 2),
            ),
            lambda pkt: pkt.tf in [0, 1]
        ),
        ConditionalField(
            BitField("flowlabel", 0, 20),
            lambda pkt: pkt.tf in [0, 1]
        ),
        # Inline fields https://tools.ietf.org/html/rfc6282#section-3.1.1
        ConditionalField(
            ByteEnumField("nhField", 0x0, ipv6nh),
            lambda pkt: pkt.nh == 0x0
        ),
        ConditionalField(
            ByteField("hopLimit", 0x0),
            lambda pkt: pkt.hlim == 0x0
        ),
        # The src and dst fields are filled up or removed in the
        # pre_dissect and post_build, depending on the other options.
        IP6FieldLenField("src", "::", length_of=source_addr_size),
        IP6FieldLenField("dst", "::", length_of=dest_addr_size),  # problem when it's 0  # noqa: E501
    ]

    def post_dissect(self, data):
        """dissect the IPv6 package compressed into this IPHC packet.

        The packet payload needs to be decompressed and depending on the
        arguments, several conversions should be done.

        When the payload is plain IPv6, the rebuilt header is serialized in
        front of ``data``. When it is a LoWPAN_NHC, the rebuilt header is
        kept in ``_ipv6`` so that the NHC layer can complete it. Otherwise
        ``data`` is passed through unchanged.
        """

        # uncompress payload
        packet = IPv6()
        packet.tc, packet.fl = self._getTrafficClassAndFlowLabel()
        if not self.nh:
            packet.nh = self.nhField
        # HLIM: Hop Limit
        if self.hlim == 0:
            packet.hlim = self.hopLimit
        elif self.hlim == 0x1:
            packet.hlim = 1
        elif self.hlim == 0x2:
            packet.hlim = 64
        else:
            packet.hlim = 255

        packet.src = self.decompressSourceAddr(packet)
        packet.dst = self.decompressDestAddr(packet)

        pay_cls = self.guess_payload_class(data)
        if pay_cls == IPv6:
            packet.add_payload(data)
            data = raw(packet)
        elif pay_cls == LoWPAN_NHC:
            self._ipv6 = packet
        return Packet.post_dissect(self, data)

    def decompressDestAddr(self, packet):
        """Expand ``dst`` to a full address, store it in ``dst``, return it.

        ``packet`` is accepted for interface compatibility and not used.
        """
        # https://tools.ietf.org/html/rfc6282#section-3.1.1
        try:
            tmp_ip = inet_pton(socket.AF_INET6, self.dst)
        except socket.error:
            # e.g. nothing was carried in-line
            tmp_ip = b"\x00" * 16

        if self.m == 0 and self.dac == 0:
            if self.dam == 0:
                # Address fully carried
                pass
            elif self.dam == 1:
                tmp_ip = LINK_LOCAL_PREFIX[0:8] + tmp_ip[-8:]
            elif self.dam == 2:
                tmp_ip = LINK_LOCAL_PREFIX[0:8] + b"\x00\x00\x00\xff\xfe\x00" + tmp_ip[-2:]  # noqa: E501
            elif self.dam == 3:
                tmp_ip = _extract_upperaddress(self, source=False)

        elif self.m == 0 and self.dac == 1:
            if self.dam == 0x3:
                # should use context IID + encapsulating header
                tmp_ip = _extract_upperaddress(self, source=False)
            # dam == 0 is reserved. dam in (1, 2) should use context
            # information, which is unimplemented
            # (https://tools.ietf.org/html/rfc6282#page-9): the in-line
            # bits are kept as they are.
        elif self.m == 1 and self.dac == 0:
            if self.dam == 0:
                # Address fully carried
                pass
            elif self.dam == 1:
                # ffXX::00XX:XXXX:XXXX
                tmp = b"\xff" + chb(tmp_ip[16 - dest_addr_size(self)])
                tmp_ip = tmp + b"\x00" * 9 + tmp_ip[-5:]
            elif self.dam == 2:
                # ffXX::00XX:XXXX
                tmp = b"\xff" + chb(tmp_ip[16 - dest_addr_size(self)])
                tmp_ip = tmp + b"\x00" * 11 + tmp_ip[-3:]
            else:  # self.dam == 3:
                # ff02::00XX
                tmp_ip = b"\xff\x02" + b"\x00" * 13 + tmp_ip[-1:]
        elif self.m == 1 and self.dac == 1:
            if self.dam == 0x0:
                # https://tools.ietf.org/html/rfc6282#page-10
                # https://github.com/wireshark/wireshark/blob/f54611d1104d85a425e52c7318c522ed249916b6/epan/dissectors/packet-6lowpan.c#L2149-L2166
                # Format: ffXX:XXLL:PPPP:PPPP:PPPP:PPPP:XXXX:XXXX
                # P and L should be retrieved from context
                P = b"\x00" * 16
                L = b"\x00"
                X = tmp_ip[-6:]
                tmp_ip = b"\xff" + X[:2] + L + P[:8] + X[2:6]
            else:  # all the others values: reserved
                pass

        self.dst = inet_ntop(socket.AF_INET6, tmp_ip)
        return self.dst

    def compressSourceAddr(self, ipv6):
        """Store in ``src`` the part of ``ipv6.src`` that goes on the wire.

        The kept bytes are right-aligned in a zero-padded address, which is
        also returned.
        """
        # https://tools.ietf.org/html/rfc6282#section-3.1.1
        tmp_ip = inet_pton(socket.AF_INET6, ipv6.src)

        if self.sac == 0:
            if self.sam == 0x0:
                pass
            elif self.sam == 0x1:
                tmp_ip = tmp_ip[8:16]
            elif self.sam == 0x2:
                tmp_ip = tmp_ip[14:16]
            else:  # self.sam == 0x3:
                pass
        else:  # self.sac == 1
            if self.sam == 0x0:
                tmp_ip = b"\x00" * 16
            elif self.sam == 0x1:
                tmp_ip = tmp_ip[8:16]
            elif self.sam == 0x2:
                tmp_ip = tmp_ip[14:16]

        self.src = inet_ntop(socket.AF_INET6, b"\x00" * (16 - len(tmp_ip)) + tmp_ip)  # noqa: E501
        return self.src

    def compressDestAddr(self, ipv6):
        """Store in ``dst`` the part of ``ipv6.dst`` that goes on the wire.

        The kept bytes are right-aligned in a zero-padded address, which is
        also returned (like compressSourceAddr does).
        """
        # https://tools.ietf.org/html/rfc6282#section-3.1.1
        tmp_ip = inet_pton(socket.AF_INET6, ipv6.dst)

        if self.m == 0 and self.dac == 0:
            if self.dam == 0x0:
                pass
            elif self.dam == 0x1:
                tmp_ip = b"\x00" * 8 + tmp_ip[8:16]
            elif self.dam == 0x2:
                tmp_ip = b"\x00" * 14 + tmp_ip[14:16]
        elif self.m == 0 and self.dac == 1:
            if self.dam == 0x1:
                tmp_ip = b"\x00" * 8 + tmp_ip[8:16]
            elif self.dam == 0x2:
                tmp_ip = b"\x00" * 14 + tmp_ip[14:16]
        elif self.m == 1 and self.dac == 0:
            if self.dam == 0x0:
                pass
            elif self.dam == 0x1:
                # Keep the second byte and the last 5 bytes
                tmp_ip = b"\x00" * 10 + tmp_ip[1:2] + tmp_ip[11:16]
            elif self.dam == 0x2:
                # Keep the second byte and the last 3 bytes
                tmp_ip = b"\x00" * 12 + tmp_ip[1:2] + tmp_ip[13:16]
            elif self.dam == 0x3:
                # Keep the last byte only
                tmp_ip = b"\x00" * 15 + tmp_ip[15:16]
        elif self.m == 1 and self.dac == 1:
            if self.dam == 0:
                # Keep bytes 1-2 and the last 4 bytes
                tmp_ip = b"\x00" * 10 + tmp_ip[1:3] + tmp_ip[12:16]

        self.dst = inet_ntop(socket.AF_INET6, tmp_ip)
        return self.dst

    def decompressSourceAddr(self, packet):
        """Expand ``src`` to a full address, store it in ``src``, return it.

        ``packet`` is accepted for interface compatibility and not used.
        """
        # https://tools.ietf.org/html/rfc6282#section-3.1.1
        try:
            tmp_ip = inet_pton(socket.AF_INET6, self.src)
        except socket.error:
            # e.g. nothing was carried in-line
            tmp_ip = b"\x00" * 16

        if self.sac == 0:
            if self.sam == 0x0:
                # Full address is carried in-line
                pass
            elif self.sam == 0x1:
                tmp_ip = LINK_LOCAL_PREFIX[0:8] + tmp_ip[16 - source_addr_size(self):16]  # noqa: E501
            elif self.sam == 0x2:
                tmp = LINK_LOCAL_PREFIX[0:8] + b"\x00\x00\x00\xff\xfe\x00"
                tmp_ip = tmp + tmp_ip[16 - source_addr_size(self):16]
            elif self.sam == 0x3:
                # Taken from encapsulating header
                tmp_ip = _extract_upperaddress(self, source=True)
        else:  # self.sac == 1:
            if self.sam == 0x0:
                # Unspecified address ::
                pass
            elif self.sam == 0x1:
                # should use context IID
                pass
            elif self.sam == 0x2:
                # should use context IID
                tmp = LINK_LOCAL_PREFIX[0:8] + b"\x00\x00\x00\xff\xfe\x00"
                tmp_ip = tmp + tmp_ip[16 - source_addr_size(self):16]
            elif self.sam == 0x3:
                # should use context IID
                tmp_ip = LINK_LOCAL_PREFIX[0:8] + b"\x00" * 8
        self.src = inet_ntop(socket.AF_INET6, tmp_ip)
        return self.src

    def guess_payload_class(self, payload):
        """Choose the payload class: NHC, Raw (inside a fragment) or IPv6."""
        if self.nh:
            return LoWPAN_NHC
        u = self.underlayer
        if u and isinstance(u, (LoWPANFragmentationFirst,
                                LoWPANFragmentationSubsequent)):
            # Only a part of the datagram is available: do not dissect it
            return Raw
        return IPv6

    def do_build(self):
        """Fill the in-line fields from the IPv6 payload, then build.

        The IPv6 layer is looked for directly below this layer, or below a
        LoWPAN_NHC payload. Without it, the fields are built as they are.
        """
        _cur = self
        if isinstance(_cur.payload, LoWPAN_NHC):
            _cur = _cur.payload
        if not isinstance(_cur.payload, IPv6):
            return Packet.do_build(self)
        ipv6 = _cur.payload

        self._reserved = 0x03

        # NEW COMPRESSION TECHNIQUE!
        # a ) Compression Techniques

        # 1. Set Traffic Class
        if self.tf == 0x0:
            self.tc_ecn = ipv6.tc >> 6
            self.tc_dscp = ipv6.tc & 0x3F
            self.flowlabel = ipv6.fl
        elif self.tf == 0x1:
            self.tc_ecn = ipv6.tc >> 6
            self.flowlabel = ipv6.fl
        elif self.tf == 0x2:
            self.tc_ecn = ipv6.tc >> 6
            self.tc_dscp = ipv6.tc & 0x3F
        else:  # self.tf == 0x3:
            pass  # no field is set

        # 2. Next Header
        if self.nh == 0x0:
            self.nhField = ipv6.nh
        elif self.nh == 1:
            # This will be handled in LoWPAN_NHC
            pass

        # 3. HLim
        if self.hlim == 0x0:
            self.hopLimit = ipv6.hlim
        else:  # if hlim is 1, 2 or 3, there are nothing to do!
            pass

        # 4. Context (which context to use...)
        if self.cid == 0x0:
            pass
        else:
            # TODO: Context Unimplemented yet
            pass

        # 5. Compress Source Addr
        self.compressSourceAddr(ipv6)
        self.compressDestAddr(ipv6)

        return Packet.do_build(self)

    def do_build_payload(self):
        """Emit only what follows the IPv6 header."""
        # Elide the IPv6 payload
        if isinstance(self.payload, IPv6):
            return raw(self.payload.payload)
        return Packet.do_build_payload(self)

    def _getTrafficClassAndFlowLabel(self):
        """Page 6, draft feb 2011

        Return the (traffic class, flow label) pair encoded by the ``tf``
        mode; the parts that are elided are 0.
        """
        if self.tf == 0x0:
            return (self.tc_ecn << 6) + self.tc_dscp, self.flowlabel
        elif self.tf == 0x1:
            return (self.tc_ecn << 6), self.flowlabel
        elif self.tf == 0x2:
            return (self.tc_ecn << 6) + self.tc_dscp, 0
        else:
            return 0, 0
879
880##############
881# LOWPAN_NHC #
882##############
883
884# https://tools.ietf.org/html/rfc6282#section-4
885
886
class LoWPAN_NHC_Hdr(Packet):
    """Common base of the LOWPAN_NHC headers, used to pick the right one."""

    @classmethod
    def get_next_cls(cls, s):
        """Return the NHC header class announced by the first byte of ``s``.

        ``None`` is returned when ``s`` holds fewer than 2 bytes or when the
        first byte matches no known NHC header.
        """
        if not s or len(s) < 2:
            return None
        first = ord(s[:1])
        if first >> 3 == 0x1e:
            return LoWPAN_NHC_UDP
        if first >> 4 == 0xe:
            return LoWPAN_NHC_IPv6Ext
        return None

    @classmethod
    def dispatch_hook(cls, _pkt=b"", *args, **kargs):
        """Instantiate the matching subclass, or this class as a fallback."""
        matched = LoWPAN_NHC_Hdr.get_next_cls(_pkt)
        return matched or LoWPAN_NHC_Hdr

    def extract_padding(self, s):
        # A header has no payload of its own: the remaining bytes are given
        # back to the enclosing packet.
        return b"", s
904
905
class LoWPAN_NHC_UDP(LoWPAN_NHC_Hdr):
    """LOWPAN_NHC compressed UDP header.

    ``P`` selects how many bits of each port are carried in-line and ``C``
    whether the checksum is elided; LoWPAN_NHC.post_dissect restores the
    elided port bits.
    """
    fields_desc = [
        # 5-bit identifier matched by LoWPAN_NHC_Hdr.get_next_cls
        BitField("res", 0x1e, 5),
        BitField("C", 0, 1),
        BitField("P", 0, 2),
        # Source port: 16 bits for P in (0, 1), 8 bits for P == 2,
        # 4 bits for P == 3.
        MultipleTypeField(
            [(BitField("udpSourcePort", 0, 16),
              lambda pkt: pkt.P in [0, 1]),
             (BitField("udpSourcePort", 0, 8),
              lambda pkt: pkt.P == 2),
             (BitField("udpSourcePort", 0, 4),
              lambda pkt: pkt.P == 3)],
            BitField("udpSourcePort", 0x0, 16),
        ),
        # Destination port: 16 bits for P in (0, 2), 8 bits for P == 1,
        # 4 bits for P == 3.
        MultipleTypeField(
            [(BitField("udpDestPort", 0, 16),
              lambda pkt: pkt.P in [0, 2]),
             (BitField("udpDestPort", 0, 8),
              lambda pkt: pkt.P == 1),
             (BitField("udpDestPort", 0, 4),
              lambda pkt: pkt.P == 3)],
            BitField("udpDestPort", 0x0, 16),
        ),
        # The checksum is only present when it is not elided (C == 0).
        ConditionalField(
            XShortField("udpChecksum", 0x0),
            lambda pkt: pkt.C == 0
        ),
    ]
934
935
# Display names for the 3-bit ``eid`` field of LoWPAN_NHC_IPv6Ext.
# Values 5 and 6 have no entry.
_lowpan_nhc_ipv6ext_eid = {
    0: "Hop-by-hop Options Header",
    1: "IPv6 Routing Header",
    2: "IPv6 Fragment Header",
    3: "IPv6 Destination Options Header",
    4: "IPv6 Mobility Header",
    7: "IPv6 Header",
}
944
945
class LoWPAN_NHC_IPv6Ext(LoWPAN_NHC_Hdr):
    """LOWPAN_NHC compressed IPv6 extension header.

    Layout: one byte of flags (``res``, ``eid``, ``nh``), an optional
    in-line next header byte (when ``nh`` is 0), a length byte, then
    ``len`` bytes of data.
    """
    fields_desc = [
        # 4-bit identifier matched by LoWPAN_NHC_Hdr.get_next_cls
        BitField("res", 0xe, 4),
        BitEnumField("eid", 0, 3, _lowpan_nhc_ipv6ext_eid),
        BitField("nh", 0, 1),
        ConditionalField(
            ByteField("nhField", 0),
            lambda pkt: pkt.nh == 0
        ),
        FieldLenField("len", None, length_of="data", fmt="B"),
        StrFixedLenField("data", b"", length_from=lambda pkt: pkt.len)
    ]

    def post_build(self, p, pay):
        """Write the length byte when ``len`` was left unset.

        The length counts the bytes that follow the length byte, which is
        what dissection reads back through ``length_from``.
        """
        if self.len is None:
            # Index of the length byte: after the flags byte and, when
            # nh == 0, after the in-line next header byte.
            offs = (not self.nh) + 1
            # The length byte itself must not be counted.
            data_len = len(p) - offs - 1
            p = p[:offs] + struct.pack("!B", data_len) + p[offs + 1:]
        return p + pay
964
965
class LoWPAN_NHC(Packet):
    """Chain of LOWPAN_NHC compressed next headers following a LoWPAN_IPHC.

    Only UDP compression is implemented; IPv6 extension headers are parsed
    into ``exts`` but not converted from or to real extension headers.
    """
    name = "LOWPAN_NHC"
    fields_desc = [
        PacketListField(
            "exts", [], pkt_cls=LoWPAN_NHC_Hdr,
            next_cls_cb=lambda *s: LoWPAN_NHC_Hdr.get_next_cls(s[3])
        )
    ]

    def post_dissect(self, data):
        """Finish the IPv6 header rebuilt by the IPHC layer.

        The header stored by the underlayer in ``_ipv6`` is completed with a
        UDP header when one was compressed, and serialized in front of
        ``data``. ``data`` is returned unchanged when there is no such
        header or when the payload is not going to be dissected as IPv6.
        """
        iphc = self.underlayer
        if not iphc or not hasattr(iphc, "_ipv6"):
            return data
        if self.guess_payload_class(data) != IPv6:
            return data
        # Underlayer is LoWPAN_IPHC
        packet = iphc._ipv6
        ipv6_hdr = next(
            (ext for ext in self.exts if isinstance(ext, LoWPAN_NHC_IPv6Ext)),
            None
        )
        if ipv6_hdr:
            # XXX todo: implement: append the IPv6 extension
            # packet = packet / ipv6extension
            pass
        udp_hdr = next(
            (ext for ext in self.exts if isinstance(ext, LoWPAN_NHC_UDP)),
            None
        )
        if udp_hdr:
            packet.nh = 0x11  # UDP
            udp = UDP()
            # https://tools.ietf.org/html/rfc6282#section-4.3.3
            if udp_hdr.C == 0:
                udp.chksum = udp_hdr.udpChecksum
            # Per P value: base to add to the in-line source and destination
            # port bits in order to restore the elided ones.
            port_bases = {
                0: (0, 0),
                1: (0, 0xF000),
                2: (0xF000, 0),
                3: (0xF0B0, 0xF0B0),
            }
            bases = port_bases.get(udp_hdr.P)
            if bases is not None:
                src_base, dst_base = bases
                udp.sport = src_base + udp_hdr.udpSourcePort
                udp.dport = dst_base + udp_hdr.udpDestPort
            packet.lastlayer().add_payload(udp / data)
        else:
            packet.lastlayer().add_payload(data)
        data = raw(packet)
        return Packet.post_dissect(self, data)

    def do_build(self):
        """Fill (or create) the compressed UDP header from the payload.

        Nothing is changed unless the payload is IPv6 carrying UDP. When no
        LoWPAN_NHC_UDP is present in ``exts``, one is created with the most
        compact port encoding and inserted first.
        """
        if not isinstance(self.payload, IPv6):
            return Packet.do_build(self)
        upper = self.payload.payload
        while upper and isinstance(upper.payload, _IPv6ExtHdr):
            # XXX todo: populate a LoWPAN_NHC_IPv6Ext
            upper = upper.payload
        if isinstance(upper, UDP):
            udp_hdr = next(
                (ext for ext in self.exts if isinstance(ext, LoWPAN_NHC_UDP)),
                None
            )
            if udp_hdr is None:
                udp_hdr = LoWPAN_NHC_UDP()
                # Guess best compression
                if upper.sport >> 4 == 0xf0b and upper.dport >> 4 == 0xf0b:
                    udp_hdr.P = 3
                elif upper.sport >> 8 == 0xf0:
                    udp_hdr.P = 2
                elif upper.dport >> 8 == 0xf0:
                    udp_hdr.P = 1
                self.exts.insert(0, udp_hdr)
            # https://tools.ietf.org/html/rfc6282#section-4.3.3
            # Per P value: mask applied to the source and destination ports
            # (None: the port is carried in full).
            port_masks = {
                0: (None, None),
                1: (None, 255),
                2: (255, None),
                3: (15, 15),
            }
            masks = port_masks.get(udp_hdr.P)
            if masks is not None:
                src_mask, dst_mask = masks
                if src_mask is None:
                    udp_hdr.udpSourcePort = upper.sport
                else:
                    udp_hdr.udpSourcePort = upper.sport & src_mask
                if dst_mask is None:
                    udp_hdr.udpDestPort = upper.dport
                else:
                    udp_hdr.udpDestPort = upper.dport & dst_mask
            if udp_hdr.C == 0:
                if upper.chksum:
                    udp_hdr.udpChecksum = upper.chksum
                else:
                    # Re-dissect the built UDP layer to get its checksum
                    udp_hdr.udpChecksum = UDP(raw(upper)).chksum
        return Packet.do_build(self)

    def do_build_payload(self):
        """Emit only what follows the leading IPv6 and UDP layers."""
        if not isinstance(self.payload, IPv6):
            return Packet.do_build_payload(self)
        # Elide IPv6 payload, extensions and UDP
        layer = self.payload
        while layer and isinstance(layer, (IPv6, UDP)):
            layer = layer.payload
        return raw(layer)

    def guess_payload_class(self, payload):
        """Return Raw inside a fragment, IPv6 otherwise."""
        fragment_types = (LoWPANFragmentationFirst,
                          LoWPANFragmentationSubsequent)
        parent = self.underlayer
        if parent and isinstance(parent.underlayer, fragment_types):
            return Raw
        return IPv6
1080
1081
1082######################
1083# 6LowPan Dispatcher #
1084######################
1085
1086# https://tools.ietf.org/html/rfc4944#section-5.1
1087
class SixLoWPAN_ESC(Packet):
    """Escape dispatch header, holding a single ``dispatch`` byte.

    SixLoWPAN.dispatch_hook selects this class when the first byte is 0x7f.
    """
    name = "SixLoWPAN Dispatcher ESC"
    fields_desc = [ByteField("dispatch", 0)]
1091
1092
class SixLoWPAN(Packet):
    """Entry point that redirects to the right 6LoWPAN header class."""
    name = "SixLoWPAN Dispatcher"

    @classmethod
    def dispatch_hook(cls, _pkt=b"", *args, **kargs):
        """Select the header class from the first byte of ``_pkt``.

        Exact dispatch values are tested before the bit patterns. ``cls`` is
        returned when ``_pkt`` is empty or when nothing matches.
        """
        if not (_pkt and len(_pkt) >= 1):
            return cls
        fb = ord(_pkt[:1])
        exact_dispatch = {
            0x41: LoWPANUncompressedIPv6,
            0x42: LoWPAN_HC1,
            0x50: LoWPANBroadcast,
            0x7f: SixLoWPAN_ESC,
        }
        if fb in exact_dispatch:
            return exact_dispatch[fb]
        # 5-bit patterns: fragmentation headers
        top5 = fb >> 3
        if top5 == 0x18:
            return LoWPANFragmentationFirst
        if top5 == 0x1C:
            return LoWPANFragmentationSubsequent
        # 2-bit patterns: mesh header, then IPHC
        top2 = fb >> 6
        if top2 == 0x02:
            return LoWPANMesh
        if top2 == 0x01:
            return LoWPAN_IPHC
        return cls
1118
1119
1120#################
1121# Fragmentation #
1122#################
1123
# Maximum number of bytes of the serialized IPv6 packet carried by each
# fragment produced by sixlowpan_fragment (a multiple of 8, as fragment
# offsets are computed as MAX_SIZE // 8 per fragment).
MAX_SIZE = 96
1126
1127
def sixlowpan_fragment(packet, datagram_tag=1):
    """Split a packet into 6LoWPAN fragments small enough to be sent.

    The IPv6 layer of ``packet`` is serialized and cut in pieces of at most
    MAX_SIZE bytes, each one preceded by a fragmentation header carrying
    ``datagram_tag``. A packet that already fits is returned as is, in a
    one-element list. An Exception is raised if there is no IPv6 layer.

    Usage example::

      >>> ipv6 = ..... (very big packet)
      >>> pkts = sixlowpan_fragment(ipv6, datagram_tag=0x17)
      >>> send = [Dot15d4()/Dot15d4Data()/x for x in pkts]
      >>> wireshark(send)
    """
    if not packet.haslayer(IPv6):
        raise Exception("SixLoWPAN only fragments IPv6 packets !")

    ipv6_bytes = raw(packet[IPv6])
    total_size = len(ipv6_bytes)
    if total_size <= MAX_SIZE:
        return [packet]

    pieces = [
        ipv6_bytes[start:start + MAX_SIZE]
        for start in range(0, total_size, MAX_SIZE)
    ]

    first_hdr = LoWPANFragmentationFirst(
        datagramTag=datagram_tag,
        datagramSize=total_size,
    )
    fragments = [first_hdr / pieces[0]]
    for index, piece in enumerate(pieces[1:], 1):
        next_hdr = LoWPANFragmentationSubsequent(
            datagramTag=datagram_tag,
            datagramSize=total_size,
            # The offset is expressed in units of 8 bytes
            datagramOffset=MAX_SIZE // 8 * index,
        )
        fragments.append(next_hdr / piece)

    return fragments
1157
1158
def sixlowpan_defragment(packet_list):
    """Reassemble 6LoWPAN fragments, grouped by datagram tag.

    The payloads of the fragments sharing a tag are concatenated in the
    order in which they appear in ``packet_list`` (offsets are not looked
    at) and dissected as SixLoWPAN. Packets without a fragmentation header
    are ignored.

    returns: a dict mapping each datagram tag to the reassembled packet.
    """
    buffers = {}
    for pkt in packet_list:
        if LoWPANFragmentationFirst in pkt:
            frag_cls = LoWPANFragmentationFirst
        elif LoWPANFragmentationSubsequent in pkt:
            frag_cls = LoWPANFragmentationSubsequent
        else:
            continue
        fragment = pkt[frag_cls]
        tag = fragment.datagramTag
        buffers[tag] = buffers.get(tag, b"") + fragment.payload.load
    return {tag: SixLoWPAN(data) for tag, data in buffers.items()}
1171
1172############
1173# Bindings #
1174############
1175
1176
# An HC1 header is followed by IPv6 (rebuilt by LoWPAN_HC1.post_dissect).
bind_layers(LoWPAN_HC1, IPv6)

# Top-down binding only: stacking a LoWPAN_NHC on a LoWPAN_IPHC sets nh=1.
# In the other direction, the payload class is chosen by
# LoWPAN_IPHC.guess_payload_class.
bind_top_down(LoWPAN_IPHC, LoWPAN_NHC, nh=1)
# These headers are followed by another 6LoWPAN header, picked by the
# SixLoWPAN dispatcher.
bind_layers(LoWPANFragmentationFirst, SixLoWPAN)
bind_layers(LoWPANMesh, SixLoWPAN)
bind_layers(LoWPANBroadcast, SixLoWPAN)

# 6LoWPAN carried directly over Ethernet, using the EtherType registered at
# the top of this file.
bind_layers(Ether, SixLoWPAN, type=0xA0ED)