1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Gabriel Potter
5
6"""
7This file implements the rtnetlink API that is used to read the network
8configuration of the machine.
9"""
10
11import socket
12import struct
13import time
14
15import scapy.utils6
16
17from scapy.consts import BIG_ENDIAN
18from scapy.config import conf
19from scapy.error import log_loading
20from scapy.packet import (
21 Packet,
22 bind_layers,
23)
24from scapy.utils import atol, itom
25
26from scapy.fields import (
27 ByteEnumField,
28 ByteField,
29 EnumField,
30 Field,
31 FieldLenField,
32 FlagsField,
33 IP6Field,
34 IPField,
35 LenField,
36 MACField,
37 MayEnd,
38 MultipleTypeField,
39 PacketListField,
40 PadField,
41 StrLenField,
42 XStrLenField,
43)
44
45from scapy.arch.common import _iff_flags
46
47# Typing imports
48from typing import (
49 Any,
50 Dict,
51 List,
52 Optional,
53 Tuple,
54 Type,
55)
56
57# from <linux/netlink.h> and <linux/rtnetlink.h>
58
59
60# Common header
61
62
63class rtmsghdr(Packet):
64 fields_desc = [
65 LenField("nlmsg_len", None, fmt="=L"),
66 EnumField(
67 "nlmsg_type",
68 0,
69 {
70 # netlink.h
71 3: "NLMSG_DONE",
72 # rtnetlink.h
73 16: "RTM_NEWLINK",
74 17: "RTM_DELLINK",
75 18: "RTM_GETLINK",
76 19: "RTM_SETLINK",
77 20: "RTM_NEWADDR",
78 21: "RTM_DELADDR",
79 22: "RTM_GETADDR",
80 # 23: unused
81 24: "RTM_NEWROUTE",
82 25: "RTM_DELROUTE",
83 26: "RTM_GETROUTE",
84 # 27: unused
85 },
86 fmt="=H",
87 ),
88 FlagsField(
89 "nlmsg_flags",
90 0,
91 16 if BIG_ENDIAN else -16,
92 {
93 0x01: "NLM_F_REQUEST",
94 0x02: "NLM_F_MULTI",
95 0x04: "NLM_F_ACK",
96 0x08: "NLM_F_ECHO",
97 0x10: "NLM_F_DUMP_INTR",
98 0x20: "NLM_F_DUMP_FILTERED",
99 # GET modifiers
100 0x100: "NLM_F_ROOT",
101 0x200: "NLM_F_MATCH",
102 0x400: "NLM_F_ATOMIC",
103 },
104 ),
105 Field("nlmsg_seq", 0, fmt="=L"),
106 Field("nlmsg_pid", 0, fmt="=L"),
107 ]
108
109 def post_build(self, pkt: bytes, pay: bytes) -> bytes:
110 pkt += pay
111 if self.nlmsg_len is None:
112 pkt = struct.pack("=L", len(pkt)) + pkt[4:]
113 return pkt
114
115 def extract_padding(self, s: bytes) -> Tuple[bytes, Optional[bytes]]:
116 return s[: self.nlmsg_len - 16], s[self.nlmsg_len - 16 :]
117
118 def answers(self, other: Packet) -> bool:
119 return bool(other.nlmsg_seq == self.nlmsg_seq)
120
121
122# DONE
123
124
125class nlmsgerr_rtattr(Packet):
126 fields_desc = [
127 FieldLenField(
128 "rta_len", None, length_of="rta_data", fmt="=H", adjust=lambda _, x: x + 4
129 ),
130 EnumField(
131 "rta_type",
132 0,
133 {},
134 fmt="=H",
135 ),
136 PadField(
137 MultipleTypeField(
138 [],
139 StrLenField(
140 "rta_data",
141 b"",
142 length_from=lambda pkt: pkt.rta_len - 4,
143 ),
144 ),
145 align=4,
146 ),
147 ]
148
149 def default_payload_class(self, payload: bytes) -> Type[Packet]:
150 return conf.padding_layer
151
152
153class nlmsgerr(Packet):
154 fields_desc = [
155 MayEnd(Field("status", 0, fmt="=L")),
156 # Pay
157 PacketListField("data", [], nlmsgerr_rtattr),
158 ]
159
160
161bind_layers(rtmsghdr, nlmsgerr, nlmsg_type=3)
162
163
164# LINK messages
165
166
167class ifla_af_spec_inet_rtattr(Packet):
168 fields_desc = [
169 FieldLenField(
170 "rta_len", None, length_of="rta_data", fmt="=H", adjust=lambda _, x: x + 4
171 ),
172 EnumField(
173 "rta_type",
174 0,
175 {
176 0x00: "IFLA_INET_UNSPEC",
177 0x01: "IFLA_INET_CONF",
178 },
179 fmt="=H",
180 ),
181 PadField(
182 MultipleTypeField(
183 [],
184 XStrLenField(
185 "rta_data",
186 b"",
187 length_from=lambda pkt: pkt.rta_len - 4,
188 ),
189 ),
190 align=4,
191 ),
192 ]
193
194 def default_payload_class(self, payload: bytes) -> Type[Packet]:
195 return conf.padding_layer
196
197
198class ifla_af_spec_inet6_rtattr(Packet):
199 fields_desc = [
200 FieldLenField(
201 "rta_len", None, length_of="rta_data", fmt="=H", adjust=lambda _, x: x + 4
202 ),
203 EnumField(
204 "rta_type",
205 0,
206 {
207 0x00: "IFLA_INET6_UNSPEC",
208 0x01: "IFLA_INET6_FLAGS",
209 0x02: "IFLA_INET6_CONF",
210 0x03: "IFLA_INET6_STATS",
211 0x04: "IFLA_INET6_MCAST",
212 0x05: "IFLA_INET6_CACHEINFO",
213 0x06: "IFLA_INET6_ICMP6STATS",
214 0x07: "IFLA_INET6_TOKEN",
215 0x08: "IFLA_INET6_ADDR_GEN_MODE",
216 0x09: "IFLA_INET6_RA_MTU",
217 },
218 fmt="=H",
219 ),
220 PadField(
221 MultipleTypeField(
222 [],
223 XStrLenField(
224 "rta_data",
225 b"",
226 length_from=lambda pkt: pkt.rta_len - 4,
227 ),
228 ),
229 align=4,
230 ),
231 ]
232
233 def default_payload_class(self, payload: bytes) -> Type[Packet]:
234 return conf.padding_layer
235
236
237class ifla_af_spec_rtattr(Packet):
238 fields_desc = [
239 FieldLenField(
240 "rta_len", None, length_of="rta_data", fmt="=H", adjust=lambda _, x: x + 4
241 ),
242 EnumField("rta_type", 0, socket.AddressFamily, fmt="=H"),
243 PadField(
244 MultipleTypeField(
245 [
246 (
247 # AF_INET
248 PacketListField(
249 "rta_data",
250 [],
251 ifla_af_spec_inet_rtattr,
252 length_from=lambda pkt: pkt.rta_len - 4,
253 ),
254 lambda pkt: pkt.rta_type == 2,
255 ),
256 (
257 # AF_INET6
258 PacketListField(
259 "rta_data",
260 [],
261 ifla_af_spec_inet6_rtattr,
262 length_from=lambda pkt: pkt.rta_len - 4,
263 ),
264 lambda pkt: pkt.rta_type == 10,
265 ),
266 ],
267 XStrLenField(
268 "rta_data",
269 b"",
270 length_from=lambda pkt: pkt.rta_len - 4,
271 ),
272 ),
273 align=4,
274 ),
275 ]
276
277 def default_payload_class(self, payload: bytes) -> Type[Packet]:
278 return conf.padding_layer
279
280
281class ifinfomsg_rtattr(Packet):
282 fields_desc = [
283 FieldLenField(
284 "rta_len", None, length_of="rta_data", fmt="=H", adjust=lambda _, x: x + 4
285 ),
286 EnumField(
287 "rta_type",
288 0,
289 {
290 0x00: "IFLA_UNSPEC",
291 0x01: "IFLA_ADDRESS",
292 0x02: "IFLA_BROADCAST",
293 0x03: "IFLA_IFNAME",
294 0x04: "IFLA_MTU",
295 0x05: "IFLA_LINK",
296 0x06: "IFLA_QDISC",
297 0x07: "IFLA_STATS",
298 0x08: "IFLA_COST",
299 0x09: "IFLA_PRIORITY",
300 0x0A: "IFLA_MASTER",
301 0x0B: "IFLA_WIRELESS",
302 0x0C: "IFLA_PROTINFO",
303 0x0D: "IFLA_TXQLEN",
304 0x0E: "IFLA_MAP",
305 0x0F: "IFLA_WEIGHT",
306 0x10: "IFLA_OPERSTATE",
307 0x11: "IFLA_LINKMODE",
308 0x12: "IFLA_LINKINFO",
309 0x13: "IFLA_NET_NS_PID",
310 0x14: "IFLA_IFALIAS",
311 0x15: "IFLA_NUM_VS",
312 0x16: "IFLA_VFINFO_LIST",
313 0x17: "IFLA_STATS64",
314 0x18: "IFLA_VF_PORTS",
315 0x19: "IFLA_PORT_SELF",
316 0x1A: "IFLA_AF_SPEC",
317 0x1B: "IFLA_GROUP",
318 0x1C: "IFLA_NET_NS_FD",
319 0x1D: "IFLA_EXT_MASK",
320 0x1E: "IFLA_PROMISCUITY",
321 0x1F: "IFLA_NUM_TX_QUEUES",
322 0x20: "IFLA_NUM_RX_QUEUES",
323 0x21: "IFLA_CARRIER",
324 0x22: "IFLA_PHYS_PORT_ID",
325 0x23: "IFLA_CARRIER_CHANGES",
326 0x24: "IFLA_PHYS_SWITCH_ID",
327 0x25: "IFLA_LINK_NETNSID",
328 0x26: "IFLA_PHYS_PORT_NAME",
329 0x27: "IFLA_PROTO_DOWN",
330 0x28: "IFLA_GSO_MAX_SEGS",
331 0x29: "IFLA_GSO_MAX_SIZE",
332 0x2A: "IFLA_PAD",
333 0x2B: "IFLA_XDP",
334 0x2C: "IFLA_EVENT",
335 0x2D: "IFLA_NEW_NETNSID",
336 0x2E: "IFLA_IF_NETNSID",
337 0x2F: "IFLA_CARRIER_UP_COUNT",
338 0x30: "IFLA_CARRIER_DOWN_COUNT",
339 0x31: "IFLA_NEW_IFINDEX",
340 0x32: "IFLA_MIN_MTU",
341 0x33: "IFLA_MAX_MTU",
342 0x34: "IFLA_PROP_LIST",
343 0x35: "IFLA_ALT_IFNAME",
344 0x36: "IFLA_PERM_ADDRESS",
345 0x37: "IFLA_PROTO_DOWN_REASON",
346 0x38: "IFLA_PARENT_DEV_NAME",
347 0x39: "IFLA_PARENT_DEV_BUS_NAME",
348 0x3A: "IFLA_GRO_MAX_SIZE",
349 0x3B: "IFLA_TSO_MAX_SIZE",
350 0x3C: "IFLA_TSO_MAX_SEGS",
351 0x3D: "IFLA_ALLMULTI",
352 },
353 fmt="=H",
354 ),
355 PadField(
356 MultipleTypeField(
357 [
358 (
359 # IFLA_ADDRESS
360 MACField("rta_data", "00:00:00:00:00:00"),
361 lambda pkt: pkt.rta_type in [0x01, 0x36],
362 ),
363 (
364 # IFLA_IFNAME
365 StrLenField(
366 "rta_data", b"", length_from=lambda pkt: pkt.rta_len - 4
367 ),
368 lambda pkt: pkt.rta_type in [0x03],
369 ),
370 (
371 # IFLA_AF_SPEC
372 PacketListField(
373 "rta_data",
374 [],
375 ifla_af_spec_rtattr,
376 length_from=lambda pkt: pkt.rta_len - 4,
377 ),
378 lambda pkt: pkt.rta_type == 0x1A,
379 ),
380 ],
381 XStrLenField(
382 "rta_data",
383 b"",
384 length_from=lambda pkt: pkt.rta_len - 4,
385 ),
386 ),
387 align=4,
388 ),
389 ]
390
391 def default_payload_class(self, payload: bytes) -> Type[Packet]:
392 return conf.padding_layer
393
394
395class ifinfomsg(Packet):
396 fields_desc = [
397 ByteEnumField("ifi_family", 0, socket.AddressFamily),
398 ByteField("res", 0),
399 Field("ifi_type", 0, fmt="=H"),
400 Field("ifi_index", 0, fmt="=i"),
401 FlagsField(
402 "ifi_flags",
403 0,
404 32 if BIG_ENDIAN else -32,
405 _iff_flags,
406 ),
407 Field("ifi_change", 0, fmt="=I"),
408 # Pay
409 PacketListField("data", [], ifinfomsg_rtattr),
410 ]
411
412
413bind_layers(rtmsghdr, ifinfomsg, nlmsg_type=16)
414bind_layers(rtmsghdr, ifinfomsg, nlmsg_type=17)
415bind_layers(rtmsghdr, ifinfomsg, nlmsg_type=18)
416bind_layers(rtmsghdr, ifinfomsg, nlmsg_type=19)
417
418
419# ADDR messages
420
421
422class ifaddrmsg_rtattr(Packet):
423 fields_desc = [
424 FieldLenField(
425 "rta_len", None, length_of="rta_data", fmt="=H", adjust=lambda _, x: x + 4
426 ),
427 EnumField(
428 "rta_type",
429 0,
430 {
431 0x00: "IFA_UNSPEC",
432 0x01: "IFA_ADDRESS",
433 0x02: "IFA_LOCAL",
434 0x03: "IFA_LABEL",
435 0x04: "IFA_BROADCAST",
436 0x05: "IFA_ANYCAST",
437 0x06: "IFA_CACHEINFO",
438 0x07: "IFA_MULTICAST",
439 0x08: "IFA_FLAGS",
440 0x09: "IFA_RT_PRIORITY",
441 0x0A: "IFA_TARGET_NETNSID",
442 0x0B: "IFA_PROTO",
443 },
444 fmt="=H",
445 ),
446 PadField(
447 MultipleTypeField(
448 [
449 # IFA_ADDRESS, IFA_LOCAL, IFA_BROADCAST
450 (
451 IPField("rta_data", "0.0.0.0"),
452 lambda pkt: pkt.parent
453 and pkt.parent.ifa_family == 2
454 and pkt.rta_type in [0x01, 0x02, 0x04],
455 ),
456 (
457 IP6Field("rta_data", "::"),
458 lambda pkt: pkt.parent
459 and pkt.parent.ifa_family == 10
460 and pkt.rta_type in [0x01, 0x02, 0x04],
461 ),
462 (
463 # IFA_LABEL
464 StrLenField(
465 "rta_data", b"", length_from=lambda pkt: pkt.rta_len - 4
466 ),
467 lambda pkt: pkt.rta_type in [0x03],
468 ),
469 ],
470 XStrLenField(
471 "rta_data",
472 b"",
473 length_from=lambda pkt: pkt.rta_len - 4,
474 ),
475 ),
476 align=4,
477 ),
478 ]
479
480 def default_payload_class(self, payload: bytes) -> Type[Packet]:
481 return conf.padding_layer
482
483
484class ifaddrmsg(Packet):
485 fields_desc = [
486 ByteEnumField("ifa_family", 0, socket.AddressFamily),
487 ByteField("ifa_prefixlen", 0),
488 FlagsField(
489 "ifa_flags",
490 0,
491 -8,
492 {
493 0x01: "IFA_F_SECONDARY",
494 0x02: "IFA_F_NODAD",
495 0x04: "IFA_F_OPTIMISTIC",
496 0x08: "IFA_F_DADFAILED",
497 0x10: "IFA_F_HOMEADDRESS",
498 0x20: "IFA_F_DEPRECATED",
499 0x40: "IFA_F_TENTATIVE",
500 0x80: "IFA_F_PERMANENT",
501 },
502 ),
503 ByteField("ifa_scope", 0),
504 Field("ifa_index", 0, fmt="=L"),
505 # Pay
506 PacketListField("data", [], ifaddrmsg_rtattr),
507 ]
508
509
510bind_layers(rtmsghdr, ifaddrmsg, nlmsg_type=20)
511bind_layers(rtmsghdr, ifaddrmsg, nlmsg_type=21)
512bind_layers(rtmsghdr, ifaddrmsg, nlmsg_type=22)
513
514
515# ROUTE messages
516
517
518RT_CLASS = {
519 0: "RT_TABLE_UNSPEC",
520 252: "RT_TABLE_COMPAT",
521 253: "RT_TABLE_DEFAULT",
522 254: "RT_TABLE_MAIN",
523 255: "RT_TABLE_LOCAL",
524}
525
526
527class rtmsg_rtattr(Packet):
528 fields_desc = [
529 FieldLenField(
530 "rta_len", None, length_of="rta_data", fmt="=H", adjust=lambda _, x: x + 4
531 ),
532 EnumField(
533 "rta_type",
534 0,
535 {
536 0x00: "RTA_UNSPEC",
537 0x01: "RTA_DST",
538 0x02: "RTS_SRC",
539 0x03: "RTS_IIF",
540 0x04: "RTS_OIF",
541 0x05: "RTA_GATEWAY",
542 0x06: "RTA_PRIORITY",
543 0x07: "RTA_PREFSRC",
544 0x08: "RTA_METRICS",
545 0x09: "RTA_MULTIPATH",
546 0x0B: "RTA_FLOW",
547 0x0C: "RTA_CACHEINFO",
548 0x0F: "RTA_TABLE",
549 0x10: "RTA_MARK",
550 0x11: "RTA_MFC_STATS",
551 0x12: "RTA_VIA",
552 0x13: "RTA_NEWDST",
553 0x14: "RTA_PREF",
554 0x15: "RTA_ENCAP_TYPE",
555 0x16: "RTA_ENCAP",
556 0x17: "RTA_EXPIRES",
557 0x18: "RTA_PAD",
558 0x19: "RTA_UID",
559 0x1A: "RTA_TTL_PROPAGATE",
560 0x1B: "RTA_IP_PROTO",
561 0x1C: "RTA_SPORT",
562 0x1D: "RTA_DPORT",
563 0x1E: "RTA_NH_ID",
564 },
565 fmt="=H",
566 ),
567 PadField(
568 MultipleTypeField(
569 [
570 # RTA_DST, RTA_SRC, RTA_PREFSRC, RTA_GATEWAY
571 (
572 IPField("rta_data", "0.0.0.0"),
573 lambda pkt: pkt.parent
574 and pkt.parent.rtm_family == 2
575 and pkt.rta_type in [0x01, 0x02, 0x05, 0x07],
576 ),
577 (
578 IP6Field("rta_data", "::"),
579 lambda pkt: pkt.parent
580 and pkt.parent.rtm_family == 10
581 and pkt.rta_type in [0x01, 0x02, 0x05, 0x07],
582 ),
583 # RTS_OIF, RTA_PRIORITY
584 (
585 Field("rta_data", 0, fmt="=I"),
586 lambda pkt: pkt.rta_type in [0x04, 0x06, 0x10],
587 ),
588 # RTA_TABLE
589 (
590 EnumField("rta_data", 0, RT_CLASS, fmt="=I"),
591 lambda pkt: pkt.rta_type in [0x0F],
592 ),
593 ],
594 XStrLenField(
595 "rta_data",
596 b"",
597 length_from=lambda pkt: pkt.rta_len - 4,
598 ),
599 ),
600 align=4,
601 ),
602 ]
603
604 def default_payload_class(self, payload: bytes) -> Type[Packet]:
605 return conf.padding_layer
606
607
608class rtmsg(Packet):
609 fields_desc = [
610 ByteEnumField("rtm_family", 0, socket.AddressFamily),
611 ByteField("rtm_dst_len", 0),
612 ByteField("rtm_src_len", 0),
613 ByteField("rtm_tos", 0),
614 ByteEnumField(
615 "rtm_table",
616 0,
617 RT_CLASS,
618 ),
619 ByteEnumField(
620 "rtm_protocol",
621 0,
622 {
623 0x00: "RTPROT_UNSPEC",
624 0x01: "RTPROT_REDIRECT",
625 0x02: "RTPROT_KERNEL",
626 0x03: "RTPROT_BOOT",
627 0x04: "RTPROT_STATIC",
628 },
629 ),
630 ByteEnumField(
631 "rtm_scope",
632 0,
633 {
634 0: "RT_SCOPE_UNIVERSE",
635 200: "RT_SCOPE_SITE",
636 253: "RT_SCOPE_LINK",
637 254: "RT_SCOPE_HOST",
638 255: "RT_SCOPE_NOWHERE",
639 },
640 ),
641 ByteEnumField(
642 "rtm_type",
643 0,
644 {
645 0x00: "RTN_UNSPEC",
646 0x01: "RTN_UNICAST",
647 0x02: "RTN_LOCAL",
648 0x03: "RTN_BROADCAST",
649 0x04: "RTN_ANYCAST",
650 0x05: "RTN_MULTICAST",
651 0x06: "RTN_BLACKHOLE",
652 0x07: "RTN_UNREACHABLE",
653 0x08: "RTN_PROHIBIT",
654 0x09: "RTN_THROW",
655 0x0A: "RTN_NAT",
656 0x0B: "RTN_XRESOLVE",
657 },
658 ),
659 FlagsField(
660 "rtm_flags",
661 0,
662 32 if BIG_ENDIAN else -32,
663 {
664 0x100: "RTM_F_NOTIFY",
665 0x200: "RTM_F_CLONED",
666 0x400: "RTM_F_EQUALIZE",
667 0x800: "RTM_F_PREFIX",
668 0x1000: "RTM_F_LOOKUP_TABLE",
669 0x2000: "RTM_F_FIB_MATCH",
670 0x4000: "RTM_F_OFFLOAD",
671 0x8000: "RTM_F_TRAP",
672 0x20000000: "RTM_F_OFFLOAD_FAILED",
673 },
674 ),
675 # Pay
676 PacketListField("data", [], rtmsg_rtattr),
677 ]
678
679
680bind_layers(rtmsghdr, rtmsg, nlmsg_type=24)
681bind_layers(rtmsghdr, rtmsg, nlmsg_type=25)
682bind_layers(rtmsghdr, rtmsg, nlmsg_type=26)
683
684
685class rtmsghdrs(Packet):
686 fields_desc = [
687 PacketListField(
688 "msgs",
689 [],
690 rtmsghdr,
691 # 65535 / len(rtmsghdr)
692 max_count=4096,
693 ),
694 ]
695
696
697# Utils
698
699
700SOL_NETLINK = 270
701NETLINK_EXT_ACK = 11
702NETLINK_GET_STRICT_CHK = 12
703
704
705def _sr1_rtrequest(pkt: Packet) -> List[Packet]:
706 """
707 Send / Receive a rtnetlink request
708 """
709 # Create socket
710 sock = socket.socket(
711 socket.AF_NETLINK,
712 socket.SOCK_RAW | socket.SOCK_CLOEXEC,
713 socket.NETLINK_ROUTE,
714 )
715 # Configure socket
716 sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 32768)
717 sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1048576)
718 try:
719 sock.setsockopt(SOL_NETLINK, NETLINK_EXT_ACK, 1)
720 except OSError:
721 # Linux 4.12+ only
722 pass
723 sock.bind((0, 0)) # bind to kernel
724 try:
725 sock.setsockopt(SOL_NETLINK, NETLINK_GET_STRICT_CHK, 1)
726 except OSError:
727 # Linux 4.20+ only
728 pass
729 # Request routes
730 sock.send(bytes(rtmsghdrs(msgs=[pkt])))
731 results: List[Packet] = []
732 try:
733 while True:
734 msgs = rtmsghdrs(sock.recv(65535))
735 if not msgs:
736 log_loading.warning("Failed to read the routes using RTNETLINK !")
737 return []
738 for msg in msgs.msgs:
739 # Keep going until we find the end of the MULTI format
740 if not msg.nlmsg_flags.NLM_F_MULTI or msg.nlmsg_type == 3:
741 if msg.nlmsg_type == 3 and nlmsgerr in msg and msg.status != 0:
742 # NLMSG_DONE with errors
743 if msg.data and msg.data[0].rta_type == 1:
744 log_loading.debug(
745 "Scapy RTNETLINK error on %s: '%s'. Please report !",
746 pkt.sprintf("%nlmsg_type%"),
747 msg.data[0].rta_data.decode(),
748 )
749 return []
750 return results
751 results.append(msg)
752 finally:
753 sock.close()
754
755
756def _get_ips(af_family=socket.AF_UNSPEC):
757 # type: (socket.AddressFamily) -> Dict[int, List[Dict[str, Any]]]
758 """
759 Return a mapping of all interfaces IP using a NETLINK socket.
760 """
761 results = _sr1_rtrequest(
762 rtmsghdr(
763 nlmsg_type="RTM_GETADDR",
764 nlmsg_flags="NLM_F_REQUEST+NLM_F_ROOT+NLM_F_MATCH",
765 nlmsg_seq=int(time.time()),
766 )
767 / ifaddrmsg(
768 ifa_family=af_family,
769 data=[],
770 )
771 )
772 ips: Dict[int, List[Dict[str, Any]]] = {}
773 for msg in results:
774 ifindex = msg.ifa_index
775 address = None
776 family = msg.ifa_family
777 for attr in msg.data:
778 if attr.rta_type == 0x01: # IFA_ADDRESS
779 address = attr.rta_data
780 break
781 if address is not None:
782 data = {
783 "af_family": family,
784 "index": ifindex,
785 "address": address,
786 }
787 if family == 10: # ipv6
788 data["scope"] = scapy.utils6.in6_getscope(address)
789 ips.setdefault(ifindex, list()).append(data)
790 return ips
791
792
793def _get_if_list():
794 # type: () -> Dict[int, Dict[str, Any]]
795 """
796 Read the interfaces list using a NETLINK socket.
797 """
798 results = _sr1_rtrequest(
799 rtmsghdr(
800 nlmsg_type="RTM_GETLINK",
801 nlmsg_flags="NLM_F_REQUEST+NLM_F_ROOT+NLM_F_MATCH",
802 nlmsg_seq=int(time.time()),
803 )
804 / ifinfomsg(
805 data=[],
806 )
807 )
808 lifips = _get_ips()
809 interfaces = {}
810 for msg in results:
811 ifindex = msg.ifi_index
812 ifname = None
813 mac = "00:00:00:00:00:00"
814 itype = msg.ifi_type
815 ifflags = msg.ifi_flags
816 ips = []
817 for attr in msg.data:
818 if attr.rta_type == 0x01: # IFLA_ADDRESS
819 mac = attr.rta_data
820 elif attr.rta_type == 0x03: # IFLA_NAME
821 ifname = attr.rta_data[:-1].decode()
822 if ifname is not None:
823 if ifindex in lifips:
824 ips = lifips[ifindex]
825 interfaces[ifindex] = {
826 "name": ifname,
827 "index": ifindex,
828 "flags": ifflags,
829 "mac": mac,
830 "type": itype,
831 "ips": ips,
832 }
833 return interfaces
834
835
836def in6_getifaddr():
837 # type: () -> List[Tuple[str, int, str]]
838 """
839 Returns a list of 3-tuples of the form (addr, scope, iface) where
840 'addr' is the address of scope 'scope' associated to the interface
841 'iface'.
842
843 This is the list of all addresses of all interfaces available on
844 the system.
845 """
846 ips = _get_ips(af_family=socket.AF_INET6)
847 ifaces = _get_if_list()
848 result = []
849 for intip in ips.values():
850 for ip in intip:
851 if ip["index"] in ifaces:
852 result.append((ip["address"], ip["scope"], ifaces[ip["index"]]["name"]))
853 return result
854
855
856def _read_routes(af_family):
857 # type: (socket.AddressFamily) -> List[Packet]
858 """
859 Read routes using a NETLINK socket.
860 """
861 results = []
862 for rttable in ["RT_TABLE_LOCAL", "RT_TABLE_MAIN"]:
863 results.extend(
864 _sr1_rtrequest(
865 rtmsghdr(
866 nlmsg_type="RTM_GETROUTE",
867 nlmsg_flags="NLM_F_REQUEST+NLM_F_ROOT+NLM_F_MATCH",
868 nlmsg_seq=int(time.time()),
869 )
870 / rtmsg(
871 rtm_family=af_family,
872 data=[
873 rtmsg_rtattr(rta_type="RTA_TABLE", rta_data=rttable),
874 ],
875 )
876 )
877 )
878 return [msg for msg in results if msg.nlmsg_type == 24] # RTM_NEWROUTE
879
880
881def read_routes():
882 # type: () -> List[Tuple[int, int, str, str, str, int]]
883 """
884 Read IPv4 routes for current process
885 """
886 routes = []
887 ifaces = _get_if_list()
888 results = _read_routes(socket.AF_INET)
889 for msg in results:
890 # Omit stupid answers (some OS conf appears to lead to this)
891 if msg.rtm_family != socket.AF_INET:
892 continue
893 # Process the RTM_NEWROUTE
894 net = 0
895 mask = itom(msg.rtm_dst_len)
896 gw = "0.0.0.0"
897 iface = ""
898 addr = "0.0.0.0"
899 metric = 0
900 for attr in msg.data:
901 if attr.rta_type == 0x01: # RTA_DST
902 net = atol(attr.rta_data)
903 elif attr.rta_type == 0x04: # RTS_OIF
904 index = attr.rta_data
905 if index in ifaces:
906 iface = ifaces[index]["name"]
907 else:
908 iface = str(index)
909 elif attr.rta_type == 0x05: # RTA_GATEWAY
910 gw = attr.rta_data
911 elif attr.rta_type == 0x06: # RTA_PRIORITY
912 metric = attr.rta_data
913 elif attr.rta_type == 0x07: # RTA_PREFSRC
914 addr = attr.rta_data
915 routes.append((net, mask, gw, iface, addr, metric))
916 # Add multicast routes, as those are missing by default
917 for _iface in ifaces.values():
918 if _iface['flags'].MULTICAST:
919 try:
920 addr = next(
921 x["address"]
922 for x in _iface["ips"]
923 if x["af_family"] == socket.AF_INET
924 )
925 except StopIteration:
926 continue
927 routes.append((
928 0xe0000000, 0xf0000000, "0.0.0.0", _iface["name"], addr, 250
929 ))
930 return routes
931
932
933def read_routes6():
934 # type: () -> List[Tuple[str, int, str, str, List[str], int]]
935 """
936 Read IPv6 routes for current process
937 """
938 routes = []
939 ifaces = _get_if_list()
940 results = _read_routes(socket.AF_INET6)
941 lifaddr = _get_ips(af_family=socket.AF_INET6)
942 for msg in results:
943 # Omit stupid answers (some OS conf appears to lead to this)
944 if msg.rtm_family != socket.AF_INET6:
945 continue
946 # Process the RTM_NEWROUTE
947 prefix = "::"
948 plen = msg.rtm_dst_len
949 nh = "::"
950 index = 0
951 iface = ""
952 metric = 0
953 for attr in msg.data:
954 if attr.rta_type == 0x01: # RTA_DST
955 prefix = attr.rta_data
956 elif attr.rta_type == 0x04: # RTS_OIF
957 index = attr.rta_data
958 if index in ifaces:
959 iface = ifaces[index]["name"]
960 else:
961 iface = str(index)
962 elif attr.rta_type == 0x05: # RTA_GATEWAY
963 nh = attr.rta_data
964 elif attr.rta_type == 0x06: # RTA_PRIORITY
965 metric = attr.rta_data
966 devaddrs = ((x["address"], x["scope"], iface) for x in lifaddr.get(index, []))
967 cset = scapy.utils6.construct_source_candidate_set(prefix, plen, devaddrs)
968 if cset:
969 routes.append((prefix, plen, nh, iface, cset, metric))
970 # Add multicast routes, as those are missing by default
971 for _iface in ifaces.values():
972 if _iface['flags'].MULTICAST:
973 addrs = [
974 x["address"]
975 for x in _iface["ips"]
976 if x["af_family"] == socket.AF_INET6
977 ]
978 if not addrs:
979 continue
980 routes.append((
981 "ff00::", 8, "::", _iface["name"], addrs, 250
982 ))
983 return routes