Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/nfstream/flow.py: 5%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2------------------------------------------------------------------------------------------------------------------------
3flow.py
4Copyright (C) 2019-22 - NFStream Developers
5This file is part of NFStream, a Flexible Network Data Analysis Framework (https://www.nfstream.org/).
6NFStream is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public
7License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later
8version.
9NFStream is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
10of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
11You should have received a copy of the GNU Lesser General Public License along with NFStream.
12If not, see <http://www.gnu.org/licenses/>.
13------------------------------------------------------------------------------------------------------------------------
14"""
16from collections import namedtuple
17from math import sqrt
18from .utils import NFEvent
# When NFStream is extended with plugins, the packet C structure is pythonized using the
# following namedtuple. Field order matters: it defines the positional layout of NFPacket.
nf_packet = namedtuple(
    "NFPacket",
    (
        # Timing, direction and sizes.
        "time delta_time direction raw_size ip_size transport_size payload_size "
        # Addressing.
        "src_ip src_mac src_oui dst_ip dst_mac dst_oui src_port dst_port "
        # Protocol information and raw IP content.
        "protocol vlan_id ip_version ip_packet "
        # TCP flags.
        "syn cwr ece urg ack psh rst fin "
        # Tunnel identifier.
        "tunnel_id"
    ),
)
class UDPS:
    """Placeholder class giving the ``udps`` slot the flexibility required by extensions.

    Instances carry no predefined attributes; whatever is set on them lives in the
    instance ``__dict__``.
    """
def pythonize_packet(packet, ffi, flow):
    """Convert a cdata packet into an ``nf_packet`` namedtuple.

    The IP, MAC and OUI fields are taken from ``flow`` rather than from the packet itself;
    when ``packet.direction`` is truthy the flow's source and destination values are
    swapped. The IP content is copied out of the C buffer into a ``bytes`` object.
    """
    if packet.direction:
        # Truthy direction: report the flow's destination as this packet's source.
        source = (flow.dst_ip, flow.dst_mac, flow.dst_oui)
        destination = (flow.src_ip, flow.src_mac, flow.src_oui)
    else:
        source = (flow.src_ip, flow.src_mac, flow.src_oui)
        destination = (flow.dst_ip, flow.dst_mac, flow.dst_oui)
    src_ip, src_mac, src_oui = source
    dst_ip, dst_mac, dst_oui = destination

    # Snapshot ip_content_len bytes of the C buffer as an independent Python bytes object.
    ip_packet = bytes(ffi.buffer(packet.ip_content, packet.ip_content_len))

    return nf_packet(
        time=packet.time,
        delta_time=packet.delta_time,
        direction=packet.direction,
        raw_size=packet.raw_size,
        ip_size=packet.ip_size,
        transport_size=packet.transport_size,
        payload_size=packet.payload_size,
        src_ip=src_ip,
        src_mac=src_mac,
        src_oui=src_oui,
        dst_ip=dst_ip,
        dst_mac=dst_mac,
        dst_oui=dst_oui,
        src_port=packet.src_port,
        dst_port=packet.dst_port,
        protocol=packet.protocol,
        vlan_id=packet.vlan_id,
        ip_version=packet.ip_version,
        ip_packet=ip_packet,
        syn=packet.syn,
        cwr=packet.cwr,
        ece=packet.ece,
        urg=packet.urg,
        ack=packet.ack,
        psh=packet.psh,
        rst=packet.rst,
        fin=packet.fin,
        tunnel_id=packet.tunnel_id,
    )
class NFlow:
    """
    NFlow is the NFStream representation of a network flow.

    It is a slotted class for performance reasons, and slots are populated according to the
    NFStream detected mode:

    * sync mode (NFStream used with extensions): slots are refreshed from the C structure at
      each update, so that plugins observe current values. Flexibility is paid for with
      attribute access/update cost.
    * non-sync mode: all configured metrics are computed within the C structure and copied to
      the slots only at init and at expiration, which provides maximum performance.

    Slots that are never assigned (because the matching option is disabled) stay unset and are
    skipped by ``keys()``, ``values()`` and ``__str__``.
    """

    __slots__ = (
        "id",
        "expiration_id",
        "src_ip",
        "src_mac",
        "src_oui",
        "src_port",
        "dst_ip",
        "dst_mac",
        "dst_oui",
        "dst_port",
        "protocol",
        "ip_version",
        "vlan_id",
        "tunnel_id",
        "bidirectional_first_seen_ms",
        "bidirectional_last_seen_ms",
        "bidirectional_duration_ms",
        "bidirectional_packets",
        "bidirectional_bytes",
        "src2dst_first_seen_ms",
        "src2dst_last_seen_ms",
        "src2dst_duration_ms",
        "src2dst_packets",
        "src2dst_bytes",
        "dst2src_first_seen_ms",
        "dst2src_last_seen_ms",
        "dst2src_duration_ms",
        "dst2src_packets",
        "dst2src_bytes",
        "bidirectional_min_ps",
        "bidirectional_mean_ps",
        "bidirectional_stddev_ps",
        "bidirectional_max_ps",
        "src2dst_min_ps",
        "src2dst_mean_ps",
        "src2dst_stddev_ps",
        "src2dst_max_ps",
        "dst2src_min_ps",
        "dst2src_mean_ps",
        "dst2src_stddev_ps",
        "dst2src_max_ps",
        "bidirectional_min_piat_ms",
        "bidirectional_mean_piat_ms",
        "bidirectional_stddev_piat_ms",
        "bidirectional_max_piat_ms",
        "src2dst_min_piat_ms",
        "src2dst_mean_piat_ms",
        "src2dst_stddev_piat_ms",
        "src2dst_max_piat_ms",
        "dst2src_min_piat_ms",
        "dst2src_mean_piat_ms",
        "dst2src_stddev_piat_ms",
        "dst2src_max_piat_ms",
        "bidirectional_syn_packets",
        "bidirectional_cwr_packets",
        "bidirectional_ece_packets",
        "bidirectional_urg_packets",
        "bidirectional_ack_packets",
        "bidirectional_psh_packets",
        "bidirectional_rst_packets",
        "bidirectional_fin_packets",
        "src2dst_syn_packets",
        "src2dst_cwr_packets",
        "src2dst_ece_packets",
        "src2dst_urg_packets",
        "src2dst_ack_packets",
        "src2dst_psh_packets",
        "src2dst_rst_packets",
        "src2dst_fin_packets",
        "dst2src_syn_packets",
        "dst2src_cwr_packets",
        "dst2src_ece_packets",
        "dst2src_urg_packets",
        "dst2src_ack_packets",
        "dst2src_psh_packets",
        "dst2src_rst_packets",
        "dst2src_fin_packets",
        "splt_direction",
        "splt_ps",
        "splt_piat_ms",
        "application_name",
        "application_category_name",
        "application_is_guessed",
        "application_confidence",
        "requested_server_name",
        "client_fingerprint",
        "server_fingerprint",
        "user_agent",
        "content_type",
        "_C",
        "udps",
        "system_process_pid",
        "system_process_name",
        "system_browser_tab",
    )

    def __init__(
        self,
        packet,
        ffi,
        lib,
        udps,
        sync,
        accounting_mode,
        n_dissections,
        statistics,
        splt,
        dissector,
        decode_tunnels,
        system_visibility_mode,
    ):
        """Create the flow from its first packet and perform the first copy from C.

        The C structure is created through ``lib.meter_initialize_flow``. Optional slot
        groups are only populated when the matching option is truthy: ``decode_tunnels``
        (tunnel id), ``statistics`` (sizes, inter-arrival times, TCP flag counters),
        ``n_dissections`` (application fields), ``splt`` (SPLT arrays), ``sync`` (``udps``
        plus each plugin's ``on_init`` call) and ``system_visibility_mode`` (process fields
        when > 0, browser tab when == 2).

        Raises:
            OSError: if the C structure could not be allocated (``ffi.NULL`` returned).
        """
        # id is set to NFEvent.FLOW for internal communications and is handled (incremented)
        # by NFStreamer.
        self.id = NFEvent.FLOW
        self.expiration_id = 0
        # Initialize the C structure.
        self._C = c_flow = lib.meter_initialize_flow(
            packet, accounting_mode, statistics, splt, n_dissections, dissector, sync
        )
        if c_flow == ffi.NULL:  # Raise OSError in order to be handled by the meter.
            raise OSError("Not enough memory for new flow creation.")

        # Here we go for the first copy in order to make the defined slots available.
        c_text = self._c_text
        self.src_ip = c_text(ffi, c_flow.src_ip_str)
        self.src_mac = c_text(ffi, c_flow.src_mac_str)
        self.src_oui = c_text(ffi, c_flow.src_oui)
        self.src_port = c_flow.src_port
        self.dst_ip = c_text(ffi, c_flow.dst_ip_str)
        self.dst_mac = c_text(ffi, c_flow.dst_mac_str)
        self.dst_oui = c_text(ffi, c_flow.dst_oui)
        self.dst_port = c_flow.dst_port
        self.protocol = c_flow.protocol
        self.ip_version = c_flow.ip_version
        self.vlan_id = c_flow.vlan_id
        # These two first-seen timestamps are copied here only; sync() does not refresh them.
        self.bidirectional_first_seen_ms = c_flow.bidirectional_first_seen_ms
        self.src2dst_first_seen_ms = c_flow.src2dst_first_seen_ms
        self._copy_counters(c_flow)

        if decode_tunnels:
            self.tunnel_id = c_flow.tunnel_id

        if statistics:  # If statistical analysis is set, we activate the statistical slots.
            self._copy_min_mean_max(c_flow)
            # At creation the stddev slots receive the C values as-is; sync() overwrites them
            # with the computed stddev once enough packets were observed.
            self.bidirectional_stddev_ps = c_flow.bidirectional_stddev_ps
            self.src2dst_stddev_ps = c_flow.src2dst_stddev_ps
            self.dst2src_stddev_ps = c_flow.dst2src_stddev_ps
            self.bidirectional_stddev_piat_ms = c_flow.bidirectional_stddev_piat_ms
            self.src2dst_stddev_piat_ms = c_flow.src2dst_stddev_piat_ms
            self.dst2src_stddev_piat_ms = c_flow.dst2src_stddev_piat_ms
            self._copy_tcp_flags(c_flow)

        if n_dissections:  # Same for dissection when > 0.
            if sync:
                self._copy_dissection(ffi, c_flow)
            else:
                # Without plugins the slots are only declared here; sync() fills them later.
                self.application_name = None
                self.application_category_name = None
                self.application_is_guessed = None
                self.application_confidence = None
                self.requested_server_name = None
                self.client_fingerprint = None
                self.server_fingerprint = None
                self.user_agent = None
                self.content_type = None

        if splt:  # If splt analysis is set (> 0), we unpack the array structures.
            self._copy_splt(ffi, c_flow, splt)

        if sync:  # NFStream running with plugins.
            self.udps = UDPS()
            for udp in udps:  # on_init entrypoint
                udp.on_init(pythonize_packet(packet, ffi, self), self)

        if system_visibility_mode > 0:
            self.system_process_pid = -1
            self.system_process_name = ""
            if system_visibility_mode == 2:
                self.system_browser_tab = ""

    def update(
        self,
        packet,
        idle_timeout,
        active_timeout,
        ffi,
        lib,
        udps,
        sync,
        accounting_mode,
        n_dissections,
        statistics,
        splt,
        dissector,
    ):
        """Update the flow with a new packet.

        Returns:
            The flow itself when this update caused its expiration (timeout reported by the
            C layer, or a plugin setting ``expiration_id`` to -1), otherwise ``None``.
        """
        # First, we update the internal C structure.
        ret = lib.meter_update_flow(
            self._C,
            packet,
            idle_timeout,
            active_timeout,
            accounting_mode,
            statistics,
            splt,
            n_dissections,
            dissector,
            sync,
        )
        # If the update is done ret is zero; idle and active expirations are matched to 1 and 2.
        if ret > 0:
            self.expiration_id = ret - 1
            return self.expire(
                udps, sync, n_dissections, statistics, splt, ffi, lib, dissector
            )
        if sync:  # Running with plugins.
            # Copy the values computed on the C struct before plugins look at the flow.
            self.sync(n_dissections, statistics, splt, ffi, lib, sync)
            for udp in udps:  # Then call each plugin's on_update entrypoint.
                udp.on_update(pythonize_packet(packet, ffi, self), self)
            if self.expiration_id == -1:  # A plugin set expiration to the custom value (-1).
                return self.expire(
                    udps, sync, n_dissections, statistics, splt, ffi, lib, dissector
                )
        return None

    def expire(self, udps, sync, n_dissections, statistics, splt, ffi, lib, dissector):
        """Expire the flow: final sync, plugin notification, then release of the C structure.

        Returns:
            The flow itself. Its ``_C`` slot is deleted, so methods reading the C structure
            (``sync``, ``is_idle``) must not be called afterwards.
        """
        # Call expiration of the C structure.
        lib.meter_expire_flow(self._C, n_dissections, dissector)
        # Then sync (second copy in case of non-sync mode).
        self.sync(n_dissections, statistics, splt, ffi, lib, sync)
        if sync:  # Running with plugins.
            for udp in udps:
                udp.on_expire(self)  # Call each plugin's on_expire entrypoint.
        lib.meter_free_flow(self._C, n_dissections, splt, 1)  # Then free the C struct
        del self._C  # and remove it from the NFlow slots.
        return self

    def sync(self, n_dissections, statistics, splt, ffi, lib, sync_mode):
        """
        NFlow synchronizer method: copy the current C structure values into the slots.

        Will be called only twice when running without plugins.
        Will be called at each update when running with plugins.
        """
        c_flow = self._C
        self._copy_counters(c_flow)

        if statistics:  # Statistical analysis activated.
            self._copy_min_mean_max(c_flow)
            bidirectional_packets = self.bidirectional_packets
            src2dst_packets = self.src2dst_packets
            dst2src_packets = self.dst2src_packets
            # NOTE: the sample stddev is computed as sqrt(c_value / (n - 1)) for packet
            # sizes. Presumably the C field accumulates the sum of squared deviations;
            # confirm against the meter sources. With too few packets the slot keeps its
            # previous value.
            if bidirectional_packets > 1:
                self.bidirectional_stddev_ps = sqrt(
                    c_flow.bidirectional_stddev_ps / (bidirectional_packets - 1)
                )
            if src2dst_packets > 1:
                self.src2dst_stddev_ps = sqrt(
                    c_flow.src2dst_stddev_ps / (src2dst_packets - 1)
                )
            if dst2src_packets > 1:
                self.dst2src_stddev_ps = sqrt(
                    c_flow.dst2src_stddev_ps / (dst2src_packets - 1)
                )
            # Inter-arrival times use (n - 2), presumably because n packets yield n - 1
            # inter-arrival samples.
            if bidirectional_packets > 2:
                self.bidirectional_stddev_piat_ms = sqrt(
                    c_flow.bidirectional_stddev_piat_ms / (bidirectional_packets - 2)
                )
            if src2dst_packets > 2:
                self.src2dst_stddev_piat_ms = sqrt(
                    c_flow.src2dst_stddev_piat_ms / (src2dst_packets - 2)
                )
            if dst2src_packets > 2:
                self.dst2src_stddev_piat_ms = sqrt(
                    c_flow.dst2src_stddev_piat_ms / (dst2src_packets - 2)
                )
            self._copy_tcp_flags(c_flow)

        # If dissection is set (> 0), updates are skipped once detection_completed reaches 2.
        if n_dissections and c_flow.detection_completed < 2:
            self._copy_dissection(ffi, c_flow)

        if splt:
            if not sync_mode:
                # Memory will be released by the freer.
                self._copy_splt(ffi, c_flow, splt)
            elif c_flow.bidirectional_packets <= splt:
                self._copy_splt(ffi, c_flow, splt)
            elif c_flow.splt_closed == 0:
                # Once the splt limit is reached there is no need to sync it anymore: we
                # release the memory to keep only the obtained lists.
                lib.meter_free_flow(c_flow, n_dissections, splt, 0)  # free SPLT

    @staticmethod
    def _c_text(ffi, c_chars):
        """Return the C string ``c_chars`` decoded as UTF-8, dropping undecodable bytes."""
        return ffi.string(c_chars).decode("utf-8", errors="ignore")

    def _copy_counters(self, c_flow):
        """Copy the timing/packet/byte counters refreshed on every sync from ``c_flow``."""
        self.bidirectional_last_seen_ms = c_flow.bidirectional_last_seen_ms
        self.bidirectional_duration_ms = c_flow.bidirectional_duration_ms
        self.bidirectional_packets = c_flow.bidirectional_packets
        self.bidirectional_bytes = c_flow.bidirectional_bytes
        self.src2dst_last_seen_ms = c_flow.src2dst_last_seen_ms
        self.src2dst_duration_ms = c_flow.src2dst_duration_ms
        self.src2dst_packets = c_flow.src2dst_packets
        self.src2dst_bytes = c_flow.src2dst_bytes
        self.dst2src_first_seen_ms = c_flow.dst2src_first_seen_ms
        self.dst2src_last_seen_ms = c_flow.dst2src_last_seen_ms
        self.dst2src_duration_ms = c_flow.dst2src_duration_ms
        self.dst2src_packets = c_flow.dst2src_packets
        self.dst2src_bytes = c_flow.dst2src_bytes

    def _copy_min_mean_max(self, c_flow):
        """Copy min/mean/max of packet sizes and inter-arrival times from ``c_flow``."""
        self.bidirectional_min_ps = c_flow.bidirectional_min_ps
        self.bidirectional_mean_ps = c_flow.bidirectional_mean_ps
        self.bidirectional_max_ps = c_flow.bidirectional_max_ps
        self.src2dst_min_ps = c_flow.src2dst_min_ps
        self.src2dst_mean_ps = c_flow.src2dst_mean_ps
        self.src2dst_max_ps = c_flow.src2dst_max_ps
        self.dst2src_min_ps = c_flow.dst2src_min_ps
        self.dst2src_mean_ps = c_flow.dst2src_mean_ps
        self.dst2src_max_ps = c_flow.dst2src_max_ps
        self.bidirectional_min_piat_ms = c_flow.bidirectional_min_piat_ms
        self.bidirectional_mean_piat_ms = c_flow.bidirectional_mean_piat_ms
        self.bidirectional_max_piat_ms = c_flow.bidirectional_max_piat_ms
        self.src2dst_min_piat_ms = c_flow.src2dst_min_piat_ms
        self.src2dst_mean_piat_ms = c_flow.src2dst_mean_piat_ms
        self.src2dst_max_piat_ms = c_flow.src2dst_max_piat_ms
        self.dst2src_min_piat_ms = c_flow.dst2src_min_piat_ms
        self.dst2src_mean_piat_ms = c_flow.dst2src_mean_piat_ms
        self.dst2src_max_piat_ms = c_flow.dst2src_max_piat_ms

    def _copy_tcp_flags(self, c_flow):
        """Copy the per-direction TCP flag packet counters from ``c_flow``."""
        self.bidirectional_syn_packets = c_flow.bidirectional_syn_packets
        self.bidirectional_cwr_packets = c_flow.bidirectional_cwr_packets
        self.bidirectional_ece_packets = c_flow.bidirectional_ece_packets
        self.bidirectional_urg_packets = c_flow.bidirectional_urg_packets
        self.bidirectional_ack_packets = c_flow.bidirectional_ack_packets
        self.bidirectional_psh_packets = c_flow.bidirectional_psh_packets
        self.bidirectional_rst_packets = c_flow.bidirectional_rst_packets
        self.bidirectional_fin_packets = c_flow.bidirectional_fin_packets
        self.src2dst_syn_packets = c_flow.src2dst_syn_packets
        self.src2dst_cwr_packets = c_flow.src2dst_cwr_packets
        self.src2dst_ece_packets = c_flow.src2dst_ece_packets
        self.src2dst_urg_packets = c_flow.src2dst_urg_packets
        self.src2dst_ack_packets = c_flow.src2dst_ack_packets
        self.src2dst_psh_packets = c_flow.src2dst_psh_packets
        self.src2dst_rst_packets = c_flow.src2dst_rst_packets
        self.src2dst_fin_packets = c_flow.src2dst_fin_packets
        self.dst2src_syn_packets = c_flow.dst2src_syn_packets
        self.dst2src_cwr_packets = c_flow.dst2src_cwr_packets
        self.dst2src_ece_packets = c_flow.dst2src_ece_packets
        self.dst2src_urg_packets = c_flow.dst2src_urg_packets
        self.dst2src_ack_packets = c_flow.dst2src_ack_packets
        self.dst2src_psh_packets = c_flow.dst2src_psh_packets
        self.dst2src_rst_packets = c_flow.dst2src_rst_packets
        self.dst2src_fin_packets = c_flow.dst2src_fin_packets

    def _copy_dissection(self, ffi, c_flow):
        """Copy the application dissection fields from ``c_flow`` (C strings are decoded)."""
        c_text = self._c_text
        self.application_name = c_text(ffi, c_flow.application_name)
        self.application_category_name = c_text(ffi, c_flow.category_name)
        self.requested_server_name = c_text(ffi, c_flow.requested_server_name)
        self.client_fingerprint = c_text(ffi, c_flow.c_hash)
        self.server_fingerprint = c_text(ffi, c_flow.s_hash)
        self.user_agent = c_text(ffi, c_flow.user_agent)
        self.content_type = c_text(ffi, c_flow.content_type)
        self.application_is_guessed = c_flow.guessed
        self.application_confidence = c_flow.confidence

    def _copy_splt(self, ffi, c_flow, splt):
        """Unpack the first ``splt`` entries of the three SPLT C arrays into the slots."""
        self.splt_direction = ffi.unpack(c_flow.splt_direction, splt)
        self.splt_ps = ffi.unpack(c_flow.splt_ps, splt)
        self.splt_piat_ms = ffi.unpack(c_flow.splt_piat_ms, splt)

    def is_idle(self, tick, idle_timeout):
        """Check whether the NFlow is idle according to the configured timeout."""
        return (tick - idle_timeout) >= self._C.bidirectional_last_seen_ms

    def __str__(self):
        """String representation of NFlow: every set slot as ``name=value``."""
        # Parts are collected and joined once instead of growing a string with +=.
        # NOTE(review): the ",\n " separator is reproduced as seen; confirm the intended
        # continuation indent against the original file.
        parts = []
        for attr_name in self.__slots__:
            try:
                if not parts:
                    # First set slot: no leading separator.
                    parts.append(attr_name + "=" + str(getattr(self, attr_name)))
                elif attr_name == "udps":
                    # Plugin attributes are flattened as udps.<name>.
                    for udp_name in self.udps.__dict__.keys():
                        parts.append(
                            ",\n "
                            + attr_name
                            + "."
                            + udp_name
                            + "="
                            + str(getattr(self.udps, udp_name))
                        )
                else:
                    parts.append(
                        ",\n " + attr_name + "=" + str(getattr(self, attr_name))
                    )
            except AttributeError:
                # Slot not set for the current configuration: skip it.
                pass
        return "NFlow(" + "".join(parts) + ")"

    def keys(self):
        """Get the NFlow keys: names of the set slots, in ``__slots__`` order."""
        # Note: udps is transformed to udps.value_name as preprocessing for the csv/pandas
        # interfaces.
        names = []
        for attr_name in self.__slots__:
            try:
                getattr(self, attr_name)  # Raises AttributeError when the slot is unset.
                if attr_name == "udps":
                    names.extend(
                        attr_name + "." + udp_name for udp_name in self.udps.__dict__
                    )
                else:
                    names.append(attr_name)
            except AttributeError:
                pass
        return names

    def values(self):
        """Get the flow values, using the same indexing as ``keys()``."""
        collected = []
        for attr_name in self.__slots__:
            try:
                attr_value = getattr(self, attr_name)
                if attr_name == "udps":
                    collected.extend(self.udps.__dict__.values())
                else:
                    collected.append(attr_value)
            except AttributeError:
                pass
        return collected