1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Cesar A. Bernardini <mesarpe@gmail.com>
5# Intern at INRIA Grand Nancy Est
6# Copyright (C) Gabriel Potter <gabriel[]potter[]fr>
7"""
86LoWPAN Protocol Stack
9======================
10
11This implementation follows the next documents:
12
13- Transmission of IPv6 Packets over IEEE 802.15.4 Networks: RFC 4944
14- Compression Format for IPv6 Datagrams in Low Power and Lossy
15 networks (6LoWPAN): RFC 6282
16- RFC 4291
17
18+----------------------------+-----------------------+
19| Application | Application Protocols |
20+----------------------------+------------+----------+
21| Transport | UDP | TCP |
22+----------------------------+------------+----------+
23| Network | IPv6 |
24+----------------------------+-----------------------+
25| | LoWPAN |
26+----------------------------+-----------------------+
27| Data Link Layer | IEEE 802.15.4 MAC |
28+----------------------------+-----------------------+
29| Physical | IEEE 802.15.4 PHY |
30+----------------------------+-----------------------+
31
32Note that:
33
34 - Only IPv6 is supported
35 - LoWPAN is in the middle between network and data link layer
36
37The Internet Control Message protocol v6 (ICMPv6) is used for control
38messaging.
39
40Adaptation between full IPv6 and the LoWPAN format is performed by routers at
41the edge of 6LoWPAN islands.
42
43A LoWPAN support addressing; a direct mapping between the link-layer address
44and the IPv6 address is used for achieving compression.
45
46Known Issues:
47 * Unimplemented context information
48 * Unimplemented IPv6 extensions fields
49"""
50
51import socket
52import struct
53
54from scapy.compat import chb, orb, raw
55from scapy.data import ETHER_TYPES
56
57from scapy.packet import Packet, bind_layers, bind_top_down
58from scapy.fields import (
59 BitEnumField,
60 BitField,
61 BitLenField,
62 BitScalingField,
63 ByteEnumField,
64 ByteField,
65 ConditionalField,
66 FieldLenField,
67 MultipleTypeField,
68 PacketField,
69 PacketListField,
70 StrFixedLenField,
71 XBitField,
72 XLongField,
73 XShortField,
74)
75
76from scapy.layers.dot15d4 import Dot15d4Data
77from scapy.layers.inet6 import (
78 IP6Field,
79 IPv6,
80 _IPv6ExtHdr,
81 ipv6nh,
82)
83from scapy.layers.inet import UDP
84from scapy.layers.l2 import Ether
85
86from scapy.utils import mac2str
87from scapy.config import conf
88from scapy.error import warning
89
90from scapy.packet import Raw
91from scapy.pton_ntop import inet_pton, inet_ntop
92from scapy.volatile import RandShort
93
94ETHER_TYPES[0xA0ED] = "6LoWPAN"
95
96LINK_LOCAL_PREFIX = b"\xfe\x80\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00" # noqa: E501
97
98
99##########
100# Fields #
101##########
102
103
104class IP6FieldLenField(IP6Field):
105 __slots__ = ["length_of"]
106
107 def __init__(self, name, default, length_of=None):
108 IP6Field.__init__(self, name, default)
109 self.length_of = length_of
110
111 def addfield(self, pkt, s, val):
112 """Add an internal value to a string"""
113 tmp_len = self.length_of(pkt)
114 if tmp_len == 0:
115 return s
116 internal = self.i2m(pkt, val)[-tmp_len:]
117 return s + struct.pack("!%ds" % tmp_len, internal)
118
119 def getfield(self, pkt, s):
120 tmp_len = self.length_of(pkt)
121 assert tmp_len >= 0 and tmp_len <= 16
122 if tmp_len <= 0:
123 return s, b""
124 return (s[tmp_len:],
125 self.m2i(pkt, b"\x00" * (16 - tmp_len) + s[:tmp_len]))
126
127
128#################
129# Basic 6LoWPAN #
130#################
131# https://tools.ietf.org/html/rfc4944
132
133
134class LoWPANUncompressedIPv6(Packet):
135 name = "6LoWPAN Uncompressed IPv6"
136 fields_desc = [
137 BitField("_type", 0x41, 8)
138 ]
139
140 def default_payload_class(self, pay):
141 return IPv6
142
143# https://tools.ietf.org/html/rfc4944#section-5.2
144
145
146class LoWPANMesh(Packet):
147 name = "6LoWPAN Mesh Packet"
148 deprecated_fields = {
149 "_v": ("v", "2.4.4"),
150 "_f": ("f", "2.4.4"),
151 "_sourceAddr": ("src", "2.4.4"),
152 "_destinyAddr": ("dst", "2.4.4"),
153 }
154 fields_desc = [
155 BitField("reserved", 0x2, 2),
156 BitEnumField("v", 0x0, 1, ["EUI-64", "Short"]),
157 BitEnumField("f", 0x0, 1, ["EUI-64", "Short"]),
158 BitField("hopsLeft", 0x0, 4),
159 MultipleTypeField(
160 [(XShortField("src", 0x0), lambda pkt: pkt.v == 1)],
161 XLongField("src", 0x0)
162 ),
163 MultipleTypeField(
164 [(XShortField("dst", 0x0), lambda pkt: pkt.v == 1)],
165 XLongField("dst", 0x0)
166 )
167 ]
168
169
170# https://tools.ietf.org/html/rfc4944#section-10.1
171# This implementation is NOT RECOMMENDED according to RFC 6282
172
173
174class LoWPAN_HC2_UDP(Packet):
175 name = "6LoWPAN HC1 UDP encoding"
176 fields_desc = [
177 BitEnumField("sc", 0, 1, ["In-line", "Compressed"]),
178 BitEnumField("dc", 0, 1, ["In-line", "Compressed"]),
179 BitEnumField("lc", 0, 1, ["In-line", "Compressed"]),
180 BitField("res", 0, 5),
181 ]
182
183 def default_payload_class(self, payload):
184 return conf.padding_layer
185
186
187def _get_hc1_pad(pkt):
188 """
189 Get LoWPAN_HC1 padding
190
191 LoWPAN_HC1 is not recommended for several reasons, one
192 of them being that padding is a mess (not 8-bit regular)
193 We therefore add padding bits that are not in the spec to restore
194 8-bit parity. Wireshark seems to agree
195 """
196 length = 0 # in bits, of the fields that are not //8
197 if not pkt.tc_fl:
198 length += 20
199 if pkt.hc2:
200 if pkt.nh == 1:
201 length += pkt.hc2Field.sc * 4
202 length += pkt.hc2Field.dc * 4
203 return (-length) % 8
204
205
206class LoWPAN_HC1(Packet):
207 name = "LoWPAN_HC1 Compressed IPv6"
208 fields_desc = [
209 # https://tools.ietf.org/html/rfc4944#section-10.1
210 ByteField("reserved", 0x42),
211 BitEnumField("sp", 0, 1, ["In-line", "Compressed"]),
212 BitEnumField("si", 0, 1, ["In-line", "Elided"]),
213 BitEnumField("dp", 0, 1, ["In-line", "Compressed"]),
214 BitEnumField("di", 0, 1, ["In-line", "Elided"]),
215 BitEnumField("tc_fl", 0, 1, ["Not compressed", "zero"]),
216 BitEnumField("nh", 0, 2, {0: "not compressed",
217 1: "UDP",
218 2: "ICMP",
219 3: "TCP"}),
220 BitEnumField("hc2", 0, 1, ["No more header compression bits",
221 "HC2 Present"]),
222 # https://tools.ietf.org/html/rfc4944#section-10.2
223 ConditionalField(
224 MultipleTypeField(
225 [
226 (PacketField("hc2Field", LoWPAN_HC2_UDP(),
227 LoWPAN_HC2_UDP),
228 lambda pkt: pkt.nh == 1),
229 # TODO: ICMP & TCP not implemented yet for HC1
230 # (PacketField("hc2Field", LoWPAN_HC2_ICMP(),
231 # LoWPAN_HC2_ICMP),
232 # lambda pkt: pkt.nh == 2),
233 # (PacketField("hc2Field", LoWPAN_HC2_TCP(),
234 # LoWPAN_HC2_TCP),
235 # lambda pkt: pkt.nh == 3),
236 ],
237 StrFixedLenField("hc2Field", b"", 0),
238 ),
239 lambda pkt: pkt.hc2
240 ),
241 # IPv6 header fields
242 # https://tools.ietf.org/html/rfc4944#section-10.3.1
243 ByteField("hopLimit", 0x0),
244 IP6FieldLenField("src", "::",
245 lambda pkt: (0 if pkt.sp else 8) +
246 (0 if pkt.si else 8)),
247 IP6FieldLenField("dst", "::",
248 lambda pkt: (0 if pkt.dp else 8) +
249 (0 if pkt.di else 8)),
250 ConditionalField(
251 ByteField("traffic_class", 0),
252 lambda pkt: not pkt.tc_fl
253 ),
254 ConditionalField(
255 BitField("flow_label", 0, 20),
256 lambda pkt: not pkt.tc_fl
257 ),
258 # Other fields
259 # https://tools.ietf.org/html/rfc4944#section-10.3.2
260 ConditionalField(
261 MultipleTypeField(
262 [(BitScalingField("udpSourcePort", 0, 4, offset=0xF0B0),
263 lambda pkt: getattr(pkt.hc2Field, "sc", 0))],
264 BitField("udpSourcePort", 0, 16)
265 ),
266 lambda pkt: pkt.nh == 1 and pkt.hc2
267 ),
268 ConditionalField(
269 MultipleTypeField(
270 [(BitScalingField("udpDestPort", 0, 4, offset=0xF0B0),
271 lambda pkt: getattr(pkt.hc2Field, "dc", 0))],
272 BitField("udpDestPort", 0, 16)
273 ),
274 lambda pkt: pkt.nh == 1 and pkt.hc2
275 ),
276 ConditionalField(
277 BitField("udpLength", 0, 16),
278 lambda pkt: pkt.nh == 1 and pkt.hc2 and not pkt.hc2Field.lc
279 ),
280 ConditionalField(
281 XBitField("udpChecksum", 0, 16),
282 lambda pkt: pkt.nh == 1 and pkt.hc2
283 ),
284 # Out of spec
285 BitLenField("pad", 0, _get_hc1_pad)
286 ]
287
288 def post_dissect(self, data):
289 # uncompress payload
290 packet = IPv6()
291 packet.version = IPHC_DEFAULT_VERSION
292 packet.tc = self.traffic_class
293 packet.fl = self.flow_label
294 nh_match = {
295 1: socket.IPPROTO_UDP,
296 2: socket.IPPROTO_ICMP,
297 3: socket.IPPROTO_TCP
298 }
299 if self.nh:
300 packet.nh = nh_match.get(self.nh)
301 packet.hlim = self.hopLimit
302
303 packet.src = self.decompressSourceAddr()
304 packet.dst = self.decompressDestAddr()
305
306 if self.hc2 and self.nh == 1: # UDP
307 udp = UDP()
308 udp.sport = self.udpSourcePort
309 udp.dport = self.udpDestPort
310 udp.len = self.udpLength or None
311 udp.chksum = self.udpChecksum
312 udp.add_payload(data)
313 packet.add_payload(udp)
314 else:
315 packet.add_payload(data)
316 data = raw(packet)
317 return Packet.post_dissect(self, data)
318
319 def decompressSourceAddr(self):
320 if not self.sp and not self.si:
321 # Prefix & Interface
322 return self.src
323 elif not self.si:
324 # Only interface
325 addr = inet_pton(socket.AF_INET6, self.src)[-8:]
326 addr = LINK_LOCAL_PREFIX[:8] + addr
327 else:
328 # Interface not provided
329 addr = _extract_upperaddress(self, source=True)
330 self.src = inet_ntop(socket.AF_INET6, addr)
331 return self.src
332
333 def decompressDestAddr(self):
334 if not self.dp and not self.di:
335 # Prefix & Interface
336 return self.dst
337 elif not self.di:
338 # Only interface
339 addr = inet_pton(socket.AF_INET6, self.dst)[-8:]
340 addr = LINK_LOCAL_PREFIX[:8] + addr
341 else:
342 # Interface not provided
343 addr = _extract_upperaddress(self, source=False)
344 self.dst = inet_ntop(socket.AF_INET6, addr)
345 return self.dst
346
347 def do_build(self):
348 if not isinstance(self.payload, IPv6):
349 return Packet.do_build(self)
350 # IPv6
351 ipv6 = self.payload
352 self.src = ipv6.src
353 self.dst = ipv6.dst
354 self.flow_label = ipv6.fl
355 self.traffic_class = ipv6.tc
356 self.hopLimit = ipv6.hlim
357 if isinstance(ipv6.payload, UDP):
358 self.nh = 1
359 self.hc2 = 1
360 udp = ipv6.payload
361 self.udpSourcePort = udp.sport
362 self.udpDestPort = udp.dport
363 if not udp.len or not udp.chksum:
364 udp = UDP(raw(udp))
365 self.udpLength = udp.len
366 self.udpChecksum = udp.chksum
367 return Packet.do_build(self)
368
369 def do_build_payload(self):
370 # Elide the IPv6 and UDP payload
371 if isinstance(self.payload, IPv6):
372 if isinstance(self.payload.payload, UDP):
373 return raw(self.payload.payload.payload)
374 return raw(self.payload.payload)
375 return Packet.do_build_payload(self)
376
377# https://tools.ietf.org/html/rfc4944#section-5.3
378
379
380class LoWPANFragmentationFirst(Packet):
381 name = "6LoWPAN First Fragmentation Packet"
382 fields_desc = [
383 BitField("reserved", 0x18, 5),
384 BitField("datagramSize", 0x0, 11),
385 XShortField("datagramTag", 0x0),
386 ]
387
388
389class LoWPANFragmentationSubsequent(Packet):
390 name = "6LoWPAN Subsequent Fragmentation Packet"
391 fields_desc = [
392 BitField("reserved", 0x1C, 5),
393 BitField("datagramSize", 0x0, 11),
394 XShortField("datagramTag", RandShort()),
395 ByteField("datagramOffset", 0x0), # VALUE PRINTED IN OCTETS, wireshark does in bits (128 bits == 16 octets) # noqa: E501
396 ]
397
398
399# https://tools.ietf.org/html/rfc4944#section-11.1
400
401class LoWPANBroadcast(Packet):
402 name = "6LoWPAN Broadcast"
403 fields_desc = [
404 ByteField("reserved", 0x50),
405 ByteField("seq", 0)
406 ]
407
408
409#########################
410# LoWPAN_IPHC (RFC6282) #
411#########################
412
413
414IPHC_DEFAULT_VERSION = 6
415IPHC_DEFAULT_TF = 0
416IPHC_DEFAULT_FL = 0
417
418
419def source_addr_size(pkt):
420 """Source address size
421
422 This function depending on the arguments returns the amount of bits to be
423 used by the source address.
424
425 Keyword arguments:
426 pkt -- packet object instance
427 """
428 if pkt.sac == 0x0:
429 if pkt.sam == 0x0:
430 return 16
431 elif pkt.sam == 0x1:
432 return 8
433 elif pkt.sam == 0x2:
434 return 2
435 elif pkt.sam == 0x3:
436 return 0
437 else:
438 if pkt.sam == 0x0:
439 return 0
440 elif pkt.sam == 0x1:
441 return 8
442 elif pkt.sam == 0x2:
443 return 2
444 elif pkt.sam == 0x3:
445 return 0
446
447
448def dest_addr_size(pkt):
449 """Destination address size
450
451 This function depending on the arguments returns the amount of bits to be
452 used by the destination address.
453
454 Keyword arguments:
455 pkt -- packet object instance
456 """
457 if pkt.m == 0 and pkt.dac == 0:
458 if pkt.dam == 0x0:
459 return 16
460 elif pkt.dam == 0x1:
461 return 8
462 elif pkt.dam == 0x2:
463 return 2
464 else:
465 return 0
466 elif pkt.m == 0 and pkt.dac == 1:
467 if pkt.dam == 0x0:
468 # reserved
469 return 0
470 elif pkt.dam == 0x1:
471 return 8
472 elif pkt.dam == 0x2:
473 return 2
474 else:
475 return 0
476 elif pkt.m == 1 and pkt.dac == 0:
477 if pkt.dam == 0x0:
478 return 16
479 elif pkt.dam == 0x1:
480 return 6
481 elif pkt.dam == 0x2:
482 return 4
483 elif pkt.dam == 0x3:
484 return 1
485 elif pkt.m == 1 and pkt.dac == 1:
486 if pkt.dam == 0x0:
487 return 6
488 elif pkt.dam == 0x1:
489 # reserved
490 return 0
491 elif pkt.dam == 0x2:
492 # reserved
493 return 0
494 elif pkt.dam == 0x3:
495 # reserved
496 return 0
497
498
499def _extract_upperaddress(pkt, source=True):
500 """This function extracts the source/destination address of a 6LoWPAN
501 from its upper layer.
502
503 (Upper layer could be 802.15.4 data, Ethernet...)
504
505 params:
506 - source: if True, the address is the source one. Otherwise, it is the
507 destination.
508 returns: (upper_address, ipv6_address)
509 """
510 # https://tools.ietf.org/html/rfc6282#section-3.2.2
511 SUPPORTED_LAYERS = (Ether, Dot15d4Data)
512 underlayer = pkt.underlayer
513 while underlayer and not isinstance(underlayer, SUPPORTED_LAYERS):
514 underlayer = underlayer.underlayer
515 # Extract and process address
516 if type(underlayer) == Ether:
517 addr = mac2str(underlayer.src if source else underlayer.dst)
518 # https://tools.ietf.org/html/rfc2464#section-4
519 return LINK_LOCAL_PREFIX[:8] + addr[:3] + b"\xff\xfe" + addr[3:]
520 elif type(underlayer) == Dot15d4Data:
521 if source:
522 addr = underlayer.src_addr
523 addrmode = underlayer.underlayer.fcf_srcaddrmode
524 else:
525 addr = underlayer.dest_addr
526 addrmode = underlayer.underlayer.fcf_destaddrmode
527 addr = struct.pack(">Q", addr)
528 if addrmode == 3: # Extended/long
529 tmp_ip = LINK_LOCAL_PREFIX[0:8] + addr
530 # Turn off the bit 7.
531 return tmp_ip[0:8] + struct.pack("B", (orb(tmp_ip[8]) ^ 0x2)) + tmp_ip[9:16] # noqa: E501
532 elif addrmode == 2: # Short
533 return (
534 LINK_LOCAL_PREFIX[0:8] +
535 b"\x00\x00\x00\xff\xfe\x00" +
536 addr[6:]
537 )
538 else:
539 # Most of the times, it's necessary the IEEE 802.15.4 data to extract
540 # this address, sometimes another layer.
541 warning(
542 'Unimplemented: Unsupported upper layer: %s' % type(underlayer)
543 )
544 return b"\x00" * 16
545
546
547class LoWPAN_IPHC(Packet):
548 """6LoWPAN IPv6 header compressed packets
549
550 It follows the implementation of RFC6282
551 """
552 __slots__ = ["_ipv6"]
553 # the LOWPAN_IPHC encoding utilizes 13 bits, 5 dispatch type
554 name = "LoWPAN IP Header Compression Packet"
555 _address_modes = ["Unspecified (0)", "1", "16-bits inline (3)",
556 "Compressed (3)"]
557 _state_mode = ["Stateless (0)", "Stateful (1)"]
558 deprecated_fields = {
559 "_nhField": ("nhField", "2.4.4"),
560 "_hopLimit": ("hopLimit", "2.4.4"),
561 "sourceAddr": ("src", "2.4.4"),
562 "destinyAddr": ("dst", "2.4.4"),
563 "udpDestinyPort": ("udpDestPort", "2.4.4"),
564 }
565 fields_desc = [
566 # Base Format https://tools.ietf.org/html/rfc6282#section-3.1.2
567 BitField("_reserved", 0x03, 3),
568 BitField("tf", 0x0, 2),
569 BitEnumField("nh", 0x0, 1, ["Inline", "Compressed"]),
570 BitEnumField("hlim", 0x0, 2, {0: "Inline",
571 1: "Compressed/HL1",
572 2: "Compressed/HL64",
573 3: "Compressed/HL255"}),
574 BitEnumField("cid", 0x0, 1, {1: "Present (1)"}),
575 BitEnumField("sac", 0x0, 1, _state_mode),
576 BitEnumField("sam", 0x0, 2, _address_modes),
577 BitEnumField("m", 0x0, 1, {1: "multicast (1)"}),
578 BitEnumField("dac", 0x0, 1, _state_mode),
579 BitEnumField("dam", 0x0, 2, _address_modes),
580 # https://tools.ietf.org/html/rfc6282#section-3.1.2
581 # Context Identifier Extension
582 ConditionalField(
583 BitField("sci", 0, 4),
584 lambda pkt: pkt.cid == 0x1
585 ),
586 ConditionalField(
587 BitField("dci", 0, 4),
588 lambda pkt: pkt.cid == 0x1
589 ),
590 # https://tools.ietf.org/html/rfc6282#section-3.2.1
591 ConditionalField(
592 BitField("tc_ecn", 0, 2),
593 lambda pkt: pkt.tf in [0, 1, 2]
594 ),
595 ConditionalField(
596 BitField("tc_dscp", 0, 6),
597 lambda pkt: pkt.tf in [0, 2],
598 ),
599 ConditionalField(
600 MultipleTypeField(
601 [(BitField("rsv", 0, 4), lambda pkt: pkt.tf == 0)],
602 BitField("rsv", 0, 2),
603 ),
604 lambda pkt: pkt.tf in [0, 1]
605 ),
606 ConditionalField(
607 BitField("flowlabel", 0, 20),
608 lambda pkt: pkt.tf in [0, 1]
609 ),
610 # Inline fields https://tools.ietf.org/html/rfc6282#section-3.1.1
611 ConditionalField(
612 ByteEnumField("nhField", 0x0, ipv6nh),
613 lambda pkt: pkt.nh == 0x0
614 ),
615 ConditionalField(
616 ByteField("hopLimit", 0x0),
617 lambda pkt: pkt.hlim == 0x0
618 ),
619 # The src and dst fields are filled up or removed in the
620 # pre_dissect and post_build, depending on the other options.
621 IP6FieldLenField("src", "::", length_of=source_addr_size),
622 IP6FieldLenField("dst", "::", length_of=dest_addr_size), # problem when it's 0 # noqa: E501
623 ]
624
625 def post_dissect(self, data):
626 """dissect the IPv6 package compressed into this IPHC packet.
627
628 The packet payload needs to be decompressed and depending on the
629 arguments, several conversions should be done.
630 """
631
632 # uncompress payload
633 packet = IPv6()
634 packet.tc, packet.fl = self._getTrafficClassAndFlowLabel()
635 if not self.nh:
636 packet.nh = self.nhField
637 # HLIM: Hop Limit
638 if self.hlim == 0:
639 packet.hlim = self.hopLimit
640 elif self.hlim == 0x1:
641 packet.hlim = 1
642 elif self.hlim == 0x2:
643 packet.hlim = 64
644 else:
645 packet.hlim = 255
646
647 packet.src = self.decompressSourceAddr(packet)
648 packet.dst = self.decompressDestAddr(packet)
649
650 pay_cls = self.guess_payload_class(data)
651 if pay_cls == IPv6:
652 packet.add_payload(data)
653 data = raw(packet)
654 elif pay_cls == LoWPAN_NHC:
655 self._ipv6 = packet
656 return Packet.post_dissect(self, data)
657
658 def decompressDestAddr(self, packet):
659 # https://tools.ietf.org/html/rfc6282#section-3.1.1
660 try:
661 tmp_ip = inet_pton(socket.AF_INET6, self.dst)
662 except socket.error:
663 tmp_ip = b"\x00" * 16
664
665 if self.m == 0 and self.dac == 0:
666 if self.dam == 0:
667 # Address fully carried
668 pass
669 elif self.dam == 1:
670 tmp_ip = LINK_LOCAL_PREFIX[0:8] + tmp_ip[-8:]
671 elif self.dam == 2:
672 tmp_ip = LINK_LOCAL_PREFIX[0:8] + b"\x00\x00\x00\xff\xfe\x00" + tmp_ip[-2:] # noqa: E501
673 elif self.dam == 3:
674 tmp_ip = _extract_upperaddress(self, source=False)
675
676 elif self.m == 0 and self.dac == 1:
677 if self.dam == 0:
678 # reserved
679 pass
680 elif self.dam == 0x3:
681 # should use context IID + encapsulating header
682 tmp_ip = _extract_upperaddress(self, source=False)
683 elif self.dam not in [0x1, 0x2]:
684 # https://tools.ietf.org/html/rfc6282#page-9
685 # Should use context information: unimplemented
686 pass
687 elif self.m == 1 and self.dac == 0:
688 if self.dam == 0:
689 # Address fully carried
690 pass
691 elif self.dam == 1:
692 tmp = b"\xff" + chb(tmp_ip[16 - dest_addr_size(self)])
693 tmp_ip = tmp + b"\x00" * 9 + tmp_ip[-5:]
694 elif self.dam == 2:
695 tmp = b"\xff" + chb(tmp_ip[16 - dest_addr_size(self)])
696 tmp_ip = tmp + b"\x00" * 11 + tmp_ip[-3:]
697 else: # self.dam == 3:
698 tmp_ip = b"\xff\x02" + b"\x00" * 13 + tmp_ip[-1:]
699 elif self.m == 1 and self.dac == 1:
700 if self.dam == 0x0:
701 # https://tools.ietf.org/html/rfc6282#page-10
702 # https://github.com/wireshark/wireshark/blob/f54611d1104d85a425e52c7318c522ed249916b6/epan/dissectors/packet-6lowpan.c#L2149-L2166
703 # Format: ffXX:XXLL:PPPP:PPPP:PPPP:PPPP:XXXX:XXXX
704 # P and L should be retrieved from context
705 P = b"\x00" * 16
706 L = b"\x00"
707 X = tmp_ip[-6:]
708 tmp_ip = b"\xff" + X[:2] + L + P[:8] + X[2:6]
709 else: # all the others values: reserved
710 pass
711
712 self.dst = inet_ntop(socket.AF_INET6, tmp_ip)
713 return self.dst
714
715 def compressSourceAddr(self, ipv6):
716 # https://tools.ietf.org/html/rfc6282#section-3.1.1
717 tmp_ip = inet_pton(socket.AF_INET6, ipv6.src)
718
719 if self.sac == 0:
720 if self.sam == 0x0:
721 pass
722 elif self.sam == 0x1:
723 tmp_ip = tmp_ip[8:16]
724 elif self.sam == 0x2:
725 tmp_ip = tmp_ip[14:16]
726 else: # self.sam == 0x3:
727 pass
728 else: # self.sac == 1
729 if self.sam == 0x0:
730 tmp_ip = b"\x00" * 16
731 elif self.sam == 0x1:
732 tmp_ip = tmp_ip[8:16]
733 elif self.sam == 0x2:
734 tmp_ip = tmp_ip[14:16]
735
736 self.src = inet_ntop(socket.AF_INET6, b"\x00" * (16 - len(tmp_ip)) + tmp_ip) # noqa: E501
737 return self.src
738
739 def compressDestAddr(self, ipv6):
740 # https://tools.ietf.org/html/rfc6282#section-3.1.1
741 tmp_ip = inet_pton(socket.AF_INET6, ipv6.dst)
742
743 if self.m == 0 and self.dac == 0:
744 if self.dam == 0x0:
745 pass
746 elif self.dam == 0x1:
747 tmp_ip = b"\x00" * 8 + tmp_ip[8:16]
748 elif self.dam == 0x2:
749 tmp_ip = b"\x00" * 14 + tmp_ip[14:16]
750 elif self.m == 0 and self.dac == 1:
751 if self.dam == 0x1:
752 tmp_ip = b"\x00" * 8 + tmp_ip[8:16]
753 elif self.dam == 0x2:
754 tmp_ip = b"\x00" * 14 + tmp_ip[14:16]
755 elif self.m == 1 and self.dac == 0:
756 if self.dam == 0x0:
757 pass
758 if self.dam == 0x1:
759 tmp_ip = b"\x00" * 10 + tmp_ip[1:2] + tmp_ip[11:16]
760 elif self.dam == 0x2:
761 tmp_ip = b"\x00" * 12 + tmp_ip[1:2] + tmp_ip[13:16]
762 elif self.dam == 0x3:
763 tmp_ip = b"\x00" * 15 + tmp_ip[15:16]
764 elif self.m == 1 and self.dac == 1:
765 if self.dam == 0:
766 tmp_ip = b"\x00" * 10 + tmp_ip[1:3] + tmp_ip[12:16]
767
768 self.dst = inet_ntop(socket.AF_INET6, tmp_ip)
769
770 def decompressSourceAddr(self, packet):
771 # https://tools.ietf.org/html/rfc6282#section-3.1.1
772 try:
773 tmp_ip = inet_pton(socket.AF_INET6, self.src)
774 except socket.error:
775 tmp_ip = b"\x00" * 16
776
777 if self.sac == 0:
778 if self.sam == 0x0:
779 # Full address is carried in-line
780 pass
781 elif self.sam == 0x1:
782 tmp_ip = LINK_LOCAL_PREFIX[0:8] + tmp_ip[16 - source_addr_size(self):16] # noqa: E501
783 elif self.sam == 0x2:
784 tmp = LINK_LOCAL_PREFIX[0:8] + b"\x00\x00\x00\xff\xfe\x00"
785 tmp_ip = tmp + tmp_ip[16 - source_addr_size(self):16]
786 elif self.sam == 0x3:
787 # Taken from encapsulating header
788 tmp_ip = _extract_upperaddress(self, source=True)
789 else: # self.sac == 1:
790 if self.sam == 0x0:
791 # Unspecified address ::
792 pass
793 elif self.sam == 0x1:
794 # should use context IID
795 pass
796 elif self.sam == 0x2:
797 # should use context IID
798 tmp = LINK_LOCAL_PREFIX[0:8] + b"\x00\x00\x00\xff\xfe\x00"
799 tmp_ip = tmp + tmp_ip[16 - source_addr_size(self):16]
800 elif self.sam == 0x3:
801 # should use context IID
802 tmp_ip = LINK_LOCAL_PREFIX[0:8] + b"\x00" * 8
803 self.src = inet_ntop(socket.AF_INET6, tmp_ip)
804 return self.src
805
806 def guess_payload_class(self, payload):
807 if self.nh:
808 return LoWPAN_NHC
809 u = self.underlayer
810 if u and isinstance(u, (LoWPANFragmentationFirst,
811 LoWPANFragmentationSubsequent)):
812 return Raw
813 return IPv6
814
815 def do_build(self):
816 _cur = self
817 if isinstance(_cur.payload, LoWPAN_NHC):
818 _cur = _cur.payload
819 if not isinstance(_cur.payload, IPv6):
820 return Packet.do_build(self)
821 ipv6 = _cur.payload
822
823 self._reserved = 0x03
824
825 # NEW COMPRESSION TECHNIQUE!
826 # a ) Compression Techniques
827
828 # 1. Set Traffic Class
829 if self.tf == 0x0:
830 self.tc_ecn = ipv6.tc >> 6
831 self.tc_dscp = ipv6.tc & 0x3F
832 self.flowlabel = ipv6.fl
833 elif self.tf == 0x1:
834 self.tc_ecn = ipv6.tc >> 6
835 self.flowlabel = ipv6.fl
836 elif self.tf == 0x2:
837 self.tc_ecn = ipv6.tc >> 6
838 self.tc_dscp = ipv6.tc & 0x3F
839 else: # self.tf == 0x3:
840 pass # no field is set
841
842 # 2. Next Header
843 if self.nh == 0x0:
844 self.nhField = ipv6.nh
845 elif self.nh == 1:
846 # This will be handled in LoWPAN_NHC
847 pass
848
849 # 3. HLim
850 if self.hlim == 0x0:
851 self.hopLimit = ipv6.hlim
852 else: # if hlim is 1, 2 or 3, there are nothing to do!
853 pass
854
855 # 4. Context (which context to use...)
856 if self.cid == 0x0:
857 pass
858 else:
859 # TODO: Context Unimplemented yet
860 pass
861
862 # 5. Compress Source Addr
863 self.compressSourceAddr(ipv6)
864 self.compressDestAddr(ipv6)
865
866 return Packet.do_build(self)
867
868 def do_build_payload(self):
869 # Elide the IPv6 payload
870 if isinstance(self.payload, IPv6):
871 return raw(self.payload.payload)
872 return Packet.do_build_payload(self)
873
874 def _getTrafficClassAndFlowLabel(self):
875 """Page 6, draft feb 2011 """
876 if self.tf == 0x0:
877 return (self.tc_ecn << 6) + self.tc_dscp, self.flowlabel
878 elif self.tf == 0x1:
879 return (self.tc_ecn << 6), self.flowlabel
880 elif self.tf == 0x2:
881 return (self.tc_ecn << 6) + self.tc_dscp, 0
882 else:
883 return 0, 0
884
885##############
886# LOWPAN_NHC #
887##############
888
889# https://tools.ietf.org/html/rfc6282#section-4
890
891
892class LoWPAN_NHC_Hdr(Packet):
893 @classmethod
894 def get_next_cls(cls, s):
895 if s and len(s) >= 2:
896 fb = ord(s[:1])
897 if fb >> 3 == 0x1e:
898 return LoWPAN_NHC_UDP
899 if fb >> 4 == 0xe:
900 return LoWPAN_NHC_IPv6Ext
901 return None
902
903 @classmethod
904 def dispatch_hook(cls, _pkt=b"", *args, **kargs):
905 return LoWPAN_NHC_Hdr.get_next_cls(_pkt) or LoWPAN_NHC_Hdr
906
907 def extract_padding(self, s):
908 return b"", s
909
910
911class LoWPAN_NHC_UDP(LoWPAN_NHC_Hdr):
912 fields_desc = [
913 BitField("res", 0x1e, 5),
914 BitField("C", 0, 1),
915 BitField("P", 0, 2),
916 MultipleTypeField(
917 [(BitField("udpSourcePort", 0, 16),
918 lambda pkt: pkt.P in [0, 1]),
919 (BitField("udpSourcePort", 0, 8),
920 lambda pkt: pkt.P == 2),
921 (BitField("udpSourcePort", 0, 4),
922 lambda pkt: pkt.P == 3)],
923 BitField("udpSourcePort", 0x0, 16),
924 ),
925 MultipleTypeField(
926 [(BitField("udpDestPort", 0, 16),
927 lambda pkt: pkt.P in [0, 2]),
928 (BitField("udpDestPort", 0, 8),
929 lambda pkt: pkt.P == 1),
930 (BitField("udpDestPort", 0, 4),
931 lambda pkt: pkt.P == 3)],
932 BitField("udpDestPort", 0x0, 16),
933 ),
934 ConditionalField(
935 XShortField("udpChecksum", 0x0),
936 lambda pkt: pkt.C == 0
937 ),
938 ]
939
940
941_lowpan_nhc_ipv6ext_eid = {
942 0: "Hop-by-hop Options Header",
943 1: "IPv6 Routing Header",
944 2: "IPv6 Fragment Header",
945 3: "IPv6 Destination Options Header",
946 4: "IPv6 Mobility Header",
947 7: "IPv6 Header",
948}
949
950
951class LoWPAN_NHC_IPv6Ext(LoWPAN_NHC_Hdr):
952 fields_desc = [
953 BitField("res", 0xe, 4),
954 BitEnumField("eid", 0, 3, _lowpan_nhc_ipv6ext_eid),
955 BitField("nh", 0, 1),
956 ConditionalField(
957 ByteField("nhField", 0),
958 lambda pkt: pkt.nh == 0
959 ),
960 FieldLenField("len", None, length_of="data", fmt="B"),
961 StrFixedLenField("data", b"", length_from=lambda pkt: pkt.len)
962 ]
963
964 def post_build(self, p, pay):
965 if self.len is None:
966 offs = (not self.nh) + 1
967 p = p[:offs] + struct.pack("!B", len(p) - offs) + p[offs + 1:]
968 return p + pay
969
970
971class LoWPAN_NHC(Packet):
972 name = "LOWPAN_NHC"
973 fields_desc = [
974 PacketListField(
975 "exts", [], pkt_cls=LoWPAN_NHC_Hdr,
976 next_cls_cb=lambda *s: LoWPAN_NHC_Hdr.get_next_cls(s[3])
977 )
978 ]
979
980 def post_dissect(self, data):
981 if not self.underlayer or not hasattr(self.underlayer, "_ipv6"):
982 return data
983 if self.guess_payload_class(data) != IPv6:
984 return data
985 # Underlayer is LoWPAN_IPHC
986 packet = self.underlayer._ipv6
987 try:
988 ipv6_hdr = next(
989 x for x in self.exts if isinstance(x, LoWPAN_NHC_IPv6Ext)
990 )
991 except StopIteration:
992 ipv6_hdr = None
993 if ipv6_hdr:
994 # XXX todo: implement: append the IPv6 extension
995 # packet = packet / ipv6extension
996 pass
997 try:
998 udp_hdr = next(
999 x for x in self.exts if isinstance(x, LoWPAN_NHC_UDP)
1000 )
1001 except StopIteration:
1002 udp_hdr = None
1003 if udp_hdr:
1004 packet.nh = 0x11 # UDP
1005 udp = UDP()
1006 # https://tools.ietf.org/html/rfc6282#section-4.3.3
1007 if udp_hdr.C == 0:
1008 udp.chksum = udp_hdr.udpChecksum
1009 if udp_hdr.P == 0:
1010 udp.sport = udp_hdr.udpSourcePort
1011 udp.dport = udp_hdr.udpDestPort
1012 elif udp_hdr.P == 1:
1013 udp.sport = udp_hdr.udpSourcePort
1014 udp.dport = 0xF000 + udp_hdr.udpDestPort
1015 elif udp_hdr.P == 2:
1016 udp.sport = 0xF000 + udp_hdr.udpSourcePort
1017 udp.dport = udp_hdr.udpDestPort
1018 elif udp_hdr.P == 3:
1019 udp.sport = 0xF0B0 + udp_hdr.udpSourcePort
1020 udp.dport = 0xF0B0 + udp_hdr.udpDestPort
1021 packet.lastlayer().add_payload(udp / data)
1022 else:
1023 packet.lastlayer().add_payload(data)
1024 data = raw(packet)
1025 return Packet.post_dissect(self, data)
1026
1027 def do_build(self):
1028 if not isinstance(self.payload, IPv6):
1029 return Packet.do_build(self)
1030 pay = self.payload.payload
1031 while pay and isinstance(pay.payload, _IPv6ExtHdr):
1032 # XXX todo: populate a LoWPAN_NHC_IPv6Ext
1033 pay = pay.payload
1034 if isinstance(pay, UDP):
1035 try:
1036 udp_hdr = next(
1037 x for x in self.exts if isinstance(x, LoWPAN_NHC_UDP)
1038 )
1039 except StopIteration:
1040 udp_hdr = LoWPAN_NHC_UDP()
1041 # Guess best compression
1042 if pay.sport >> 4 == 0xf0b and pay.dport >> 4 == 0xf0b:
1043 udp_hdr.P = 3
1044 elif pay.sport >> 8 == 0xf0:
1045 udp_hdr.P = 2
1046 elif pay.dport >> 8 == 0xf0:
1047 udp_hdr.P = 1
1048 self.exts.insert(0, udp_hdr)
1049 # https://tools.ietf.org/html/rfc6282#section-4.3.3
1050 if udp_hdr.P == 0:
1051 udp_hdr.udpSourcePort = pay.sport
1052 udp_hdr.udpDestPort = pay.dport
1053 elif udp_hdr.P == 1:
1054 udp_hdr.udpSourcePort = pay.sport
1055 udp_hdr.udpDestPort = pay.dport & 255
1056 elif udp_hdr.P == 2:
1057 udp_hdr.udpSourcePort = pay.sport & 255
1058 udp_hdr.udpDestPort = pay.dport
1059 elif udp_hdr.P == 3:
1060 udp_hdr.udpSourcePort = pay.sport & 15
1061 udp_hdr.udpDestPort = pay.dport & 15
1062 if udp_hdr.C == 0:
1063 if pay.chksum:
1064 udp_hdr.udpChecksum = pay.chksum
1065 else:
1066 udp_hdr.udpChecksum = UDP(raw(pay)).chksum
1067 return Packet.do_build(self)
1068
1069 def do_build_payload(self):
1070 # Elide IPv6 payload, extensions and UDP
1071 if isinstance(self.payload, IPv6):
1072 cur = self.payload
1073 while cur and isinstance(cur, (IPv6, UDP)):
1074 cur = cur.payload
1075 return raw(cur)
1076 return Packet.do_build_payload(self)
1077
1078 def guess_payload_class(self, payload):
1079 if self.underlayer:
1080 u = self.underlayer.underlayer
1081 if isinstance(u, (LoWPANFragmentationFirst,
1082 LoWPANFragmentationSubsequent)):
1083 return Raw
1084 return IPv6
1085
1086
1087######################
1088# 6LowPan Dispatcher #
1089######################
1090
1091# https://tools.ietf.org/html/rfc4944#section-5.1
1092
1093class SixLoWPAN_ESC(Packet):
1094 name = "SixLoWPAN Dispatcher ESC"
1095 fields_desc = [ByteField("dispatch", 0)]
1096
1097
1098class SixLoWPAN(Packet):
1099 name = "SixLoWPAN Dispatcher"
1100
1101 @classmethod
1102 def dispatch_hook(cls, _pkt=b"", *args, **kargs):
1103 """Depending on the payload content, the frame type we should interpret"""
1104 if _pkt and len(_pkt) >= 1:
1105 fb = ord(_pkt[:1])
1106 if fb == 0x41:
1107 return LoWPANUncompressedIPv6
1108 if fb == 0x42:
1109 return LoWPAN_HC1
1110 if fb == 0x50:
1111 return LoWPANBroadcast
1112 if fb == 0x7f:
1113 return SixLoWPAN_ESC
1114 if fb >> 3 == 0x18:
1115 return LoWPANFragmentationFirst
1116 if fb >> 3 == 0x1C:
1117 return LoWPANFragmentationSubsequent
1118 if fb >> 6 == 0x02:
1119 return LoWPANMesh
1120 if fb >> 6 == 0x01:
1121 return LoWPAN_IPHC
1122 return cls
1123
1124
1125#################
1126# Fragmentation #
1127#################
1128
1129# fragmentate IPv6
1130MAX_SIZE = 96
1131
1132
1133def sixlowpan_fragment(packet, datagram_tag=1):
1134 """Split a packet into different links to transmit as 6lowpan packets.
1135 Usage example::
1136
1137 >>> ipv6 = ..... (very big packet)
1138 >>> pkts = sixlowpan_fragment(ipv6, datagram_tag=0x17)
1139 >>> send = [Dot15d4()/Dot15d4Data()/x for x in pkts]
1140 >>> wireshark(send)
1141 """
1142 if not packet.haslayer(IPv6):
1143 raise Exception("SixLoWPAN only fragments IPv6 packets !")
1144
1145 str_packet = raw(packet[IPv6])
1146
1147 if len(str_packet) <= MAX_SIZE:
1148 return [packet]
1149
1150 def chunks(li, n):
1151 return [li[i:i + n] for i in range(0, len(li), n)]
1152
1153 new_packet = chunks(str_packet, MAX_SIZE)
1154
1155 new_packet[0] = LoWPANFragmentationFirst(datagramTag=datagram_tag, datagramSize=len(str_packet)) / new_packet[0] # noqa: E501
1156 i = 1
1157 while i < len(new_packet):
1158 new_packet[i] = LoWPANFragmentationSubsequent(datagramTag=datagram_tag, datagramSize=len(str_packet), datagramOffset=MAX_SIZE // 8 * i) / new_packet[i] # noqa: E501
1159 i += 1
1160
1161 return new_packet
1162
1163
1164def sixlowpan_defragment(packet_list):
1165 results = {}
1166 for p in packet_list:
1167 cls = None
1168 if LoWPANFragmentationFirst in p:
1169 cls = LoWPANFragmentationFirst
1170 elif LoWPANFragmentationSubsequent in p:
1171 cls = LoWPANFragmentationSubsequent
1172 if cls:
1173 tag = p[cls].datagramTag
1174 results[tag] = results.get(tag, b"") + p[cls].payload.load # noqa: E501
1175 return {tag: SixLoWPAN(x) for tag, x in results.items()}
1176
1177############
1178# Bindings #
1179############
1180
1181
1182bind_layers(LoWPAN_HC1, IPv6)
1183
1184bind_top_down(LoWPAN_IPHC, LoWPAN_NHC, nh=1)
1185bind_layers(LoWPANFragmentationFirst, SixLoWPAN)
1186bind_layers(LoWPANMesh, SixLoWPAN)
1187bind_layers(LoWPANBroadcast, SixLoWPAN)
1188
1189bind_layers(Ether, SixLoWPAN, type=0xA0ED)