Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/arch/linux/rtnetlink.py: 78%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

233 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Gabriel Potter 

5 

6""" 

7This file implements the rtnetlink API that is used to read the network 

8configuration of the machine. 

9""" 

10 

11import socket 

12import struct 

13import time 

14 

15import scapy.utils6 

16 

17from scapy.consts import BIG_ENDIAN 

18from scapy.config import conf 

19from scapy.error import log_loading 

20from scapy.packet import ( 

21 Packet, 

22 bind_layers, 

23) 

24from scapy.utils import atol, itom 

25 

26from scapy.fields import ( 

27 ByteEnumField, 

28 ByteField, 

29 EnumField, 

30 Field, 

31 FieldLenField, 

32 FlagsField, 

33 IP6Field, 

34 IPField, 

35 LenField, 

36 MACField, 

37 MayEnd, 

38 MultipleTypeField, 

39 PacketListField, 

40 PadField, 

41 StrLenField, 

42 XStrLenField, 

43) 

44 

45from scapy.arch.common import _iff_flags 

46 

47# Typing imports 

48from typing import ( 

49 Any, 

50 Dict, 

51 List, 

52 Optional, 

53 Tuple, 

54 Type, 

55) 

56 

57# from <linux/netlink.h> and <linux/rtnetlink.h> 

58 

59 

60# Common header 

61 

62 

63class rtmsghdr(Packet): 

64 fields_desc = [ 

65 LenField("nlmsg_len", None, fmt="=L"), 

66 EnumField( 

67 "nlmsg_type", 

68 0, 

69 { 

70 # netlink.h 

71 3: "NLMSG_DONE", 

72 # rtnetlink.h 

73 16: "RTM_NEWLINK", 

74 17: "RTM_DELLINK", 

75 18: "RTM_GETLINK", 

76 19: "RTM_SETLINK", 

77 20: "RTM_NEWADDR", 

78 21: "RTM_DELADDR", 

79 22: "RTM_GETADDR", 

80 # 23: unused 

81 24: "RTM_NEWROUTE", 

82 25: "RTM_DELROUTE", 

83 26: "RTM_GETROUTE", 

84 # 27: unused 

85 }, 

86 fmt="=H", 

87 ), 

88 FlagsField( 

89 "nlmsg_flags", 

90 0, 

91 16 if BIG_ENDIAN else -16, 

92 { 

93 0x01: "NLM_F_REQUEST", 

94 0x02: "NLM_F_MULTI", 

95 0x04: "NLM_F_ACK", 

96 0x08: "NLM_F_ECHO", 

97 0x10: "NLM_F_DUMP_INTR", 

98 0x20: "NLM_F_DUMP_FILTERED", 

99 # GET modifiers 

100 0x100: "NLM_F_ROOT", 

101 0x200: "NLM_F_MATCH", 

102 0x400: "NLM_F_ATOMIC", 

103 }, 

104 ), 

105 Field("nlmsg_seq", 0, fmt="=L"), 

106 Field("nlmsg_pid", 0, fmt="=L"), 

107 ] 

108 

109 def post_build(self, pkt: bytes, pay: bytes) -> bytes: 

110 pkt += pay 

111 if self.nlmsg_len is None: 

112 pkt = struct.pack("=L", len(pkt)) + pkt[4:] 

113 return pkt 

114 

115 def extract_padding(self, s: bytes) -> Tuple[bytes, Optional[bytes]]: 

116 return s[: self.nlmsg_len - 16], s[self.nlmsg_len - 16 :] 

117 

118 def answers(self, other: Packet) -> bool: 

119 return bool(other.nlmsg_seq == self.nlmsg_seq) 

120 

121 

122# DONE 

123 

124 

125class nlmsgerr_rtattr(Packet): 

126 fields_desc = [ 

127 FieldLenField( 

128 "rta_len", None, length_of="rta_data", fmt="=H", adjust=lambda _, x: x + 4 

129 ), 

130 EnumField( 

131 "rta_type", 

132 0, 

133 {}, 

134 fmt="=H", 

135 ), 

136 PadField( 

137 MultipleTypeField( 

138 [], 

139 StrLenField( 

140 "rta_data", 

141 b"", 

142 length_from=lambda pkt: pkt.rta_len - 4, 

143 ), 

144 ), 

145 align=4, 

146 ), 

147 ] 

148 

149 def default_payload_class(self, payload: bytes) -> Type[Packet]: 

150 return conf.padding_layer 

151 

152 

153class nlmsgerr(Packet): 

154 fields_desc = [ 

155 MayEnd(Field("status", 0, fmt="=L")), 

156 # Pay 

157 PacketListField("data", [], nlmsgerr_rtattr), 

158 ] 

159 

160 

161bind_layers(rtmsghdr, nlmsgerr, nlmsg_type=3) 

162 

163 

164# LINK messages 

165 

166 

167class ifla_af_spec_inet_rtattr(Packet): 

168 fields_desc = [ 

169 FieldLenField( 

170 "rta_len", None, length_of="rta_data", fmt="=H", adjust=lambda _, x: x + 4 

171 ), 

172 EnumField( 

173 "rta_type", 

174 0, 

175 { 

176 0x00: "IFLA_INET_UNSPEC", 

177 0x01: "IFLA_INET_CONF", 

178 }, 

179 fmt="=H", 

180 ), 

181 PadField( 

182 MultipleTypeField( 

183 [], 

184 XStrLenField( 

185 "rta_data", 

186 b"", 

187 length_from=lambda pkt: pkt.rta_len - 4, 

188 ), 

189 ), 

190 align=4, 

191 ), 

192 ] 

193 

194 def default_payload_class(self, payload: bytes) -> Type[Packet]: 

195 return conf.padding_layer 

196 

197 

198class ifla_af_spec_inet6_rtattr(Packet): 

199 fields_desc = [ 

200 FieldLenField( 

201 "rta_len", None, length_of="rta_data", fmt="=H", adjust=lambda _, x: x + 4 

202 ), 

203 EnumField( 

204 "rta_type", 

205 0, 

206 { 

207 0x00: "IFLA_INET6_UNSPEC", 

208 0x01: "IFLA_INET6_FLAGS", 

209 0x02: "IFLA_INET6_CONF", 

210 0x03: "IFLA_INET6_STATS", 

211 0x04: "IFLA_INET6_MCAST", 

212 0x05: "IFLA_INET6_CACHEINFO", 

213 0x06: "IFLA_INET6_ICMP6STATS", 

214 0x07: "IFLA_INET6_TOKEN", 

215 0x08: "IFLA_INET6_ADDR_GEN_MODE", 

216 0x09: "IFLA_INET6_RA_MTU", 

217 }, 

218 fmt="=H", 

219 ), 

220 PadField( 

221 MultipleTypeField( 

222 [], 

223 XStrLenField( 

224 "rta_data", 

225 b"", 

226 length_from=lambda pkt: pkt.rta_len - 4, 

227 ), 

228 ), 

229 align=4, 

230 ), 

231 ] 

232 

233 def default_payload_class(self, payload: bytes) -> Type[Packet]: 

234 return conf.padding_layer 

235 

236 

237class ifla_af_spec_rtattr(Packet): 

238 fields_desc = [ 

239 FieldLenField( 

240 "rta_len", None, length_of="rta_data", fmt="=H", adjust=lambda _, x: x + 4 

241 ), 

242 EnumField("rta_type", 0, socket.AddressFamily, fmt="=H"), 

243 PadField( 

244 MultipleTypeField( 

245 [ 

246 ( 

247 # AF_INET 

248 PacketListField( 

249 "rta_data", 

250 [], 

251 ifla_af_spec_inet_rtattr, 

252 length_from=lambda pkt: pkt.rta_len - 4, 

253 ), 

254 lambda pkt: pkt.rta_type == 2, 

255 ), 

256 ( 

257 # AF_INET6 

258 PacketListField( 

259 "rta_data", 

260 [], 

261 ifla_af_spec_inet6_rtattr, 

262 length_from=lambda pkt: pkt.rta_len - 4, 

263 ), 

264 lambda pkt: pkt.rta_type == 10, 

265 ), 

266 ], 

267 XStrLenField( 

268 "rta_data", 

269 b"", 

270 length_from=lambda pkt: pkt.rta_len - 4, 

271 ), 

272 ), 

273 align=4, 

274 ), 

275 ] 

276 

277 def default_payload_class(self, payload: bytes) -> Type[Packet]: 

278 return conf.padding_layer 

279 

280 

281class ifinfomsg_rtattr(Packet): 

282 fields_desc = [ 

283 FieldLenField( 

284 "rta_len", None, length_of="rta_data", fmt="=H", adjust=lambda _, x: x + 4 

285 ), 

286 EnumField( 

287 "rta_type", 

288 0, 

289 { 

290 0x00: "IFLA_UNSPEC", 

291 0x01: "IFLA_ADDRESS", 

292 0x02: "IFLA_BROADCAST", 

293 0x03: "IFLA_IFNAME", 

294 0x04: "IFLA_MTU", 

295 0x05: "IFLA_LINK", 

296 0x06: "IFLA_QDISC", 

297 0x07: "IFLA_STATS", 

298 0x08: "IFLA_COST", 

299 0x09: "IFLA_PRIORITY", 

300 0x0A: "IFLA_MASTER", 

301 0x0B: "IFLA_WIRELESS", 

302 0x0C: "IFLA_PROTINFO", 

303 0x0D: "IFLA_TXQLEN", 

304 0x0E: "IFLA_MAP", 

305 0x0F: "IFLA_WEIGHT", 

306 0x10: "IFLA_OPERSTATE", 

307 0x11: "IFLA_LINKMODE", 

308 0x12: "IFLA_LINKINFO", 

309 0x13: "IFLA_NET_NS_PID", 

310 0x14: "IFLA_IFALIAS", 

311 0x15: "IFLA_NUM_VS", 

312 0x16: "IFLA_VFINFO_LIST", 

313 0x17: "IFLA_STATS64", 

314 0x18: "IFLA_VF_PORTS", 

315 0x19: "IFLA_PORT_SELF", 

316 0x1A: "IFLA_AF_SPEC", 

317 0x1B: "IFLA_GROUP", 

318 0x1C: "IFLA_NET_NS_FD", 

319 0x1D: "IFLA_EXT_MASK", 

320 0x1E: "IFLA_PROMISCUITY", 

321 0x1F: "IFLA_NUM_TX_QUEUES", 

322 0x20: "IFLA_NUM_RX_QUEUES", 

323 0x21: "IFLA_CARRIER", 

324 0x22: "IFLA_PHYS_PORT_ID", 

325 0x23: "IFLA_CARRIER_CHANGES", 

326 0x24: "IFLA_PHYS_SWITCH_ID", 

327 0x25: "IFLA_LINK_NETNSID", 

328 0x26: "IFLA_PHYS_PORT_NAME", 

329 0x27: "IFLA_PROTO_DOWN", 

330 0x28: "IFLA_GSO_MAX_SEGS", 

331 0x29: "IFLA_GSO_MAX_SIZE", 

332 0x2A: "IFLA_PAD", 

333 0x2B: "IFLA_XDP", 

334 0x2C: "IFLA_EVENT", 

335 0x2D: "IFLA_NEW_NETNSID", 

336 0x2E: "IFLA_IF_NETNSID", 

337 0x2F: "IFLA_CARRIER_UP_COUNT", 

338 0x30: "IFLA_CARRIER_DOWN_COUNT", 

339 0x31: "IFLA_NEW_IFINDEX", 

340 0x32: "IFLA_MIN_MTU", 

341 0x33: "IFLA_MAX_MTU", 

342 0x34: "IFLA_PROP_LIST", 

343 0x35: "IFLA_ALT_IFNAME", 

344 0x36: "IFLA_PERM_ADDRESS", 

345 0x37: "IFLA_PROTO_DOWN_REASON", 

346 0x38: "IFLA_PARENT_DEV_NAME", 

347 0x39: "IFLA_PARENT_DEV_BUS_NAME", 

348 0x3A: "IFLA_GRO_MAX_SIZE", 

349 0x3B: "IFLA_TSO_MAX_SIZE", 

350 0x3C: "IFLA_TSO_MAX_SEGS", 

351 0x3D: "IFLA_ALLMULTI", 

352 }, 

353 fmt="=H", 

354 ), 

355 PadField( 

356 MultipleTypeField( 

357 [ 

358 ( 

359 # IFLA_ADDRESS 

360 MACField("rta_data", "00:00:00:00:00:00"), 

361 lambda pkt: pkt.rta_type in [0x01, 0x36], 

362 ), 

363 ( 

364 # IFLA_IFNAME 

365 StrLenField( 

366 "rta_data", b"", length_from=lambda pkt: pkt.rta_len - 4 

367 ), 

368 lambda pkt: pkt.rta_type in [0x03], 

369 ), 

370 ( 

371 # IFLA_AF_SPEC 

372 PacketListField( 

373 "rta_data", 

374 [], 

375 ifla_af_spec_rtattr, 

376 length_from=lambda pkt: pkt.rta_len - 4, 

377 ), 

378 lambda pkt: pkt.rta_type == 0x1A, 

379 ), 

380 ], 

381 XStrLenField( 

382 "rta_data", 

383 b"", 

384 length_from=lambda pkt: pkt.rta_len - 4, 

385 ), 

386 ), 

387 align=4, 

388 ), 

389 ] 

390 

391 def default_payload_class(self, payload: bytes) -> Type[Packet]: 

392 return conf.padding_layer 

393 

394 

395class ifinfomsg(Packet): 

396 fields_desc = [ 

397 ByteEnumField("ifi_family", 0, socket.AddressFamily), 

398 ByteField("res", 0), 

399 Field("ifi_type", 0, fmt="=H"), 

400 Field("ifi_index", 0, fmt="=i"), 

401 FlagsField( 

402 "ifi_flags", 

403 0, 

404 32 if BIG_ENDIAN else -32, 

405 _iff_flags, 

406 ), 

407 Field("ifi_change", 0, fmt="=I"), 

408 # Pay 

409 PacketListField("data", [], ifinfomsg_rtattr), 

410 ] 

411 

412 

413bind_layers(rtmsghdr, ifinfomsg, nlmsg_type=16) 

414bind_layers(rtmsghdr, ifinfomsg, nlmsg_type=17) 

415bind_layers(rtmsghdr, ifinfomsg, nlmsg_type=18) 

416bind_layers(rtmsghdr, ifinfomsg, nlmsg_type=19) 

417 

418 

419# ADDR messages 

420 

421 

422class ifaddrmsg_rtattr(Packet): 

423 fields_desc = [ 

424 FieldLenField( 

425 "rta_len", None, length_of="rta_data", fmt="=H", adjust=lambda _, x: x + 4 

426 ), 

427 EnumField( 

428 "rta_type", 

429 0, 

430 { 

431 0x00: "IFA_UNSPEC", 

432 0x01: "IFA_ADDRESS", 

433 0x02: "IFA_LOCAL", 

434 0x03: "IFA_LABEL", 

435 0x04: "IFA_BROADCAST", 

436 0x05: "IFA_ANYCAST", 

437 0x06: "IFA_CACHEINFO", 

438 0x07: "IFA_MULTICAST", 

439 0x08: "IFA_FLAGS", 

440 0x09: "IFA_RT_PRIORITY", 

441 0x0A: "IFA_TARGET_NETNSID", 

442 0x0B: "IFA_PROTO", 

443 }, 

444 fmt="=H", 

445 ), 

446 PadField( 

447 MultipleTypeField( 

448 [ 

449 # IFA_ADDRESS, IFA_LOCAL, IFA_BROADCAST 

450 ( 

451 IPField("rta_data", "0.0.0.0"), 

452 lambda pkt: pkt.parent 

453 and pkt.parent.ifa_family == 2 

454 and pkt.rta_type in [0x01, 0x02, 0x04], 

455 ), 

456 ( 

457 IP6Field("rta_data", "::"), 

458 lambda pkt: pkt.parent 

459 and pkt.parent.ifa_family == 10 

460 and pkt.rta_type in [0x01, 0x02, 0x04], 

461 ), 

462 ( 

463 # IFA_LABEL 

464 StrLenField( 

465 "rta_data", b"", length_from=lambda pkt: pkt.rta_len - 4 

466 ), 

467 lambda pkt: pkt.rta_type in [0x03], 

468 ), 

469 ], 

470 XStrLenField( 

471 "rta_data", 

472 b"", 

473 length_from=lambda pkt: pkt.rta_len - 4, 

474 ), 

475 ), 

476 align=4, 

477 ), 

478 ] 

479 

480 def default_payload_class(self, payload: bytes) -> Type[Packet]: 

481 return conf.padding_layer 

482 

483 

484class ifaddrmsg(Packet): 

485 fields_desc = [ 

486 ByteEnumField("ifa_family", 0, socket.AddressFamily), 

487 ByteField("ifa_prefixlen", 0), 

488 FlagsField( 

489 "ifa_flags", 

490 0, 

491 -8, 

492 { 

493 0x01: "IFA_F_SECONDARY", 

494 0x02: "IFA_F_NODAD", 

495 0x04: "IFA_F_OPTIMISTIC", 

496 0x08: "IFA_F_DADFAILED", 

497 0x10: "IFA_F_HOMEADDRESS", 

498 0x20: "IFA_F_DEPRECATED", 

499 0x40: "IFA_F_TENTATIVE", 

500 0x80: "IFA_F_PERMANENT", 

501 }, 

502 ), 

503 ByteField("ifa_scope", 0), 

504 Field("ifa_index", 0, fmt="=L"), 

505 # Pay 

506 PacketListField("data", [], ifaddrmsg_rtattr), 

507 ] 

508 

509 

510bind_layers(rtmsghdr, ifaddrmsg, nlmsg_type=20) 

511bind_layers(rtmsghdr, ifaddrmsg, nlmsg_type=21) 

512bind_layers(rtmsghdr, ifaddrmsg, nlmsg_type=22) 

513 

514 

515# ROUTE messages 

516 

517 

518RT_CLASS = { 

519 0: "RT_TABLE_UNSPEC", 

520 252: "RT_TABLE_COMPAT", 

521 253: "RT_TABLE_DEFAULT", 

522 254: "RT_TABLE_MAIN", 

523 255: "RT_TABLE_LOCAL", 

524} 

525 

526 

527class rtmsg_rtattr(Packet): 

528 fields_desc = [ 

529 FieldLenField( 

530 "rta_len", None, length_of="rta_data", fmt="=H", adjust=lambda _, x: x + 4 

531 ), 

532 EnumField( 

533 "rta_type", 

534 0, 

535 { 

536 0x00: "RTA_UNSPEC", 

537 0x01: "RTA_DST", 

538 0x02: "RTS_SRC", 

539 0x03: "RTS_IIF", 

540 0x04: "RTS_OIF", 

541 0x05: "RTA_GATEWAY", 

542 0x06: "RTA_PRIORITY", 

543 0x07: "RTA_PREFSRC", 

544 0x08: "RTA_METRICS", 

545 0x09: "RTA_MULTIPATH", 

546 0x0B: "RTA_FLOW", 

547 0x0C: "RTA_CACHEINFO", 

548 0x0F: "RTA_TABLE", 

549 0x10: "RTA_MARK", 

550 0x11: "RTA_MFC_STATS", 

551 0x12: "RTA_VIA", 

552 0x13: "RTA_NEWDST", 

553 0x14: "RTA_PREF", 

554 0x15: "RTA_ENCAP_TYPE", 

555 0x16: "RTA_ENCAP", 

556 0x17: "RTA_EXPIRES", 

557 0x18: "RTA_PAD", 

558 0x19: "RTA_UID", 

559 0x1A: "RTA_TTL_PROPAGATE", 

560 0x1B: "RTA_IP_PROTO", 

561 0x1C: "RTA_SPORT", 

562 0x1D: "RTA_DPORT", 

563 0x1E: "RTA_NH_ID", 

564 }, 

565 fmt="=H", 

566 ), 

567 PadField( 

568 MultipleTypeField( 

569 [ 

570 # RTA_DST, RTA_SRC, RTA_PREFSRC, RTA_GATEWAY 

571 ( 

572 IPField("rta_data", "0.0.0.0"), 

573 lambda pkt: pkt.parent 

574 and pkt.parent.rtm_family == 2 

575 and pkt.rta_type in [0x01, 0x02, 0x05, 0x07], 

576 ), 

577 ( 

578 IP6Field("rta_data", "::"), 

579 lambda pkt: pkt.parent 

580 and pkt.parent.rtm_family == 10 

581 and pkt.rta_type in [0x01, 0x02, 0x05, 0x07], 

582 ), 

583 # RTS_OIF, RTA_PRIORITY 

584 ( 

585 Field("rta_data", 0, fmt="=I"), 

586 lambda pkt: pkt.rta_type in [0x04, 0x06, 0x10], 

587 ), 

588 # RTA_TABLE 

589 ( 

590 EnumField("rta_data", 0, RT_CLASS, fmt="=I"), 

591 lambda pkt: pkt.rta_type in [0x0F], 

592 ), 

593 ], 

594 XStrLenField( 

595 "rta_data", 

596 b"", 

597 length_from=lambda pkt: pkt.rta_len - 4, 

598 ), 

599 ), 

600 align=4, 

601 ), 

602 ] 

603 

604 def default_payload_class(self, payload: bytes) -> Type[Packet]: 

605 return conf.padding_layer 

606 

607 

608class rtmsg(Packet): 

609 fields_desc = [ 

610 ByteEnumField("rtm_family", 0, socket.AddressFamily), 

611 ByteField("rtm_dst_len", 0), 

612 ByteField("rtm_src_len", 0), 

613 ByteField("rtm_tos", 0), 

614 ByteEnumField( 

615 "rtm_table", 

616 0, 

617 RT_CLASS, 

618 ), 

619 ByteEnumField( 

620 "rtm_protocol", 

621 0, 

622 { 

623 0x00: "RTPROT_UNSPEC", 

624 0x01: "RTPROT_REDIRECT", 

625 0x02: "RTPROT_KERNEL", 

626 0x03: "RTPROT_BOOT", 

627 0x04: "RTPROT_STATIC", 

628 }, 

629 ), 

630 ByteEnumField( 

631 "rtm_scope", 

632 0, 

633 { 

634 0: "RT_SCOPE_UNIVERSE", 

635 200: "RT_SCOPE_SITE", 

636 253: "RT_SCOPE_LINK", 

637 254: "RT_SCOPE_HOST", 

638 255: "RT_SCOPE_NOWHERE", 

639 }, 

640 ), 

641 ByteEnumField( 

642 "rtm_type", 

643 0, 

644 { 

645 0x00: "RTN_UNSPEC", 

646 0x01: "RTN_UNICAST", 

647 0x02: "RTN_LOCAL", 

648 0x03: "RTN_BROADCAST", 

649 0x04: "RTN_ANYCAST", 

650 0x05: "RTN_MULTICAST", 

651 0x06: "RTN_BLACKHOLE", 

652 0x07: "RTN_UNREACHABLE", 

653 0x08: "RTN_PROHIBIT", 

654 0x09: "RTN_THROW", 

655 0x0A: "RTN_NAT", 

656 0x0B: "RTN_XRESOLVE", 

657 }, 

658 ), 

659 FlagsField( 

660 "rtm_flags", 

661 0, 

662 32 if BIG_ENDIAN else -32, 

663 { 

664 0x100: "RTM_F_NOTIFY", 

665 0x200: "RTM_F_CLONED", 

666 0x400: "RTM_F_EQUALIZE", 

667 0x800: "RTM_F_PREFIX", 

668 0x1000: "RTM_F_LOOKUP_TABLE", 

669 0x2000: "RTM_F_FIB_MATCH", 

670 0x4000: "RTM_F_OFFLOAD", 

671 0x8000: "RTM_F_TRAP", 

672 0x20000000: "RTM_F_OFFLOAD_FAILED", 

673 }, 

674 ), 

675 # Pay 

676 PacketListField("data", [], rtmsg_rtattr), 

677 ] 

678 

679 

680bind_layers(rtmsghdr, rtmsg, nlmsg_type=24) 

681bind_layers(rtmsghdr, rtmsg, nlmsg_type=25) 

682bind_layers(rtmsghdr, rtmsg, nlmsg_type=26) 

683 

684 

685class rtmsghdrs(Packet): 

686 fields_desc = [ 

687 PacketListField( 

688 "msgs", 

689 [], 

690 rtmsghdr, 

691 # 65535 / len(rtmsghdr) 

692 max_count=4096, 

693 ), 

694 ] 

695 

696 

697# Utils 

698 

699 

700SOL_NETLINK = 270 

701NETLINK_EXT_ACK = 11 

702NETLINK_GET_STRICT_CHK = 12 

703 

704 

705def _sr1_rtrequest(pkt: Packet) -> List[Packet]: 

706 """ 

707 Send / Receive a rtnetlink request 

708 """ 

709 # Create socket 

710 sock = socket.socket( 

711 socket.AF_NETLINK, 

712 socket.SOCK_RAW | socket.SOCK_CLOEXEC, 

713 socket.NETLINK_ROUTE, 

714 ) 

715 # Configure socket 

716 sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 32768) 

717 sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1048576) 

718 try: 

719 sock.setsockopt(SOL_NETLINK, NETLINK_EXT_ACK, 1) 

720 except OSError: 

721 # Linux 4.12+ only 

722 pass 

723 sock.bind((0, 0)) # bind to kernel 

724 try: 

725 sock.setsockopt(SOL_NETLINK, NETLINK_GET_STRICT_CHK, 1) 

726 except OSError: 

727 # Linux 4.20+ only 

728 pass 

729 # Request routes 

730 sock.send(bytes(rtmsghdrs(msgs=[pkt]))) 

731 results: List[Packet] = [] 

732 try: 

733 while True: 

734 msgs = rtmsghdrs(sock.recv(65535)) 

735 if not msgs: 

736 log_loading.warning("Failed to read the routes using RTNETLINK !") 

737 return [] 

738 for msg in msgs.msgs: 

739 # Keep going until we find the end of the MULTI format 

740 if not msg.nlmsg_flags.NLM_F_MULTI or msg.nlmsg_type == 3: 

741 if msg.nlmsg_type == 3 and nlmsgerr in msg and msg.status != 0: 

742 # NLMSG_DONE with errors 

743 if msg.data and msg.data[0].rta_type == 1: 

744 log_loading.debug( 

745 "Scapy RTNETLINK error on %s: '%s'. Please report !", 

746 pkt.sprintf("%nlmsg_type%"), 

747 msg.data[0].rta_data.decode(), 

748 ) 

749 return [] 

750 return results 

751 results.append(msg) 

752 finally: 

753 sock.close() 

754 

755 

756def _get_ips(af_family=socket.AF_UNSPEC): 

757 # type: (socket.AddressFamily) -> Dict[int, List[Dict[str, Any]]] 

758 """ 

759 Return a mapping of all interfaces IP using a NETLINK socket. 

760 """ 

761 results = _sr1_rtrequest( 

762 rtmsghdr( 

763 nlmsg_type="RTM_GETADDR", 

764 nlmsg_flags="NLM_F_REQUEST+NLM_F_ROOT+NLM_F_MATCH", 

765 nlmsg_seq=int(time.time()), 

766 ) 

767 / ifaddrmsg( 

768 ifa_family=af_family, 

769 data=[], 

770 ) 

771 ) 

772 ips: Dict[int, List[Dict[str, Any]]] = {} 

773 for msg in results: 

774 ifindex = msg.ifa_index 

775 address = None 

776 family = msg.ifa_family 

777 for attr in msg.data: 

778 if attr.rta_type == 0x01: # IFA_ADDRESS 

779 address = attr.rta_data 

780 break 

781 if address is not None: 

782 data = { 

783 "af_family": family, 

784 "index": ifindex, 

785 "address": address, 

786 } 

787 if family == 10: # ipv6 

788 data["scope"] = scapy.utils6.in6_getscope(address) 

789 ips.setdefault(ifindex, list()).append(data) 

790 return ips 

791 

792 

793def _get_if_list(): 

794 # type: () -> Dict[int, Dict[str, Any]] 

795 """ 

796 Read the interfaces list using a NETLINK socket. 

797 """ 

798 results = _sr1_rtrequest( 

799 rtmsghdr( 

800 nlmsg_type="RTM_GETLINK", 

801 nlmsg_flags="NLM_F_REQUEST+NLM_F_ROOT+NLM_F_MATCH", 

802 nlmsg_seq=int(time.time()), 

803 ) 

804 / ifinfomsg( 

805 data=[], 

806 ) 

807 ) 

808 lifips = _get_ips() 

809 interfaces = {} 

810 for msg in results: 

811 ifindex = msg.ifi_index 

812 ifname = None 

813 mac = "00:00:00:00:00:00" 

814 itype = msg.ifi_type 

815 ifflags = msg.ifi_flags 

816 ips = [] 

817 for attr in msg.data: 

818 if attr.rta_type == 0x01: # IFLA_ADDRESS 

819 mac = attr.rta_data 

820 elif attr.rta_type == 0x03: # IFLA_NAME 

821 ifname = attr.rta_data[:-1].decode() 

822 if ifname is not None: 

823 if ifindex in lifips: 

824 ips = lifips[ifindex] 

825 interfaces[ifindex] = { 

826 "name": ifname, 

827 "index": ifindex, 

828 "flags": ifflags, 

829 "mac": mac, 

830 "type": itype, 

831 "ips": ips, 

832 } 

833 return interfaces 

834 

835 

836def in6_getifaddr(): 

837 # type: () -> List[Tuple[str, int, str]] 

838 """ 

839 Returns a list of 3-tuples of the form (addr, scope, iface) where 

840 'addr' is the address of scope 'scope' associated to the interface 

841 'iface'. 

842 

843 This is the list of all addresses of all interfaces available on 

844 the system. 

845 """ 

846 ips = _get_ips(af_family=socket.AF_INET6) 

847 ifaces = _get_if_list() 

848 result = [] 

849 for intip in ips.values(): 

850 for ip in intip: 

851 if ip["index"] in ifaces: 

852 result.append((ip["address"], ip["scope"], ifaces[ip["index"]]["name"])) 

853 return result 

854 

855 

856def _read_routes(af_family): 

857 # type: (socket.AddressFamily) -> List[Packet] 

858 """ 

859 Read routes using a NETLINK socket. 

860 """ 

861 results = [] 

862 for rttable in ["RT_TABLE_LOCAL", "RT_TABLE_MAIN"]: 

863 results.extend( 

864 _sr1_rtrequest( 

865 rtmsghdr( 

866 nlmsg_type="RTM_GETROUTE", 

867 nlmsg_flags="NLM_F_REQUEST+NLM_F_ROOT+NLM_F_MATCH", 

868 nlmsg_seq=int(time.time()), 

869 ) 

870 / rtmsg( 

871 rtm_family=af_family, 

872 data=[ 

873 rtmsg_rtattr(rta_type="RTA_TABLE", rta_data=rttable), 

874 ], 

875 ) 

876 ) 

877 ) 

878 return [msg for msg in results if msg.nlmsg_type == 24] # RTM_NEWROUTE 

879 

880 

881def read_routes(): 

882 # type: () -> List[Tuple[int, int, str, str, str, int]] 

883 """ 

884 Read IPv4 routes for current process 

885 """ 

886 routes = [] 

887 ifaces = _get_if_list() 

888 results = _read_routes(socket.AF_INET) 

889 for msg in results: 

890 # Omit stupid answers (some OS conf appears to lead to this) 

891 if msg.rtm_family != socket.AF_INET: 

892 continue 

893 # Process the RTM_NEWROUTE 

894 net = 0 

895 mask = itom(msg.rtm_dst_len) 

896 gw = "0.0.0.0" 

897 iface = "" 

898 addr = "0.0.0.0" 

899 metric = 0 

900 for attr in msg.data: 

901 if attr.rta_type == 0x01: # RTA_DST 

902 net = atol(attr.rta_data) 

903 elif attr.rta_type == 0x04: # RTS_OIF 

904 index = attr.rta_data 

905 if index in ifaces: 

906 iface = ifaces[index]["name"] 

907 else: 

908 iface = str(index) 

909 elif attr.rta_type == 0x05: # RTA_GATEWAY 

910 gw = attr.rta_data 

911 elif attr.rta_type == 0x06: # RTA_PRIORITY 

912 metric = attr.rta_data 

913 elif attr.rta_type == 0x07: # RTA_PREFSRC 

914 addr = attr.rta_data 

915 routes.append((net, mask, gw, iface, addr, metric)) 

916 # Add multicast routes, as those are missing by default 

917 for _iface in ifaces.values(): 

918 if _iface['flags'].MULTICAST: 

919 try: 

920 addr = next( 

921 x["address"] 

922 for x in _iface["ips"] 

923 if x["af_family"] == socket.AF_INET 

924 ) 

925 except StopIteration: 

926 continue 

927 routes.append(( 

928 0xe0000000, 0xf0000000, "0.0.0.0", _iface["name"], addr, 250 

929 )) 

930 return routes 

931 

932 

933def read_routes6(): 

934 # type: () -> List[Tuple[str, int, str, str, List[str], int]] 

935 """ 

936 Read IPv6 routes for current process 

937 """ 

938 routes = [] 

939 ifaces = _get_if_list() 

940 results = _read_routes(socket.AF_INET6) 

941 lifaddr = _get_ips(af_family=socket.AF_INET6) 

942 for msg in results: 

943 # Omit stupid answers (some OS conf appears to lead to this) 

944 if msg.rtm_family != socket.AF_INET6: 

945 continue 

946 # Process the RTM_NEWROUTE 

947 prefix = "::" 

948 plen = msg.rtm_dst_len 

949 nh = "::" 

950 index = 0 

951 iface = "" 

952 metric = 0 

953 for attr in msg.data: 

954 if attr.rta_type == 0x01: # RTA_DST 

955 prefix = attr.rta_data 

956 elif attr.rta_type == 0x04: # RTS_OIF 

957 index = attr.rta_data 

958 if index in ifaces: 

959 iface = ifaces[index]["name"] 

960 else: 

961 iface = str(index) 

962 elif attr.rta_type == 0x05: # RTA_GATEWAY 

963 nh = attr.rta_data 

964 elif attr.rta_type == 0x06: # RTA_PRIORITY 

965 metric = attr.rta_data 

966 devaddrs = ((x["address"], x["scope"], iface) for x in lifaddr.get(index, [])) 

967 cset = scapy.utils6.construct_source_candidate_set(prefix, plen, devaddrs) 

968 if cset: 

969 routes.append((prefix, plen, nh, iface, cset, metric)) 

970 # Add multicast routes, as those are missing by default 

971 for _iface in ifaces.values(): 

972 if _iface['flags'].MULTICAST: 

973 addrs = [ 

974 x["address"] 

975 for x in _iface["ips"] 

976 if x["af_family"] == socket.AF_INET6 

977 ] 

978 if not addrs: 

979 continue 

980 routes.append(( 

981 "ff00::", 8, "::", _iface["name"], addrs, 250 

982 )) 

983 return routes