Coverage Report

Created: 2026-05-16 07:38

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/suricata7/src/source-dpdk.c
Line
Count
Source
1
/* Copyright (C) 2021 Open Information Security Foundation
2
 *
3
 * You can copy, redistribute or modify this Program under the terms of
4
 * the GNU General Public License version 2 as published by the Free
5
 * Software Foundation.
6
 *
7
 * This program is distributed in the hope that it will be useful,
8
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
9
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
10
 * GNU General Public License for more details.
11
 *
12
 * You should have received a copy of the GNU General Public License
13
 * version 2 along with this program; if not, write to the Free Software
14
 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15
 * 02110-1301, USA.
16
 */
17
18
/**
19
 *  \defgroup dpdk DPDK running mode
20
 *
21
 *  @{
22
 */
23
24
/**
25
 * \file
26
 *
27
 * \author Lukas Sismis <lukas.sismis@gmail.com>
28
 *
29
 * DPDK capture interface
30
 *
31
 */
32
33
#include "suricata-common.h"
34
#include "runmodes.h"
35
#include "decode.h"
36
#include "packet.h"
37
#include "source-dpdk.h"
38
#include "suricata.h"
39
#include "threads.h"
40
#include "threadvars.h"
41
#include "tm-threads.h"
42
#include "tmqh-packetpool.h"
43
#include "util-privs.h"
44
#include "action-globals.h"
45
46
#ifndef HAVE_DPDK
47
48
TmEcode NoDPDKSupportExit(ThreadVars *, const void *, void **);
49
50
void TmModuleReceiveDPDKRegister(void)
51
71
{
52
71
    tmm_modules[TMM_RECEIVEDPDK].name = "ReceiveDPDK";
53
71
    tmm_modules[TMM_RECEIVEDPDK].ThreadInit = NoDPDKSupportExit;
54
71
    tmm_modules[TMM_RECEIVEDPDK].Func = NULL;
55
71
    tmm_modules[TMM_RECEIVEDPDK].ThreadExitPrintStats = NULL;
56
71
    tmm_modules[TMM_RECEIVEDPDK].ThreadDeinit = NULL;
57
71
    tmm_modules[TMM_RECEIVEDPDK].cap_flags = 0;
58
71
    tmm_modules[TMM_RECEIVEDPDK].flags = TM_FLAG_RECEIVE_TM;
59
71
}
60
61
/**
62
 * \brief Registration Function for DecodeDPDK.
63
 */
64
void TmModuleDecodeDPDKRegister(void)
65
71
{
66
71
    tmm_modules[TMM_DECODEDPDK].name = "DecodeDPDK";
67
71
    tmm_modules[TMM_DECODEDPDK].ThreadInit = NoDPDKSupportExit;
68
71
    tmm_modules[TMM_DECODEDPDK].Func = NULL;
69
71
    tmm_modules[TMM_DECODEDPDK].ThreadExitPrintStats = NULL;
70
71
    tmm_modules[TMM_DECODEDPDK].ThreadDeinit = NULL;
71
71
    tmm_modules[TMM_DECODEDPDK].cap_flags = 0;
72
71
    tmm_modules[TMM_DECODEDPDK].flags = TM_FLAG_DECODE_TM;
73
71
}
74
75
/**
76
 * \brief this function prints an error message and exits.
77
 */
78
TmEcode NoDPDKSupportExit(ThreadVars *tv, const void *initdata, void **data)
79
0
{
80
    FatalError("Error creating thread %s: you do not have "
81
0
               "support for DPDK enabled, on Linux host please recompile "
82
0
               "with --enable-dpdk",
83
0
            tv->name);
84
0
}
85
86
#else /* We have DPDK support */

#include "util-affinity.h"
#include "util-dpdk.h"
#include "util-dpdk-i40e.h"
#include "util-dpdk-bonding.h"
#include <numa.h>

/* Upper bound on the number of mbufs fetched by one rte_eth_rx_burst() call. */
#define BURST_SIZE 32

/*
 * Interrupt (power-saving) RX mode tuning:
 *   MIN_ZERO_POLL_COUNT          - consecutive empty polls tolerated before idling is considered
 *   MIN_ZERO_POLL_COUNT_TO_SLEEP - empty-poll streak from which the standard sleep hint is chosen
 *   MINIMUM_SLEEP_TIME_US        - short delay used for shorter empty-poll streaks
 *   STANDARD_SLEEP_TIME_US       - sleep hint at/above which the thread waits for RX interrupts
 *   MAX_EPOLL_TIMEOUT_MS         - upper bound of a single interrupt wait
 */
#define MIN_ZERO_POLL_COUNT          10u
#define MIN_ZERO_POLL_COUNT_TO_SLEEP 10u
#define MINIMUM_SLEEP_TIME_US        1u
#define STANDARD_SLEEP_TIME_US       100u
#define MAX_EPOLL_TIMEOUT_MS         500u

/* One spinlock per port; serializes RX interrupt enable/disable calls on that port. */
static rte_spinlock_t intr_lock[RTE_MAX_ETHPORTS];
102
103
/**
104
 * \brief Structure to hold thread specific variables.
105
 */
106
typedef struct DPDKThreadVars_ {
107
    /* counters */
108
    uint64_t pkts;
109
    ThreadVars *tv;
110
    TmSlot *slot;
111
    LiveDevice *livedev;
112
    ChecksumValidationMode checksum_mode;
113
    bool intr_enabled;
114
    /* references to packet and drop counters */
115
    uint16_t capture_dpdk_packets;
116
    uint16_t capture_dpdk_rx_errs;
117
    uint16_t capture_dpdk_imissed;
118
    uint16_t capture_dpdk_rx_no_mbufs;
119
    uint16_t capture_dpdk_ierrors;
120
    uint16_t capture_dpdk_tx_errs;
121
    unsigned int flags;
122
    int threads;
123
    /* for IPS */
124
    DpdkCopyModeEnum copy_mode;
125
    uint16_t out_port_id;
126
    /* Entry in the peers_list */
127
128
    uint64_t bytes;
129
    uint64_t accepted;
130
    uint64_t dropped;
131
    uint16_t port_id;
132
    uint16_t queue_id;
133
    int32_t port_socket_id;
134
    struct rte_mempool *pkt_mempool;
135
    struct rte_mbuf *received_mbufs[BURST_SIZE];
136
} DPDKThreadVars;
137
138
static TmEcode ReceiveDPDKThreadInit(ThreadVars *, const void *, void **);
139
static TmEcode ReceiveDPDKThreadDeinit(ThreadVars *, void *);
140
static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot);
141
142
static TmEcode DecodeDPDKThreadInit(ThreadVars *, const void *, void **);
143
static TmEcode DecodeDPDKThreadDeinit(ThreadVars *tv, void *data);
144
static TmEcode DecodeDPDK(ThreadVars *, Packet *, void *);
145
146
static void DPDKFreeMbufArray(struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset);
147
static bool InterruptsRXEnable(uint16_t port_id, uint16_t queue_id)
148
{
149
    uint32_t event_data = port_id << UINT16_WIDTH | queue_id;
150
    int32_t ret = rte_eth_dev_rx_intr_ctl_q(port_id, queue_id, RTE_EPOLL_PER_THREAD,
151
            RTE_INTR_EVENT_ADD, (void *)((uintptr_t)event_data));
152
153
    if (ret != 0) {
154
        SCLogError("%s-Q%d: failed to enable interrupt mode: %s", DPDKGetPortNameByPortID(port_id),
155
                queue_id, rte_strerror(-ret));
156
        return false;
157
    }
158
    return true;
159
}
160
161
static inline uint32_t InterruptsSleepHeuristic(uint32_t no_pkt_polls_count)
162
{
163
    if (no_pkt_polls_count < MIN_ZERO_POLL_COUNT_TO_SLEEP)
164
        return MINIMUM_SLEEP_TIME_US;
165
166
    return STANDARD_SLEEP_TIME_US;
167
}
168
169
static inline void InterruptsTurnOnOff(uint16_t port_id, uint16_t queue_id, bool on)
170
{
171
    rte_spinlock_lock(&(intr_lock[port_id]));
172
173
    if (on)
174
        rte_eth_dev_rx_intr_enable(port_id, queue_id);
175
    else
176
        rte_eth_dev_rx_intr_disable(port_id, queue_id);
177
178
    rte_spinlock_unlock(&(intr_lock[port_id]));
179
}
180
181
/**
 * \brief Free a contiguous run of mbufs stored in an array.
 *
 * Frees mbuf_array[offset] .. mbuf_array[offset + mbuf_cnt - 1].
 *
 * \param mbuf_array array holding the mbufs
 * \param mbuf_cnt   number of mbufs to free
 * \param offset     index of the first mbuf to free
 */
static void DPDKFreeMbufArray(struct rte_mbuf **mbuf_array, uint16_t mbuf_cnt, uint16_t offset)
{
    // mbuf_cnt is a count, not an end index; 32-bit arithmetic avoids wrap-around of
    // offset + mbuf_cnt.
    const uint32_t end = (uint32_t)offset + mbuf_cnt;
    for (uint32_t i = offset; i < end; i++) {
        rte_pktmbuf_free(mbuf_array[i]);
    }
}
187
188
/**
 * \brief Run driver-specific actions that must happen after the port has started.
 *
 * For a "net_bonding" port, the driver name returned by BondingDeviceDriverGet()
 * is used instead. For "net_i40e", RSS is configured with i40eDeviceSetRSS(),
 * because that PMD sets RSS through rte_flow rules that can only be inserted
 * after the port starts.
 *
 * \param ptv         thread variables (port id and thread count are used)
 * \param driver_name driver name reported by rte_eth_dev_info_get()
 */
static void DevicePostStartPMDSpecificActions(DPDKThreadVars *ptv, const char *driver_name)
{
    const char *effective_driver = driver_name;
    if (strcmp(effective_driver, "net_bonding") == 0)
        effective_driver = BondingDeviceDriverGet(ptv->port_id);

    if (strcmp(effective_driver, "net_i40e") != 0)
        return;

    i40eDeviceSetRSS(ptv->port_id, ptv->threads);
}
199
200
/**
 * \brief Run driver-specific actions that must happen before the port is closed.
 *
 * For a "net_bonding" port, the driver name returned by BondingDeviceDriverGet()
 * is used instead. For "net_i40e" (DPDK newer than 20.0), the rte_flow rules
 * inserted after start (RSS setup) are flushed; a failure is logged.
 *
 * \param ptv         thread variables (port id and live device name are used)
 * \param driver_name driver name reported by rte_eth_dev_info_get()
 */
static void DevicePreClosePMDSpecificActions(DPDKThreadVars *ptv, const char *driver_name)
{
    if (strcmp(driver_name, "net_bonding") == 0) {
        driver_name = BondingDeviceDriverGet(ptv->port_id);
    }

    if (strcmp(driver_name, "net_i40e") == 0) {
#if RTE_VERSION > RTE_VERSION_NUM(20, 0, 0, 0)
        // Flush the RSS rules that have been inserted in the post start section
        struct rte_flow_error flush_error = { 0 };
        int32_t retval = rte_flow_flush(ptv->port_id, &flush_error);
        if (retval != 0) {
            // rte_flow_error.message is optional; never hand a NULL pointer to %s
            const char *flush_msg = flush_error.message != NULL ? flush_error.message : "n/a";
            SCLogError("%s: unable to flush rte_flow rules: %s Flush error msg: %s",
                    ptv->livedev->dev, rte_strerror(-retval), flush_msg);
        }
#endif /* RTE_VERSION > RTE_VERSION_NUM(20, 0, 0, 0) */
    }
}
218
219
/**
 * Attempts to retrieve the NUMA node id on which the caller currently runs.
 *
 * On Linux the current CPU is obtained with sched_getcpu() and mapped to a
 * node with numa_node_of_cpu(). On other systems a warning is logged.
 *
 * @return NUMA node id on success, -1 otherwise
 */
static int GetNumaNode(void)
{
#if defined(__linux__)
    int cpu = sched_getcpu();
    if (cpu < 0) {
        // the current CPU is unknown, so the node cannot be determined
        return -1;
    }
    return numa_node_of_cpu(cpu);
#else
    SCLogWarning("NUMA node retrieval is not supported on this OS.");
    return -1;
#endif
}
237
238
/**
239
 * \brief Registration Function for ReceiveDPDK.
240
 * \todo Unit tests are needed for this module.
241
 */
242
void TmModuleReceiveDPDKRegister(void)
243
{
244
    tmm_modules[TMM_RECEIVEDPDK].name = "ReceiveDPDK";
245
    tmm_modules[TMM_RECEIVEDPDK].ThreadInit = ReceiveDPDKThreadInit;
246
    tmm_modules[TMM_RECEIVEDPDK].Func = NULL;
247
    tmm_modules[TMM_RECEIVEDPDK].PktAcqLoop = ReceiveDPDKLoop;
248
    tmm_modules[TMM_RECEIVEDPDK].PktAcqBreakLoop = NULL;
249
    tmm_modules[TMM_RECEIVEDPDK].ThreadExitPrintStats = NULL;
250
    tmm_modules[TMM_RECEIVEDPDK].ThreadDeinit = ReceiveDPDKThreadDeinit;
251
    tmm_modules[TMM_RECEIVEDPDK].cap_flags = SC_CAP_NET_RAW;
252
    tmm_modules[TMM_RECEIVEDPDK].flags = TM_FLAG_RECEIVE_TM;
253
}
254
255
/**
256
 * \brief Registration Function for DecodeDPDK.
257
 * \todo Unit tests are needed for this module.
258
 */
259
void TmModuleDecodeDPDKRegister(void)
260
{
261
    tmm_modules[TMM_DECODEDPDK].name = "DecodeDPDK";
262
    tmm_modules[TMM_DECODEDPDK].ThreadInit = DecodeDPDKThreadInit;
263
    tmm_modules[TMM_DECODEDPDK].Func = DecodeDPDK;
264
    tmm_modules[TMM_DECODEDPDK].ThreadExitPrintStats = NULL;
265
    tmm_modules[TMM_DECODEDPDK].ThreadDeinit = DecodeDPDKThreadDeinit;
266
    tmm_modules[TMM_DECODEDPDK].cap_flags = 0;
267
    tmm_modules[TMM_DECODEDPDK].flags = TM_FLAG_DECODE_TM;
268
}
269
270
/**
 * \brief Publish capture counters for this thread.
 *
 * Threads serving a queue other than 0 only publish their own packet count.
 * The queue-0 thread also reads the port-level statistics with
 * rte_eth_stats_get() (some NICs, e.g. Intel, expose drops only per port) and
 * publishes the rx/tx error counters and the live device packet/drop totals.
 *
 * \param ptv thread variables
 */
static inline void DPDKDumpCounters(DPDKThreadVars *ptv)
{
    if (ptv->queue_id != 0) {
        StatsSetUI64(ptv->tv, ptv->capture_dpdk_packets, ptv->pkts);
        return;
    }

    struct rte_eth_stats eth_stats;
    int retval = rte_eth_stats_get(ptv->port_id, &eth_stats);
    if (unlikely(retval != 0)) {
        SCLogError("%s: failed to get stats: %s", ptv->livedev->dev, rte_strerror(-retval));
        return;
    }

    /* everything the port received but could not deliver to the application */
    const uint64_t rx_drops = eth_stats.imissed + eth_stats.ierrors + eth_stats.rx_nombuf;

    StatsSetUI64(ptv->tv, ptv->capture_dpdk_packets, ptv->pkts + rx_drops);
    SC_ATOMIC_SET(ptv->livedev->pkts, eth_stats.ipackets + rx_drops);
    StatsSetUI64(ptv->tv, ptv->capture_dpdk_rx_errs, rx_drops);
    StatsSetUI64(ptv->tv, ptv->capture_dpdk_imissed, eth_stats.imissed);
    StatsSetUI64(ptv->tv, ptv->capture_dpdk_rx_no_mbufs, eth_stats.rx_nombuf);
    StatsSetUI64(ptv->tv, ptv->capture_dpdk_ierrors, eth_stats.ierrors);
    StatsSetUI64(ptv->tv, ptv->capture_dpdk_tx_errs, eth_stats.oerrors);
    SC_ATOMIC_SET(ptv->livedev->drop, rx_drops);
}
299
300
/**
 * \brief ReleasePacket callback for packets captured by ReceiveDPDK.
 *
 * The packet's mbuf is forwarded in TAP mode, and in IPS mode when the
 * packet does not carry a drop action. Forwarding uses
 * out_port_id/out_queue_id. A failed transmit is retried once after
 * DPDK_BURST_TX_WAIT_US microseconds, and the mbuf is freed if the retry also
 * fails. In every other case the mbuf is freed. Finally, the packet is handed
 * to PacketFreeOrRelease().
 *
 * When built with the i40e, ixgbe or ice PMD, ICMPv6 type 143 packets are never
 * forwarded. Enabling promiscuous mode on Intel cards generates such packets,
 * and in some setups they loop endlessly between the NIC and the switch.
 *
 * \param p packet being released
 */
static void DPDKReleasePacket(Packet *p)
{
    bool forward = p->dpdk_v.copy_mode == DPDK_COPY_MODE_TAP ||
                   (p->dpdk_v.copy_mode == DPDK_COPY_MODE_IPS && !PacketCheckAction(p, ACTION_DROP));
#if defined(RTE_LIBRTE_I40E_PMD) || defined(RTE_LIBRTE_IXGBE_PMD) || defined(RTE_LIBRTE_ICE_PMD)
    forward = forward && !(PKT_IS_ICMPV6(p) && p->icmpv6h->type == 143);
#endif

    if (!forward) {
        rte_pktmbuf_free(p->dpdk_v.mbuf);
        p->dpdk_v.mbuf = NULL;
    } else {
        BUG_ON(PKT_IS_PSEUDOPKT(p));
        /* a burst of one mbuf: rte_eth_tx_burst() reports 0 (not sent) or 1 (sent) */
        uint16_t sent =
                rte_eth_tx_burst(p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id, &p->dpdk_v.mbuf, 1);
        if (unlikely(sent == 0)) {
            /* a second attempt after a short pause sometimes gets the packet out */
            rte_delay_us(DPDK_BURST_TX_WAIT_US);
            sent = rte_eth_tx_burst(
                    p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id, &p->dpdk_v.mbuf, 1);
            if (unlikely(sent == 0)) {
                SCLogDebug("Unable to transmit the packet on port %u queue %u",
                        p->dpdk_v.out_port_id, p->dpdk_v.out_queue_id);
                rte_pktmbuf_free(p->dpdk_v.mbuf);
                p->dpdk_v.mbuf = NULL;
            }
        }
    }

    PacketFreeOrRelease(p);
}
338
339
static void PrintDPDKPortXstats(uint16_t port_id, const char *port_name)
340
{
341
    struct rte_eth_xstat *xstats;
342
    struct rte_eth_xstat_name *xstats_names;
343
344
    int32_t ret = rte_eth_xstats_get(port_id, NULL, 0);
345
    if (ret < 0) {
346
        SCLogPerf("%s: unable to obtain rte_eth_xstats (%s)", port_name, rte_strerror(-ret));
347
        return;
348
    }
349
    uint16_t len = (uint16_t)ret;
350
351
    xstats = SCCalloc(len, sizeof(*xstats));
352
    if (xstats == NULL) {
353
        SCLogWarning("Failed to allocate memory for the rte_eth_xstat structure");
354
        return;
355
    }
356
357
    ret = rte_eth_xstats_get(port_id, xstats, len);
358
    if (ret < 0 || ret > len) {
359
        SCFree(xstats);
360
        SCLogPerf("%s: unable to obtain rte_eth_xstats (%s)", port_name, rte_strerror(-ret));
361
        return;
362
    }
363
    xstats_names = SCCalloc(len, sizeof(*xstats_names));
364
    if (xstats_names == NULL) {
365
        SCFree(xstats);
366
        SCLogWarning("Failed to allocate memory for the rte_eth_xstat_name array");
367
        return;
368
    }
369
    ret = rte_eth_xstats_get_names(port_id, xstats_names, len);
370
    if (ret < 0 || ret > len) {
371
        SCFree(xstats);
372
        SCFree(xstats_names);
373
        SCLogPerf(
374
                "%s: unable to obtain names of rte_eth_xstats (%s)", port_name, rte_strerror(-ret));
375
        return;
376
    }
377
    for (int32_t i = 0; i < len; i++) {
378
        if (xstats[i].value > 0)
379
            SCLogPerf("Port %u (%s) - %s: %" PRIu64, port_id, port_name, xstats_names[i].name,
380
                    xstats[i].value);
381
    }
382
383
    SCFree(xstats);
384
    SCFree(xstats_names);
385
}
386
387
/**
388
 *  \brief Main DPDK reading Loop function
389
 */
390
static TmEcode ReceiveDPDKLoop(ThreadVars *tv, void *data, void *slot)
391
{
392
    SCEnter();
393
    Packet *p;
394
    uint16_t nb_rx;
395
    SCTime_t last_dump = { 0 };
396
    SCTime_t current_time;
397
    bool segmented_mbufs_warned = 0;
398
    SCTime_t t = TimeGet();
399
    uint64_t last_timeout_msec = SCTIME_MSECS(t);
400
401
    DPDKThreadVars *ptv = (DPDKThreadVars *)data;
402
    TmSlot *s = (TmSlot *)slot;
403
404
    ptv->slot = s->slot_next;
405
406
    // Indicate that the thread is actually running its application level code (i.e., it can poll
407
    // packets)
408
    TmThreadsSetFlag(tv, THV_RUNNING);
409
    PacketPoolWait();
410
411
    rte_eth_stats_reset(ptv->port_id);
412
    rte_eth_xstats_reset(ptv->port_id);
413
414
    uint32_t pwd_zero_rx_packet_polls_count = 0;
415
    if (ptv->intr_enabled && !InterruptsRXEnable(ptv->port_id, ptv->queue_id))
416
        SCReturnInt(TM_ECODE_FAILED);
417
418
    while (1) {
419
        if (unlikely(suricata_ctl_flags != 0)) {
420
            SCLogDebug("Stopping Suricata!");
421
            DPDKDumpCounters(ptv);
422
            if (ptv->queue_id == 0) {
423
                PrintDPDKPortXstats(ptv->port_id, ptv->livedev->dev);
424
                rte_eth_dev_stop(ptv->port_id);
425
                if (ptv->copy_mode == DPDK_COPY_MODE_TAP || ptv->copy_mode == DPDK_COPY_MODE_IPS) {
426
                    rte_eth_dev_stop(ptv->out_port_id);
427
                }
428
            }
429
            break;
430
        }
431
432
        nb_rx = rte_eth_rx_burst(ptv->port_id, ptv->queue_id, ptv->received_mbufs, BURST_SIZE);
433
        if (unlikely(nb_rx == 0)) {
434
            t = TimeGet();
435
            uint64_t msecs = SCTIME_MSECS(t);
436
            if (msecs > last_timeout_msec + 100) {
437
                TmThreadsCaptureHandleTimeout(tv, NULL);
438
                last_timeout_msec = msecs;
439
            }
440
441
            if (!ptv->intr_enabled)
442
                continue;
443
444
            pwd_zero_rx_packet_polls_count++;
445
            if (pwd_zero_rx_packet_polls_count <= MIN_ZERO_POLL_COUNT)
446
                continue;
447
448
            uint32_t pwd_idle_hint = InterruptsSleepHeuristic(pwd_zero_rx_packet_polls_count);
449
450
            if (pwd_idle_hint < STANDARD_SLEEP_TIME_US) {
451
                rte_delay_us(pwd_idle_hint);
452
            } else {
453
                InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, true);
454
                struct rte_epoll_event event;
455
                rte_epoll_wait(RTE_EPOLL_PER_THREAD, &event, 1, MAX_EPOLL_TIMEOUT_MS);
456
                InterruptsTurnOnOff(ptv->port_id, ptv->queue_id, false);
457
                continue;
458
            }
459
        } else if (ptv->intr_enabled && pwd_zero_rx_packet_polls_count) {
460
            pwd_zero_rx_packet_polls_count = 0;
461
        }
462
463
        ptv->pkts += (uint64_t)nb_rx;
464
        for (uint16_t i = 0; i < nb_rx; i++) {
465
            p = PacketGetFromQueueOrAlloc();
466
            if (unlikely(p == NULL)) {
467
                rte_pktmbuf_free(ptv->received_mbufs[i]);
468
                continue;
469
            }
470
            PKT_SET_SRC(p, PKT_SRC_WIRE);
471
            p->datalink = LINKTYPE_ETHERNET;
472
            if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
473
                p->flags |= PKT_IGNORE_CHECKSUM;
474
            }
475
476
            p->ts = TimeGet();
477
            p->dpdk_v.mbuf = ptv->received_mbufs[i];
478
            p->ReleasePacket = DPDKReleasePacket;
479
            p->dpdk_v.copy_mode = ptv->copy_mode;
480
            p->dpdk_v.out_port_id = ptv->out_port_id;
481
            p->dpdk_v.out_queue_id = ptv->queue_id;
482
            p->livedev = ptv->livedev;
483
484
            if (ptv->checksum_mode == CHECKSUM_VALIDATION_DISABLE) {
485
                p->flags |= PKT_IGNORE_CHECKSUM;
486
            } else if (ptv->checksum_mode == CHECKSUM_VALIDATION_OFFLOAD) {
487
                uint64_t ol_flags = ptv->received_mbufs[i]->ol_flags;
488
                if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_GOOD &&
489
                        (ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_GOOD) {
490
                    SCLogDebug("HW detected GOOD IP and L4 chsum, ignoring validation");
491
                    p->flags |= PKT_IGNORE_CHECKSUM;
492
                } else {
493
                    if ((ol_flags & RTE_MBUF_F_RX_IP_CKSUM_MASK) == RTE_MBUF_F_RX_IP_CKSUM_BAD) {
494
                        SCLogDebug("HW detected BAD IP checksum");
495
                        // chsum recalc will not be triggered but rule keyword check will be
496
                        p->level3_comp_csum = 0;
497
                    }
498
                    if ((ol_flags & RTE_MBUF_F_RX_L4_CKSUM_MASK) == RTE_MBUF_F_RX_L4_CKSUM_BAD) {
499
                        SCLogDebug("HW detected BAD L4 chsum");
500
                        p->level4_comp_csum = 0;
501
                    }
502
                }
503
            }
504
505
            if (!rte_pktmbuf_is_contiguous(p->dpdk_v.mbuf) && !segmented_mbufs_warned) {
506
                char warn_s[] = "Segmented mbufs detected! Redmine Ticket #6012 "
507
                                "Check your configuration or report the issue";
508
                enum rte_proc_type_t eal_t = rte_eal_process_type();
509
                if (eal_t == RTE_PROC_SECONDARY) {
510
                    SCLogWarning("%s. To avoid segmented mbufs, "
511
                                 "try to increase mbuf size in your primary application",
512
                            warn_s);
513
                } else if (eal_t == RTE_PROC_PRIMARY) {
514
                    SCLogWarning("%s. To avoid segmented mbufs, "
515
                                 "try to increase MTU in your suricata.yaml",
516
                            warn_s);
517
                }
518
519
                segmented_mbufs_warned = 1;
520
            }
521
522
            PacketSetData(p, rte_pktmbuf_mtod(p->dpdk_v.mbuf, uint8_t *),
523
                    rte_pktmbuf_pkt_len(p->dpdk_v.mbuf));
524
            if (TmThreadsSlotProcessPkt(ptv->tv, ptv->slot, p) != TM_ECODE_OK) {
525
                TmqhOutputPacketpool(ptv->tv, p);
526
                DPDKFreeMbufArray(ptv->received_mbufs, nb_rx - i - 1, i + 1);
527
                SCReturnInt(EXIT_FAILURE);
528
            }
529
        }
530
531
        /* Trigger one dump of stats every second */
532
        current_time = TimeGet();
533
        if (current_time.secs != last_dump.secs) {
534
            DPDKDumpCounters(ptv);
535
            last_dump = current_time;
536
        }
537
        StatsSyncCountersIfSignalled(tv);
538
    }
539
540
    SCReturnInt(TM_ECODE_OK);
541
}
542
543
/**
544
 * \brief Init function for ReceiveDPDK.
545
 *
546
 * \param tv pointer to ThreadVars
547
 * \param initdata pointer to the interface passed from the user
548
 * \param data pointer gets populated with DPDKThreadVars
549
 *
550
 */
551
static TmEcode ReceiveDPDKThreadInit(ThreadVars *tv, const void *initdata, void **data)
552
{
553
    SCEnter();
554
    int retval, thread_numa;
555
    DPDKThreadVars *ptv = NULL;
556
    DPDKIfaceConfig *dpdk_config = (DPDKIfaceConfig *)initdata;
557
558
    if (initdata == NULL) {
559
        SCLogError("DPDK configuration is NULL in thread initialization");
560
        goto fail;
561
    }
562
563
    ptv = SCCalloc(1, sizeof(DPDKThreadVars));
564
    if (unlikely(ptv == NULL)) {
565
        SCLogError("Unable to allocate memory");
566
        goto fail;
567
    }
568
569
    ptv->tv = tv;
570
    ptv->pkts = 0;
571
    ptv->bytes = 0;
572
    ptv->livedev = LiveGetDevice(dpdk_config->iface);
573
574
    ptv->capture_dpdk_packets = StatsRegisterCounter("capture.packets", ptv->tv);
575
    ptv->capture_dpdk_rx_errs = StatsRegisterCounter("capture.rx_errors", ptv->tv);
576
    ptv->capture_dpdk_tx_errs = StatsRegisterCounter("capture.tx_errors", ptv->tv);
577
    ptv->capture_dpdk_imissed = StatsRegisterCounter("capture.dpdk.imissed", ptv->tv);
578
    ptv->capture_dpdk_rx_no_mbufs = StatsRegisterCounter("capture.dpdk.no_mbufs", ptv->tv);
579
    ptv->capture_dpdk_ierrors = StatsRegisterCounter("capture.dpdk.ierrors", ptv->tv);
580
581
    ptv->copy_mode = dpdk_config->copy_mode;
582
    ptv->checksum_mode = dpdk_config->checksum_mode;
583
584
    ptv->threads = dpdk_config->threads;
585
    ptv->intr_enabled = (dpdk_config->flags & DPDK_IRQ_MODE) ? true : false;
586
    ptv->port_id = dpdk_config->port_id;
587
    ptv->out_port_id = dpdk_config->out_port_id;
588
    ptv->port_socket_id = dpdk_config->socket_id;
589
    // pass the pointer to the mempool and then forget about it. Mempool is freed in thread deinit.
590
    ptv->pkt_mempool = dpdk_config->pkt_mempool;
591
    dpdk_config->pkt_mempool = NULL;
592
593
    thread_numa = GetNumaNode();
594
    if (thread_numa >= 0 && ptv->port_socket_id != SOCKET_ID_ANY &&
595
            thread_numa != ptv->port_socket_id) {
596
        SC_ATOMIC_ADD(dpdk_config->inconsitent_numa_cnt, 1);
597
        SCLogPerf("%s: NIC is on NUMA %d, thread on NUMA %d", dpdk_config->iface,
598
                ptv->port_socket_id, thread_numa);
599
    }
600
601
    uint16_t queue_id = SC_ATOMIC_ADD(dpdk_config->queue_id, 1);
602
    ptv->queue_id = queue_id;
603
604
    // the last thread starts the device
605
    if (queue_id == dpdk_config->threads - 1) {
606
        retval = rte_eth_dev_start(ptv->port_id);
607
        if (retval < 0) {
608
            SCLogError("%s: error (%s) during device startup", dpdk_config->iface,
609
                    rte_strerror(-retval));
610
            goto fail;
611
        }
612
613
        struct rte_eth_dev_info dev_info;
614
        retval = rte_eth_dev_info_get(ptv->port_id, &dev_info);
615
        if (retval != 0) {
616
            SCLogError("%s: error (%s) when getting device info", dpdk_config->iface,
617
                    rte_strerror(-retval));
618
            goto fail;
619
        }
620
621
        // some PMDs requires additional actions only after the device has started
622
        DevicePostStartPMDSpecificActions(ptv, dev_info.driver_name);
623
624
        uint16_t inconsistent_numa_cnt = SC_ATOMIC_GET(dpdk_config->inconsitent_numa_cnt);
625
        if (inconsistent_numa_cnt > 0 && ptv->port_socket_id != SOCKET_ID_ANY) {
626
            SCLogWarning("%s: NIC is on NUMA %d, %u threads on different NUMA node(s)",
627
                    dpdk_config->iface, ptv->port_socket_id, inconsistent_numa_cnt);
628
        } else if (ptv->port_socket_id == SOCKET_ID_ANY && rte_socket_count() > 1) {
629
            SCLogNotice(
630
                    "%s: unable to determine NIC's NUMA node, degraded performance can be expected",
631
                    dpdk_config->iface);
632
        }
633
        if (ptv->intr_enabled) {
634
            rte_spinlock_init(&intr_lock[ptv->port_id]);
635
        }
636
    }
637
638
    *data = (void *)ptv;
639
    dpdk_config->DerefFunc(dpdk_config);
640
    SCReturnInt(TM_ECODE_OK);
641
642
fail:
643
    if (dpdk_config != NULL)
644
        dpdk_config->DerefFunc(dpdk_config);
645
    if (ptv != NULL)
646
        SCFree(ptv);
647
    SCReturnInt(TM_ECODE_FAILED);
648
}
649
650
/**
651
 * \brief DeInit function closes dpdk at exit.
652
 * \param tv pointer to ThreadVars
653
 * \param data pointer that gets cast into DPDKThreadVars for ptv
654
 */
655
static TmEcode ReceiveDPDKThreadDeinit(ThreadVars *tv, void *data)
656
{
657
    SCEnter();
658
    DPDKThreadVars *ptv = (DPDKThreadVars *)data;
659
660
    if (ptv->queue_id == 0) {
661
        struct rte_eth_dev_info dev_info;
662
        int retval = rte_eth_dev_info_get(ptv->port_id, &dev_info);
663
        if (retval != 0) {
664
            SCLogError("%s: error (%s) when getting device info", ptv->livedev->dev,
665
                    rte_strerror(-retval));
666
            SCReturnInt(TM_ECODE_FAILED);
667
        }
668
669
        DevicePreClosePMDSpecificActions(ptv, dev_info.driver_name);
670
    }
671
672
    ptv->pkt_mempool = NULL; // MP is released when device is closed
673
674
    SCFree(ptv);
675
    SCReturnInt(TM_ECODE_OK);
676
}
677
678
/**
679
 * \brief This function passes off to link type decoders.
680
 *
681
 * DecodeDPDK decodes packets from DPDK and passes
682
 * them off to the proper link type decoder.
683
 *
684
 * \param t pointer to ThreadVars
685
 * \param p pointer to the current packet
686
 * \param data pointer that gets cast into DPDKThreadVars for ptv
687
 */
688
static TmEcode DecodeDPDK(ThreadVars *tv, Packet *p, void *data)
689
{
690
    SCEnter();
691
    DecodeThreadVars *dtv = (DecodeThreadVars *)data;
692
693
    BUG_ON(PKT_IS_PSEUDOPKT(p));
694
695
    /* update counters */
696
    DecodeUpdatePacketCounters(tv, dtv, p);
697
698
    /* If suri has set vlan during reading, we increase vlan counter */
699
    if (p->vlan_idx) {
700
        StatsIncr(tv, dtv->counter_vlan);
701
    }
702
703
    /* call the decoder */
704
    DecodeLinkLayer(tv, dtv, p->datalink, p, GET_PKT_DATA(p), GET_PKT_LEN(p));
705
706
    PacketDecodeFinalize(tv, dtv, p);
707
708
    SCReturnInt(TM_ECODE_OK);
709
}
710
711
/**
 * \brief Init function for DecodeDPDK.
 *
 * Allocates the decoder thread variables and registers the decoder perf
 * counters for the thread.
 *
 * \param tv       pointer to ThreadVars
 * \param initdata unused
 * \param data     receives the allocated DecodeThreadVars
 *
 * \retval TM_ECODE_OK on success, TM_ECODE_FAILED if allocation failed
 */
static TmEcode DecodeDPDKThreadInit(ThreadVars *tv, const void *initdata, void **data)
{
    SCEnter();

    DecodeThreadVars *dtv = DecodeThreadVarsAlloc(tv);
    if (dtv == NULL) {
        SCReturnInt(TM_ECODE_FAILED);
    }

    DecodeRegisterPerfCounters(dtv, tv);
    *data = dtv;

    SCReturnInt(TM_ECODE_OK);
}
727
728
/**
 * \brief DeInit function for DecodeDPDK; frees the decoder thread variables, if any.
 *
 * \param tv   pointer to ThreadVars
 * \param data DecodeThreadVars created by DecodeDPDKThreadInit(), may be NULL
 *
 * \retval TM_ECODE_OK always
 */
static TmEcode DecodeDPDKThreadDeinit(ThreadVars *tv, void *data)
{
    SCEnter();
    if (data == NULL) {
        SCReturnInt(TM_ECODE_OK);
    }
    DecodeThreadVarsFree(tv, data);
    SCReturnInt(TM_ECODE_OK);
}
735
736
#endif /* HAVE_DPDK */

/* end of file */

/** @} */