Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/zigbee.py: 64%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

269 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Ryan Speers <ryan@rmspeers.com> 2011-2012 

5# Copyright (C) Roger Meyer <roger.meyer@csus.edu>: 2012-03-10 Added frames 

6# Copyright (C) Gabriel Potter <gabriel[]potter[]fr>: 2018 

7# Copyright (C) 2020-2021 Dimitrios-Georgios Akestoridis <akestoridis@cmu.edu> 

8 

9""" 

10ZigBee bindings for IEEE 802.15.4. 

11""" 

12 

13import struct 

14 

15from scapy.compat import orb 

16from scapy.packet import bind_layers, bind_bottom_up, Packet 

17from scapy.fields import BitField, ByteField, XLEIntField, ConditionalField, \ 

18 ByteEnumField, EnumField, BitEnumField, FieldListField, FlagsField, \ 

19 IntField, PacketListField, ShortField, StrField, StrFixedLenField, \ 

20 StrLenField, XLEShortField, XStrField 

21 

22from scapy.layers.dot15d4 import dot15d4AddressField, Dot15d4Beacon, Dot15d4, \ 

23 Dot15d4FCS 

24from scapy.layers.inet import UDP 

25from scapy.layers.ntp import TimeStampField 

26 

27 

# APS Profile Identifiers
# Maps a 16-bit APS profile identifier to its symbolic name.  Used as the
# enumeration of the little-endian "profile" field of ZigbeeAppDataPayload.
_aps_profile_identifiers = {
    0x0000: "Zigbee_Device_Profile",
    0x0101: "IPM_Industrial_Plant_Monitoring",
    0x0104: "HA_Home_Automation",
    0x0105: "CBA_Commercial_Building_Automation",
    0x0107: "TA_Telecom_Applications",
    0x0108: "HC_Health_Care",
    0x0109: "SE_Smart_Energy_Profile",
}

38 

39# ZigBee Cluster Library Identifiers, Table 2.2 ZCL 

40_zcl_cluster_identifier = { 

41 # Functional Domain: General 

42 0x0000: "basic", 

43 0x0001: "power_configuration", 

44 0x0002: "device_temperature_configuration", 

45 0x0003: "identify", 

46 0x0004: "groups", 

47 0x0005: "scenes", 

48 0x0006: "on_off", 

49 0x0007: "on_off_switch_configuration", 

50 0x0008: "level_control", 

51 0x0009: "alarms", 

52 0x000a: "time", 

53 0x000b: "rssi_location", 

54 0x000c: "analog_input", 

55 0x000d: "analog_output", 

56 0x000e: "analog_value", 

57 0x000f: "binary_input", 

58 0x0010: "binary_output", 

59 0x0011: "binary_value", 

60 0x0012: "multistate_input", 

61 0x0013: "multistate_output", 

62 0x0014: "multistate_value", 

63 0x0015: "commissioning", 

64 # 0x0016 - 0x00ff reserved 

65 # Functional Domain: Closures 

66 0x0100: "shade_configuration", 

67 # 0x0101 - 0x01ff reserved 

68 # Functional Domain: HVAC 

69 0x0200: "pump_configuration_and_control", 

70 0x0201: "thermostat", 

71 0x0202: "fan_control", 

72 0x0203: "dehumidification_control", 

73 0x0204: "thermostat_user_interface_configuration", 

74 # 0x0205 - 0x02ff reserved 

75 # Functional Domain: Lighting 

76 0x0300: "color_control", 

77 0x0301: "ballast_configuration", 

78 # Functional Domain: Measurement and sensing 

79 0x0400: "illuminance_measurement", 

80 0x0401: "illuminance_level_sensing", 

81 0x0402: "temperature_measurement", 

82 0x0403: "pressure_measurement", 

83 0x0404: "flow_measurement", 

84 0x0405: "relative_humidity_measurement", 

85 0x0406: "occupancy_sensing", 

86 # Functional Domain: Security and safethy 

87 0x0500: "ias_zone", 

88 0x0501: "ias_ace", 

89 0x0502: "ias_wd", 

90 # Functional Domain: Protocol Interfaces 

91 0x0600: "generic_tunnel", 

92 0x0601: "bacnet_protocol_tunnel", 

93 0x0602: "analog_input_regular", 

94 0x0603: "analog_input_extended", 

95 0x0604: "analog_output_regular", 

96 0x0605: "analog_output_extended", 

97 0x0606: "analog_value_regular", 

98 0x0607: "analog_value_extended", 

99 0x0608: "binary_input_regular", 

100 0x0609: "binary_input_extended", 

101 0x060a: "binary_output_regular", 

102 0x060b: "binary_output_extended", 

103 0x060c: "binary_value_regular", 

104 0x060d: "binary_value_extended", 

105 0x060e: "multistate_input_regular", 

106 0x060f: "multistate_input_extended", 

107 0x0610: "multistate_output_regular", 

108 0x0611: "multistate_output_extended", 

109 0x0612: "multistate_value_regular", 

110 0x0613: "multistate_value", 

111 # Smart Energy Profile Clusters 

112 0x0700: "price", 

113 0x0701: "demand_response_and_load_control", 

114 0x0702: "metering", 

115 0x0703: "messaging", 

116 0x0704: "smart_energy_tunneling", 

117 0x0705: "prepayment", 

118 # Functional Domain: General 

119 # Key Establishment 

120 0x0800: "key_establishment", 

121} 

122 

# ZigBee Cluster Library, Table 2.8 ZCL Command Frames
# Maps an 8-bit ZCL command identifier to its symbolic name.  Identifiers
# 0x17 - 0xff are reserved and therefore have no entry.
_zcl_command_frames = {
    0x00: "read_attributes",
    0x01: "read_attributes_response",
    0x02: "write_attributes",
    0x03: "write_attributes_undivided",
    0x04: "write_attributes_response",
    0x05: "write_attributes_no_response",
    0x06: "configure_reporting",
    0x07: "configure_reporting_response",
    0x08: "read_reporting_configuration",
    0x09: "read_reporting_configuration_response",
    0x0a: "report_attributes",
    0x0b: "default_response",
    0x0c: "discover_attributes",
    0x0d: "discover_attributes_response",
    0x0e: "read_attributes_structured",
    0x0f: "write_attributes_structured",
    0x10: "write_attributes_structured_response",
    0x11: "discover_commands_received",
    0x12: "discover_commands_received_response",
    0x13: "discover_commands_generated",
    0x14: "discover_commands_generated_response",
    0x15: "discover_attributes_extended",
    0x16: "discover_attributes_extended_response",
    # 0x17 - 0xff Reserved
}

150 

# ZigBee Cluster Library, Table 2.16 Enumerated Status Values
# Maps an 8-bit ZCL status code to its symbolic name.  The reserved ranges
# noted inline (0x02-0x7d, 0x9b-0xbf, 0xc4-0xff) have no entry.
_zcl_enumerated_status_values = {
    0x00: "SUCCESS",
    0x01: "FAILURE",
    # 0x02 - 0x7d Reserved
    0x7e: "NOT_AUTHORIZED",
    0x7f: "RESERVED_FIELD_NOT_ZERO",
    0x80: "MALFORMED_COMMAND",
    0x81: "UNSUP_CLUSTER_COMMAND",
    0x82: "UNSUP_GENERAL_COMMAND",
    0x83: "UNSUP_MANUF_CLUSTER_COMMAND",
    0x84: "UNSUP_MANUF_GENERAL_COMMAND",
    0x85: "INVALID_FIELD",
    0x86: "UNSUPPORTED_ATTRIBUTE",
    0x87: "INVALID_VALUE",
    0x88: "READ_ONLY",
    0x89: "INSUFFICIENT_SPACE",
    0x8a: "DUPLICATE_EXISTS",
    0x8b: "NOT_FOUND",
    0x8c: "UNREPORTABLE_ATTRIBUTE",
    0x8d: "INVALID_DATA_TYPE",
    0x8e: "INVALID_SELECTOR",
    0x8f: "WRITE_ONLY",
    0x90: "INCONSISTENT_STARTUP_STATE",
    0x91: "DEFINED_OUT_OF_BAND",
    0x92: "INCONSISTENT",
    0x93: "ACTION_DENIED",
    0x94: "TIMEOUT",
    0x95: "ABORT",
    0x96: "INVALID_IMAGE",
    0x97: "WAIT_FOR_DATA",
    0x98: "NO_IMAGE_AVAILABLE",
    0x99: "REQUIRE_MORE_IMAGE",
    0x9a: "NOTIFICATION_PENDING",
    # 0x9b - 0xbf Reserved
    0xc0: "HARDWARE_FAILURE",
    0xc1: "SOFTWARE_FAILURE",
    0xc2: "CALIBRATION_ERROR",
    0xc3: "UNSUPPORTED_CLUSTER",
    # 0xc4 - 0xff Reserved
}

192 

# ZigBee Cluster Library, Table 2.15 Data Types
# Maps an 8-bit ZCL data type identifier to its symbolic name, grouped by
# the data type classes named in the inline comments below.
_zcl_attribute_data_types = {
    0x00: "no_data",
    # General data
    0x08: "8-bit_data",
    0x09: "16-bit_data",
    0x0a: "24-bit_data",
    0x0b: "32-bit_data",
    0x0c: "40-bit_data",
    0x0d: "48-bit_data",
    0x0e: "56-bit_data",
    0x0f: "64-bit_data",
    # Logical
    0x10: "boolean",
    # Bitmap
    0x18: "8-bit_bitmap",
    0x19: "16-bit_bitmap",
    0x1a: "24-bit_bitmap",
    0x1b: "32-bit_bitmap",
    0x1c: "40-bit_bitmap",
    0x1d: "48-bit_bitmap",
    0x1e: "56-bit_bitmap",
    0x1f: "64-bit_bitmap",
    # Unsigned integer
    0x20: "Unsigned_8-bit_integer",
    0x21: "Unsigned_16-bit_integer",
    0x22: "Unsigned_24-bit_integer",
    0x23: "Unsigned_32-bit_integer",
    0x24: "Unsigned_40-bit_integer",
    0x25: "Unsigned_48-bit_integer",
    0x26: "Unsigned_56-bit_integer",
    0x27: "Unsigned_64-bit_integer",
    # Signed integer
    0x28: "Signed_8-bit_integer",
    0x29: "Signed_16-bit_integer",
    0x2a: "Signed_24-bit_integer",
    0x2b: "Signed_32-bit_integer",
    0x2c: "Signed_40-bit_integer",
    0x2d: "Signed_48-bit_integer",
    0x2e: "Signed_56-bit_integer",
    0x2f: "Signed_64-bit_integer",
    # Enumeration
    0x30: "8-bit_enumeration",
    0x31: "16-bit_enumeration",
    # Floating point
    0x38: "semi_precision",
    0x39: "single_precision",
    0x3a: "double_precision",
    # String
    0x41: "octet-string",
    0x42: "character_string",
    0x43: "long_octet_string",
    0x44: "long_character_string",
    # Ordered sequence
    0x48: "array",
    0x4c: "structure",
    # Collection
    0x50: "set",
    0x51: "bag",
    # Time
    0xe0: "time_of_day",
    0xe1: "date",
    0xe2: "utc_time",
    # Identifier
    0xe8: "cluster_id",
    0xe9: "attribute_id",
    0xea: "bacnet_oid",
    # Miscellaneous
    0xf0: "ieee_address",
    0xf1: "128-bit_security_key",
    # Unknown
    0xff: "unknown",
}

266 

# Zigbee Cluster Library, IAS Zone, Enroll Response Codes
# Maps an enroll response code to a human-readable description.
_zcl_ias_zone_enroll_response_codes = {
    0x00: "Success",
    0x01: "Not supported",
    0x02: "No enroll permit",
    0x03: "Too many zones",
}

# Zigbee Cluster Library, IAS Zone, Zone Types
# Maps a 16-bit zone type to a human-readable description.  The
# manufacturer-specific range 0x8000 - 0xfffe has no entry.
_zcl_ias_zone_zone_types = {
    0x0000: "Standard CIE",
    0x000d: "Motion sensor",
    0x0015: "Contact switch",
    0x0028: "Fire sensor",
    0x002a: "Water sensor",
    0x002b: "Carbon Monoxide (CO) sensor",
    0x002c: "Personal emergency device",
    0x002d: "Vibration/Movement sensor",
    0x010f: "Remote Control",
    0x0115: "Key fob",
    0x021d: "Keypad",
    0x0225: "Standard Warning Device",
    0x0226: "Glass break sensor",
    0x0229: "Security repeater",
    # 0x8000 - 0xfffe Manufacturer-specific types
    0xffff: "Invalid Zone Type",
}

294 

295 

296# ZigBee # 

297 

class ZigbeeNWK(Packet):
    """Zigbee network layer header.

    The first octet packs ``discover_route`` (2 bits), ``proto_version``
    (4 bits) and ``frametype`` (2 bits); the second octet holds ``flags``.
    The extended addresses and the source-route sub-frame are only present
    when the matching bit of ``flags`` is set.
    """
    name = "Zigbee Network Layer"
    fields_desc = [
        # Frame control (2 octets)
        BitField("discover_route", 0, 2),
        BitField("proto_version", 2, 4),
        BitEnumField(
            "frametype", 0, 2,
            {0: 'data', 1: 'command', 3: 'Inter-PAN'},
        ),
        FlagsField(
            "flags", 0, 8,
            ['multicast', 'security', 'source_route', 'extended_dst',
             'extended_src', 'reserved1', 'reserved2', 'reserved3'],
        ),
        # Short addresses, radius and sequence number
        XLEShortField("destination", 0),
        XLEShortField("source", 0),
        ByteField("radius", 0),
        ByteField("seqnum", 1),

        # Extended (8-octet) destination: 'extended_dst' flag, bit 3
        ConditionalField(
            dot15d4AddressField("ext_dst", 0, adjust=lambda pkt, x: 8),
            lambda pkt: pkt.flags & 0x08,
        ),
        # Extended (8-octet) source: 'extended_src' flag, bit 4
        ConditionalField(
            dot15d4AddressField("ext_src", 0, adjust=lambda pkt, x: 8),
            lambda pkt: pkt.flags & 0x10,
        ),

        # Source route sub-frame: 'source_route' flag, bit 2
        ConditionalField(
            ByteField("relay_count", 1),
            lambda pkt: pkt.flags & 0x04,
        ),
        ConditionalField(
            ByteField("relay_index", 0),
            lambda pkt: pkt.flags & 0x04,
        ),
        ConditionalField(
            FieldListField(
                "relays", [], XLEShortField("", 0x0000),
                count_from=lambda pkt: pkt.relay_count,
            ),
            lambda pkt: pkt.flags & 0x04,
        ),
    ]

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        """Pick the class used to dissect ``_pkt``.

        Returns ``ZigbeeNWKStub`` when at least two bytes are available and
        the two low bits of the first byte (the frame type) equal 3
        ('Inter-PAN'); returns ``cls`` in every other case.
        """
        if not _pkt or len(_pkt) < 2:
            return cls
        if (ord(_pkt[:1]) & 3) == 3:
            return ZigbeeNWKStub
        return cls

    def guess_payload_class(self, payload):
        """Choose the next layer from the security flag and frame type."""
        if self.flags.security:
            return ZigbeeSecurityHeader
        if self.frametype == 0:
            return ZigbeeAppDataPayload
        if self.frametype == 1:
            return ZigbeeNWKCommandPayload
        return Packet.guess_payload_class(self, payload)

338 

339 

class LinkStatusEntry(Packet):
    """A single 3-octet entry of a link status list.

    Layout: a 2-octet little-endian neighbor address followed by one octet
    split into reserved (1 bit), outgoing cost (3 bits), reserved (1 bit)
    and incoming cost (3 bits).
    """
    name = "ZigBee Link Status Entry"

    fields_desc = [
        # Neighbor network address (2 octets)
        XLEShortField("neighbor_network_address", 0x0000),
        # Link status (1 octet)
        BitField("reserved1", 0, 1),
        BitField("outgoing_cost", 0, 3),
        BitField("reserved2", 0, 1),
        BitField("incoming_cost", 0, 3),
    ]

    def extract_padding(self, p):
        """Return ``(payload, padding)``: no payload, all of ``p`` padding."""
        own_payload = b""
        return own_payload, p

355 

356 

def _nwk_cmd_in(*identifiers):
    """Build a ConditionalField predicate for NWK command identifiers.

    The returned callable takes a packet and is true when the packet's
    ``cmd_identifier`` equals one of ``identifiers``.
    """
    return lambda pkt: pkt.cmd_identifier in identifiers


class ZigbeeNWKCommandPayload(Packet):
    """Zigbee network layer command frame payload.

    ``cmd_identifier`` selects the command; every other field is
    conditional on it, so only the fields of the selected command are
    present.  The order of ``fields_desc`` is the on-the-wire order and
    must not be changed.
    """
    name = "Zigbee Network Layer Command Payload"
    fields_desc = [
        ByteEnumField("cmd_identifier", 1, {
            1: "route request",
            2: "route reply",
            3: "network status",
            4: "leave",
            5: "route record",
            6: "rejoin request",
            7: "rejoin response",
            8: "link status",
            9: "network report",
            10: "network update",
            11: "end device timeout request",
            12: "end device timeout response"
            # 0x0d - 0xff reserved
        }),

        # --- Route Request (1) / Route Reply (2) ---
        # Command options (1 octet); the two leading bits are shared
        ConditionalField(BitField("res1", 0, 1), _nwk_cmd_in(1, 2)),
        ConditionalField(BitField("multicast", 0, 1), _nwk_cmd_in(1, 2)),
        # Route Request: remaining option bits
        ConditionalField(BitField("dest_addr_bit", 0, 1), _nwk_cmd_in(1)),
        ConditionalField(
            BitEnumField("many_to_one", 0, 2, {
                0: "not_m2one",
                1: "m2one_support_rrt",
                2: "m2one_no_support_rrt",
                3: "reserved",
            }),
            _nwk_cmd_in(1),
        ),
        ConditionalField(BitField("res2", 0, 3), _nwk_cmd_in(1)),
        # Route Reply: remaining option bits
        ConditionalField(
            BitField("responder_addr_bit", 0, 1), _nwk_cmd_in(2)),
        ConditionalField(
            BitField("originator_addr_bit", 0, 1), _nwk_cmd_in(2)),
        ConditionalField(BitField("res3", 0, 4), _nwk_cmd_in(2)),
        # Route request identifier (1 octet)
        ConditionalField(
            ByteField("route_request_identifier", 0), _nwk_cmd_in(1, 2)),
        # Originator address (2 octets)
        ConditionalField(
            XLEShortField("originator_address", 0x0000), _nwk_cmd_in(2)),
        # Responder address (2 octets)
        ConditionalField(
            XLEShortField("responder_address", 0x0000), _nwk_cmd_in(2)),

        # --- Network Status (3) ---
        # Status code (1 octet)
        ConditionalField(
            ByteEnumField("status_code", 0, {
                0x00: "No route available",
                0x01: "Tree link failure",
                0x02: "Non-tree link failure",
                0x03: "Low battery level",
                0x04: "No routing capacity",
                0x05: "No indirect capacity",
                0x06: "Indirect transaction expiry",
                0x07: "Target device unavailable",
                0x08: "Target address unallocated",
                0x09: "Parent link failure",
                0x0a: "Validate route",
                0x0b: "Source route failure",
                0x0c: "Many-to-one route failure",
                0x0d: "Address conflict",
                0x0e: "Verify addresses",
                0x0f: "PAN identifier update",
                0x10: "Network address update",
                0x11: "Bad frame counter",
                0x12: "Bad key sequence number",
                # 0x13 - 0xff Reserved
            }),
            _nwk_cmd_in(3),
        ),
        # Destination address (2 octets): route request and network status
        ConditionalField(
            XLEShortField("destination_address", 0x0000), _nwk_cmd_in(1, 3)),
        # Path cost (1 octet)
        ConditionalField(ByteField("path_cost", 0), _nwk_cmd_in(1, 2)),
        # Destination IEEE address (0/8 octets), needs dest_addr_bit == 1
        ConditionalField(
            dot15d4AddressField("ext_dst", 0, adjust=lambda pkt, x: 8),
            lambda pkt: (pkt.cmd_identifier == 1 and
                         pkt.dest_addr_bit == 1),
        ),
        # Originator IEEE address (0/8 octets)
        ConditionalField(
            dot15d4AddressField(
                "originator_addr", 0, adjust=lambda pkt, x: 8),
            lambda pkt: (pkt.cmd_identifier == 2 and
                         pkt.originator_addr_bit == 1),
        ),
        # Responder IEEE address (0/8 octets)
        ConditionalField(
            dot15d4AddressField(
                "responder_addr", 0, adjust=lambda pkt, x: 8),
            lambda pkt: (pkt.cmd_identifier == 2 and
                         pkt.responder_addr_bit == 1),
        ),

        # --- Leave (4) ---
        # Command options (1 octet): bit 7 remove children, bit 6 request,
        # bit 5 rejoin, bits 0-4 reserved
        ConditionalField(BitField("remove_children", 0, 1), _nwk_cmd_in(4)),
        ConditionalField(BitField("request", 0, 1), _nwk_cmd_in(4)),
        ConditionalField(BitField("rejoin", 0, 1), _nwk_cmd_in(4)),
        ConditionalField(BitField("res4", 0, 5), _nwk_cmd_in(4)),

        # --- Route Record (5) ---
        # Relay count (1 octet)
        ConditionalField(ByteField("rr_relay_count", 0), _nwk_cmd_in(5)),
        # Relay list (rr_relay_count entries of 2 octets)
        ConditionalField(
            FieldListField(
                "rr_relay_list", [], XLEShortField("", 0x0000),
                count_from=lambda pkt: pkt.rr_relay_count,
            ),
            _nwk_cmd_in(5),
        ),

        # --- Rejoin Request (6) ---
        # Capability information (1 octet), most significant bit first
        ConditionalField(BitField("allocate_address", 0, 1), _nwk_cmd_in(6)),
        ConditionalField(
            BitField("security_capability", 0, 1), _nwk_cmd_in(6)),
        # bits 5 and 4 are reserved
        ConditionalField(BitField("reserved2", 0, 1), _nwk_cmd_in(6)),
        ConditionalField(BitField("reserved1", 0, 1), _nwk_cmd_in(6)),
        ConditionalField(
            BitField("receiver_on_when_idle", 0, 1), _nwk_cmd_in(6)),
        ConditionalField(BitField("power_source", 0, 1), _nwk_cmd_in(6)),
        ConditionalField(BitField("device_type", 0, 1), _nwk_cmd_in(6)),
        ConditionalField(
            BitField("alternate_pan_coordinator", 0, 1), _nwk_cmd_in(6)),

        # --- Rejoin Response (7) ---
        # Network address (2 octets)
        ConditionalField(
            XLEShortField("network_address", 0xFFFF), _nwk_cmd_in(7)),
        # Rejoin status (1 octet)
        ConditionalField(ByteField("rejoin_status", 0), _nwk_cmd_in(7)),

        # --- Link Status (8) ---
        # Command options (1 octet): reserved, last frame, first frame,
        # entry count (5 bits)
        ConditionalField(BitField("res5", 0, 1), _nwk_cmd_in(8)),
        ConditionalField(BitField("last_frame", 0, 1), _nwk_cmd_in(8)),
        ConditionalField(BitField("first_frame", 0, 1), _nwk_cmd_in(8)),
        ConditionalField(BitField("entry_count", 0, 5), _nwk_cmd_in(8)),
        # Link status list (entry_count entries)
        ConditionalField(
            PacketListField(
                "link_status_list", [], LinkStatusEntry,
                count_from=lambda pkt: pkt.entry_count,
            ),
            _nwk_cmd_in(8),
        ),

        # --- Network Report (9) ---
        # Command options (1 octet); identifiers 0x01 - 0x07 are reserved
        ConditionalField(
            BitEnumField(
                "report_command_identifier", 0, 3,
                {0: "PAN identifier conflict"},
            ),
            _nwk_cmd_in(9),
        ),
        ConditionalField(
            BitField("report_information_count", 0, 5), _nwk_cmd_in(9)),

        # --- Network Update (10) ---
        # Command options (1 octet); identifiers 0x01 - 0x07 are reserved
        ConditionalField(
            BitEnumField(
                "update_command_identifier", 0, 3,
                {0: "PAN Identifier Update"},
            ),
            _nwk_cmd_in(10),
        ),
        ConditionalField(
            BitField("update_information_count", 0, 5), _nwk_cmd_in(10)),
        # EPID: Extended PAN ID (8 octets), shared by report and update
        ConditionalField(
            dot15d4AddressField("epid", 0, adjust=lambda pkt, x: 8),
            _nwk_cmd_in(9, 10),
        ),
        # Report information: only for a PAN identifier conflict report
        ConditionalField(
            FieldListField(
                "PAN_ID_conflict_report", [], XLEShortField("", 0x0000),
                count_from=lambda pkt: pkt.report_information_count,
            ),
            lambda pkt: (pkt.cmd_identifier == 9 and
                         pkt.report_command_identifier == 0),
        ),
        # Update Id (1 octet)
        ConditionalField(ByteField("update_id", 0), _nwk_cmd_in(10)),
        # Update information: new PAN ID (2 octets), only for a
        # PAN identifier update
        ConditionalField(
            XLEShortField("new_PAN_ID", 0x0000),
            lambda pkt: (pkt.cmd_identifier == 10 and
                         pkt.update_command_identifier == 0),
        ),

        # --- End Device Timeout Request (11) ---
        # Requested timeout (1 octet)
        ConditionalField(
            ByteEnumField("req_timeout", 3, {
                0: "10 seconds",
                1: "2 minutes",
                2: "4 minutes",
                3: "8 minutes",
                4: "16 minutes",
                5: "32 minutes",
                6: "64 minutes",
                7: "128 minutes",
                8: "256 minutes",
                9: "512 minutes",
                10: "1024 minutes",
                11: "2048 minutes",
                12: "4096 minutes",
                13: "8192 minutes",
                14: "16384 minutes"
            }),
            _nwk_cmd_in(11),
        ),
        # End device configuration (1 octet)
        ConditionalField(ByteField("ed_conf", 0), _nwk_cmd_in(11)),

        # --- End Device Timeout Response (12) ---
        # Status (1 octet)
        ConditionalField(
            ByteEnumField("status", 0, {
                0: "Success",
                1: "Incorrect Value"
            }),
            _nwk_cmd_in(12),
        ),
        # Parent information (1 octet)
        ConditionalField(BitField("res6", 0, 6), _nwk_cmd_in(12)),
        ConditionalField(
            BitField("ed_timeout_req_keepalive", 0, 1), _nwk_cmd_in(12)),
        ConditionalField(
            BitField("mac_data_poll_keepalive", 0, 1), _nwk_cmd_in(12)),
    ]

569 

570 

# (security level, MIC length in bytes) for each defined security level.
# The MIC size is the same with or without encryption (levels 4-7 mirror
# levels 0-3).
_MIC_LENGTH_BY_SECLEVEL = (
    (0, 0),    # None: no encryption, no MIC
    (1, 4),    # MIC-32
    (2, 8),    # MIC-64
    (3, 16),   # MIC-128
    (4, 0),    # ENC
    (5, 4),    # ENC-MIC-32
    (6, 8),    # ENC-MIC-64
    (7, 16),   # ENC-MIC-128
)


def util_mic_len(pkt):
    """Return the length in bytes of the Message Integrity Code (MIC).

    :param pkt: object exposing a ``nwk_seclevel`` attribute (the 3-bit
        security level of a Zigbee security header).
    :returns: 0, 4, 8 or 16 depending on the security level; 0 when the
        level is not one of the eight defined values.
    """
    seclevel = pkt.nwk_seclevel
    # Compare with == rather than hashing so that any value supporting
    # equality with an int keeps working.
    for level, mic_length in _MIC_LENGTH_BY_SECLEVEL:
        if seclevel == level:
            return mic_length
    return 0

591 

592 

class ZigbeeSecurityHeader(Packet):
    """Zigbee security header together with the secured payload.

    On dissection everything after the header is first read into ``data``;
    ``post_dissect`` then moves the trailing MIC bytes (if the security
    level defines any) from ``data`` into ``mic``.
    """
    name = "Zigbee Security Header"
    fields_desc = [
        # Security control (1 octet)
        FlagsField("reserved1", 0, 2, ['reserved1', 'reserved2']),
        # 1 when the sender (source) address field is present
        BitField("extended_nonce", 1, 1),
        # Key identifier (2 bits)
        BitEnumField("key_type", 1, 2, {
            0: 'data_key',
            1: 'network_key',
            2: 'key_transport_key',
            3: 'key_load_key'
        }),
        # Security level (3 bits)
        BitEnumField("nwk_seclevel", 0, 3, {
            0: "None",
            1: "MIC-32",
            2: "MIC-64",
            3: "MIC-128",
            4: "ENC",
            5: "ENC-MIC-32",
            6: "ENC-MIC-64",
            7: "ENC-MIC-128"
        }),
        # Frame counter (4 octets): frame freshness / duplicate rejection
        XLEIntField("fc", 0),
        # Source address (0/8 octets)
        ConditionalField(
            dot15d4AddressField("source", 0, adjust=lambda pkt, x: 8),
            lambda pkt: pkt.extended_nonce,
        ),
        # Key sequence number (0/1 octet): only with key identifier 1
        # (network key)
        ConditionalField(
            ByteField("key_seqnum", 0),
            lambda pkt: pkt.getfieldval("key_type") == 1,
        ),
        # Payload: the encrypted data is the payload minus the MIC
        StrField("data", ""),
        # Message Integrity Code (0/variable), sized by nwk_seclevel
        XStrField("mic", ""),
    ]

    def post_dissect(self, s):
        """Split the trailing MIC off ``data`` and return ``s`` unchanged."""
        mic_size = util_mic_len(self)
        if mic_size <= 0:
            # This security level carries no MIC: nothing to split
            return s
        secured = self.data
        self.data = secured[:-mic_size]
        self.mic = secured[-mic_size:]
        return s

637 

638 

def _aps_data_or_plain_ack(pkt):
    """ConditionalField predicate shared by cluster/profile/src_endpoint.

    True for APS data frames (frame type 0), and for APS acknowledgements
    (frame type 2) whose ``ack_format`` flag is not set.
    """
    if pkt.aps_frametype == 0:
        return True
    return pkt.aps_frametype == 2 and not pkt.frame_control.ack_format


class ZigbeeAppDataPayload(Packet):
    """Zigbee APS frame header (general APS frame format).

    The first octet packs ``frame_control`` (4 flag bits),
    ``delivery_mode`` (2 bits) and ``aps_frametype`` (2 bits).  The
    addressing fields that follow are conditional on those values.
    """
    name = "Zigbee Application Layer Data Payload (General APS Frame Format)"
    fields_desc = [
        # Frame control (1 octet)
        FlagsField(
            "frame_control", 2, 4,
            ['ack_format', 'security', 'ack_req', 'extended_hdr'],
        ),
        BitEnumField(
            "delivery_mode", 0, 2,
            {0: 'unicast', 1: 'indirect',
             2: 'broadcast', 3: 'group_addressing'},
        ),
        BitEnumField(
            "aps_frametype", 0, 2,
            {0: 'data', 1: 'command', 2: 'ack'},
        ),
        # Destination endpoint (0/1 octet)
        ConditionalField(
            ByteField("dst_endpoint", 10),
            lambda pkt: (
                (pkt.aps_frametype == 0 and pkt.delivery_mode in [0, 2]) or
                (pkt.aps_frametype == 2 and
                 not pkt.frame_control.ack_format)
            ),
        ),
        # Group address (0/2 octets)
        ConditionalField(
            XLEShortField("group_addr", 0x0000),
            lambda pkt: pkt.aps_frametype == 0 and pkt.delivery_mode == 3,
        ),
        # Cluster identifier (0/2 octets), little-endian unsigned short
        ConditionalField(
            XLEShortField("cluster", 0x0000),
            _aps_data_or_plain_ack,
        ),
        # Profile identifier (0/2 octets)
        ConditionalField(
            EnumField("profile", 0, _aps_profile_identifiers, fmt="<H"),
            _aps_data_or_plain_ack,
        ),
        # Source endpoint (0/1 octets)
        ConditionalField(
            ByteField("src_endpoint", 10),
            _aps_data_or_plain_ack,
        ),
        # APS counter (1 octet)
        ByteField("counter", 0),
        # Extended header (0/1/2 octets)
        # cribbed from https://github.com/wireshark/wireshark/blob/master/epan/dissectors/packet-zbee-aps.c # noqa: E501
        ConditionalField(
            ByteEnumField(
                "fragmentation", 0,
                {0: "none", 1: "first_block", 2: "middle_block"},
            ),
            lambda pkt: (pkt.aps_frametype in [0, 2] and
                         pkt.frame_control.extended_hdr),
        ),
        ConditionalField(
            ByteField("block_number", 0),
            lambda pkt: (pkt.aps_frametype in [0, 2] and
                         pkt.fragmentation in [1, 2]),
        ),
        ConditionalField(
            ByteField("ack_bitfield", 0),
            lambda pkt: (pkt.aps_frametype == 2 and
                         pkt.fragmentation in [1, 2]),
        ),
        # The variable-length frame payload (data, APS command or
        # acknowledgement) is dissected as the next layer.
    ]

    def guess_payload_class(self, payload):
        """Choose the next layer from the security flag and frame type."""
        if self.frame_control & 0x02:
            # 'security' flag set: a security header follows
            return ZigbeeSecurityHeader
        if self.aps_frametype == 1:
            # APS command frame
            return ZigbeeAppCommandPayload
        if self.aps_frametype != 0:
            return Packet.guess_payload_class(self, payload)
        # APS data frame: profile 0x0000 is the Zigbee Device Profile,
        # any other profile is handed to the cluster library layer.
        if self.profile == 0x0000:
            return ZigbeeDeviceProfile
        return ZigbeeClusterLibrary

723 

724 

# Key type values and their names; used as the enumeration of the
# "key_type" field of ZigbeeAppCommandPayload.
_TransportKeyKeyTypes = {
    0x00: "Trust Center Master Key",
    0x01: "Standard Network Key",
    0x02: "Application Master Key",
    0x03: "Application Link Key",
    0x04: "Trust Center Link Key",
    0x05: "High-Security Network Key",
}


# Key type names for request-key commands, going by the table name.
# NOTE(review): not referenced by the code visible in this part of the
# file -- confirm where it is used.
_RequestKeyKeyTypes = {
    0x02: "Application Link Key",
    0x04: "Trust Center Link Key",
}


# APS status codes and their names; used as the enumeration of the
# "status" field of ZigbeeAppCommandPayload.
_ApsStatusValues = {
    0x00: "SUCCESS",
    0xa0: "ASDU_TOO_LONG",
    0xa1: "DEFRAG_DEFERRED",
    0xa2: "DEFRAG_UNSUPPORTED",
    0xa3: "ILLEGAL_REQUEST",
    0xa4: "INVALID_BINDING",
    0xa5: "INVALID_GROUP",
    0xa6: "INVALID_PARAMETER",
    0xa7: "NO_ACK",
    0xa8: "NO_BOUND_DEVICE",
    0xa9: "NO_SHORT_ADDRESS",
    0xaa: "NOT_SUPPORTED",
    0xab: "SECURED_LINK_KEY",
    0xac: "SECURED_NWK_KEY",
    0xad: "SECURITY_FAIL",
    0xae: "TABLE_FULL",
    0xaf: "UNSECURED",
    0xb0: "UNSUPPORTED_ATTRIBUTE"
}

761 

762 

763class ZigbeeAppCommandPayload(Packet): 

764 name = "Zigbee Application Layer Command Payload" 

765 fields_desc = [ 

766 ByteEnumField("cmd_identifier", 1, { 

767 1: "APS_CMD_SKKE_1", 

768 2: "APS_CMD_SKKE_2", 

769 3: "APS_CMD_SKKE_3", 

770 4: "APS_CMD_SKKE_4", 

771 5: "APS_CMD_TRANSPORT_KEY", 

772 6: "APS_CMD_UPDATE_DEVICE", 

773 7: "APS_CMD_REMOVE_DEVICE", 

774 8: "APS_CMD_REQUEST_KEY", 

775 9: "APS_CMD_SWITCH_KEY", 

776 # TODO: implement 10 to 13 

777 10: "APS_CMD_EA_INIT_CHLNG", 

778 11: "APS_CMD_EA_RSP_CHLNG", 

779 12: "APS_CMD_EA_INIT_MAC_DATA", 

780 13: "APS_CMD_EA_RSP_MAC_DATA", 

781 14: "APS_CMD_TUNNEL", 

782 15: "APS_CMD_VERIFY_KEY", 

783 16: "APS_CMD_CONFIRM_KEY" 

784 }), 

785 # SKKE Commands 

786 ConditionalField(dot15d4AddressField("initiator", 0, 

787 adjust=lambda pkt, x: 8), 

788 lambda pkt: pkt.cmd_identifier in [1, 2, 3, 4]), 

789 ConditionalField(dot15d4AddressField("responder", 0, 

790 adjust=lambda pkt, x: 8), 

791 lambda pkt: pkt.cmd_identifier in [1, 2, 3, 4]), 

792 ConditionalField(StrFixedLenField("data", 0, length=16), 

793 lambda pkt: pkt.cmd_identifier in [1, 2, 3, 4]), 

794 # Confirm-key command 

795 ConditionalField( 

796 ByteEnumField("status", 0, _ApsStatusValues), 

797 lambda pkt: pkt.cmd_identifier == 16), 

798 # Common fields 

799 ConditionalField( 

800 ByteEnumField("key_type", 0, _TransportKeyKeyTypes), 

801 lambda pkt: pkt.cmd_identifier in [5, 8, 15, 16]), 

802 ConditionalField(dot15d4AddressField("address", 0, 

803 adjust=lambda pkt, x: 8), 

804 lambda pkt: pkt.cmd_identifier in [6, 7, 15, 16]), 

805 # Transport-key Command 

806 ConditionalField( 

807 StrFixedLenField("key", None, 16), 

808 lambda pkt: pkt.cmd_identifier == 5), 

809 ConditionalField( 

810 ByteField("key_seqnum", 0), 

811 lambda pkt: (pkt.cmd_identifier == 5 and 

812 pkt.key_type in [0x01, 0x05])), 

813 ConditionalField( 

814 dot15d4AddressField("dest_addr", 0, adjust=lambda pkt, x: 8), 

815 lambda pkt: ((pkt.cmd_identifier == 5 and 

816 pkt.key_type not in [0x02, 0x03]) or 

817 pkt.cmd_identifier == 14)), 

818 ConditionalField( 

819 dot15d4AddressField("src_addr", 0, adjust=lambda pkt, x: 8), 

820 lambda pkt: (pkt.cmd_identifier == 5 and 

821 pkt.key_type not in [0x02, 0x03])), 

822 ConditionalField( 

823 dot15d4AddressField("partner_addr", 0, adjust=lambda pkt, x: 8), 

824 lambda pkt: ((pkt.cmd_identifier == 5 and 

825 pkt.key_type in [0x02, 0x03]) or 

826 (pkt.cmd_identifier == 8 and pkt.key_type == 0x02))), 

827 ConditionalField( 

828 ByteField("initiator_flag", 0), 

829 lambda pkt: (pkt.cmd_identifier == 5 and 

830 pkt.key_type in [0x02, 0x03])), 

831 # Update-Device Command 

832 ConditionalField(XLEShortField("short_address", 0), 

833 lambda pkt: pkt.cmd_identifier == 6), 

834 ConditionalField(ByteField("update_status", 0), 

835 lambda pkt: pkt.cmd_identifier == 6), 

836 # Switch-Key Command 

837 ConditionalField(StrFixedLenField("seqnum", None, 8), 

838 lambda pkt: pkt.cmd_identifier == 9), 

839 # Un-implemented: 10-13 (+?) 

840 ConditionalField(StrField("unimplemented", ""), 

841 lambda pkt: (pkt.cmd_identifier >= 10 and 

842 pkt.cmd_identifier <= 13)), 

843 # Tunnel Command 

844 ConditionalField( 

845 FlagsField("frame_control", 2, 4, [ 

846 "ack_format", 

847 "security", 

848 "ack_req", 

849 "extended_hdr" 

850 ]), 

851 lambda pkt: pkt.cmd_identifier == 14), 

852 ConditionalField( 

853 BitEnumField("delivery_mode", 0, 2, { 

854 0: "unicast", 

855 1: "indirect", 

856 2: "broadcast", 

857 3: "group_addressing" 

858 }), 

859 lambda pkt: pkt.cmd_identifier == 14), 

860 ConditionalField( 

861 BitEnumField("aps_frametype", 1, 2, { 

862 0: "data", 

863 1: "command", 

864 2: "ack" 

865 }), 

866 lambda pkt: pkt.cmd_identifier == 14), 

867 ConditionalField( 

868 ByteField("counter", 0), 

869 lambda pkt: pkt.cmd_identifier == 14), 

870 # Verify-Key Command 

871 ConditionalField( 

872 StrFixedLenField("key_hash", None, 16), 

873 lambda pkt: pkt.cmd_identifier == 15), 

874 ] 

875 

def guess_payload_class(self, payload):
    """Select the class used to dissect the bytes following this command.

    :param payload: the remaining raw bytes after this layer.
    :returns: ZigbeeSecurityHeader when cmd_identifier is 14 (the tunnel
        command, whose payload starts with a tunneled APS auxiliary
        header); otherwise whatever the generic Packet lookup returns.
    """
    if self.cmd_identifier != 14:
        return Packet.guess_payload_class(self, payload)
    # Tunneled APS Auxiliary Header
    return ZigbeeSecurityHeader

882 

883 

class ZigBeeBeacon(Packet):
    """ZigBee payload of a beacon frame.

    Layout, in order: 1-octet protocol ID, one octet split into protocol
    version / stack profile, one octet of capacity and depth bits, the
    8-octet extended PAN ID, a 3-octet Tx offset and a 1-octet update ID.
    """
    name = "ZigBee Beacon Payload"
    fields_desc = [
        ByteField("proto_id", 0),                  # Protocol ID (1 octet)
        BitField("nwkc_protocol_version", 0, 4),   # nwkcProtocolVersion
        BitField("stack_profile", 0, 4),           # Stack profile
        BitField("end_device_capacity", 0, 1),     # End device capacity
        BitField("device_depth", 0, 4),            # Device depth
        BitField("router_capacity", 0, 1),         # Router capacity
        BitField("reserved", 0, 2),                # Reserved
        # Extended PAN ID, always 8 octets long
        dot15d4AddressField("extended_pan_id", 0, adjust=lambda pkt, x: 8),
        # Tx offset (3 octets): optional in ZigBee 2006, required in the
        # 2007 and later versions.
        BitField("tx_offset", 0, 24),
        ByteField("update_id", 0),                 # Update ID (1 octet)
    ]

909 

910 

911# Inter-PAN Transmission # 

class ZigbeeNWKStub(Packet):
    """Stub network header (2 octets of frame control) for Inter-PAN."""
    name = "Zigbee Network Layer for Inter-PAN Transmission"
    fields_desc = [
        # NWK frame control; every subfield other than the protocol
        # version and the frame type shall have a value of 0.
        BitField("res1", 0, 2),
        BitField("proto_version", 2, 4),
        BitField("frametype", 0b11, 2),  # 0b11 (3) is a reserved frame type
        BitField("res2", 0, 8),
    ]

    def guess_payload_class(self, payload):
        """Return the Inter-PAN APS stub when the frame type is 0b11.

        Any other frame type is resolved by the generic Packet lookup.
        """
        if self.frametype != 0b11:
            return Packet.guess_payload_class(self, payload)
        return ZigbeeAppDataPayloadStub

927 

928 

class ZigbeeAppDataPayloadStub(Packet):
    """Stub APS header and payload used for Inter-PAN transmission."""
    name = "Zigbee Application Layer Data Payload for Inter-PAN Transmission"
    fields_desc = [
        # Frame control octet: 4 flag bits, delivery mode, frame type
        FlagsField(
            "frame_control", 0, 4,
            ['reserved1', 'security', 'ack_req', 'extended_hdr'],
        ),
        BitEnumField(
            "delivery_mode", 0, 2,
            {0: 'unicast', 2: 'broadcast', 3: 'group'},
        ),
        BitField("frametype", 3, 2),  # 0b11 (3) is a reserved frame type
        # The 16-bit group identifier is only on the wire in group
        # delivery mode (delivery_mode == 0b11).
        ConditionalField(
            XLEShortField("group_addr", 0x0),
            lambda pkt: pkt.getfieldval("delivery_mode") == 0b11,
        ),
        XLEShortField("cluster", 0x0000),  # Cluster identifier
        # Profile identifier, little-endian 16 bits
        EnumField("profile", 0, _aps_profile_identifiers, fmt="<H"),
        # ZigBee payload, only dissected for frame type 3
        ConditionalField(
            StrField("data", ""),
            lambda pkt: pkt.frametype == 3,
        ),
    ]

950 

951 

952# Zigbee Device Profile # 

953 

954 

class ZDPActiveEPReq(Packet):
    """ZDP Active_EP_req transaction data: a single 2-octet NWK address."""
    name = "ZDP Transaction Data: Active_EP_req"
    fields_desc = [
        XLEShortField("nwk_addr", 0),  # NWK Address (2 octets)
    ]

961 

962 

class ZDPDeviceAnnce(Packet):
    """ZDP Device_annce transaction data.

    Carries the NWK address, the 8-octet IEEE address and one octet of
    capability information split into eight single-bit fields.
    """
    name = "ZDP Transaction Data: Device_annce"
    fields_desc = [
        XLEShortField("nwk_addr", 0),  # NWK Address (2 octets)
        # IEEE Address, always 8 octets long
        dot15d4AddressField("ieee_addr", 0, adjust=lambda pkt, x: 8),
        # Capability Information (1 octet), one bit per field
        BitField("allocate_address", 0, 1),
        BitField("security_capability", 0, 1),
        BitField("reserved2", 0, 1),
        BitField("reserved1", 0, 1),
        BitField("receiver_on_when_idle", 0, 1),
        BitField("power_source", 0, 1),
        BitField("device_type", 0, 1),
        BitField("alternate_pan_coordinator", 0, 1),
    ]

980 

981 

class ZigbeeDeviceProfile(Packet):
    """Zigbee Device Profile (ZDP) frame header.

    The header is a single transaction sequence number; the transaction
    data that follows is selected from the cluster identifier of the
    layer underneath.
    """
    name = "Zigbee Device Profile (ZDP) Frame"
    fields_desc = [
        # Transaction Sequence Number (1 octet)
        ByteField("trans_seqnum", 0),
    ]

    def guess_payload_class(self, payload):
        """Pick the ZDP transaction data class from the underlayer cluster.

        :param payload: the remaining raw bytes after this layer.
        :returns: ZDPActiveEPReq for cluster 0x0005, ZDPDeviceAnnce for
            cluster 0x0013, otherwise the generic Packet lookup result.
        """
        # The frame may be dissected on its own (no underlayer) or on top
        # of a layer without a "cluster" field; fall back to the generic
        # lookup instead of raising AttributeError in those cases.
        cluster = getattr(self.underlayer, "cluster", None)
        if cluster == 0x0005:
            return ZDPActiveEPReq
        if cluster == 0x0013:
            return ZDPDeviceAnnce
        return Packet.guess_payload_class(self, payload)

995 

996 

997# ZigBee Cluster Library # 

998 

999 

# Size of a ZCL attribute value, keyed by ZCL data type identifier.
# An int value is a fixed size in octets.  A (size, fmt) tuple marks a
# variable-length string: ``size`` octets of length prefix, decoded with
# the struct format ``fmt``, followed by that many octets of data.
# ZCL multi-octet values are little-endian, hence "<H" for the 2-octet
# length prefix of the long string types.
_ZCL_attr_length = {
    0x00: 0,  # no data
    0x08: 1,  # 8-bit data
    0x09: 2,  # 16-bit data
    0x0a: 3,  # 24-bit data
    0x0b: 4,  # 32-bit data
    0x0c: 5,  # 40-bit data
    0x0d: 6,  # 48-bit data
    0x0e: 7,  # 56-bit data
    0x0f: 8,  # 64-bit data
    0x10: 1,  # boolean
    0x18: 1,  # 8-bit bitmap
    0x19: 2,  # 16-bit bitmap
    0x1a: 3,  # 24-bit bitmap
    0x1b: 4,  # 32-bit bitmap
    0x1c: 5,  # 40-bit bitmap
    0x1d: 6,  # 48-bit bitmap
    0x1e: 7,  # 56-bit bitmap
    0x1f: 8,  # 64-bit bitmap
    0x20: 1,  # Unsigned 8-bit integer
    0x21: 2,  # Unsigned 16-bit integer
    0x22: 3,  # Unsigned 24-bit integer
    0x23: 4,  # Unsigned 32-bit integer
    0x24: 5,  # Unsigned 40-bit integer
    0x25: 6,  # Unsigned 48-bit integer
    0x26: 7,  # Unsigned 56-bit integer
    0x27: 8,  # Unsigned 64-bit integer
    0x28: 1,  # Signed 8-bit integer
    0x29: 2,  # Signed 16-bit integer
    0x2a: 3,  # Signed 24-bit integer
    0x2b: 4,  # Signed 32-bit integer
    0x2c: 5,  # Signed 40-bit integer
    0x2d: 6,  # Signed 48-bit integer
    0x2e: 7,  # Signed 56-bit integer
    0x2f: 8,  # Signed 64-bit integer
    0x30: 1,  # 8-bit enumeration
    0x31: 2,  # 16-bit enumeration
    0x38: 2,  # Semi-precision
    0x39: 4,  # Single precision
    0x3a: 8,  # Double precision
    0x41: (1, "!B"),  # Octet string
    0x42: (1, "!B"),  # Character string
    0x43: (2, "<H"),  # Long octet string
    0x44: (2, "<H"),  # Long character string
    # TODO: implement Ordered sequence & collection
    0xe0: 4,  # Time of day
    0xe1: 4,  # Date
    0xe2: 4,  # UTCTime
    0xe8: 2,  # Cluster ID
    0xe9: 2,  # Attribute ID
    0xea: 4,  # BACnet OID
    0xf0: 8,  # IEEE address
    0xf1: 16,  # 128-bit security key
    0xff: 0,  # Unknown
}

1055 

1056 

class _DiscreteString(StrLenField):
    """Raw ZCL attribute value whose size depends on the ZCL data type.

    The size is looked up in ``_ZCL_attr_length`` using the packet's
    ``attribute_data_type``.  For variable-length string types the
    length prefix is kept as part of the field value.
    """

    def getfield(self, pkt, s):
        """Split the attribute value off the front of ``s``.

        :param pkt: packet being dissected; ``attribute_data_type`` is read.
        :param s: remaining raw bytes.
        :returns: a ``(remaining bytes, field value)`` tuple.
        """
        spec = _ZCL_attr_length.get(pkt.attribute_data_type, None)
        if spec is None:
            # Unknown data type: size cannot be determined, take it all.
            return b"", self.m2i(pkt, s)
        if isinstance(spec, tuple):  # Variable length
            prefix_size, fmt = spec
            if len(s) < prefix_size:
                # Truncated length prefix: keep what is left as raw data
                # rather than letting struct.unpack raise.
                return b"", self.m2i(pkt, s)
            declared = struct.unpack(fmt, s[:prefix_size])[0]
            if declared == (1 << (8 * prefix_size)) - 1:
                # An all-ones length (0xff / 0xffff) marks an invalid
                # string that carries no data octets.
                declared = 0
            # The length prefix itself is included in the field value.
            length = declared + prefix_size
        else:
            length = spec
        # NOTE: the field instance is shared by all packets of a class;
        # length_from is overwritten right before use on every call.
        self.length_from = lambda _pkt: length
        return StrLenField.getfield(self, pkt, s)

1071 

1072 

class ZCLReadAttributeStatusRecord(Packet):
    """One record of a read_attributes_response payload.

    The data type and value are only on the wire when ``status`` is
    0x00 (SUCCESS).
    """
    name = "ZCL Read Attribute Status Record"
    fields_desc = [
        XLEShortField("attribute_identifier", 0),  # Attribute Identifier
        ByteEnumField("status", 0, _zcl_enumerated_status_values),  # Status
        # Attribute data type (0/1 octet)
        ConditionalField(
            ByteEnumField("attribute_data_type", 0, _zcl_attribute_data_types),
            lambda pkt: pkt.status == 0x00,
        ),
        # Attribute data (0/variable size)
        ConditionalField(
            _DiscreteString("attribute_value", ""),
            lambda pkt: pkt.status == 0x00,
        ),
    ]

    def extract_padding(self, s):
        """Treat all trailing bytes as padding so the next record can
        be dissected by the enclosing list field."""
        return "", s

1094 

1095 

class ZCLWriteAttributeRecord(Packet):
    """One record of a write_attributes payload: id, data type, data."""
    name = "ZCL Write Attribute Record"
    fields_desc = [
        XLEShortField("attribute_identifier", 0),   # 2 octets
        ByteEnumField(                              # 1 octet
            "attribute_data_type", 0, _zcl_attribute_data_types
        ),
        _DiscreteString("attribute_data", ""),      # variable
    ]

    def extract_padding(self, s):
        """Treat all trailing bytes as padding so the next record can
        be dissected by the enclosing list field."""
        return "", s

1109 

1110 

class ZCLWriteAttributeStatusRecord(Packet):
    """One record of a write_attributes_response payload.

    The attribute identifier is only on the wire when ``status`` is not
    0x00.
    """
    name = "ZCL Write Attribute Status Record"
    fields_desc = [
        ByteEnumField("status", 0, _zcl_enumerated_status_values),  # 1 octet
        # Attribute Identifier (0/2 octets)
        ConditionalField(
            XLEShortField("attribute_identifier", 0),
            lambda pkt: pkt.status != 0x00,
        ),
    ]

    def extract_padding(self, s):
        """Treat all trailing bytes as padding so the next record can
        be dissected by the enclosing list field."""
        return "", s

1125 

1126 

class ZCLConfigureReportingRecord(Packet):
    """One record of a configure_reporting payload.

    With ``attribute_direction`` 0x00 the record carries the data type,
    both reporting intervals and the reportable change; with 0x01 it
    carries the timeout period instead.
    """
    name = "ZCL Configure Reporting Record"
    fields_desc = [
        ByteField("attribute_direction", 0),        # Direction (1 octet)
        XLEShortField("attribute_identifier", 0),   # Identifier (2 octets)
        # --- present when attribute_direction == 0x00 ---
        # Attribute Data Type (0/1 octet)
        ConditionalField(
            ByteEnumField("attribute_data_type", 0, _zcl_attribute_data_types),
            lambda pkt: pkt.attribute_direction == 0x00,
        ),
        # Minimum Reporting Interval (0/2 octets)
        ConditionalField(
            XLEShortField("min_reporting_interval", 0),
            lambda pkt: pkt.attribute_direction == 0x00,
        ),
        # Maximum Reporting Interval (0/2 octets)
        ConditionalField(
            XLEShortField("max_reporting_interval", 0),
            lambda pkt: pkt.attribute_direction == 0x00,
        ),
        # Reportable Change (variable)
        ConditionalField(
            _DiscreteString("reportable_change", ""),
            lambda pkt: pkt.attribute_direction == 0x00,
        ),
        # --- present when attribute_direction == 0x01 ---
        # Timeout Period (0/2 octets)
        ConditionalField(
            XLEShortField("timeout_period", 0),
            lambda pkt: pkt.attribute_direction == 0x01,
        ),
    ]

    def extract_padding(self, s):
        """Treat all trailing bytes as padding so the next record can
        be dissected by the enclosing list field."""
        return "", s

1163 

1164 

class ZCLConfigureReportingResponseRecord(Packet):
    """One record of a configure_reporting_response payload.

    Direction and attribute identifier are only on the wire when
    ``status`` is not 0x00.
    """
    name = "ZCL Configure Reporting Response Record"
    fields_desc = [
        ByteEnumField("status", 0, _zcl_enumerated_status_values),  # 1 octet
        # Direction (0/1 octet)
        ConditionalField(
            ByteField("attribute_direction", 0),
            lambda pkt: pkt.status != 0x00,
        ),
        # Attribute Identifier (0/2 octets)
        ConditionalField(
            XLEShortField("attribute_identifier", 0),
            lambda pkt: pkt.status != 0x00,
        ),
    ]

    def extract_padding(self, s):
        """Treat all trailing bytes as padding so the next record can
        be dissected by the enclosing list field."""
        return "", s

1184 

1185 

class ZCLAttributeReport(Packet):
    """One record of a report_attributes payload: id, data type, data."""
    name = "ZCL Attribute Report"
    fields_desc = [
        XLEShortField("attribute_identifier", 0),   # 2 octets
        ByteEnumField(                              # 1 octet
            "attribute_data_type", 0, _zcl_attribute_data_types
        ),
        _DiscreteString("attribute_data", ""),      # variable
    ]

    def extract_padding(self, s):
        """Treat all trailing bytes as padding so the next record can
        be dissected by the enclosing list field."""
        return "", s

1199 

1200 

class ZCLGeneralReadAttributes(Packet):
    """read_attributes payload: a list of 16-bit attribute identifiers."""
    name = "General Domain: Command Frame Payload: read_attributes"
    fields_desc = [
        FieldListField(
            "attribute_identifiers", [], XLEShortField("", 0x0000)
        ),
    ]

1206 

1207 

class ZCLGeneralReadAttributesResponse(Packet):
    """read_attributes_response payload: a list of status records."""
    name = "General Domain: Command Frame Payload: read_attributes_response"
    fields_desc = [
        PacketListField(
            "read_attribute_status_record", [], ZCLReadAttributeStatusRecord
        ),
    ]

1213 

1214 

class ZCLGeneralWriteAttributes(Packet):
    """write_attributes payload: a list of write attribute records."""
    name = "General Domain: Command Frame Payload: write_attributes"
    fields_desc = [
        PacketListField("write_records", [], ZCLWriteAttributeRecord),
    ]

1220 

1221 

class ZCLGeneralWriteAttributesResponse(Packet):
    """write_attributes_response payload: a list of status records."""
    name = "General Domain: Command Frame Payload: write_attributes_response"
    fields_desc = [
        PacketListField(
            "status_records", [], ZCLWriteAttributeStatusRecord
        ),
    ]

1227 

1228 

class ZCLGeneralConfigureReporting(Packet):
    """configure_reporting payload: a list of configuration records."""
    name = "General Domain: Command Frame Payload: configure_reporting"
    fields_desc = [
        PacketListField(
            "config_records", [], ZCLConfigureReportingRecord
        ),
    ]

1234 

1235 

class ZCLGeneralConfigureReportingResponse(Packet):
    """configure_reporting_response payload: a list of status records."""
    name = "General Domain: Command Frame Payload: configure_reporting_response"  # noqa: E501
    fields_desc = [
        PacketListField(
            "status_records", [], ZCLConfigureReportingResponseRecord
        ),
    ]

1241 

1242 

class ZCLGeneralReportAttributes(Packet):
    """report_attributes payload: a list of attribute reports."""
    name = "General Domain: Command Frame Payload: report_attributes"
    fields_desc = [
        PacketListField("attribute_reports", [], ZCLAttributeReport),
    ]

1248 

1249 

class ZCLGeneralDefaultResponse(Packet):
    """default_response payload: command identifier and status octets."""
    name = "General Domain: Command Frame Payload: default_response"
    fields_desc = [
        ByteField("response_command_identifier", 0),                # 1 octet
        ByteEnumField("status", 0, _zcl_enumerated_status_values),  # 1 octet
    ]

1258 

1259 

class ZCLIASZoneZoneEnrollResponse(Packet):
    """IAS Zone enroll response: response code and zone ID octets."""
    name = "IAS Zone Cluster: Zone Enroll Response Command (Server: Received)"
    fields_desc = [
        # Enroll Response Code (1 octet)
        ByteEnumField("rsp_code", 0, _zcl_ias_zone_enroll_response_codes),
        # Zone ID (1 octet)
        ByteField("zone_id", 0),
    ]

1268 

1269 

class ZCLIASZoneZoneStatusChangeNotification(Packet):
    """IAS Zone status change notification.

    Zone status and extended status are kept as raw fixed-length
    strings; zone ID and delay are numeric.
    """
    name = "IAS Zone Cluster: Zone Status Change Notification Command (Server: Generated)"  # noqa: E501
    fields_desc = [
        StrFixedLenField("zone_status", b'\x00\x00', length=2),  # 2 octets
        StrFixedLenField("extended_status", b'\x00', length=1),  # 1 octet
        ByteField("zone_id", 0),                                 # 1 octet
        XLEShortField("delay", 0),                               # 2 octets
    ]

1282 

1283 

class ZCLIASZoneZoneEnrollRequest(Packet):
    """IAS Zone enroll request: zone type and manufacturer code."""
    name = "IAS Zone Cluster: Zone Enroll Request Command (Server: Generated)"
    fields_desc = [
        # Zone Type (2 octets, little-endian)
        EnumField("zone_type", 0, _zcl_ias_zone_zone_types, fmt="<H"),
        # Manufacturer Code (2 octets)
        XLEShortField("manuf_code", 0),
    ]

1292 

1293 

class ZCLMeteringGetProfile(Packet):
    """Metering cluster Get Profile command payload."""
    name = "Metering Cluster: Get Profile Command (Server: Received)"
    fields_desc = [
        # Interval Channel (8-bit enumeration, 1 octet):
        # 0 == Consumption Delivered ; 1 == Consumption Received
        ByteField("Interval_Channel", 0),
        # End Time (UTCTime, 4 octets)
        XLEIntField("End_Time", 0x00000000),
        # NumberOfPeriods (unsigned 8-bit integer, 1 octet): the number
        # of intervals being requested
        ByteField("NumberOfPeriods", 1),
    ]

1304 

1305 

class ZCLPriceGetCurrentPrice(Packet):
    """Price cluster Get Current Price payload: one octet, whose lowest
    bit is the Requestor_Rx_On_When_Idle flag."""
    name = "Price Cluster: Get Current Price Command (Server: Received)"
    fields_desc = [
        BitField("reserved", 0, 7),
        BitField("Requestor_Rx_On_When_Idle", 0, 1),
    ]

1312 

1313 

class ZCLPriceGetScheduledPrices(Packet):
    """Price cluster Get Scheduled Prices payload."""
    name = "Price Cluster: Get Scheduled Prices Command (Server: Received)"
    fields_desc = [
        # Start time: UTCTime (4 octets)
        XLEIntField("start_time", 0x00000000),
        # Number of Events (1 octet)
        ByteField("number_of_events", 0),
    ]

1320 

1321 

class _ZCLRateLabelField(StrLenField):
    """Octet string whose first octet gives the number of data octets.

    The length octet is kept as part of the field value.
    """

    def getfield(self, pkt, s):
        """Split the length-prefixed string off the front of ``s``.

        :returns: a ``(remaining bytes, field value)`` tuple.
        """
        if not s:
            return s, b""
        # One length octet followed by that many data octets.
        total = orb(s[0]) + 1
        return s[total:], self.m2i(pkt, s[:total])


class ZCLPricePublishPrice(Packet):
    """Price cluster Publish Price command payload."""
    name = "Price Cluster: Publish Price Command (Server: Generated)"
    fields_desc = [
        XLEIntField("provider_id", 0x00000000),  # Unsigned 32-bit Integer (4 octets)  # noqa: E501
        # Rate Label is a UTF-8 encoded Octet String (0-12 octets). The
        # first octet indicates the length.  The size is read from the
        # wire: a length_from callback cannot be used here because it
        # would read pkt.rate_label before that field has been dissected.
        # NOTE(review): assumes the length octet does not count itself
        # (standard ZCL octet string) - confirm against a capture.
        _ZCLRateLabelField("rate_label", ""),
        XLEIntField("issuer_event_id", 0x00000000),  # Unsigned 32-bit Integer (4 octets)  # noqa: E501
        XLEIntField("current_time", 0x00000000),  # UTCTime (4 octets)
        ByteField("unit_of_measure", 0),  # 8 bits enumeration (1 octet)
        XLEShortField("currency", 0x0000),  # Unsigned 16-bit Integer (2 octets)  # noqa: E501
        ByteField("price_trailing_digit", 0),  # 8-bit BitMap (1 octet)
        ByteField("number_of_price_tiers", 0),  # 8-bit BitMap (1 octet)
        XLEIntField("start_time", 0x00000000),  # UTCTime (4 octets)
        XLEShortField("duration_in_minutes", 0x0000),  # Unsigned 16-bit Integer (2 octets)  # noqa: E501
        XLEIntField("price", 0x00000000),  # Unsigned 32-bit Integer (4 octets)
        ByteField("price_ratio", 0),  # Unsigned 8-bit Integer (1 octet)
        XLEIntField("generation_price", 0x00000000),  # Unsigned 32-bit Integer (4 octets)  # noqa: E501
        ByteField("generation_price_ratio", 0),  # Unsigned 8-bit Integer (1 octet)  # noqa: E501
        XLEIntField("alternate_cost_delivered", 0x00000000),  # Unsigned 32-bit Integer (4 octets)  # noqa: E501
        ByteField("alternate_cost_unit", 0),  # 8-bit enumeration (1 octet)
        ByteField("alternate_cost_trailing_digit", 0),  # 8-bit BitMap (1 octet)  # noqa: E501
        ByteField("number_of_block_thresholds", 0),  # 8-bit BitMap (1 octet)
        ByteField("price_control", 0),  # 8-bit BitMap (1 octet)
    ]

1346 

1347 

class ZigbeeClusterLibrary(Packet):
    """Zigbee Cluster Library (ZCL) frame header.

    Profile-wide command payloads are attached through bind_layers();
    cluster-specific payloads are chosen in guess_payload_class() from
    the cluster identifier of the layer underneath.
    """
    name = "Zigbee Cluster Library (ZCL) Frame"
    # The former field name "direction" is declared as deprecated in
    # favour of "command_direction".
    deprecated_fields = {
        "direction": ("command_direction", "2.5.0"),
    }
    fields_desc = [
        # Frame control (8 bits)
        BitField("reserved", 0, 3),
        # 0: a default response command will be returned
        BitField("disable_default_response", 0, 1),
        # 0: command sent from client to server
        # 1: command sent from server to client
        BitField("command_direction", 0, 1),
        # 0: manufacturer code shall not be included in the ZCL frame
        BitField("manufacturer_specific", 0, 1),
        # Frame Type
        # 0b00 command acts across the entire profile
        # 0b01 command is specific to a cluster
        # 0b10 - 0b11 reserved
        BitEnumField("zcl_frametype", 0, 2, {0: 'profile-wide', 1: 'cluster-specific', 2: 'reserved2', 3: 'reserved3'}),  # noqa: E501
        # Manufacturer code (0/16 bits), only present when the
        # manufacturer_specific field is set to 1
        ConditionalField(
            XLEShortField("manufacturer_code", 0x0),
            lambda pkt: pkt.getfieldval("manufacturer_specific") == 1,
        ),
        # Transaction sequence number (8 bits)
        ByteField("transaction_sequence", 0),
        # Command identifier (8 bits): the cluster command
        ByteEnumField("command_identifier", 0, _zcl_command_frames),
    ]

    def guess_payload_class(self, payload):
        """Pick the payload class of a cluster-specific command.

        :param payload: the remaining raw bytes after this layer.
        :returns: the matching IAS Zone (cluster 0x0500) or Price
            (cluster 0x0700) command class, otherwise the generic Packet
            lookup result (which also resolves the profile-wide commands
            registered with bind_layers()).
        """
        if self.zcl_frametype == 0x01:
            # The frame may be dissected on its own (no underlayer) or on
            # top of a layer without a "cluster" field; fall back to the
            # generic lookup instead of raising AttributeError.
            cluster = getattr(self.underlayer, "cluster", None)
            # Keyed by (cluster, command direction, command identifier);
            # direction 0 is client-to-server, 1 is server-to-client.
            cluster_commands = {
                # IAS Zone
                (0x0500, 0, 0x00): ZCLIASZoneZoneEnrollResponse,
                (0x0500, 1, 0x00): ZCLIASZoneZoneStatusChangeNotification,
                (0x0500, 1, 0x01): ZCLIASZoneZoneEnrollRequest,
                # Price cluster
                (0x0700, 0, 0x00): ZCLPriceGetCurrentPrice,
                (0x0700, 0, 0x01): ZCLPriceGetScheduledPrices,
                (0x0700, 1, 0x00): ZCLPricePublishPrice,
            }
            payload_cls = cluster_commands.get(
                (cluster, self.command_direction, self.command_identifier)
            )
            if payload_cls is not None:
                return payload_cls
        return Packet.guess_payload_class(self, payload)

1408 

1409 

# Profile-wide (zcl_frametype 0x00) ZCL commands: bind each general
# command identifier to the class of its payload.
for _zcl_command_id, _zcl_payload_cls in (
    (0x00, ZCLGeneralReadAttributes),
    (0x01, ZCLGeneralReadAttributesResponse),
    (0x02, ZCLGeneralWriteAttributes),
    (0x04, ZCLGeneralWriteAttributesResponse),
    (0x06, ZCLGeneralConfigureReporting),
    (0x07, ZCLGeneralConfigureReportingResponse),
    (0x0a, ZCLGeneralReportAttributes),
    (0x0b, ZCLGeneralDefaultResponse),
):
    bind_layers(ZigbeeClusterLibrary, _zcl_payload_cls,
                zcl_frametype=0x00, command_identifier=_zcl_command_id)
# Do not leave the loop variables behind in the module namespace.
del _zcl_command_id, _zcl_payload_cls

1426 

1427 

1428# Zigbee Encapsulation Protocol 

1429 

1430 

class ZEP2(Packet):
    """Zigbee Encapsulation Protocol header, version 2 layout.

    Also the entry point for ZEP dissection: dispatch_hook() switches to
    ZEP1 when the version octet says so.
    """
    name = "Zigbee Encapsulation Protocol (V2)"
    fields_desc = [
        StrFixedLenField("preamble", "EX", length=2),
        ByteField("ver", 0),
        ByteField("type", 0),
        ByteField("channel", 0),
        ShortField("device", 0),
        ByteField("lqi_mode", 1),
        ByteField("lqi_val", 0),
        TimeStampField("timestamp", 0),
        IntField("seq", 0),
        BitField("res", 0, 80),  # 10 bytes reserved field
        ByteField("length", 0),
    ]

    @classmethod
    def dispatch_hook(cls, _pkt=b"", *args, **kargs):
        """Choose the ZEP class from the version octet (third byte).

        Returns ``cls`` unchanged when fewer than 4 bytes are given or
        when the version is neither 1 nor 2.
        """
        if not _pkt or len(_pkt) < 4:
            return cls
        version = orb(_pkt[2])
        if version == 1:
            return ZEP1
        if version == 2:
            return ZEP2
        return cls

    def guess_payload_class(self, payload):
        """Return Dot15d4 when lqi_mode is non-zero, else Dot15d4FCS."""
        return Dot15d4 if self.lqi_mode else Dot15d4FCS

1462 

1463 

class ZEP1(ZEP2):
    """Zigbee Encapsulation Protocol header, version 1 layout.

    Inherits the dispatch and payload selection logic of ZEP2; only the
    field layout differs.
    """
    name = "Zigbee Encapsulation Protocol (V1)"
    fields_desc = [
        StrFixedLenField("preamble", "EX", length=2),
        ByteField("ver", 0),
        ByteField("channel", 0),
        ShortField("device", 0),
        ByteField("lqi_mode", 0),
        ByteField("lqi_val", 0),
        BitField("res", 0, 56),  # 7 bytes reserved field
        ByteField("len", 0),
    ]

1476 

1477 

1478# Bindings # 

1479 

1480# TODO: find a way to choose between ZigbeeNWK and SixLoWPAN (cf. sixlowpan.py)

1481# Currently: use conf.dot15d4_protocol value 

1482# bind_layers( Dot15d4Data, ZigbeeNWK) 

1483 

# APS command frames (frametype 1) follow the APS data header.
bind_layers(ZigbeeAppDataPayload, ZigbeeAppCommandPayload, frametype=1)
bind_layers(Dot15d4Beacon, ZigBeeBeacon)

# ZEP over UDP port 17754: dissect as ZEP when either the source or the
# destination port matches, and use both ports when building.
bind_bottom_up(UDP, ZEP2, sport=17754)
bind_bottom_up(UDP, ZEP2, dport=17754)
bind_layers(UDP, ZEP2, sport=17754, dport=17754)