Coverage Report

Created: 2026-05-16 07:38

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/suricata7/src/flow-worker.c
Line
Count
Source
1
/* Copyright (C) 2016-2020 Open Information Security Foundation
2
 *
3
 * You can copy, redistribute or modify this Program under the terms of
4
 * the GNU General Public License version 2 as published by the Free
5
 * Software Foundation.
6
 *
7
 * This program is distributed in the hope that it will be useful,
8
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 * GNU General Public License for more details.
11
 *
12
 * You should have received a copy of the GNU General Public License
13
 * version 2 along with this program; if not, write to the Free Software
14
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15
 * 02110-1301, USA.
16
 */
17
18
/**
19
 * \file
20
 *
21
 * \author Victor Julien <victor@inliniac.net>
22
 *
23
 * Flow Workers are single thread modules taking care of (almost)
24
 * everything related to packets with flows:
25
 *
26
 * - Lookup/creation
27
 * - Stream tracking, reassembly
28
 * - Applayer update
29
 * - Detection
30
 *
31
 * This all while holding the flow lock.
32
 */
33
34
#include "suricata-common.h"
35
#include "suricata.h"
36
37
#include "action-globals.h"
38
#include "packet.h"
39
#include "decode.h"
40
#include "detect.h"
41
#include "stream-tcp.h"
42
#include "app-layer.h"
43
#include "detect-engine.h"
44
#include "output.h"
45
#include "app-layer-parser.h"
46
#include "app-layer-frames.h"
47
48
#include "util-profiling.h"
49
#include "util-validate.h"
50
#include "util-time.h"
51
#include "tmqh-packetpool.h"
52
53
#include "flow-util.h"
54
#include "flow-manager.h"
55
#include "flow-timeout.h"
56
#include "flow-spare-pool.h"
57
#include "flow-worker.h"
58
59
typedef DetectEngineThreadCtx *DetectEngineThreadCtxPtr;
60
61
typedef struct FlowTimeoutCounters {
62
    uint32_t flows_aside_needs_work;
63
    uint32_t flows_aside_pkt_inject;
64
} FlowTimeoutCounters;
65
66
typedef struct FlowWorkerThreadData_ {
67
    DecodeThreadVars *dtv;
68
69
    union {
70
        StreamTcpThread *stream_thread;
71
        void *stream_thread_ptr;
72
    };
73
74
    SC_ATOMIC_DECLARE(DetectEngineThreadCtxPtr, detect_thread);
75
76
    void *output_thread; /* Output thread data. */
77
    void *output_thread_flow; /* Output thread data. */
78
79
    uint16_t local_bypass_pkts;
80
    uint16_t local_bypass_bytes;
81
    uint16_t both_bypass_pkts;
82
    uint16_t both_bypass_bytes;
83
    /** Queue to put pseudo packets that have been created by the stream (RST response) and by the
84
     * flush logic following a protocol change. */
85
    PacketQueueNoLock pq;
86
    FlowLookupStruct fls;
87
88
    struct {
89
        uint16_t flows_injected;
90
        uint16_t flows_injected_max;
91
        uint16_t flows_removed;
92
        uint16_t flows_aside_needs_work;
93
        uint16_t flows_aside_pkt_inject;
94
    } cnt;
95
    FlowEndCounters fec;
96
97
} FlowWorkerThreadData;
98
99
static void FlowWorkerFlowTimeout(ThreadVars *tv, Packet *p, FlowWorkerThreadData *fw, void *detect_thread);
100
Packet *FlowForceReassemblyPseudoPacketGet(int direction, Flow *f, TcpSession *ssn);
101
102
/**
103
 * \internal
104
 * \brief Forces reassembly for flow if it needs it.
105
 *
106
 *        The function requires flow to be locked beforehand.
107
 *
108
 * \param f Pointer to the flow.
109
 *
110
 * \retval cnt number of packets injected
111
 */
112
static int FlowFinish(ThreadVars *tv, Flow *f, FlowWorkerThreadData *fw, void *detect_thread)
113
80.3k
{
114
80.3k
    Packet *p1 = NULL, *p2 = NULL;
115
80.3k
    const int server = f->ffr_tc;
116
80.3k
    const int client = f->ffr_ts;
117
118
    /* Get the tcp session for the flow */
119
80.3k
    TcpSession *ssn = (TcpSession *)f->protoctx;
120
121
    /* The packets we use are based on what segments in what direction are
122
     * unprocessed.
123
     * p1 if we have client segments for reassembly purpose only.  If we
124
     * have no server segments p2 can be a toserver packet with dummy
125
     * seq/ack, and if we have server segments p2 has to carry out reassembly
126
     * for server segment as well, in which case we will also need a p3 in the
127
     * toclient which is now dummy since all we need it for is detection */
128
129
    /* insert a pseudo packet in the toserver direction */
130
80.3k
    if (client == STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION) {
131
78.1k
        p1 = FlowForceReassemblyPseudoPacketGet(0, f, ssn);
132
78.1k
        if (p1 == NULL) {
133
0
            return 0;
134
0
        }
135
78.1k
        PKT_SET_SRC(p1, PKT_SRC_FFR);
136
137
78.1k
        if (server == STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION) {
138
53.6k
            p2 = FlowForceReassemblyPseudoPacketGet(1, f, ssn);
139
53.6k
            if (p2 == NULL) {
140
0
                FlowDeReference(&p1->flow);
141
0
                TmqhOutputPacketpool(NULL, p1);
142
0
                return 0;
143
0
            }
144
53.6k
            PKT_SET_SRC(p2, PKT_SRC_FFR);
145
53.6k
            p2->flowflags |= FLOW_PKT_LAST_PSEUDO;
146
53.6k
        } else {
147
24.5k
            p1->flowflags |= FLOW_PKT_LAST_PSEUDO;
148
24.5k
        }
149
78.1k
    } else {
150
2.17k
        if (server == STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION) {
151
2.17k
            p1 = FlowForceReassemblyPseudoPacketGet(1, f, ssn);
152
2.17k
            if (p1 == NULL) {
153
0
                return 0;
154
0
            }
155
2.17k
            PKT_SET_SRC(p1, PKT_SRC_FFR);
156
2.17k
            p1->flowflags |= FLOW_PKT_LAST_PSEUDO;
157
2.17k
        } else {
158
            /* impossible */
159
0
            BUG_ON(1);
160
0
        }
161
2.17k
    }
162
80.3k
    f->flags |= FLOW_TIMEOUT_REASSEMBLY_DONE;
163
164
80.3k
    FlowWorkerFlowTimeout(tv, p1, fw, detect_thread);
165
80.3k
    PacketPoolReturnPacket(p1);
166
80.3k
    if (p2) {
167
53.6k
        FlowWorkerFlowTimeout(tv, p2, fw, detect_thread);
168
53.6k
        PacketPoolReturnPacket(p2);
169
53.6k
        return 2;
170
53.6k
    }
171
26.7k
    return 1;
172
80.3k
}
173
174
extern uint32_t flow_spare_pool_block_size;
175
176
/** \param[in] max_work Max flows to process. 0 if unlimited. */
177
static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw, FlowTimeoutCounters *counters,
178
        FlowQueuePrivate *fq, const uint32_t max_work)
179
101k
{
180
101k
    FlowQueuePrivate ret_queue = { NULL, NULL, 0 };
181
101k
    uint32_t i = 0;
182
101k
    Flow *f;
183
203k
    while ((f = FlowQueuePrivateGetFromTop(fq)) != NULL) {
184
102k
        FLOWLOCK_WRLOCK(f);
185
102k
        f->flow_end_flags |= FLOW_END_FLAG_TIMEOUT; //TODO emerg
186
187
102k
        if (f->proto == IPPROTO_TCP) {
188
98.2k
            if (!(f->flags & (FLOW_TIMEOUT_REASSEMBLY_DONE | FLOW_ACTION_DROP)) &&
189
98.1k
                    !FlowIsBypassed(f) && FlowForceReassemblyNeedReassembly(f) == 1 &&
190
80.3k
                    f->ffr != 0) {
191
                /* read detect thread in case we're doing a reload */
192
80.3k
                void *detect_thread = SC_ATOMIC_GET(fw->detect_thread);
193
80.3k
                int cnt = FlowFinish(tv, f, fw, detect_thread);
194
80.3k
                counters->flows_aside_pkt_inject += cnt;
195
80.3k
                counters->flows_aside_needs_work++;
196
80.3k
            }
197
98.2k
        }
198
199
        /* no one is referring to this flow, removed from hash
200
         * so we can unlock it and pass it to the flow recycler */
201
202
102k
        if (fw->output_thread_flow != NULL)
203
102k
            (void)OutputFlowLog(tv, fw->output_thread_flow, f);
204
205
102k
        FlowEndCountersUpdate(tv, &fw->fec, f);
206
102k
        if (f->proto == IPPROTO_TCP && f->protoctx != NULL) {
207
95.0k
            StatsDecr(tv, fw->dtv->counter_tcp_active_sessions);
208
95.0k
        }
209
102k
        StatsDecr(tv, fw->dtv->counter_flow_active);
210
211
102k
        FlowClearMemory (f, f->protomap);
212
102k
        FLOWLOCK_UNLOCK(f);
213
214
102k
        if (fw->fls.spare_queue.len >= (flow_spare_pool_block_size * 2)) {
215
0
            FlowQueuePrivatePrependFlow(&ret_queue, f);
216
0
            if (ret_queue.len == flow_spare_pool_block_size) {
217
0
                FlowSparePoolReturnFlows(&ret_queue);
218
0
            }
219
102k
        } else {
220
102k
            FlowQueuePrivatePrependFlow(&fw->fls.spare_queue, f);
221
102k
        }
222
223
102k
        if (max_work != 0 && ++i == max_work)
224
309
            break;
225
102k
    }
226
101k
    if (ret_queue.len > 0) {
227
0
        FlowSparePoolReturnFlows(&ret_queue);
228
0
    }
229
230
101k
    StatsAddUI64(tv, fw->cnt.flows_removed, (uint64_t)i);
231
101k
}
232
233
/** \brief handle flow for packet
234
 *
235
 *  Handle flow creation/lookup
236
 */
237
static inline TmEcode FlowUpdate(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p)
238
14.0M
{
239
14.0M
    FlowHandlePacketUpdate(p->flow, p, tv, fw->dtv);
240
241
14.0M
    int state = p->flow->flow_state;
242
14.0M
    switch (state) {
243
#ifdef CAPTURE_OFFLOAD
244
        case FLOW_STATE_CAPTURE_BYPASSED: {
245
            StatsAddUI64(tv, fw->both_bypass_pkts, 1);
246
            StatsAddUI64(tv, fw->both_bypass_bytes, GET_PKT_LEN(p));
247
            Flow *f = p->flow;
248
            FlowDeReference(&p->flow);
249
            FLOWLOCK_UNLOCK(f);
250
            return TM_ECODE_DONE;
251
        }
252
#endif
253
307
        case FLOW_STATE_LOCAL_BYPASSED: {
254
307
            StatsAddUI64(tv, fw->local_bypass_pkts, 1);
255
307
            StatsAddUI64(tv, fw->local_bypass_bytes, GET_PKT_LEN(p));
256
307
            Flow *f = p->flow;
257
307
            FlowDeReference(&p->flow);
258
307
            FLOWLOCK_UNLOCK(f);
259
307
            return TM_ECODE_DONE;
260
0
        }
261
14.0M
        default:
262
14.0M
            return TM_ECODE_OK;
263
14.0M
    }
264
14.0M
}
265
266
static TmEcode FlowWorkerThreadDeinit(ThreadVars *tv, void *data);
267
268
static TmEcode FlowWorkerThreadInit(ThreadVars *tv, const void *initdata, void **data)
269
4
{
270
4
    FlowWorkerThreadData *fw = SCCalloc(1, sizeof(*fw));
271
4
    if (fw == NULL)
272
0
        return TM_ECODE_FAILED;
273
274
4
    SC_ATOMIC_INITPTR(fw->detect_thread);
275
4
    SC_ATOMIC_SET(fw->detect_thread, NULL);
276
277
4
    fw->local_bypass_pkts = StatsRegisterCounter("flow_bypassed.local_pkts", tv);
278
4
    fw->local_bypass_bytes = StatsRegisterCounter("flow_bypassed.local_bytes", tv);
279
4
    fw->both_bypass_pkts = StatsRegisterCounter("flow_bypassed.local_capture_pkts", tv);
280
4
    fw->both_bypass_bytes = StatsRegisterCounter("flow_bypassed.local_capture_bytes", tv);
281
282
4
    fw->cnt.flows_aside_needs_work = StatsRegisterCounter("flow.wrk.flows_evicted_needs_work", tv);
283
4
    fw->cnt.flows_aside_pkt_inject = StatsRegisterCounter("flow.wrk.flows_evicted_pkt_inject", tv);
284
4
    fw->cnt.flows_removed = StatsRegisterCounter("flow.wrk.flows_evicted", tv);
285
4
    fw->cnt.flows_injected = StatsRegisterCounter("flow.wrk.flows_injected", tv);
286
4
    fw->cnt.flows_injected_max = StatsRegisterMaxCounter("flow.wrk.flows_injected_max", tv);
287
288
4
    fw->fls.dtv = fw->dtv = DecodeThreadVarsAlloc(tv);
289
4
    if (fw->dtv == NULL) {
290
0
        FlowWorkerThreadDeinit(tv, fw);
291
0
        return TM_ECODE_FAILED;
292
0
    }
293
294
    /* setup TCP */
295
4
    if (StreamTcpThreadInit(tv, NULL, &fw->stream_thread_ptr) != TM_ECODE_OK) {
296
0
        FlowWorkerThreadDeinit(tv, fw);
297
0
        return TM_ECODE_FAILED;
298
0
    }
299
300
4
    if (DetectEngineEnabled()) {
301
        /* setup DETECT */
302
4
        void *detect_thread = NULL;
303
4
        if (DetectEngineThreadCtxInit(tv, NULL, &detect_thread) != TM_ECODE_OK) {
304
0
            FlowWorkerThreadDeinit(tv, fw);
305
0
            return TM_ECODE_FAILED;
306
0
        }
307
4
        SC_ATOMIC_SET(fw->detect_thread, detect_thread);
308
4
    }
309
310
    /* Setup outputs for this thread. */
311
4
    if (OutputLoggerThreadInit(tv, initdata, &fw->output_thread) != TM_ECODE_OK) {
312
0
        FlowWorkerThreadDeinit(tv, fw);
313
0
        return TM_ECODE_FAILED;
314
0
    }
315
4
    if (OutputFlowLogThreadInit(tv, NULL, &fw->output_thread_flow) != TM_ECODE_OK) {
316
0
        SCLogError("initializing flow log API for thread failed");
317
0
        FlowWorkerThreadDeinit(tv, fw);
318
0
        return TM_ECODE_FAILED;
319
0
    }
320
321
4
    DecodeRegisterPerfCounters(fw->dtv, tv);
322
4
    AppLayerRegisterThreadCounters(tv);
323
4
    FlowEndCountersRegister(tv, &fw->fec);
324
325
    /* setup pq for stream end pkts */
326
4
    memset(&fw->pq, 0, sizeof(PacketQueueNoLock));
327
4
    *data = fw;
328
4
    return TM_ECODE_OK;
329
4
}
330
331
static TmEcode FlowWorkerThreadDeinit(ThreadVars *tv, void *data)
332
0
{
333
0
    FlowWorkerThreadData *fw = data;
334
335
0
    DecodeThreadVarsFree(tv, fw->dtv);
336
337
    /* free TCP */
338
0
    StreamTcpThreadDeinit(tv, (void *)fw->stream_thread);
339
340
    /* free DETECT */
341
0
    void *detect_thread = SC_ATOMIC_GET(fw->detect_thread);
342
0
    if (detect_thread != NULL) {
343
0
        DetectEngineThreadCtxDeinit(tv, detect_thread);
344
0
        SC_ATOMIC_SET(fw->detect_thread, NULL);
345
0
    }
346
347
    /* Free output. */
348
0
    OutputLoggerThreadDeinit(tv, fw->output_thread);
349
0
    OutputFlowLogThreadDeinit(tv, fw->output_thread_flow);
350
351
    /* free pq */
352
0
    BUG_ON(fw->pq.len);
353
354
0
    Flow *f;
355
0
    while ((f = FlowQueuePrivateGetFromTop(&fw->fls.spare_queue)) != NULL) {
356
0
        FlowFree(f);
357
0
    }
358
359
0
    SCFree(fw);
360
0
    return TM_ECODE_OK;
361
0
}
362
363
TmEcode Detect(ThreadVars *tv, Packet *p, void *data);
364
TmEcode StreamTcp (ThreadVars *, Packet *, void *, PacketQueueNoLock *pq);
365
366
static inline void UpdateCounters(ThreadVars *tv,
367
        FlowWorkerThreadData *fw, const FlowTimeoutCounters *counters)
368
203k
{
369
203k
    if (counters->flows_aside_needs_work) {
370
151k
        StatsAddUI64(tv, fw->cnt.flows_aside_needs_work,
371
151k
                (uint64_t)counters->flows_aside_needs_work);
372
151k
    }
373
203k
    if (counters->flows_aside_pkt_inject) {
374
151k
        StatsAddUI64(tv, fw->cnt.flows_aside_pkt_inject,
375
151k
                (uint64_t)counters->flows_aside_pkt_inject);
376
151k
    }
377
203k
}
378
379
/** \brief update stream engine
380
 *
381
 *  We can be called from both the flow timeout path as well as from the
382
 *  "real" traffic path. If in the timeout path any additional packets we
383
 *  forge for flushing pipelines should not leave our scope. If the original
384
 *  packet is real (or related to a real packet) we need to push the packets
385
 *  on, so IPS logic stays valid.
386
 */
387
static inline void FlowWorkerStreamTCPUpdate(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p,
388
        void *detect_thread, const bool timeout)
389
6.80M
{
390
6.80M
    FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_STREAM);
391
6.80M
    StreamTcp(tv, p, fw->stream_thread, &fw->pq);
392
6.80M
    FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_STREAM);
393
394
    // this is the first packet that sets no payload inspection
395
6.80M
    bool setting_nopayload =
396
6.80M
            p->flow->alparser &&
397
5.02M
            AppLayerParserStateIssetFlag(p->flow->alparser, APP_LAYER_PARSER_NO_INSPECTION) &&
398
103
            !(p->flags & PKT_NOPAYLOAD_INSPECTION);
399
6.80M
    if (FlowChangeProto(p->flow) || setting_nopayload) {
400
18.4k
        StreamTcpDetectLogFlush(tv, fw->stream_thread, p->flow, p, &fw->pq);
401
18.4k
        if (setting_nopayload) {
402
15
            FlowSetNoPayloadInspectionFlag(p->flow);
403
15
        }
404
18.4k
        AppLayerParserStateSetFlag(p->flow->alparser, APP_LAYER_PARSER_EOF_TS);
405
18.4k
        AppLayerParserStateSetFlag(p->flow->alparser, APP_LAYER_PARSER_EOF_TC);
406
18.4k
    }
407
408
    /* Packets here can safely access p->flow as it's locked */
409
6.80M
    SCLogDebug("packet %"PRIu64": extra packets %u", p->pcap_cnt, fw->pq.len);
410
6.80M
    Packet *x;
411
6.83M
    while ((x = PacketDequeueNoLock(&fw->pq))) {
412
36.8k
        SCLogDebug("packet %"PRIu64" extra packet %p", p->pcap_cnt, x);
413
414
36.8k
        if (detect_thread != NULL) {
415
36.8k
            FLOWWORKER_PROFILING_START(x, PROFILE_FLOWWORKER_DETECT);
416
36.8k
            Detect(tv, x, detect_thread);
417
36.8k
            FLOWWORKER_PROFILING_END(x, PROFILE_FLOWWORKER_DETECT);
418
36.8k
        }
419
420
36.8k
        OutputLoggerLog(tv, x, fw->output_thread);
421
422
36.8k
        FramesPrune(x->flow, x);
423
        /*  Release tcp segments. Done here after alerting can use them. */
424
36.8k
        FLOWWORKER_PROFILING_START(x, PROFILE_FLOWWORKER_TCPPRUNE);
425
36.8k
        StreamTcpPruneSession(
426
36.8k
                x->flow, x->flowflags & FLOW_PKT_TOSERVER ? STREAM_TOSERVER : STREAM_TOCLIENT);
427
36.8k
        FLOWWORKER_PROFILING_END(x, PROFILE_FLOWWORKER_TCPPRUNE);
428
429
        /* no need to keep a flow ref beyond this point */
430
36.8k
        FlowDeReference(&x->flow);
431
432
        /* no further work to do for this pseudo packet, so we can return
433
         * it to the pool immediately. */
434
36.8k
        if (timeout) {
435
132
            PacketPoolReturnPacket(x);
436
36.7k
        } else {
437
            /* to support IPS verdict logic, in the non-timeout case we need to do a bit more */
438
36.7k
            TmqhOutputPacketpool(tv, x);
439
36.7k
        }
440
36.8k
    }
441
6.80M
    if (FlowChangeProto(p->flow) && p->flow->flags & FLOW_ACTION_DROP) {
442
        // in case f->flags & FLOW_ACTION_DROP was set by one of the dequeued packets
443
20
        PacketDrop(p, ACTION_DROP, PKT_DROP_REASON_FLOW_DROP);
444
20
    }
445
6.80M
}
446
447
static void FlowWorkerFlowTimeout(ThreadVars *tv, Packet *p, FlowWorkerThreadData *fw,
448
        void *detect_thread)
449
133k
{
450
133k
    DEBUG_VALIDATE_BUG_ON(p->pkt_src != PKT_SRC_FFR);
451
452
133k
    SCLogDebug("packet %"PRIu64" is TCP. Direction %s", p->pcap_cnt, PKT_IS_TOSERVER(p) ? "TOSERVER" : "TOCLIENT");
453
133k
    DEBUG_VALIDATE_BUG_ON(!(p->flow && PKT_IS_TCP(p)));
454
133k
    DEBUG_ASSERT_FLOW_LOCKED(p->flow);
455
456
    /* handle TCP and app layer */
457
133k
    FlowWorkerStreamTCPUpdate(tv, fw, p, detect_thread, true);
458
459
133k
    PacketUpdateEngineEventCounters(tv, fw->dtv, p);
460
461
    /* handle Detect */
462
133k
    SCLogDebug("packet %"PRIu64" calling Detect", p->pcap_cnt);
463
133k
    if (detect_thread != NULL) {
464
133k
        FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_DETECT);
465
133k
        Detect(tv, p, detect_thread);
466
133k
        FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_DETECT);
467
133k
    }
468
469
    // Outputs.
470
133k
    OutputLoggerLog(tv, p, fw->output_thread);
471
472
133k
    FramesPrune(p->flow, p);
473
474
    /*  Release tcp segments. Done here after alerting can use them. */
475
133k
    FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_TCPPRUNE);
476
133k
    StreamTcpPruneSession(p->flow, p->flowflags & FLOW_PKT_TOSERVER ?
477
73.3k
            STREAM_TOSERVER : STREAM_TOCLIENT);
478
133k
    FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_TCPPRUNE);
479
480
    /* run tx cleanup last */
481
133k
    AppLayerParserTransactionsCleanup(p->flow, STREAM_FLAGS_FOR_PACKET(p));
482
483
133k
    FlowDeReference(&p->flow);
484
    /* flow is unlocked later in FlowFinish() */
485
133k
}
486
487
/** \internal
488
 *  \brief process flows injected into our queue by other threads
489
 */
490
static inline void FlowWorkerProcessInjectedFlows(
491
        ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p)
492
8.55M
{
493
    /* take injected flows and append to our work queue */
494
8.55M
    FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW_INJECTED);
495
8.55M
    FlowQueuePrivate injected = { NULL, NULL, 0 };
496
8.55M
    if (SC_ATOMIC_GET(tv->flow_queue->non_empty) == true)
497
0
        injected = FlowQueueExtractPrivate(tv->flow_queue);
498
8.55M
    if (injected.len > 0) {
499
0
        StatsAddUI64(tv, fw->cnt.flows_injected, (uint64_t)injected.len);
500
0
        if (p->pkt_src == PKT_SRC_WIRE)
501
0
            StatsSetUI64(tv, fw->cnt.flows_injected_max, (uint64_t)injected.len);
502
503
        /* move to local queue so we can process over the course of multiple packets */
504
0
        FlowQueuePrivateAppendPrivate(&fw->fls.work_queue, &injected);
505
0
    }
506
8.55M
    FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_INJECTED);
507
8.55M
}
508
509
/** \internal
510
 *  \brief process flows set aside locally during flow lookup
511
 */
512
static inline void FlowWorkerProcessLocalFlows(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p)
513
17.4M
{
514
17.4M
    uint32_t max_work = 2;
515
17.4M
    if (p->pkt_src == PKT_SRC_SHUTDOWN_FLUSH || p->pkt_src == PKT_SRC_CAPTURE_TIMEOUT)
516
0
        max_work = 0;
517
518
17.4M
    FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW_EVICTED);
519
17.4M
    if (fw->fls.work_queue.len) {
520
203k
        FlowTimeoutCounters counters = { 0, 0, };
521
203k
        CheckWorkQueue(tv, fw, &counters, &fw->fls.work_queue, max_work);
522
203k
        UpdateCounters(tv, fw, &counters);
523
203k
    }
524
17.4M
    FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_EVICTED);
525
17.4M
}
526
527
/** \internal
528
 *  \brief apply Packet::app_update_direction to the flow flags
529
 */
530
static void PacketAppUpdate2FlowFlags(Packet *p)
531
6.88M
{
532
6.88M
    switch ((enum StreamUpdateDir)p->app_update_direction) {
533
5.19M
        case UPDATE_DIR_NONE: // NONE implies pseudo packet
534
5.19M
            break;
535
213k
        case UPDATE_DIR_PACKET:
536
213k
            if (PKT_IS_TOSERVER(p)) {
537
166k
                p->flow->flags |= FLOW_TS_APP_UPDATED;
538
166k
                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED set", p->pcap_cnt);
539
166k
            } else {
540
47.1k
                p->flow->flags |= FLOW_TC_APP_UPDATED;
541
47.1k
                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED set", p->pcap_cnt);
542
47.1k
            }
543
213k
            break;
544
2.78k
        case UPDATE_DIR_BOTH:
545
2.78k
            if (PKT_IS_TOSERVER(p)) {
546
1.42k
                p->flow->flags |= FLOW_TS_APP_UPDATED;
547
1.42k
                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED set", p->pcap_cnt);
548
1.42k
            } else {
549
1.36k
                p->flow->flags |= FLOW_TC_APP_UPDATED;
550
1.36k
                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED set", p->pcap_cnt);
551
1.36k
            }
552
            /* fall through */
553
1.47M
        case UPDATE_DIR_OPPOSING:
554
1.47M
            if (PKT_IS_TOSERVER(p)) {
555
870k
                p->flow->flags |= FLOW_TC_APP_UPDATED;
556
870k
                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED set", p->pcap_cnt);
557
870k
            } else {
558
600k
                p->flow->flags |= FLOW_TS_APP_UPDATED;
559
600k
                SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED set", p->pcap_cnt);
560
600k
            }
561
1.47M
            break;
562
6.88M
    }
563
6.88M
}
564
565
static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data)
566
8.55M
{
567
8.55M
    FlowWorkerThreadData *fw = data;
568
8.55M
    void *detect_thread = SC_ATOMIC_GET(fw->detect_thread);
569
570
8.55M
    DEBUG_VALIDATE_BUG_ON(p == NULL);
571
8.55M
    DEBUG_VALIDATE_BUG_ON(tv->flow_queue == NULL);
572
573
8.55M
    SCLogDebug("packet %"PRIu64, p->pcap_cnt);
574
575
    /* update time */
576
8.55M
    if (!(PKT_IS_PSEUDOPKT(p))) {
577
8.55M
        TimeSetByThread(tv->id, p->ts);
578
8.55M
    }
579
580
    /* handle Flow */
581
8.55M
    if (p->flags & PKT_WANTS_FLOW) {
582
6.99M
        FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW);
583
584
6.99M
        FlowHandlePacket(tv, &fw->fls, p);
585
6.99M
        if (likely(p->flow != NULL)) {
586
6.98M
            DEBUG_ASSERT_FLOW_LOCKED(p->flow);
587
6.98M
            if (FlowUpdate(tv, fw, p) == TM_ECODE_DONE) {
588
241
                goto housekeeping;
589
241
            }
590
6.98M
        }
591
        /* Flow is now LOCKED */
592
593
6.99M
        FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW);
594
595
    /* if PKT_WANTS_FLOW is not set, but PKT_HAS_FLOW is, then this is a
596
     * pseudo packet created by the flow manager. */
597
6.99M
    } else if (p->flags & PKT_HAS_FLOW) {
598
0
        FLOWLOCK_WRLOCK(p->flow);
599
0
        DEBUG_VALIDATE_BUG_ON(p->pkt_src != PKT_SRC_FFR);
600
0
    }
601
602
8.55M
    SCLogDebug("packet %"PRIu64" has flow? %s", p->pcap_cnt, p->flow ? "yes" : "no");
603
604
    /* handle TCP and app layer */
605
8.55M
    if (p->flow) {
606
6.98M
        if (PKT_IS_TCP(p)) {
607
6.66M
            SCLogDebug("packet %" PRIu64 " is TCP. Direction %s", p->pcap_cnt,
608
6.66M
                    PKT_IS_TOSERVER(p) ? "TOSERVER" : "TOCLIENT");
609
6.66M
            DEBUG_ASSERT_FLOW_LOCKED(p->flow);
610
611
            /* if detect is disabled, we need to apply file flags to the flow
612
             * here on the first packet. */
613
6.66M
            if (detect_thread == NULL &&
614
0
                    ((PKT_IS_TOSERVER(p) && (p->flowflags & FLOW_PKT_TOSERVER_FIRST)) ||
615
0
                            (PKT_IS_TOCLIENT(p) && (p->flowflags & FLOW_PKT_TOCLIENT_FIRST)))) {
616
0
                DisableDetectFlowFileFlags(p->flow);
617
0
            }
618
619
6.66M
            FlowWorkerStreamTCPUpdate(tv, fw, p, detect_thread, false);
620
6.66M
            PacketAppUpdate2FlowFlags(p);
621
622
            /* handle the app layer part of the UDP packet payload */
623
6.66M
        } else if (p->proto == IPPROTO_UDP && !PacketCheckAction(p, ACTION_DROP)) {
624
216k
            FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_APPLAYERUDP);
625
216k
            AppLayerHandleUdp(tv, fw->stream_thread->ra_ctx->app_tctx, p, p->flow);
626
216k
            FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_APPLAYERUDP);
627
216k
            PacketAppUpdate2FlowFlags(p);
628
216k
        }
629
6.98M
    }
630
631
8.55M
    PacketUpdateEngineEventCounters(tv, fw->dtv, p);
632
633
    /* handle Detect */
634
8.55M
    DEBUG_ASSERT_FLOW_LOCKED(p->flow);
635
8.55M
    SCLogDebug("packet %"PRIu64" calling Detect", p->pcap_cnt);
636
8.55M
    if (detect_thread != NULL) {
637
8.55M
        FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_DETECT);
638
8.55M
        Detect(tv, p, detect_thread);
639
8.55M
        FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_DETECT);
640
8.55M
    }
641
642
    // Outputs.
643
8.55M
    OutputLoggerLog(tv, p, fw->output_thread);
644
645
    /*  Release tcp segments. Done here after alerting can use them. */
646
8.55M
    if (p->flow != NULL) {
647
6.98M
        DEBUG_ASSERT_FLOW_LOCKED(p->flow);
648
649
6.98M
        if (FlowIsBypassed(p->flow)) {
650
10
            FlowCleanupAppLayer(p->flow);
651
10
            if (p->proto == IPPROTO_TCP) {
652
1
                StreamTcpSessionCleanup(p->flow->protoctx);
653
1
            }
654
6.98M
        } else if (p->proto == IPPROTO_TCP && p->flow->protoctx) {
655
6.60M
            FramesPrune(p->flow, p);
656
6.60M
            FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_TCPPRUNE);
657
6.60M
            StreamTcpPruneSession(p->flow, p->flowflags & FLOW_PKT_TOSERVER ?
658
3.64M
                    STREAM_TOSERVER : STREAM_TOCLIENT);
659
6.60M
            FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_TCPPRUNE);
660
6.60M
        } else if (p->proto == IPPROTO_UDP) {
661
216k
            FramesPrune(p->flow, p);
662
216k
        }
663
664
6.98M
        if ((PKT_IS_PSEUDOPKT(p)) ||
665
6.98M
                (p->flow->flags & (FLOW_TS_APP_UPDATED | FLOW_TC_APP_UPDATED))) {
666
3.55M
            if (PKT_IS_TOSERVER(p)) {
667
1.99M
                if (PKT_IS_PSEUDOPKT(p) || (p->flow->flags & (FLOW_TS_APP_UPDATED))) {
668
734k
                    AppLayerParserTransactionsCleanup(p->flow, STREAM_TOSERVER);
669
734k
                    p->flow->flags &= ~FLOW_TS_APP_UPDATED;
670
734k
                }
671
1.99M
            } else {
672
1.55M
                if (PKT_IS_PSEUDOPKT(p) || (p->flow->flags & (FLOW_TC_APP_UPDATED))) {
673
892k
                    AppLayerParserTransactionsCleanup(p->flow, STREAM_TOCLIENT);
674
892k
                    p->flow->flags &= ~FLOW_TC_APP_UPDATED;
675
892k
                }
676
1.55M
            }
677
678
3.55M
        } else {
679
3.43M
            SCLogDebug("not pseudo, no app update: skip");
680
3.43M
        }
681
682
6.98M
        if (p->flow->flags & FLOW_ACTION_DROP) {
683
2.95k
            SCLogDebug("flow drop in place: remove app update flags");
684
2.95k
            p->flow->flags &= ~(FLOW_TS_APP_UPDATED | FLOW_TC_APP_UPDATED);
685
2.95k
        }
686
687
6.98M
        Flow *f = p->flow;
688
6.98M
        FlowDeReference(&p->flow);
689
6.98M
        FLOWLOCK_UNLOCK(f);
690
6.98M
    }
691
692
8.55M
housekeeping:
693
694
    /* take injected flows and add them to our local queue */
695
8.55M
    FlowWorkerProcessInjectedFlows(tv, fw, p);
696
697
    /* process local work queue */
698
8.55M
    FlowWorkerProcessLocalFlows(tv, fw, p);
699
700
8.55M
    return TM_ECODE_OK;
701
8.55M
}
702
703
void FlowWorkerReplaceDetectCtx(void *flow_worker, void *detect_ctx)
704
122k
{
705
122k
    FlowWorkerThreadData *fw = flow_worker;
706
707
122k
    SC_ATOMIC_SET(fw->detect_thread, detect_ctx);
708
122k
}
709
710
void *FlowWorkerGetDetectCtxPtr(void *flow_worker)
711
122k
{
712
122k
    FlowWorkerThreadData *fw = flow_worker;
713
714
122k
    return SC_ATOMIC_GET(fw->detect_thread);
715
122k
}
716
717
const char *ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi)
718
0
{
719
0
    switch (fwi) {
720
0
        case PROFILE_FLOWWORKER_FLOW:
721
0
            return "flow";
722
0
        case PROFILE_FLOWWORKER_STREAM:
723
0
            return "stream";
724
0
        case PROFILE_FLOWWORKER_APPLAYERUDP:
725
0
            return "app-layer";
726
0
        case PROFILE_FLOWWORKER_DETECT:
727
0
            return "detect";
728
0
        case PROFILE_FLOWWORKER_TCPPRUNE:
729
0
            return "tcp-prune";
730
0
        case PROFILE_FLOWWORKER_FLOW_INJECTED:
731
0
            return "flow-inject";
732
0
        case PROFILE_FLOWWORKER_FLOW_EVICTED:
733
0
            return "flow-evict";
734
0
        case PROFILE_FLOWWORKER_SIZE:
735
0
            return "size";
736
0
    }
737
0
    return "error";
738
0
}
739
740
static void FlowWorkerExitPrintStats(ThreadVars *tv, void *data)
741
0
{
742
0
    FlowWorkerThreadData *fw = data;
743
0
    OutputLoggerExitPrintStats(tv, fw->output_thread);
744
0
}
745
746
static bool FlowWorkerIsBusy(ThreadVars *tv, void *flow_worker)
747
0
{
748
0
    FlowWorkerThreadData *fw = flow_worker;
749
0
    if (fw->pq.len)
750
0
        return true;
751
0
    if (fw->fls.work_queue.len)
752
0
        return true;
753
754
0
    if (tv->flow_queue) {
755
0
        FQLOCK_LOCK(tv->flow_queue);
756
0
        bool fq_done = (tv->flow_queue->qlen == 0);
757
0
        FQLOCK_UNLOCK(tv->flow_queue);
758
0
        if (!fq_done) {
759
0
            return true;
760
0
        }
761
0
    }
762
763
0
    return false;
764
0
}
765
766
void TmModuleFlowWorkerRegister (void)
767
71
{
768
71
    tmm_modules[TMM_FLOWWORKER].name = "FlowWorker";
769
71
    tmm_modules[TMM_FLOWWORKER].ThreadInit = FlowWorkerThreadInit;
770
71
    tmm_modules[TMM_FLOWWORKER].Func = FlowWorker;
771
71
    tmm_modules[TMM_FLOWWORKER].ThreadBusy = FlowWorkerIsBusy;
772
71
    tmm_modules[TMM_FLOWWORKER].ThreadDeinit = FlowWorkerThreadDeinit;
773
71
    tmm_modules[TMM_FLOWWORKER].ThreadExitPrintStats = FlowWorkerExitPrintStats;
774
71
    tmm_modules[TMM_FLOWWORKER].cap_flags = 0;
775
71
    tmm_modules[TMM_FLOWWORKER].flags = TM_FLAG_STREAM_TM|TM_FLAG_DETECT_TM;
776
71
}