Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/nfstream/utils.py: 44%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2------------------------------------------------------------------------------------------------------------------------
3utils.py
4Copyright (C) 2019-22 - NFStream Developers
5This file is part of NFStream, a Flexible Network Data Analysis Framework (https://www.nfstream.org/).
6NFStream is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public
7License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
8version.
9NFStream is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
10of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
11You should have received a copy of the GNU Lesser General Public License along with NFStream.
12If not, see <http://www.gnu.org/licenses/>.
13------------------------------------------------------------------------------------------------------------------------
14"""
16import json
17import platform
18import psutil
19from threading import Timer
20from collections import namedtuple
21from enum import Enum, IntEnum
class NFEvent(Enum):
    """Identifiers for internal events; every member maps to a distinct negative integer."""

    FLOW = -1
    ERROR = -2
    SOCKET_CREATE = -3
    SOCKET_REMOVE = -4
    ALL_AFFINITY_SET = -5
class NFMode(IntEnum):
    """Input mode selector; members are integer-comparable (``IntEnum``)."""

    SINGLE_FILE = 0
    INTERFACE = 1
    MULTIPLE_FILES = 2
# Lightweight immutable records.
# InternalError carries an identifier together with a message.
InternalError = namedtuple("InternalError", ("id", "message"))
# InternalState carries only an identifier.
InternalState = namedtuple("InternalState", ("id",))
def validate_flows_per_file(n):
    """Validate the ``flows_per_file`` parameter.

    Args:
        n: candidate value; accepted when it is an ``int`` instance that is
            greater than or equal to zero.

    Raises:
        ValueError: if ``n`` is not an ``int`` instance or is negative.
    """
    accepted = isinstance(n, int) and n >= 0
    if accepted:
        return
    raise ValueError("Please specify a valid flows_per_file parameter (>= 0).")
def validate_rotate_files(n):
    """Validate the ``rotate_files`` parameter.

    Args:
        n: candidate value; accepted when it is an ``int`` instance that is
            greater than or equal to zero.

    Raises:
        ValueError: if ``n`` is not an ``int`` instance or is negative.
    """
    accepted = isinstance(n, int) and n >= 0
    if accepted:
        return
    raise ValueError("Please specify a valid rotate_files parameter (>= 0).")
def create_csv_file_path(path, source):
    """Return the CSV output path, deriving one from ``source`` when needed.

    Args:
        path: explicit output path; returned unchanged when it is not ``None``.
        source: value used to derive a default path when ``path`` is ``None``.
            When it is a list (or a list subclass) its first element is used.

    Returns:
        ``path`` when given, otherwise ``str(<source or its first element>)``
        with ``".csv"`` appended.
    """
    if path is not None:
        return path
    # isinstance (rather than an exact type comparison) so that list
    # subclasses are also reduced to their first element instead of being
    # stringified as a whole.
    base = source[0] if isinstance(source, list) else source
    return str(base) + ".csv"
def csv_converter(values):
    """Prepare a row for CSV output by rewriting its non-numeric entries in place.

    ``int`` and ``float`` instances (which includes ``bool``) are left
    untouched. ``None`` becomes an empty string. Every other entry is
    converted with ``str()``, has each double quote prefixed with a backslash,
    and is wrapped in double quotes.

    Args:
        values: mutable, index-assignable sequence; modified in place.
    """
    for position, item in enumerate(values):
        if isinstance(item, (float, int)):
            continue
        if item is None:
            values[position] = ""
            continue
        escaped = str(item).replace('"', '\\"')
        values[position] = '"' + escaped + '"'
def open_file(path, chunked, chunk_idx, rotate_files):
    """Open the output file for binary writing, honouring chunked mode.

    Args:
        path: output file path.
        chunked: when falsy, ``path`` is opened as is.
        chunk_idx: index of the current chunk, embedded in the file name when
            ``chunked`` is truthy.
        rotate_files: when truthy, the embedded index is
            ``chunk_idx % rotate_files`` so file names are reused cyclically.

    Returns:
        A file object opened in ``"wb"`` mode; the caller is responsible for
        closing it.
    """
    if not chunked:
        return open(path, "wb")
    file_idx = chunk_idx % rotate_files if rotate_files else chunk_idx
    chunk_suffix = "{}.csv".format(file_idx)
    if path.endswith("csv"):
        # Rewrite only the trailing "csv": replacing every occurrence would
        # also mangle directory or base names that happen to contain "csv".
        chunk_path = path[: -len("csv")] + chunk_suffix
    else:
        # No "csv" suffix: keep the historical substitution so that names
        # produced for such paths do not change.
        chunk_path = path.replace("csv", chunk_suffix)
    return open(chunk_path, "wb")
def update_performances(performances, is_linux, flows_count):
    """Aggregate per-meter counters and print them as a single JSON object.

    Args:
        performances: iterable of meters; each meter is indexed as
            ``meter[0]`` (drops), ``meter[1]`` (processed) and ``meter[2]``
            (ignored), and every counter exposes a ``.value`` attribute.
        is_linux: when truthy, drops and ignored counters are summed across
            meters; otherwise the maximum across meters is reported.
            NOTE(review): presumably these counters are shared between meters
            on non-Linux platforms -- confirm.
        flows_count: object whose ``.value`` is reported as ``flows_expired``.
    """
    dropped = 0
    ignored = 0
    balance = []
    for meter in performances:
        meter_drops = meter[0].value
        meter_ignored = meter[2].value
        if is_linux:
            dropped = dropped + meter_drops
            ignored = ignored + meter_ignored
        else:
            dropped = max(meter_drops, dropped)
            ignored = max(meter_ignored, ignored)
        balance.append(meter[1].value)
    report = {
        "flows_expired": flows_count.value,
        "packets_processed": sum(balance),
        "packets_ignored": ignored,
        "packets_dropped_filtered_by_kernel": dropped,
        "meters_packets_processing_balance": balance,
    }
    print(json.dumps(report))
class RepeatedTimer(object):
    """Call ``function(*args, **kwargs)`` every ``interval`` seconds.

    Each tick is driven by a fresh ``threading.Timer``; the instance arms
    itself as soon as it is constructed and keeps re-arming until ``stop()``
    is called.
    """

    def __init__(self, interval, function, *args, **kwargs):
        """Store the schedule and callback, then arm the first timer."""
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self._timer = None
        self.is_running = False
        self.start()

    def _run(self):
        """Timer callback: schedule the next tick, then invoke the function."""
        # Re-arm before calling the function so the next timer is already
        # scheduled while the callback runs.
        self.is_running = False
        self.start()
        self.function(*self.args, **self.kwargs)

    def start(self):
        """Arm a new timer unless one is already marked as running."""
        if self.is_running:
            return
        self._timer = Timer(self.interval, self._run)
        self._timer.start()
        self.is_running = True

    def stop(self):
        """Cancel the pending timer and mark the instance as stopped."""
        self._timer.cancel()
        self.is_running = False
def chunks_of_list(lst, n):
    """Lazily yield consecutive slices of ``lst`` holding at most ``n`` items.

    Args:
        lst: sliceable sequence supporting ``len()``.
        n: slice length, used as the ``range`` step.

    Yields:
        ``lst[start:start + n]`` for ``start`` in ``0, n, 2n, ...``; the last
        slice may be shorter than ``n``.
    """
    yield from (lst[start : start + n] for start in range(0, len(lst), n))
def set_affinity(idx):
    """Restrict the current process to one group of its allowed CPUs.

    Does nothing unless ``platform.system()`` reports ``"Linux"``. The CPUs
    the process may currently run on are split into groups of at most two,
    and the group at ``idx`` (wrapping around the number of groups) becomes
    the new affinity.

    Args:
        idx: integer index used to select the CPU group.

    An ``OSError`` raised while applying the affinity is reported as a
    warning on stdout instead of being propagated.
    """
    if platform.system() != "Linux":
        return
    process = psutil.Process()
    cpu_groups = list(chunks_of_list(process.cpu_affinity(), 2))
    group_count = len(cpu_groups)
    try:
        process.cpu_affinity(list(cpu_groups[idx % group_count]))
    except OSError as err:
        # The placeholder is named, so the error must be passed by keyword;
        # a positional argument makes str.format raise KeyError here.
        print("WARNING: failed to set CPU affinity ({err})".format(err=err))
def available_cpus_count():
    """Return how many CPUs are available.

    On Linux this is the length of the current process's CPU affinity list;
    on other platforms it is ``psutil.cpu_count(logical=True)``.
    """
    on_linux = platform.system() == "Linux"
    if not on_linux:
        return psutil.cpu_count(logical=True)
    allowed_cpus = psutil.Process().cpu_affinity()
    return len(allowed_cpus)