Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/arch/linux/rtnetlink.py: 80%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

207 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Gabriel Potter 

5 

6""" 

7This file implements the rtnetlink API that is used to read the network 

8configuration of the machine. 

9""" 

10 

11import socket 

12import struct 

13import time 

14 

15import scapy.utils6 

16 

17from scapy.consts import BIG_ENDIAN 

18from scapy.config import conf 

19from scapy.error import log_loading 

20from scapy.packet import ( 

21 Packet, 

22 bind_layers, 

23) 

24from scapy.utils import atol, itom 

25 

26from scapy.fields import ( 

27 ByteEnumField, 

28 ByteField, 

29 EnumField, 

30 Field, 

31 FieldLenField, 

32 FlagsField, 

33 IP6Field, 

34 IPField, 

35 LenField, 

36 MACField, 

37 MayEnd, 

38 MultipleTypeField, 

39 PacketListField, 

40 PadField, 

41 StrLenField, 

42 XStrLenField, 

43) 

44 

45from scapy.arch.common import _iff_flags 

46 

47# Typing imports 

48from typing import ( 

49 Any, 

50 Dict, 

51 List, 

52 Optional, 

53 Tuple, 

54 Type, 

55) 

56 

57# from <linux/netlink.h> and <linux/rtnetlink.h> 

58 

59 

60# Common header 

61 

62 

63class rtmsghdr(Packet): 

64 fields_desc = [ 

65 LenField("nlmsg_len", None, fmt="=L"), 

66 EnumField( 

67 "nlmsg_type", 

68 0, 

69 { 

70 # netlink.h 

71 3: "NLMSG_DONE", 

72 # rtnetlink.h 

73 16: "RTM_NEWLINK", 

74 17: "RTM_DELLINK", 

75 18: "RTM_GETLINK", 

76 19: "RTM_SETLINK", 

77 20: "RTM_NEWADDR", 

78 21: "RTM_DELADDR", 

79 22: "RTM_GETADDR", 

80 # 23: unused 

81 24: "RTM_NEWROUTE", 

82 25: "RTM_DELROUTE", 

83 26: "RTM_GETROUTE", 

84 # 27: unused 

85 }, 

86 fmt="=H", 

87 ), 

88 FlagsField( 

89 "nlmsg_flags", 

90 0, 

91 16 if BIG_ENDIAN else -16, 

92 { 

93 0x01: "NLM_F_REQUEST", 

94 0x02: "NLM_F_MULTI", 

95 0x04: "NLM_F_ACK", 

96 0x08: "NLM_F_ECHO", 

97 0x10: "NLM_F_DUMP_INTR", 

98 0x20: "NLM_F_DUMP_FILTERED", 

99 # GET modifiers 

100 0x100: "NLM_F_ROOT", 

101 0x200: "NLM_F_MATCH", 

102 0x400: "NLM_F_ATOMIC", 

103 }, 

104 ), 

105 Field("nlmsg_seq", 0, fmt="=L"), 

106 Field("nlmsg_pid", 0, fmt="=L"), 

107 ] 

108 

109 def post_build(self, pkt: bytes, pay: bytes) -> bytes: 

110 pkt += pay 

111 if self.nlmsg_len is None: 

112 pkt = struct.pack("=L", len(pkt)) + pkt[4:] 

113 return pkt 

114 

115 def extract_padding(self, s: bytes) -> Tuple[bytes, Optional[bytes]]: 

116 return s[: self.nlmsg_len - 16], s[self.nlmsg_len - 16 :] 

117 

118 def answers(self, other: Packet) -> bool: 

119 return bool(other.nlmsg_seq == self.nlmsg_seq) 

120 

121 

122# DONE 

123 

124 

125class nlmsgerr_rtattr(Packet): 

126 fields_desc = [ 

127 FieldLenField( 

128 "rta_len", None, length_of="rta_data", fmt="=H", adjust=lambda _, x: x + 4 

129 ), 

130 EnumField( 

131 "rta_type", 

132 0, 

133 {}, 

134 fmt="=H", 

135 ), 

136 PadField( 

137 MultipleTypeField( 

138 [], 

139 StrLenField( 

140 "rta_data", 

141 b"", 

142 length_from=lambda pkt: pkt.rta_len - 4, 

143 ), 

144 ), 

145 align=4, 

146 ), 

147 ] 

148 

149 def default_payload_class(self, payload: bytes) -> Type[Packet]: 

150 return conf.padding_layer 

151 

152 

153class nlmsgerr(Packet): 

154 fields_desc = [ 

155 MayEnd(Field("status", 0, fmt="=L")), 

156 # Pay 

157 PacketListField("data", [], nlmsgerr_rtattr), 

158 ] 

159 

160 

161bind_layers(rtmsghdr, nlmsgerr, nlmsg_type=3) 

162 

163 

164# LINK messages 

165 

166 

167class ifla_af_spec_inet_rtattr(Packet): 

168 fields_desc = [ 

169 FieldLenField( 

170 "rta_len", None, length_of="rta_data", fmt="=H", adjust=lambda _, x: x + 4 

171 ), 

172 EnumField( 

173 "rta_type", 

174 0, 

175 { 

176 0x00: "IFLA_INET_UNSPEC", 

177 0x01: "IFLA_INET_CONF", 

178 }, 

179 fmt="=H", 

180 ), 

181 PadField( 

182 MultipleTypeField( 

183 [], 

184 XStrLenField( 

185 "rta_data", 

186 b"", 

187 length_from=lambda pkt: pkt.rta_len - 4, 

188 ), 

189 ), 

190 align=4, 

191 ), 

192 ] 

193 

194 def default_payload_class(self, payload: bytes) -> Type[Packet]: 

195 return conf.padding_layer 

196 

197 

198class ifla_af_spec_inet6_rtattr(Packet): 

199 fields_desc = [ 

200 FieldLenField( 

201 "rta_len", None, length_of="rta_data", fmt="=H", adjust=lambda _, x: x + 4 

202 ), 

203 EnumField( 

204 "rta_type", 

205 0, 

206 { 

207 0x00: "IFLA_INET6_UNSPEC", 

208 0x01: "IFLA_INET6_FLAGS", 

209 0x02: "IFLA_INET6_CONF", 

210 0x03: "IFLA_INET6_STATS", 

211 0x04: "IFLA_INET6_MCAST", 

212 0x05: "IFLA_INET6_CACHEINFO", 

213 0x06: "IFLA_INET6_ICMP6STATS", 

214 0x07: "IFLA_INET6_TOKEN", 

215 0x08: "IFLA_INET6_ADDR_GEN_MODE", 

216 0x09: "IFLA_INET6_RA_MTU", 

217 }, 

218 fmt="=H", 

219 ), 

220 PadField( 

221 MultipleTypeField( 

222 [], 

223 XStrLenField( 

224 "rta_data", 

225 b"", 

226 length_from=lambda pkt: pkt.rta_len - 4, 

227 ), 

228 ), 

229 align=4, 

230 ), 

231 ] 

232 

233 def default_payload_class(self, payload: bytes) -> Type[Packet]: 

234 return conf.padding_layer 

235 

236 

237class ifla_af_spec_rtattr(Packet): 

238 fields_desc = [ 

239 FieldLenField( 

240 "rta_len", None, length_of="rta_data", fmt="=H", adjust=lambda _, x: x + 4 

241 ), 

242 EnumField("rta_type", 0, socket.AddressFamily, fmt="=H"), 

243 PadField( 

244 MultipleTypeField( 

245 [ 

246 ( 

247 # AF_INET 

248 PacketListField( 

249 "rta_data", 

250 [], 

251 ifla_af_spec_inet_rtattr, 

252 length_from=lambda pkt: pkt.rta_len - 4, 

253 ), 

254 lambda pkt: pkt.rta_type == 2, 

255 ), 

256 ( 

257 # AF_INET6 

258 PacketListField( 

259 "rta_data", 

260 [], 

261 ifla_af_spec_inet6_rtattr, 

262 length_from=lambda pkt: pkt.rta_len - 4, 

263 ), 

264 lambda pkt: pkt.rta_type == 10, 

265 ), 

266 ], 

267 XStrLenField( 

268 "rta_data", 

269 b"", 

270 length_from=lambda pkt: pkt.rta_len - 4, 

271 ), 

272 ), 

273 align=4, 

274 ), 

275 ] 

276 

277 def default_payload_class(self, payload: bytes) -> Type[Packet]: 

278 return conf.padding_layer 

279 

280 

281class ifinfomsg_rtattr(Packet): 

282 fields_desc = [ 

283 FieldLenField( 

284 "rta_len", None, length_of="rta_data", fmt="=H", adjust=lambda _, x: x + 4 

285 ), 

286 EnumField( 

287 "rta_type", 

288 0, 

289 { 

290 0x00: "IFLA_UNSPEC", 

291 0x01: "IFLA_ADDRESS", 

292 0x02: "IFLA_BROADCAST", 

293 0x03: "IFLA_IFNAME", 

294 0x04: "IFLA_MTU", 

295 0x05: "IFLA_LINK", 

296 0x06: "IFLA_QDISC", 

297 0x07: "IFLA_STATS", 

298 0x08: "IFLA_COST", 

299 0x09: "IFLA_PRIORITY", 

300 0x0A: "IFLA_MASTER", 

301 0x0B: "IFLA_WIRELESS", 

302 0x0C: "IFLA_PROTINFO", 

303 0x0D: "IFLA_TXQLEN", 

304 0x0E: "IFLA_MAP", 

305 0x0F: "IFLA_WEIGHT", 

306 0x10: "IFLA_OPERSTATE", 

307 0x11: "IFLA_LINKMODE", 

308 0x12: "IFLA_LINKINFO", 

309 0x13: "IFLA_NET_NS_PID", 

310 0x14: "IFLA_IFALIAS", 

311 0x15: "IFLA_NUM_VS", 

312 0x16: "IFLA_VFINFO_LIST", 

313 0x17: "IFLA_STATS64", 

314 0x18: "IFLA_VF_PORTS", 

315 0x19: "IFLA_PORT_SELF", 

316 0x1A: "IFLA_AF_SPEC", 

317 0x1B: "IFLA_GROUP", 

318 0x1C: "IFLA_NET_NS_FD", 

319 0x1D: "IFLA_EXT_MASK", 

320 0x1E: "IFLA_PROMISCUITY", 

321 0x1F: "IFLA_NUM_TX_QUEUES", 

322 0x20: "IFLA_NUM_RX_QUEUES", 

323 0x21: "IFLA_CARRIER", 

324 0x22: "IFLA_PHYS_PORT_ID", 

325 0x23: "IFLA_CARRIER_CHANGES", 

326 0x24: "IFLA_PHYS_SWITCH_ID", 

327 0x25: "IFLA_LINK_NETNSID", 

328 0x26: "IFLA_PHYS_PORT_NAME", 

329 0x27: "IFLA_PROTO_DOWN", 

330 0x28: "IFLA_GSO_MAX_SEGS", 

331 0x29: "IFLA_GSO_MAX_SIZE", 

332 0x2A: "IFLA_PAD", 

333 0x2B: "IFLA_XDP", 

334 0x2C: "IFLA_EVENT", 

335 0x2D: "IFLA_NEW_NETNSID", 

336 0x2E: "IFLA_IF_NETNSID", 

337 0x2F: "IFLA_CARRIER_UP_COUNT", 

338 0x30: "IFLA_CARRIER_DOWN_COUNT", 

339 0x31: "IFLA_NEW_IFINDEX", 

340 0x32: "IFLA_MIN_MTU", 

341 0x33: "IFLA_MAX_MTU", 

342 0x34: "IFLA_PROP_LIST", 

343 0x35: "IFLA_ALT_IFNAME", 

344 0x36: "IFLA_PERM_ADDRESS", 

345 0x37: "IFLA_PROTO_DOWN_REASON", 

346 0x38: "IFLA_PARENT_DEV_NAME", 

347 0x39: "IFLA_PARENT_DEV_BUS_NAME", 

348 0x3A: "IFLA_GRO_MAX_SIZE", 

349 0x3B: "IFLA_TSO_MAX_SIZE", 

350 0x3C: "IFLA_TSO_MAX_SEGS", 

351 0x3D: "IFLA_ALLMULTI", 

352 }, 

353 fmt="=H", 

354 ), 

355 PadField( 

356 MultipleTypeField( 

357 [ 

358 ( 

359 # IFLA_ADDRESS 

360 MACField("rta_data", "00:00:00:00:00:00"), 

361 lambda pkt: pkt.rta_type in [0x01, 0x36], 

362 ), 

363 ( 

364 # IFLA_IFNAME 

365 StrLenField( 

366 "rta_data", b"", length_from=lambda pkt: pkt.rta_len - 4 

367 ), 

368 lambda pkt: pkt.rta_type in [0x03], 

369 ), 

370 ( 

371 # IFLA_AF_SPEC 

372 PacketListField( 

373 "rta_data", 

374 [], 

375 ifla_af_spec_rtattr, 

376 length_from=lambda pkt: pkt.rta_len - 4, 

377 ), 

378 lambda pkt: pkt.rta_type == 0x1A, 

379 ), 

380 ], 

381 XStrLenField( 

382 "rta_data", 

383 b"", 

384 length_from=lambda pkt: pkt.rta_len - 4, 

385 ), 

386 ), 

387 align=4, 

388 ), 

389 ] 

390 

391 def default_payload_class(self, payload: bytes) -> Type[Packet]: 

392 return conf.padding_layer 

393 

394 

395class ifinfomsg(Packet): 

396 fields_desc = [ 

397 ByteEnumField("ifi_family", 0, socket.AddressFamily), # type: ignore 

398 ByteField("res", 0), 

399 Field("ifi_type", 0, fmt="=H"), 

400 Field("ifi_index", 0, fmt="=i"), 

401 FlagsField( 

402 "ifi_flags", 

403 0, 

404 32 if BIG_ENDIAN else -32, 

405 _iff_flags, 

406 ), 

407 Field("ifi_change", 0, fmt="=I"), 

408 # Pay 

409 PacketListField("data", [], ifinfomsg_rtattr), 

410 ] 

411 

412 

413bind_layers(rtmsghdr, ifinfomsg, nlmsg_type=16) 

414bind_layers(rtmsghdr, ifinfomsg, nlmsg_type=17) 

415bind_layers(rtmsghdr, ifinfomsg, nlmsg_type=18) 

416bind_layers(rtmsghdr, ifinfomsg, nlmsg_type=19) 

417 

418 

419# ADDR messages 

420 

421 

422class ifaddrmsg_rtattr(Packet): 

423 fields_desc = [ 

424 FieldLenField( 

425 "rta_len", None, length_of="rta_data", fmt="=H", adjust=lambda _, x: x + 4 

426 ), 

427 EnumField( 

428 "rta_type", 

429 0, 

430 { 

431 0x00: "IFA_UNSPEC", 

432 0x01: "IFA_ADDRESS", 

433 0x02: "IFA_LOCAL", 

434 0x03: "IFA_LABEL", 

435 0x04: "IFA_BROADCAST", 

436 0x05: "IFA_ANYCAST", 

437 0x06: "IFA_CACHEINFO", 

438 0x07: "IFA_MULTICAST", 

439 0x08: "IFA_FLAGS", 

440 0x09: "IFA_RT_PRIORITY", 

441 0x0A: "IFA_TARGET_NETNSID", 

442 0x0B: "IFA_PROTO", 

443 }, 

444 fmt="=H", 

445 ), 

446 PadField( 

447 MultipleTypeField( 

448 [ 

449 # IFA_ADDRESS, IFA_LOCAL, IFA_BROADCAST 

450 ( 

451 IPField("rta_data", "0.0.0.0"), 

452 lambda pkt: pkt.parent 

453 and pkt.parent.ifa_family == 2 

454 and pkt.rta_type in [0x01, 0x02, 0x04], 

455 ), 

456 ( 

457 IP6Field("rta_data", "::"), 

458 lambda pkt: pkt.parent 

459 and pkt.parent.ifa_family == 10 

460 and pkt.rta_type in [0x01, 0x02, 0x04], 

461 ), 

462 ( 

463 # IFA_LABEL 

464 StrLenField( 

465 "rta_data", b"", length_from=lambda pkt: pkt.rta_len - 4 

466 ), 

467 lambda pkt: pkt.rta_type in [0x03], 

468 ), 

469 ], 

470 XStrLenField( 

471 "rta_data", 

472 b"", 

473 length_from=lambda pkt: pkt.rta_len - 4, 

474 ), 

475 ), 

476 align=4, 

477 ), 

478 ] 

479 

480 def default_payload_class(self, payload: bytes) -> Type[Packet]: 

481 return conf.padding_layer 

482 

483 

484class ifaddrmsg(Packet): 

485 fields_desc = [ 

486 ByteEnumField("ifa_family", 0, socket.AddressFamily), # type: ignore 

487 ByteField("ifa_prefixlen", 0), 

488 FlagsField( 

489 "ifa_flags", 

490 0, 

491 -8, 

492 { 

493 0x01: "IFA_F_SECONDARY", 

494 0x02: "IFA_F_NODAD", 

495 0x04: "IFA_F_OPTIMISTIC", 

496 0x08: "IFA_F_DADFAILED", 

497 0x10: "IFA_F_HOMEADDRESS", 

498 0x20: "IFA_F_DEPRECATED", 

499 0x40: "IFA_F_TENTATIVE", 

500 0x80: "IFA_F_PERMANENT", 

501 }, 

502 ), 

503 ByteField("ifa_scope", 0), 

504 Field("ifa_index", 0, fmt="=L"), 

505 # Pay 

506 PacketListField("data", [], ifaddrmsg_rtattr), 

507 ] 

508 

509 

510bind_layers(rtmsghdr, ifaddrmsg, nlmsg_type=20) 

511bind_layers(rtmsghdr, ifaddrmsg, nlmsg_type=21) 

512bind_layers(rtmsghdr, ifaddrmsg, nlmsg_type=22) 

513 

514 

515# ROUTE messages 

516 

517 

518RT_CLASS = { 

519 0: "RT_TABLE_UNSPEC", 

520 252: "RT_TABLE_COMPAT", 

521 253: "RT_TABLE_DEFAULT", 

522 254: "RT_TABLE_MAIN", 

523 255: "RT_TABLE_LOCAL", 

524} 

525 

526 

527class rtmsg_rtattr(Packet): 

528 fields_desc = [ 

529 FieldLenField( 

530 "rta_len", None, length_of="rta_data", fmt="=H", adjust=lambda _, x: x + 4 

531 ), 

532 EnumField( 

533 "rta_type", 

534 0, 

535 { 

536 0x00: "RTA_UNSPEC", 

537 0x01: "RTA_DST", 

538 0x02: "RTS_SRC", 

539 0x03: "RTS_IIF", 

540 0x04: "RTS_OIF", 

541 0x05: "RTA_GATEWAY", 

542 0x06: "RTA_PRIORITY", 

543 0x07: "RTA_PREFSRC", 

544 0x08: "RTA_METRICS", 

545 0x09: "RTA_MULTIPATH", 

546 0x0B: "RTA_FLOW", 

547 0x0C: "RTA_CACHEINFO", 

548 0x0F: "RTA_TABLE", 

549 0x10: "RTA_MARK", 

550 0x11: "RTA_MFC_STATS", 

551 0x12: "RTA_VIA", 

552 0x13: "RTA_NEWDST", 

553 0x14: "RTA_PREF", 

554 0x15: "RTA_ENCAP_TYPE", 

555 0x16: "RTA_ENCAP", 

556 0x17: "RTA_EXPIRES", 

557 0x18: "RTA_PAD", 

558 0x19: "RTA_UID", 

559 0x1A: "RTA_TTL_PROPAGATE", 

560 0x1B: "RTA_IP_PROTO", 

561 0x1C: "RTA_SPORT", 

562 0x1D: "RTA_DPORT", 

563 0x1E: "RTA_NH_ID", 

564 }, 

565 fmt="=H", 

566 ), 

567 PadField( 

568 MultipleTypeField( 

569 [ 

570 # RTA_DST, RTA_SRC, RTA_PREFSRC, RTA_GATEWAY 

571 ( 

572 IPField("rta_data", "0.0.0.0"), 

573 lambda pkt: pkt.parent 

574 and pkt.parent.rtm_family == 2 

575 and pkt.rta_type in [0x01, 0x02, 0x05, 0x07], 

576 ), 

577 ( 

578 IP6Field("rta_data", "::"), 

579 lambda pkt: pkt.parent 

580 and pkt.parent.rtm_family == 10 

581 and pkt.rta_type in [0x01, 0x02, 0x05, 0x07], 

582 ), 

583 # RTS_OIF, RTA_PRIORITY 

584 ( 

585 Field("rta_data", 0, fmt="=I"), 

586 lambda pkt: pkt.rta_type in [0x04, 0x06, 0x10], 

587 ), 

588 # RTA_TABLE 

589 ( 

590 EnumField("rta_data", 0, RT_CLASS, fmt="=I"), 

591 lambda pkt: pkt.rta_type in [0x0F], 

592 ), 

593 ], 

594 XStrLenField( 

595 "rta_data", 

596 b"", 

597 length_from=lambda pkt: pkt.rta_len - 4, 

598 ), 

599 ), 

600 align=4, 

601 ), 

602 ] 

603 

604 def default_payload_class(self, payload: bytes) -> Type[Packet]: 

605 return conf.padding_layer 

606 

607 

608class rtmsg(Packet): 

609 fields_desc = [ 

610 ByteEnumField("rtm_family", 0, socket.AddressFamily), # type: ignore 

611 ByteField("rtm_dst_len", 0), 

612 ByteField("rtm_src_len", 0), 

613 ByteField("rtm_tos", 0), 

614 ByteEnumField( 

615 "rtm_table", 

616 0, 

617 RT_CLASS, 

618 ), 

619 ByteEnumField( 

620 "rtm_protocol", 

621 0, 

622 { 

623 0x00: "RTPROT_UNSPEC", 

624 0x01: "RTPROT_REDIRECT", 

625 0x02: "RTPROT_KERNEL", 

626 0x03: "RTPROT_BOOT", 

627 0x04: "RTPROT_STATIC", 

628 }, 

629 ), 

630 ByteEnumField( 

631 "rtm_scope", 

632 0, 

633 { 

634 0: "RT_SCOPE_UNIVERSE", 

635 200: "RT_SCOPE_SITE", 

636 253: "RT_SCOPE_LINK", 

637 254: "RT_SCOPE_HOST", 

638 255: "RT_SCOPE_NOWHERE", 

639 }, 

640 ), 

641 ByteEnumField( 

642 "rtm_type", 

643 0, 

644 { 

645 0x00: "RTN_UNSPEC", 

646 0x01: "RTN_UNICAST", 

647 0x02: "RTN_LOCAL", 

648 0x03: "RTN_BROADCAST", 

649 0x04: "RTN_ANYCAST", 

650 0x05: "RTN_MULTICAST", 

651 0x06: "RTN_BLACKHOLE", 

652 0x07: "RTN_UNREACHABLE", 

653 0x08: "RTN_PROHIBIT", 

654 0x09: "RTN_THROW", 

655 0x0A: "RTN_NAT", 

656 0x0B: "RTN_XRESOLVE", 

657 }, 

658 ), 

659 FlagsField( 

660 "rtm_flags", 

661 0, 

662 32 if BIG_ENDIAN else -32, 

663 { 

664 0x100: "RTM_F_NOTIFY", 

665 0x200: "RTM_F_CLONED", 

666 0x400: "RTM_F_EQUALIZE", 

667 0x800: "RTM_F_PREFIX", 

668 0x1000: "RTM_F_LOOKUP_TABLE", 

669 0x2000: "RTM_F_FIB_MATCH", 

670 0x4000: "RTM_F_OFFLOAD", 

671 0x8000: "RTM_F_TRAP", 

672 0x20000000: "RTM_F_OFFLOAD_FAILED", 

673 }, 

674 ), 

675 # Pay 

676 PacketListField("data", [], rtmsg_rtattr), 

677 ] 

678 

679 

680bind_layers(rtmsghdr, rtmsg, nlmsg_type=24) 

681bind_layers(rtmsghdr, rtmsg, nlmsg_type=25) 

682bind_layers(rtmsghdr, rtmsg, nlmsg_type=26) 

683 

684 

685class rtmsghdrs(Packet): 

686 fields_desc = [ 

687 PacketListField( 

688 "msgs", 

689 [], 

690 rtmsghdr, 

691 # 65535 / len(rtmsghdr) 

692 max_count=4096, 

693 ), 

694 ] 

695 

696 

697# Utils 

698 

699 

700SOL_NETLINK = 270 

701NETLINK_EXT_ACK = 11 

702NETLINK_GET_STRICT_CHK = 12 

703 

704 

705def _sr1_rtrequest(pkt: Packet) -> List[Packet]: 

706 """ 

707 Send / Receive a rtnetlink request 

708 """ 

709 # Create socket 

710 sock = socket.socket( 

711 socket.AF_NETLINK, 

712 socket.SOCK_RAW | socket.SOCK_CLOEXEC, 

713 socket.NETLINK_ROUTE, 

714 ) 

715 # Configure socket 

716 sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 32768) 

717 sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1048576) 

718 sock.setsockopt(SOL_NETLINK, NETLINK_EXT_ACK, 1) 

719 sock.bind((0, 0)) # bind to kernel 

720 sock.setsockopt(SOL_NETLINK, NETLINK_GET_STRICT_CHK, 1) 

721 # Request routes 

722 sock.send(bytes(rtmsghdrs(msgs=[pkt]))) 

723 results: List[Packet] = [] 

724 try: 

725 while True: 

726 msgs = rtmsghdrs(sock.recv(65535)) 

727 if not msgs: 

728 log_loading.warning("Failed to read the routes using RTNETLINK !") 

729 return [] 

730 for msg in msgs.msgs: 

731 # Keep going until we find the end of the MULTI format 

732 if not msg.nlmsg_flags.NLM_F_MULTI or msg.nlmsg_type == 3: 

733 if msg.nlmsg_type == 3 and nlmsgerr in msg and msg.status != 0: 

734 # NLMSG_DONE with errors 

735 if msg.data and msg.data[0].rta_type == 1: 

736 log_loading.warning( 

737 "Scapy RTNETLINK error on %s: '%s'. Please report !", 

738 pkt.sprintf("%nlmsg_type%"), 

739 msg.data[0].rta_data.decode(), 

740 ) 

741 return [] 

742 return results 

743 results.append(msg) 

744 finally: 

745 sock.close() 

746 

747 

748def _get_ips(af_family=socket.AF_UNSPEC): 

749 # type: (socket.AddressFamily) -> Dict[int, List[Dict[str, Any]]] 

750 """ 

751 Return a mapping of all interfaces IP using a NETLINK socket. 

752 """ 

753 results = _sr1_rtrequest( 

754 rtmsghdr( 

755 nlmsg_type="RTM_GETADDR", 

756 nlmsg_flags="NLM_F_REQUEST+NLM_F_ROOT+NLM_F_MATCH", 

757 nlmsg_seq=int(time.time()), 

758 ) 

759 / ifaddrmsg( 

760 ifa_family=af_family, 

761 data=[], 

762 ) 

763 ) 

764 ips: Dict[int, List[Dict[str, Any]]] = {} 

765 for msg in results: 

766 ifindex = msg.ifa_index 

767 address = None 

768 family = msg.ifa_family 

769 for attr in msg.data: 

770 if attr.rta_type == 0x01: # IFA_ADDRESS 

771 address = attr.rta_data 

772 break 

773 if address is not None: 

774 data = { 

775 "af_family": family, 

776 "index": ifindex, 

777 "address": address, 

778 } 

779 if family == 10: # ipv6 

780 data["scope"] = scapy.utils6.in6_getscope(address) 

781 ips.setdefault(ifindex, list()).append(data) 

782 return ips 

783 

784 

785def _get_if_list(): 

786 # type: () -> Dict[int, Dict[str, Any]] 

787 """ 

788 Read the interfaces list using a NETLINK socket. 

789 """ 

790 results = _sr1_rtrequest( 

791 rtmsghdr( 

792 nlmsg_type="RTM_GETLINK", 

793 nlmsg_flags="NLM_F_REQUEST+NLM_F_ROOT+NLM_F_MATCH", 

794 nlmsg_seq=int(time.time()), 

795 ) 

796 / ifinfomsg( 

797 data=[], 

798 ) 

799 ) 

800 lifips = _get_ips() 

801 interfaces = {} 

802 for msg in results: 

803 ifindex = msg.ifi_index 

804 ifname = None 

805 mac = "00:00:00:00:00:00" 

806 ifflags = msg.ifi_flags 

807 ips = [] 

808 for attr in msg.data: 

809 if attr.rta_type == 0x01: # IFLA_ADDRESS 

810 mac = attr.rta_data 

811 elif attr.rta_type == 0x03: # IFLA_NAME 

812 ifname = attr.rta_data[:-1].decode() 

813 if ifname is not None: 

814 if ifindex in lifips: 

815 ips = lifips[ifindex] 

816 interfaces[ifindex] = { 

817 "name": ifname, 

818 "index": ifindex, 

819 "flags": ifflags, 

820 "mac": mac, 

821 "ips": ips, 

822 } 

823 return interfaces 

824 

825 

826def in6_getifaddr(): 

827 # type: () -> List[Tuple[str, int, str]] 

828 """ 

829 Returns a list of 3-tuples of the form (addr, scope, iface) where 

830 'addr' is the address of scope 'scope' associated to the interface 

831 'iface'. 

832 

833 This is the list of all addresses of all interfaces available on 

834 the system. 

835 """ 

836 ips = _get_ips(af_family=socket.AF_INET6) 

837 ifaces = _get_if_list() 

838 result = [] 

839 for intip in ips.values(): 

840 for ip in intip: 

841 if ip["index"] in ifaces: 

842 result.append((ip["address"], ip["scope"], ifaces[ip["index"]]["name"])) 

843 return result 

844 

845 

846def _read_routes(af_family): 

847 # type: (socket.AddressFamily) -> List[Packet] 

848 """ 

849 Read routes using a NETLINK socket. 

850 """ 

851 results = [] 

852 for rttable in ["RT_TABLE_LOCAL", "RT_TABLE_MAIN"]: 

853 results.extend( 

854 _sr1_rtrequest( 

855 rtmsghdr( 

856 nlmsg_type="RTM_GETROUTE", 

857 nlmsg_flags="NLM_F_REQUEST+NLM_F_ROOT+NLM_F_MATCH", 

858 nlmsg_seq=int(time.time()), 

859 ) 

860 / rtmsg( 

861 rtm_family=af_family, 

862 data=[ 

863 rtmsg_rtattr(rta_type="RTA_TABLE", rta_data=rttable), 

864 ], 

865 ) 

866 ) 

867 ) 

868 return [msg for msg in results if msg.nlmsg_type == 24] # RTM_NEWROUTE 

869 

870 

871def read_routes(): 

872 # type: () -> List[Tuple[int, int, str, str, str, int]] 

873 """ 

874 Read IPv4 routes for current process 

875 """ 

876 routes = [] 

877 ifaces = _get_if_list() 

878 results = _read_routes(socket.AF_INET) 

879 for msg in results: 

880 # Process the RTM_NEWROUTE 

881 net = 0 

882 mask = itom(msg.rtm_dst_len) 

883 gw = "0.0.0.0" 

884 iface = "" 

885 addr = "0.0.0.0" 

886 metric = 0 

887 for attr in msg.data: 

888 if attr.rta_type == 0x01: # RTA_DST 

889 net = atol(attr.rta_data) 

890 elif attr.rta_type == 0x04: # RTS_OIF 

891 index = attr.rta_data 

892 if index in ifaces: 

893 iface = ifaces[index]["name"] 

894 else: 

895 iface = str(index) 

896 elif attr.rta_type == 0x05: # RTA_GATEWAY 

897 gw = attr.rta_data 

898 elif attr.rta_type == 0x06: # RTA_PRIORITY 

899 metric = attr.rta_data 

900 elif attr.rta_type == 0x07: # RTA_PREFSRC 

901 addr = attr.rta_data 

902 routes.append((net, mask, gw, iface, addr, metric)) 

903 return routes 

904 

905 

906def read_routes6(): 

907 # type: () -> List[Tuple[str, int, str, str, List[str], int]] 

908 """ 

909 Read IPv6 routes for current process 

910 """ 

911 routes = [] 

912 ifaces = _get_if_list() 

913 results = _read_routes(socket.AF_INET6) 

914 lifaddr = _get_ips(af_family=socket.AF_INET6) 

915 for msg in results: 

916 # Process the RTM_NEWROUTE 

917 prefix = "::" 

918 plen = msg.rtm_dst_len 

919 nh = "::" 

920 index = 0 

921 iface = "" 

922 metric = 0 

923 for attr in msg.data: 

924 if attr.rta_type == 0x01: # RTA_DST 

925 prefix = attr.rta_data 

926 elif attr.rta_type == 0x04: # RTS_OIF 

927 index = attr.rta_data 

928 if index in ifaces: 

929 iface = ifaces[index]["name"] 

930 else: 

931 iface = str(index) 

932 elif attr.rta_type == 0x05: # RTA_GATEWAY 

933 nh = attr.rta_data 

934 elif attr.rta_type == 0x06: # RTA_PRIORITY 

935 metric = attr.rta_data 

936 devaddrs = ((x["address"], x["scope"], iface) for x in lifaddr.get(index, [])) 

937 cset = scapy.utils6.construct_source_candidate_set(prefix, plen, devaddrs) 

938 if cset: 

939 routes.append((prefix, plen, nh, iface, cset, metric)) 

940 return routes