Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/lltd.py: 73%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

218 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6"""LLTD Protocol 

7 

8https://msdn.microsoft.com/en-us/library/cc233983.aspx 

9 

10""" 

11 

12from array import array 

13 

14from scapy.fields import BitField, FlagsField, ByteField, ByteEnumField, \ 

15 ShortField, ShortEnumField, ThreeBytesField, IntField, IntEnumField, \ 

16 LongField, MultiEnumField, FieldLenField, FieldListField, \ 

17 PacketListField, StrLenField, StrLenFieldUtf16, ConditionalField, MACField 

18from scapy.packet import Packet, Padding, bind_layers 

19from scapy.plist import PacketList 

20from scapy.layers.l2 import Ether 

21from scapy.layers.inet import IPField 

22from scapy.layers.inet6 import IP6Field 

23from scapy.data import ETHER_ANY 

24from scapy.compat import orb, chb 

25 

26 

27# Protocol layers 

28################## 

29 

30 

31class LLTD(Packet): 

32 name = "LLTD" 

33 answer_hashret = { 

34 # (tos, function) tuple mapping (answer -> query), used by 

35 # .hashret() 

36 (1, 1): (0, 0), 

37 (0, 12): (0, 11), 

38 } 

39 fields_desc = [ 

40 ByteField("version", 1), 

41 ByteEnumField("tos", 0, { 

42 0: "Topology discovery", 

43 1: "Quick discovery", 

44 2: "QoS diagnostics", 

45 }), 

46 ByteField("reserved", 0), 

47 MultiEnumField("function", 0, { 

48 0: { 

49 0: "Discover", 

50 1: "Hello", 

51 2: "Emit", 

52 3: "Train", 

53 4: "Probe", 

54 5: "Ack", 

55 6: "Query", 

56 7: "QueryResp", 

57 8: "Reset", 

58 9: "Charge", 

59 10: "Flat", 

60 11: "QueryLargeTlv", 

61 12: "QueryLargeTlvResp", 

62 }, 

63 1: { 

64 0: "Discover", 

65 1: "Hello", 

66 8: "Reset", 

67 }, 

68 2: { 

69 0: "QosInitializeSink", 

70 1: "QosReady", 

71 2: "QosProbe", 

72 3: "QosQuery", 

73 4: "QosQueryResp", 

74 5: "QosReset", 

75 6: "QosError", 

76 7: "QosAck", 

77 8: "QosCounterSnapshot", 

78 9: "QosCounterResult", 

79 10: "QosCounterLease", 

80 }, 

81 }, depends_on=lambda pkt: pkt.tos, fmt="B"), 

82 MACField("real_dst", None), 

83 MACField("real_src", None), 

84 ConditionalField(ShortField("xid", 0), 

85 lambda pkt: pkt.function in [0, 8]), 

86 ConditionalField(ShortField("seq", 0), 

87 lambda pkt: pkt.function not in [0, 8]), 

88 ] 

89 

90 def post_build(self, pkt, pay): 

91 if (self.real_dst is None or self.real_src is None) and \ 

92 isinstance(self.underlayer, Ether): 

93 eth = self.underlayer 

94 if self.real_dst is None: 

95 pkt = (pkt[:4] + eth.fields_desc[0].i2m(eth, eth.dst) + 

96 pkt[10:]) 

97 if self.real_src is None: 

98 pkt = (pkt[:10] + eth.fields_desc[1].i2m(eth, eth.src) + 

99 pkt[16:]) 

100 return pkt + pay 

101 

102 def mysummary(self): 

103 if isinstance(self.underlayer, Ether): 

104 return self.underlayer.sprintf( 

105 'LLTD %src% > %dst% %LLTD.tos% - %LLTD.function%' 

106 ) 

107 else: 

108 return self.sprintf('LLTD %tos% - %function%') 

109 

110 def hashret(self): 

111 tos, function = self.tos, self.function 

112 return b"%c%c" % self.answer_hashret.get((tos, function), 

113 (tos, function)) 

114 

115 def answers(self, other): 

116 if not isinstance(other, LLTD): 

117 return False 

118 if self.tos == 0: 

119 if self.function == 0 and isinstance(self.payload, LLTDDiscover) \ 

120 and len(self[LLTDDiscover].stations_list) == 1: 

121 # "Topology discovery - Discover" with one MAC address 

122 # discovered answers a "Quick discovery - Hello" 

123 return other.tos == 1 and \ 

124 other.function == 1 and \ 

125 LLTDAttributeHostID in other and \ 

126 other[LLTDAttributeHostID].mac == \ 

127 self[LLTDDiscover].stations_list[0] 

128 elif self.function == 12: 

129 # "Topology discovery - QueryLargeTlvResp" answers 

130 # "Topology discovery - QueryLargeTlv" with same .seq 

131 # value 

132 return other.tos == 0 and other.function == 11 \ 

133 and other.seq == self.seq 

134 elif self.tos == 1: 

135 if self.function == 1 and isinstance(self.payload, LLTDHello): 

136 # "Quick discovery - Hello" answers a "Topology 

137 # discovery - Discover" 

138 return other.tos == 0 and other.function == 0 and \ 

139 other.real_src == self.current_mapper_address 

140 return False 

141 

142 

143class LLTDHello(Packet): 

144 name = "LLTD - Hello" 

145 show_summary = False 

146 fields_desc = [ 

147 ShortField("gen_number", 0), 

148 MACField("current_mapper_address", ETHER_ANY), 

149 MACField("apparent_mapper_address", ETHER_ANY), 

150 ] 

151 

152 

153class LLTDDiscover(Packet): 

154 name = "LLTD - Discover" 

155 fields_desc = [ 

156 ShortField("gen_number", 0), 

157 FieldLenField("stations_count", None, count_of="stations_list", 

158 fmt="H"), 

159 FieldListField("stations_list", [], MACField("", ETHER_ANY), 

160 count_from=lambda pkt: pkt.stations_count) 

161 ] 

162 

163 def mysummary(self): 

164 return (self.sprintf("Stations: %stations_list%") 

165 if self.stations_list else "No station", [LLTD]) 

166 

167 

168class LLTDEmiteeDesc(Packet): 

169 name = "LLTD - Emitee Desc" 

170 fields_desc = [ 

171 ByteEnumField("type", 0, {0: "Train", 1: "Probe"}), 

172 ByteField("pause", 0), 

173 MACField("src", None), 

174 MACField("dst", ETHER_ANY), 

175 ] 

176 

177 

178class LLTDEmit(Packet): 

179 name = "LLTD - Emit" 

180 fields_desc = [ 

181 FieldLenField("descs_count", None, count_of="descs_list", 

182 fmt="H"), 

183 PacketListField("descs_list", [], LLTDEmiteeDesc, 

184 count_from=lambda pkt: pkt.descs_count), 

185 ] 

186 

187 def mysummary(self): 

188 return ", ".join(desc.sprintf("%src% > %dst%") 

189 for desc in self.descs_list), [LLTD] 

190 

191 

192class LLTDRecveeDesc(Packet): 

193 name = "LLTD - Recvee Desc" 

194 fields_desc = [ 

195 ShortEnumField("type", 0, {0: "Probe", 1: "ARP or ICMPv6"}), 

196 MACField("real_src", ETHER_ANY), 

197 MACField("ether_src", ETHER_ANY), 

198 MACField("ether_dst", ETHER_ANY), 

199 ] 

200 

201 

202class LLTDQueryResp(Packet): 

203 name = "LLTD - Query Response" 

204 fields_desc = [ 

205 FlagsField("flags", 0, 2, "ME"), 

206 BitField("descs_count", None, 14), 

207 PacketListField("descs_list", [], LLTDRecveeDesc, 

208 count_from=lambda pkt: pkt.descs_count), 

209 ] 

210 

211 def post_build(self, pkt, pay): 

212 if self.descs_count is None: 

213 # descs_count should be a FieldLenField but has an 

214 # unsupported format (14 bits) 

215 flags = orb(pkt[0]) & 0xc0 

216 count = len(self.descs_list) 

217 pkt = chb(flags + (count >> 8)) + chb(count % 256) + pkt[2:] 

218 return pkt + pay 

219 

220 def mysummary(self): 

221 return self.sprintf("%d response%s" % ( 

222 self.descs_count, 

223 "s" if self.descs_count > 1 else "")), [LLTD] 

224 

225 

226class LLTDQueryLargeTlv(Packet): 

227 name = "LLTD - Query Large Tlv" 

228 fields_desc = [ 

229 ByteEnumField("type", 14, { 

230 14: "Icon image", 

231 17: "Friendly Name", 

232 19: "Hardware ID", 

233 22: "AP Association Table", 

234 24: "Detailed Icon Image", 

235 26: "Component Table", 

236 28: "Repeater AP Table", 

237 }), 

238 ThreeBytesField("offset", 0), 

239 ] 

240 

241 def mysummary(self): 

242 return self.sprintf("%type% (offset %offset%)"), [LLTD] 

243 

244 

245class LLTDQueryLargeTlvResp(Packet): 

246 name = "LLTD - Query Large Tlv Response" 

247 fields_desc = [ 

248 FlagsField("flags", 0, 2, "RM"), 

249 BitField("len", None, 14), 

250 StrLenField("value", "", length_from=lambda pkt: pkt.len) 

251 ] 

252 

253 def post_build(self, pkt, pay): 

254 if self.len is None: 

255 # len should be a FieldLenField but has an unsupported 

256 # format (14 bits) 

257 flags = orb(pkt[0]) & 0xc0 

258 length = len(self.value) 

259 pkt = chb(flags + (length >> 8)) + chb(length % 256) + pkt[2:] 

260 return pkt + pay 

261 

262 def mysummary(self): 

263 return self.sprintf("%%len%% bytes%s" % ( 

264 " (last)" if not self.flags & 2 else "" 

265 )), [LLTD] 

266 

267 

268class LLTDAttribute(Packet): 

269 name = "LLTD Attribute" 

270 show_indent = False 

271 show_summary = False 

272 # section 2.2.1.1 

273 fields_desc = [ 

274 ByteEnumField("type", 0, { 

275 0: "End Of Property", 

276 1: "Host ID", 

277 2: "Characteristics", 

278 3: "Physical Medium", 

279 7: "IPv4 Address", 

280 9: "802.11 Max Rate", 

281 10: "Performance Counter Frequency", 

282 12: "Link Speed", 

283 14: "Icon Image", 

284 15: "Machine Name", 

285 18: "Device UUID", 

286 20: "QoS Characteristics", 

287 21: "802.11 Physical Medium", 

288 24: "Detailed Icon Image", 

289 }), 

290 FieldLenField("len", None, length_of="value", fmt="B"), 

291 StrLenField("value", "", length_from=lambda pkt: pkt.len), 

292 ] 

293 

294 @classmethod 

295 def dispatch_hook(cls, _pkt=None, *_, **kargs): 

296 if _pkt: 

297 cmd = orb(_pkt[0]) 

298 elif "type" in kargs: 

299 cmd = kargs["type"] 

300 if isinstance(cmd, str): 

301 cmd = cls.fields_desc[0].s2i[cmd] 

302 else: 

303 return cls 

304 return SPECIFIC_CLASSES.get(cmd, cls) 

305 

306 

307SPECIFIC_CLASSES = {} 

308 

309 

310def _register_lltd_specific_class(*attr_types): 

311 """This can be used as a class decorator; if we want to support Python 

312 2.5, we have to replace 

313 

314@_register_lltd_specific_class(x[, y[, ...]]) 

315class LLTDAttributeSpecific(LLTDAttribute): 

316[...] 

317 

318by 

319 

320class LLTDAttributeSpecific(LLTDAttribute): 

321[...] 

322LLTDAttributeSpecific = _register_lltd_specific_class(x[, y[, ...]])( 

323 LLTDAttributeSpecific 

324) 

325 

326 """ 

327 def _register(cls): 

328 for attr_type in attr_types: 

329 SPECIFIC_CLASSES[attr_type] = cls 

330 type_fld = LLTDAttribute.fields_desc[0].copy() 

331 type_fld.default = attr_types[0] 

332 cls.fields_desc = [type_fld] + cls.fields_desc 

333 return cls 

334 return _register 

335 

336 

337@_register_lltd_specific_class(0) 

338class LLTDAttributeEOP(LLTDAttribute): 

339 name = "LLTD Attribute - End Of Property" 

340 fields_desc = [] 

341 

342 

343@_register_lltd_specific_class(1) 

344class LLTDAttributeHostID(LLTDAttribute): 

345 name = "LLTD Attribute - Host ID" 

346 fields_desc = [ 

347 ByteField("len", 6), 

348 MACField("mac", ETHER_ANY), 

349 ] 

350 

351 def mysummary(self): 

352 return "ID: %s" % self.mac, [LLTD, LLTDAttributeMachineName] 

353 

354 

355@_register_lltd_specific_class(2) 

356class LLTDAttributeCharacteristics(LLTDAttribute): 

357 name = "LLTD Attribute - Characteristics" 

358 fields_desc = [ 

359 # According to MS doc, "this field MUST be set to 0x02". But 

360 # according to MS implementation, that's wrong. 

361 # ByteField("len", 2), 

362 FieldLenField("len", None, length_of="reserved2", fmt="B", 

363 adjust=lambda _, x: x + 2), 

364 FlagsField("flags", 0, 5, "PXFML"), 

365 BitField("reserved1", 0, 11), 

366 StrLenField("reserved2", "", length_from=lambda x: x.len - 2) 

367 ] 

368 

369 

370@_register_lltd_specific_class(3) 

371class LLTDAttributePhysicalMedium(LLTDAttribute): 

372 name = "LLTD Attribute - Physical Medium" 

373 fields_desc = [ 

374 ByteField("len", 4), 

375 IntEnumField("medium", 6, { 

376 # https://www.iana.org/assignments/ianaiftype-mib/ianaiftype-mib 

377 1: "other", 

378 2: "regular1822", 

379 3: "hdh1822", 

380 4: "ddnX25", 

381 5: "rfc877x25", 

382 6: "ethernetCsmacd", 

383 7: "iso88023Csmacd", 

384 8: "iso88024TokenBus", 

385 9: "iso88025TokenRing", 

386 10: "iso88026Man", 

387 11: "starLan", 

388 12: "proteon10Mbit", 

389 13: "proteon80Mbit", 

390 14: "hyperchannel", 

391 15: "fddi", 

392 16: "lapb", 

393 17: "sdlc", 

394 18: "ds1", 

395 19: "e1", 

396 20: "basicISDN", 

397 21: "primaryISDN", 

398 22: "propPointToPointSerial", 

399 23: "ppp", 

400 24: "softwareLoopback", 

401 25: "eon", 

402 26: "ethernet3Mbit", 

403 27: "nsip", 

404 28: "slip", 

405 29: "ultra", 

406 30: "ds3", 

407 31: "sip", 

408 32: "frameRelay", 

409 33: "rs232", 

410 34: "para", 

411 35: "arcnet", 

412 36: "arcnetPlus", 

413 37: "atm", 

414 38: "miox25", 

415 39: "sonet", 

416 40: "x25ple", 

417 41: "iso88022llc", 

418 42: "localTalk", 

419 43: "smdsDxi", 

420 44: "frameRelayService", 

421 45: "v35", 

422 46: "hssi", 

423 47: "hippi", 

424 48: "modem", 

425 49: "aal5", 

426 50: "sonetPath", 

427 51: "sonetVT", 

428 52: "smdsIcip", 

429 53: "propVirtual", 

430 54: "propMultiplexor", 

431 55: "ieee80212", 

432 56: "fibreChannel", 

433 57: "hippiInterface", 

434 58: "frameRelayInterconnect", 

435 59: "aflane8023", 

436 60: "aflane8025", 

437 61: "cctEmul", 

438 62: "fastEther", 

439 63: "isdn", 

440 64: "v11", 

441 65: "v36", 

442 66: "g703at64k", 

443 67: "g703at2mb", 

444 68: "qllc", 

445 69: "fastEtherFX", 

446 70: "channel", 

447 71: "ieee80211", 

448 72: "ibm370parChan", 

449 73: "escon", 

450 74: "dlsw", 

451 75: "isdns", 

452 76: "isdnu", 

453 77: "lapd", 

454 78: "ipSwitch", 

455 79: "rsrb", 

456 80: "atmLogical", 

457 81: "ds0", 

458 82: "ds0Bundle", 

459 83: "bsc", 

460 84: "async", 

461 85: "cnr", 

462 86: "iso88025Dtr", 

463 87: "eplrs", 

464 88: "arap", 

465 89: "propCnls", 

466 90: "hostPad", 

467 91: "termPad", 

468 92: "frameRelayMPI", 

469 93: "x213", 

470 94: "adsl", 

471 95: "radsl", 

472 96: "sdsl", 

473 97: "vdsl", 

474 98: "iso88025CRFPInt", 

475 99: "myrinet", 

476 100: "voiceEM", 

477 101: "voiceFXO", 

478 102: "voiceFXS", 

479 103: "voiceEncap", 

480 104: "voiceOverIp", 

481 105: "atmDxi", 

482 106: "atmFuni", 

483 107: "atmIma", 

484 108: "pppMultilinkBundle", 

485 109: "ipOverCdlc", 

486 110: "ipOverClaw", 

487 111: "stackToStack", 

488 112: "virtualIpAddress", 

489 113: "mpc", 

490 114: "ipOverAtm", 

491 115: "iso88025Fiber", 

492 116: "tdlc", 

493 117: "gigabitEthernet", 

494 118: "hdlc", 

495 119: "lapf", 

496 120: "v37", 

497 121: "x25mlp", 

498 122: "x25huntGroup", 

499 123: "transpHdlc", 

500 124: "interleave", 

501 125: "fast", 

502 126: "ip", 

503 127: "docsCableMaclayer", 

504 128: "docsCableDownstream", 

505 129: "docsCableUpstream", 

506 130: "a12MppSwitch", 

507 131: "tunnel", 

508 132: "coffee", 

509 133: "ces", 

510 134: "atmSubInterface", 

511 135: "l2vlan", 

512 136: "l3ipvlan", 

513 137: "l3ipxvlan", 

514 138: "digitalPowerline", 

515 139: "mediaMailOverIp", 

516 140: "dtm", 

517 141: "dcn", 

518 142: "ipForward", 

519 143: "msdsl", 

520 144: "ieee1394", 

521 145: "if-gsn", 

522 146: "dvbRccMacLayer", 

523 147: "dvbRccDownstream", 

524 148: "dvbRccUpstream", 

525 149: "atmVirtual", 

526 150: "mplsTunnel", 

527 151: "srp", 

528 152: "voiceOverAtm", 

529 153: "voiceOverFrameRelay", 

530 154: "idsl", 

531 155: "compositeLink", 

532 156: "ss7SigLink", 

533 157: "propWirelessP2P", 

534 158: "frForward", 

535 159: "rfc1483", 

536 160: "usb", 

537 161: "ieee8023adLag", 

538 162: "bgppolicyaccounting", 

539 163: "frf16MfrBundle", 

540 164: "h323Gatekeeper", 

541 165: "h323Proxy", 

542 166: "mpls", 

543 167: "mfSigLink", 

544 168: "hdsl2", 

545 169: "shdsl", 

546 170: "ds1FDL", 

547 171: "pos", 

548 172: "dvbAsiIn", 

549 173: "dvbAsiOut", 

550 174: "plc", 

551 175: "nfas", 

552 176: "tr008", 

553 177: "gr303RDT", 

554 178: "gr303IDT", 

555 179: "isup", 

556 180: "propDocsWirelessMaclayer", 

557 181: "propDocsWirelessDownstream", 

558 182: "propDocsWirelessUpstream", 

559 183: "hiperlan2", 

560 184: "propBWAp2Mp", 

561 185: "sonetOverheadChannel", 

562 186: "digitalWrapperOverheadChannel", 

563 187: "aal2", 

564 188: "radioMAC", 

565 189: "atmRadio", 

566 190: "imt", 

567 191: "mvl", 

568 192: "reachDSL", 

569 193: "frDlciEndPt", 

570 194: "atmVciEndPt", 

571 195: "opticalChannel", 

572 196: "opticalTransport", 

573 197: "propAtm", 

574 198: "voiceOverCable", 

575 199: "infiniband", 

576 200: "teLink", 

577 201: "q2931", 

578 202: "virtualTg", 

579 203: "sipTg", 

580 204: "sipSig", 

581 205: "docsCableUpstreamChannel", 

582 206: "econet", 

583 207: "pon155", 

584 208: "pon622", 

585 209: "bridge", 

586 210: "linegroup", 

587 211: "voiceEMFGD", 

588 212: "voiceFGDEANA", 

589 213: "voiceDID", 

590 214: "mpegTransport", 

591 215: "sixToFour", 

592 216: "gtp", 

593 217: "pdnEtherLoop1", 

594 218: "pdnEtherLoop2", 

595 219: "opticalChannelGroup", 

596 220: "homepna", 

597 221: "gfp", 

598 222: "ciscoISLvlan", 

599 223: "actelisMetaLOOP", 

600 224: "fcipLink", 

601 225: "rpr", 

602 226: "qam", 

603 227: "lmp", 

604 228: "cblVectaStar", 

605 229: "docsCableMCmtsDownstream", 

606 230: "adsl2", 

607 231: "macSecControlledIF", 

608 232: "macSecUncontrolledIF", 

609 233: "aviciOpticalEther", 

610 234: "atmbond", 

611 235: "voiceFGDOS", 

612 236: "mocaVersion1", 

613 237: "ieee80216WMAN", 

614 238: "adsl2plus", 

615 239: "dvbRcsMacLayer", 

616 240: "dvbTdm", 

617 241: "dvbRcsTdma", 

618 242: "x86Laps", 

619 243: "wwanPP", 

620 244: "wwanPP2", 

621 245: "voiceEBS", 

622 246: "ifPwType", 

623 247: "ilan", 

624 248: "pip", 

625 249: "aluELP", 

626 250: "gpon", 

627 251: "vdsl2", 

628 252: "capwapDot11Profile", 

629 253: "capwapDot11Bss", 

630 254: "capwapWtpVirtualRadio", 

631 255: "bits", 

632 256: "docsCableUpstreamRfPort", 

633 257: "cableDownstreamRfPort", 

634 258: "vmwareVirtualNic", 

635 259: "ieee802154", 

636 260: "otnOdu", 

637 261: "otnOtu", 

638 262: "ifVfiType", 

639 263: "g9981", 

640 264: "g9982", 

641 265: "g9983", 

642 266: "aluEpon", 

643 267: "aluEponOnu", 

644 268: "aluEponPhysicalUni", 

645 269: "aluEponLogicalLink", 

646 271: "aluGponPhysicalUni", 

647 272: "vmwareNicTeam", 

648 277: "docsOfdmDownstream", 

649 278: "docsOfdmaUpstream", 

650 279: "gfast", 

651 280: "sdci", 

652 }), 

653 ] 

654 

655 

656@_register_lltd_specific_class(7) 

657class LLTDAttributeIPv4Address(LLTDAttribute): 

658 name = "LLTD Attribute - IPv4 Address" 

659 fields_desc = [ 

660 ByteField("len", 4), 

661 IPField("ipv4", "0.0.0.0"), 

662 ] 

663 

664 

665@_register_lltd_specific_class(8) 

666class LLTDAttributeIPv6Address(LLTDAttribute): 

667 name = "LLTD Attribute - IPv6 Address" 

668 fields_desc = [ 

669 ByteField("len", 16), 

670 IP6Field("ipv6", "::"), 

671 ] 

672 

673 

674@_register_lltd_specific_class(9) 

675class LLTDAttribute80211MaxRate(LLTDAttribute): 

676 name = "LLTD Attribute - 802.11 Max Rate" 

677 fields_desc = [ 

678 ByteField("len", 2), 

679 ShortField("rate", 0), 

680 ] 

681 

682 

683@_register_lltd_specific_class(10) 

684class LLTDAttributePerformanceCounterFrequency(LLTDAttribute): 

685 name = "LLTD Attribute - Performance Counter Frequency" 

686 fields_desc = [ 

687 ByteField("len", 8), 

688 LongField("freq", 0), 

689 ] 

690 

691 

692@_register_lltd_specific_class(12) 

693class LLTDAttributeLinkSpeed(LLTDAttribute): 

694 name = "LLTD Attribute - Link Speed" 

695 fields_desc = [ 

696 ByteField("len", 4), 

697 IntField("speed", 0), 

698 ] 

699 

700 

701@_register_lltd_specific_class(14, 24, 26) 

702class LLTDAttributeLargeTLV(LLTDAttribute): 

703 name = "LLTD Attribute - Large TLV" 

704 fields_desc = [ 

705 ByteField("len", 0), 

706 ] 

707 

708 

709@_register_lltd_specific_class(15) 

710class LLTDAttributeMachineName(LLTDAttribute): 

711 name = "LLTD Attribute - Machine Name" 

712 fields_desc = [ 

713 FieldLenField("len", None, length_of="hostname", fmt="B"), 

714 StrLenFieldUtf16("hostname", "", length_from=lambda pkt: pkt.len), 

715 ] 

716 

717 def mysummary(self): 

718 return (f"Hostname: {self.hostname!r}", 

719 [LLTD, LLTDAttributeHostID]) 

720 

721 

722@_register_lltd_specific_class(18) 

723class LLTDAttributeDeviceUUID(LLTDAttribute): 

724 name = "LLTD Attribute - Device UUID" 

725 fields_desc = [ 

726 FieldLenField("len", None, length_of="uuid", fmt="B"), 

727 StrLenField("uuid", b"\x00" * 16, length_from=lambda pkt: pkt.len), 

728 ] 

729 

730 

731@_register_lltd_specific_class(20) 

732class LLTDAttributeQOSCharacteristics(LLTDAttribute): 

733 name = "LLTD Attribute - QoS Characteristics" 

734 fields_desc = [ 

735 ByteField("len", 4), 

736 FlagsField("flags", 0, 3, "EQP"), 

737 BitField("reserved1", 0, 13), 

738 ShortField("reserved2", 0), 

739 ] 

740 

741 

742@_register_lltd_specific_class(21) 

743class LLTDAttribute80211PhysicalMedium(LLTDAttribute): 

744 name = "LLTD Attribute - 802.11 Physical Medium" 

745 fields_desc = [ 

746 ByteField("len", 1), 

747 ByteEnumField("medium", 0, { 

748 0: "Unknown", 

749 1: "FHSS 2.4 GHz", 

750 2: "DSSS 2.4 GHz", 

751 3: "IR Baseband", 

752 4: "OFDM 5 GHz", 

753 5: "HRDSSS", 

754 6: "ERP", 

755 }), 

756 ] 

757 

758 

759@_register_lltd_specific_class(25) 

760class LLTDAttributeSeesList(LLTDAttribute): 

761 name = "LLTD Attribute - Sees List Working Set" 

762 fields_desc = [ 

763 ByteField("len", 2), 

764 ShortField("max_entries", 0), 

765 ] 

766 

767 

768bind_layers(Ether, LLTD, type=0x88d9) 

769bind_layers(LLTD, LLTDDiscover, tos=0, function=0) 

770bind_layers(LLTD, LLTDDiscover, tos=1, function=0) 

771bind_layers(LLTD, LLTDHello, tos=0, function=1) 

772bind_layers(LLTD, LLTDHello, tos=1, function=1) 

773bind_layers(LLTD, LLTDEmit, tos=0, function=2) 

774bind_layers(LLTD, LLTDQueryResp, tos=0, function=7) 

775bind_layers(LLTD, LLTDQueryLargeTlv, tos=0, function=11) 

776bind_layers(LLTD, LLTDQueryLargeTlvResp, tos=0, function=12) 

777bind_layers(LLTDHello, LLTDAttribute) 

778bind_layers(LLTDAttribute, LLTDAttribute) 

779bind_layers(LLTDAttribute, Padding, type=0) 

780bind_layers(LLTDEmiteeDesc, Padding) 

781bind_layers(LLTDRecveeDesc, Padding) 

782 

783 

784# Utils 

785######## 

786 

787class LargeTlvBuilder(object): 

788 """An object to build content fetched through LLTDQueryLargeTlv / 

789 LLTDQueryLargeTlvResp packets. 

790 

791 Usable with a PacketList() object: 

792 >>> p = LargeTlvBuilder() 

793 >>> p.parse(rdpcap('capture_file.cap')) 

794 

795 Or during a network capture: 

796 >>> p = LargeTlvBuilder() 

797 >>> sniff(filter="ether proto 0x88d9", prn=p.parse) 

798 

799 To get the result, use .get_data() 

800 

801 """ 

802 

803 def __init__(self): 

804 self.types_offsets = {} 

805 self.data = {} 

806 

807 def parse(self, plist): 

808 """Update the builder using the provided `plist`. `plist` can 

809 be either a Packet() or a PacketList(). 

810 

811 """ 

812 if not isinstance(plist, PacketList): 

813 plist = PacketList(plist) 

814 for pkt in plist[LLTD]: 

815 if LLTDQueryLargeTlv in pkt: 

816 key = "%s:%s:%d" % (pkt.real_dst, pkt.real_src, pkt.seq) 

817 self.types_offsets[key] = (pkt[LLTDQueryLargeTlv].type, 

818 pkt[LLTDQueryLargeTlv].offset) 

819 elif LLTDQueryLargeTlvResp in pkt: 

820 try: 

821 key = "%s:%s:%d" % (pkt.real_src, pkt.real_dst, pkt.seq) 

822 content, offset = self.types_offsets[key] 

823 except KeyError: 

824 continue 

825 loc = slice(offset, offset + pkt[LLTDQueryLargeTlvResp].len) 

826 key = "%s > %s [%s]" % ( 

827 pkt.real_src, pkt.real_dst, 

828 LLTDQueryLargeTlv.fields_desc[0].i2s.get(content, content), 

829 ) 

830 data = self.data.setdefault(key, array("B")) 

831 datalen = len(data) 

832 if datalen < loc.stop: 

833 data.extend(array("B", b"\x00" * (loc.stop - datalen))) 

834 data[loc] = array("B", pkt[LLTDQueryLargeTlvResp].value) 

835 

836 def get_data(self): 

837 """Returns a dictionary object, keys are strings "source > 

838 destincation [content type]", and values are the content 

839 fetched, also as a string. 

840 

841 """ 

842 return {key: "".join(chr(byte) for byte in data) 

843 for key, data in self.data.items()}