Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/nfstream/system.py: 24%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

80 statements  

1""" 

2------------------------------------------------------------------------------------------------------------------------ 

3system.py 

4Copyright (C) 2019-22 - NFStream Developers 

5This file is part of NFStream, a Flexible Network Data Analysis Framework (https://www.nfstream.org/). 

6NFStream is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public 

7License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later 

8version. 

9NFStream is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty 

10of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. 

11You should have received a copy of the GNU Lesser General Public License along with NFStream. 

12If not, see <http://www.gnu.org/licenses/>. 

13------------------------------------------------------------------------------------------------------------------------ 

14""" 

15 

16from collections import OrderedDict, namedtuple 

17from psutil import Process, net_connections 

18from .meter import get_flow_key 

19from socket import SocketKind 

20from .utils import NFEvent 

21import time 

22 

23 

# Event record pushed on the channel towards the streamer. In this module
# ``id`` carries an NFEvent value (SOCKET_CREATE or SOCKET_REMOVE) and ``key``
# the connection key; the process fields are None for removal events.
NFSocket = namedtuple(
    "NFSocket",
    ("id", "key", "process_pid", "process_name"),
)

25 

26 

class ConnCache(OrderedDict):
    """LRU cache mapping connection keys to their last update time.

    Entries are kept in update order: ``__setitem__`` moves the written key to
    the end, so the least recently updated entry is always first and the most
    recently updated one is last. ``scan`` relies on that ordering to expire
    idle entries from the front without walking the whole cache.

    Attributes:
        channel: Object exposing ``put``; receives one ``NFSocket`` removal
            event for every expired entry.
        timeout: Idle threshold used by ``scan``: the ``timeout`` constructor
            argument plus ``TIMEOUT_MARGIN``.
        last_scan_time: ``current_time`` of the last scan that actually ran.
    """

    # All durations below are expressed in the unit of the ``current_time``
    # values handed to ``scan`` (the polling loop in this module passes
    # milliseconds).
    TIMEOUT_MARGIN = 5000  # Extra idle time granted on top of the configured timeout.
    SCAN_PERIOD = 10  # A scan only runs when more than this elapsed since the last one.
    SCAN_BUDGET = 1000  # Maximum number of entries a single scan may expire.

    def __init__(self, channel, timeout, *args, **kwds):
        """Create the cache.

        Args:
            channel: Object exposing ``put``, used to publish removal events.
            timeout: Idle timeout; ``TIMEOUT_MARGIN`` is added to it.
            *args: Forwarded to ``OrderedDict``.
            **kwds: Forwarded to ``OrderedDict``.
        """
        self.channel = channel
        self.timeout = timeout + self.TIMEOUT_MARGIN
        self.last_scan_time = 0
        super().__init__(*args, **kwds)

    def __getitem__(self, key):
        """Return the value stored for ``key`` without changing the entry order."""
        return super().__getitem__(key)

    def __setitem__(self, key, value):
        """Store ``value`` for ``key`` and mark the entry as most recently updated."""
        super().__setitem__(key, value)
        self.move_to_end(key)  # now this item is the most recently updated

    def __eq__(self, other):
        """Compare with ``other`` using the ``OrderedDict`` equality rules."""
        return super().__eq__(other)

    def get_lru_key(self):
        """Return the least recently updated key.

        Raises:
            StopIteration: If the cache is empty.
        """
        return next(iter(self))

    def scan(self, current_time):
        """Expire idle entries, oldest first, and notify the channel.

        The scan is skipped unless more than ``SCAN_PERIOD`` elapsed since the
        previous effective scan. When it runs, it removes entries whose age
        (``current_time`` minus the stored update time) reached ``timeout``,
        stopping at the first entry that is still fresh, when the cache is
        empty, or once ``SCAN_BUDGET`` entries were expired. Each removal is
        published as ``NFSocket(NFEvent.SOCKET_REMOVE, key, None, None)``.

        Args:
            current_time: Current timestamp, in the same unit as the stored
                update times.
        """
        if current_time - self.last_scan_time <= self.SCAN_PERIOD:
            return
        expired = 0
        # Strict bound: a scan never expires more than SCAN_BUDGET entries.
        while expired < self.SCAN_BUDGET:
            try:
                lru_key = self.get_lru_key()
            except StopIteration:  # Empty cache
                break
            if current_time - self[lru_key] < self.timeout:
                break  # LRU entry is not idle yet, so no later entry is either.
            del self[lru_key]
            self.channel.put(
                NFSocket(NFEvent.SOCKET_REMOVE, lru_key, None, None)
            )  # Send to streamer
            expired += 1
        self.last_scan_time = current_time

76 

77 

def simplify_protocol(protocol):
    """Collapse a protocol ID to one of 3 values: 6 (TCP), 17 (UDP) or 0 (anything else)."""
    is_tcp_or_udp = protocol == 6 or protocol == 17
    return protocol if is_tcp_or_udp else 0

85 

86 

def get_conn_key_from_flow(f):
    """Build the connection key of an NFlow object.

    The key is produced by ``get_flow_key`` from the flow endpoints and its
    protocol reduced through ``simplify_protocol``; the two trailing
    ``get_flow_key`` arguments are always 0.
    """
    endpoints = (f.src_ip, f.src_port, f.dst_ip, f.dst_port)
    protocol = simplify_protocol(f.protocol)
    return get_flow_key(*endpoints, protocol, 0, 0)

92 

93 

def match_flow_conn(conn_cache, flow):
    """Annotate a flow with the process owning its connection, when known.

    The flow connection key is looked up in ``conn_cache``. On a hit, item 0 of
    the entry is copied to ``flow.system_process_name`` and item 1 to
    ``flow.system_process_pid``; on a miss (or with an empty cache) the flow is
    left untouched. The same ``flow`` object is returned in every case.
    """
    if len(conn_cache) == 0:
        return flow  # Nothing to match against, skip the key computation.
    lookup_key = get_conn_key_from_flow(flow)
    try:
        entry = conn_cache[lookup_key]
        flow.system_process_name = entry[0]
        flow.system_process_pid = entry[1]
    except KeyError:
        pass  # No connection known for this flow.
    return flow

105 

106 

def get_conn_key(c):
    """Create the connection key of a system connection entry.

    Returns None when the entry has no remote address (``raddr == ()``) or no
    owning pid. Otherwise the key is built by ``get_flow_key`` from the local
    and remote address/port pairs and a protocol number: 6 for SOCK_STREAM
    (TCP), 17 for SOCK_DGRAM (UDP) and 0 for any other socket type.
    """
    if c.raddr == () or c.pid is None:
        return None
    if c.type == SocketKind.SOCK_STREAM:  # TCP protocol
        protocol = 6
    elif c.type == SocketKind.SOCK_DGRAM:  # UDP protocol
        protocol = 17
    else:
        protocol = 0
    return get_flow_key(
        c.laddr.ip, c.laddr.port, c.raddr.ip, c.raddr.port, protocol, 0, 0
    )

120 

121 

def system_socket_worflow(channel, idle_timeout, poll_period):
    """Host ground-truth generation workflow.

    Polls the system connection table in an endless loop and publishes socket
    events on ``channel``: an ``NFSocket`` with ``NFEvent.SOCKET_CREATE`` the
    first time a connection key is seen, and (through ``ConnCache.scan``) an
    ``NFEvent.SOCKET_REMOVE`` once a key has not been observed for longer than
    the cache timeout.

    Args:
        channel: Object exposing ``put``; receives the ``NFSocket`` events.
        idle_timeout: Idle timeout handed to ``ConnCache``. It is compared
            against millisecond timestamps, so presumably milliseconds
            (confirm against the caller).
        poll_period: Pause between two polls, passed to ``time.sleep``
            (seconds).

    Returns:
        None, once a ``KeyboardInterrupt`` is received.
    """
    # Local import: only this function needs the psutil exception types.
    from psutil import AccessDenied, NoSuchProcess

    conn_cache = ConnCache(channel=channel, timeout=idle_timeout)
    try:
        while True:
            current_time = time.time() * 1000  # Milliseconds
            for conn in net_connections(kind="inet"):
                # IMPORTANT: The rationale behind the usage of an active polling approach (net_connections call):
                # System process visibility is intended to generate the most accurate ground truth for traffic
                # classification end-host-based research experiments, as reported in the literature [1].
                # Thus, it must be a cross-platform approach that works the same on Linux, macOS, and
                # Windows (Gaming traffic classification challenges). On Linux, things can be done more elegantly
                # using eBPF tracing exec calls or NetLink monitoring. However, this would require a specific
                # implementation for Linux versus Windows and proper handling of old kernel versions.
                # We prefer to keep it out of the nfstream codebase. As we use net_connections from the psutil
                # (https://github.com/giampaolo/psutil) Python package, future work may include providing
                # such enhancements to the psutil project, and thus, a broader community can benefit from it.
                # [1]: http://tomasz.bujlow.com/publications/2012_journal_TELFOR.pdf
                key = get_conn_key(conn)
                if key is None:  # We failed to obtain a key.
                    continue
                if key in conn_cache:  # Known connection: update time
                    conn_cache[key] = current_time
                    continue
                try:
                    process_name = Process(conn.pid).name()
                except (NoSuchProcess, AccessDenied):
                    # The process exited (or became unreadable) between the
                    # connection listing and this lookup. Skip the entry instead
                    # of killing the polling loop; it stays uncached, so it is
                    # retried on the next poll if the connection still exists.
                    continue
                conn_cache[key] = current_time
                channel.put(
                    NFSocket(NFEvent.SOCKET_CREATE, key, conn.pid, process_name)
                )  # Send to streamer
            conn_cache.scan(current_time)

            time.sleep(poll_period)  # Sleep with configured poll period
            # 0 will ensure the maximum active polling capacity and accuracy.
            # Greater values will result in a less intensive CPU load and less accuracy.
            # A tradeoff must be decided (as always).
            # Completeness versus polling frequency studies (per protocol, per platform)
            # are provided in [2].
            # [2]: https://dl.acm.org/doi/abs/10.1145/1629607.1629610
    except KeyboardInterrupt:
        return