/src/ntopng/include/SPSCQueue.h
Line | Count | Source (jump to first uncovered line) |
1 | | /* |
2 | | * |
3 | | * (C) 2014-25 - ntop.org |
4 | | * |
5 | | * |
6 | | * This program is free software; you can redistribute it and/or modify |
7 | | * it under the terms of the GNU General Public License as published by |
8 | | * the Free Software Foundation; either version 3 of the License, or |
9 | | * (at your option) any later version. |
10 | | * |
11 | | * This program is distributed in the hope that it will be useful, |
12 | | * but WITHOUT ANY WARRANTY; without even the implied warranty of |
13 | | * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
14 | | * GNU General Public License for more details. |
15 | | * |
16 | | * You should have received a copy of the GNU General Public License |
17 | | * along with this program; if not, write to the Free Software Foundation, |
18 | | * Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. |
19 | | * |
20 | | */ |
21 | | |
22 | | #ifndef _SPSC_QUEUE_H_ |
23 | | #define _SPSC_QUEUE_H_ |
24 | | |
25 | | #include "ntop_includes.h" |
26 | | |
27 | | /* Lockless fixed-size Single-Producer Single-Consumer queue */ |
28 | | |
29 | 0 | #define QUEUE_WATERMARK 8 /* head/tail are published to the other side every QUEUE_WATERMARK positions; must be a power of 2 so that the mask below works */ |
30 | 0 | #define QUEUE_WATERMARK_MASK (QUEUE_WATERMARK - 1) /* (index & mask) == 0 means the index is a multiple of the watermark */ |
31 | | |
32 | | template <typename T> |
33 | | class SPSCQueue { |
34 | | private: |
35 | | std::string name; |
36 | | u_int32_t num_failed_enqueues; /* Counts the number of times the enqueue has |
37 | | failed (queue full) */ |
38 | | u_int64_t shadow_head; |
39 | | volatile u_int64_t head; |
40 | | volatile u_int64_t tail; |
41 | | u_int64_t shadow_tail; |
42 | | Condvar c; |
43 | | std::vector<T> queue; |
44 | | u_int32_t queue_size; |
45 | | |
46 | | public: |
47 | | /** |
48 | | * Constructor |
49 | | * @param size The queue size (rounded up to the next power of 2) |
50 | | */ |
51 | 10.6k | SPSCQueue(u_int32_t size, const char *_name) { |
52 | 10.6k | queue_size = Utils::pow2(size); |
53 | 10.6k | queue.resize(queue_size); |
54 | 10.6k | tail = shadow_tail = queue_size - 1; |
55 | 10.6k | head = shadow_head = 0; |
56 | 10.6k | num_failed_enqueues = 0; |
57 | 10.6k | name.assign(_name ? _name : ""); |
58 | 10.6k | } Unexecuted instantiation: SPSCQueue<char*>::SPSCQueue(unsigned int, char const*) SPSCQueue<FlowAlert*>::SPSCQueue(unsigned int, char const*) Line | Count | Source | 51 | 4 | SPSCQueue(u_int32_t size, const char *_name) { | 52 | 4 | queue_size = Utils::pow2(size); | 53 | 4 | queue.resize(queue_size); | 54 | 4 | tail = shadow_tail = queue_size - 1; | 55 | 4 | head = shadow_head = 0; | 56 | 4 | num_failed_enqueues = 0; | 57 | 4 | name.assign(_name ? _name : ""); | 58 | 4 | } |
SPSCQueue<std::__1::pair<HostAlert*, bool> >::SPSCQueue(unsigned int, char const*) Line | Count | Source | 51 | 4 | SPSCQueue(u_int32_t size, const char *_name) { | 52 | 4 | queue_size = Utils::pow2(size); | 53 | 4 | queue.resize(queue_size); | 54 | 4 | tail = shadow_tail = queue_size - 1; | 55 | 4 | head = shadow_head = 0; | 56 | 4 | num_failed_enqueues = 0; | 57 | 4 | name.assign(_name ? _name : ""); | 58 | 4 | } |
Unexecuted instantiation: SPSCQueue<Flow*>::SPSCQueue(unsigned int, char const*) SPSCQueue<std::__1::pair<unsigned short, unsigned short> >::SPSCQueue(unsigned int, char const*) Line | Count | Source | 51 | 10.6k | SPSCQueue(u_int32_t size, const char *_name) { | 52 | 10.6k | queue_size = Utils::pow2(size); | 53 | 10.6k | queue.resize(queue_size); | 54 | 10.6k | tail = shadow_tail = queue_size - 1; | 55 | 10.6k | head = shadow_head = 0; | 56 | 10.6k | num_failed_enqueues = 0; | 57 | 10.6k | name.assign(_name ? _name : ""); | 58 | 10.6k | } |
|
59 | | |
60 | | /** |
61 | | * Return true if there is at least one item in the queue |
62 | | */ |
63 | 0 | inline bool isNotEmpty() { |
64 | 0 | u_int32_t next_tail = (shadow_tail + 1) & (queue_size - 1); |
65 | |
|
66 | 0 | return next_tail != head; |
67 | 0 | } Unexecuted instantiation: SPSCQueue<char*>::isNotEmpty() Unexecuted instantiation: SPSCQueue<FlowAlert*>::isNotEmpty() Unexecuted instantiation: SPSCQueue<std::__1::pair<HostAlert*, bool> >::isNotEmpty() Unexecuted instantiation: SPSCQueue<Flow*>::isNotEmpty() |
68 | | |
69 | | /** |
70 | | * Return true if the queue is full |
71 | | */ |
72 | 0 | inline bool isFull() { |
73 | 0 | u_int32_t next_head = (shadow_head + 1) & (queue_size - 1); |
74 | |
|
75 | 0 | return tail == next_head; |
76 | 0 | } |
77 | | |
78 | | /** |
79 | | * Pop an item from the tail |
80 | | * Return the item (which is removed from the queue) |
81 | | * Note: isNotEmpty() should be called before. Similar to std lists, |
82 | | * if the container is not empty the function never throws exceptions. |
83 | | * Otherwise, it causes undefined behavior. |
84 | | */ |
85 | 0 | inline T dequeue() { |
86 | 0 | u_int32_t next_tail; |
87 | |
|
88 | 0 | next_tail = (shadow_tail + 1) & (queue_size - 1); |
89 | 0 | if (next_tail == head) throw "Empty queue"; |
90 | | |
91 | 0 | T item = queue[next_tail]; |
92 | 0 | shadow_tail = next_tail; |
93 | |
|
94 | 0 | if((shadow_tail & QUEUE_WATERMARK_MASK) == 0) tail = shadow_tail; |
95 | |
|
96 | 0 | return item; |
97 | 0 | } Unexecuted instantiation: SPSCQueue<char*>::dequeue() Unexecuted instantiation: SPSCQueue<FlowAlert*>::dequeue() Unexecuted instantiation: SPSCQueue<std::__1::pair<HostAlert*, bool> >::dequeue() Unexecuted instantiation: SPSCQueue<Flow*>::dequeue() |
98 | | |
99 | | inline bool wait() { return ((c.wait() < 0) ? false : true); } |
100 | | |
101 | | /** |
102 | | * Push an item to the head |
103 | | * @param item The item to add to the queue |
104 | | * @param flush Immediately makes the item available to the consumer, a |
105 | | * watermark is used otherwise Return true on success, false if there is no |
106 | | * room |
107 | | */ |
108 | 0 | inline bool enqueue(T item, bool flush) { |
109 | 0 | u_int32_t next_head; |
110 | |
|
111 | 0 | next_head = (shadow_head + 1) & (queue_size - 1); |
112 | |
|
113 | 0 | if (tail != next_head) { |
114 | 0 | queue[shadow_head] = item; |
115 | |
|
116 | 0 | shadow_head = next_head; |
117 | 0 | c.signal(); |
118 | |
|
119 | 0 | if (flush || (shadow_head & QUEUE_WATERMARK_MASK) == 0) |
120 | 0 | head = shadow_head; |
121 | |
|
122 | 0 | return true; /* success */ |
123 | 0 | } |
124 | | |
125 | 0 | num_failed_enqueues++; |
126 | |
|
127 | 0 | return false; /* no room */ |
128 | 0 | } Unexecuted instantiation: SPSCQueue<char*>::enqueue(char*, bool) Unexecuted instantiation: SPSCQueue<FlowAlert*>::enqueue(FlowAlert*, bool) Unexecuted instantiation: SPSCQueue<std::__1::pair<HostAlert*, bool> >::enqueue(std::__1::pair<HostAlert*, bool>, bool) Unexecuted instantiation: SPSCQueue<Flow*>::enqueue(Flow*, bool) Unexecuted instantiation: SPSCQueue<std::__1::pair<unsigned short, unsigned short> >::enqueue(std::__1::pair<unsigned short, unsigned short>, bool) |
129 | | |
130 | | /** |
131 | | * Return the number of failed enqueue attempts |
132 | | */ |
133 | | inline u_int32_t get_num_failed_enqueues() const { |
134 | | return num_failed_enqueues; |
135 | | }; |
136 | | |
137 | | /** |
138 | | * Writes queue stats in a table of the vm passed as parameter |
139 | | */ |
140 | 0 | inline void lua(lua_State *vm) const { |
141 | 0 | if (vm) { |
142 | 0 | lua_newtable(vm); |
143 | 0 | lua_push_uint32_table_entry(vm, "num_failed_enqueues", |
144 | 0 | num_failed_enqueues); |
145 | 0 | lua_pushstring(vm, name.c_str()); |
146 | 0 | lua_insert(vm, -2); |
147 | 0 | lua_settable(vm, -3); |
148 | 0 | } |
149 | 0 | }; Unexecuted instantiation: SPSCQueue<Flow*>::lua(lua_State*) const Unexecuted instantiation: SPSCQueue<FlowAlert*>::lua(lua_State*) const |
150 | | }; |
151 | | |
152 | | #endif /* _SPSC_QUEUE_H_ */ |