1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Gabriel Potter
5
6"""
7This file implements the rtnetlink API that is used to read the network
8configuration of the machine.
9"""
10
11import socket
12import struct
13import time
14
15import scapy.utils6
16
17from scapy.consts import BIG_ENDIAN
18from scapy.config import conf
19from scapy.error import log_loading
20from scapy.packet import (
21 Packet,
22 bind_layers,
23)
24from scapy.utils import atol, itom
25
26from scapy.fields import (
27 ByteEnumField,
28 ByteField,
29 EnumField,
30 Field,
31 FieldLenField,
32 FlagsField,
33 IP6Field,
34 IPField,
35 LenField,
36 MACField,
37 MayEnd,
38 MultipleTypeField,
39 PacketListField,
40 PadField,
41 StrLenField,
42 XStrLenField,
43)
44
45from scapy.arch.common import _iff_flags
46
47# Typing imports
48from typing import (
49 Any,
50 Dict,
51 List,
52 Optional,
53 Tuple,
54 Type,
55)
56
57# from <linux/netlink.h> and <linux/rtnetlink.h>
58
59
60# Common header
61
62
class rtmsghdr(Packet):
    """
    Netlink message header: length, type, flags, sequence number and pid.

    The five fields add up to 16 bytes and are all packed using the native
    byte order.
    """

    fields_desc = [
        # Left to None, the length is computed in post_build().
        LenField("nlmsg_len", None, fmt="=L"),
        EnumField(
            "nlmsg_type",
            0,
            {
                # netlink.h
                3: "NLMSG_DONE",
                # rtnetlink.h
                16: "RTM_NEWLINK",
                17: "RTM_DELLINK",
                18: "RTM_GETLINK",
                19: "RTM_SETLINK",
                20: "RTM_NEWADDR",
                21: "RTM_DELADDR",
                22: "RTM_GETADDR",
                # 23: unused
                24: "RTM_NEWROUTE",
                25: "RTM_DELROUTE",
                26: "RTM_GETROUTE",
                # 27: unused
            },
            fmt="=H",
        ),
        FlagsField(
            "nlmsg_flags",
            0,
            # A negative size selects the little-endian layout.
            16 if BIG_ENDIAN else -16,
            {
                0x01: "NLM_F_REQUEST",
                0x02: "NLM_F_MULTI",
                0x04: "NLM_F_ACK",
                0x08: "NLM_F_ECHO",
                0x10: "NLM_F_DUMP_INTR",
                0x20: "NLM_F_DUMP_FILTERED",
                # GET modifiers
                0x100: "NLM_F_ROOT",
                0x200: "NLM_F_MATCH",
                0x400: "NLM_F_ATOMIC",
            },
        ),
        Field("nlmsg_seq", 0, fmt="=L"),
        Field("nlmsg_pid", 0, fmt="=L"),
    ]

    def post_build(self, pkt: bytes, pay: bytes) -> bytes:
        """
        Append the payload and, unless nlmsg_len was set explicitly,
        overwrite the first 4 bytes with the total message length.
        """
        message = pkt + pay
        if self.nlmsg_len is not None:
            return message
        return struct.pack("=L", len(message)) + message[4:]

    def extract_padding(self, s: bytes) -> Tuple[bytes, Optional[bytes]]:
        """
        Split what follows the header: the first nlmsg_len - 16 bytes are
        this message's payload, the remainder is handed back as padding.
        """
        body_len = self.nlmsg_len - 16
        return s[:body_len], s[body_len:]

    def answers(self, other: Packet) -> bool:
        """
        A message answers another one when both carry the same nlmsg_seq.
        """
        return bool(self.nlmsg_seq == other.nlmsg_seq)
120
121
122# DONE
123
124
class nlmsgerr_rtattr(Packet):
    """
    Attribute found in the 'data' list of nlmsgerr: rta_len, rta_type,
    then rta_len - 4 bytes of data padded to a multiple of 4.
    """

    fields_desc = [
        # rta_len counts the 4 bytes of rta_len + rta_type as well.
        FieldLenField(
            "rta_len",
            None,
            length_of="rta_data",
            fmt="=H",
            adjust=lambda pkt, size: size + 4,
        ),
        EnumField("rta_type", 0, {}, fmt="=H"),
        PadField(
            MultipleTypeField(
                [],
                StrLenField(
                    "rta_data",
                    b"",
                    length_from=lambda p: p.rta_len - 4,
                ),
            ),
            align=4,
        ),
    ]

    def default_payload_class(self, payload: bytes) -> Type[Packet]:
        """
        Whatever is left after the attribute is treated as padding.
        """
        return conf.padding_layer
151
152
class nlmsgerr(Packet):
    """
    Payload bound to NLMSG_DONE (nlmsg_type 3): a status word that may be
    followed by a list of nlmsgerr_rtattr attributes.
    """

    fields_desc = [
        # MayEnd: the packet is allowed to stop right after the status.
        MayEnd(Field("status", 0, fmt="=L")),
        # Pay
        PacketListField("data", [], nlmsgerr_rtattr),
    ]


# nlmsg_type 3 is NLMSG_DONE
bind_layers(
    rtmsghdr,
    nlmsgerr,
    nlmsg_type=3,
)
162
163
164# LINK messages
165
166
class ifla_af_spec_inet_rtattr(Packet):
    """
    Attribute nested in an AF_INET entry of IFLA_AF_SPEC. The data is kept
    as raw bytes (displayed in hex).
    """

    fields_desc = [
        # rta_len counts the 4 bytes of rta_len + rta_type as well.
        FieldLenField(
            "rta_len",
            None,
            length_of="rta_data",
            fmt="=H",
            adjust=lambda pkt, size: size + 4,
        ),
        EnumField(
            "rta_type",
            0,
            {
                0x00: "IFLA_INET_UNSPEC",
                0x01: "IFLA_INET_CONF",
            },
            fmt="=H",
        ),
        PadField(
            MultipleTypeField(
                [],
                XStrLenField(
                    "rta_data",
                    b"",
                    length_from=lambda p: p.rta_len - 4,
                ),
            ),
            align=4,
        ),
    ]

    def default_payload_class(self, payload: bytes) -> Type[Packet]:
        """
        Whatever is left after the attribute is treated as padding.
        """
        return conf.padding_layer
196
197
class ifla_af_spec_inet6_rtattr(Packet):
    """
    Attribute nested in an AF_INET6 entry of IFLA_AF_SPEC. The data is kept
    as raw bytes (displayed in hex).
    """

    fields_desc = [
        # rta_len counts the 4 bytes of rta_len + rta_type as well.
        FieldLenField(
            "rta_len",
            None,
            length_of="rta_data",
            fmt="=H",
            adjust=lambda pkt, size: size + 4,
        ),
        EnumField(
            "rta_type",
            0,
            {
                0x00: "IFLA_INET6_UNSPEC",
                0x01: "IFLA_INET6_FLAGS",
                0x02: "IFLA_INET6_CONF",
                0x03: "IFLA_INET6_STATS",
                0x04: "IFLA_INET6_MCAST",
                0x05: "IFLA_INET6_CACHEINFO",
                0x06: "IFLA_INET6_ICMP6STATS",
                0x07: "IFLA_INET6_TOKEN",
                0x08: "IFLA_INET6_ADDR_GEN_MODE",
                0x09: "IFLA_INET6_RA_MTU",
            },
            fmt="=H",
        ),
        PadField(
            MultipleTypeField(
                [],
                XStrLenField(
                    "rta_data",
                    b"",
                    length_from=lambda p: p.rta_len - 4,
                ),
            ),
            align=4,
        ),
    ]

    def default_payload_class(self, payload: bytes) -> Type[Packet]:
        """
        Whatever is left after the attribute is treated as padding.
        """
        return conf.padding_layer
235
236
class ifla_af_spec_rtattr(Packet):
    """
    Entry of an IFLA_AF_SPEC attribute. rta_type holds an address family
    and selects how rta_data is parsed: a list of nested attributes for
    families 2 and 10, raw bytes otherwise.
    """

    fields_desc = [
        # rta_len counts the 4 bytes of rta_len + rta_type as well.
        FieldLenField(
            "rta_len",
            None,
            length_of="rta_data",
            fmt="=H",
            adjust=lambda pkt, size: size + 4,
        ),
        EnumField("rta_type", 0, socket.AddressFamily, fmt="=H"),
        PadField(
            MultipleTypeField(
                [
                    # AF_INET
                    (
                        PacketListField(
                            "rta_data",
                            [],
                            ifla_af_spec_inet_rtattr,
                            length_from=lambda p: p.rta_len - 4,
                        ),
                        lambda p: p.rta_type == 2,
                    ),
                    # AF_INET6
                    (
                        PacketListField(
                            "rta_data",
                            [],
                            ifla_af_spec_inet6_rtattr,
                            length_from=lambda p: p.rta_len - 4,
                        ),
                        lambda p: p.rta_type == 10,
                    ),
                ],
                # Any other family
                XStrLenField(
                    "rta_data",
                    b"",
                    length_from=lambda p: p.rta_len - 4,
                ),
            ),
            align=4,
        ),
    ]

    def default_payload_class(self, payload: bytes) -> Type[Packet]:
        """
        Whatever is left after the attribute is treated as padding.
        """
        return conf.padding_layer
279
280
class ifinfomsg_rtattr(Packet):
    """
    Attribute carried in the 'data' list of ifinfomsg (IFLA_*).

    rta_data is parsed as a MAC address for IFLA_ADDRESS and
    IFLA_PERM_ADDRESS, as a string for IFLA_IFNAME, as a list of
    ifla_af_spec_rtattr for IFLA_AF_SPEC, and as raw bytes otherwise.
    """

    fields_desc = [
        # rta_len counts the 4 bytes of rta_len + rta_type as well.
        FieldLenField(
            "rta_len",
            None,
            length_of="rta_data",
            fmt="=H",
            adjust=lambda pkt, size: size + 4,
        ),
        EnumField(
            "rta_type",
            0,
            {
                0x00: "IFLA_UNSPEC",
                0x01: "IFLA_ADDRESS",
                0x02: "IFLA_BROADCAST",
                0x03: "IFLA_IFNAME",
                0x04: "IFLA_MTU",
                0x05: "IFLA_LINK",
                0x06: "IFLA_QDISC",
                0x07: "IFLA_STATS",
                0x08: "IFLA_COST",
                0x09: "IFLA_PRIORITY",
                0x0A: "IFLA_MASTER",
                0x0B: "IFLA_WIRELESS",
                0x0C: "IFLA_PROTINFO",
                0x0D: "IFLA_TXQLEN",
                0x0E: "IFLA_MAP",
                0x0F: "IFLA_WEIGHT",
                0x10: "IFLA_OPERSTATE",
                0x11: "IFLA_LINKMODE",
                0x12: "IFLA_LINKINFO",
                0x13: "IFLA_NET_NS_PID",
                0x14: "IFLA_IFALIAS",
                0x15: "IFLA_NUM_VS",
                0x16: "IFLA_VFINFO_LIST",
                0x17: "IFLA_STATS64",
                0x18: "IFLA_VF_PORTS",
                0x19: "IFLA_PORT_SELF",
                0x1A: "IFLA_AF_SPEC",
                0x1B: "IFLA_GROUP",
                0x1C: "IFLA_NET_NS_FD",
                0x1D: "IFLA_EXT_MASK",
                0x1E: "IFLA_PROMISCUITY",
                0x1F: "IFLA_NUM_TX_QUEUES",
                0x20: "IFLA_NUM_RX_QUEUES",
                0x21: "IFLA_CARRIER",
                0x22: "IFLA_PHYS_PORT_ID",
                0x23: "IFLA_CARRIER_CHANGES",
                0x24: "IFLA_PHYS_SWITCH_ID",
                0x25: "IFLA_LINK_NETNSID",
                0x26: "IFLA_PHYS_PORT_NAME",
                0x27: "IFLA_PROTO_DOWN",
                0x28: "IFLA_GSO_MAX_SEGS",
                0x29: "IFLA_GSO_MAX_SIZE",
                0x2A: "IFLA_PAD",
                0x2B: "IFLA_XDP",
                0x2C: "IFLA_EVENT",
                0x2D: "IFLA_NEW_NETNSID",
                0x2E: "IFLA_IF_NETNSID",
                0x2F: "IFLA_CARRIER_UP_COUNT",
                0x30: "IFLA_CARRIER_DOWN_COUNT",
                0x31: "IFLA_NEW_IFINDEX",
                0x32: "IFLA_MIN_MTU",
                0x33: "IFLA_MAX_MTU",
                0x34: "IFLA_PROP_LIST",
                0x35: "IFLA_ALT_IFNAME",
                0x36: "IFLA_PERM_ADDRESS",
                0x37: "IFLA_PROTO_DOWN_REASON",
                0x38: "IFLA_PARENT_DEV_NAME",
                0x39: "IFLA_PARENT_DEV_BUS_NAME",
                0x3A: "IFLA_GRO_MAX_SIZE",
                0x3B: "IFLA_TSO_MAX_SIZE",
                0x3C: "IFLA_TSO_MAX_SEGS",
                0x3D: "IFLA_ALLMULTI",
            },
            fmt="=H",
        ),
        PadField(
            MultipleTypeField(
                [
                    # IFLA_ADDRESS, IFLA_PERM_ADDRESS
                    (
                        MACField("rta_data", "00:00:00:00:00:00"),
                        lambda p: p.rta_type in (0x01, 0x36),
                    ),
                    # IFLA_IFNAME
                    (
                        StrLenField(
                            "rta_data",
                            b"",
                            length_from=lambda p: p.rta_len - 4,
                        ),
                        lambda p: p.rta_type == 0x03,
                    ),
                    # IFLA_AF_SPEC
                    (
                        PacketListField(
                            "rta_data",
                            [],
                            ifla_af_spec_rtattr,
                            length_from=lambda p: p.rta_len - 4,
                        ),
                        lambda p: p.rta_type == 0x1A,
                    ),
                ],
                # Everything else
                XStrLenField(
                    "rta_data",
                    b"",
                    length_from=lambda p: p.rta_len - 4,
                ),
            ),
            align=4,
        ),
    ]

    def default_payload_class(self, payload: bytes) -> Type[Packet]:
        """
        Whatever is left after the attribute is treated as padding.
        """
        return conf.padding_layer
393
394
class ifinfomsg(Packet):
    """
    Body of the LINK messages: family, reserved byte, type, interface
    index, flags and change mask, followed by a list of ifinfomsg_rtattr.
    """

    fields_desc = [
        ByteEnumField("ifi_family", 0, socket.AddressFamily),  # type: ignore
        ByteField("res", 0),
        Field("ifi_type", 0, fmt="=H"),
        Field("ifi_index", 0, fmt="=i"),
        # A negative size selects the little-endian layout.
        FlagsField("ifi_flags", 0, 32 if BIG_ENDIAN else -32, _iff_flags),
        Field("ifi_change", 0, fmt="=I"),
        # Pay
        PacketListField("data", [], ifinfomsg_rtattr),
    ]
411
412
# RTM_NEWLINK, RTM_DELLINK, RTM_GETLINK and RTM_SETLINK all carry an ifinfomsg
for _link_msg_type in (16, 17, 18, 19):
    bind_layers(rtmsghdr, ifinfomsg, nlmsg_type=_link_msg_type)
# Do not leave the loop variable behind in the module namespace
del _link_msg_type
417
418
419# ADDR messages
420
421
class ifaddrmsg_rtattr(Packet):
    """
    Attribute carried in the 'data' list of ifaddrmsg (IFA_*).

    For IFA_ADDRESS, IFA_LOCAL and IFA_BROADCAST, rta_data is an IPv4 or
    IPv6 address depending on the ifa_family of the parent packet. It is
    a string for IFA_LABEL and raw bytes in every other case.
    """

    fields_desc = [
        # rta_len counts the 4 bytes of rta_len + rta_type as well.
        FieldLenField(
            "rta_len",
            None,
            length_of="rta_data",
            fmt="=H",
            adjust=lambda pkt, size: size + 4,
        ),
        EnumField(
            "rta_type",
            0,
            {
                0x00: "IFA_UNSPEC",
                0x01: "IFA_ADDRESS",
                0x02: "IFA_LOCAL",
                0x03: "IFA_LABEL",
                0x04: "IFA_BROADCAST",
                0x05: "IFA_ANYCAST",
                0x06: "IFA_CACHEINFO",
                0x07: "IFA_MULTICAST",
                0x08: "IFA_FLAGS",
                0x09: "IFA_RT_PRIORITY",
                0x0A: "IFA_TARGET_NETNSID",
                0x0B: "IFA_PROTO",
            },
            fmt="=H",
        ),
        PadField(
            MultipleTypeField(
                [
                    # IFA_ADDRESS, IFA_LOCAL, IFA_BROADCAST (family 2)
                    (
                        IPField("rta_data", "0.0.0.0"),
                        lambda p: p.parent
                        and p.parent.ifa_family == 2
                        and p.rta_type in (0x01, 0x02, 0x04),
                    ),
                    # IFA_ADDRESS, IFA_LOCAL, IFA_BROADCAST (family 10)
                    (
                        IP6Field("rta_data", "::"),
                        lambda p: p.parent
                        and p.parent.ifa_family == 10
                        and p.rta_type in (0x01, 0x02, 0x04),
                    ),
                    # IFA_LABEL
                    (
                        StrLenField(
                            "rta_data",
                            b"",
                            length_from=lambda p: p.rta_len - 4,
                        ),
                        lambda p: p.rta_type == 0x03,
                    ),
                ],
                # Everything else
                XStrLenField(
                    "rta_data",
                    b"",
                    length_from=lambda p: p.rta_len - 4,
                ),
            ),
            align=4,
        ),
    ]

    def default_payload_class(self, payload: bytes) -> Type[Packet]:
        """
        Whatever is left after the attribute is treated as padding.
        """
        return conf.padding_layer
482
483
class ifaddrmsg(Packet):
    """
    Body of the ADDR messages: family, prefix length, flags, scope and
    interface index, followed by a list of ifaddrmsg_rtattr.
    """

    fields_desc = [
        ByteEnumField("ifa_family", 0, socket.AddressFamily),  # type: ignore
        ByteField("ifa_prefixlen", 0),
        FlagsField(
            "ifa_flags",
            0,
            -8,
            {
                0x01: "IFA_F_SECONDARY",
                0x02: "IFA_F_NODAD",
                0x04: "IFA_F_OPTIMISTIC",
                0x08: "IFA_F_DADFAILED",
                0x10: "IFA_F_HOMEADDRESS",
                0x20: "IFA_F_DEPRECATED",
                0x40: "IFA_F_TENTATIVE",
                0x80: "IFA_F_PERMANENT",
            },
        ),
        ByteField("ifa_scope", 0),
        Field("ifa_index", 0, fmt="=L"),
        # Pay
        PacketListField("data", [], ifaddrmsg_rtattr),
    ]
508
509
# RTM_NEWADDR, RTM_DELADDR and RTM_GETADDR all carry an ifaddrmsg
for _addr_msg_type in (20, 21, 22):
    bind_layers(rtmsghdr, ifaddrmsg, nlmsg_type=_addr_msg_type)
# Do not leave the loop variable behind in the module namespace
del _addr_msg_type
513
514
515# ROUTE messages
516
517
# Names of the reserved routing table ids. Used both for the 1-byte
# rtm_table field of rtmsg and for the 4-byte RTA_TABLE attribute.
RT_CLASS = {
    0: "RT_TABLE_UNSPEC",
    252: "RT_TABLE_COMPAT",
    253: "RT_TABLE_DEFAULT",
    254: "RT_TABLE_MAIN",
    255: "RT_TABLE_LOCAL",
}
525
526
class rtmsg_rtattr(Packet):
    """
    Attribute carried in the 'data' list of rtmsg (RTA_*).

    For types 0x01, 0x02, 0x05 and 0x07, rta_data is an IPv4 or IPv6
    address depending on the rtm_family of the parent packet. Types 0x04,
    0x06 and 0x10 hold a 32-bit integer, RTA_TABLE a 32-bit RT_CLASS
    value, and anything else is kept as raw bytes.
    """

    fields_desc = [
        # rta_len counts the 4 bytes of rta_len + rta_type as well.
        FieldLenField(
            "rta_len",
            None,
            length_of="rta_data",
            fmt="=H",
            adjust=lambda pkt, size: size + 4,
        ),
        EnumField(
            "rta_type",
            0,
            {
                0x00: "RTA_UNSPEC",
                0x01: "RTA_DST",
                # NOTE(review): the RTS_ prefix on the next three names
                # looks like a typo for RTA_. Left untouched because the
                # strings are part of the enum users can type - confirm
                # before renaming.
                0x02: "RTS_SRC",
                0x03: "RTS_IIF",
                0x04: "RTS_OIF",
                0x05: "RTA_GATEWAY",
                0x06: "RTA_PRIORITY",
                0x07: "RTA_PREFSRC",
                0x08: "RTA_METRICS",
                0x09: "RTA_MULTIPATH",
                0x0B: "RTA_FLOW",
                0x0C: "RTA_CACHEINFO",
                0x0F: "RTA_TABLE",
                0x10: "RTA_MARK",
                0x11: "RTA_MFC_STATS",
                0x12: "RTA_VIA",
                0x13: "RTA_NEWDST",
                0x14: "RTA_PREF",
                0x15: "RTA_ENCAP_TYPE",
                0x16: "RTA_ENCAP",
                0x17: "RTA_EXPIRES",
                0x18: "RTA_PAD",
                0x19: "RTA_UID",
                0x1A: "RTA_TTL_PROPAGATE",
                0x1B: "RTA_IP_PROTO",
                0x1C: "RTA_SPORT",
                0x1D: "RTA_DPORT",
                0x1E: "RTA_NH_ID",
            },
            fmt="=H",
        ),
        PadField(
            MultipleTypeField(
                [
                    # 0x01, 0x02, 0x05 (RTA_GATEWAY), 0x07 (RTA_PREFSRC)
                    # with family 2
                    (
                        IPField("rta_data", "0.0.0.0"),
                        lambda p: p.parent
                        and p.parent.rtm_family == 2
                        and p.rta_type in (0x01, 0x02, 0x05, 0x07),
                    ),
                    # Same types with family 10
                    (
                        IP6Field("rta_data", "::"),
                        lambda p: p.parent
                        and p.parent.rtm_family == 10
                        and p.rta_type in (0x01, 0x02, 0x05, 0x07),
                    ),
                    # 0x04, 0x06 (RTA_PRIORITY), 0x10 (RTA_MARK)
                    (
                        Field("rta_data", 0, fmt="=I"),
                        lambda p: p.rta_type in (0x04, 0x06, 0x10),
                    ),
                    # RTA_TABLE
                    (
                        EnumField("rta_data", 0, RT_CLASS, fmt="=I"),
                        lambda p: p.rta_type == 0x0F,
                    ),
                ],
                # Everything else
                XStrLenField(
                    "rta_data",
                    b"",
                    length_from=lambda p: p.rta_len - 4,
                ),
            ),
            align=4,
        ),
    ]

    def default_payload_class(self, payload: bytes) -> Type[Packet]:
        """
        Whatever is left after the attribute is treated as padding.
        """
        return conf.padding_layer
606
607
class rtmsg(Packet):
    """
    Body of the ROUTE messages: family, destination / source prefix
    lengths, tos, table, protocol, scope, type and flags, followed by a
    list of rtmsg_rtattr.
    """

    fields_desc = [
        ByteEnumField("rtm_family", 0, socket.AddressFamily),  # type: ignore
        ByteField("rtm_dst_len", 0),
        ByteField("rtm_src_len", 0),
        ByteField("rtm_tos", 0),
        ByteEnumField("rtm_table", 0, RT_CLASS),
        ByteEnumField(
            "rtm_protocol",
            0,
            {
                0x00: "RTPROT_UNSPEC",
                0x01: "RTPROT_REDIRECT",
                0x02: "RTPROT_KERNEL",
                0x03: "RTPROT_BOOT",
                0x04: "RTPROT_STATIC",
            },
        ),
        ByteEnumField(
            "rtm_scope",
            0,
            {
                0: "RT_SCOPE_UNIVERSE",
                200: "RT_SCOPE_SITE",
                253: "RT_SCOPE_LINK",
                254: "RT_SCOPE_HOST",
                255: "RT_SCOPE_NOWHERE",
            },
        ),
        ByteEnumField(
            "rtm_type",
            0,
            {
                0x00: "RTN_UNSPEC",
                0x01: "RTN_UNICAST",
                0x02: "RTN_LOCAL",
                0x03: "RTN_BROADCAST",
                0x04: "RTN_ANYCAST",
                0x05: "RTN_MULTICAST",
                0x06: "RTN_BLACKHOLE",
                0x07: "RTN_UNREACHABLE",
                0x08: "RTN_PROHIBIT",
                0x09: "RTN_THROW",
                0x0A: "RTN_NAT",
                0x0B: "RTN_XRESOLVE",
            },
        ),
        FlagsField(
            "rtm_flags",
            0,
            # A negative size selects the little-endian layout.
            32 if BIG_ENDIAN else -32,
            {
                0x100: "RTM_F_NOTIFY",
                0x200: "RTM_F_CLONED",
                0x400: "RTM_F_EQUALIZE",
                0x800: "RTM_F_PREFIX",
                0x1000: "RTM_F_LOOKUP_TABLE",
                0x2000: "RTM_F_FIB_MATCH",
                0x4000: "RTM_F_OFFLOAD",
                0x8000: "RTM_F_TRAP",
                0x20000000: "RTM_F_OFFLOAD_FAILED",
            },
        ),
        # Pay
        PacketListField("data", [], rtmsg_rtattr),
    ]
678
679
# RTM_NEWROUTE, RTM_DELROUTE and RTM_GETROUTE all carry a rtmsg
for _route_msg_type in (24, 25, 26):
    bind_layers(rtmsghdr, rtmsg, nlmsg_type=_route_msg_type)
# Do not leave the loop variable behind in the module namespace
del _route_msg_type
683
684
class rtmsghdrs(Packet):
    """
    Container for a buffer holding several rtmsghdr messages back to back.
    """

    fields_desc = [
        # At most 4096 messages are parsed: 65535 / len(rtmsghdr)
        PacketListField("msgs", [], rtmsghdr, max_count=4096),
    ]
695
696
697# Utils
698
699
# Socket level and option numbers passed to setsockopt() by _sr1_rtrequest.
# They are spelled out here rather than read from the socket module.
SOL_NETLINK = 270
NETLINK_EXT_ACK = 11
NETLINK_GET_STRICT_CHK = 12
703
704
def _sr1_rtrequest(pkt: Packet) -> List[Packet]:
    """
    Send / Receive a rtnetlink request

    :param pkt: the request to send (a rtmsghdr and its payload).
    :returns: the messages received before the end of the answer. An empty
        list is returned when nothing could be parsed from the socket or
        when the NLMSG_DONE message carries a non-zero status.
    """
    # The context manager closes the socket on every exit path, including
    # a failure while it is being configured, bound or written to.
    with socket.socket(
        socket.AF_NETLINK,
        socket.SOCK_RAW | socket.SOCK_CLOEXEC,
        socket.NETLINK_ROUTE,
    ) as sock:
        # Configure socket
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 32768)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1048576)
        try:
            sock.setsockopt(SOL_NETLINK, NETLINK_EXT_ACK, 1)
        except OSError:
            # Optional: only available on Linux 4.12+
            pass
        sock.bind((0, 0))  # bind to kernel
        try:
            sock.setsockopt(SOL_NETLINK, NETLINK_GET_STRICT_CHK, 1)
        except OSError:
            # Optional: only available on Linux 4.20+
            pass
        # Send the request
        sock.send(bytes(rtmsghdrs(msgs=[pkt])))
        results: List[Packet] = []
        while True:
            msgs = rtmsghdrs(sock.recv(65535))
            # Test the parsed list: the wrapper packet itself says nothing
            # about whether any message was read.
            if not msgs.msgs:
                log_loading.warning("Failed to read the routes using RTNETLINK !")
                return []
            for msg in msgs.msgs:
                # Keep going until we find the end of the MULTI format
                if msg.nlmsg_flags.NLM_F_MULTI and msg.nlmsg_type != 3:
                    results.append(msg)
                    continue
                if msg.nlmsg_type == 3 and nlmsgerr in msg and msg.status != 0:
                    # NLMSG_DONE with errors
                    if msg.data and msg.data[0].rta_type == 1:
                        log_loading.warning(
                            "Scapy RTNETLINK error on %s: '%s'. Please report !",
                            pkt.sprintf("%nlmsg_type%"),
                            msg.data[0].rta_data.decode(),
                        )
                    return []
                return results
746
747
def _get_ips(af_family=socket.AF_UNSPEC):
    # type: (socket.AddressFamily) -> Dict[int, List[Dict[str, Any]]]
    """
    Return a mapping of all interfaces IP using a NETLINK socket.

    Keys are interface indexes. Each value lists one dict per address with
    the keys "af_family", "index", "address" and, for family 10 only,
    "scope". Messages that carry no IFA_ADDRESS attribute are skipped.
    """
    request = rtmsghdr(
        nlmsg_type="RTM_GETADDR",
        nlmsg_flags="NLM_F_REQUEST+NLM_F_ROOT+NLM_F_MATCH",
        nlmsg_seq=int(time.time()),
    ) / ifaddrmsg(
        ifa_family=af_family,
        data=[],
    )
    ips: Dict[int, List[Dict[str, Any]]] = {}
    for msg in _sr1_rtrequest(request):
        ifindex = msg.ifa_index
        family = msg.ifa_family
        # Only the first IFA_ADDRESS (0x01) attribute is considered
        address = next(
            (attr.rta_data for attr in msg.data if attr.rta_type == 0x01),
            None,
        )
        if address is None:
            continue
        entry = {
            "af_family": family,
            "index": ifindex,
            "address": address,
        }
        if family == 10:  # ipv6
            entry["scope"] = scapy.utils6.in6_getscope(address)
        ips.setdefault(ifindex, []).append(entry)
    return ips
783
784
def _get_if_list():
    # type: () -> Dict[int, Dict[str, Any]]
    """
    Read the interfaces list using a NETLINK socket.

    Keys are interface indexes. Each value is a dict with the keys "name",
    "index", "flags", "mac" and "ips" (the addresses _get_ips() reports for
    that index). Links that carry no name attribute are skipped.
    """
    request = rtmsghdr(
        nlmsg_type="RTM_GETLINK",
        nlmsg_flags="NLM_F_REQUEST+NLM_F_ROOT+NLM_F_MATCH",
        nlmsg_seq=int(time.time()),
    ) / ifinfomsg(
        data=[],
    )
    links = _sr1_rtrequest(request)
    addresses = _get_ips()
    interfaces = {}
    for msg in links:
        ifindex = msg.ifi_index
        flags = msg.ifi_flags
        name = None
        mac = "00:00:00:00:00:00"
        for attr in msg.data:
            kind = attr.rta_type
            if kind == 0x01:  # IFLA_ADDRESS
                mac = attr.rta_data
            elif kind == 0x03:  # IFLA_NAME
                # The last byte is dropped before decoding
                name = attr.rta_data[:-1].decode()
        if name is None:
            continue
        interfaces[ifindex] = {
            "name": name,
            "index": ifindex,
            "flags": flags,
            "mac": mac,
            "ips": addresses.get(ifindex, []),
        }
    return interfaces
824
825
def in6_getifaddr():
    # type: () -> List[Tuple[str, int, str]]
    """
    Returns a list of 3-tuples of the form (addr, scope, iface) where
    'addr' is the address of scope 'scope' associated to the interface
    'iface'.

    This is the list of all addresses of all interfaces available on
    the system. Addresses whose interface index is not part of the
    interface list are left out.
    """
    addrs_by_index = _get_ips(af_family=socket.AF_INET6)
    ifaces = _get_if_list()
    return [
        (entry["address"], entry["scope"], ifaces[entry["index"]]["name"])
        for entries in addrs_by_index.values()
        for entry in entries
        if entry["index"] in ifaces
    ]
844
845
def _read_routes(af_family):
    # type: (socket.AddressFamily) -> List[Packet]
    """
    Read routes using a NETLINK socket.

    One RTM_GETROUTE request is sent for RT_TABLE_LOCAL, then one for
    RT_TABLE_MAIN. Only the RTM_NEWROUTE answers are returned.
    """
    routes = []
    for table_name in ("RT_TABLE_LOCAL", "RT_TABLE_MAIN"):
        header = rtmsghdr(
            nlmsg_type="RTM_GETROUTE",
            nlmsg_flags="NLM_F_REQUEST+NLM_F_ROOT+NLM_F_MATCH",
            nlmsg_seq=int(time.time()),
        )
        body = rtmsg(
            rtm_family=af_family,
            data=[
                rtmsg_rtattr(rta_type="RTA_TABLE", rta_data=table_name),
            ],
        )
        for answer in _sr1_rtrequest(header / body):
            if answer.nlmsg_type == 24:  # RTM_NEWROUTE
                routes.append(answer)
    return routes
869
870
def read_routes():
    # type: () -> List[Tuple[int, int, str, str, str, int]]
    """
    Read IPv4 routes for current process

    Each route is a (net, mask, gw, iface, addr, metric) tuple. 'iface' is
    the interface name when the output interface index is known, else the
    index as a string.
    """
    ifaces = _get_if_list()
    routes = []
    for msg in _read_routes(socket.AF_INET):
        # Defaults used when the RTM_NEWROUTE lacks the attribute
        dst = 0
        gateway = "0.0.0.0"
        ifname = ""
        prefsrc = "0.0.0.0"
        priority = 0
        netmask = itom(msg.rtm_dst_len)
        for attr in msg.data:
            kind = attr.rta_type
            if kind == 0x01:  # RTA_DST
                dst = atol(attr.rta_data)
            elif kind == 0x04:  # RTS_OIF
                oif = attr.rta_data
                ifname = ifaces[oif]["name"] if oif in ifaces else str(oif)
            elif kind == 0x05:  # RTA_GATEWAY
                gateway = attr.rta_data
            elif kind == 0x06:  # RTA_PRIORITY
                priority = attr.rta_data
            elif kind == 0x07:  # RTA_PREFSRC
                prefsrc = attr.rta_data
        routes.append((dst, netmask, gateway, ifname, prefsrc, priority))
    return routes
904
905
def read_routes6():
    # type: () -> List[Tuple[str, int, str, str, List[str], int]]
    """
    Read IPv6 routes for current process

    Each route is a (prefix, plen, nh, iface, cset, metric) tuple, 'cset'
    being the source candidate set built from the addresses of the output
    interface. Routes with an empty candidate set are left out.
    """
    ifaces = _get_if_list()
    answers = _read_routes(socket.AF_INET6)
    addrs_by_index = _get_ips(af_family=socket.AF_INET6)
    routes = []
    for msg in answers:
        # Defaults used when the RTM_NEWROUTE lacks the attribute
        prefix = "::"
        nexthop = "::"
        oif = 0
        ifname = ""
        priority = 0
        plen = msg.rtm_dst_len
        for attr in msg.data:
            kind = attr.rta_type
            if kind == 0x01:  # RTA_DST
                prefix = attr.rta_data
            elif kind == 0x04:  # RTS_OIF
                oif = attr.rta_data
                ifname = ifaces[oif]["name"] if oif in ifaces else str(oif)
            elif kind == 0x05:  # RTA_GATEWAY
                nexthop = attr.rta_data
            elif kind == 0x06:  # RTA_PRIORITY
                priority = attr.rta_data
        # Addresses configured on the output interface
        candidates = (
            (entry["address"], entry["scope"], ifname)
            for entry in addrs_by_index.get(oif, [])
        )
        cset = scapy.utils6.construct_source_candidate_set(
            prefix, plen, candidates
        )
        if not cset:
            continue
        routes.append((prefix, plen, nexthop, ifname, cset, priority))
    return routes