Coverage Report

Created: 2025-05-16 06:24

/src/ntopng/include/SPSCQueue.h
Line
Count
Source (jump to first uncovered line)
1
/*
2
 *
3
 * (C) 2014-25 - ntop.org
4
 *
5
 *
6
 * This program is free software; you can redistribute it and/or modify
7
 * it under the terms of the GNU General Public License as published by
8
 * the Free Software Foundation; either version 3 of the License, or
9
 * (at your option) any later version.
10
 *
11
 * This program is distributed in the hope that it will be useful,
12
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
 * GNU General Public License for more details.
15
 *
16
 * You should have received a copy of the GNU General Public License
17
 * along with this program; if not, write to the Free Software Foundation,
18
 * Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
19
 *
20
 */
21
22
#ifndef _SPSC_QUEUE_H_
23
#define _SPSC_QUEUE_H_
24
25
#include "ntop_includes.h"
26
27
/* Lockless fixed-size Single-Producer Single-Consumer queue */
28
29
0
#define QUEUE_WATERMARK 8 /* pow of 2 */
30
0
#define QUEUE_WATERMARK_MASK (QUEUE_WATERMARK - 1)
31
32
template <typename T>
33
class SPSCQueue {
34
private:
35
  std::string name;
36
  u_int32_t num_failed_enqueues; /* Counts the number of times the enqueue has
37
                                    failed (queue full) */
38
  u_int64_t shadow_head;
39
  volatile u_int64_t head;
40
  volatile u_int64_t tail;
41
  u_int64_t shadow_tail;
42
  Condvar c;
43
  std::vector<T> queue;
44
  u_int32_t queue_size;
45
46
 public:
47
  /**
48
   * Constructor
49
   * @param size The queue size (rounded up to the next power of 2)
50
   */
51
10.6k
  SPSCQueue(u_int32_t size, const char *_name) {
52
10.6k
    queue_size = Utils::pow2(size);
53
10.6k
    queue.resize(queue_size);
54
10.6k
    tail = shadow_tail = queue_size - 1;
55
10.6k
    head = shadow_head = 0;
56
10.6k
    num_failed_enqueues = 0;
57
10.6k
    name.assign(_name ? _name : "");
58
10.6k
  }
Unexecuted instantiation: SPSCQueue<char*>::SPSCQueue(unsigned int, char const*)
SPSCQueue<FlowAlert*>::SPSCQueue(unsigned int, char const*)
Line
Count
Source
51
4
  SPSCQueue(u_int32_t size, const char *_name) {
52
4
    queue_size = Utils::pow2(size);
53
4
    queue.resize(queue_size);
54
4
    tail = shadow_tail = queue_size - 1;
55
4
    head = shadow_head = 0;
56
4
    num_failed_enqueues = 0;
57
4
    name.assign(_name ? _name : "");
58
4
  }
SPSCQueue<std::__1::pair<HostAlert*, bool> >::SPSCQueue(unsigned int, char const*)
Line
Count
Source
51
4
  SPSCQueue(u_int32_t size, const char *_name) {
52
4
    queue_size = Utils::pow2(size);
53
4
    queue.resize(queue_size);
54
4
    tail = shadow_tail = queue_size - 1;
55
4
    head = shadow_head = 0;
56
4
    num_failed_enqueues = 0;
57
4
    name.assign(_name ? _name : "");
58
4
  }
Unexecuted instantiation: SPSCQueue<Flow*>::SPSCQueue(unsigned int, char const*)
SPSCQueue<std::__1::pair<unsigned short, unsigned short> >::SPSCQueue(unsigned int, char const*)
Line
Count
Source
51
10.6k
  SPSCQueue(u_int32_t size, const char *_name) {
52
10.6k
    queue_size = Utils::pow2(size);
53
10.6k
    queue.resize(queue_size);
54
10.6k
    tail = shadow_tail = queue_size - 1;
55
10.6k
    head = shadow_head = 0;
56
10.6k
    num_failed_enqueues = 0;
57
10.6k
    name.assign(_name ? _name : "");
58
10.6k
  }
59
60
  /**
61
   * Return true if there is at least one item in the queue
62
   */
63
0
  inline bool isNotEmpty() {
64
0
    u_int32_t next_tail = (shadow_tail + 1) & (queue_size - 1);
65
66
0
    return next_tail != head;
67
0
  }
Unexecuted instantiation: SPSCQueue<char*>::isNotEmpty()
Unexecuted instantiation: SPSCQueue<FlowAlert*>::isNotEmpty()
Unexecuted instantiation: SPSCQueue<std::__1::pair<HostAlert*, bool> >::isNotEmpty()
Unexecuted instantiation: SPSCQueue<Flow*>::isNotEmpty()
68
69
  /**
70
   * Return true if the queue is full
71
   */
72
0
  inline bool isFull() {
73
0
    u_int32_t next_head = (shadow_head + 1) & (queue_size - 1);
74
75
0
    return tail == next_head;
76
0
  }
77
78
  /**
79
   * Pop an item from the tail
80
   * Return the item (which is removed from the queue)
81
   * Note: isNotEmpty() should be called before. Similar to std lists,
82
   * if the container is not empty the function never throws exceptions.
83
   * Otherwise, the string literal "Empty queue" is thrown.
84
   */
85
0
  inline T dequeue() {
86
0
    u_int32_t next_tail;
87
88
0
    next_tail = (shadow_tail + 1) & (queue_size - 1);
89
0
    if (next_tail == head) throw "Empty queue";
90
91
0
    T item = queue[next_tail];
92
0
    shadow_tail = next_tail;
93
94
0
    if((shadow_tail & QUEUE_WATERMARK_MASK) == 0) tail = shadow_tail;
95
96
0
    return item;
97
0
  }
Unexecuted instantiation: SPSCQueue<char*>::dequeue()
Unexecuted instantiation: SPSCQueue<FlowAlert*>::dequeue()
Unexecuted instantiation: SPSCQueue<std::__1::pair<HostAlert*, bool> >::dequeue()
Unexecuted instantiation: SPSCQueue<Flow*>::dequeue()
98
99
  inline bool wait() { return ((c.wait() < 0) ? false : true); }
100
101
  /**
102
   * Push an item to the head
103
   * @param item The item to add to the queue
104
   * @param flush Immediately makes the item available to the consumer, a
105
   * watermark is used otherwise.
106
   * @return true on success, false if there is no room
107
   */
108
0
  inline bool enqueue(T item, bool flush) {
109
0
    u_int32_t next_head;
110
111
0
    next_head = (shadow_head + 1) & (queue_size - 1);
112
113
0
    if (tail != next_head) {
114
0
      queue[shadow_head] = item;
115
116
0
      shadow_head = next_head;
117
0
      c.signal();
118
119
0
      if (flush || (shadow_head & QUEUE_WATERMARK_MASK) == 0)
120
0
        head = shadow_head;
121
122
0
      return true; /* success */
123
0
    }
124
125
0
    num_failed_enqueues++;
126
127
0
    return false; /* no room */
128
0
  }
Unexecuted instantiation: SPSCQueue<char*>::enqueue(char*, bool)
Unexecuted instantiation: SPSCQueue<FlowAlert*>::enqueue(FlowAlert*, bool)
Unexecuted instantiation: SPSCQueue<std::__1::pair<HostAlert*, bool> >::enqueue(std::__1::pair<HostAlert*, bool>, bool)
Unexecuted instantiation: SPSCQueue<Flow*>::enqueue(Flow*, bool)
Unexecuted instantiation: SPSCQueue<std::__1::pair<unsigned short, unsigned short> >::enqueue(std::__1::pair<unsigned short, unsigned short>, bool)
129
130
  /**
131
   * Return the number of failed enqueue attempts
132
   */
133
  inline u_int32_t get_num_failed_enqueues() const {
134
    return num_failed_enqueues;
135
  };
136
137
  /**
138
   * Adds a table with the queue stats, keyed by the queue name, to the table on top of the stack of the vm passed as parameter
139
   */
140
0
  inline void lua(lua_State *vm) const {
141
0
    if (vm) {
142
0
      lua_newtable(vm);
143
0
      lua_push_uint32_table_entry(vm, "num_failed_enqueues",
144
0
                                  num_failed_enqueues);
145
0
      lua_pushstring(vm, name.c_str());
146
0
      lua_insert(vm, -2);
147
0
      lua_settable(vm, -3);
148
0
    }
149
0
  };
Unexecuted instantiation: SPSCQueue<Flow*>::lua(lua_State*) const
Unexecuted instantiation: SPSCQueue<FlowAlert*>::lua(lua_State*) const
150
};
151
152
#endif /* _SPSC_QUEUE_H_ */