1"""
2------------------------------------------------------------------------------------------------------------------------
3meter.py
4Copyright (C) 2019-22 - NFStream Developers
5This file is part of NFStream, a Flexible Network Data Analysis Framework (https://www.nfstream.org/).
6NFStream is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public
7License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
8version.
9NFStream is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
10of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
11You should have received a copy of the GNU Lesser General Public License along with NFStream.
12If not, see <http://www.gnu.org/licenses/>.
13------------------------------------------------------------------------------------------------------------------------
14"""
from .engine import create_engine, setup_capture, setup_dissector, activate_capture
from .utils import set_affinity, InternalError, InternalState, NFEvent, NFMode
from collections import OrderedDict
from .flow import NFlow


ENGINE_LOAD_ERR = "Error when loading engine library. This means that you are probably building nfstream from source \
and something went wrong during the engine compilation step. Please see: \
https://www.nfstream.org/docs/#building-nfstream-from-sources for more information"

NPCAP_LOAD_ERR = "Error finding npcap library. Please make sure npcap is installed on your system."

NDPI_LOAD_ERR = "Error while loading Dissector. This means that you are building nfstream with an out of sync nDPI."

FLOW_KEY = "{}:{}:{}:{}:{}:{}:{}:{}:{}"
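
# Note (an inference from this file, not stated in the original sources): the
# nine placeholders of FLOW_KEY line up with the 9-tuple produced by
# get_flow_key() below, where each IP address is an indexable two-part integer
# pair: (ip[0], ip[1], port, ip[0], ip[1], port, protocol, vlan_id, tunnel_id).

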
class NFCache(OrderedDict):
    """LRU Flow Cache
    The NFCache object is used to cache flow entries such that MRU entries are kept at the end and LRU entries
    at the start. Note that we use OrderedDict, which combines the classical Python dict with a doubly
    linked list (with sentinel nodes) to track insertion order.
    By doing so, we can efficiently access idle connection entries that need to be expired based on a timeout.
    """

    def __init__(self, *args, **kwds):
        super().__init__(*args, **kwds)

    def __getitem__(self, key):
        return super().__getitem__(key)

    def __setitem__(self, key, value):
        super().__setitem__(key, value)
        self.move_to_end(key)  # now this item is the most recently updated

    def __eq__(self, other):
        return super().__eq__(other)

    def get_lru_key(self):
        return next(iter(self))
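

# A minimal usage sketch (illustrative only, with hypothetical keys): reads do
# not refresh recency, but every write moves its key to the MRU end, so
# get_lru_key() always yields the least recently updated entry:
#
#     cache = NFCache()
#     cache["a"] = 1
#     cache["b"] = 2
#     cache["a"] = 3  # update: "a" becomes MRU again
#     assert cache.get_lru_key() == "b"

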
def meter_scan(
    meter_tick,
    cache,
    idle_timeout,
    channel,
    udps,
    sync,
    n_dissections,
    statistics,
    splt,
    ffi,
    lib,
    dissector,
):
    """Check the flow cache for expired flows.

    Expired flows are identified, added to the channel and then removed from the cache.
    """
    remaining = True  # We suppose that there is something to expire
    scanned = 0
    while (
        remaining and scanned < 1000
    ):  # idle scan budget (every 10 ms we scan at most 1000 entries)
        try:
            flow_key = cache.get_lru_key()  # will return the LRU flow key.
            flow = cache[flow_key]
            if flow.is_idle(meter_tick, idle_timeout):  # idle, expire it.
                channel.put(
                    flow.expire(
                        udps, sync, n_dissections, statistics, splt, ffi, lib, dissector
                    )
                )
                del cache[flow_key]
                del flow
                scanned += 1
            else:
                remaining = False  # LRU flow is not yet idle.
        except StopIteration:  # Empty cache
            remaining = False
    return scanned
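

# Back-of-the-envelope bound implied by the scan budget above (plain
# arithmetic, not a measured figure): scans fire at most every 10 ms and each
# expires at most 1000 entries, so a single meter can evict up to
# 1000 * 100 = 100000 idle flows per second while keeping each scan pause short.

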
def get_flow_key(src_ip, src_port, dst_ip, dst_port, protocol, vlan_id, tunnel_id):
    """Create a consistent, direction-agnostic flow key"""
    if src_ip[1] < dst_ip[1] or ((src_ip[1] == dst_ip[1]) and (src_ip[0] < dst_ip[0])):
        key = (
            src_ip[0],
            src_ip[1],
            src_port,
            dst_ip[0],
            dst_ip[1],
            dst_port,
            protocol,
            vlan_id,
            tunnel_id,
        )
    else:
        if src_ip[0] == dst_ip[0] and src_ip[1] == dst_ip[1]:
            if src_port <= dst_port:
                key = (
                    src_ip[0],
                    src_ip[1],
                    src_port,
                    dst_ip[0],
                    dst_ip[1],
                    dst_port,
                    protocol,
                    vlan_id,
                    tunnel_id,
                )
            else:
                key = (
                    dst_ip[0],
                    dst_ip[1],
                    dst_port,
                    src_ip[0],
                    src_ip[1],
                    src_port,
                    protocol,
                    vlan_id,
                    tunnel_id,
                )
        else:
            key = (
                dst_ip[0],
                dst_ip[1],
                dst_port,
                src_ip[0],
                src_ip[1],
                src_port,
                protocol,
                vlan_id,
                tunnel_id,
            )
    return key
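

# Illustrative symmetry check (hypothetical values; addresses are the indexable
# two-part integer pairs manipulated above): swapping source and destination
# endpoints yields the same key, which is what makes the key direction-agnostic:
#
#     a, b = (3232235521, 0), (3232235522, 0)  # two hypothetical address pairs
#     k1 = get_flow_key(a, 4242, b, 443, 6, 0, 0)
#     k2 = get_flow_key(b, 443, a, 4242, 6, 0, 0)
#     assert k1 == k2

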
def get_flow_key_from_pkt(packet):
    """Create a flow key from packet information (7-tuple).

    A flow key uniquely determines a flow using the source IP,
    destination IP, source port, destination port, TCP/UDP protocol, VLAN ID
    and tunnel ID of its packets.
    """
    return get_flow_key(
        packet.src_ip,
        packet.src_port,
        packet.dst_ip,
        packet.dst_port,
        packet.protocol,
        packet.vlan_id,
        packet.tunnel_id,
    )


def consume(
    packet,
    cache,
    active_timeout,
    idle_timeout,
    channel,
    ffi,
    lib,
    udps,
    sync,
    accounting_mode,
    n_dissections,
    statistics,
    splt,
    dissector,
    decode_tunnels,
    system_visibility_mode,
):
    """Consume a packet and produce a flow."""
    # We maintain state for active flows computation: 1 for creation, 0 for update/cut, -1 for custom expire
    flow_key = get_flow_key_from_pkt(packet)
    try:  # update flow
        flow = cache[flow_key].update(
            packet,
            idle_timeout,
            active_timeout,
            ffi,
            lib,
            udps,
            sync,
            accounting_mode,
            n_dissections,
            statistics,
            splt,
            dissector,
        )
        if flow is not None:
            if flow.expiration_id < 0:  # custom expiration
                channel.put(flow)
                del cache[flow_key]
                del flow
                state = -1
            else:  # active/inactive expiration
                channel.put(flow)
                del cache[flow_key]
                del flow
                try:
                    cache[flow_key] = NFlow(
                        packet,
                        ffi,
                        lib,
                        udps,
                        sync,
                        accounting_mode,
                        n_dissections,
                        statistics,
                        splt,
                        dissector,
                        decode_tunnels,
                        system_visibility_mode,
                    )
                    if (
                        cache[flow_key].expiration_id == -1
                    ):  # A user plugin forced expiration on the first packet
                        channel.put(
                            cache[flow_key].expire(
                                udps,
                                sync,
                                n_dissections,
                                statistics,
                                splt,
                                ffi,
                                lib,
                                dissector,
                            )
                        )
                        del cache[flow_key]
                    state = 0
                except OSError:
                    print(
                        "WARNING: Failed to allocate memory space for flow creation. Flow creation aborted."
                    )
                    state = 0
        else:
            state = 0
    except KeyError:  # create flow
        try:
            if sync:
                flow = NFlow(
                    packet,
                    ffi,
                    lib,
                    udps,
                    sync,
                    accounting_mode,
                    n_dissections,
                    statistics,
                    splt,
                    dissector,
                    decode_tunnels,
                    system_visibility_mode,
                )
                if (
                    flow.expiration_id == -1
                ):  # A user plugin forced expiration on the first packet
                    channel.put(
                        flow.expire(
                            udps,
                            sync,
                            n_dissections,
                            statistics,
                            splt,
                            ffi,
                            lib,
                            dissector,
                        )
                    )
                    del flow
                    state = 0
                else:
                    cache[flow_key] = flow
                    state = 1
            else:
                cache[flow_key] = NFlow(
                    packet,
                    ffi,
                    lib,
                    udps,
                    sync,
                    accounting_mode,
                    n_dissections,
                    statistics,
                    splt,
                    dissector,
                    decode_tunnels,
                    system_visibility_mode,
                )
                state = 1
        except OSError:
            print(
                "WARNING: Failed to allocate memory space for flow creation. Flow creation aborted."
            )
            state = 0
    return state
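

# How the returned state is meant to be consumed (a reading of the code above,
# not an addition to it): the caller accumulates it into an active flows
# counter. A creation contributes +1, an update or an expire-and-recreate cut
# contributes 0 (one flow out, one flow in), and a plugin-forced custom expire
# contributes -1. For instance, the event sequence create, update, custom
# expire nets 1 + 0 - 1 = 0 active flows.

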
def meter_cleanup(
    cache, channel, udps, sync, n_dissections, statistics, splt, ffi, lib, dissector
):
    """Cleanup all remaining entries in the NFCache."""
    for flow_key in list(cache.keys()):
        flow = cache[flow_key]
        # Push it on the channel.
        channel.put(
            flow.expire(
                udps, sync, n_dissections, statistics, splt, ffi, lib, dissector
            )
        )
        del cache[flow_key]
        del flow


def capture_track(lib, capture, mode, interface_stats, tracker, processed, ignored):
    """Update shared performance values"""
    lib.capture_stats(capture, interface_stats, mode)
    tracker[0].value = interface_stats.dropped
    tracker[1].value = processed
    tracker[2].value = ignored
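

# Note (an inference from the .value accesses above): tracker entries behave
# like shared counters (e.g., multiprocessing.Value), letting the parent
# streamer process read drops/processed/ignored totals without a channel hop.

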
def send_error(root_idx, channel, msg):
    """Send an error to the channel; only the meter with root_idx 0 reports it."""
    if root_idx == 0:
        channel.put(InternalError(NFEvent.ERROR, msg))


def meter_workflow(
    source,
    snaplen,
    decode_tunnels,
    bpf_filter,
    promisc,
    n_roots,
    root_idx,
    mode,
    idle_timeout,
    active_timeout,
    accounting_mode,
    udps,
    n_dissections,
    statistics,
    splt,
    channel,
    tracker,
    lock,
    group_id,
    system_visibility_mode,
    socket_buffer_size,
):
    """Metering workflow"""
    set_affinity(root_idx + 1)
    ffi, lib = create_engine()
    if lib is None:
        send_error(root_idx, channel, ENGINE_LOAD_ERR)
        return
    meter_tick, meter_scan_tick, meter_track_tick = (
        0,
        0,
        0,
    )  # meter, idle scan and perf track timelines
    meter_scan_interval, meter_track_interval = (
        10,
        1000,
    )  # we scan every 10 ms and update perf stats every second.
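    # Note: given the constants above, ticks appear to be expressed in
    # milliseconds and to advance with capture timestamps (packets and time
    # tickers), so both timelines follow capture time rather than wall-clock
    # time. This is an inference from this file, not a documented guarantee.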
    cache = NFCache()
    dissector = setup_dissector(ffi, lib, n_dissections)
    if dissector == ffi.NULL and n_dissections:
        send_error(root_idx, channel, NDPI_LOAD_ERR)
        return
    active_flows, ignored_packets, processed_packets = 0, 0, 0
    sync = False
    if len(udps) > 0:  # streamer started with udps: sync internal structures on update.
        sync = True
    interface_stats = ffi.new("struct nf_stat *")
    # We ensure that processes start at the same time
    if root_idx == n_roots - 1:
        lock.release()
        channel.put(InternalState(NFEvent.ALL_AFFINITY_SET))
    else:
        lock.acquire()
        lock.release()

    if mode == NFMode.MULTIPLE_FILES:
        sources = source
    else:
        sources = [source]

    for source_idx, source in enumerate(sources):
        error_child = ffi.new("char[256]")
        capture = setup_capture(
            ffi,
            lib,
            source,
            snaplen,
            promisc,
            mode,
            error_child,
            group_id,
            socket_buffer_size,
        )
        if capture is None:
            send_error(
                root_idx,
                channel,
                ffi.string(error_child).decode("utf-8", errors="ignore"),
            )
            return
        # Last setup operation: BPF filtering setup and capture activation.
        if not activate_capture(capture, lib, error_child, bpf_filter, mode):
            send_error(
                root_idx,
                channel,
                ffi.string(error_child).decode("utf-8", errors="ignore"),
            )
            return

        remaining_packets = True
        while remaining_packets:
            nf_packet = ffi.new("struct nf_packet *")
            ret = lib.capture_next(
                capture, nf_packet, decode_tunnels, n_roots, root_idx, int(mode)
            )
            if ret > 0:  # Valid capture event that must be handled by the meter
                packet_time = nf_packet.time
                if packet_time > meter_tick:
                    meter_tick = packet_time
                else:
                    nf_packet.time = meter_tick  # Force time order
                if ret == 1:  # Packet must be processed
                    processed_packets += 1
                    go_scan = False
                    if meter_tick - meter_scan_tick >= meter_scan_interval:
                        go_scan = True  # Activate scan
                        meter_scan_tick = meter_tick
                    # Consume packet and return active flows diff
                    diff = consume(
                        nf_packet,
                        cache,
                        active_timeout,
                        idle_timeout,
                        channel,
                        ffi,
                        lib,
                        udps,
                        sync,
                        accounting_mode,
                        n_dissections,
                        statistics,
                        splt,
                        dissector,
                        decode_tunnels,
                        system_visibility_mode,
                    )
                    active_flows += diff
                    if go_scan:
                        idles = meter_scan(
                            meter_tick,
                            cache,
                            idle_timeout,
                            channel,
                            udps,
                            sync,
                            n_dissections,
                            statistics,
                            splt,
                            ffi,
                            lib,
                            dissector,
                        )
                        active_flows -= idles
                else:  # time ticker
                    if meter_tick - meter_scan_tick >= meter_scan_interval:
                        idles = meter_scan(
                            meter_tick,
                            cache,
                            idle_timeout,
                            channel,
                            udps,
                            sync,
                            n_dissections,
                            statistics,
                            splt,
                            ffi,
                            lib,
                            dissector,
                        )
                        active_flows -= idles
                        meter_scan_tick = meter_tick
            elif ret == 0:  # Ignored packet
                ignored_packets += 1
            elif ret == -1:  # Read error or empty buffer
                pass
            else:  # End of file
                remaining_packets = False  # end of loop
            if (
                meter_tick - meter_track_tick >= meter_track_interval
            ):  # Performance tracking
                capture_track(
                    lib,
                    capture,
                    mode,
                    interface_stats,
                    tracker,
                    processed_packets,
                    ignored_packets,
                )
                meter_track_tick = meter_tick
        # Close capture
        lib.capture_close(capture)

    # Expire all remaining flows in the cache.
    meter_cleanup(
        cache, channel, udps, sync, n_dissections, statistics, splt, ffi, lib, dissector
    )
    # Clean dissector
    lib.dissector_cleanup(dissector)
    # Signal the end of this meter's job to the consumer (None is the termination sentinel).
    channel.put(None)