Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/nfstream/utils.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

95 statements  

1""" 

2------------------------------------------------------------------------------------------------------------------------ 

3utils.py 

4Copyright (C) 2019-22 - NFStream Developers 

5This file is part of NFStream, a Flexible Network Data Analysis Framework (https://www.nfstream.org/). 

6NFStream is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public 

7License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later 

8version. 

9NFStream is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty 

10of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. 

11You should have received a copy of the GNU Lesser General Public License along with NFStream. 

12If not, see <http://www.gnu.org/licenses/>. 

13------------------------------------------------------------------------------------------------------------------------ 

14""" 

15 

16import json 

17import platform 

18import psutil 

19from threading import Timer 

20from collections import namedtuple 

21from enum import Enum, IntEnum 

22 

23 

24class NFEvent(Enum): 

25 FLOW = -1 

26 ERROR = -2 

27 SOCKET_CREATE = -3 

28 SOCKET_REMOVE = -4 

29 ALL_AFFINITY_SET = -5 

30 

31 

32class NFMode(IntEnum): 

33 SINGLE_FILE = 0 

34 INTERFACE = 1 

35 MULTIPLE_FILES = 2 

36 

37 

38InternalError = namedtuple("InternalError", ["id", "message"]) 

39 

40InternalState = namedtuple("InternalState", ["id"]) 

41 

42 

43def validate_flows_per_file(n): 

44 """Simple parameter validator""" 

45 if not isinstance(n, int) or isinstance(n, int) and n < 0: 

46 raise ValueError("Please specify a valid flows_per_file parameter (>= 0).") 

47 

48 

49def validate_rotate_files(n): 

50 """Simple parameter validator""" 

51 if not isinstance(n, int) or isinstance(n, int) and n < 0: 

52 raise ValueError("Please specify a valid rotate_files parameter (>= 0).") 

53 

54 

55def create_csv_file_path(path, source): 

56 """File path creator""" 

57 if path is None: 

58 if type(source) == list: 

59 return str(source[0]) + ".csv" 

60 return str(source) + ".csv" 

61 return path 

62 

63 

64def csv_converter(values): 

65 """Convert non numeric values to string using their __str__ method and ensure proper quoting""" 

66 for idx, value in enumerate(values): 

67 if not isinstance(value, float) and not isinstance(value, int): 

68 if value is None: 

69 values[idx] = "" 

70 else: 

71 values[idx] = str(values[idx]) 

72 values[idx] = values[idx].replace('"', '\\"') 

73 values[idx] = '"' + values[idx] + '"' 

74 

75 

76def open_file(path, chunked, chunk_idx, rotate_files): 

77 """File opener taking chunk mode into consideration""" 

78 if not chunked: 

79 return open(path, "wb") 

80 else: 

81 if rotate_files: 

82 return open( 

83 path.replace("csv", "{}.csv".format(chunk_idx % rotate_files)), "wb" 

84 ) 

85 return open(path.replace("csv", "{}.csv".format(chunk_idx)), "wb") 

86 

87 

88def update_performances(performances, is_linux, flows_count): 

89 """Update performance report and check platform for consistency""" 

90 drops = 0 

91 processed = 0 

92 ignored = 0 

93 load = [] 

94 for meter in performances: 

95 if is_linux: 

96 drops += meter[0].value 

97 ignored += meter[2].value 

98 else: 

99 drops = max(meter[0].value, drops) 

100 ignored = max(meter[2].value, ignored) 

101 processed += meter[1].value 

102 load.append(meter[1].value) 

103 print( 

104 json.dumps( 

105 { 

106 "flows_expired": flows_count.value, 

107 "packets_processed": processed, 

108 "packets_ignored": ignored, 

109 "packets_dropped_filtered_by_kernel": drops, 

110 "meters_packets_processing_balance": load, 

111 } 

112 ) 

113 ) 

114 

115 

116class RepeatedTimer(object): 

117 """Repeated timer thread""" 

118 

119 def __init__(self, interval, function, *args, **kwargs): 

120 self._timer = None 

121 self.interval = interval 

122 self.function = function 

123 self.args = args 

124 self.kwargs = kwargs 

125 self.is_running = False 

126 self.start() 

127 

128 def _run(self): 

129 self.is_running = False 

130 self.start() 

131 self.function(*self.args, **self.kwargs) 

132 

133 def start(self): 

134 if not self.is_running: 

135 self._timer = Timer(self.interval, self._run) 

136 self._timer.start() 

137 self.is_running = True 

138 

139 def stop(self): 

140 self._timer.cancel() 

141 self.is_running = False 

142 

143 

144def chunks_of_list(lst, n): 

145 """create list of chunks of size n from a list""" 

146 for i in range(0, len(lst), n): 

147 yield lst[i : i + n] 

148 

149 

150def set_affinity(idx): 

151 """CPU affinity setter""" 

152 if platform.system() == "Linux": 

153 c_cpus = psutil.Process().cpu_affinity() 

154 temp = list(chunks_of_list(c_cpus, 2)) 

155 x = len(temp) 

156 try: 

157 psutil.Process().cpu_affinity(list(temp[idx % x])) 

158 except OSError as err: 

159 print("WARNING: failed to set CPU affinity ({err})".format(err)) 

160 

161 

162def available_cpus_count(): 

163 if platform.system() == "Linux": 

164 return len(psutil.Process().cpu_affinity()) 

165 return psutil.cpu_count(logical=True)