1"""
2------------------------------------------------------------------------------------------------------------------------
3streamer.py
4Copyright (C) 2019-22 - NFStream Developers
5This file is part of NFStream, a Flexible Network Data Analysis Framework (https://www.nfstream.org/).
6NFStream is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public
7License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
8version.
9NFStream is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
10of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
11You should have received a copy of the GNU Lesser General Public License along with NFStream.
12If not, see <http://www.gnu.org/licenses/>.
13------------------------------------------------------------------------------------------------------------------------
14"""
15
16from multiprocessing import get_context
17import threading
18import pandas as pd
19import time as tm
20import os
21import platform
22import psutil
23from collections.abc import Iterable
24from os.path import isfile
25from .meter import meter_workflow
26from .anonymizer import NFAnonymizer
27from .engine import is_interface
28from .plugin import NFPlugin
29from .utils import (
30 csv_converter,
31 open_file,
32 RepeatedTimer,
33 update_performances,
34 set_affinity,
35 available_cpus_count,
36)
37from .utils import (
38 validate_flows_per_file,
39 NFMode,
40 create_csv_file_path,
41 NFEvent,
42 validate_rotate_files,
43)
44from .system import system_socket_worflow, match_flow_conn
45
46
class NFStreamer(object):
    """Network Flow Streamer.

    Configures and runs an NFStream flow-metering pipeline over a pcap file,
    a list of pcap files processed as a single capture, or a live network
    interface. Every constructor argument is validated by the property setter
    of the same name, which raises ``ValueError`` on invalid input.

    Iterating over a streamer starts the metering processes and yields flows
    as they are exported; ``to_csv`` and ``to_pandas`` consume that iterator.

    Example::

        from nfstream import NFStreamer
        # Streamer object for reading traffic from a PCAP
        streamer = NFStreamer(source="path/to/file.pcap")
        # Converting data to pandas dataframe
        df = streamer.to_pandas()
    """

    streamer_id = 0  # class-wide id generator, incremented under glock
    glock = threading.Lock()
    is_windows = "windows" in platform.system().lower()
    # Largest C uint64_t value: timeouts are multiplied by 1000 (ms) and the
    # result must fit in it.
    _MAX_UINT64 = 2**64 - 1

    def __init__(
        self,
        source=None,
        decode_tunnels=True,
        bpf_filter=None,
        promiscuous_mode=True,
        snapshot_length=1536,
        socket_buffer_size=0,
        idle_timeout=120,  # https://www.kernel.org/doc/Documentation/networking/nf_conntrack-sysctl.txt
        active_timeout=1800,
        accounting_mode=0,
        udps=None,
        n_dissections=20,
        statistical_analysis=False,
        splt_analysis=0,
        n_meters=0,
        max_nflows=0,
        performance_report=0,
        system_visibility_mode=0,
        system_visibility_poll_ms=100,
    ):
        """Validate and store the streamer configuration.

        Each argument is validated by the property of the same name. ``source``
        is assigned before the others because it sets the capture mode that
        the ``n_meters`` and ``system_visibility_mode`` setters read.
        """
        with NFStreamer.glock:
            NFStreamer.streamer_id += 1
            self._idx = NFStreamer.streamer_id
        # Placeholder; the source setter below sets the real mode.
        self._mode = NFMode.SINGLE_FILE
        self.source = source
        self.decode_tunnels = decode_tunnels
        self.bpf_filter = bpf_filter
        self.promiscuous_mode = promiscuous_mode
        self.snapshot_length = snapshot_length
        self.idle_timeout = idle_timeout
        self.active_timeout = active_timeout
        self.accounting_mode = accounting_mode
        self.udps = udps
        self.n_dissections = n_dissections
        self.statistical_analysis = statistical_analysis
        self.splt_analysis = splt_analysis
        self.n_meters = n_meters
        self.max_nflows = max_nflows
        self.performance_report = performance_report
        self.system_visibility_mode = system_visibility_mode
        self.system_visibility_poll_ms = system_visibility_poll_ms

        # NIC socket buffer size. Default is 0, which means that the pcap default value is used.
        # The default values may vary depending on the OS and CPU architecture.
        # Range: 0 - 2^31-1
        self.socket_buffer_size = socket_buffer_size

        # Multiprocessing start method: "spawn" on Windows, "fork" elsewhere.
        if NFStreamer.is_windows:
            self._mp_context = get_context("spawn")
        else:
            self._mp_context = get_context("fork")

    @property
    def source(self):
        """Capture source: a pcap path, a list of pcap paths, or an interface name."""
        return self._source

    @source.setter
    def source(self, value):
        """Validate the source and derive the capture mode from it.

        A list is treated as several pcap files read as one capture; every
        entry must be path-like and point to an existing file. Anything else
        must be a path-like value naming an existing file or a network
        interface recognised by ``is_interface``. The caller's list is never
        modified; normalised paths are stored in a new list.

        Raises:
            ValueError: if the list is empty or any entry/value is invalid.
        """
        if isinstance(value, list):  # List of pcap files to consider as a single one.
            if not value:
                raise ValueError("Please provide a non-empty list of sources.")
            paths = []
            for i, item in enumerate(value):
                try:
                    path = str(os.fspath(item))
                except TypeError:
                    raise ValueError(
                        "Invalid pcap file path at index: " + str(i) + "."
                    ) from None
                if not isfile(path):
                    raise ValueError(
                        "Invalid pcap file path at index: " + str(i) + "."
                    )
                paths.append(path)
            self._mode = NFMode.MULTIPLE_FILES
            value = paths
        else:
            try:
                value = str(os.fspath(value))
            except TypeError:
                raise ValueError(
                    "Please specify a pcap file path or a valid network interface name as source."
                ) from None
            if isfile(value):
                self._mode = NFMode.SINGLE_FILE
            else:
                interface = is_interface(value)
                if interface is None:
                    raise ValueError(
                        "Please specify a pcap file path or a valid network interface name as source."
                    )
                self._mode = NFMode.INTERFACE
                value = interface
        self._source = value

    @property
    def decode_tunnels(self):
        """Whether tunnel decoding is enabled (bool)."""
        return self._decode_tunnels

    @decode_tunnels.setter
    def decode_tunnels(self, value):
        if not isinstance(value, bool):
            raise ValueError(
                "Please specify a valid decode_tunnels parameter (possible values: True, False)."
            )
        self._decode_tunnels = value

    @property
    def bpf_filter(self):
        """BPF filter expression string, or None for no filter."""
        return self._bpf_filter

    @bpf_filter.setter
    def bpf_filter(self, value):
        if value is not None and not isinstance(value, str):
            raise ValueError("Please specify a valid bpf_filter format.")
        self._bpf_filter = value

    @property
    def promiscuous_mode(self):
        """Whether promiscuous mode is requested (bool)."""
        return self._promiscuous_mode

    @promiscuous_mode.setter
    def promiscuous_mode(self, value):
        if not isinstance(value, bool):
            raise ValueError(
                "Please specify a valid promiscuous_mode parameter (possible values: True, False)."
            )
        self._promiscuous_mode = value

    @property
    def snapshot_length(self):
        """Capture snapshot length (positive int)."""
        return self._snapshot_length

    @snapshot_length.setter
    def snapshot_length(self, value):
        if not isinstance(value, int) or value <= 0:
            raise ValueError(
                "Please specify a valid snapshot_length parameter (positive integer)."
            )
        self._snapshot_length = value

    @property
    def socket_buffer_size(self):
        """NIC socket buffer size; 0 keeps the pcap default. Range 0..2^31-1."""
        return self._socket_buffer_size

    @socket_buffer_size.setter
    def socket_buffer_size(self, value):
        if not isinstance(value, int) or not 0 <= value <= 2**31 - 1:
            raise ValueError(
                "Please specify a valid socket_buffer_size parameter (positive integer <= 2^31-1)."
            )
        self._socket_buffer_size = value

    @property
    def idle_timeout(self):
        """Flow idle timeout in seconds (passed to meters in milliseconds)."""
        return self._idle_timeout

    @idle_timeout.setter
    def idle_timeout(self, value):
        if not isinstance(value, int) or value < 0 or value * 1000 > self._MAX_UINT64:
            raise ValueError(
                "Please specify a valid idle_timeout parameter (positive integer in seconds)."
            )
        self._idle_timeout = value

    @property
    def active_timeout(self):
        """Flow active timeout in seconds (passed to meters in milliseconds)."""
        return self._active_timeout

    @active_timeout.setter
    def active_timeout(self, value):
        if not isinstance(value, int) or value < 0 or value * 1000 > self._MAX_UINT64:
            raise ValueError(
                "Please specify a valid active_timeout parameter (positive integer in seconds)."
            )
        self._active_timeout = value

    @property
    def accounting_mode(self):
        """Accounting mode, one of 0, 1, 2, 3."""
        return self._accounting_mode

    @accounting_mode.setter
    def accounting_mode(self, value):
        if not isinstance(value, int) or value not in (0, 1, 2, 3):
            raise ValueError(
                "Please specify a valid accounting_mode parameter (possible values: 0, 1, 2, 3)."
            )
        self._accounting_mode = value

    @property
    def udps(self):
        """User defined plugins: an iterable of NFPlugin instances (``()`` if none)."""
        return self._udps

    @udps.setter
    def udps(self, value):
        """Accept None, a single NFPlugin, or an iterable of NFPlugin instances.

        One-shot iterators (e.g. generators) are materialised into a tuple
        first; otherwise validating them would exhaust them and an empty
        iterator would be stored.

        Raises:
            ValueError: if any element (or the value itself) is not an NFPlugin.
        """
        if value is None:
            self._udps = ()
        elif isinstance(value, Iterable):
            if iter(value) is value:  # one-shot iterator: keep a reusable copy
                value = tuple(value)
            for plugin in value:
                if not isinstance(plugin, NFPlugin):
                    raise ValueError(
                        "User defined plugins must inherit from NFPlugin type."
                    )
            self._udps = value
        elif isinstance(value, NFPlugin):
            self._udps = (value,)
        else:
            raise ValueError("User defined plugins must inherit from NFPlugin type.")

    @property
    def n_dissections(self):
        """Number of packets used for dissection, in [0, 255]."""
        return self._n_dissections

    @n_dissections.setter
    def n_dissections(self, value):
        if not isinstance(value, int) or not 0 <= value <= 255:
            raise ValueError(
                "Please specify a valid n_dissections parameter (possible values in : [0,...,255])."
            )
        self._n_dissections = value

    @property
    def statistical_analysis(self):
        """Whether statistical analysis is enabled (bool)."""
        return self._statistical_analysis

    @statistical_analysis.setter
    def statistical_analysis(self, value):
        if not isinstance(value, bool):
            raise ValueError(
                "Please specify a valid statistical_analysis parameter (possible values: True, False)."
            )
        self._statistical_analysis = value

    @property
    def splt_analysis(self):
        """SPLT analysis parameter, in [0, 65535]; values above 255 print a warning."""
        return self._splt_analysis

    @splt_analysis.setter
    def splt_analysis(self, value):
        if not isinstance(value, int) or not 0 <= value <= 65535:
            raise ValueError(
                "Please specify a valid splt_analysis parameter (possible values in : [0,...,65535])"
            )
        if value > 255:
            print(
                "[WARNING]: The specified splt_analysis parameter is higher than 255. High values can impact the performance of the tool."
            )
        self._splt_analysis = value

    @property
    def n_meters(self):
        """Number of metering processes actually used (always >= 1 in practice)."""
        return self._n_meters

    @n_meters.setter
    def n_meters(self, value):
        """Set the number of metering processes.

        0 selects a count automatically:
        - Linux live capture (kernel fanout available): logical CPUs - 1.
        - otherwise: physical cores - 1 when logical CPUs equal 1x or 2x the
          physical cores, else logical CPUs // 2 - 1.
        An explicit value larger than logical CPUs - 1 is lowered to that
        bound (with a warning) to avoid contention. A computed 0 becomes 1.
        """
        if not isinstance(value, int) or value < 0:
            raise ValueError(
                "Please specify a valid n_meters parameter (>=1 or 0 for auto scaling)."
            )
        c_cpus, c_cores = available_cpus_count(), psutil.cpu_count(logical=False)
        if c_cores is None:
            # Patch for platforms returning None (https://github.com/giampaolo/psutil/issues/1078)
            c_cores = c_cpus
        if value == 0:
            if platform.system() == "Linux" and self._mode == NFMode.INTERFACE:
                # Live capture on Linux: kernel fanout spreads packets over the meters.
                n_meters = c_cpus - 1
            elif c_cpus >= c_cores:  # Windows, MacOS, offline capture
                if c_cpus == 2 * c_cores or c_cpus == c_cores:
                    # multi-threaded or single-threaded cores
                    n_meters = c_cores - 1
                else:
                    n_meters = c_cpus // 2 - 1
            else:  # weird case, fallback on cpu count.
                n_meters = c_cpus - 1
        elif value + 1 <= c_cpus:
            n_meters = value
        else:  # avoid contention
            print(
                "WARNING: n_meters set to :{} in order to avoid contention.".format(
                    c_cpus - 1
                )
            )
            n_meters = c_cpus - 1
        self._n_meters = 1 if n_meters == 0 else n_meters  # one CPU case

    @property
    def max_nflows(self):
        """Zero-based id of the last flow to yield (user value - 1; -1 = unlimited).

        NOTE: the setter stores ``value - 1`` and this getter returns the
        stored value unchanged, so it does not round-trip the assigned value.
        """
        return self._max_nflows

    @max_nflows.setter
    def max_nflows(self, value):
        # Flow ids start at 0, so stopping at id ``value - 1`` yields exactly
        # ``value`` flows; 0 maps to -1, which no flow id ever matches.
        if not isinstance(value, int) or value < 0:
            raise ValueError("Please specify a valid max_nflows parameter (>=0).")
        self._max_nflows = value - 1

    @property
    def performance_report(self):
        """Performance report interval in seconds (0 disables; live capture only)."""
        return self._performance_report

    @performance_report.setter
    def performance_report(self, value):
        if not isinstance(value, int) or value < 0:
            raise ValueError(
                "Please specify a valid performance_report parameter (>=1 for reporting interval (seconds)"
                " or 0 to disable). [Available only for Live capture]"
            )
        self._performance_report = value

    @property
    def system_visibility_mode(self):
        """System visibility mode: 0 disabled, 1 process information (live only)."""
        return self._system_visibility_mode

    @system_visibility_mode.setter
    def system_visibility_mode(self, value):
        """Validate the mode; any non-live source (single or multiple pcap files)
        forces it back to 0 with a warning, since it only works on live capture.
        """
        if not isinstance(value, int) or value not in (0, 1):
            raise ValueError(
                "Please specify a valid system_visibility_mode parameter\n"
                "0: disable\n"
                "1: process information\n"
                "[Available only for live capture on the system generating the traffic]"
            )
        if value > 0 and self._mode != NFMode.INTERFACE:
            print(
                "WARNING: system_visibility_mode switched to 0 in offline capture "
                "(available only for live capture)"
            )
            value = 0
        self._system_visibility_mode = value

    @property
    def system_visibility_poll_ms(self):
        """Socket polling period in milliseconds for system visibility."""
        return self._system_visibility_poll_ms

    @system_visibility_poll_ms.setter
    def system_visibility_poll_ms(self, value):
        if not isinstance(value, int) or value < 0:
            raise ValueError(
                "Please specify a valid system_visibility_poll_ms parameter "
                "(positive integer in milliseconds)"
            )
        self._system_visibility_poll_ms = value

    def __iter__(self):
        """Run the metering pipeline and yield exported flows.

        Starts ``n_meters`` daemon metering processes and, for live capture,
        an optional performance-report timer and system-visibility socket
        listener. All of them report through one shared queue. Each flow gets
        a streamer-wide sequential ``id`` before being yielded.

        Iteration stops when one of these happens:
        - every meter has sent its termination sentinel;
        - ``max_nflows`` flows have been yielded;
        - a ``KeyboardInterrupt`` is received;
        - a meter reports an error.

        Helpers are also stopped when the consumer abandons the iterator early,
        because cleanup runs in a ``finally`` clause (executed on generator
        ``close()`` / garbage collection).

        Raises:
            ValueError: if a meter reports an error or initialisation fails.
        """
        lock = self._mp_context.Lock()
        lock.acquire()
        meters = []
        n_terminated = 0
        child_error = None
        rt = None
        socket_listener = None
        conn_cache = {}  # recv.key -> [process_name, process_pid]
        # True once every meter has sent its termination sentinel (None): the
        # meters are then exiting on their own and only need to be joined.
        drained = False
        live = self._mode == NFMode.INTERFACE

        # To avoid issues on PyPy on Windows (See https://foss.heptapod.net/pypy/pypy/-/issues/3488), All
        # multiprocessing Value invocation must be performed before the call to Queue.
        n_meters = self.n_meters
        idx_generator = self._mp_context.Value("i", 0)
        performances = [
            [
                self._mp_context.Value("I", 0),
                self._mp_context.Value("I", 0),
                self._mp_context.Value("I", 0),
            ]
            for _ in range(n_meters)
        ]
        # Backpressure strategy: bounded to (2^15-1) to cope with OSX max semaphore value.
        channel = self._mp_context.Queue(maxsize=32767)
        group_id = os.getpid() + self._idx  # Used for fanout on Linux systems
        try:
            try:
                for i in range(n_meters):
                    meter = self._mp_context.Process(
                        target=meter_workflow,
                        args=(
                            self.source,
                            self.snapshot_length,
                            self.decode_tunnels,
                            self.bpf_filter,
                            self.promiscuous_mode,
                            n_meters,
                            i,
                            self._mode,
                            self.idle_timeout * 1000,
                            self.active_timeout * 1000,
                            self.accounting_mode,
                            self.udps,
                            self.n_dissections,
                            self.statistical_analysis,
                            self.splt_analysis,
                            channel,
                            performances[i],
                            lock,
                            group_id,
                            self.system_visibility_mode,
                            self.socket_buffer_size,
                        ),
                    )
                    meter.daemon = True  # demonize meter
                    meters.append(meter)
                    meter.start()
                if live and self.performance_report > 0:
                    rt = RepeatedTimer(
                        self.performance_report,
                        update_performances,
                        performances,
                        platform.system() == "Linux",
                        idx_generator,
                    )
                if live and self.system_visibility_mode:
                    socket_listener = self._mp_context.Process(
                        target=system_socket_worflow,
                        args=(
                            channel,
                            self.idle_timeout * 1000,
                            self.system_visibility_poll_ms / 1000,
                        ),
                    )
                    socket_listener.daemon = True  # demonize socket_listener
                    socket_listener.start()

                while True:
                    try:
                        recv = channel.get()
                        if recv is None:  # a meter terminated
                            n_terminated += 1
                            if n_terminated == n_meters:
                                # We finish up when all metering jobs are terminated
                                drained = True
                                break
                        elif recv.id == NFEvent.ERROR:  # Error message
                            child_error = recv.message
                            break
                        elif recv.id == NFEvent.ALL_AFFINITY_SET:
                            # Pin the streamer to core 0: it is the least intensive task and
                            # several services run by default on this core.
                            set_affinity(0)
                        elif recv.id == NFEvent.SOCKET_CREATE:
                            conn_cache[recv.key] = [recv.process_name, recv.process_pid]
                        elif recv.id == NFEvent.SOCKET_REMOVE:
                            # Tolerate removals for sockets that were never recorded.
                            conn_cache.pop(recv.key, None)
                        else:  # NFEvent.FLOW
                            recv.id = idx_generator.value  # Unify ID
                            idx_generator.value = idx_generator.value + 1
                            if live and self.system_visibility_mode:
                                recv = match_flow_conn(conn_cache, recv)
                            yield recv
                            if recv.id == self.max_nflows:
                                # We reached the maximum flows count defined by the user.
                                raise KeyboardInterrupt
                    except KeyboardInterrupt:
                        break
            finally:
                # Runs on normal completion, error, interrupt, and when the
                # consumer closes/abandons the generator.
                for meter in meters:
                    if meter.is_alive():
                        if not drained:  # We break workflow loop
                            meter.terminate()
                        meter.join()  # Join metering jobs
                if rt is not None:
                    rt.stop()
                if socket_listener is not None and socket_listener.is_alive():
                    socket_listener.terminate()
                channel.close()  # We close the queue
                channel.join_thread()  # and we join its thread
            if child_error is not None:
                raise ValueError(child_error)
        except ValueError as observer_error:
            # job initiation failed due to some bad observer parameters.
            raise ValueError(observer_error) from observer_error

    def to_csv(
        self, path=None, columns_to_anonymize=(), flows_per_file=0, rotate_files=0
    ):
        """Export all flows to CSV and return the number of flows written.

        The header row is built from the first flow's keys. When
        ``flows_per_file`` > 0, a new chunk file (with its own header) is
        opened via ``open_file`` every ``flows_per_file`` flows. Columns
        listed in ``columns_to_anonymize`` are processed by NFAnonymizer.
        The currently open file is always closed, even if iteration raises.
        """
        validate_flows_per_file(flows_per_file)
        validate_rotate_files(rotate_files)
        chunked = flows_per_file != 0
        chunk_idx = -1
        output_path = create_csv_file_path(path, self.source)
        total_flows, chunk_flows = 0, 0
        anon = NFAnonymizer(cols_names=columns_to_anonymize)
        f = None
        try:
            for flow in self:
                try:
                    if total_flows == 0 or (chunked and chunk_flows > flows_per_file):
                        # header creation (first flow or new chunk)
                        if f is not None:
                            f.close()
                        chunk_flows = 1
                        chunk_idx += 1
                        f = open_file(output_path, chunked, chunk_idx, rotate_files)
                        header = ",".join([str(i) for i in flow.keys()]) + "\n"
                        f.write(header.encode("utf-8"))
                    values = anon.process(flow)
                    csv_converter(values)
                    to_export = ",".join([str(i) for i in values]) + "\n"
                    f.write(to_export.encode("utf-8"))
                    total_flows = total_flows + 1
                    chunk_flows += 1
                except KeyboardInterrupt:
                    pass
        finally:
            if f is not None and not f.closed:
                f.close()
        return total_flows

    def to_pandas(self, columns_to_anonymize=()):
        """Export all flows into a pandas DataFrame (None if there are no flows).

        Flows are written to a temporary CSV file named after the process id,
        this streamer's instance id and the current time, then loaded with
        pandas' C engine. The temporary file is removed even on failure.
        """
        temp_file_path = "nfstream-{pid}-{iid}-{ts}.csv".format(
            pid=os.getpid(), iid=self._idx, ts=tm.time()
        )
        try:
            total_flows = self.to_csv(
                path=temp_file_path,
                columns_to_anonymize=columns_to_anonymize,
                flows_per_file=0,
            )
            if total_flows > 0:  # If there is flows, return Dataframe else return None.
                # Use C engine for superior performance (non-experimental)
                df = pd.read_csv(temp_file_path, engine="c")
                if total_flows != df.shape[0]:
                    print(
                        "WARNING: {} flows ignored by pandas type conversion. Consider using to_csv() "
                        "method if drops are critical.".format(
                            abs(df.shape[0] - total_flows)
                        )
                    )
            else:
                df = None
        finally:
            if os.path.exists(temp_file_path):
                os.remove(temp_file_path)
        return df