/src/suricata7/src/flow-manager.c
Line | Count | Source |
1 | | /* Copyright (C) 2007-2024 Open Information Security Foundation |
2 | | * |
3 | | * You can copy, redistribute or modify this Program under the terms of |
4 | | * the GNU General Public License version 2 as published by the Free |
5 | | * Software Foundation. |
6 | | * |
7 | | * This program is distributed in the hope that it will be useful, |
8 | | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
9 | | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
10 | | * GNU General Public License for more details. |
11 | | * |
12 | | * You should have received a copy of the GNU General Public License |
13 | | * version 2 along with this program; if not, write to the Free Software |
14 | | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
15 | | * 02110-1301, USA. |
16 | | */ |
17 | | |
18 | | /** |
19 | | * \file |
20 | | * |
21 | | * \author Anoop Saldanha <anoopsaldanha@gmail.com> |
22 | | * \author Victor Julien <victor@inliniac.net> |
23 | | */ |
24 | | |
25 | | #include "suricata-common.h" |
26 | | #include "conf.h" |
27 | | #include "threadvars.h" |
28 | | #include "tm-threads.h" |
29 | | #include "runmodes.h" |
30 | | |
31 | | #include "util-random.h" |
32 | | #include "util-time.h" |
33 | | |
34 | | #include "flow.h" |
35 | | #include "flow-queue.h" |
36 | | #include "flow-hash.h" |
37 | | #include "flow-util.h" |
38 | | #include "flow-private.h" |
39 | | #include "flow-timeout.h" |
40 | | #include "flow-manager.h" |
41 | | #include "flow-storage.h" |
42 | | #include "flow-spare-pool.h" |
43 | | |
44 | | #include "stream-tcp-reassemble.h" |
45 | | #include "stream-tcp.h" |
46 | | |
47 | | #include "util-unittest.h" |
48 | | #include "util-unittest-helper.h" |
49 | | #include "util-device.h" |
50 | | |
51 | | #include "util-debug.h" |
52 | | |
53 | | #include "threads.h" |
54 | | #include "detect.h" |
55 | | #include "detect-engine-state.h" |
56 | | #include "stream.h" |
57 | | |
58 | | #include "app-layer-parser.h" |
59 | | |
60 | | #include "host-timeout.h" |
61 | | #include "defrag-timeout.h" |
62 | | #include "ippair-timeout.h" |
63 | | #include "app-layer-htp-range.h" |
64 | | |
65 | | #include "output-flow.h" |
66 | | |
67 | | #include "runmode-unix-socket.h" |
68 | | |
69 | | /* Run mode selected in suricata.c */ |
70 | | extern int run_mode; |
71 | | |
72 | | /** queue to pass flows to cleanup/log thread(s) */ |
73 | | FlowQueue flow_recycle_q; |
74 | | |
75 | | /* multi flow manager support */ |
76 | | static uint32_t flowmgr_number = 1; |
77 | | /* atomic counter for flow managers, to assign instance id */ |
78 | | SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt); |
79 | | |
80 | | /* multi flow recycler support */ |
81 | | static uint32_t flowrec_number = 1; |
82 | | /* atomic counter for flow recyclers, to assign instance id */ |
83 | | SC_ATOMIC_DECLARE(uint32_t, flowrec_cnt); |
84 | | SC_ATOMIC_DECLARE(uint32_t, flowrec_busy); |
85 | | SC_ATOMIC_EXTERN(unsigned int, flow_flags); |
86 | | |
87 | | SCCtrlCondT flow_manager_ctrl_cond; |
88 | | SCCtrlMutex flow_manager_ctrl_mutex; |
89 | | SCCtrlCondT flow_recycler_ctrl_cond; |
90 | | SCCtrlMutex flow_recycler_ctrl_mutex; |
91 | | |
92 | | void FlowTimeoutsInit(void) |
93 | 71 | { |
94 | 71 | SC_ATOMIC_SET(flow_timeouts, flow_timeouts_normal); |
95 | 71 | } |
96 | | |
97 | | void FlowTimeoutsEmergency(void) |
98 | 0 | { |
99 | 0 | SC_ATOMIC_SET(flow_timeouts, flow_timeouts_emerg); |
100 | 0 | } |
101 | | |
102 | | /* 1 second */ |
103 | | #define FLOW_NORMAL_MODE_UPDATE_DELAY_SEC 1 |
104 | | #define FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC 0 |
105 | | /* 0.3 ms (300000 ns) */ |
106 | | #define FLOW_EMERG_MODE_UPDATE_DELAY_SEC 0 |
107 | | #define FLOW_EMERG_MODE_UPDATE_DELAY_NSEC 300000 |
108 | | #define NEW_FLOW_COUNT_COND 10 |
109 | | |
110 | | typedef struct FlowTimeoutCounters_ { |
111 | | uint32_t rows_checked; |
112 | | uint32_t rows_skipped; |
113 | | uint32_t rows_empty; |
114 | | uint32_t rows_maxlen; |
115 | | |
116 | | uint32_t flows_checked; |
117 | | uint32_t flows_notimeout; |
118 | | uint32_t flows_timeout; |
119 | | uint32_t flows_removed; |
120 | | uint32_t flows_aside; |
121 | | uint32_t flows_aside_needs_work; |
122 | | |
123 | | uint32_t bypassed_count; |
124 | | uint64_t bypassed_pkts; |
125 | | uint64_t bypassed_bytes; |
126 | | } FlowTimeoutCounters; |
127 | | |
128 | | /** |
129 | | * \brief Used to disable flow manager thread(s). |
130 | | * |
131 | | * \todo Kinda hackish since it uses the tv name to identify the flow |
132 | | * manager thread. We need an all-weather identification scheme. |
133 | | */ |
134 | | void FlowDisableFlowManagerThread(void) |
135 | 0 | { |
136 | 0 | SCMutexLock(&tv_root_lock); |
137 | | /* flow manager thread(s) is/are a part of mgmt threads */ |
138 | 0 | for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) { |
139 | 0 | if (strncasecmp(tv->name, thread_name_flow_mgr, |
140 | 0 | strlen(thread_name_flow_mgr)) == 0) |
141 | 0 | { |
142 | 0 | TmThreadsSetFlag(tv, THV_KILL); |
143 | 0 | } |
144 | 0 | } |
145 | 0 | SCMutexUnlock(&tv_root_lock); |
146 | |
147 | 0 | struct timeval start_ts; |
148 | 0 | struct timeval cur_ts; |
149 | 0 | gettimeofday(&start_ts, NULL); |
150 | |
151 | 0 | again: |
152 | 0 | gettimeofday(&cur_ts, NULL); |
153 | 0 | if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) { |
154 | 0 | FatalError("unable to get all flow manager " |
155 | 0 | "threads to shutdown in time"); |
156 | 0 | } |
157 | | |
158 | 0 | SCMutexLock(&tv_root_lock); |
159 | 0 | for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) { |
160 | 0 | if (strncasecmp(tv->name, thread_name_flow_mgr, |
161 | 0 | strlen(thread_name_flow_mgr)) == 0) |
162 | 0 | { |
163 | 0 | if (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) { |
164 | 0 | SCMutexUnlock(&tv_root_lock); |
165 | | /* sleep outside lock */ |
166 | 0 | SleepMsec(1); |
167 | 0 | goto again; |
168 | 0 | } |
169 | 0 | } |
170 | 0 | } |
171 | 0 | SCMutexUnlock(&tv_root_lock); |
172 | | |
173 | | /* reset count, so we can kill and respawn (unix socket) */ |
174 | 0 | SC_ATOMIC_SET(flowmgr_cnt, 0); |
175 | 0 | return; |
176 | 0 | } |
177 | | |
178 | | /** \internal |
179 | | * \brief check if a flow is timed out |
180 | | * |
181 | | * \param f flow |
182 | | * \param ts timestamp |
183 | | * |
184 | | * \retval 0 not timed out |
185 | | * \retval 1 timed out |
186 | | */ |
187 | | static int FlowManagerFlowTimeout(Flow *f, SCTime_t ts, uint32_t *next_ts, const bool emerg) |
188 | 0 | { |
189 | 0 | uint32_t flow_times_out_at = f->timeout_at; |
190 | 0 | if (emerg) { |
191 | 0 | extern FlowProtoTimeout flow_timeouts_delta[FLOW_PROTO_MAX]; |
192 | 0 | flow_times_out_at -= FlowGetFlowTimeoutDirect(flow_timeouts_delta, f->flow_state, f->protomap); |
193 | 0 | } |
194 | 0 | if (*next_ts == 0 || flow_times_out_at < *next_ts) |
195 | 0 | *next_ts = flow_times_out_at; |
196 | | |
197 | | /* do the timeout check */ |
198 | 0 | if ((uint64_t)flow_times_out_at >= SCTIME_SECS(ts)) { |
199 | 0 | return 0; |
200 | 0 | } |
201 | | |
202 | 0 | return 1; |
203 | 0 | } |
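| | /* Worked example of the emergency adjustment above, with made-up |
| | * values: if an established TCP flow would normally time out at |
| | * lastts + 3600s and its emergency timeout is 300s, the entry in |
| | * flow_timeouts_delta is 3600 - 300 = 3300, so under emergency |
| | * pressure the flow is considered timed out 3300s earlier. */ |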
204 | | |
205 | | /** \internal |
206 | | * \brief check timeout of captured bypassed flow by querying capture method |
207 | | * |
208 | | * \param f Flow |
209 | | * \param ts timestamp |
210 | | * \param counters Flow timeout counters |
211 | | * |
212 | | * \retval 0 not timed out |
213 | | * \retval 1 timed out (or not capture bypassed) |
214 | | */ |
215 | | static inline int FlowBypassedTimeout(Flow *f, SCTime_t ts, FlowTimeoutCounters *counters) |
216 | 0 | { |
217 | | #ifdef CAPTURE_OFFLOAD |
218 | | if (f->flow_state != FLOW_STATE_CAPTURE_BYPASSED) { |
219 | | return 1; |
220 | | } |
221 | | |
222 | | FlowBypassInfo *fc = FlowGetStorageById(f, GetFlowBypassInfoID()); |
223 | | if (fc && fc->BypassUpdate) { |
224 | | /* flow will be possibly updated */ |
225 | | uint64_t pkts_tosrc = fc->tosrcpktcnt; |
226 | | uint64_t bytes_tosrc = fc->tosrcbytecnt; |
227 | | uint64_t pkts_todst = fc->todstpktcnt; |
228 | | uint64_t bytes_todst = fc->todstbytecnt; |
229 | | bool update = fc->BypassUpdate(f, fc->bypass_data, SCTIME_SECS(ts)); |
230 | | if (update) { |
231 | | SCLogDebug("Updated flow: %"PRId64"", FlowGetId(f)); |
232 | | pkts_tosrc = fc->tosrcpktcnt - pkts_tosrc; |
233 | | bytes_tosrc = fc->tosrcbytecnt - bytes_tosrc; |
234 | | pkts_todst = fc->todstpktcnt - pkts_todst; |
235 | | bytes_todst = fc->todstbytecnt - bytes_todst; |
236 | | if (f->livedev) { |
237 | | SC_ATOMIC_ADD(f->livedev->bypassed, |
238 | | pkts_tosrc + pkts_todst); |
239 | | } |
240 | | counters->bypassed_pkts += pkts_tosrc + pkts_todst; |
241 | | counters->bypassed_bytes += bytes_tosrc + bytes_todst; |
242 | | return 0; |
243 | | } else { |
244 | | SCLogDebug("No new packet, dead flow %"PRId64"", FlowGetId(f)); |
245 | | if (f->livedev) { |
246 | | if (FLOW_IS_IPV4(f)) { |
247 | | LiveDevSubBypassStats(f->livedev, 1, AF_INET); |
248 | | } else if (FLOW_IS_IPV6(f)) { |
249 | | LiveDevSubBypassStats(f->livedev, 1, AF_INET6); |
250 | | } |
251 | | } |
252 | | counters->bypassed_count++; |
253 | | return 1; |
254 | | } |
255 | | } |
256 | | #endif /* CAPTURE_OFFLOAD */ |
257 | 0 | return 1; |
258 | 0 | } |
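| | /* Note on the CAPTURE_OFFLOAD path above: a capture-bypassed flow is |
| | * only reported as timed out once the capture method's BypassUpdate |
| | * callback sees no new packets; if it does see activity, the bypass |
| | * counters are refreshed and the flow is kept. Builds without |
| | * CAPTURE_OFFLOAD always report "timed out" here, so the normal |
| | * timeout logic applies unchanged. */ |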
259 | | |
260 | | typedef struct FlowManagerTimeoutThread { |
261 | | /* used to temporarily store flows that have timed out and are |
262 | | * removed from the hash to reduce locking contention */ |
263 | | FlowQueuePrivate aside_queue; |
264 | | } FlowManagerTimeoutThread; |
265 | | |
266 | | /** |
267 | | * \internal |
268 | | * |
269 | | * \brief Process the temporary aside queue. |
270 | | * As long as a flow f in the queue does not still need work from a |
271 | | * worker thread (e.g. forced TCP reassembly), f is put on the |
272 | | * recycle queue for further processing later on. |
273 | | * |
274 | | * \param td FM Timeout Thread instance |
275 | | * \param counters Flow Timeout counters to be updated |
276 | | * |
277 | | * \retval cnt number of flows moved to the recycle queue |
278 | | */ |
279 | | static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCounters *counters) |
280 | 0 | { |
281 | 0 | FlowQueuePrivate recycle = { NULL, NULL, 0 }; |
282 | 0 | counters->flows_aside += td->aside_queue.len; |
283 | |
284 | 0 | uint32_t cnt = 0; |
285 | 0 | Flow *f; |
286 | 0 | while ((f = FlowQueuePrivateGetFromTop(&td->aside_queue)) != NULL) { |
287 | | /* flow is still locked */ |
288 | |
289 | 0 | if (f->proto == IPPROTO_TCP && |
290 | 0 | !(f->flags & (FLOW_TIMEOUT_REASSEMBLY_DONE | FLOW_ACTION_DROP)) && |
291 | 0 | !FlowIsBypassed(f) && FlowForceReassemblyNeedReassembly(f) == 1) { |
292 | | /* Send the flow to its thread */ |
293 | 0 | FlowForceReassemblyForFlow(f); |
294 | 0 | FLOWLOCK_UNLOCK(f); |
295 | | /* flow ownership is already passed to the worker thread */ |
296 | |
297 | 0 | counters->flows_aside_needs_work++; |
298 | 0 | continue; |
299 | 0 | } |
300 | 0 | FLOWLOCK_UNLOCK(f); |
301 | |
302 | 0 | FlowQueuePrivateAppendFlow(&recycle, f); |
303 | 0 | if (recycle.len == 100) { |
304 | 0 | FlowQueueAppendPrivate(&flow_recycle_q, &recycle); |
305 | 0 | FlowWakeupFlowRecyclerThread(); |
306 | 0 | } |
307 | 0 | cnt++; |
308 | 0 | } |
309 | 0 | if (recycle.len) { |
310 | 0 | FlowQueueAppendPrivate(&flow_recycle_q, &recycle); |
311 | 0 | FlowWakeupFlowRecyclerThread(); |
312 | 0 | } |
313 | 0 | return cnt; |
314 | 0 | } |
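| | /* Note: ProcessAsideQueue pushes recycle candidates to the shared |
| | * flow_recycle_q in batches of up to 100 flows, waking a recycler |
| | * thread per batch, which keeps contention on the recycle queue |
| | * lock low. */ |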
315 | | |
316 | | /** |
317 | | * \internal |
318 | | * |
319 | | * \brief check all flows in a hash row for timing out |
320 | | * |
321 | | * \param f first flow in the hash row |
322 | | * \param ts timestamp |
323 | | * \param emergency bool indicating emergency mode |
324 | | * \param counters ptr to FlowTimeoutCounters structure |
325 | | */ |
326 | | static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, SCTime_t ts, |
327 | | int emergency, FlowTimeoutCounters *counters, uint32_t *next_ts) |
328 | 0 | { |
329 | 0 | uint32_t checked = 0; |
330 | 0 | Flow *prev_f = NULL; |
331 | |
332 | 0 | do { |
333 | 0 | checked++; |
334 | | |
335 | | /* check flow timeout based on lastts and state. Both can be |
336 | | * accessed w/o Flow lock as we do have the hash row lock (so flow |
337 | | * can't disappear) and flow_state is atomic. lastts can only |
338 | | * be modified when we have both the flow and hash row lock */ |
339 | | |
340 | | /* timeout logic goes here */ |
341 | 0 | if (FlowManagerFlowTimeout(f, ts, next_ts, emergency) == 0) { |
342 | |
343 | 0 | counters->flows_notimeout++; |
344 | |
345 | 0 | prev_f = f; |
346 | 0 | f = f->next; |
347 | 0 | continue; |
348 | 0 | } |
349 | | |
350 | 0 | FLOWLOCK_WRLOCK(f); |
351 | |
352 | 0 | Flow *next_flow = f->next; |
353 | | |
354 | | /* never prune a flow that is used by a packet we |
355 | | * are currently processing in one of the threads */ |
356 | 0 | if (!FlowBypassedTimeout(f, ts, counters)) { |
357 | 0 | FLOWLOCK_UNLOCK(f); |
358 | 0 | prev_f = f; |
359 | 0 | f = f->next; |
360 | 0 | continue; |
361 | 0 | } |
362 | | |
363 | 0 | f->flow_end_flags |= FLOW_END_FLAG_TIMEOUT; |
364 | |
365 | 0 | counters->flows_timeout++; |
366 | |
367 | 0 | RemoveFromHash(f, prev_f); |
368 | |
369 | 0 | FlowQueuePrivateAppendFlow(&td->aside_queue, f); |
370 | | /* flow is still locked in the queue */ |
371 | |
372 | 0 | f = next_flow; |
373 | 0 | } while (f != NULL); |
374 | | |
375 | 0 | counters->flows_checked += checked; |
376 | 0 | if (checked > counters->rows_maxlen) |
377 | 0 | counters->rows_maxlen = checked; |
378 | 0 | } |
379 | | |
380 | | /** |
381 | | * \internal |
382 | | * |
383 | | * \brief Clear evicted list from Flow Manager. |
384 | | * All the evicted flows are removed from the Flow bucket and added |
385 | | * to the temporary Aside Queue. |
386 | | * |
387 | | * \param td FM timeout thread instance |
388 | | * \param f head of the evicted list |
389 | | */ |
390 | | static void FlowManagerHashRowClearEvictedList( |
391 | | FlowManagerTimeoutThread *td, Flow *f, SCTime_t ts, FlowTimeoutCounters *counters) |
392 | 0 | { |
393 | 0 | do { |
394 | 0 | FLOWLOCK_WRLOCK(f); |
395 | 0 | Flow *next_flow = f->next; |
396 | 0 | f->next = NULL; |
397 | 0 | f->fb = NULL; |
398 | |
399 | 0 | FlowQueuePrivateAppendFlow(&td->aside_queue, f); |
400 | | /* flow is still locked in the queue */ |
401 | |
402 | 0 | f = next_flow; |
403 | 0 | } while (f != NULL); |
404 | 0 | } |
405 | | |
406 | | /** |
407 | | * \brief time out flows from the hash |
408 | | * |
409 | | * \param ts timestamp |
410 | | * \param hash_min min hash index to consider |
411 | | * \param hash_max max hash index to consider |
412 | | * \param counters ptr to FlowTimeoutCounters structure |
413 | | * |
414 | | * \retval cnt number of timed out flows |
415 | | */ |
416 | | static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td, SCTime_t ts, const uint32_t hash_min, |
417 | | const uint32_t hash_max, FlowTimeoutCounters *counters) |
418 | 0 | { |
419 | 0 | uint32_t cnt = 0; |
420 | 0 | const int emergency = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY)); |
421 | 0 | const uint32_t rows_checked = hash_max - hash_min; |
422 | 0 | uint32_t rows_skipped = 0; |
423 | 0 | uint32_t rows_empty = 0; |
424 | |
425 | 0 | #if __WORDSIZE==64 |
426 | 0 | #define BITS 64 |
427 | 0 | #define TYPE uint64_t |
428 | | #else |
429 | | #define BITS 32 |
430 | | #define TYPE uint32_t |
431 | | #endif |
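| | /* The loop below scans the hash in groups of BITS buckets: a first |
| | * pass over each group builds a bitmask of the buckets whose cached |
| | * next_ts has already passed, using only relaxed atomic loads. A |
| | * group with an all-zero mask is skipped outright, so no bucket lock |
| | * is taken for rows that cannot contain a timed-out flow. */ |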
432 | |
433 | 0 | const uint32_t ts_secs = SCTIME_SECS(ts); |
434 | 0 | for (uint32_t idx = hash_min; idx < hash_max; idx+=BITS) { |
435 | 0 | TYPE check_bits = 0; |
436 | 0 | const uint32_t check = MIN(BITS, (hash_max - idx)); |
437 | 0 | for (uint32_t i = 0; i < check; i++) { |
438 | 0 | FlowBucket *fb = &flow_hash[idx+i]; |
439 | 0 | check_bits |= (TYPE)(SC_ATOMIC_LOAD_EXPLICIT( |
440 | 0 | fb->next_ts, SC_ATOMIC_MEMORY_ORDER_RELAXED) <= ts_secs) |
441 | 0 | << (TYPE)i; |
442 | 0 | } |
443 | 0 | if (check_bits == 0) |
444 | 0 | continue; |
445 | | |
446 | 0 | for (uint32_t i = 0; i < check; i++) { |
447 | 0 | FlowBucket *fb = &flow_hash[idx+i]; |
448 | 0 | if ((check_bits & ((TYPE)1 << (TYPE)i)) != 0 && SC_ATOMIC_GET(fb->next_ts) <= ts_secs) { |
449 | 0 | FBLOCK_LOCK(fb); |
450 | 0 | Flow *evicted = NULL; |
451 | 0 | if (fb->evicted != NULL || fb->head != NULL) { |
452 | 0 | if (fb->evicted != NULL) { |
453 | | /* transfer out of bucket so we can do additional work outside |
454 | | * of the bucket lock */ |
455 | 0 | evicted = fb->evicted; |
456 | 0 | fb->evicted = NULL; |
457 | 0 | } |
458 | 0 | if (fb->head != NULL) { |
459 | 0 | uint32_t next_ts = 0; |
460 | 0 | FlowManagerHashRowTimeout(td, fb->head, ts, emergency, counters, &next_ts); |
461 | |
462 | 0 | if (SC_ATOMIC_GET(fb->next_ts) != next_ts) |
463 | 0 | SC_ATOMIC_SET(fb->next_ts, next_ts); |
464 | 0 | } |
465 | 0 | if (fb->evicted == NULL && fb->head == NULL) { |
466 | | /* row is empty */ |
467 | 0 | SC_ATOMIC_SET(fb->next_ts, UINT_MAX); |
468 | 0 | } |
469 | 0 | } else { |
470 | 0 | SC_ATOMIC_SET(fb->next_ts, UINT_MAX); |
471 | 0 | rows_empty++; |
472 | 0 | } |
473 | 0 | FBLOCK_UNLOCK(fb); |
474 | | /* processed evicted list */ |
475 | 0 | if (evicted) { |
476 | 0 | FlowManagerHashRowClearEvictedList(td, evicted, ts, counters); |
477 | 0 | } |
478 | 0 | } else { |
479 | 0 | rows_skipped++; |
480 | 0 | } |
481 | 0 | } |
482 | 0 | if (td->aside_queue.len) { |
483 | 0 | cnt += ProcessAsideQueue(td, counters); |
484 | 0 | } |
485 | 0 | } |
486 | |
487 | 0 | counters->rows_checked += rows_checked; |
488 | 0 | counters->rows_skipped += rows_skipped; |
489 | 0 | counters->rows_empty += rows_empty; |
490 | |
491 | 0 | if (td->aside_queue.len) { |
492 | 0 | cnt += ProcessAsideQueue(td, counters); |
493 | 0 | } |
494 | 0 | counters->flows_removed += cnt; |
495 | | /* coverity[missing_unlock : FALSE] */ |
496 | 0 | return cnt; |
497 | 0 | } |
498 | | |
499 | | /** \internal |
500 | | * |
501 | | * \brief handle timeout for a slice of hash rows |
502 | | * If we wrap around we call FlowTimeoutHash twice |
503 | | * \param td FM timeout thread |
504 | | * \param ts current timestamp |
505 | | * \param hash_min lower bound of the row slice |
506 | | * \param hash_max upper bound of the row slice |
507 | | * \param counters Flow timeout counters to be updated |
508 | | * \param rows number of rows for this worker unit |
509 | | * \param pos absolute position of the beginning of row slice in the hash table |
510 | | * |
511 | | * \retval number of successfully timed out flows |
512 | | */ |
513 | | static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td, SCTime_t ts, |
514 | | const uint32_t hash_min, const uint32_t hash_max, FlowTimeoutCounters *counters, |
515 | | const uint32_t rows, uint32_t *pos) |
516 | 0 | { |
517 | 0 | uint32_t start = 0; |
518 | 0 | uint32_t end = 0; |
519 | 0 | uint32_t cnt = 0; |
520 | 0 | uint32_t rows_left = rows; |
521 | |
522 | 0 | again: |
523 | 0 | start = (*pos); |
524 | 0 | if (start >= hash_max) { |
525 | 0 | start = hash_min; |
526 | 0 | } |
527 | 0 | end = start + rows_left; |
528 | 0 | if (end > hash_max) { |
529 | 0 | end = hash_max; |
530 | 0 | } |
531 | 0 | *pos = (end == hash_max) ? hash_min : end; |
532 | 0 | rows_left = rows_left - (end - start); |
533 | |
534 | 0 | cnt += FlowTimeoutHash(td, ts, start, end, counters); |
535 | 0 | if (rows_left) { |
536 | 0 | goto again; |
537 | 0 | } |
538 | 0 | return cnt; |
539 | 0 | } |
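| | /* Wrap-around example with made-up bounds: for hash_min=0, |
| | * hash_max=1000, rows=300 and *pos=900, the first pass covers rows |
| | * [900,1000), *pos wraps back to hash_min, and a second pass covers |
| | * [0,200), leaving *pos at 200 for the next work unit. */ |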
540 | | |
541 | | /** |
542 | | * \internal |
543 | | * |
544 | | * \brief move all flows out of a hash row |
545 | | * |
546 | | * \param f first flow in the hash row |
547 | | * \param recycle_q Flow recycle queue |
548 | | * \param mode emergency or not |
549 | | * |
550 | | * \retval cnt number of flows removed from the hash and added to the recycle queue |
551 | | */ |
552 | | static uint32_t FlowManagerHashRowCleanup(Flow *f, FlowQueuePrivate *recycle_q, const int mode) |
553 | 0 | { |
554 | 0 | uint32_t cnt = 0; |
555 | |
556 | 0 | do { |
557 | 0 | FLOWLOCK_WRLOCK(f); |
558 | |
559 | 0 | Flow *next_flow = f->next; |
560 | | |
561 | | /* remove from the hash */ |
562 | 0 | if (mode == 0) { |
563 | 0 | RemoveFromHash(f, NULL); |
564 | 0 | } else { |
565 | 0 | FlowBucket *fb = f->fb; |
566 | 0 | fb->evicted = f->next; |
567 | 0 | f->next = NULL; |
568 | 0 | f->fb = NULL; |
569 | 0 | } |
570 | 0 | f->flow_end_flags |= FLOW_END_FLAG_SHUTDOWN; |
571 | | |
572 | | /* no one is referring to this flow, removed from hash |
573 | | * so we can unlock it and move it to the recycle queue. */ |
574 | 0 | FLOWLOCK_UNLOCK(f); |
575 | 0 | FlowQueuePrivateAppendFlow(recycle_q, f); |
576 | |
577 | 0 | cnt++; |
578 | |
579 | 0 | f = next_flow; |
580 | 0 | } while (f != NULL); |
581 | |
582 | 0 | return cnt; |
583 | 0 | } |
584 | | |
585 | | /** |
586 | | * \brief remove all flows from the hash |
587 | | * |
588 | | * \retval cnt number of flows removed from the hash |
589 | | */ |
590 | | static uint32_t FlowCleanupHash(void) |
591 | 0 | { |
592 | 0 | FlowQueuePrivate local_queue = { NULL, NULL, 0 }; |
593 | 0 | uint32_t cnt = 0; |
594 | |
595 | 0 | for (uint32_t idx = 0; idx < flow_config.hash_size; idx++) { |
596 | 0 | FlowBucket *fb = &flow_hash[idx]; |
597 | |
598 | 0 | FBLOCK_LOCK(fb); |
599 | |
600 | 0 | if (fb->head != NULL) { |
601 | | /* we have a flow, or more than one */ |
602 | 0 | cnt += FlowManagerHashRowCleanup(fb->head, &local_queue, 0); |
603 | 0 | } |
604 | 0 | if (fb->evicted != NULL) { |
605 | | /* we have a flow, or more than one */ |
606 | 0 | cnt += FlowManagerHashRowCleanup(fb->evicted, &local_queue, 1); |
607 | 0 | } |
608 | |
609 | 0 | FBLOCK_UNLOCK(fb); |
610 | 0 | if (local_queue.len >= 25) { |
611 | 0 | FlowQueueAppendPrivate(&flow_recycle_q, &local_queue); |
612 | 0 | FlowWakeupFlowRecyclerThread(); |
613 | 0 | } |
614 | 0 | } |
615 | 0 | FlowQueueAppendPrivate(&flow_recycle_q, &local_queue); |
616 | 0 | FlowWakeupFlowRecyclerThread(); |
617 | |
618 | 0 | return cnt; |
619 | 0 | } |
620 | | |
621 | | typedef struct FlowQueueTimeoutCounters { |
622 | | uint32_t flows_removed; |
623 | | uint32_t flows_timeout; |
624 | | } FlowQueueTimeoutCounters; |
625 | | |
626 | | typedef struct FlowCounters_ { |
627 | | uint16_t flow_mgr_full_pass; |
628 | | uint16_t flow_mgr_rows_sec; |
629 | | |
630 | | uint16_t flow_mgr_spare; |
631 | | uint16_t flow_emerg_mode_enter; |
632 | | uint16_t flow_emerg_mode_over; |
633 | | |
634 | | uint16_t flow_mgr_flows_checked; |
635 | | uint16_t flow_mgr_flows_notimeout; |
636 | | uint16_t flow_mgr_flows_timeout; |
637 | | uint16_t flow_mgr_flows_aside; |
638 | | uint16_t flow_mgr_flows_aside_needs_work; |
639 | | |
640 | | uint16_t flow_mgr_rows_maxlen; |
641 | | |
642 | | uint16_t flow_bypassed_cnt_clo; |
643 | | uint16_t flow_bypassed_pkts; |
644 | | uint16_t flow_bypassed_bytes; |
645 | | |
646 | | uint16_t memcap_pressure; |
647 | | uint16_t memcap_pressure_max; |
648 | | } FlowCounters; |
649 | | |
650 | | typedef struct FlowManagerThreadData_ { |
651 | | uint32_t instance; |
652 | | uint32_t min; |
653 | | uint32_t max; |
654 | | |
655 | | FlowCounters cnt; |
656 | | |
657 | | FlowManagerTimeoutThread timeout; |
658 | | } FlowManagerThreadData; |
659 | | |
660 | | static void FlowCountersInit(ThreadVars *t, FlowCounters *fc) |
661 | 0 | { |
662 | 0 | fc->flow_mgr_full_pass = StatsRegisterCounter("flow.mgr.full_hash_pass", t); |
663 | 0 | fc->flow_mgr_rows_sec = StatsRegisterCounter("flow.mgr.rows_per_sec", t); |
664 | |
665 | 0 | fc->flow_mgr_spare = StatsRegisterCounter("flow.spare", t); |
666 | 0 | fc->flow_emerg_mode_enter = StatsRegisterCounter("flow.emerg_mode_entered", t); |
667 | 0 | fc->flow_emerg_mode_over = StatsRegisterCounter("flow.emerg_mode_over", t); |
668 | |
669 | 0 | fc->flow_mgr_rows_maxlen = StatsRegisterMaxCounter("flow.mgr.rows_maxlen", t); |
670 | 0 | fc->flow_mgr_flows_checked = StatsRegisterCounter("flow.mgr.flows_checked", t); |
671 | 0 | fc->flow_mgr_flows_notimeout = StatsRegisterCounter("flow.mgr.flows_notimeout", t); |
672 | 0 | fc->flow_mgr_flows_timeout = StatsRegisterCounter("flow.mgr.flows_timeout", t); |
673 | 0 | fc->flow_mgr_flows_aside = StatsRegisterCounter("flow.mgr.flows_evicted", t); |
674 | 0 | fc->flow_mgr_flows_aside_needs_work = StatsRegisterCounter("flow.mgr.flows_evicted_needs_work", t); |
675 | |
676 | 0 | fc->flow_bypassed_cnt_clo = StatsRegisterCounter("flow_bypassed.closed", t); |
677 | 0 | fc->flow_bypassed_pkts = StatsRegisterCounter("flow_bypassed.pkts", t); |
678 | 0 | fc->flow_bypassed_bytes = StatsRegisterCounter("flow_bypassed.bytes", t); |
679 | |
680 | 0 | fc->memcap_pressure = StatsRegisterCounter("memcap_pressure", t); |
681 | 0 | fc->memcap_pressure_max = StatsRegisterMaxCounter("memcap_pressure_max", t); |
682 | 0 | } |
683 | | |
684 | | static void FlowCountersUpdate( |
685 | | ThreadVars *th_v, const FlowManagerThreadData *ftd, const FlowTimeoutCounters *counters) |
686 | 0 | { |
687 | 0 | StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_checked, (uint64_t)counters->flows_checked); |
688 | 0 | StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_notimeout, (uint64_t)counters->flows_notimeout); |
689 | |
690 | 0 | StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_timeout, (uint64_t)counters->flows_timeout); |
691 | 0 | StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_aside, (uint64_t)counters->flows_aside); |
692 | 0 | StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_aside_needs_work, |
693 | 0 | (uint64_t)counters->flows_aside_needs_work); |
694 | |
695 | 0 | StatsAddUI64(th_v, ftd->cnt.flow_bypassed_cnt_clo, (uint64_t)counters->bypassed_count); |
696 | 0 | StatsAddUI64(th_v, ftd->cnt.flow_bypassed_pkts, (uint64_t)counters->bypassed_pkts); |
697 | 0 | StatsAddUI64(th_v, ftd->cnt.flow_bypassed_bytes, (uint64_t)counters->bypassed_bytes); |
698 | |
699 | 0 | StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_maxlen, (uint64_t)counters->rows_maxlen); |
700 | 0 | } |
701 | | |
702 | | static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void **data) |
703 | 0 | { |
704 | 0 | FlowManagerThreadData *ftd = SCCalloc(1, sizeof(FlowManagerThreadData)); |
705 | 0 | if (ftd == NULL) |
706 | 0 | return TM_ECODE_FAILED; |
707 | | |
708 | 0 | ftd->instance = SC_ATOMIC_ADD(flowmgr_cnt, 1); |
709 | 0 | SCLogDebug("flow manager instance %u", ftd->instance); |
710 | | |
711 | | /* set the min and max value used for hash row walking; |
712 | | * each thread has its own section of the flow hash */ |
713 | 0 | uint32_t range = flow_config.hash_size / flowmgr_number; |
714 | |
715 | 0 | ftd->min = ftd->instance * range; |
716 | 0 | ftd->max = (ftd->instance + 1) * range; |
717 | | |
718 | | /* last flow-manager takes on hash_size % flowmgr_number extra rows */ |
719 | 0 | if ((ftd->instance + 1) == flowmgr_number) { |
720 | 0 | ftd->max = flow_config.hash_size; |
721 | 0 | } |
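| | /* Example partitioning with made-up numbers: hash_size=65536 and |
| | * flowmgr_number=3 give range=21845, so the three instances walk |
| | * rows [0,21845), [21845,43690) and [43690,65536), with the last |
| | * instance absorbing the division remainder. */ |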
722 | 0 | BUG_ON(ftd->min > flow_config.hash_size || ftd->max > flow_config.hash_size); |
723 | | |
724 | 0 | SCLogDebug("instance %u hash range %u %u", ftd->instance, ftd->min, ftd->max); |
725 | | |
726 | | /* pass thread data back to caller */ |
727 | 0 | *data = ftd; |
728 | |
729 | 0 | FlowCountersInit(t, &ftd->cnt); |
730 | |
731 | 0 | PacketPoolInit(); |
732 | 0 | return TM_ECODE_OK; |
733 | 0 | } |
734 | | |
735 | | static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data) |
736 | 0 | { |
737 | 0 | StreamTcpThreadCacheCleanup(); |
738 | 0 | PacketPoolDestroy(); |
739 | 0 | SCFree(data); |
740 | 0 | return TM_ECODE_OK; |
741 | 0 | } |
742 | | |
743 | | /** \internal |
744 | | * \brief calculate number of rows to scan and how much time to sleep |
745 | | * based on the busy score `mp` (0 idle, 100 max busy). |
746 | | * |
747 | | * We try to make sure we scan the full hash once per second. The size |
748 | | * of the hash slice scanned per work unit is determined by our busy |
749 | | * score 'mp'. We sleep for the remainder of the second after processing |
750 | | * the slice, or at least an approximation of it. |
751 | | * A minimum busy score of 10 is assumed, which caps a full hash pass at |
752 | | * 10 seconds. This avoids burstiness in scanning: a rapid increase of |
753 | | * the busy score would otherwise make the flow manager suddenly scan a |
754 | | * much larger slice of the hash, leading to a burst of scan/eviction |
755 | | * work. |
756 | | * |
757 | | * \param rows number of rows for the work unit |
758 | | * \param mp current memcap pressure value |
759 | | * \param emergency emergency mode is set or not |
760 | | * \param wu_sleep output: sleep time per work unit, in ms |
761 | | * \param wu_rows output: number of rows to process per work unit |
762 | | * \param rows_sec same as wu_rows, only used for counter updates |
763 | | */ |
764 | | static void GetWorkUnitSizing(const uint32_t rows, const uint32_t mp, const bool emergency, |
765 | | uint64_t *wu_sleep, uint32_t *wu_rows, uint32_t *rows_sec) |
766 | 0 | { |
767 | 0 | if (emergency) { |
768 | 0 | *wu_rows = rows; |
769 | 0 | *wu_sleep = 250; |
770 | 0 | return; |
771 | 0 | } |
772 | | /* minimum busy score is 10 */ |
773 | 0 | const uint32_t emp = MAX(mp, 10); |
774 | 0 | const uint32_t rows_per_sec = (uint32_t)((float)rows * (float)((float)emp / (float)100)); |
775 | | /* calc how much time we estimate the work will take, in ms. We assume |
776 | | * each row takes an average of 1usec. Maxing out at 1sec. */ |
777 | 0 | const uint32_t work_per_unit = MIN(rows_per_sec / 1000, 1000); |
778 | | /* calc how much time we need to sleep to get to the per second cadence |
779 | | * but sleeping for at least 250ms. */ |
780 | 0 | const uint32_t sleep_per_unit = MAX(250, 1000 - work_per_unit); |
781 | 0 | SCLogDebug("mp %u emp %u rows %u rows_sec %u sleep %ums", mp, emp, rows, rows_per_sec, |
782 | 0 | sleep_per_unit); |
783 | |
784 | 0 | *wu_sleep = sleep_per_unit; |
785 | 0 | *wu_rows = rows_per_sec; |
786 | 0 | *rows_sec = rows_per_sec; |
787 | 0 | } |
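| | /* Worked example with made-up numbers: rows=65536 and mp=25 give |
| | * rows_per_sec = 65536 / 4 = 16384 rows per work unit, an estimated |
| | * work_per_unit of 16ms and thus a sleep of 984ms, so a full pass |
| | * over this thread's slice takes about 4 one-second work units. At |
| | * the minimum busy score of 10 a full pass takes about 10 units. */ |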
788 | | |
789 | | /** \brief Thread that manages the flow table and times out flows. |
790 | | * |
791 | | * \param th_v this thread's ThreadVars |
792 | | * |
793 | | * Keeps an eye on the spare list, allocating flows if needed... |
794 | | */ |
795 | | static TmEcode FlowManager(ThreadVars *th_v, void *thread_data) |
796 | 0 | { |
797 | 0 | FlowManagerThreadData *ftd = thread_data; |
798 | 0 | const uint32_t rows = ftd->max - ftd->min; |
799 | 0 | const bool time_is_live = TimeModeIsLive(); |
800 | |
801 | 0 | uint32_t emerg_over_cnt = 0; |
802 | 0 | uint64_t next_run_ms = 0; |
803 | 0 | uint32_t pos = ftd->min; |
804 | 0 | uint32_t rows_sec = 0; |
805 | 0 | uint32_t rows_per_wu = 0; |
806 | 0 | uint64_t sleep_per_wu = 0; |
807 | 0 | bool prev_emerg = false; |
808 | 0 | uint32_t other_last_sec = 0; /**< last sec stamp when defrag etc ran */ |
809 | 0 | SCTime_t ts; |
810 | |
811 | 0 | uint32_t mp = MemcapsGetPressure() * 100; |
812 | 0 | if (ftd->instance == 0) { |
813 | 0 | StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp); |
814 | 0 | StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp); |
815 | 0 | } |
816 | 0 | GetWorkUnitSizing(rows, mp, false, &sleep_per_wu, &rows_per_wu, &rows_sec); |
817 | 0 | StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec); |
818 | |
819 | 0 | TmThreadsSetFlag(th_v, THV_RUNNING); |
820 | | /* don't start our activities until time is setup */ |
821 | 0 | while (!TimeModeIsReady()) { |
822 | 0 | if (suricata_ctl_flags != 0) |
823 | 0 | return TM_ECODE_OK; |
824 | 0 | usleep(10); |
825 | 0 | } |
826 | | |
827 | 0 | while (1) |
828 | 0 | { |
829 | 0 | if (TmThreadsCheckFlag(th_v, THV_PAUSE)) { |
830 | 0 | TmThreadsSetFlag(th_v, THV_PAUSED); |
831 | 0 | TmThreadTestThreadUnPaused(th_v); |
832 | 0 | TmThreadsUnsetFlag(th_v, THV_PAUSED); |
833 | 0 | } |
834 | |
835 | 0 | bool emerg = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0); |
836 | | |
837 | | /* Get the time */ |
838 | 0 | ts = TimeGet(); |
839 | 0 | SCLogDebug("ts %" PRIdMAX "", (intmax_t)SCTIME_SECS(ts)); |
840 | 0 | uint64_t ts_ms = SCTIME_MSECS(ts); |
841 | 0 | const bool emerge_p = (emerg && !prev_emerg); |
842 | 0 | if (emerge_p) { |
843 | 0 | next_run_ms = 0; |
844 | 0 | prev_emerg = true; |
845 | 0 | SCLogNotice("Flow emergency mode entered..."); |
846 | 0 | StatsIncr(th_v, ftd->cnt.flow_emerg_mode_enter); |
847 | 0 | } |
848 | 0 | if (ts_ms >= next_run_ms) { |
849 | 0 | if (ftd->instance == 0) { |
850 | 0 | const uint32_t sq_len = FlowSpareGetPoolSize(); |
851 | 0 | const uint32_t spare_perc = sq_len * 100 / MAX(flow_config.prealloc, 1); |
852 | | /* see if we still have enough spare flows */ |
853 | 0 | if (spare_perc < 90 || spare_perc > 110) { |
854 | 0 | FlowSparePoolUpdate(sq_len); |
855 | 0 | } |
856 | 0 | } |
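| | /* Example with a made-up prealloc of 10000: the spare pool is left |
| | * alone while it stays within roughly 90%..110% of prealloc (about |
| | * 9000..11000 flows, allowing for integer-division rounding) and is |
| | * resized otherwise. */ |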
857 | | |
858 | | /* try to time out flows */ |
859 | | // clang-format off |
860 | 0 | FlowTimeoutCounters counters = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, }; |
861 | | // clang-format on |
862 | |
863 | 0 | if (emerg) { |
864 | | /* in emergency mode, do a full pass of the hash table */ |
865 | 0 | FlowTimeoutHash(&ftd->timeout, ts, ftd->min, ftd->max, &counters); |
866 | 0 | StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass); |
867 | 0 | } else { |
868 | 0 | SCLogDebug("hash %u:%u slice starting at %u with %u rows", ftd->min, ftd->max, pos, |
869 | 0 | rows_per_wu); |
870 | |
871 | 0 | const uint32_t ppos = pos; |
872 | 0 | FlowTimeoutHashInChunks( |
873 | 0 | &ftd->timeout, ts, ftd->min, ftd->max, &counters, rows_per_wu, &pos); |
874 | 0 | if (ppos > pos) { |
875 | 0 | StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass); |
876 | 0 | } |
877 | 0 | } |
878 | |
879 | 0 | const uint32_t spare_pool_len = FlowSpareGetPoolSize(); |
880 | 0 | StatsSetUI64(th_v, ftd->cnt.flow_mgr_spare, (uint64_t)spare_pool_len); |
881 | |
882 | 0 | FlowCountersUpdate(th_v, ftd, &counters); |
883 | |
884 | 0 | if (emerg == true) { |
885 | 0 | SCLogDebug("flow_sparse_q.len = %" PRIu32 " prealloc: %" PRIu32 |
886 | 0 | "flow_spare_q status: %" PRIu32 "%% flows at the queue", |
887 | 0 | spare_pool_len, flow_config.prealloc, |
888 | 0 | spare_pool_len * 100 / MAX(flow_config.prealloc, 1)); |
889 | | |
890 | | /* only if we have pruned this "emergency_recovery" percentage |
891 | | * of flows, we will unset the emergency bit */ |
892 | 0 | if ((spare_pool_len * 100 / MAX(flow_config.prealloc, 1)) > |
893 | 0 | flow_config.emergency_recovery) { |
894 | 0 | emerg_over_cnt++; |
895 | 0 | } else { |
896 | 0 | emerg_over_cnt = 0; |
897 | 0 | } |
898 | |
899 | 0 | if (emerg_over_cnt >= 30) { |
900 | 0 | SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY); |
901 | 0 | FlowTimeoutsReset(); |
902 | |
903 | 0 | emerg = false; |
904 | 0 | prev_emerg = false; |
905 | 0 | emerg_over_cnt = 0; |
906 | 0 | SCLogNotice("Flow emergency mode over, back to normal... unsetting" |
907 | 0 | " FLOW_EMERGENCY bit (ts.tv_sec: %" PRIuMAX ", " |
908 | 0 | "ts.tv_usec:%" PRIuMAX ") flow_spare_q status(): %" PRIu32 |
909 | 0 | "%% flows at the queue", |
910 | 0 | (uintmax_t)SCTIME_SECS(ts), (uintmax_t)SCTIME_USECS(ts), |
911 | 0 | spare_pool_len * 100 / MAX(flow_config.prealloc, 1)); |
912 | |
913 | 0 | StatsIncr(th_v, ftd->cnt.flow_emerg_mode_over); |
914 | 0 | } |
915 | 0 | } |
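| | /* Recovery example with made-up numbers: with prealloc=10000 and |
| | * emergency_recovery=30, the spare pool must stay above 3000 flows |
| | * for 30 consecutive work units before FLOW_EMERGENCY is cleared |
| | * and the normal timeout values are restored. */ |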
916 | | |
917 | | /* update work units */ |
918 | 0 | const uint32_t pmp = mp; |
919 | 0 | mp = MemcapsGetPressure() * 100; |
920 | 0 | if (ftd->instance == 0) { |
921 | 0 | StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp); |
922 | 0 | StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp); |
923 | 0 | } |
924 | 0 | GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec); |
925 | 0 | if (pmp != mp) { |
926 | 0 | StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec); |
927 | 0 | } |
928 | |
929 | 0 | next_run_ms = ts_ms + sleep_per_wu; |
930 | 0 | } |
931 | 0 | if (other_last_sec == 0 || other_last_sec < (uint32_t)SCTIME_SECS(ts)) { |
932 | 0 | if (ftd->instance == 0) { |
933 | 0 | DefragTimeoutHash(ts); |
934 | 0 | HostTimeoutHash(ts); |
935 | 0 | IPPairTimeoutHash(ts); |
936 | 0 | HttpRangeContainersTimeoutHash(ts); |
937 | 0 | other_last_sec = (uint32_t)SCTIME_SECS(ts); |
938 | 0 | } |
939 | 0 | } |
940 | |
941 | 0 | if (TmThreadsCheckFlag(th_v, THV_KILL)) { |
942 | 0 | StatsSyncCounters(th_v); |
943 | 0 | break; |
944 | 0 | } |
945 | | |
946 | 0 | if (emerg || !time_is_live) { |
947 | 0 | usleep(250); |
948 | 0 | } else { |
949 | 0 | struct timeval cond_tv; |
950 | 0 | gettimeofday(&cond_tv, NULL); |
951 | 0 | struct timeval add_tv; |
952 | 0 | add_tv.tv_sec = 0; |
953 | 0 | add_tv.tv_usec = (sleep_per_wu * 1000); |
954 | 0 | timeradd(&cond_tv, &add_tv, &cond_tv); |
955 | |
956 | 0 | struct timespec cond_time = FROM_TIMEVAL(cond_tv); |
957 | 0 | SCCtrlMutexLock(&flow_manager_ctrl_mutex); |
958 | 0 | while (1) { |
959 | 0 | int rc = SCCtrlCondTimedwait( |
960 | 0 | &flow_manager_ctrl_cond, &flow_manager_ctrl_mutex, &cond_time); |
961 | 0 | if (rc == ETIMEDOUT || rc < 0) |
962 | 0 | break; |
963 | 0 | if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) { |
964 | 0 | break; |
965 | 0 | } |
966 | 0 | } |
967 | 0 | SCCtrlMutexUnlock(&flow_manager_ctrl_mutex); |
968 | 0 | } |
969 | |
970 | 0 | SCLogDebug("woke up... %s", SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY ? "emergency":""); |
971 | |
972 | 0 | StatsSyncCountersIfSignalled(th_v); |
973 | 0 | } |
974 | 0 | return TM_ECODE_OK; |
975 | 0 | } |
976 | | |
977 | | /** \brief spawn the flow manager thread(s) */ |
978 | | void FlowManagerThreadSpawn(void) |
979 | 0 | { |
980 | 0 | intmax_t setting = 1; |
981 | 0 | (void)ConfGetInt("flow.managers", &setting); |
982 | |
983 | 0 | if (setting < 1 || setting > 1024) { |
984 | 0 | FatalError("invalid flow.managers setting %" PRIdMAX, setting); |
985 | 0 | } |
986 | 0 | flowmgr_number = (uint32_t)setting; |
987 | |
988 | 0 | SCCtrlCondInit(&flow_manager_ctrl_cond, NULL); |
989 | 0 | SCCtrlMutexInit(&flow_manager_ctrl_mutex, NULL); |
990 | |
991 | 0 | SCLogConfig("using %u flow manager threads", flowmgr_number); |
992 | 0 | StatsRegisterGlobalCounter("flow.memuse", FlowGetMemuse); |
993 | |
994 | 0 | for (uint32_t u = 0; u < flowmgr_number; u++) { |
995 | 0 | char name[TM_THREAD_NAME_MAX]; |
996 | 0 | snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_mgr, u+1); |
997 | |
998 | 0 | ThreadVars *tv_flowmgr = TmThreadCreateMgmtThreadByName(name, |
999 | 0 | "FlowManager", 0); |
1000 | 0 | BUG_ON(tv_flowmgr == NULL); |
1001 | | |
1002 | 0 | if (tv_flowmgr == NULL) { |
1003 | 0 | FatalError("flow manager thread creation failed"); |
1004 | 0 | } |
1005 | 0 | if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) { |
1006 | 0 | FatalError("flow manager thread spawn failed"); |
1007 | 0 | } |
1008 | 0 | } |
1009 | 0 | return; |
1010 | 0 | } |
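| | /* The thread count above comes from the configuration; a minimal |
| | * suricata.yaml sketch (values illustrative only): |
| | * |
| | * flow: |
| | * managers: 2 # read here as "flow.managers" |
| | * recyclers: 2 # read as "flow.recyclers" in FlowRecyclerThreadSpawn |
| | */ |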
1011 | | |
1012 | | typedef struct FlowRecyclerThreadData_ { |
1013 | | void *output_thread_data; |
1014 | | |
1015 | | uint16_t counter_flows; |
1016 | | uint16_t counter_queue_avg; |
1017 | | uint16_t counter_queue_max; |
1018 | | |
1019 | | uint16_t counter_flow_active; |
1020 | | uint16_t counter_tcp_active_sessions; |
1021 | | FlowEndCounters fec; |
1022 | | } FlowRecyclerThreadData; |
1023 | | |
1024 | | static TmEcode FlowRecyclerThreadInit(ThreadVars *t, const void *initdata, void **data) |
1025 | 0 | { |
1026 | 0 | FlowRecyclerThreadData *ftd = SCCalloc(1, sizeof(FlowRecyclerThreadData)); |
1027 | 0 | if (ftd == NULL) |
1028 | 0 | return TM_ECODE_FAILED; |
1029 | 0 | if (OutputFlowLogThreadInit(t, NULL, &ftd->output_thread_data) != TM_ECODE_OK) { |
1030 | 0 | SCLogError("initializing flow log API for thread failed"); |
1031 | 0 | SCFree(ftd); |
1032 | 0 | return TM_ECODE_FAILED; |
1033 | 0 | } |
1034 | 0 | SCLogDebug("output_thread_data %p", ftd->output_thread_data); |
1035 | |
1036 | 0 | ftd->counter_flows = StatsRegisterCounter("flow.recycler.recycled", t); |
1037 | 0 | ftd->counter_queue_avg = StatsRegisterAvgCounter("flow.recycler.queue_avg", t); |
1038 | 0 | ftd->counter_queue_max = StatsRegisterMaxCounter("flow.recycler.queue_max", t); |
1039 | |
1040 | 0 | ftd->counter_flow_active = StatsRegisterCounter("flow.active", t); |
1041 | 0 | ftd->counter_tcp_active_sessions = StatsRegisterCounter("tcp.active_sessions", t); |
1042 | |
1043 | 0 | FlowEndCountersRegister(t, &ftd->fec); |
1044 | |
1045 | 0 | *data = ftd; |
1046 | 0 | return TM_ECODE_OK; |
1047 | 0 | } |
1048 | | |
1049 | | static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data) |
1050 | 0 | { |
1051 | 0 | StreamTcpThreadCacheCleanup(); |
1052 | |
1053 | 0 | FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)data; |
1054 | 0 | if (ftd->output_thread_data != NULL) |
1055 | 0 | OutputFlowLogThreadDeinit(t, ftd->output_thread_data); |
1056 | |
1057 | 0 | SCFree(data); |
1058 | 0 | return TM_ECODE_OK; |
1059 | 0 | } |
1060 | | |
1061 | | static void Recycler(ThreadVars *tv, FlowRecyclerThreadData *ftd, Flow *f) |
1062 | 0 | { |
1063 | 0 | FLOWLOCK_WRLOCK(f); |
1064 | |
1065 | 0 | (void)OutputFlowLog(tv, ftd->output_thread_data, f); |
1066 | |
1067 | 0 | FlowEndCountersUpdate(tv, &ftd->fec, f); |
1068 | 0 | if (f->proto == IPPROTO_TCP && f->protoctx != NULL) { |
1069 | 0 | StatsDecr(tv, ftd->counter_tcp_active_sessions); |
1070 | 0 | } |
1071 | 0 | StatsDecr(tv, ftd->counter_flow_active); |
1072 | |
1073 | 0 | FlowClearMemory(f, f->protomap); |
1074 | 0 | FLOWLOCK_UNLOCK(f); |
1075 | 0 | } |
1076 | | |
1077 | | extern uint32_t flow_spare_pool_block_size; |
1078 | | |
1079 | | /** \brief Thread that manages timed out flows. |
1080 | | * |
1081 | | * \param th_v this thread's ThreadVars |
1082 | | */ |
1083 | | static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data) |
1084 | 0 | { |
1085 | 0 | FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data; |
1086 | 0 | BUG_ON(ftd == NULL); |
1087 | 0 | const bool time_is_live = TimeModeIsLive(); |
1088 | 0 | uint64_t recycled_cnt = 0; |
1089 | 0 | FlowQueuePrivate ret_queue = { NULL, NULL, 0 }; |
1090 | |
1091 | 0 | TmThreadsSetFlag(th_v, THV_RUNNING); |
1092 | |
1093 | 0 | while (1) |
1094 | 0 | { |
1095 | 0 | if (TmThreadsCheckFlag(th_v, THV_PAUSE)) { |
1096 | 0 | TmThreadsSetFlag(th_v, THV_PAUSED); |
1097 | 0 | TmThreadTestThreadUnPaused(th_v); |
1098 | 0 | TmThreadsUnsetFlag(th_v, THV_PAUSED); |
1099 | 0 | } |
1100 | 0 | SC_ATOMIC_ADD(flowrec_busy,1); |
1101 | 0 | FlowQueuePrivate list = FlowQueueExtractPrivate(&flow_recycle_q); |
1102 | |
1103 | 0 | StatsAddUI64(th_v, ftd->counter_queue_avg, list.len); |
1104 | 0 | StatsSetUI64(th_v, ftd->counter_queue_max, list.len); |
1105 | |
1106 | 0 | const int bail = (TmThreadsCheckFlag(th_v, THV_KILL)); |
1107 | | |
1108 | | /* Get the time */ |
1109 | 0 | SCLogDebug("ts %" PRIdMAX "", (intmax_t)SCTIME_SECS(TimeGet())); |
1110 | |
1111 | 0 | uint64_t cnt = 0; |
1112 | 0 | Flow *f; |
1113 | 0 | while ((f = FlowQueuePrivateGetFromTop(&list)) != NULL) { |
1114 | 0 | Recycler(th_v, ftd, f); |
1115 | 0 | cnt++; |
1116 | | |
1117 | | /* for every full sized block, add it to the spare pool */ |
1118 | 0 | FlowQueuePrivateAppendFlow(&ret_queue, f); |
1119 | 0 | if (ret_queue.len == flow_spare_pool_block_size) { |
1120 | 0 | FlowSparePoolReturnFlows(&ret_queue); |
1121 | 0 | } |
1122 | 0 | } |
1123 | 0 | if (ret_queue.len > 0) { |
1124 | 0 | FlowSparePoolReturnFlows(&ret_queue); |
1125 | 0 | } |
1126 | 0 | if (cnt > 0) { |
1127 | 0 | recycled_cnt += cnt; |
1128 | 0 | StatsAddUI64(th_v, ftd->counter_flows, cnt); |
1129 | 0 | } |
1130 | 0 | SC_ATOMIC_SUB(flowrec_busy,1); |
1131 | |
1132 | 0 | if (bail) { |
1133 | 0 | break; |
1134 | 0 | } |
1135 | | |
1136 | 0 | const bool emerg = (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY); |
1137 | 0 | if (emerg || !time_is_live) { |
1138 | 0 | usleep(250); |
1139 | 0 | } else { |
1140 | 0 | struct timeval cond_tv; |
1141 | 0 | gettimeofday(&cond_tv, NULL); |
1142 | 0 | cond_tv.tv_sec += 1; |
1143 | 0 | struct timespec cond_time = FROM_TIMEVAL(cond_tv); |
1144 | 0 | SCCtrlMutexLock(&flow_recycler_ctrl_mutex); |
1145 | 0 | while (1) { |
1146 | 0 | int rc = SCCtrlCondTimedwait( |
1147 | 0 | &flow_recycler_ctrl_cond, &flow_recycler_ctrl_mutex, &cond_time); |
1148 | 0 | if (rc == ETIMEDOUT || rc < 0) { |
1149 | 0 | break; |
1150 | 0 | } |
1151 | 0 | if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) { |
1152 | 0 | break; |
1153 | 0 | } |
1154 | 0 | if (SC_ATOMIC_GET(flow_recycle_q.non_empty) == true) { |
1155 | 0 | break; |
1156 | 0 | } |
1157 | 0 | } |
1158 | 0 | SCCtrlMutexUnlock(&flow_recycler_ctrl_mutex); |
1159 | 0 | } |
1160 | |
1161 | 0 | SCLogDebug("woke up..."); |
1162 | |
1163 | 0 | StatsSyncCountersIfSignalled(th_v); |
1164 | 0 | } |
1165 | 0 | StatsSyncCounters(th_v); |
1166 | 0 | SCLogPerf("%"PRIu64" flows processed", recycled_cnt); |
1167 | 0 | return TM_ECODE_OK; |
1168 | 0 | } |
1169 | | |
1170 | | static bool FlowRecyclerReadyToShutdown(void) |
1171 | 0 | { |
1172 | 0 | if (SC_ATOMIC_GET(flowrec_busy) != 0) { |
1173 | 0 | return false; |
1174 | 0 | } |
1175 | 0 | uint32_t len = 0; |
1176 | 0 | FQLOCK_LOCK(&flow_recycle_q); |
1177 | 0 | len = flow_recycle_q.qlen; |
1178 | 0 | FQLOCK_UNLOCK(&flow_recycle_q); |
1179 | |
1180 | 0 | return ((len == 0)); |
1181 | 0 | } |
1182 | | |
1183 | | /** \brief spawn the flow recycler thread(s) */ |
1184 | | void FlowRecyclerThreadSpawn(void) |
1185 | 0 | { |
1186 | 0 | intmax_t setting = 1; |
1187 | 0 | (void)ConfGetInt("flow.recyclers", &setting); |
1188 | |
1189 | 0 | if (setting < 1 || setting > 1024) { |
1190 | 0 | FatalError("invalid flow.recyclers setting %" PRIdMAX, setting); |
1191 | 0 | } |
1192 | 0 | flowrec_number = (uint32_t)setting; |
1193 | |
1194 | 0 | SCCtrlCondInit(&flow_recycler_ctrl_cond, NULL); |
1195 | 0 | SCCtrlMutexInit(&flow_recycler_ctrl_mutex, NULL); |
1196 | |
1197 | 0 | SCLogConfig("using %u flow recycler threads", flowrec_number); |
1198 | |
1199 | 0 | for (uint32_t u = 0; u < flowrec_number; u++) { |
1200 | 0 | char name[TM_THREAD_NAME_MAX]; |
1201 | 0 | snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_rec, u+1); |
1202 | |
1203 | 0 | ThreadVars *tv_flowrec = TmThreadCreateMgmtThreadByName(name, |
1204 | 0 | "FlowRecycler", 0); |
1205 | |
1206 | 0 | if (tv_flowrec == NULL) { |
1207 | 0 | FatalError("flow recycler thread creation failed"); |
1208 | 0 | } |
1209 | 0 | if (TmThreadSpawn(tv_flowrec) != TM_ECODE_OK) { |
1210 | 0 | FatalError("flow recycler thread spawn failed"); |
1211 | 0 | } |
1212 | 0 | } |
1213 | 0 | return; |
1214 | 0 | } |
1215 | | |
1216 | | /** |
1217 | | * \brief Used to disable flow recycler thread(s). |
1218 | | * |
1219 | | * \note this should only be called when the flow manager is already gone |
1220 | | * |
1221 | | * \todo Kinda hackish since it uses the tv name to identify the flow |
1222 | | * recycler thread. We need an all-weather identification scheme. |
1223 | | */ |
1224 | | void FlowDisableFlowRecyclerThread(void) |
1225 | 0 | { |
1226 | | /* move all flows still in the hash to the recycler queue */ |
1227 | 0 | #ifndef DEBUG |
1228 | 0 | (void)FlowCleanupHash(); |
1229 | | #else |
1230 | | uint32_t flows = FlowCleanupHash(); |
1231 | | SCLogDebug("flows to progress: %u", flows); |
1232 | | #endif |
1233 | | |
1234 | | /* make sure all flows are processed */ |
1235 | 0 | do { |
1236 | 0 | FlowWakeupFlowRecyclerThread(); |
1237 | 0 | usleep(10); |
1238 | 0 | } while (FlowRecyclerReadyToShutdown() == false); |
1239 | |
1240 | 0 | SCMutexLock(&tv_root_lock); |
1241 | | /* flow recycler thread(s) is/are a part of mgmt threads */ |
1242 | 0 | for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) { |
1243 | 0 | if (strncasecmp(tv->name, thread_name_flow_rec, |
1244 | 0 | strlen(thread_name_flow_rec)) == 0) |
1245 | 0 | { |
1246 | 0 | TmThreadsSetFlag(tv, THV_KILL); |
1247 | 0 | } |
1248 | 0 | } |
1249 | 0 | SCMutexUnlock(&tv_root_lock); |
1250 | |
1251 | 0 | struct timeval start_ts; |
1252 | 0 | struct timeval cur_ts; |
1253 | 0 | gettimeofday(&start_ts, NULL); |
1254 | |
1255 | 0 | again: |
1256 | 0 | gettimeofday(&cur_ts, NULL); |
1257 | 0 | if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) { |
1258 | 0 | FatalError("unable to get all flow recycler " |
1259 | 0 | "threads to shutdown in time"); |
1260 | 0 | } |
1261 | | |
1262 | 0 | SCMutexLock(&tv_root_lock); |
1263 | 0 | for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) { |
1264 | 0 | if (strncasecmp(tv->name, thread_name_flow_rec, |
1265 | 0 | strlen(thread_name_flow_rec)) == 0) |
1266 | 0 | { |
1267 | 0 | if (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) { |
1268 | 0 | SCMutexUnlock(&tv_root_lock); |
1269 | 0 | FlowWakeupFlowRecyclerThread(); |
1270 | | /* sleep outside lock */ |
1271 | 0 | SleepMsec(1); |
1272 | 0 | goto again; |
1273 | 0 | } |
1274 | 0 | } |
1275 | 0 | } |
1276 | 0 | SCMutexUnlock(&tv_root_lock); |
1277 | | |
1278 | | /* reset count, so we can kill and respawn (unix socket) */ |
1279 | 0 | SC_ATOMIC_SET(flowrec_cnt, 0); |
1280 | 0 | return; |
1281 | 0 | } |
1282 | | |
1283 | | void TmModuleFlowManagerRegister (void) |
1284 | 71 | { |
1285 | 71 | tmm_modules[TMM_FLOWMANAGER].name = "FlowManager"; |
1286 | 71 | tmm_modules[TMM_FLOWMANAGER].ThreadInit = FlowManagerThreadInit; |
1287 | 71 | tmm_modules[TMM_FLOWMANAGER].ThreadDeinit = FlowManagerThreadDeinit; |
1288 | 71 | tmm_modules[TMM_FLOWMANAGER].Management = FlowManager; |
1289 | 71 | tmm_modules[TMM_FLOWMANAGER].cap_flags = 0; |
1290 | 71 | tmm_modules[TMM_FLOWMANAGER].flags = TM_FLAG_MANAGEMENT_TM; |
1291 | 71 | SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name); |
1292 | | |
1293 | 71 | SC_ATOMIC_INIT(flowmgr_cnt); |
1294 | 71 | SC_ATOMIC_INITPTR(flow_timeouts); |
1295 | 71 | } |
1296 | | |
1297 | | void TmModuleFlowRecyclerRegister (void) |
1298 | 71 | { |
1299 | 71 | tmm_modules[TMM_FLOWRECYCLER].name = "FlowRecycler"; |
1300 | 71 | tmm_modules[TMM_FLOWRECYCLER].ThreadInit = FlowRecyclerThreadInit; |
1301 | 71 | tmm_modules[TMM_FLOWRECYCLER].ThreadDeinit = FlowRecyclerThreadDeinit; |
1302 | 71 | tmm_modules[TMM_FLOWRECYCLER].Management = FlowRecycler; |
1303 | 71 | tmm_modules[TMM_FLOWRECYCLER].cap_flags = 0; |
1304 | 71 | tmm_modules[TMM_FLOWRECYCLER].flags = TM_FLAG_MANAGEMENT_TM; |
1305 | 71 | SCLogDebug("%s registered", tmm_modules[TMM_FLOWRECYCLER].name); |
1306 | | |
1307 | 71 | SC_ATOMIC_INIT(flowrec_cnt); |
1308 | 71 | SC_ATOMIC_INIT(flowrec_busy); |
1309 | 71 | } |