1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
5# Copyright (C) Ryan Speers <ryan@rmspeers.com> 2011-2012
6# Copyright (C) Roger Meyer <roger.meyer@csus.edu>: 2012-03-10 Added frames
7# Copyright (C) Gabriel Potter <gabriel[]potter[]fr>: 2018
8# Copyright (C) Dimitrios-Georgios Akestoridis <akestoridis@cmu.edu>
9
10"""
11Wireless MAC according to IEEE 802.15.4.
12"""
13
14import struct
15
16from scapy.compat import orb, chb
17from scapy.error import warning
18from scapy.config import conf
19
20from scapy.data import DLT_IEEE802_15_4_WITHFCS, DLT_IEEE802_15_4_NOFCS
21from scapy.packet import Packet, bind_layers
22from scapy.fields import (
23 BitEnumField,
24 BitField,
25 ByteEnumField,
26 ByteField,
27 ConditionalField,
28 Emph,
29 FCSField,
30 Field,
31 FieldListField,
32 LELongField,
33 MultipleTypeField,
34 PacketField,
35 StrFixedLenField,
36 XByteField,
37 XLEIntField,
38 XLEShortField,
39)
40
41# Fields #
42
43
44class dot15d4AddressField(Field):
45 __slots__ = ["adjust", "length_of"]
46
47 def __init__(self, name, default, length_of=None, fmt="<H", adjust=None):
48 Field.__init__(self, name, default, fmt)
49 self.length_of = length_of
50 if adjust is not None:
51 self.adjust = adjust
52 else:
53 self.adjust = lambda pkt, x: self.lengthFromAddrMode(pkt, x)
54
55 def i2repr(self, pkt, x):
56 """Convert internal value to a nice representation"""
57 if len(hex(self.i2m(pkt, x))) < 7: # short address
58 return hex(self.i2m(pkt, x))
59 else: # long address
60 x = "%016x" % self.i2m(pkt, x)
61 return ":".join(["%s%s" % (x[i], x[i + 1]) for i in range(0, len(x), 2)]) # noqa: E501
62
63 def addfield(self, pkt, s, val):
64 """Add an internal value to a string"""
65 if self.adjust(pkt, self.length_of) == 2:
66 return s + struct.pack(self.fmt[0] + "H", val)
67 elif self.adjust(pkt, self.length_of) == 8:
68 return s + struct.pack(self.fmt[0] + "Q", val)
69 else:
70 return s
71
72 def getfield(self, pkt, s):
73 if self.adjust(pkt, self.length_of) == 2:
74 return s[2:], self.m2i(pkt, struct.unpack(self.fmt[0] + "H", s[:2])[0]) # noqa: E501
75 elif self.adjust(pkt, self.length_of) == 8:
76 return s[8:], self.m2i(pkt, struct.unpack(self.fmt[0] + "Q", s[:8])[0]) # noqa: E501
77 else:
78 raise Exception('impossible case')
79
80 def lengthFromAddrMode(self, pkt, x):
81 addrmode = 0
82 pkttop = pkt.underlayer
83 if pkttop is None:
84 warning("No underlayer to guess address mode")
85 return 0
86 while True:
87 try:
88 addrmode = pkttop.getfieldval(x)
89 break
90 except Exception:
91 if pkttop.underlayer is None:
92 break
93 pkttop = pkttop.underlayer
94 # print "Underlayer field value of", x, "is", addrmode
95 if addrmode == 2:
96 return 2
97 elif addrmode == 3:
98 return 8
99 return 0
100
101
102# Layers #
103
104class Dot15d4(Packet):
105 name = "802.15.4"
106 fields_desc = [
107 BitField("fcf_reserved_1", 0, 1), # fcf p1 b1
108 BitEnumField("fcf_panidcompress", 0, 1, [False, True]),
109 BitEnumField("fcf_ackreq", 0, 1, [False, True]),
110 BitEnumField("fcf_pending", 0, 1, [False, True]),
111 BitEnumField("fcf_security", 0, 1, [False, True]), # fcf p1 b2
112 Emph(BitEnumField("fcf_frametype", 0, 3, {0: "Beacon", 1: "Data", 2: "Ack", 3: "Command"})), # noqa: E501
113 BitEnumField("fcf_srcaddrmode", 0, 2, {0: "None", 1: "Reserved", 2: "Short", 3: "Long"}), # fcf p2 b1 # noqa: E501
114 BitField("fcf_framever", 0, 2), # 00 compatibility with 2003 version; 01 compatible with 2006 version # noqa: E501
115 BitEnumField("fcf_destaddrmode", 2, 2, {0: "None", 1: "Reserved", 2: "Short", 3: "Long"}), # fcf p2 b2 # noqa: E501
116 BitField("fcf_reserved_2", 0, 2),
117 Emph(ByteField("seqnum", 1)) # sequence number
118 ]
119
120 def mysummary(self):
121 return self.sprintf("802.15.4 %Dot15d4.fcf_frametype% ackreq(%Dot15d4.fcf_ackreq%) ( %Dot15d4.fcf_destaddrmode% -> %Dot15d4.fcf_srcaddrmode% ) Seq#%Dot15d4.seqnum%") # noqa: E501
122
123 def guess_payload_class(self, payload):
124 if self.fcf_frametype == 0x00:
125 return Dot15d4Beacon
126 elif self.fcf_frametype == 0x01:
127 return Dot15d4Data
128 elif self.fcf_frametype == 0x02:
129 return Dot15d4Ack
130 elif self.fcf_frametype == 0x03:
131 return Dot15d4Cmd
132 else:
133 return Packet.guess_payload_class(self, payload)
134
135 def answers(self, other):
136 if isinstance(other, Dot15d4):
137 if self.fcf_frametype == 2: # ack
138 if self.seqnum != other.seqnum: # check for seqnum matching
139 return 0
140 elif other.fcf_ackreq == 1: # check that an ack was indeed requested # noqa: E501
141 return 1
142 return 0
143
144 def post_build(self, p, pay):
145 # This just forces destaddrmode to None for Ack frames.
146 if self.fcf_frametype == 2 and self.fcf_destaddrmode != 0:
147 self.fcf_destaddrmode = 0
148 return p[:1] + \
149 chb((self.fcf_srcaddrmode << 6) + (self.fcf_framever << 4)) \
150 + p[2:] + pay
151 else:
152 return p + pay
153
154
155class Dot15d4FCS(Dot15d4):
156 '''
157 This class is a drop-in replacement for the Dot15d4 class above, except
158 it expects a FCS/checksum in the input, and produces one in the output.
159 This provides the user flexibility, as many 802.15.4 interfaces will have an AUTO_CRC setting # noqa: E501
160 that will validate the FCS/CRC in firmware, and add it automatically when transmitting. # noqa: E501
161 '''
162 name = "802.15.4 - FCS"
163 match_subclass = True
164 fields_desc = Dot15d4.fields_desc + [FCSField("fcs", None, fmt="<H")]
165
166 def compute_fcs(self, data):
167 # Do a CRC-CCITT Kermit 16bit on the data given
168 # Returns a CRC that is the FCS for the frame
169 # Implemented using pseudocode from: June 1986, Kermit Protocol Manual
170 # See also:
171 # http://regregex.bbcmicro.net/crc-catalogue.htm#crc.cat.kermit
172 crc = 0
173 for i in range(0, len(data)):
174 c = orb(data[i])
175 q = (crc ^ c) & 15 # Do low-order 4 bits
176 crc = (crc // 16) ^ (q * 4225)
177 q = (crc ^ (c // 16)) & 15 # And high 4 bits
178 crc = (crc // 16) ^ (q * 4225)
179 return struct.pack('<H', crc) # return as bytes in little endian order
180
181 def post_build(self, p, pay):
182 # construct the packet with the FCS at the end
183 p = Dot15d4.post_build(self, p, pay)
184 if self.fcs is None:
185 p = p[:-2]
186 p = p + self.compute_fcs(p)
187 return p
188
189
190class Dot15d4Ack(Packet):
191 name = "802.15.4 Ack"
192 fields_desc = []
193
194
195class Dot15d4AuxSecurityHeader(Packet):
196 name = "802.15.4 Auxiliary Security Header"
197 fields_desc = [
198 BitField("sec_sc_reserved", 0, 3),
199 # Key Identifier Mode
200 # 0: Key is determined implicitly from the originator and recipient(s) of the frame # noqa: E501
201 # 1: Key is determined explicitly from the the 1-octet Key Index subfield of the Key Identifier field # noqa: E501
202 # 2: Key is determined explicitly from the 4-octet Key Source and the 1-octet Key Index # noqa: E501
203 # 3: Key is determined explicitly from the 8-octet Key Source and the 1-octet Key Index # noqa: E501
204 BitEnumField("sec_sc_keyidmode", 0, 2, {
205 0: "Implicit", 1: "1oKeyIndex", 2: "4o-KeySource-1oKeyIndex", 3: "8o-KeySource-1oKeyIndex"} # noqa: E501
206 ),
207 BitEnumField("sec_sc_seclevel", 0, 3, {0: "None", 1: "MIC-32", 2: "MIC-64", 3: "MIC-128", 4: "ENC", 5: "ENC-MIC-32", 6: "ENC-MIC-64", 7: "ENC-MIC-128"}), # noqa: E501
208 XLEIntField("sec_framecounter", 0x00000000), # 4 octets
209 # Key Identifier (variable length): identifies the key that is used for cryptographic protection # noqa: E501
210 # Key Source : length of sec_keyid_keysource varies btwn 0, 4, and 8 bytes depending on sec_sc_keyidmode # noqa: E501
211 MultipleTypeField([
212 # 4 octets when sec_sc_keyidmode == 2
213 (XLEIntField("sec_keyid_keysource", 0x00000000),
214 lambda pkt: pkt.getfieldval("sec_sc_keyidmode") == 2),
215 # 8 octets when sec_sc_keyidmode == 3
216 (LELongField("sec_keyid_keysource", 0x0000000000000000),
217 lambda pkt: pkt.getfieldval("sec_sc_keyidmode") == 3),
218 ], StrFixedLenField("sec_keyid_keysource", "", length=0)),
219 # Key Index (1 octet): allows unique identification of different keys with the same originator # noqa: E501
220 ConditionalField(XByteField("sec_keyid_keyindex", 0xFF),
221 lambda pkt: pkt.getfieldval("sec_sc_keyidmode") != 0),
222 ]
223
224
225class Dot15d4Data(Packet):
226 name = "802.15.4 Data"
227 fields_desc = [
228 XLEShortField("dest_panid", 0xFFFF),
229 dot15d4AddressField("dest_addr", 0xFFFF, length_of="fcf_destaddrmode"),
230 ConditionalField(XLEShortField("src_panid", 0x0),
231 lambda pkt:util_srcpanid_present(pkt)),
232 ConditionalField(dot15d4AddressField("src_addr", None, length_of="fcf_srcaddrmode"), # noqa: E501
233 lambda pkt:pkt.underlayer.getfieldval("fcf_srcaddrmode") != 0), # noqa: E501
234 # Security field present if fcf_security == True
235 ConditionalField(PacketField("aux_sec_header", Dot15d4AuxSecurityHeader(), Dot15d4AuxSecurityHeader), # noqa: E501
236 lambda pkt:pkt.underlayer.getfieldval("fcf_security") is True), # noqa: E501
237 ]
238
239 def guess_payload_class(self, payload):
240 # TODO: See how it's done in wireshark:
241 # https://github.com/wireshark/wireshark/blob/93c60b3b7c801dddd11d8c7f2a0ea4b7d02d700a/epan/dissectors/packet-ieee802154.c#L2061 # noqa: E501
242 # it's too magic to me
243 from scapy.layers.sixlowpan import SixLoWPAN
244 from scapy.layers.zigbee import ZigbeeNWK
245 if conf.dot15d4_protocol == "sixlowpan":
246 return SixLoWPAN
247 elif conf.dot15d4_protocol == "zigbee":
248 return ZigbeeNWK
249 else:
250 if conf.dot15d4_protocol is None:
251 _msg = "Please set conf.dot15d4_protocol to select a " + \
252 "802.15.4 protocol. Values must be in the list: "
253 else:
254 _msg = "Unknown conf.dot15d4_protocol value: must be in "
255 warning(_msg +
256 "['sixlowpan', 'zigbee']" +
257 " Defaulting to SixLoWPAN")
258 return SixLoWPAN
259
260 def mysummary(self):
261 return self.sprintf("802.15.4 Data ( %Dot15d4Data.src_panid%:%Dot15d4Data.src_addr% -> %Dot15d4Data.dest_panid%:%Dot15d4Data.dest_addr% )") # noqa: E501
262
263
264class Dot15d4Beacon(Packet):
265 name = "802.15.4 Beacon"
266 fields_desc = [
267 XLEShortField("src_panid", 0x0),
268 dot15d4AddressField("src_addr", None, length_of="fcf_srcaddrmode"),
269 # Security field present if fcf_security == True
270 ConditionalField(PacketField("aux_sec_header", Dot15d4AuxSecurityHeader(), Dot15d4AuxSecurityHeader), # noqa: E501
271 lambda pkt:pkt.underlayer.getfieldval("fcf_security") is True), # noqa: E501
272
273 # Superframe spec field:
274 BitField("sf_sforder", 15, 4), # not used by ZigBee
275 BitField("sf_beaconorder", 15, 4), # not used by ZigBee
276 BitEnumField("sf_assocpermit", 0, 1, [False, True]),
277 BitEnumField("sf_pancoord", 0, 1, [False, True]),
278 BitField("sf_reserved", 0, 1), # not used by ZigBee
279 BitEnumField("sf_battlifeextend", 0, 1, [False, True]), # not used by ZigBee # noqa: E501
280 BitField("sf_finalcapslot", 15, 4), # not used by ZigBee
281
282 # GTS Fields
283 # GTS Specification (1 byte)
284 BitEnumField("gts_spec_permit", 1, 1, [False, True]), # GTS spec bit 7, true=1 iff PAN cord is accepting GTS requests # noqa: E501
285 BitField("gts_spec_reserved", 0, 4), # GTS spec bits 3-6
286 BitField("gts_spec_desccount", 0, 3), # GTS spec bits 0-2
287 # GTS Directions (0 or 1 byte)
288 ConditionalField(BitField("gts_dir_reserved", 0, 1), lambda pkt:pkt.getfieldval("gts_spec_desccount") != 0), # noqa: E501
289 ConditionalField(BitField("gts_dir_mask", 0, 7), lambda pkt:pkt.getfieldval("gts_spec_desccount") != 0), # noqa: E501
290 # GTS List (variable size)
291 # TODO add a Packet/FieldListField tied to 3bytes per count in gts_spec_desccount # noqa: E501
292
293 # Pending Address Fields:
294 # Pending Address Specification (1 byte)
295 BitField("pa_reserved_1", 0, 1),
296 BitField("pa_num_long", 0, 3), # number of long addresses pending
297 BitField("pa_reserved_2", 0, 1),
298 BitField("pa_num_short", 0, 3), # number of short addresses pending
299 # Address List (var length)
300 FieldListField("pa_short_addresses", [],
301 XLEShortField("", 0x0000),
302 count_from=lambda pkt: pkt.pa_num_short),
303 FieldListField("pa_long_addresses", [],
304 dot15d4AddressField("", 0, adjust=lambda pkt, x: 8),
305 count_from=lambda pkt: pkt.pa_num_long),
306 # TODO beacon payload
307 ]
308
309 def mysummary(self):
310 return self.sprintf("802.15.4 Beacon ( %Dot15d4Beacon.src_panid%:%Dot15d4Beacon.src_addr% ) assocPermit(%Dot15d4Beacon.sf_assocpermit%) panCoord(%Dot15d4Beacon.sf_pancoord%)") # noqa: E501
311
312
313class Dot15d4Cmd(Packet):
314 name = "802.15.4 Command"
315 fields_desc = [
316 XLEShortField("dest_panid", 0xFFFF),
317 # Users should correctly set the dest_addr field. By default is 0x0 for construction to work. # noqa: E501
318 dot15d4AddressField("dest_addr", 0x0, length_of="fcf_destaddrmode"),
319 ConditionalField(XLEShortField("src_panid", 0x0), \
320 lambda pkt:util_srcpanid_present(pkt)),
321 ConditionalField(dot15d4AddressField("src_addr", None,
322 length_of="fcf_srcaddrmode"),
323 lambda pkt:pkt.underlayer.getfieldval("fcf_srcaddrmode") != 0), # noqa: E501
324 # Security field present if fcf_security == True
325 ConditionalField(PacketField("aux_sec_header", Dot15d4AuxSecurityHeader(), Dot15d4AuxSecurityHeader), # noqa: E501
326 lambda pkt:pkt.underlayer.getfieldval("fcf_security") is True), # noqa: E501
327 ByteEnumField("cmd_id", 0, {
328 1: "AssocReq", # Association request
329 2: "AssocResp", # Association response
330 3: "DisassocNotify", # Disassociation notification
331 4: "DataReq", # Data request
332 5: "PANIDConflictNotify", # PAN ID conflict notification
333 6: "OrphanNotify", # Orphan notification
334 7: "BeaconReq", # Beacon request
335 8: "CoordRealign", # coordinator realignment
336 9: "GTSReq" # GTS request
337 # 0x0a - 0xff reserved
338 }),
339 # TODO command payload
340 ]
341
342 def mysummary(self):
343 return self.sprintf("802.15.4 Command %Dot15d4Cmd.cmd_id% ( %Dot15dCmd.src_panid%:%Dot15d4Cmd.src_addr% -> %Dot15d4Cmd.dest_panid%:%Dot15d4Cmd.dest_addr% )") # noqa: E501
344
345 # command frame payloads are complete: DataReq, PANIDConflictNotify, OrphanNotify, BeaconReq don't have any payload # noqa: E501
346 # Although BeaconReq can have an optional ZigBee Beacon payload (implemented in ZigBeeBeacon) # noqa: E501
347 def guess_payload_class(self, payload):
348 if self.cmd_id == 1:
349 return Dot15d4CmdAssocReq
350 elif self.cmd_id == 2:
351 return Dot15d4CmdAssocResp
352 elif self.cmd_id == 3:
353 return Dot15d4CmdDisassociation
354 elif self.cmd_id == 8:
355 return Dot15d4CmdCoordRealign
356 elif self.cmd_id == 9:
357 return Dot15d4CmdGTSReq
358 else:
359 return Packet.guess_payload_class(self, payload)
360
361
362class Dot15d4CmdCoordRealign(Packet):
363 name = "802.15.4 Coordinator Realign Command"
364 fields_desc = [
365 # PAN Identifier (2 octets)
366 XLEShortField("panid", 0xFFFF),
367 # Coordinator Short Address (2 octets)
368 XLEShortField("coord_address", 0x0000),
369 # Logical Channel (1 octet): the logical channel that the coordinator intends to use for all future communications # noqa: E501
370 ByteField("channel", 0),
371 # Short Address (2 octets)
372 XLEShortField("dev_address", 0xFFFF),
373 ]
374
375 def mysummary(self):
376 return self.sprintf("802.15.4 Coordinator Realign Payload ( PAN ID: %Dot15dCmdCoordRealign.pan_id% : channel %Dot15d4CmdCoordRealign.channel% )") # noqa: E501
377
378 def guess_payload_class(self, payload):
379 if len(payload) == 1:
380 return Dot15d4CmdCoordRealignPage
381 else:
382 return Packet.guess_payload_class(self, payload)
383
384
385class Dot15d4CmdCoordRealignPage(Packet):
386 name = "802.15.4 Coordinator Realign Page"
387 fields_desc = [
388 ByteField("channel_page", 0),
389 ]
390
391
392# Utility Functions #
393
394
395def util_srcpanid_present(pkt):
396 '''A source PAN ID is included if and only if both src addr mode != 0 and PAN ID Compression in FCF == 0''' # noqa: E501
397 if (pkt.underlayer.getfieldval("fcf_srcaddrmode") != 0) and (pkt.underlayer.getfieldval("fcf_panidcompress") == 0): # noqa: E501
398 return True
399 else:
400 return False
401
402
403class Dot15d4CmdAssocReq(Packet):
404 name = "802.15.4 Association Request Payload"
405 fields_desc = [
406 BitField("allocate_address", 0, 1), # Allocate Address
407 BitField("security_capability", 0, 1), # Security Capability
408 BitField("reserved2", 0, 1), # bit 5 is reserved
409 BitField("reserved1", 0, 1), # bit 4 is reserved
410 BitField("receiver_on_when_idle", 0, 1), # Receiver On When Idle
411 BitField("power_source", 0, 1), # Power Source
412 BitField("device_type", 0, 1), # Device Type
413 BitField("alternate_pan_coordinator", 0, 1), # Alternate PAN Coordinator # noqa: E501
414 ]
415
416 def mysummary(self):
417 return self.sprintf("802.15.4 Association Request Payload ( Alt PAN Coord: %Dot15d4CmdAssocReq.alternate_pan_coordinator% Device Type: %Dot15d4CmdAssocReq.device_type% )") # noqa: E501
418
419
420class Dot15d4CmdAssocResp(Packet):
421 name = "802.15.4 Association Response Payload"
422 fields_desc = [
423 XLEShortField("short_address", 0xFFFF), # Address assigned to device from coordinator (0xFFFF == none) # noqa: E501
424 # Association Status
425 # 0x00 == successful
426 # 0x01 == PAN at capacity
427 # 0x02 == PAN access denied
428 # 0x03 - 0x7f == Reserved
429 # 0x80 - 0xff == Reserved for MAC primitive enumeration values
430 ByteEnumField("association_status", 0x00, {0: 'successful', 1: 'PAN_at_capacity', 2: 'PAN_access_denied'}), # noqa: E501
431 ]
432
433 def mysummary(self):
434 return self.sprintf("802.15.4 Association Response Payload ( Association Status: %Dot15d4CmdAssocResp.association_status% Assigned Address: %Dot15d4CmdAssocResp.short_address% )") # noqa: E501
435
436
437class Dot15d4CmdDisassociation(Packet):
438 name = "802.15.4 Disassociation Notification Payload"
439 fields_desc = [
440 # Disassociation Reason
441 # 0x00 == Reserved
442 # 0x01 == The coordinator wishes the device to leave the PAN
443 # 0x02 == The device wishes to leave the PAN
444 # 0x03 - 0x7f == Reserved
445 # 0x80 - 0xff == Reserved for MAC primitive enumeration values
446 ByteEnumField("disassociation_reason", 0x02, {1: 'coord_wishes_device_to_leave', 2: 'device_wishes_to_leave'}), # noqa: E501
447 ]
448
449 def mysummary(self):
450 return self.sprintf("802.15.4 Disassociation Notification Payload ( Disassociation Reason %Dot15d4CmdDisassociation.disassociation_reason% )") # noqa: E501
451
452
453class Dot15d4CmdGTSReq(Packet):
454 name = "802.15.4 GTS request command"
455 fields_desc = [
456 # GTS Characteristics field (1 octet)
457 # Reserved (bits 6-7)
458 BitField("reserved", 0, 2),
459 # Characteristics Type (bit 5)
460 BitField("charact_type", 0, 1),
461 # GTS Direction (bit 4)
462 BitField("gts_dir", 0, 1),
463 # GTS Length (bits 0-3)
464 BitField("gts_len", 0, 4),
465 ]
466
467 def mysummary(self):
468 return self.sprintf("802.15.4 GTS Request Command ( %Dot15d4CmdGTSReq.gts_len% : %Dot15d4CmdGTSReq.gts_dir% )") # noqa: E501
469
470
471# PAN ID conflict notification command frame is not necessary, only Dot15d4Cmd with cmd_id = 5 ("PANIDConflictNotify") # noqa: E501
472# Orphan notification command not necessary, only Dot15d4Cmd with cmd_id = 6 ("OrphanNotify") # noqa: E501
473
474# Bindings #
475bind_layers(Dot15d4, Dot15d4Beacon, fcf_frametype=0)
476bind_layers(Dot15d4, Dot15d4Data, fcf_frametype=1)
477bind_layers(Dot15d4, Dot15d4Ack, fcf_frametype=2)
478bind_layers(Dot15d4, Dot15d4Cmd, fcf_frametype=3)
479
480# DLT Types #
481conf.l2types.register(DLT_IEEE802_15_4_WITHFCS, Dot15d4FCS)
482conf.l2types.register(DLT_IEEE802_15_4_NOFCS, Dot15d4)