1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Gabriel Potter
5
6"""
7This file implements the rtnetlink API that is used to read the network
8configuration of the machine.
9"""
10
11import socket
12import struct
13import time
14
15import scapy.utils6
16
17from scapy.consts import BIG_ENDIAN
18from scapy.config import conf
19from scapy.error import log_loading
20from scapy.packet import (
21 Packet,
22 bind_layers,
23)
24from scapy.utils import atol, itom
25
26from scapy.fields import (
27 ByteEnumField,
28 ByteField,
29 EnumField,
30 Field,
31 FieldLenField,
32 FlagsField,
33 IP6Field,
34 IPField,
35 LenField,
36 MACField,
37 MayEnd,
38 MultipleTypeField,
39 PacketListField,
40 PadField,
41 StrLenField,
42 XStrLenField,
43)
44
45from scapy.arch.common import _iff_flags
46
47# Typing imports
48from typing import (
49 Any,
50 Dict,
51 List,
52 Optional,
53 Tuple,
54 Type,
55)
56
57# from <linux/netlink.h> and <linux/rtnetlink.h>
58
59
60# Common header
61
62
class rtmsghdr(Packet):
    """
    Common netlink message header placed in front of every rtnetlink
    message handled by this file.

    All multi-byte fields are packed with native byte order ("=" struct
    formats). The header is 16 bytes long (4 + 2 + 2 + 4 + 4).
    """

    fields_desc = [
        # Total message length; computed in post_build() when left to None
        LenField("nlmsg_len", None, fmt="=L"),
        EnumField(
            "nlmsg_type",
            0,
            {
                # netlink.h
                3: "NLMSG_DONE",
                # rtnetlink.h
                16: "RTM_NEWLINK",
                17: "RTM_DELLINK",
                18: "RTM_GETLINK",
                19: "RTM_SETLINK",
                20: "RTM_NEWADDR",
                21: "RTM_DELADDR",
                22: "RTM_GETADDR",
                # 23: unused
                24: "RTM_NEWROUTE",
                25: "RTM_DELROUTE",
                26: "RTM_GETROUTE",
                # 27: unused
            },
            fmt="=H",
        ),
        FlagsField(
            "nlmsg_flags",
            0,
            # The sign of the size follows the host endianness
            16 if BIG_ENDIAN else -16,
            {
                0x01: "NLM_F_REQUEST",
                0x02: "NLM_F_MULTI",
                0x04: "NLM_F_ACK",
                0x08: "NLM_F_ECHO",
                0x10: "NLM_F_DUMP_INTR",
                0x20: "NLM_F_DUMP_FILTERED",
                # GET modifiers
                0x100: "NLM_F_ROOT",
                0x200: "NLM_F_MATCH",
                0x400: "NLM_F_ATOMIC",
            },
        ),
        Field("nlmsg_seq", 0, fmt="=L"),
        Field("nlmsg_pid", 0, fmt="=L"),
    ]

    def post_build(self, pkt: bytes, pay: bytes) -> bytes:
        """
        Append the payload to the header and, when nlmsg_len was left to
        None, overwrite the first 4 bytes with the total length
        (header + payload).

        :param pkt: the built header bytes.
        :param pay: the built payload bytes.
        :returns: header and payload concatenated.
        """
        pkt += pay
        if self.nlmsg_len is None:
            pkt = struct.pack("=L", len(pkt)) + pkt[4:]
        return pkt

    def extract_padding(self, s: bytes) -> Tuple[bytes, Optional[bytes]]:
        """
        Split the bytes following the header into this message's payload
        and what comes after it.

        nlmsg_len includes the 16-byte header, hence the "- 16".

        :param s: the bytes that follow the header.
        :returns: (payload of this message, remaining bytes).
        """
        return s[: self.nlmsg_len - 16], s[self.nlmsg_len - 16 :]

    def answers(self, other: Packet) -> bool:
        """
        Tell whether this message answers 'other': both must carry the
        same nlmsg_seq.
        """
        return bool(other.nlmsg_seq == self.nlmsg_seq)
120
121
122# DONE
123
124
class nlmsgerr_rtattr(Packet):
    """
    Attribute (length / type / data) found in the 'data' list of a
    nlmsgerr message.
    """

    fields_desc = [
        # rta_len counts the 4 bytes of rta_len + rta_type plus the data
        FieldLenField(
            "rta_len", None, length_of="rta_data", fmt="=H", adjust=lambda _, x: x + 4
        ),
        EnumField(
            "rta_type",
            0,
            {},
            fmt="=H",
        ),
        # The data is padded to a multiple of 4 bytes
        PadField(
            MultipleTypeField(
                [],
                StrLenField(
                    "rta_data",
                    b"",
                    length_from=lambda pkt: pkt.rta_len - 4,
                ),
            ),
            align=4,
        ),
    ]

    def default_payload_class(self, payload: bytes) -> Type[Packet]:
        """
        Use the configured padding layer for whatever follows this
        attribute.
        """
        return conf.padding_layer
151
152
class nlmsgerr(Packet):
    """
    Payload bound to rtmsghdr messages of type 3: a 32-bit status that may
    be followed by a list of nlmsgerr_rtattr attributes.
    """

    fields_desc = [
        # The message may stop right after the status (MayEnd)
        MayEnd(Field("status", 0, fmt="=L")),
        # Pay
        PacketListField("data", [], nlmsgerr_rtattr),
    ]
159
160
# Messages of type 3 (labelled NLMSG_DONE in rtmsghdr) carry a nlmsgerr payload
bind_layers(rtmsghdr, nlmsgerr, nlmsg_type=3)
162
163
164# LINK messages
165
166
class ifla_af_spec_inet_rtattr(Packet):
    """
    Attribute nested inside an IFLA_AF_SPEC entry of family AF_INET
    (see ifla_af_spec_rtattr). The data is kept as raw bytes.
    """

    fields_desc = [
        # rta_len counts the 4 bytes of rta_len + rta_type plus the data
        FieldLenField(
            "rta_len", None, length_of="rta_data", fmt="=H", adjust=lambda _, x: x + 4
        ),
        EnumField(
            "rta_type",
            0,
            {
                0x00: "IFLA_INET_UNSPEC",
                0x01: "IFLA_INET_CONF",
            },
            fmt="=H",
        ),
        # The data is padded to a multiple of 4 bytes
        PadField(
            MultipleTypeField(
                [],
                XStrLenField(
                    "rta_data",
                    b"",
                    length_from=lambda pkt: pkt.rta_len - 4,
                ),
            ),
            align=4,
        ),
    ]

    def default_payload_class(self, payload: bytes) -> Type[Packet]:
        """
        Use the configured padding layer for whatever follows this
        attribute.
        """
        return conf.padding_layer
196
197
class ifla_af_spec_inet6_rtattr(Packet):
    """
    Attribute nested inside an IFLA_AF_SPEC entry of family AF_INET6
    (see ifla_af_spec_rtattr). The data is kept as raw bytes.
    """

    fields_desc = [
        # rta_len counts the 4 bytes of rta_len + rta_type plus the data
        FieldLenField(
            "rta_len", None, length_of="rta_data", fmt="=H", adjust=lambda _, x: x + 4
        ),
        EnumField(
            "rta_type",
            0,
            {
                0x00: "IFLA_INET6_UNSPEC",
                0x01: "IFLA_INET6_FLAGS",
                0x02: "IFLA_INET6_CONF",
                0x03: "IFLA_INET6_STATS",
                0x04: "IFLA_INET6_MCAST",
                0x05: "IFLA_INET6_CACHEINFO",
                0x06: "IFLA_INET6_ICMP6STATS",
                0x07: "IFLA_INET6_TOKEN",
                0x08: "IFLA_INET6_ADDR_GEN_MODE",
                0x09: "IFLA_INET6_RA_MTU",
            },
            fmt="=H",
        ),
        # The data is padded to a multiple of 4 bytes
        PadField(
            MultipleTypeField(
                [],
                XStrLenField(
                    "rta_data",
                    b"",
                    length_from=lambda pkt: pkt.rta_len - 4,
                ),
            ),
            align=4,
        ),
    ]

    def default_payload_class(self, payload: bytes) -> Type[Packet]:
        """
        Use the configured padding layer for whatever follows this
        attribute.
        """
        return conf.padding_layer
235
236
class ifla_af_spec_rtattr(Packet):
    """
    Entry of an IFLA_AF_SPEC attribute (see ifinfomsg_rtattr).

    rta_type is an address family; for families 2 and 10 the data is
    itself a list of nested attributes, otherwise it is kept as raw bytes.
    """

    fields_desc = [
        # rta_len counts the 4 bytes of rta_len + rta_type plus the data
        FieldLenField(
            "rta_len", None, length_of="rta_data", fmt="=H", adjust=lambda _, x: x + 4
        ),
        EnumField("rta_type", 0, socket.AddressFamily, fmt="=H"),
        # The data is padded to a multiple of 4 bytes
        PadField(
            MultipleTypeField(
                [
                    (
                        # AF_INET
                        PacketListField(
                            "rta_data",
                            [],
                            ifla_af_spec_inet_rtattr,
                            length_from=lambda pkt: pkt.rta_len - 4,
                        ),
                        lambda pkt: pkt.rta_type == 2,
                    ),
                    (
                        # AF_INET6
                        PacketListField(
                            "rta_data",
                            [],
                            ifla_af_spec_inet6_rtattr,
                            length_from=lambda pkt: pkt.rta_len - 4,
                        ),
                        lambda pkt: pkt.rta_type == 10,
                    ),
                ],
                # Any other family: raw bytes
                XStrLenField(
                    "rta_data",
                    b"",
                    length_from=lambda pkt: pkt.rta_len - 4,
                ),
            ),
            align=4,
        ),
    ]

    def default_payload_class(self, payload: bytes) -> Type[Packet]:
        """
        Use the configured padding layer for whatever follows this
        attribute.
        """
        return conf.padding_layer
279
280
class ifinfomsg_rtattr(Packet):
    """
    Attribute (length / type / data) carried in the 'data' list of an
    ifinfomsg message.

    The type of rta_data depends on rta_type: a MAC address for
    IFLA_ADDRESS and IFLA_PERM_ADDRESS, a string for IFLA_IFNAME, a list
    of ifla_af_spec_rtattr for IFLA_AF_SPEC, raw bytes otherwise.
    """

    fields_desc = [
        # rta_len counts the 4 bytes of rta_len + rta_type plus the data
        FieldLenField(
            "rta_len", None, length_of="rta_data", fmt="=H", adjust=lambda _, x: x + 4
        ),
        EnumField(
            "rta_type",
            0,
            {
                0x00: "IFLA_UNSPEC",
                0x01: "IFLA_ADDRESS",
                0x02: "IFLA_BROADCAST",
                0x03: "IFLA_IFNAME",
                0x04: "IFLA_MTU",
                0x05: "IFLA_LINK",
                0x06: "IFLA_QDISC",
                0x07: "IFLA_STATS",
                0x08: "IFLA_COST",
                0x09: "IFLA_PRIORITY",
                0x0A: "IFLA_MASTER",
                0x0B: "IFLA_WIRELESS",
                0x0C: "IFLA_PROTINFO",
                0x0D: "IFLA_TXQLEN",
                0x0E: "IFLA_MAP",
                0x0F: "IFLA_WEIGHT",
                0x10: "IFLA_OPERSTATE",
                0x11: "IFLA_LINKMODE",
                0x12: "IFLA_LINKINFO",
                0x13: "IFLA_NET_NS_PID",
                0x14: "IFLA_IFALIAS",
                # The kernel header names this one IFLA_NUM_VF
                0x15: "IFLA_NUM_VF",
                0x16: "IFLA_VFINFO_LIST",
                0x17: "IFLA_STATS64",
                0x18: "IFLA_VF_PORTS",
                0x19: "IFLA_PORT_SELF",
                0x1A: "IFLA_AF_SPEC",
                0x1B: "IFLA_GROUP",
                0x1C: "IFLA_NET_NS_FD",
                0x1D: "IFLA_EXT_MASK",
                0x1E: "IFLA_PROMISCUITY",
                0x1F: "IFLA_NUM_TX_QUEUES",
                0x20: "IFLA_NUM_RX_QUEUES",
                0x21: "IFLA_CARRIER",
                0x22: "IFLA_PHYS_PORT_ID",
                0x23: "IFLA_CARRIER_CHANGES",
                0x24: "IFLA_PHYS_SWITCH_ID",
                0x25: "IFLA_LINK_NETNSID",
                0x26: "IFLA_PHYS_PORT_NAME",
                0x27: "IFLA_PROTO_DOWN",
                0x28: "IFLA_GSO_MAX_SEGS",
                0x29: "IFLA_GSO_MAX_SIZE",
                0x2A: "IFLA_PAD",
                0x2B: "IFLA_XDP",
                0x2C: "IFLA_EVENT",
                0x2D: "IFLA_NEW_NETNSID",
                0x2E: "IFLA_IF_NETNSID",
                0x2F: "IFLA_CARRIER_UP_COUNT",
                0x30: "IFLA_CARRIER_DOWN_COUNT",
                0x31: "IFLA_NEW_IFINDEX",
                0x32: "IFLA_MIN_MTU",
                0x33: "IFLA_MAX_MTU",
                0x34: "IFLA_PROP_LIST",
                0x35: "IFLA_ALT_IFNAME",
                0x36: "IFLA_PERM_ADDRESS",
                0x37: "IFLA_PROTO_DOWN_REASON",
                0x38: "IFLA_PARENT_DEV_NAME",
                0x39: "IFLA_PARENT_DEV_BUS_NAME",
                0x3A: "IFLA_GRO_MAX_SIZE",
                0x3B: "IFLA_TSO_MAX_SIZE",
                0x3C: "IFLA_TSO_MAX_SEGS",
                0x3D: "IFLA_ALLMULTI",
            },
            fmt="=H",
        ),
        # The data is padded to a multiple of 4 bytes
        PadField(
            MultipleTypeField(
                [
                    (
                        # IFLA_ADDRESS, IFLA_PERM_ADDRESS
                        # NOTE(review): MACField always reads 6 bytes,
                        # whatever rta_len says - confirm this is fine for
                        # links whose hardware address is not 6 bytes long.
                        MACField("rta_data", "00:00:00:00:00:00"),
                        lambda pkt: pkt.rta_type in [0x01, 0x36],
                    ),
                    (
                        # IFLA_IFNAME
                        StrLenField(
                            "rta_data", b"", length_from=lambda pkt: pkt.rta_len - 4
                        ),
                        lambda pkt: pkt.rta_type in [0x03],
                    ),
                    (
                        # IFLA_AF_SPEC
                        PacketListField(
                            "rta_data",
                            [],
                            ifla_af_spec_rtattr,
                            length_from=lambda pkt: pkt.rta_len - 4,
                        ),
                        lambda pkt: pkt.rta_type == 0x1A,
                    ),
                ],
                # Any other attribute: raw bytes
                XStrLenField(
                    "rta_data",
                    b"",
                    length_from=lambda pkt: pkt.rta_len - 4,
                ),
            ),
            align=4,
        ),
    ]

    def default_payload_class(self, payload: bytes) -> Type[Packet]:
        """
        Use the configured padding layer for whatever follows this
        attribute.
        """
        return conf.padding_layer
393
394
class ifinfomsg(Packet):
    """
    Payload of the LINK messages (nlmsg_type 16 to 19): a fixed header
    describing one interface, followed by a list of ifinfomsg_rtattr.
    """

    fields_desc = [
        ByteEnumField("ifi_family", 0, socket.AddressFamily),
        ByteField("res", 0),
        Field("ifi_type", 0, fmt="=H"),
        # Signed 32-bit interface index
        Field("ifi_index", 0, fmt="=i"),
        FlagsField(
            "ifi_flags",
            0,
            # The sign of the size follows the host endianness
            32 if BIG_ENDIAN else -32,
            _iff_flags,
        ),
        Field("ifi_change", 0, fmt="=I"),
        # Pay
        PacketListField("data", [], ifinfomsg_rtattr),
    ]
411
412
# RTM_NEWLINK, RTM_DELLINK, RTM_GETLINK and RTM_SETLINK all carry an ifinfomsg
bind_layers(rtmsghdr, ifinfomsg, nlmsg_type=16)
bind_layers(rtmsghdr, ifinfomsg, nlmsg_type=17)
bind_layers(rtmsghdr, ifinfomsg, nlmsg_type=18)
bind_layers(rtmsghdr, ifinfomsg, nlmsg_type=19)
417
418
419# ADDR messages
420
421
class ifaddrmsg_rtattr(Packet):
    """
    Attribute (length / type / data) carried in the 'data' list of an
    ifaddrmsg message.

    Address attributes are decoded as IPv4 or IPv6 depending on the
    ifa_family of the parent message; IFA_LABEL is a string; anything
    else is kept as raw bytes.
    """

    fields_desc = [
        # rta_len counts the 4 bytes of rta_len + rta_type plus the data
        FieldLenField(
            "rta_len", None, length_of="rta_data", fmt="=H", adjust=lambda _, x: x + 4
        ),
        EnumField(
            "rta_type",
            0,
            {
                0x00: "IFA_UNSPEC",
                0x01: "IFA_ADDRESS",
                0x02: "IFA_LOCAL",
                0x03: "IFA_LABEL",
                0x04: "IFA_BROADCAST",
                0x05: "IFA_ANYCAST",
                0x06: "IFA_CACHEINFO",
                0x07: "IFA_MULTICAST",
                0x08: "IFA_FLAGS",
                0x09: "IFA_RT_PRIORITY",
                0x0A: "IFA_TARGET_NETNSID",
                0x0B: "IFA_PROTO",
            },
            fmt="=H",
        ),
        # The data is padded to a multiple of 4 bytes
        PadField(
            MultipleTypeField(
                [
                    # IFA_ADDRESS, IFA_LOCAL, IFA_BROADCAST
                    (
                        # Parent family 2: IPv4 address
                        IPField("rta_data", "0.0.0.0"),
                        lambda pkt: pkt.parent
                        and pkt.parent.ifa_family == 2
                        and pkt.rta_type in [0x01, 0x02, 0x04],
                    ),
                    (
                        # Parent family 10: IPv6 address
                        IP6Field("rta_data", "::"),
                        lambda pkt: pkt.parent
                        and pkt.parent.ifa_family == 10
                        and pkt.rta_type in [0x01, 0x02, 0x04],
                    ),
                    (
                        # IFA_LABEL
                        StrLenField(
                            "rta_data", b"", length_from=lambda pkt: pkt.rta_len - 4
                        ),
                        lambda pkt: pkt.rta_type in [0x03],
                    ),
                ],
                # Any other attribute (or no parent): raw bytes
                XStrLenField(
                    "rta_data",
                    b"",
                    length_from=lambda pkt: pkt.rta_len - 4,
                ),
            ),
            align=4,
        ),
    ]

    def default_payload_class(self, payload: bytes) -> Type[Packet]:
        """
        Use the configured padding layer for whatever follows this
        attribute.
        """
        return conf.padding_layer
482
483
class ifaddrmsg(Packet):
    """
    Payload of the ADDR messages (nlmsg_type 20 to 22): a fixed header
    describing one address, followed by a list of ifaddrmsg_rtattr.
    """

    fields_desc = [
        ByteEnumField("ifa_family", 0, socket.AddressFamily),
        ByteField("ifa_prefixlen", 0),
        # 8-bit flags
        FlagsField(
            "ifa_flags",
            0,
            -8,
            {
                0x01: "IFA_F_SECONDARY",
                0x02: "IFA_F_NODAD",
                0x04: "IFA_F_OPTIMISTIC",
                0x08: "IFA_F_DADFAILED",
                0x10: "IFA_F_HOMEADDRESS",
                0x20: "IFA_F_DEPRECATED",
                0x40: "IFA_F_TENTATIVE",
                0x80: "IFA_F_PERMANENT",
            },
        ),
        ByteField("ifa_scope", 0),
        # Index of the interface the address belongs to
        Field("ifa_index", 0, fmt="=L"),
        # Pay
        PacketListField("data", [], ifaddrmsg_rtattr),
    ]
508
509
# RTM_NEWADDR, RTM_DELADDR and RTM_GETADDR all carry an ifaddrmsg
bind_layers(rtmsghdr, ifaddrmsg, nlmsg_type=20)
bind_layers(rtmsghdr, ifaddrmsg, nlmsg_type=21)
bind_layers(rtmsghdr, ifaddrmsg, nlmsg_type=22)
513
514
515# ROUTE messages
516
517
# Names of the routing tables identifiers, shared by rtmsg.rtm_table and
# by the RTA_TABLE attribute of rtmsg_rtattr
RT_CLASS = {
    0: "RT_TABLE_UNSPEC",
    252: "RT_TABLE_COMPAT",
    253: "RT_TABLE_DEFAULT",
    254: "RT_TABLE_MAIN",
    255: "RT_TABLE_LOCAL",
}
525
526
class rtmsg_rtattr(Packet):
    """
    Attribute (length / type / data) carried in the 'data' list of a
    rtmsg message.

    Address attributes are decoded as IPv4 or IPv6 depending on the
    rtm_family of the parent message; RTA_OIF, RTA_PRIORITY, RTA_MARK and
    RTA_TABLE are 32-bit integers; anything else is kept as raw bytes.
    """

    fields_desc = [
        # rta_len counts the 4 bytes of rta_len + rta_type plus the data
        FieldLenField(
            "rta_len", None, length_of="rta_data", fmt="=H", adjust=lambda _, x: x + 4
        ),
        EnumField(
            "rta_type",
            0,
            {
                0x00: "RTA_UNSPEC",
                0x01: "RTA_DST",
                # The kernel header uses the RTA_ prefix for the next three
                0x02: "RTA_SRC",
                0x03: "RTA_IIF",
                0x04: "RTA_OIF",
                0x05: "RTA_GATEWAY",
                0x06: "RTA_PRIORITY",
                0x07: "RTA_PREFSRC",
                0x08: "RTA_METRICS",
                0x09: "RTA_MULTIPATH",
                0x0B: "RTA_FLOW",
                0x0C: "RTA_CACHEINFO",
                0x0F: "RTA_TABLE",
                0x10: "RTA_MARK",
                0x11: "RTA_MFC_STATS",
                0x12: "RTA_VIA",
                0x13: "RTA_NEWDST",
                0x14: "RTA_PREF",
                0x15: "RTA_ENCAP_TYPE",
                0x16: "RTA_ENCAP",
                0x17: "RTA_EXPIRES",
                0x18: "RTA_PAD",
                0x19: "RTA_UID",
                0x1A: "RTA_TTL_PROPAGATE",
                0x1B: "RTA_IP_PROTO",
                0x1C: "RTA_SPORT",
                0x1D: "RTA_DPORT",
                0x1E: "RTA_NH_ID",
            },
            fmt="=H",
        ),
        # The data is padded to a multiple of 4 bytes
        PadField(
            MultipleTypeField(
                [
                    # RTA_DST, RTA_SRC, RTA_PREFSRC, RTA_GATEWAY
                    (
                        # Parent family 2: IPv4 address
                        IPField("rta_data", "0.0.0.0"),
                        lambda pkt: pkt.parent
                        and pkt.parent.rtm_family == 2
                        and pkt.rta_type in [0x01, 0x02, 0x05, 0x07],
                    ),
                    (
                        # Parent family 10: IPv6 address
                        IP6Field("rta_data", "::"),
                        lambda pkt: pkt.parent
                        and pkt.parent.rtm_family == 10
                        and pkt.rta_type in [0x01, 0x02, 0x05, 0x07],
                    ),
                    # RTA_OIF, RTA_PRIORITY, RTA_MARK
                    (
                        Field("rta_data", 0, fmt="=I"),
                        lambda pkt: pkt.rta_type in [0x04, 0x06, 0x10],
                    ),
                    # RTA_TABLE
                    (
                        EnumField("rta_data", 0, RT_CLASS, fmt="=I"),
                        lambda pkt: pkt.rta_type in [0x0F],
                    ),
                ],
                # Any other attribute (or no parent): raw bytes
                XStrLenField(
                    "rta_data",
                    b"",
                    length_from=lambda pkt: pkt.rta_len - 4,
                ),
            ),
            align=4,
        ),
    ]

    def default_payload_class(self, payload: bytes) -> Type[Packet]:
        """
        Use the configured padding layer for whatever follows this
        attribute.
        """
        return conf.padding_layer
606
607
class rtmsg(Packet):
    """
    Payload of the ROUTE messages (nlmsg_type 24 to 26): a fixed header
    describing one route, followed by a list of rtmsg_rtattr.
    """

    fields_desc = [
        ByteEnumField("rtm_family", 0, socket.AddressFamily),
        # Prefix lengths of the destination and of the source
        ByteField("rtm_dst_len", 0),
        ByteField("rtm_src_len", 0),
        ByteField("rtm_tos", 0),
        ByteEnumField(
            "rtm_table",
            0,
            RT_CLASS,
        ),
        ByteEnumField(
            "rtm_protocol",
            0,
            {
                0x00: "RTPROT_UNSPEC",
                0x01: "RTPROT_REDIRECT",
                0x02: "RTPROT_KERNEL",
                0x03: "RTPROT_BOOT",
                0x04: "RTPROT_STATIC",
            },
        ),
        ByteEnumField(
            "rtm_scope",
            0,
            {
                0: "RT_SCOPE_UNIVERSE",
                200: "RT_SCOPE_SITE",
                253: "RT_SCOPE_LINK",
                254: "RT_SCOPE_HOST",
                255: "RT_SCOPE_NOWHERE",
            },
        ),
        ByteEnumField(
            "rtm_type",
            0,
            {
                0x00: "RTN_UNSPEC",
                0x01: "RTN_UNICAST",
                0x02: "RTN_LOCAL",
                0x03: "RTN_BROADCAST",
                0x04: "RTN_ANYCAST",
                0x05: "RTN_MULTICAST",
                0x06: "RTN_BLACKHOLE",
                0x07: "RTN_UNREACHABLE",
                0x08: "RTN_PROHIBIT",
                0x09: "RTN_THROW",
                0x0A: "RTN_NAT",
                0x0B: "RTN_XRESOLVE",
            },
        ),
        FlagsField(
            "rtm_flags",
            0,
            # The sign of the size follows the host endianness
            32 if BIG_ENDIAN else -32,
            {
                0x100: "RTM_F_NOTIFY",
                0x200: "RTM_F_CLONED",
                0x400: "RTM_F_EQUALIZE",
                0x800: "RTM_F_PREFIX",
                0x1000: "RTM_F_LOOKUP_TABLE",
                0x2000: "RTM_F_FIB_MATCH",
                0x4000: "RTM_F_OFFLOAD",
                0x8000: "RTM_F_TRAP",
                0x20000000: "RTM_F_OFFLOAD_FAILED",
            },
        ),
        # Pay
        PacketListField("data", [], rtmsg_rtattr),
    ]
678
679
# RTM_NEWROUTE, RTM_DELROUTE and RTM_GETROUTE all carry a rtmsg
bind_layers(rtmsghdr, rtmsg, nlmsg_type=24)
bind_layers(rtmsghdr, rtmsg, nlmsg_type=25)
bind_layers(rtmsghdr, rtmsg, nlmsg_type=26)
683
684
class rtmsghdrs(Packet):
    """
    Container for a sequence of rtmsghdr messages, used to build the
    buffer sent on the netlink socket and to dissect the buffers received
    from it.
    """

    fields_desc = [
        PacketListField(
            "msgs",
            [],
            rtmsghdr,
            # Upper bound on the number of messages in one buffer:
            # 65535 / len(rtmsghdr)
            max_count=4096,
        ),
    ]
695
696
697# Utils
698
699
# Socket option level used with setsockopt() on the netlink socket
SOL_NETLINK = 270
# Netlink socket options enabled (best effort) by _sr1_rtrequest
NETLINK_EXT_ACK = 11  # Linux 4.12+ only
NETLINK_GET_STRICT_CHK = 12  # Linux 4.20+ only
703
704
def _sr1_rtrequest(pkt: Packet) -> List[Packet]:
    """
    Send / Receive a rtnetlink request

    :param pkt: the request to send (a rtmsghdr and its payload).
    :returns: the messages received before the end of the answer. The
        terminating message itself is not included. An empty list is
        returned when the answer ends with a non-zero status.
    """
    # Create socket. The 'with' block closes it on every exit path,
    # including a failure while configuring, binding or sending.
    with socket.socket(
        socket.AF_NETLINK,
        socket.SOCK_RAW | socket.SOCK_CLOEXEC,
        socket.NETLINK_ROUTE,
    ) as sock:
        # Configure socket
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 32768)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 1048576)
        try:
            sock.setsockopt(SOL_NETLINK, NETLINK_EXT_ACK, 1)
        except OSError:
            # Linux 4.12+ only
            pass
        sock.bind((0, 0))  # bind to kernel
        try:
            sock.setsockopt(SOL_NETLINK, NETLINK_GET_STRICT_CHK, 1)
        except OSError:
            # Linux 4.20+ only
            pass
        # Request routes
        sock.send(bytes(rtmsghdrs(msgs=[pkt])))
        results: List[Packet] = []
        while True:
            msgs = rtmsghdrs(sock.recv(65535))
            if not msgs:
                log_loading.warning("Failed to read the routes using RTNETLINK !")
                return []
            for msg in msgs.msgs:
                # Keep going until we find the end of the MULTI format
                if not msg.nlmsg_flags.NLM_F_MULTI or msg.nlmsg_type == 3:
                    if msg.nlmsg_type == 3 and nlmsgerr in msg and msg.status != 0:
                        # NLMSG_DONE with errors
                        if msg.data and msg.data[0].rta_type == 1:
                            log_loading.debug(
                                "Scapy RTNETLINK error on %s: '%s'. Please report !",
                                pkt.sprintf("%nlmsg_type%"),
                                msg.data[0].rta_data.decode(),
                            )
                        return []
                    return results
                results.append(msg)
754
755
def _get_ips(af_family=socket.AF_UNSPEC):
    # type: (socket.AddressFamily) -> Dict[int, List[Dict[str, Any]]]
    """
    Ask the kernel for the addresses of the interfaces (RTM_GETADDR over a
    NETLINK socket) and group them by interface index.

    :param af_family: address family set in the request.
    :returns: a dict mapping an interface index to a list of dicts holding
        "af_family", "index", "address" and, for IPv6 only, "scope".
    """
    request = rtmsghdr(
        nlmsg_type="RTM_GETADDR",
        nlmsg_flags="NLM_F_REQUEST+NLM_F_ROOT+NLM_F_MATCH",
        nlmsg_seq=int(time.time()),
    ) / ifaddrmsg(
        ifa_family=af_family,
        data=[],
    )
    by_index: Dict[int, List[Dict[str, Any]]] = {}
    for answer in _sr1_rtrequest(request):
        idx = answer.ifa_index
        fam = answer.ifa_family
        peer = None
        own = None
        for rta in answer.data:
            kind = rta.rta_type
            if kind == 0x01:  # IFA_ADDRESS
                peer = rta.rta_data
            elif kind == 0x02:  # IFA_LOCAL
                own = rta.rta_data
        # include/uapi/linux/if_addr.h: on point-to-point links, IFA_LOCAL
        # is the address of the interface and IFA_ADDRESS the peer's one,
        # so IFA_LOCAL wins whenever it is present.
        chosen = peer if own is None else own
        if chosen is None:
            continue
        entry = {
            "af_family": fam,
            "index": idx,
            "address": chosen,
        }
        if fam == 10:  # ipv6
            entry["scope"] = scapy.utils6.in6_getscope(chosen)
        by_index.setdefault(idx, []).append(entry)
    return by_index
796
797
def _get_if_list():
    # type: () -> Dict[int, Dict[str, Any]]
    """
    Ask the kernel for the list of interfaces (RTM_GETLINK over a NETLINK
    socket) and attach to each one the addresses returned by _get_ips().

    :returns: a dict mapping an interface index to a dict holding "name",
        "index", "flags", "mac", "type" and "ips". Links that carry no
        name attribute are left out.
    """
    request = rtmsghdr(
        nlmsg_type="RTM_GETLINK",
        nlmsg_flags="NLM_F_REQUEST+NLM_F_ROOT+NLM_F_MATCH",
        nlmsg_seq=int(time.time()),
    ) / ifinfomsg(
        data=[],
    )
    links = _sr1_rtrequest(request)
    addrs_by_index = _get_ips()
    found = {}
    for link in links:
        idx = link.ifi_index
        name = None
        hwaddr = "00:00:00:00:00:00"
        for rta in link.data:
            if rta.rta_type == 0x01:  # IFLA_ADDRESS
                hwaddr = rta.rta_data
            elif rta.rta_type == 0x03:  # IFLA_IFNAME
                # Drop the last byte of the name before decoding it
                name = rta.rta_data[:-1].decode()
        if name is None:
            continue
        found[idx] = {
            "name": name,
            "index": idx,
            "flags": link.ifi_flags,
            "mac": hwaddr,
            "type": link.ifi_type,
            "ips": addrs_by_index.get(idx, []),
        }
    return found
839
840
def in6_getifaddr():
    # type: () -> List[Tuple[str, int, str]]
    """
    List every IPv6 address configured on the system.

    :returns: a list of 3-tuples (addr, scope, iface) where 'addr' is an
        address of scope 'scope' set on the interface named 'iface'.
        Addresses whose interface index is unknown to _get_if_list() are
        skipped.
    """
    addrs_by_index = _get_ips(af_family=socket.AF_INET6)
    links = _get_if_list()
    return [
        (entry["address"], entry["scope"], links[entry["index"]]["name"])
        for per_iface in addrs_by_index.values()
        for entry in per_iface
        if entry["index"] in links
    ]
859
860
def _read_routes(af_family):
    # type: (socket.AddressFamily) -> List[Packet]
    """
    Dump the routes of the local and main tables using a NETLINK socket.

    :param af_family: address family set in the requests.
    :returns: the RTM_NEWROUTE messages received, local table first.
    """
    collected = []
    for table_name in ("RT_TABLE_LOCAL", "RT_TABLE_MAIN"):
        header = rtmsghdr(
            nlmsg_type="RTM_GETROUTE",
            nlmsg_flags="NLM_F_REQUEST+NLM_F_ROOT+NLM_F_MATCH",
            nlmsg_seq=int(time.time()),
        )
        body = rtmsg(
            rtm_family=af_family,
            data=[
                rtmsg_rtattr(rta_type="RTA_TABLE", rta_data=table_name),
            ],
        )
        collected += _sr1_rtrequest(header / body)
    # Only keep RTM_NEWROUTE
    return [answer for answer in collected if answer.nlmsg_type == 24]
884
885
def read_routes():
    # type: () -> List[Tuple[int, int, str, str, str, int]]
    """
    Build the IPv4 routing table of the current process.

    :returns: a list of tuples (net, mask, gw, iface, addr, metric). One
        224.0.0.0/4 entry with metric 250 is appended for each
        multicast-capable interface that has an IPv4 address.
    """
    table = []
    links = _get_if_list()
    for route in _read_routes(socket.AF_INET):
        # Skip answers of another family (some OS conf appears to lead
        # to this)
        if route.rtm_family != socket.AF_INET:
            continue
        # Defaults, used when the matching attribute is absent
        dst = 0
        netmask = itom(route.rtm_dst_len)
        gateway = "0.0.0.0"
        dev = ""
        src = "0.0.0.0"
        prio = 0
        for rta in route.data:
            kind = rta.rta_type
            if kind == 0x01:  # RTA_DST
                dst = atol(rta.rta_data)
            elif kind == 0x04:  # RTA_OIF
                oif = rta.rta_data
                # Fall back to the index itself when the link is unknown
                dev = links[oif]["name"] if oif in links else str(oif)
            elif kind == 0x05:  # RTA_GATEWAY
                gateway = rta.rta_data
            elif kind == 0x06:  # RTA_PRIORITY
                prio = rta.rta_data
            elif kind == 0x07:  # RTA_PREFSRC
                src = rta.rta_data
        table.append((dst, netmask, gateway, dev, src, prio))
    # Add multicast routes, as those are missing by default
    for link in links.values():
        if not link['flags'].MULTICAST:
            continue
        # Use the first IPv4 address of the interface, if there is one
        for entry in link["ips"]:
            if entry["af_family"] == socket.AF_INET:
                table.append((
                    0xe0000000, 0xf0000000, "0.0.0.0", link["name"],
                    entry["address"], 250
                ))
                break
    return table
936
937
def read_routes6():
    # type: () -> List[Tuple[str, int, str, str, List[str], int]]
    """
    Build the IPv6 routing table of the current process.

    :returns: a list of tuples (prefix, plen, nh, iface, cset, metric),
        where cset is the list of candidate source addresses. Routes with
        an empty candidate set are dropped. One ff00::/8 entry with
        metric 250 is appended for each multicast-capable interface that
        has an IPv6 address.
    """
    table = []
    links = _get_if_list()
    answers = _read_routes(socket.AF_INET6)
    addrs_by_index = _get_ips(af_family=socket.AF_INET6)
    for route in answers:
        # Skip answers of another family (some OS conf appears to lead
        # to this)
        if route.rtm_family != socket.AF_INET6:
            continue
        # Defaults, used when the matching attribute is absent
        dst = "::"
        dst_len = route.rtm_dst_len
        nexthop = "::"
        oif = 0
        dev = ""
        prio = 0
        for rta in route.data:
            kind = rta.rta_type
            if kind == 0x01:  # RTA_DST
                dst = rta.rta_data
            elif kind == 0x04:  # RTA_OIF
                oif = rta.rta_data
                # Fall back to the index itself when the link is unknown
                dev = links[oif]["name"] if oif in links else str(oif)
            elif kind == 0x05:  # RTA_GATEWAY
                nexthop = rta.rta_data
            elif kind == 0x06:  # RTA_PRIORITY
                prio = rta.rta_data
        # Addresses set on the output interface, as (addr, scope, iface)
        candidates = (
            (entry["address"], entry["scope"], dev)
            for entry in addrs_by_index.get(oif, [])
        )
        sources = scapy.utils6.construct_source_candidate_set(
            dst, dst_len, candidates
        )
        if sources:
            table.append((dst, dst_len, nexthop, dev, sources, prio))
    # Add multicast routes, as those are missing by default
    for link in links.values():
        if not link['flags'].MULTICAST:
            continue
        v6_addrs = [
            entry["address"]
            for entry in link["ips"]
            if entry["af_family"] == socket.AF_INET6
        ]
        if v6_addrs:
            table.append((
                "ff00::", 8, "::", link["name"], v6_addrs, 250
            ))
    return table