/src/suricata7/src/flow-worker.c
Line | Count | Source |
1 | | /* Copyright (C) 2016-2020 Open Information Security Foundation |
2 | | * |
3 | | * You can copy, redistribute or modify this Program under the terms of |
4 | | * the GNU General Public License version 2 as published by the Free |
5 | | * Software Foundation. |
6 | | * |
7 | | * This program is distributed in the hope that it will be useful, |
8 | | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
9 | | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
10 | | * GNU General Public License for more details. |
11 | | * |
12 | | * You should have received a copy of the GNU General Public License |
13 | | * version 2 along with this program; if not, write to the Free Software |
14 | | * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA |
15 | | * 02110-1301, USA. |
16 | | */ |
17 | | |
18 | | /** |
19 | | * \file |
20 | | * |
21 | | * \author Victor Julien <victor@inliniac.net> |
22 | | * |
23 | | * Flow Workers are single thread modules taking care of (almost) |
24 | | * everything related to packets with flows: |
25 | | * |
26 | | * - Lookup/creation |
27 | | * - Stream tracking, reassembly |
28 | | * - Applayer update |
29 | | * - Detection |
30 | | * |
31 | | * This all while holding the flow lock. |
32 | | */ |
33 | | |
34 | | #include "suricata-common.h" |
35 | | #include "suricata.h" |
36 | | |
37 | | #include "action-globals.h" |
38 | | #include "packet.h" |
39 | | #include "decode.h" |
40 | | #include "detect.h" |
41 | | #include "stream-tcp.h" |
42 | | #include "app-layer.h" |
43 | | #include "detect-engine.h" |
44 | | #include "output.h" |
45 | | #include "app-layer-parser.h" |
46 | | #include "app-layer-frames.h" |
47 | | |
48 | | #include "util-profiling.h" |
49 | | #include "util-validate.h" |
50 | | #include "util-time.h" |
51 | | #include "tmqh-packetpool.h" |
52 | | |
53 | | #include "flow-util.h" |
54 | | #include "flow-manager.h" |
55 | | #include "flow-timeout.h" |
56 | | #include "flow-spare-pool.h" |
57 | | #include "flow-worker.h" |
58 | | |
59 | | typedef DetectEngineThreadCtx *DetectEngineThreadCtxPtr; |
60 | | |
61 | | typedef struct FlowTimeoutCounters { |
62 | | uint32_t flows_aside_needs_work; |
63 | | uint32_t flows_aside_pkt_inject; |
64 | | } FlowTimeoutCounters; |
65 | | |
66 | | typedef struct FlowWorkerThreadData_ { |
67 | | DecodeThreadVars *dtv; |
68 | | |
69 | | union { |
70 | | StreamTcpThread *stream_thread; |
71 | | void *stream_thread_ptr; |
72 | | }; |
73 | | |
74 | | SC_ATOMIC_DECLARE(DetectEngineThreadCtxPtr, detect_thread); |
75 | | |
76 | | void *output_thread; /* Output thread data. */ |
77 | | void *output_thread_flow; /* Output thread data. */ |
78 | | |
79 | | uint16_t local_bypass_pkts; |
80 | | uint16_t local_bypass_bytes; |
81 | | uint16_t both_bypass_pkts; |
82 | | uint16_t both_bypass_bytes; |
83 | | /** Queue to put pseudo packets that have been created by the stream (RST response) and by the |
84 | | * flush logic following a protocol change. */ |
85 | | PacketQueueNoLock pq; |
86 | | FlowLookupStruct fls; |
87 | | |
88 | | struct { |
89 | | uint16_t flows_injected; |
90 | | uint16_t flows_injected_max; |
91 | | uint16_t flows_removed; |
92 | | uint16_t flows_aside_needs_work; |
93 | | uint16_t flows_aside_pkt_inject; |
94 | | } cnt; |
95 | | FlowEndCounters fec; |
96 | | |
97 | | } FlowWorkerThreadData; |
98 | | |
99 | | static void FlowWorkerFlowTimeout(ThreadVars *tv, Packet *p, FlowWorkerThreadData *fw, void *detect_thread); |
100 | | Packet *FlowForceReassemblyPseudoPacketGet(int direction, Flow *f, TcpSession *ssn); |
101 | | |
102 | | /** |
103 | | * \internal |
104 | | * \brief Forces reassembly for flow if it needs it. |
105 | | * |
106 | | * The function requires flow to be locked beforehand. |
107 | | * |
108 | | * \param f Pointer to the flow. |
109 | | * |
110 | | * \retval cnt number of packets injected |
111 | | */ |
112 | | static int FlowFinish(ThreadVars *tv, Flow *f, FlowWorkerThreadData *fw, void *detect_thread) |
113 | 80.3k | { |
114 | 80.3k | Packet *p1 = NULL, *p2 = NULL; |
115 | 80.3k | const int server = f->ffr_tc; |
116 | 80.3k | const int client = f->ffr_ts; |
117 | | |
118 | | /* Get the tcp session for the flow */ |
119 | 80.3k | TcpSession *ssn = (TcpSession *)f->protoctx; |
120 | | |
121 | | /* The packets we use are based on what segments in what direction are |
122 | | * unprocessed. |
123 | | * p1 if we have client segments for reassembly purpose only. If we |
124 | | * have no server segments p2 can be a toserver packet with dummy |
125 | | * seq/ack, and if we have server segments p2 has to carry out reassembly |
126 | | * for server segment as well, in which case we will also need a p3 in the |
127 | | * toclient which is now dummy since all we need it for is detection */ |
128 | | |
129 | | /* insert a pseudo packet in the toserver direction */ |
130 | 80.3k | if (client == STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION) { |
131 | 78.1k | p1 = FlowForceReassemblyPseudoPacketGet(0, f, ssn); |
132 | 78.1k | if (p1 == NULL) { |
133 | 0 | return 0; |
134 | 0 | } |
135 | 78.1k | PKT_SET_SRC(p1, PKT_SRC_FFR); |
136 | | |
137 | 78.1k | if (server == STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION) { |
138 | 53.6k | p2 = FlowForceReassemblyPseudoPacketGet(1, f, ssn); |
139 | 53.6k | if (p2 == NULL) { |
140 | 0 | FlowDeReference(&p1->flow); |
141 | 0 | TmqhOutputPacketpool(NULL, p1); |
142 | 0 | return 0; |
143 | 0 | } |
144 | 53.6k | PKT_SET_SRC(p2, PKT_SRC_FFR); |
145 | 53.6k | p2->flowflags |= FLOW_PKT_LAST_PSEUDO; |
146 | 53.6k | } else { |
147 | 24.5k | p1->flowflags |= FLOW_PKT_LAST_PSEUDO; |
148 | 24.5k | } |
149 | 78.1k | } else { |
150 | 2.17k | if (server == STREAM_HAS_UNPROCESSED_SEGMENTS_NEED_ONLY_DETECTION) { |
151 | 2.17k | p1 = FlowForceReassemblyPseudoPacketGet(1, f, ssn); |
152 | 2.17k | if (p1 == NULL) { |
153 | 0 | return 0; |
154 | 0 | } |
155 | 2.17k | PKT_SET_SRC(p1, PKT_SRC_FFR); |
156 | 2.17k | p1->flowflags |= FLOW_PKT_LAST_PSEUDO; |
157 | 2.17k | } else { |
158 | | /* impossible */ |
159 | 0 | BUG_ON(1); |
160 | 0 | } |
161 | 2.17k | } |
162 | 80.3k | f->flags |= FLOW_TIMEOUT_REASSEMBLY_DONE; |
163 | | |
164 | 80.3k | FlowWorkerFlowTimeout(tv, p1, fw, detect_thread); |
165 | 80.3k | PacketPoolReturnPacket(p1); |
166 | 80.3k | if (p2) { |
167 | 53.6k | FlowWorkerFlowTimeout(tv, p2, fw, detect_thread); |
168 | 53.6k | PacketPoolReturnPacket(p2); |
169 | 53.6k | return 2; |
170 | 53.6k | } |
171 | 26.7k | return 1; |
172 | 80.3k | } |
173 | | |
174 | | extern uint32_t flow_spare_pool_block_size; |
175 | | |
176 | | /** \param[in] max_work Max flows to process. 0 if unlimited. */ |
177 | | static void CheckWorkQueue(ThreadVars *tv, FlowWorkerThreadData *fw, FlowTimeoutCounters *counters, |
178 | | FlowQueuePrivate *fq, const uint32_t max_work) |
179 | 101k | { |
180 | 101k | FlowQueuePrivate ret_queue = { NULL, NULL, 0 }; |
181 | 101k | uint32_t i = 0; |
182 | 101k | Flow *f; |
183 | 203k | while ((f = FlowQueuePrivateGetFromTop(fq)) != NULL) { |
184 | 102k | FLOWLOCK_WRLOCK(f); |
185 | 102k | f->flow_end_flags |= FLOW_END_FLAG_TIMEOUT; //TODO emerg |
186 | | |
187 | 102k | if (f->proto == IPPROTO_TCP) { |
188 | 98.2k | if (!(f->flags & (FLOW_TIMEOUT_REASSEMBLY_DONE | FLOW_ACTION_DROP)) && |
189 | 98.1k | !FlowIsBypassed(f) && FlowForceReassemblyNeedReassembly(f) == 1 && |
190 | 80.3k | f->ffr != 0) { |
191 | | /* read detect thread in case we're doing a reload */ |
192 | 80.3k | void *detect_thread = SC_ATOMIC_GET(fw->detect_thread); |
193 | 80.3k | int cnt = FlowFinish(tv, f, fw, detect_thread); |
194 | 80.3k | counters->flows_aside_pkt_inject += cnt; |
195 | 80.3k | counters->flows_aside_needs_work++; |
196 | 80.3k | } |
197 | 98.2k | } |
198 | | |
199 | | /* no one is referring to this flow, removed from hash |
200 | | * so we can unlock it and pass it to the flow recycler */ |
201 | | |
202 | 102k | if (fw->output_thread_flow != NULL) |
203 | 102k | (void)OutputFlowLog(tv, fw->output_thread_flow, f); |
204 | | |
205 | 102k | FlowEndCountersUpdate(tv, &fw->fec, f); |
206 | 102k | if (f->proto == IPPROTO_TCP && f->protoctx != NULL) { |
207 | 95.0k | StatsDecr(tv, fw->dtv->counter_tcp_active_sessions); |
208 | 95.0k | } |
209 | 102k | StatsDecr(tv, fw->dtv->counter_flow_active); |
210 | | |
211 | 102k | FlowClearMemory (f, f->protomap); |
212 | 102k | FLOWLOCK_UNLOCK(f); |
213 | | |
214 | 102k | if (fw->fls.spare_queue.len >= (flow_spare_pool_block_size * 2)) { |
215 | 0 | FlowQueuePrivatePrependFlow(&ret_queue, f); |
216 | 0 | if (ret_queue.len == flow_spare_pool_block_size) { |
217 | 0 | FlowSparePoolReturnFlows(&ret_queue); |
218 | 0 | } |
219 | 102k | } else { |
220 | 102k | FlowQueuePrivatePrependFlow(&fw->fls.spare_queue, f); |
221 | 102k | } |
222 | | |
223 | 102k | if (max_work != 0 && ++i == max_work) |
224 | 309 | break; |
225 | 102k | } |
226 | 101k | if (ret_queue.len > 0) { |
227 | 0 | FlowSparePoolReturnFlows(&ret_queue); |
228 | 0 | } |
229 | | |
230 | 101k | StatsAddUI64(tv, fw->cnt.flows_removed, (uint64_t)i); |
231 | 101k | } |
232 | | |
233 | | /** \brief handle flow for packet |
234 | | * |
235 | | * Handle flow creation/lookup |
236 | | */ |
237 | | static inline TmEcode FlowUpdate(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p) |
238 | 14.0M | { |
239 | 14.0M | FlowHandlePacketUpdate(p->flow, p, tv, fw->dtv); |
240 | | |
241 | 14.0M | int state = p->flow->flow_state; |
242 | 14.0M | switch (state) { |
243 | | #ifdef CAPTURE_OFFLOAD |
244 | | case FLOW_STATE_CAPTURE_BYPASSED: { |
245 | | StatsAddUI64(tv, fw->both_bypass_pkts, 1); |
246 | | StatsAddUI64(tv, fw->both_bypass_bytes, GET_PKT_LEN(p)); |
247 | | Flow *f = p->flow; |
248 | | FlowDeReference(&p->flow); |
249 | | FLOWLOCK_UNLOCK(f); |
250 | | return TM_ECODE_DONE; |
251 | | } |
252 | | #endif |
253 | 307 | case FLOW_STATE_LOCAL_BYPASSED: { |
254 | 307 | StatsAddUI64(tv, fw->local_bypass_pkts, 1); |
255 | 307 | StatsAddUI64(tv, fw->local_bypass_bytes, GET_PKT_LEN(p)); |
256 | 307 | Flow *f = p->flow; |
257 | 307 | FlowDeReference(&p->flow); |
258 | 307 | FLOWLOCK_UNLOCK(f); |
259 | 307 | return TM_ECODE_DONE; |
260 | 0 | } |
261 | 14.0M | default: |
262 | 14.0M | return TM_ECODE_OK; |
263 | 14.0M | } |
264 | 14.0M | } |
265 | | |
266 | | static TmEcode FlowWorkerThreadDeinit(ThreadVars *tv, void *data); |
267 | | |
268 | | static TmEcode FlowWorkerThreadInit(ThreadVars *tv, const void *initdata, void **data) |
269 | 4 | { |
270 | 4 | FlowWorkerThreadData *fw = SCCalloc(1, sizeof(*fw)); |
271 | 4 | if (fw == NULL) |
272 | 0 | return TM_ECODE_FAILED; |
273 | | |
274 | 4 | SC_ATOMIC_INITPTR(fw->detect_thread); |
275 | 4 | SC_ATOMIC_SET(fw->detect_thread, NULL); |
276 | | |
277 | 4 | fw->local_bypass_pkts = StatsRegisterCounter("flow_bypassed.local_pkts", tv); |
278 | 4 | fw->local_bypass_bytes = StatsRegisterCounter("flow_bypassed.local_bytes", tv); |
279 | 4 | fw->both_bypass_pkts = StatsRegisterCounter("flow_bypassed.local_capture_pkts", tv); |
280 | 4 | fw->both_bypass_bytes = StatsRegisterCounter("flow_bypassed.local_capture_bytes", tv); |
281 | | |
282 | 4 | fw->cnt.flows_aside_needs_work = StatsRegisterCounter("flow.wrk.flows_evicted_needs_work", tv); |
283 | 4 | fw->cnt.flows_aside_pkt_inject = StatsRegisterCounter("flow.wrk.flows_evicted_pkt_inject", tv); |
284 | 4 | fw->cnt.flows_removed = StatsRegisterCounter("flow.wrk.flows_evicted", tv); |
285 | 4 | fw->cnt.flows_injected = StatsRegisterCounter("flow.wrk.flows_injected", tv); |
286 | 4 | fw->cnt.flows_injected_max = StatsRegisterMaxCounter("flow.wrk.flows_injected_max", tv); |
287 | | |
288 | 4 | fw->fls.dtv = fw->dtv = DecodeThreadVarsAlloc(tv); |
289 | 4 | if (fw->dtv == NULL) { |
290 | 0 | FlowWorkerThreadDeinit(tv, fw); |
291 | 0 | return TM_ECODE_FAILED; |
292 | 0 | } |
293 | | |
294 | | /* setup TCP */ |
295 | 4 | if (StreamTcpThreadInit(tv, NULL, &fw->stream_thread_ptr) != TM_ECODE_OK) { |
296 | 0 | FlowWorkerThreadDeinit(tv, fw); |
297 | 0 | return TM_ECODE_FAILED; |
298 | 0 | } |
299 | | |
300 | 4 | if (DetectEngineEnabled()) { |
301 | | /* setup DETECT */ |
302 | 4 | void *detect_thread = NULL; |
303 | 4 | if (DetectEngineThreadCtxInit(tv, NULL, &detect_thread) != TM_ECODE_OK) { |
304 | 0 | FlowWorkerThreadDeinit(tv, fw); |
305 | 0 | return TM_ECODE_FAILED; |
306 | 0 | } |
307 | 4 | SC_ATOMIC_SET(fw->detect_thread, detect_thread); |
308 | 4 | } |
309 | | |
310 | | /* Setup outputs for this thread. */ |
311 | 4 | if (OutputLoggerThreadInit(tv, initdata, &fw->output_thread) != TM_ECODE_OK) { |
312 | 0 | FlowWorkerThreadDeinit(tv, fw); |
313 | 0 | return TM_ECODE_FAILED; |
314 | 0 | } |
315 | 4 | if (OutputFlowLogThreadInit(tv, NULL, &fw->output_thread_flow) != TM_ECODE_OK) { |
316 | 0 | SCLogError("initializing flow log API for thread failed"); |
317 | 0 | FlowWorkerThreadDeinit(tv, fw); |
318 | 0 | return TM_ECODE_FAILED; |
319 | 0 | } |
320 | | |
321 | 4 | DecodeRegisterPerfCounters(fw->dtv, tv); |
322 | 4 | AppLayerRegisterThreadCounters(tv); |
323 | 4 | FlowEndCountersRegister(tv, &fw->fec); |
324 | | |
325 | | /* setup pq for stream end pkts */ |
326 | 4 | memset(&fw->pq, 0, sizeof(PacketQueueNoLock)); |
327 | 4 | *data = fw; |
328 | 4 | return TM_ECODE_OK; |
329 | 4 | } |
330 | | |
331 | | static TmEcode FlowWorkerThreadDeinit(ThreadVars *tv, void *data) |
332 | 0 | { |
333 | 0 | FlowWorkerThreadData *fw = data; |
334 | |
|
335 | 0 | DecodeThreadVarsFree(tv, fw->dtv); |
336 | | |
337 | | /* free TCP */ |
338 | 0 | StreamTcpThreadDeinit(tv, (void *)fw->stream_thread); |
339 | | |
340 | | /* free DETECT */ |
341 | 0 | void *detect_thread = SC_ATOMIC_GET(fw->detect_thread); |
342 | 0 | if (detect_thread != NULL) { |
343 | 0 | DetectEngineThreadCtxDeinit(tv, detect_thread); |
344 | 0 | SC_ATOMIC_SET(fw->detect_thread, NULL); |
345 | 0 | } |
346 | | |
347 | | /* Free output. */ |
348 | 0 | OutputLoggerThreadDeinit(tv, fw->output_thread); |
349 | 0 | OutputFlowLogThreadDeinit(tv, fw->output_thread_flow); |
350 | | |
351 | | /* free pq */ |
352 | 0 | BUG_ON(fw->pq.len); |
353 | | |
354 | 0 | Flow *f; |
355 | 0 | while ((f = FlowQueuePrivateGetFromTop(&fw->fls.spare_queue)) != NULL) { |
356 | 0 | FlowFree(f); |
357 | 0 | } |
358 | |
|
359 | 0 | SCFree(fw); |
360 | 0 | return TM_ECODE_OK; |
361 | 0 | } |
362 | | |
363 | | TmEcode Detect(ThreadVars *tv, Packet *p, void *data); |
364 | | TmEcode StreamTcp (ThreadVars *, Packet *, void *, PacketQueueNoLock *pq); |
365 | | |
366 | | static inline void UpdateCounters(ThreadVars *tv, |
367 | | FlowWorkerThreadData *fw, const FlowTimeoutCounters *counters) |
368 | 203k | { |
369 | 203k | if (counters->flows_aside_needs_work) { |
370 | 151k | StatsAddUI64(tv, fw->cnt.flows_aside_needs_work, |
371 | 151k | (uint64_t)counters->flows_aside_needs_work); |
372 | 151k | } |
373 | 203k | if (counters->flows_aside_pkt_inject) { |
374 | 151k | StatsAddUI64(tv, fw->cnt.flows_aside_pkt_inject, |
375 | 151k | (uint64_t)counters->flows_aside_pkt_inject); |
376 | 151k | } |
377 | 203k | } |
378 | | |
379 | | /** \brief update stream engine |
380 | | * |
381 | | * We can be called from both the flow timeout path as well as from the |
382 | | * "real" traffic path. If in the timeout path any additional packets we |
383 | | * forge for flushing pipelines should not leave our scope. If the original |
384 | | * packet is real (or related to a real packet) we need to push the packets |
385 | | * on, so IPS logic stays valid. |
386 | | */ |
387 | | static inline void FlowWorkerStreamTCPUpdate(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p, |
388 | | void *detect_thread, const bool timeout) |
389 | 6.80M | { |
390 | 6.80M | FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_STREAM); |
391 | 6.80M | StreamTcp(tv, p, fw->stream_thread, &fw->pq); |
392 | 6.80M | FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_STREAM); |
393 | | |
394 | | // this is the first packet that sets no payload inspection |
395 | 6.80M | bool setting_nopayload = |
396 | 6.80M | p->flow->alparser && |
397 | 5.02M | AppLayerParserStateIssetFlag(p->flow->alparser, APP_LAYER_PARSER_NO_INSPECTION) && |
398 | 103 | !(p->flags & PKT_NOPAYLOAD_INSPECTION); |
399 | 6.80M | if (FlowChangeProto(p->flow) || setting_nopayload) { |
400 | 18.4k | StreamTcpDetectLogFlush(tv, fw->stream_thread, p->flow, p, &fw->pq); |
401 | 18.4k | if (setting_nopayload) { |
402 | 15 | FlowSetNoPayloadInspectionFlag(p->flow); |
403 | 15 | } |
404 | 18.4k | AppLayerParserStateSetFlag(p->flow->alparser, APP_LAYER_PARSER_EOF_TS); |
405 | 18.4k | AppLayerParserStateSetFlag(p->flow->alparser, APP_LAYER_PARSER_EOF_TC); |
406 | 18.4k | } |
407 | | |
408 | | /* Packets here can safely access p->flow as it's locked */ |
409 | 6.80M | SCLogDebug("packet %"PRIu64": extra packets %u", p->pcap_cnt, fw->pq.len); |
410 | 6.80M | Packet *x; |
411 | 6.83M | while ((x = PacketDequeueNoLock(&fw->pq))) { |
412 | 36.8k | SCLogDebug("packet %"PRIu64" extra packet %p", p->pcap_cnt, x); |
413 | | |
414 | 36.8k | if (detect_thread != NULL) { |
415 | 36.8k | FLOWWORKER_PROFILING_START(x, PROFILE_FLOWWORKER_DETECT); |
416 | 36.8k | Detect(tv, x, detect_thread); |
417 | 36.8k | FLOWWORKER_PROFILING_END(x, PROFILE_FLOWWORKER_DETECT); |
418 | 36.8k | } |
419 | | |
420 | 36.8k | OutputLoggerLog(tv, x, fw->output_thread); |
421 | | |
422 | 36.8k | FramesPrune(x->flow, x); |
423 | | /* Release tcp segments. Done here after alerting can use them. */ |
424 | 36.8k | FLOWWORKER_PROFILING_START(x, PROFILE_FLOWWORKER_TCPPRUNE); |
425 | 36.8k | StreamTcpPruneSession( |
426 | 36.8k | x->flow, x->flowflags & FLOW_PKT_TOSERVER ? STREAM_TOSERVER : STREAM_TOCLIENT); |
427 | 36.8k | FLOWWORKER_PROFILING_END(x, PROFILE_FLOWWORKER_TCPPRUNE); |
428 | | |
429 | | /* no need to keep a flow ref beyond this point */ |
430 | 36.8k | FlowDeReference(&x->flow); |
431 | | |
432 | | /* no further work to do for this pseudo packet, so we can return |
433 | | * it to the pool immediately. */ |
434 | 36.8k | if (timeout) { |
435 | 132 | PacketPoolReturnPacket(x); |
436 | 36.7k | } else { |
437 | | /* to support IPS verdict logic, in the non-timeout case we need to do a bit more */ |
438 | 36.7k | TmqhOutputPacketpool(tv, x); |
439 | 36.7k | } |
440 | 36.8k | } |
441 | 6.80M | if (FlowChangeProto(p->flow) && p->flow->flags & FLOW_ACTION_DROP) { |
442 | | // in case f->flags & FLOW_ACTION_DROP was set by one of the dequeued packets |
443 | 20 | PacketDrop(p, ACTION_DROP, PKT_DROP_REASON_FLOW_DROP); |
444 | 20 | } |
445 | 6.80M | } |
446 | | |
447 | | static void FlowWorkerFlowTimeout(ThreadVars *tv, Packet *p, FlowWorkerThreadData *fw, |
448 | | void *detect_thread) |
449 | 133k | { |
450 | 133k | DEBUG_VALIDATE_BUG_ON(p->pkt_src != PKT_SRC_FFR); |
451 | | |
452 | 133k | SCLogDebug("packet %"PRIu64" is TCP. Direction %s", p->pcap_cnt, PKT_IS_TOSERVER(p) ? "TOSERVER" : "TOCLIENT"); |
453 | 133k | DEBUG_VALIDATE_BUG_ON(!(p->flow && PKT_IS_TCP(p))); |
454 | 133k | DEBUG_ASSERT_FLOW_LOCKED(p->flow); |
455 | | |
456 | | /* handle TCP and app layer */ |
457 | 133k | FlowWorkerStreamTCPUpdate(tv, fw, p, detect_thread, true); |
458 | | |
459 | 133k | PacketUpdateEngineEventCounters(tv, fw->dtv, p); |
460 | | |
461 | | /* handle Detect */ |
462 | 133k | SCLogDebug("packet %"PRIu64" calling Detect", p->pcap_cnt); |
463 | 133k | if (detect_thread != NULL) { |
464 | 133k | FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_DETECT); |
465 | 133k | Detect(tv, p, detect_thread); |
466 | 133k | FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_DETECT); |
467 | 133k | } |
468 | | |
469 | | // Outputs. |
470 | 133k | OutputLoggerLog(tv, p, fw->output_thread); |
471 | | |
472 | 133k | FramesPrune(p->flow, p); |
473 | | |
474 | | /* Release tcp segments. Done here after alerting can use them. */ |
475 | 133k | FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_TCPPRUNE); |
476 | 133k | StreamTcpPruneSession(p->flow, p->flowflags & FLOW_PKT_TOSERVER ? |
477 | 73.3k | STREAM_TOSERVER : STREAM_TOCLIENT); |
478 | 133k | FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_TCPPRUNE); |
479 | | |
480 | | /* run tx cleanup last */ |
481 | 133k | AppLayerParserTransactionsCleanup(p->flow, STREAM_FLAGS_FOR_PACKET(p)); |
482 | | |
483 | 133k | FlowDeReference(&p->flow); |
484 | | /* flow is unlocked later in FlowFinish() */ |
485 | 133k | } |
486 | | |
487 | | /** \internal |
488 | | * \brief process flows injected into our queue by other threads |
489 | | */ |
490 | | static inline void FlowWorkerProcessInjectedFlows( |
491 | | ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p) |
492 | 8.55M | { |
493 | | /* take injected flows and append to our work queue */ |
494 | 8.55M | FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW_INJECTED); |
495 | 8.55M | FlowQueuePrivate injected = { NULL, NULL, 0 }; |
496 | 8.55M | if (SC_ATOMIC_GET(tv->flow_queue->non_empty) == true) |
497 | 0 | injected = FlowQueueExtractPrivate(tv->flow_queue); |
498 | 8.55M | if (injected.len > 0) { |
499 | 0 | StatsAddUI64(tv, fw->cnt.flows_injected, (uint64_t)injected.len); |
500 | 0 | if (p->pkt_src == PKT_SRC_WIRE) |
501 | 0 | StatsSetUI64(tv, fw->cnt.flows_injected_max, (uint64_t)injected.len); |
502 | | |
503 | | /* move to local queue so we can process over the course of multiple packets */ |
504 | 0 | FlowQueuePrivateAppendPrivate(&fw->fls.work_queue, &injected); |
505 | 0 | } |
506 | 8.55M | FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_INJECTED); |
507 | 8.55M | } |
508 | | |
509 | | /** \internal |
510 | | * \brief process flows set aside locally during flow lookup |
511 | | */ |
512 | | static inline void FlowWorkerProcessLocalFlows(ThreadVars *tv, FlowWorkerThreadData *fw, Packet *p) |
513 | 17.4M | { |
514 | 17.4M | uint32_t max_work = 2; |
515 | 17.4M | if (p->pkt_src == PKT_SRC_SHUTDOWN_FLUSH || p->pkt_src == PKT_SRC_CAPTURE_TIMEOUT) |
516 | 0 | max_work = 0; |
517 | | |
518 | 17.4M | FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW_EVICTED); |
519 | 17.4M | if (fw->fls.work_queue.len) { |
520 | 203k | FlowTimeoutCounters counters = { 0, 0, }; |
521 | 203k | CheckWorkQueue(tv, fw, &counters, &fw->fls.work_queue, max_work); |
522 | 203k | UpdateCounters(tv, fw, &counters); |
523 | 203k | } |
524 | 17.4M | FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW_EVICTED); |
525 | 17.4M | } |
526 | | |
527 | | /** \internal |
528 | | * \brief apply Packet::app_update_direction to the flow flags |
529 | | */ |
530 | | static void PacketAppUpdate2FlowFlags(Packet *p) |
531 | 6.88M | { |
532 | 6.88M | switch ((enum StreamUpdateDir)p->app_update_direction) { |
533 | 5.19M | case UPDATE_DIR_NONE: // NONE implies pseudo packet |
534 | 5.19M | break; |
535 | 213k | case UPDATE_DIR_PACKET: |
536 | 213k | if (PKT_IS_TOSERVER(p)) { |
537 | 166k | p->flow->flags |= FLOW_TS_APP_UPDATED; |
538 | 166k | SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED set", p->pcap_cnt); |
539 | 166k | } else { |
540 | 47.1k | p->flow->flags |= FLOW_TC_APP_UPDATED; |
541 | 47.1k | SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED set", p->pcap_cnt); |
542 | 47.1k | } |
543 | 213k | break; |
544 | 2.78k | case UPDATE_DIR_BOTH: |
545 | 2.78k | if (PKT_IS_TOSERVER(p)) { |
546 | 1.42k | p->flow->flags |= FLOW_TS_APP_UPDATED; |
547 | 1.42k | SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED set", p->pcap_cnt); |
548 | 1.42k | } else { |
549 | 1.36k | p->flow->flags |= FLOW_TC_APP_UPDATED; |
550 | 1.36k | SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED set", p->pcap_cnt); |
551 | 1.36k | } |
552 | | /* fall through */ |
553 | 1.47M | case UPDATE_DIR_OPPOSING: |
554 | 1.47M | if (PKT_IS_TOSERVER(p)) { |
555 | 870k | p->flow->flags |= FLOW_TC_APP_UPDATED; |
556 | 870k | SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TC_APP_UPDATED set", p->pcap_cnt); |
557 | 870k | } else { |
558 | 600k | p->flow->flags |= FLOW_TS_APP_UPDATED; |
559 | 600k | SCLogDebug("pcap_cnt %" PRIu64 ", FLOW_TS_APP_UPDATED set", p->pcap_cnt); |
560 | 600k | } |
561 | 1.47M | break; |
562 | 6.88M | } |
563 | 6.88M | } |
564 | | |
565 | | static TmEcode FlowWorker(ThreadVars *tv, Packet *p, void *data) |
566 | 8.55M | { |
567 | 8.55M | FlowWorkerThreadData *fw = data; |
568 | 8.55M | void *detect_thread = SC_ATOMIC_GET(fw->detect_thread); |
569 | | |
570 | 8.55M | DEBUG_VALIDATE_BUG_ON(p == NULL); |
571 | 8.55M | DEBUG_VALIDATE_BUG_ON(tv->flow_queue == NULL); |
572 | | |
573 | 8.55M | SCLogDebug("packet %"PRIu64, p->pcap_cnt); |
574 | | |
575 | | /* update time */ |
576 | 8.55M | if (!(PKT_IS_PSEUDOPKT(p))) { |
577 | 8.55M | TimeSetByThread(tv->id, p->ts); |
578 | 8.55M | } |
579 | | |
580 | | /* handle Flow */ |
581 | 8.55M | if (p->flags & PKT_WANTS_FLOW) { |
582 | 6.99M | FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_FLOW); |
583 | | |
584 | 6.99M | FlowHandlePacket(tv, &fw->fls, p); |
585 | 6.99M | if (likely(p->flow != NULL)) { |
586 | 6.98M | DEBUG_ASSERT_FLOW_LOCKED(p->flow); |
587 | 6.98M | if (FlowUpdate(tv, fw, p) == TM_ECODE_DONE) { |
588 | 241 | goto housekeeping; |
589 | 241 | } |
590 | 6.98M | } |
591 | | /* Flow is now LOCKED */ |
592 | | |
593 | 6.99M | FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_FLOW); |
594 | | |
595 | | /* if PKT_WANTS_FLOW is not set, but PKT_HAS_FLOW is, then this is a |
596 | | * pseudo packet created by the flow manager. */ |
597 | 6.99M | } else if (p->flags & PKT_HAS_FLOW) { |
598 | 0 | FLOWLOCK_WRLOCK(p->flow); |
599 | 0 | DEBUG_VALIDATE_BUG_ON(p->pkt_src != PKT_SRC_FFR); |
600 | 0 | } |
601 | | |
602 | 8.55M | SCLogDebug("packet %"PRIu64" has flow? %s", p->pcap_cnt, p->flow ? "yes" : "no"); |
603 | | |
604 | | /* handle TCP and app layer */ |
605 | 8.55M | if (p->flow) { |
606 | 6.98M | if (PKT_IS_TCP(p)) { |
607 | 6.66M | SCLogDebug("packet %" PRIu64 " is TCP. Direction %s", p->pcap_cnt, |
608 | 6.66M | PKT_IS_TOSERVER(p) ? "TOSERVER" : "TOCLIENT"); |
609 | 6.66M | DEBUG_ASSERT_FLOW_LOCKED(p->flow); |
610 | | |
611 | | /* if detect is disabled, we need to apply file flags to the flow |
612 | | * here on the first packet. */ |
613 | 6.66M | if (detect_thread == NULL && |
614 | 0 | ((PKT_IS_TOSERVER(p) && (p->flowflags & FLOW_PKT_TOSERVER_FIRST)) || |
615 | 0 | (PKT_IS_TOCLIENT(p) && (p->flowflags & FLOW_PKT_TOCLIENT_FIRST)))) { |
616 | 0 | DisableDetectFlowFileFlags(p->flow); |
617 | 0 | } |
618 | | |
619 | 6.66M | FlowWorkerStreamTCPUpdate(tv, fw, p, detect_thread, false); |
620 | 6.66M | PacketAppUpdate2FlowFlags(p); |
621 | | |
622 | | /* handle the app layer part of the UDP packet payload */ |
623 | 6.66M | } else if (p->proto == IPPROTO_UDP && !PacketCheckAction(p, ACTION_DROP)) { |
624 | 216k | FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_APPLAYERUDP); |
625 | 216k | AppLayerHandleUdp(tv, fw->stream_thread->ra_ctx->app_tctx, p, p->flow); |
626 | 216k | FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_APPLAYERUDP); |
627 | 216k | PacketAppUpdate2FlowFlags(p); |
628 | 216k | } |
629 | 6.98M | } |
630 | | |
631 | 8.55M | PacketUpdateEngineEventCounters(tv, fw->dtv, p); |
632 | | |
633 | | /* handle Detect */ |
634 | 8.55M | DEBUG_ASSERT_FLOW_LOCKED(p->flow); |
635 | 8.55M | SCLogDebug("packet %"PRIu64" calling Detect", p->pcap_cnt); |
636 | 8.55M | if (detect_thread != NULL) { |
637 | 8.55M | FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_DETECT); |
638 | 8.55M | Detect(tv, p, detect_thread); |
639 | 8.55M | FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_DETECT); |
640 | 8.55M | } |
641 | | |
642 | | // Outputs. |
643 | 8.55M | OutputLoggerLog(tv, p, fw->output_thread); |
644 | | |
645 | | /* Release tcp segments. Done here after alerting can use them. */ |
646 | 8.55M | if (p->flow != NULL) { |
647 | 6.98M | DEBUG_ASSERT_FLOW_LOCKED(p->flow); |
648 | | |
649 | 6.98M | if (FlowIsBypassed(p->flow)) { |
650 | 10 | FlowCleanupAppLayer(p->flow); |
651 | 10 | if (p->proto == IPPROTO_TCP) { |
652 | 1 | StreamTcpSessionCleanup(p->flow->protoctx); |
653 | 1 | } |
654 | 6.98M | } else if (p->proto == IPPROTO_TCP && p->flow->protoctx) { |
655 | 6.60M | FramesPrune(p->flow, p); |
656 | 6.60M | FLOWWORKER_PROFILING_START(p, PROFILE_FLOWWORKER_TCPPRUNE); |
657 | 6.60M | StreamTcpPruneSession(p->flow, p->flowflags & FLOW_PKT_TOSERVER ? |
658 | 3.64M | STREAM_TOSERVER : STREAM_TOCLIENT); |
659 | 6.60M | FLOWWORKER_PROFILING_END(p, PROFILE_FLOWWORKER_TCPPRUNE); |
660 | 6.60M | } else if (p->proto == IPPROTO_UDP) { |
661 | 216k | FramesPrune(p->flow, p); |
662 | 216k | } |
663 | | |
664 | 6.98M | if ((PKT_IS_PSEUDOPKT(p)) || |
665 | 6.98M | (p->flow->flags & (FLOW_TS_APP_UPDATED | FLOW_TC_APP_UPDATED))) { |
666 | 3.55M | if (PKT_IS_TOSERVER(p)) { |
667 | 1.99M | if (PKT_IS_PSEUDOPKT(p) || (p->flow->flags & (FLOW_TS_APP_UPDATED))) { |
668 | 734k | AppLayerParserTransactionsCleanup(p->flow, STREAM_TOSERVER); |
669 | 734k | p->flow->flags &= ~FLOW_TS_APP_UPDATED; |
670 | 734k | } |
671 | 1.99M | } else { |
672 | 1.55M | if (PKT_IS_PSEUDOPKT(p) || (p->flow->flags & (FLOW_TC_APP_UPDATED))) { |
673 | 892k | AppLayerParserTransactionsCleanup(p->flow, STREAM_TOCLIENT); |
674 | 892k | p->flow->flags &= ~FLOW_TC_APP_UPDATED; |
675 | 892k | } |
676 | 1.55M | } |
677 | | |
678 | 3.55M | } else { |
679 | 3.43M | SCLogDebug("not pseudo, no app update: skip"); |
680 | 3.43M | } |
681 | | |
682 | 6.98M | if (p->flow->flags & FLOW_ACTION_DROP) { |
683 | 2.95k | SCLogDebug("flow drop in place: remove app update flags"); |
684 | 2.95k | p->flow->flags &= ~(FLOW_TS_APP_UPDATED | FLOW_TC_APP_UPDATED); |
685 | 2.95k | } |
686 | | |
687 | 6.98M | Flow *f = p->flow; |
688 | 6.98M | FlowDeReference(&p->flow); |
689 | 6.98M | FLOWLOCK_UNLOCK(f); |
690 | 6.98M | } |
691 | | |
692 | 8.55M | housekeeping: |
693 | | |
694 | | /* take injected flows and add them to our local queue */ |
695 | 8.55M | FlowWorkerProcessInjectedFlows(tv, fw, p); |
696 | | |
697 | | /* process local work queue */ |
698 | 8.55M | FlowWorkerProcessLocalFlows(tv, fw, p); |
699 | | |
700 | 8.55M | return TM_ECODE_OK; |
701 | 8.55M | } |
702 | | |
703 | | void FlowWorkerReplaceDetectCtx(void *flow_worker, void *detect_ctx) |
704 | 122k | { |
705 | 122k | FlowWorkerThreadData *fw = flow_worker; |
706 | | |
707 | 122k | SC_ATOMIC_SET(fw->detect_thread, detect_ctx); |
708 | 122k | } |
709 | | |
710 | | void *FlowWorkerGetDetectCtxPtr(void *flow_worker) |
711 | 122k | { |
712 | 122k | FlowWorkerThreadData *fw = flow_worker; |
713 | | |
714 | 122k | return SC_ATOMIC_GET(fw->detect_thread); |
715 | 122k | } |
716 | | |
717 | | const char *ProfileFlowWorkerIdToString(enum ProfileFlowWorkerId fwi) |
718 | 0 | { |
719 | 0 | switch (fwi) { |
720 | 0 | case PROFILE_FLOWWORKER_FLOW: |
721 | 0 | return "flow"; |
722 | 0 | case PROFILE_FLOWWORKER_STREAM: |
723 | 0 | return "stream"; |
724 | 0 | case PROFILE_FLOWWORKER_APPLAYERUDP: |
725 | 0 | return "app-layer"; |
726 | 0 | case PROFILE_FLOWWORKER_DETECT: |
727 | 0 | return "detect"; |
728 | 0 | case PROFILE_FLOWWORKER_TCPPRUNE: |
729 | 0 | return "tcp-prune"; |
730 | 0 | case PROFILE_FLOWWORKER_FLOW_INJECTED: |
731 | 0 | return "flow-inject"; |
732 | 0 | case PROFILE_FLOWWORKER_FLOW_EVICTED: |
733 | 0 | return "flow-evict"; |
734 | 0 | case PROFILE_FLOWWORKER_SIZE: |
735 | 0 | return "size"; |
736 | 0 | } |
737 | 0 | return "error"; |
738 | 0 | } |
739 | | |
740 | | static void FlowWorkerExitPrintStats(ThreadVars *tv, void *data) |
741 | 0 | { |
742 | 0 | FlowWorkerThreadData *fw = data; |
743 | 0 | OutputLoggerExitPrintStats(tv, fw->output_thread); |
744 | 0 | } |
745 | | |
746 | | static bool FlowWorkerIsBusy(ThreadVars *tv, void *flow_worker) |
747 | 0 | { |
748 | 0 | FlowWorkerThreadData *fw = flow_worker; |
749 | 0 | if (fw->pq.len) |
750 | 0 | return true; |
751 | 0 | if (fw->fls.work_queue.len) |
752 | 0 | return true; |
753 | | |
754 | 0 | if (tv->flow_queue) { |
755 | 0 | FQLOCK_LOCK(tv->flow_queue); |
756 | 0 | bool fq_done = (tv->flow_queue->qlen == 0); |
757 | 0 | FQLOCK_UNLOCK(tv->flow_queue); |
758 | 0 | if (!fq_done) { |
759 | 0 | return true; |
760 | 0 | } |
761 | 0 | } |
762 | | |
763 | 0 | return false; |
764 | 0 | } |
765 | | |
766 | | void TmModuleFlowWorkerRegister (void) |
767 | 71 | { |
768 | 71 | tmm_modules[TMM_FLOWWORKER].name = "FlowWorker"; |
769 | 71 | tmm_modules[TMM_FLOWWORKER].ThreadInit = FlowWorkerThreadInit; |
770 | 71 | tmm_modules[TMM_FLOWWORKER].Func = FlowWorker; |
771 | 71 | tmm_modules[TMM_FLOWWORKER].ThreadBusy = FlowWorkerIsBusy; |
772 | 71 | tmm_modules[TMM_FLOWWORKER].ThreadDeinit = FlowWorkerThreadDeinit; |
773 | 71 | tmm_modules[TMM_FLOWWORKER].ThreadExitPrintStats = FlowWorkerExitPrintStats; |
774 | 71 | tmm_modules[TMM_FLOWWORKER].cap_flags = 0; |
775 | 71 | tmm_modules[TMM_FLOWWORKER].flags = TM_FLAG_STREAM_TM|TM_FLAG_DETECT_TM; |
776 | 71 | } |