Coverage Report

Created: 2025-07-23 07:29

/src/suricata7/src/flow-manager.c
/* Copyright (C) 2007-2024 Open Information Security Foundation
 *
 * You can copy, redistribute or modify this Program under the terms of
 * the GNU General Public License version 2 as published by the Free
 * Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; if not, write to the Free Software
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
 * 02110-1301, USA.
 */

/**
 * \file
 *
 * \author Anoop Saldanha <anoopsaldanha@gmail.com>
 * \author Victor Julien <victor@inliniac.net>
 */

#include "suricata-common.h"
#include "conf.h"
#include "threadvars.h"
#include "tm-threads.h"
#include "runmodes.h"

#include "util-random.h"
#include "util-time.h"

#include "flow.h"
#include "flow-queue.h"
#include "flow-hash.h"
#include "flow-util.h"
#include "flow-private.h"
#include "flow-timeout.h"
#include "flow-manager.h"
#include "flow-storage.h"
#include "flow-spare-pool.h"

#include "stream-tcp-reassemble.h"
#include "stream-tcp.h"

#include "util-unittest.h"
#include "util-unittest-helper.h"
#include "util-device.h"

#include "util-debug.h"

#include "threads.h"
#include "detect.h"
#include "detect-engine-state.h"
#include "stream.h"

#include "app-layer-parser.h"

#include "host-timeout.h"
#include "defrag-timeout.h"
#include "ippair-timeout.h"
#include "app-layer-htp-range.h"

#include "output-flow.h"

#include "runmode-unix-socket.h"

/* Run mode selected at suricata.c */
extern int run_mode;

/** queue to pass flows to cleanup/log thread(s) */
FlowQueue flow_recycle_q;

/* multi flow manager support */
static uint32_t flowmgr_number = 1;
/* atomic counter for flow managers, to assign instance id */
SC_ATOMIC_DECLARE(uint32_t, flowmgr_cnt);

/* multi flow recycler support */
static uint32_t flowrec_number = 1;
/* atomic counter for flow recyclers, to assign instance id */
SC_ATOMIC_DECLARE(uint32_t, flowrec_cnt);
SC_ATOMIC_DECLARE(uint32_t, flowrec_busy);
SC_ATOMIC_EXTERN(unsigned int, flow_flags);

SCCtrlCondT flow_manager_ctrl_cond;
SCCtrlMutex flow_manager_ctrl_mutex;
SCCtrlCondT flow_recycler_ctrl_cond;
SCCtrlMutex flow_recycler_ctrl_mutex;

void FlowTimeoutsInit(void)
{
    SC_ATOMIC_SET(flow_timeouts, flow_timeouts_normal);
}

void FlowTimeoutsEmergency(void)
{
    SC_ATOMIC_SET(flow_timeouts, flow_timeouts_emerg);
}

/* 1 second */
#define FLOW_NORMAL_MODE_UPDATE_DELAY_SEC 1
#define FLOW_NORMAL_MODE_UPDATE_DELAY_NSEC 0
/* 0.3 seconds */
#define FLOW_EMERG_MODE_UPDATE_DELAY_SEC 0
#define FLOW_EMERG_MODE_UPDATE_DELAY_NSEC 300000000
#define NEW_FLOW_COUNT_COND 10

typedef struct FlowTimeoutCounters_ {
    uint32_t rows_checked;
    uint32_t rows_skipped;
    uint32_t rows_empty;
    uint32_t rows_maxlen;

    uint32_t flows_checked;
    uint32_t flows_notimeout;
    uint32_t flows_timeout;
    uint32_t flows_removed;
    uint32_t flows_aside;
    uint32_t flows_aside_needs_work;

    uint32_t bypassed_count;
    uint64_t bypassed_pkts;
    uint64_t bypassed_bytes;
} FlowTimeoutCounters;

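/* Shutdown of the flow manager threads is two-phase: first THV_KILL is
 * set on every matching thread, then we poll until each one reports
 * THV_RUNNING_DONE, giving up with a fatal error after 60 seconds. */
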
/**
 * \brief Used to disable flow manager thread(s).
 *
 * \todo Kinda hackish since it uses the tv name to identify flow manager
 *       thread.  We need an all weather identification scheme.
 */
void FlowDisableFlowManagerThread(void)
{
    SCMutexLock(&tv_root_lock);
    /* flow manager thread(s) is/are a part of mgmt threads */
    for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
        if (strncasecmp(tv->name, thread_name_flow_mgr,
            strlen(thread_name_flow_mgr)) == 0)
        {
            TmThreadsSetFlag(tv, THV_KILL);
        }
    }
    SCMutexUnlock(&tv_root_lock);

    struct timeval start_ts;
    struct timeval cur_ts;
    gettimeofday(&start_ts, NULL);

again:
    gettimeofday(&cur_ts, NULL);
    if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
        FatalError("unable to get all flow manager "
                   "threads to shutdown in time");
    }

    SCMutexLock(&tv_root_lock);
    for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
        if (strncasecmp(tv->name, thread_name_flow_mgr,
            strlen(thread_name_flow_mgr)) == 0)
        {
            if (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
                SCMutexUnlock(&tv_root_lock);
                /* sleep outside lock */
                SleepMsec(1);
                goto again;
            }
        }
    }
    SCMutexUnlock(&tv_root_lock);

    /* reset count, so we can kill and respawn (unix socket) */
    SC_ATOMIC_SET(flowmgr_cnt, 0);
    return;
}

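/* In emergency mode the shorter emergency timeouts are applied by
 * subtracting a per-state/per-proto delta (the difference between the
 * normal and emergency timeout) from the precomputed f->timeout_at,
 * rather than recomputing it from the flow. E.g. (hypothetical values)
 * a state with a 600s normal and 100s emergency timeout gets its
 * timeout_at lowered by the 500s delta. */
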
/** \internal
 *  \brief check if a flow is timed out
 *
 *  \param f flow
 *  \param ts timestamp
 *  \param next_ts [out] earliest time the flows checked so far can time out
 *  \param emerg true if emergency mode timeouts apply
 *
 *  \retval 0 not timed out
 *  \retval 1 timed out
 */
static int FlowManagerFlowTimeout(Flow *f, SCTime_t ts, uint32_t *next_ts, const bool emerg)
{
    uint32_t flow_times_out_at = f->timeout_at;
    if (emerg) {
        extern FlowProtoTimeout flow_timeouts_delta[FLOW_PROTO_MAX];
        flow_times_out_at -= FlowGetFlowTimeoutDirect(flow_timeouts_delta, f->flow_state, f->protomap);
    }
    if (*next_ts == 0 || flow_times_out_at < *next_ts)
        *next_ts = flow_times_out_at;

    /* do the timeout check */
    if ((uint64_t)flow_times_out_at >= SCTIME_SECS(ts)) {
        return 0;
    }

    return 1;
}

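/* Note: when CAPTURE_OFFLOAD is not compiled in, the body below is
 * compiled out and this function always returns 1, so capture-bypassed
 * state never delays the regular timeout handling. */
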
/** \internal
 *  \brief check timeout of captured bypassed flow by querying capture method
 *
 *  \param f Flow
 *  \param ts timestamp
 *  \param counters Flow timeout counters
 *
 *  \retval 0 not timed out
 *  \retval 1 timed out (or not capture bypassed)
 */
215
static inline int FlowBypassedTimeout(Flow *f, SCTime_t ts, FlowTimeoutCounters *counters)
216
0
{
217
#ifdef CAPTURE_OFFLOAD
218
    if (f->flow_state != FLOW_STATE_CAPTURE_BYPASSED) {
219
        return 1;
220
    }
221
222
    FlowBypassInfo *fc = FlowGetStorageById(f, GetFlowBypassInfoID());
223
    if (fc && fc->BypassUpdate) {
224
        /* flow will be possibly updated */
225
        uint64_t pkts_tosrc = fc->tosrcpktcnt;
226
        uint64_t bytes_tosrc = fc->tosrcbytecnt;
227
        uint64_t pkts_todst = fc->todstpktcnt;
228
        uint64_t bytes_todst = fc->todstbytecnt;
229
        bool update = fc->BypassUpdate(f, fc->bypass_data, SCTIME_SECS(ts));
230
        if (update) {
231
            SCLogDebug("Updated flow: %"PRId64"", FlowGetId(f));
232
            pkts_tosrc = fc->tosrcpktcnt - pkts_tosrc;
233
            bytes_tosrc = fc->tosrcbytecnt - bytes_tosrc;
234
            pkts_todst = fc->todstpktcnt - pkts_todst;
235
            bytes_todst = fc->todstbytecnt - bytes_todst;
236
            if (f->livedev) {
237
                SC_ATOMIC_ADD(f->livedev->bypassed,
238
                        pkts_tosrc + pkts_todst);
239
            }
240
            counters->bypassed_pkts += pkts_tosrc + pkts_todst;
241
            counters->bypassed_bytes += bytes_tosrc + bytes_todst;
242
            return 0;
243
        } else {
244
            SCLogDebug("No new packet, dead flow %"PRId64"", FlowGetId(f));
245
            if (f->livedev) {
246
                if (FLOW_IS_IPV4(f)) {
247
                    LiveDevSubBypassStats(f->livedev, 1, AF_INET);
248
                } else if (FLOW_IS_IPV6(f)) {
249
                    LiveDevSubBypassStats(f->livedev, 1, AF_INET6);
250
                }
251
            }
252
            counters->bypassed_count++;
253
            return 1;
254
        }
255
    }
256
#endif /* CAPTURE_OFFLOAD */
257
0
    return 1;
258
0
}

typedef struct FlowManagerTimeoutThread {
    /* used to temporarily store flows that have timed out and are
     * removed from the hash to reduce locking contention */
    FlowQueuePrivate aside_queue;
} FlowManagerTimeoutThread;

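/* Flows coming off the aside queue are pushed to the shared recycle
 * queue in batches of 100, with one recycler wakeup per batch, to keep
 * queue lock and signalling overhead low. */
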
/**
 * \internal
 *
 * \brief Process the temporary Aside Queue
 *        Flows that still need forced TCP reassembly are handed back
 *        to their worker thread; all other flows are put on the
 *        recycle queue for further processing.
 *
 * \param td FM Timeout Thread instance
 * \param counters Flow Timeout counters to be updated
 *
 * \retval Number of flows that were recycled
 */
static uint32_t ProcessAsideQueue(FlowManagerTimeoutThread *td, FlowTimeoutCounters *counters)
{
    FlowQueuePrivate recycle = { NULL, NULL, 0 };
    counters->flows_aside += td->aside_queue.len;

    uint32_t cnt = 0;
    Flow *f;
    while ((f = FlowQueuePrivateGetFromTop(&td->aside_queue)) != NULL) {
        /* flow is still locked */

        if (f->proto == IPPROTO_TCP &&
                !(f->flags & (FLOW_TIMEOUT_REASSEMBLY_DONE | FLOW_ACTION_DROP)) &&
                !FlowIsBypassed(f) && FlowForceReassemblyNeedReassembly(f) == 1) {
            /* Send the flow to its thread */
            FlowForceReassemblyForFlow(f);
            FLOWLOCK_UNLOCK(f);
            /* flow ownership is already passed to the worker thread */

            counters->flows_aside_needs_work++;
            continue;
        }
        FLOWLOCK_UNLOCK(f);

        FlowQueuePrivateAppendFlow(&recycle, f);
        if (recycle.len == 100) {
            FlowQueueAppendPrivate(&flow_recycle_q, &recycle);
            FlowWakeupFlowRecyclerThread();
        }
        cnt++;
    }
    if (recycle.len) {
        FlowQueueAppendPrivate(&flow_recycle_q, &recycle);
        FlowWakeupFlowRecyclerThread();
    }
    return cnt;
}

/**
 *  \internal
 *
 *  \brief check all flows in a hash row for timing out
 *
 *  \param f head of the hash row
 *  \param ts timestamp
 *  \param emergency bool indicating emergency mode
 *  \param counters ptr to FlowTimeoutCounters structure
 *  \param next_ts [out] earliest next timeout of the flows in the row
 */
static void FlowManagerHashRowTimeout(FlowManagerTimeoutThread *td, Flow *f, SCTime_t ts,
        int emergency, FlowTimeoutCounters *counters, uint32_t *next_ts)
{
    uint32_t checked = 0;
    Flow *prev_f = NULL;

    do {
        checked++;

        /* check flow timeout based on lastts and state. Both can be
         * accessed w/o Flow lock as we do have the hash row lock (so flow
         * can't disappear) and flow_state is atomic. lastts can only
         * be modified when we have both the flow and hash row lock */

        /* timeout logic goes here */
        if (FlowManagerFlowTimeout(f, ts, next_ts, emergency) == 0) {

            counters->flows_notimeout++;

            prev_f = f;
            f = f->next;
            continue;
        }

        FLOWLOCK_WRLOCK(f);

        Flow *next_flow = f->next;

        /* never prune a flow that is used by a packet we
         * are currently processing in one of the threads */
        if (!FlowBypassedTimeout(f, ts, counters)) {
            FLOWLOCK_UNLOCK(f);
            prev_f = f;
            f = f->next;
            continue;
        }

        f->flow_end_flags |= FLOW_END_FLAG_TIMEOUT;

        counters->flows_timeout++;

        RemoveFromHash(f, prev_f);

        FlowQueuePrivateAppendFlow(&td->aside_queue, f);
        /* flow is still locked in the queue */

        f = next_flow;
    } while (f != NULL);

    counters->flows_checked += checked;
    if (checked > counters->rows_maxlen)
        counters->rows_maxlen = checked;
}

/**
 * \internal
 *
 * \brief Clear evicted list from Flow Manager.
 *        All the evicted flows are removed from the Flow bucket and added
 *        to the temporary Aside Queue.
 *
 * \param td FM timeout thread instance
 * \param f head of the evicted list
 */
static void FlowManagerHashRowClearEvictedList(
        FlowManagerTimeoutThread *td, Flow *f, SCTime_t ts, FlowTimeoutCounters *counters)
{
    do {
        FLOWLOCK_WRLOCK(f);
        Flow *next_flow = f->next;
        f->next = NULL;
        f->fb = NULL;

        FlowQueuePrivateAppendFlow(&td->aside_queue, f);
        /* flow is still locked in the queue */

        f = next_flow;
    } while (f != NULL);
}

/**
 *  \brief time out flows from the hash
 *
 *  \param ts timestamp
 *  \param hash_min min hash index to consider
 *  \param hash_max max hash index to consider
 *  \param counters ptr to FlowTimeoutCounters structure
 *
 *  \retval cnt number of timed out flows
 */
static uint32_t FlowTimeoutHash(FlowManagerTimeoutThread *td, SCTime_t ts, const uint32_t hash_min,
        const uint32_t hash_max, FlowTimeoutCounters *counters)
{
    uint32_t cnt = 0;
    const int emergency = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY));
    const uint32_t rows_checked = hash_max - hash_min;
    uint32_t rows_skipped = 0;
    uint32_t rows_empty = 0;

#if __WORDSIZE==64
#define BITS 64
#define TYPE uint64_t
#else
#define BITS 32
#define TYPE uint32_t
#endif

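    /* Walk the hash in blocks of BITS buckets: first build a bitmap of
     * buckets whose cached next_ts has passed, using only relaxed
     * atomic loads, then lock and walk just the buckets whose bit is
     * set. A block with check_bits == 0 is skipped without taking any
     * locks. */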
    const uint32_t ts_secs = SCTIME_SECS(ts);
    for (uint32_t idx = hash_min; idx < hash_max; idx+=BITS) {
        TYPE check_bits = 0;
        const uint32_t check = MIN(BITS, (hash_max - idx));
        for (uint32_t i = 0; i < check; i++) {
            FlowBucket *fb = &flow_hash[idx+i];
            check_bits |= (TYPE)(SC_ATOMIC_LOAD_EXPLICIT(
                                         fb->next_ts, SC_ATOMIC_MEMORY_ORDER_RELAXED) <= ts_secs)
                          << (TYPE)i;
        }
        if (check_bits == 0)
            continue;

        for (uint32_t i = 0; i < check; i++) {
            FlowBucket *fb = &flow_hash[idx+i];
            if ((check_bits & ((TYPE)1 << (TYPE)i)) != 0 && SC_ATOMIC_GET(fb->next_ts) <= ts_secs) {
                FBLOCK_LOCK(fb);
                Flow *evicted = NULL;
                if (fb->evicted != NULL || fb->head != NULL) {
                    if (fb->evicted != NULL) {
                        /* transfer out of bucket so we can do additional work outside
                         * of the bucket lock */
                        evicted = fb->evicted;
                        fb->evicted = NULL;
                    }
                    if (fb->head != NULL) {
                        uint32_t next_ts = 0;
                        FlowManagerHashRowTimeout(td, fb->head, ts, emergency, counters, &next_ts);

                        if (SC_ATOMIC_GET(fb->next_ts) != next_ts)
                            SC_ATOMIC_SET(fb->next_ts, next_ts);
                    }
                    if (fb->evicted == NULL && fb->head == NULL) {
                        /* row is empty */
                        SC_ATOMIC_SET(fb->next_ts, UINT_MAX);
                    }
                } else {
                    SC_ATOMIC_SET(fb->next_ts, UINT_MAX);
                    rows_empty++;
                }
                FBLOCK_UNLOCK(fb);
                /* process evicted list */
                if (evicted) {
                    FlowManagerHashRowClearEvictedList(td, evicted, ts, counters);
                }
            } else {
                rows_skipped++;
            }
        }
        if (td->aside_queue.len) {
            cnt += ProcessAsideQueue(td, counters);
        }
    }

    counters->rows_checked += rows_checked;
    counters->rows_skipped += rows_skipped;
    counters->rows_empty += rows_empty;

    if (td->aside_queue.len) {
        cnt += ProcessAsideQueue(td, counters);
    }
    counters->flows_removed += cnt;
    /* coverity[missing_unlock : FALSE] */
    return cnt;
}

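/* Worked example (hypothetical numbers): with hash_min=0,
 * hash_max=1000, rows=300 and *pos=900, the first FlowTimeoutHash()
 * call scans rows 900-999, the slice wraps, and a second call scans
 * rows 0-199, leaving *pos at 200. */
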
/** \internal
 *
 *  \brief handle timeout for a slice of hash rows
 *         If we wrap around we call FlowTimeoutHash twice
 *  \param td FM timeout thread
 *  \param ts current timestamp
 *  \param hash_min lower bound of the row slice
 *  \param hash_max upper bound of the row slice
 *  \param counters Flow timeout counters to be passed
 *  \param rows number of rows for this worker unit
 *  \param pos absolute position of the beginning of row slice in the hash table
 *
 *  \retval number of timed out flows
 */
static uint32_t FlowTimeoutHashInChunks(FlowManagerTimeoutThread *td, SCTime_t ts,
        const uint32_t hash_min, const uint32_t hash_max, FlowTimeoutCounters *counters,
        const uint32_t rows, uint32_t *pos)
{
    uint32_t start = 0;
    uint32_t end = 0;
    uint32_t cnt = 0;
    uint32_t rows_left = rows;

again:
    start = (*pos);
    if (start >= hash_max) {
        start = hash_min;
    }
    end = start + rows_left;
    if (end > hash_max) {
        end = hash_max;
    }
    *pos = (end == hash_max) ? hash_min : end;
    rows_left = rows_left - (end - start);

    cnt += FlowTimeoutHash(td, ts, start, end, counters);
    if (rows_left) {
        goto again;
    }
    return cnt;
}

/**
 *  \internal
 *
 *  \brief move all flows out of a hash row
 *
 *  \param f head of the hash row (or of the evicted list)
 *  \param recycle_q Flow recycle queue
 *  \param mode 0 for the bucket's regular flow list, 1 for its evicted list
 *
 *  \retval cnt number of flows removed from the hash and added to the recycle queue
 */
static uint32_t FlowManagerHashRowCleanup(Flow *f, FlowQueuePrivate *recycle_q, const int mode)
{
    uint32_t cnt = 0;

    do {
        FLOWLOCK_WRLOCK(f);

        Flow *next_flow = f->next;

        /* remove from the hash */
        if (mode == 0) {
            RemoveFromHash(f, NULL);
        } else {
            FlowBucket *fb = f->fb;
            fb->evicted = f->next;
            f->next = NULL;
            f->fb = NULL;
        }
        f->flow_end_flags |= FLOW_END_FLAG_SHUTDOWN;

        /* no one is referring to this flow, removed from hash
         * so we can unlock it and move it to the recycle queue. */
        FLOWLOCK_UNLOCK(f);
        FlowQueuePrivateAppendFlow(recycle_q, f);

        cnt++;

        f = next_flow;
    } while (f != NULL);

    return cnt;
}

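/* Note: rows are flushed to the shared recycle queue in batches of 25
 * (plus a final flush at the end) so the recycler threads can start
 * processing while the cleanup pass is still walking the hash. */
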
/**
 *  \brief remove all flows from the hash
 *
 *  \retval cnt number of removed flows
 */
static uint32_t FlowCleanupHash(void)
{
    FlowQueuePrivate local_queue = { NULL, NULL, 0 };
    uint32_t cnt = 0;

    for (uint32_t idx = 0; idx < flow_config.hash_size; idx++) {
        FlowBucket *fb = &flow_hash[idx];

        FBLOCK_LOCK(fb);

        if (fb->head != NULL) {
            /* we have a flow, or more than one */
            cnt += FlowManagerHashRowCleanup(fb->head, &local_queue, 0);
        }
        if (fb->evicted != NULL) {
            /* we have a flow, or more than one */
            cnt += FlowManagerHashRowCleanup(fb->evicted, &local_queue, 1);
        }

        FBLOCK_UNLOCK(fb);
        if (local_queue.len >= 25) {
            FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
            FlowWakeupFlowRecyclerThread();
        }
    }
    FlowQueueAppendPrivate(&flow_recycle_q, &local_queue);
    FlowWakeupFlowRecyclerThread();

    return cnt;
}

typedef struct FlowQueueTimeoutCounters {
    uint32_t flows_removed;
    uint32_t flows_timeout;
} FlowQueueTimeoutCounters;

typedef struct FlowCounters_ {
    uint16_t flow_mgr_full_pass;
    uint16_t flow_mgr_rows_sec;

    uint16_t flow_mgr_spare;
    uint16_t flow_emerg_mode_enter;
    uint16_t flow_emerg_mode_over;

    uint16_t flow_mgr_flows_checked;
    uint16_t flow_mgr_flows_notimeout;
    uint16_t flow_mgr_flows_timeout;
    uint16_t flow_mgr_flows_aside;
    uint16_t flow_mgr_flows_aside_needs_work;

    uint16_t flow_mgr_rows_maxlen;

    uint16_t flow_bypassed_cnt_clo;
    uint16_t flow_bypassed_pkts;
    uint16_t flow_bypassed_bytes;

    uint16_t memcap_pressure;
    uint16_t memcap_pressure_max;
} FlowCounters;

typedef struct FlowManagerThreadData_ {
    uint32_t instance;
    uint32_t min;
    uint32_t max;

    FlowCounters cnt;

    FlowManagerTimeoutThread timeout;
} FlowManagerThreadData;

static void FlowCountersInit(ThreadVars *t, FlowCounters *fc)
{
    fc->flow_mgr_full_pass = StatsRegisterCounter("flow.mgr.full_hash_pass", t);
    fc->flow_mgr_rows_sec = StatsRegisterCounter("flow.mgr.rows_per_sec", t);

    fc->flow_mgr_spare = StatsRegisterCounter("flow.spare", t);
    fc->flow_emerg_mode_enter = StatsRegisterCounter("flow.emerg_mode_entered", t);
    fc->flow_emerg_mode_over = StatsRegisterCounter("flow.emerg_mode_over", t);

    fc->flow_mgr_rows_maxlen = StatsRegisterMaxCounter("flow.mgr.rows_maxlen", t);
    fc->flow_mgr_flows_checked = StatsRegisterCounter("flow.mgr.flows_checked", t);
    fc->flow_mgr_flows_notimeout = StatsRegisterCounter("flow.mgr.flows_notimeout", t);
    fc->flow_mgr_flows_timeout = StatsRegisterCounter("flow.mgr.flows_timeout", t);
    fc->flow_mgr_flows_aside = StatsRegisterCounter("flow.mgr.flows_evicted", t);
    fc->flow_mgr_flows_aside_needs_work = StatsRegisterCounter("flow.mgr.flows_evicted_needs_work", t);

    fc->flow_bypassed_cnt_clo = StatsRegisterCounter("flow_bypassed.closed", t);
    fc->flow_bypassed_pkts = StatsRegisterCounter("flow_bypassed.pkts", t);
    fc->flow_bypassed_bytes = StatsRegisterCounter("flow_bypassed.bytes", t);

    fc->memcap_pressure = StatsRegisterCounter("memcap_pressure", t);
    fc->memcap_pressure_max = StatsRegisterMaxCounter("memcap_pressure_max", t);
}

static void FlowCountersUpdate(
        ThreadVars *th_v, const FlowManagerThreadData *ftd, const FlowTimeoutCounters *counters)
{
    StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_checked, (uint64_t)counters->flows_checked);
    StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_notimeout, (uint64_t)counters->flows_notimeout);

    StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_timeout, (uint64_t)counters->flows_timeout);
    StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_aside, (uint64_t)counters->flows_aside);
    StatsAddUI64(th_v, ftd->cnt.flow_mgr_flows_aside_needs_work,
            (uint64_t)counters->flows_aside_needs_work);

    StatsAddUI64(th_v, ftd->cnt.flow_bypassed_cnt_clo, (uint64_t)counters->bypassed_count);
    StatsAddUI64(th_v, ftd->cnt.flow_bypassed_pkts, (uint64_t)counters->bypassed_pkts);
    StatsAddUI64(th_v, ftd->cnt.flow_bypassed_bytes, (uint64_t)counters->bypassed_bytes);

    StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_maxlen, (uint64_t)counters->rows_maxlen);
}

static TmEcode FlowManagerThreadInit(ThreadVars *t, const void *initdata, void **data)
{
    FlowManagerThreadData *ftd = SCCalloc(1, sizeof(FlowManagerThreadData));
    if (ftd == NULL)
        return TM_ECODE_FAILED;

    ftd->instance = SC_ATOMIC_ADD(flowmgr_cnt, 1);
    SCLogDebug("flow manager instance %u", ftd->instance);

    /* set the min and max value used for hash row walking
     * each thread has its own section of the flow hash */
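    /* e.g. (hypothetical sizing) hash_size 65536 with 4 managers:
     * instance 0 walks rows [0, 16384), instance 1 [16384, 32768) and
     * so on, the last instance also picking up the remainder rows */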
    uint32_t range = flow_config.hash_size / flowmgr_number;

    ftd->min = ftd->instance * range;
    ftd->max = (ftd->instance + 1) * range;

    /* last flow-manager takes on hash_size % flowmgr_number extra rows */
    if ((ftd->instance + 1) == flowmgr_number) {
        ftd->max = flow_config.hash_size;
    }
    BUG_ON(ftd->min > flow_config.hash_size || ftd->max > flow_config.hash_size);

    SCLogDebug("instance %u hash range %u %u", ftd->instance, ftd->min, ftd->max);

    /* pass thread data back to caller */
    *data = ftd;

    FlowCountersInit(t, &ftd->cnt);

    PacketPoolInit();
    return TM_ECODE_OK;
}

static TmEcode FlowManagerThreadDeinit(ThreadVars *t, void *data)
{
    StreamTcpThreadCacheCleanup();
    PacketPoolDestroy();
    SCFree(data);
    return TM_ECODE_OK;
}

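/* Worked example (hypothetical numbers): rows=65536 and mp=50 give
 * rows_per_sec=32768; the estimated work time is 32768/1000 = 32ms,
 * so the sleep becomes MAX(250, 1000 - 32) = 968ms. With mp=0 the
 * minimum busy score of 10 still yields rows_per_sec=6553. */
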
/** \internal
 *  \brief calculate number of rows to scan and how much time to sleep
 *         based on the busy score `mp` (0 idle, 100 max busy).
 *
 *  We try to make sure we scan the hash once per second. The size of
 *  the hash slice scanned is determined by our busy score 'mp'. We
 *  sleep for the remainder of the second after processing the slice,
 *  or at least an approximation of it.
 *  A minimum busy score of 10 is assumed to avoid a full hash pass
 *  taking longer than 10 seconds. This also avoids burstiness when the
 *  busy score rises rapidly, which would otherwise make the flow
 *  manager suddenly scan a much larger slice of the hash, leading to a
 *  burst in scan/eviction work.
 *
 *  \param rows number of rows for the work unit
 *  \param mp current memcap pressure value
 *  \param emergency emergency mode is set or not
 *  \param wu_sleep holds value of sleep time per worker unit
 *  \param wu_rows holds value of calculated rows to be processed per second
 *  \param rows_sec same as wu_rows, only used for counter updates
 */
static void GetWorkUnitSizing(const uint32_t rows, const uint32_t mp, const bool emergency,
        uint64_t *wu_sleep, uint32_t *wu_rows, uint32_t *rows_sec)
{
    if (emergency) {
        *wu_rows = rows;
        *wu_sleep = 250;
        return;
    }
    /* minimum busy score is 10 */
    const uint32_t emp = MAX(mp, 10);
    const uint32_t rows_per_sec = (uint32_t)((float)rows * (float)((float)emp / (float)100));
    /* calc how much time we estimate the work will take, in ms. We assume
     * each row takes an average of 1usec. Maxing out at 1sec. */
    const uint32_t work_per_unit = MIN(rows_per_sec / 1000, 1000);
    /* calc how much time we need to sleep to get to the per second cadence
     * but sleeping for at least 250ms. */
    const uint32_t sleep_per_unit = MAX(250, 1000 - work_per_unit);
    SCLogDebug("mp %u emp %u rows %u rows_sec %u sleep %ums", mp, emp, rows, rows_per_sec,
            sleep_per_unit);

    *wu_sleep = sleep_per_unit;
    *wu_rows = rows_per_sec;
    *rows_sec = rows_per_sec;
}

/** \brief Thread that manages the flow table and times out flows.
 *
 *  \param td ThreadVars cast to void ptr
 *
 *  Keeps an eye on the spare list, allocating flows if needed.
 */
static TmEcode FlowManager(ThreadVars *th_v, void *thread_data)
{
    FlowManagerThreadData *ftd = thread_data;
    const uint32_t rows = ftd->max - ftd->min;
    const bool time_is_live = TimeModeIsLive();

    uint32_t emerg_over_cnt = 0;
    uint64_t next_run_ms = 0;
    uint32_t pos = ftd->min;
    uint32_t rows_sec = 0;
    uint32_t rows_per_wu = 0;
    uint64_t sleep_per_wu = 0;
    bool prev_emerg = false;
    uint32_t other_last_sec = 0; /**< last sec stamp when defrag etc ran */
    SCTime_t ts;

    uint32_t mp = MemcapsGetPressure() * 100;
    if (ftd->instance == 0) {
        StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
        StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
    }
    GetWorkUnitSizing(rows, mp, false, &sleep_per_wu, &rows_per_wu, &rows_sec);
    StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);

    TmThreadsSetFlag(th_v, THV_RUNNING);
    /* don't start our activities until time is setup */
    while (!TimeModeIsReady()) {
        if (suricata_ctl_flags != 0)
            return TM_ECODE_OK;
        usleep(10);
    }

    while (1)
    {
        if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
            TmThreadsSetFlag(th_v, THV_PAUSED);
            TmThreadTestThreadUnPaused(th_v);
            TmThreadsUnsetFlag(th_v, THV_PAUSED);
        }

        bool emerg = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0);

        /* Get the time */
        ts = TimeGet();
        SCLogDebug("ts %" PRIdMAX "", (intmax_t)SCTIME_SECS(ts));
        uint64_t ts_ms = SCTIME_MSECS(ts);
        const bool emerge_p = (emerg && !prev_emerg);
        if (emerge_p) {
            next_run_ms = 0;
            prev_emerg = true;
            SCLogNotice("Flow emergency mode entered...");
            StatsIncr(th_v, ftd->cnt.flow_emerg_mode_enter);
        }
        if (ts_ms >= next_run_ms) {
            if (ftd->instance == 0) {
                const uint32_t sq_len = FlowSpareGetPoolSize();
                const uint32_t spare_perc = sq_len * 100 / MAX(flow_config.prealloc, 1);
                /* see if we still have enough spare flows */
                if (spare_perc < 90 || spare_perc > 110) {
                    FlowSparePoolUpdate(sq_len);
                }
            }

            /* try to time out flows */
            // clang-format off
            FlowTimeoutCounters counters = { 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, };
            // clang-format on

            if (emerg) {
                /* in emergency mode, do a full pass of the hash table */
                FlowTimeoutHash(&ftd->timeout, ts, ftd->min, ftd->max, &counters);
                StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
            } else {
                SCLogDebug("hash %u:%u slice starting at %u with %u rows", ftd->min, ftd->max, pos,
                        rows_per_wu);

                const uint32_t ppos = pos;
                FlowTimeoutHashInChunks(
                        &ftd->timeout, ts, ftd->min, ftd->max, &counters, rows_per_wu, &pos);
                if (ppos > pos) {
                    StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
                }
            }

            const uint32_t spare_pool_len = FlowSpareGetPoolSize();
            StatsSetUI64(th_v, ftd->cnt.flow_mgr_spare, (uint64_t)spare_pool_len);

            FlowCountersUpdate(th_v, ftd, &counters);

            if (emerg == true) {
                SCLogDebug("flow_sparse_q.len = %" PRIu32 " prealloc: %" PRIu32
                           "flow_spare_q status: %" PRIu32 "%% flows at the queue",
                        spare_pool_len, flow_config.prealloc,
                        spare_pool_len * 100 / MAX(flow_config.prealloc, 1));

                /* only if we have pruned this "emergency_recovery" percentage
                 * of flows, we will unset the emergency bit */
                if ((spare_pool_len * 100 / MAX(flow_config.prealloc, 1)) >
                        flow_config.emergency_recovery) {
                    emerg_over_cnt++;
                } else {
                    emerg_over_cnt = 0;
                }

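                /* require the recovery condition to hold for 30
                 * consecutive work units before clearing the emergency
                 * bit, to avoid flapping in and out of emergency mode */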
                if (emerg_over_cnt >= 30) {
                    SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY);
                    FlowTimeoutsReset();

                    emerg = false;
                    prev_emerg = false;
                    emerg_over_cnt = 0;
                    SCLogNotice("Flow emergency mode over, back to normal... unsetting"
                                " FLOW_EMERGENCY bit (ts.tv_sec: %" PRIuMAX ", "
                                "ts.tv_usec:%" PRIuMAX ") flow_spare_q status(): %" PRIu32
                                "%% flows at the queue",
                            (uintmax_t)SCTIME_SECS(ts), (uintmax_t)SCTIME_USECS(ts),
                            spare_pool_len * 100 / MAX(flow_config.prealloc, 1));

                    StatsIncr(th_v, ftd->cnt.flow_emerg_mode_over);
                }
            }

            /* update work units */
            const uint32_t pmp = mp;
            mp = MemcapsGetPressure() * 100;
            if (ftd->instance == 0) {
                StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);
                StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
            }
            GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
            if (pmp != mp) {
                StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);
            }

            next_run_ms = ts_ms + sleep_per_wu;
        }
        if (other_last_sec == 0 || other_last_sec < (uint32_t)SCTIME_SECS(ts)) {
            if (ftd->instance == 0) {
                DefragTimeoutHash(ts);
                HostTimeoutHash(ts);
                IPPairTimeoutHash(ts);
                HttpRangeContainersTimeoutHash(ts);
                other_last_sec = (uint32_t)SCTIME_SECS(ts);
            }
        }

        if (TmThreadsCheckFlag(th_v, THV_KILL)) {
            StatsSyncCounters(th_v);
            break;
        }

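        /* wait for the next work unit: in emergency mode (or with
         * offline time) just poll, otherwise sleep on the control
         * condvar so either the per-unit timeout or a wakeup signal on
         * flow_manager_ctrl_cond (e.g. on entering emergency mode)
         * ends the wait */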
        if (emerg || !time_is_live) {
            usleep(250);
        } else {
            struct timeval cond_tv;
            gettimeofday(&cond_tv, NULL);
            struct timeval add_tv;
            add_tv.tv_sec = 0;
            add_tv.tv_usec = (sleep_per_wu * 1000);
            timeradd(&cond_tv, &add_tv, &cond_tv);

            struct timespec cond_time = FROM_TIMEVAL(cond_tv);
            SCCtrlMutexLock(&flow_manager_ctrl_mutex);
            while (1) {
                int rc = SCCtrlCondTimedwait(
                        &flow_manager_ctrl_cond, &flow_manager_ctrl_mutex, &cond_time);
                if (rc == ETIMEDOUT || rc < 0)
                    break;
                if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
                    break;
                }
            }
            SCCtrlMutexUnlock(&flow_manager_ctrl_mutex);
        }

        SCLogDebug("woke up... %s", SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY ? "emergency":"");

        StatsSyncCountersIfSignalled(th_v);
    }
    return TM_ECODE_OK;
}

/** \brief spawn the flow manager thread */
void FlowManagerThreadSpawn(void)
{
    intmax_t setting = 1;
    (void)ConfGetInt("flow.managers", &setting);

    if (setting < 1 || setting > 1024) {
        FatalError("invalid flow.managers setting %" PRIdMAX, setting);
    }
    flowmgr_number = (uint32_t)setting;

    SCCtrlCondInit(&flow_manager_ctrl_cond, NULL);
    SCCtrlMutexInit(&flow_manager_ctrl_mutex, NULL);

    SCLogConfig("using %u flow manager threads", flowmgr_number);
    StatsRegisterGlobalCounter("flow.memuse", FlowGetMemuse);

    for (uint32_t u = 0; u < flowmgr_number; u++) {
        char name[TM_THREAD_NAME_MAX];
        snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_mgr, u+1);

        ThreadVars *tv_flowmgr = TmThreadCreateMgmtThreadByName(name,
                "FlowManager", 0);
        BUG_ON(tv_flowmgr == NULL);

        if (tv_flowmgr == NULL) {
            FatalError("flow manager thread creation failed");
        }
        if (TmThreadSpawn(tv_flowmgr) != TM_ECODE_OK) {
            FatalError("flow manager thread spawn failed");
        }
    }
    return;
}

typedef struct FlowRecyclerThreadData_ {
    void *output_thread_data;

    uint16_t counter_flows;
    uint16_t counter_queue_avg;
    uint16_t counter_queue_max;

    uint16_t counter_flow_active;
    uint16_t counter_tcp_active_sessions;
    FlowEndCounters fec;
} FlowRecyclerThreadData;

static TmEcode FlowRecyclerThreadInit(ThreadVars *t, const void *initdata, void **data)
{
    FlowRecyclerThreadData *ftd = SCCalloc(1, sizeof(FlowRecyclerThreadData));
    if (ftd == NULL)
        return TM_ECODE_FAILED;
    if (OutputFlowLogThreadInit(t, NULL, &ftd->output_thread_data) != TM_ECODE_OK) {
        SCLogError("initializing flow log API for thread failed");
        SCFree(ftd);
        return TM_ECODE_FAILED;
    }
    SCLogDebug("output_thread_data %p", ftd->output_thread_data);

    ftd->counter_flows = StatsRegisterCounter("flow.recycler.recycled", t);
    ftd->counter_queue_avg = StatsRegisterAvgCounter("flow.recycler.queue_avg", t);
    ftd->counter_queue_max = StatsRegisterMaxCounter("flow.recycler.queue_max", t);

    ftd->counter_flow_active = StatsRegisterCounter("flow.active", t);
    ftd->counter_tcp_active_sessions = StatsRegisterCounter("tcp.active_sessions", t);

    FlowEndCountersRegister(t, &ftd->fec);

    *data = ftd;
    return TM_ECODE_OK;
}

static TmEcode FlowRecyclerThreadDeinit(ThreadVars *t, void *data)
{
    StreamTcpThreadCacheCleanup();

    FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)data;
    if (ftd->output_thread_data != NULL)
        OutputFlowLogThreadDeinit(t, ftd->output_thread_data);

    SCFree(data);
    return TM_ECODE_OK;
}

static void Recycler(ThreadVars *tv, FlowRecyclerThreadData *ftd, Flow *f)
{
    FLOWLOCK_WRLOCK(f);

    (void)OutputFlowLog(tv, ftd->output_thread_data, f);

    FlowEndCountersUpdate(tv, &ftd->fec, f);
    if (f->proto == IPPROTO_TCP && f->protoctx != NULL) {
        StatsDecr(tv, ftd->counter_tcp_active_sessions);
    }
    StatsDecr(tv, ftd->counter_flow_active);

    FlowClearMemory(f, f->protomap);
    FLOWLOCK_UNLOCK(f);
}

extern uint32_t flow_spare_pool_block_size;

/** \brief Thread that manages timed out flows.
 *
 *  \param td ThreadVars cast to void ptr
 */
static TmEcode FlowRecycler(ThreadVars *th_v, void *thread_data)
{
    FlowRecyclerThreadData *ftd = (FlowRecyclerThreadData *)thread_data;
    BUG_ON(ftd == NULL);
    const bool time_is_live = TimeModeIsLive();
    uint64_t recycled_cnt = 0;
    FlowQueuePrivate ret_queue = { NULL, NULL, 0 };

    TmThreadsSetFlag(th_v, THV_RUNNING);

    while (1)
    {
        if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {
            TmThreadsSetFlag(th_v, THV_PAUSED);
            TmThreadTestThreadUnPaused(th_v);
            TmThreadsUnsetFlag(th_v, THV_PAUSED);
        }
        SC_ATOMIC_ADD(flowrec_busy,1);
        FlowQueuePrivate list = FlowQueueExtractPrivate(&flow_recycle_q);

        StatsAddUI64(th_v, ftd->counter_queue_avg, list.len);
        StatsSetUI64(th_v, ftd->counter_queue_max, list.len);

        const int bail = (TmThreadsCheckFlag(th_v, THV_KILL));

        /* Get the time */
        SCLogDebug("ts %" PRIdMAX "", (intmax_t)SCTIME_SECS(TimeGet()));

        uint64_t cnt = 0;
        Flow *f;
        while ((f = FlowQueuePrivateGetFromTop(&list)) != NULL) {
            Recycler(th_v, ftd, f);
            cnt++;

            /* for every full sized block, add it to the spare pool */
            FlowQueuePrivateAppendFlow(&ret_queue, f);
            if (ret_queue.len == flow_spare_pool_block_size) {
                FlowSparePoolReturnFlows(&ret_queue);
            }
        }
        if (ret_queue.len > 0) {
            FlowSparePoolReturnFlows(&ret_queue);
        }
        if (cnt > 0) {
            recycled_cnt += cnt;
            StatsAddUI64(th_v, ftd->counter_flows, cnt);
        }
        SC_ATOMIC_SUB(flowrec_busy,1);

        if (bail) {
            break;
        }

        const bool emerg = (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY);
        if (emerg || !time_is_live) {
            usleep(250);
        } else {
            struct timeval cond_tv;
            gettimeofday(&cond_tv, NULL);
            cond_tv.tv_sec += 1;
            struct timespec cond_time = FROM_TIMEVAL(cond_tv);
            SCCtrlMutexLock(&flow_recycler_ctrl_mutex);
            while (1) {
                int rc = SCCtrlCondTimedwait(
                        &flow_recycler_ctrl_cond, &flow_recycler_ctrl_mutex, &cond_time);
                if (rc == ETIMEDOUT || rc < 0) {
                    break;
                }
                if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {
                    break;
                }
                if (SC_ATOMIC_GET(flow_recycle_q.non_empty) == true) {
                    break;
                }
            }
            SCCtrlMutexUnlock(&flow_recycler_ctrl_mutex);
        }

        SCLogDebug("woke up...");

        StatsSyncCountersIfSignalled(th_v);
    }
    StatsSyncCounters(th_v);
    SCLogPerf("%"PRIu64" flows processed", recycled_cnt);
    return TM_ECODE_OK;
}

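/** \internal
 *  \brief check if the recycler(s) can shut down: no recycler is busy
 *         processing flows and the recycle queue has drained
 */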
static bool FlowRecyclerReadyToShutdown(void)
{
    if (SC_ATOMIC_GET(flowrec_busy) != 0) {
        return false;
    }
    uint32_t len = 0;
    FQLOCK_LOCK(&flow_recycle_q);
    len = flow_recycle_q.qlen;
    FQLOCK_UNLOCK(&flow_recycle_q);

    return ((len == 0));
}

/** \brief spawn the flow recycler thread */
void FlowRecyclerThreadSpawn(void)
{
    intmax_t setting = 1;
    (void)ConfGetInt("flow.recyclers", &setting);

    if (setting < 1 || setting > 1024) {
        FatalError("invalid flow.recyclers setting %" PRIdMAX, setting);
    }
    flowrec_number = (uint32_t)setting;

    SCCtrlCondInit(&flow_recycler_ctrl_cond, NULL);
    SCCtrlMutexInit(&flow_recycler_ctrl_mutex, NULL);

    SCLogConfig("using %u flow recycler threads", flowrec_number);

    for (uint32_t u = 0; u < flowrec_number; u++) {
        char name[TM_THREAD_NAME_MAX];
        snprintf(name, sizeof(name), "%s#%02u", thread_name_flow_rec, u+1);

        ThreadVars *tv_flowrec = TmThreadCreateMgmtThreadByName(name,
                "FlowRecycler", 0);

        if (tv_flowrec == NULL) {
            FatalError("flow recycler thread creation failed");
        }
        if (TmThreadSpawn(tv_flowrec) != TM_ECODE_OK) {
            FatalError("flow recycler thread spawn failed");
        }
    }
    return;
}

/**
 * \brief Used to disable flow recycler thread(s).
 *
 * \note this should only be called when the flow manager is already gone
 *
 * \todo Kinda hackish since it uses the tv name to identify flow recycler
 *       thread.  We need an all weather identification scheme.
 */
void FlowDisableFlowRecyclerThread(void)
{
    /* move all flows still in the hash to the recycler queue */
#ifndef DEBUG
    (void)FlowCleanupHash();
#else
    uint32_t flows = FlowCleanupHash();
    SCLogDebug("flows to progress: %u", flows);
#endif

    /* make sure all flows are processed */
    do {
        FlowWakeupFlowRecyclerThread();
        usleep(10);
    } while (FlowRecyclerReadyToShutdown() == false);

    SCMutexLock(&tv_root_lock);
    /* flow recycler thread(s) is/are a part of mgmt threads */
    for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
        if (strncasecmp(tv->name, thread_name_flow_rec,
            strlen(thread_name_flow_rec)) == 0)
        {
            TmThreadsSetFlag(tv, THV_KILL);
        }
    }
    SCMutexUnlock(&tv_root_lock);

    struct timeval start_ts;
    struct timeval cur_ts;
    gettimeofday(&start_ts, NULL);

again:
    gettimeofday(&cur_ts, NULL);
    if ((cur_ts.tv_sec - start_ts.tv_sec) > 60) {
        FatalError("unable to get all flow recycler "
                   "threads to shutdown in time");
    }

    SCMutexLock(&tv_root_lock);
    for (ThreadVars *tv = tv_root[TVT_MGMT]; tv != NULL; tv = tv->next) {
        if (strncasecmp(tv->name, thread_name_flow_rec,
            strlen(thread_name_flow_rec)) == 0)
        {
            if (!TmThreadsCheckFlag(tv, THV_RUNNING_DONE)) {
                SCMutexUnlock(&tv_root_lock);
                FlowWakeupFlowRecyclerThread();
                /* sleep outside lock */
                SleepMsec(1);
                goto again;
            }
        }
    }
    SCMutexUnlock(&tv_root_lock);

    /* reset count, so we can kill and respawn (unix socket) */
    SC_ATOMIC_SET(flowrec_cnt, 0);
    return;
}

void TmModuleFlowManagerRegister (void)
{
    tmm_modules[TMM_FLOWMANAGER].name = "FlowManager";
    tmm_modules[TMM_FLOWMANAGER].ThreadInit = FlowManagerThreadInit;
    tmm_modules[TMM_FLOWMANAGER].ThreadDeinit = FlowManagerThreadDeinit;
    tmm_modules[TMM_FLOWMANAGER].Management = FlowManager;
    tmm_modules[TMM_FLOWMANAGER].cap_flags = 0;
    tmm_modules[TMM_FLOWMANAGER].flags = TM_FLAG_MANAGEMENT_TM;
    SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name);

    SC_ATOMIC_INIT(flowmgr_cnt);
    SC_ATOMIC_INITPTR(flow_timeouts);
}

void TmModuleFlowRecyclerRegister (void)
{
    tmm_modules[TMM_FLOWRECYCLER].name = "FlowRecycler";
    tmm_modules[TMM_FLOWRECYCLER].ThreadInit = FlowRecyclerThreadInit;
    tmm_modules[TMM_FLOWRECYCLER].ThreadDeinit = FlowRecyclerThreadDeinit;
    tmm_modules[TMM_FLOWRECYCLER].Management = FlowRecycler;
    tmm_modules[TMM_FLOWRECYCLER].cap_flags = 0;
    tmm_modules[TMM_FLOWRECYCLER].flags = TM_FLAG_MANAGEMENT_TM;
    SCLogDebug("%s registered", tmm_modules[TMM_FLOWRECYCLER].name);

    SC_ATOMIC_INIT(flowrec_cnt);
    SC_ATOMIC_INIT(flowrec_busy);
}