Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/nfstream/system.py: 24%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
"""
------------------------------------------------------------------------------------------------------------------------
system.py
Copyright (C) 2019-22 - NFStream Developers
This file is part of NFStream, a Flexible Network Data Analysis Framework (https://www.nfstream.org/).
NFStream is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
version.
NFStream is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public License along with NFStream.
If not, see <http://www.gnu.org/licenses/>.
------------------------------------------------------------------------------------------------------------------------
"""
16from collections import OrderedDict, namedtuple
17from psutil import Process, net_connections
18from .meter import get_flow_key
19from socket import SocketKind
20from .utils import NFEvent
21import time
# Event record exchanged over the channel: ``id`` carries the event kind (an NFEvent member), ``key`` the
# connection key, and the last two fields the owning process (both None on removal events).
NFSocket = namedtuple("NFSocket", ["id", "key", "process_pid", "process_name"])
class ConnCache(OrderedDict):
    """LRU connections cache.

    Maps a connection key to the last time that connection was seen. Every write moves the entry to the end of
    the ordering, so the most recently updated entries sit at the end and the least recently updated ones at the
    start. OrderedDict combines a regular dict with a doubly linked list, which makes the oldest entry reachable
    without walking the whole cache when expiring idle connections.

    All time values handled here share the unit of the ``current_time`` passed to ``scan``
    (presumably milliseconds, as suggested by the scan period -- confirm against callers).
    """

    EXPIRY_GRACE = 5000  # Extra margin added on top of the configured timeout before an entry expires.
    SCAN_PERIOD = 10  # Minimum elapsed time between two effective scans.
    SCAN_BUDGET = 1000  # Maximum number of entries expired by a single scan.

    def __init__(self, channel, timeout, *args, **kwds):
        """Create the cache.

        Args:
            channel: object exposing ``put``; receives one NFSocket removal event per expired entry.
            timeout: idle timeout; entries expire once idle for ``timeout + EXPIRY_GRACE``.
            *args, **kwds: forwarded to OrderedDict to pre-populate the cache.
        """
        self.channel = channel
        self.timeout = timeout + self.EXPIRY_GRACE
        self.last_scan_time = 0
        super().__init__(*args, **kwds)

    def __setitem__(self, key, value):
        """Store ``value`` under ``key`` and mark the entry as the most recently updated."""
        super().__setitem__(key, value)
        self.move_to_end(key)

    def get_lru_key(self):
        """Return the least recently updated key.

        Raises:
            StopIteration: if the cache is empty.
        """
        return next(iter(self))

    def _pop_idle_keys(self, current_time):
        """Delete and yield idle LRU keys, oldest first, up to SCAN_BUDGET of them."""
        for _ in range(self.SCAN_BUDGET):
            try:
                lru_key = self.get_lru_key()
            except StopIteration:  # Empty cache
                return
            if current_time - self[lru_key] < self.timeout:
                return  # The LRU entry is not idle yet, so no newer entry can be either.
            del self[lru_key]
            yield lru_key

    def scan(self, current_time):
        """Scan and delete LRU entries based on the configured timeout.

        Does nothing unless more than SCAN_PERIOD has elapsed since the previous effective scan. Each expired
        entry is removed from the cache and reported on the channel as an NFSocket SOCKET_REMOVE event.

        Args:
            current_time: current timestamp, in the same unit as the stored values.
        """
        if (current_time - self.last_scan_time) <= self.SCAN_PERIOD:
            return
        for expired_key in self._pop_idle_keys(current_time):
            self.channel.put(
                NFSocket(NFEvent.SOCKET_REMOVE, expired_key, None, None)
            )  # Send to streamer
        self.last_scan_time = current_time
def simplify_protocol(protocol):
    """Transform protocol IDs to 3 unique values: 6 for TCP, 17 for UDP and 0 for others.

    Args:
        protocol: IP protocol number.

    Returns:
        ``protocol`` unchanged when it equals 6 or 17, otherwise 0.
    """
    if protocol == 6 or protocol == 17:
        return protocol
    return 0
def get_conn_key_from_flow(f):
    """Compute a conn key from NFlow object attributes.

    The protocol is first reduced with simplify_protocol; the two trailing arguments of get_flow_key are
    fixed to 0.

    Args:
        f: flow object exposing ``src_ip``, ``src_port``, ``dst_ip``, ``dst_port`` and ``protocol``.

    Returns:
        The key produced by get_flow_key for this flow.
    """
    return get_flow_key(
        f.src_ip, f.src_port, f.dst_ip, f.dst_port, simplify_protocol(f.protocol), 0, 0
    )
def match_flow_conn(conn_cache, flow):
    """Match a flow with a connection entry based on a shared key.

    When the cache holds an entry for the flow's connection key, the entry's first item is copied to
    ``flow.system_process_name`` and its second item to ``flow.system_process_pid``. Otherwise the flow is
    left untouched.

    Args:
        conn_cache: mapping from connection key to a (process name, process pid) sequence.
        flow: flow object to annotate in place.

    Returns:
        The same ``flow`` object.
    """
    if len(conn_cache) == 0:
        return flow  # Nothing to match against: skip the key computation entirely.
    flow_key = get_conn_key_from_flow(flow)
    try:
        flow_map_socket = conn_cache[flow_key]
    except KeyError:  # No known connection for this flow.
        return flow
    flow.system_process_name = flow_map_socket[0]
    flow.system_process_pid = flow_map_socket[1]
    return flow
def get_conn_key(c):
    """Create a 5-tuple connection key.

    Args:
        c: connection entry exposing ``laddr``, ``raddr``, ``pid`` and ``type``.

    Returns:
        The key produced by get_flow_key from the local/remote address pair and a protocol number
        (6 for SOCK_STREAM, 17 for SOCK_DGRAM, 0 otherwise), or None when the connection has no remote
        address or no owning pid.
    """
    if c.raddr == () or c.pid is None:
        return None
    if c.type == SocketKind.SOCK_STREAM:  # TCP protocol
        protocol = 6
    elif c.type == SocketKind.SOCK_DGRAM:  # UDP protocol
        protocol = 17
    else:
        protocol = 0
    return get_flow_key(
        c.laddr.ip, c.laddr.port, c.raddr.ip, c.raddr.port, protocol, 0, 0
    )
def system_socket_worflow(channel, idle_timeout, poll_period):
    """Host ground-truth generation workflow.

    Polls the system inet connections in an endless loop. The first time a connection key is seen, the owning
    process name is resolved and an NFSocket SOCKET_CREATE event is put on ``channel``; on later polls only the
    entry's last-seen time is refreshed. After each poll the cache is scanned so that idle entries are expired.
    The function returns when a KeyboardInterrupt is received.

    Args:
        channel: object exposing ``put``; receives the NFSocket events.
        idle_timeout: idle timeout handed to the connections cache.
        poll_period: time slept between two polls, passed to ``time.sleep``.
    """
    # Imported locally so the block does not depend on changes to the module-level psutil import line.
    from psutil import AccessDenied, NoSuchProcess

    conn_cache = ConnCache(channel=channel, timeout=idle_timeout)
    try:
        while True:
            current_time = time.time() * 1000
            for conn in net_connections(kind="inet"):
                # IMPORTANT: The rationale behind the usage of an active polling approach (net_connections call):
                # System process visibility is intended to generate the most accurate ground truth for traffic
                # classification end-host-based research experiments, as reported in the literature [1].
                # Thus, it must be a cross-platform approach that works the same on Linux, macOS, and
                # Windows (Gaming traffic classification challenges). On Linux, things can be done more elegantly
                # using eBPF tracing exec calls or NetLink monitoring. However, this requires a specific
                # implementation for Linux versus Windows and proper handling of old kernel versions.
                # We prefer to keep it out of the nfstream codebase. As we use net_connections from the psutil
                # (https://github.com/giampaolo/psutil) Python package, future work may include providing such
                # enhancements to the psutil project, and thus, a broader community can benefit from it.
                # [1]: http://tomasz.bujlow.com/publications/2012_journal_TELFOR.pdf
                key = get_conn_key(conn)
                if key is None:  # We failed to obtain a key.
                    continue
                if key not in conn_cache:  # Create and send
                    try:
                        process_name = Process(conn.pid).name()
                    except (NoSuchProcess, AccessDenied):
                        # The process exited (or became unreadable) between the connections snapshot and
                        # this lookup. Skip it without caching so it is retried on the next poll.
                        continue
                    conn_cache[key] = current_time
                    channel.put(
                        NFSocket(NFEvent.SOCKET_CREATE, key, conn.pid, process_name)
                    )  # Send to streamer
                else:  # update time
                    conn_cache[key] = current_time
            # Expire only once every live connection of this poll has been refreshed.
            conn_cache.scan(current_time)

            time.sleep(poll_period)  # Sleep with configured poll period
            # 0 will ensure the maximum active polling capacity and accuracy.
            # Greater values will result in less intensive CPU load and less accuracy.
            # A tradeoff must be decided (as always).
            # Completeness versus Polling frequency studies (per protocol, per platform)
            # are provided in [2]
            # [2]: https://dl.acm.org/doi/abs/10.1145/1629607.1629610
    except KeyboardInterrupt:
        return