/src/libzmq/src/yqueue.hpp
/* SPDX-License-Identifier: MPL-2.0 */

#ifndef __ZMQ_YQUEUE_HPP_INCLUDED__
#define __ZMQ_YQUEUE_HPP_INCLUDED__

#include <stdlib.h>
#include <stddef.h>

#include "err.hpp"
#include "atomic_ptr.hpp"
#include "platform.hpp"
namespace zmq
{
// yqueue is an efficient queue implementation. The main goal is
// to minimise the number of allocations/deallocations needed. Thus yqueue
// allocates/deallocates elements in batches of N.
//
// yqueue allows one thread to use the push/back functions and another one
// to use the pop/front functions. However, the user must ensure that there
// is no pop on an empty queue and that the two threads don't access the
// same element in an unsynchronised manner.
//
// T is the type of the object in the queue.
// N is the granularity of the queue (how many pushes have to be done
// before an actual memory allocation is required).
#if defined HAVE_POSIX_MEMALIGN
// ALIGN is the memory alignment size to use in the case where we have
// posix_memalign available. The default value is 64; this alignment will
// prevent two queue chunks from occupying the same CPU cache line on
// architectures where cache lines are <= 64 bytes (e.g. most things
// except POWER). The cache line size is detected at build time to try to
// account for other platforms like POWER and s390x.
template <typename T, int N, size_t ALIGN = ZMQ_CACHELINE_SIZE> class yqueue_t
#else
template <typename T, int N> class yqueue_t
#endif
{
  public:
    // Create the queue.
    inline yqueue_t ()
    {
        _begin_chunk = allocate_chunk ();
        alloc_assert (_begin_chunk);
        _begin_pos = 0;
        _back_chunk = NULL;
        _back_pos = 0;
        _end_chunk = _begin_chunk;
        _end_pos = 0;
    }

    // Destroy the queue.
    inline ~yqueue_t ()
    {
        while (true) {
            if (_begin_chunk == _end_chunk) {
                free (_begin_chunk);
                break;
            }
            chunk_t *o = _begin_chunk;
            _begin_chunk = _begin_chunk->next;
            free (o);
        }

        chunk_t *sc = _spare_chunk.xchg (NULL);
        free (sc);
    }

    // Returns a reference to the front element of the queue.
    // If the queue is empty, behaviour is undefined.
    inline T &front () { return _begin_chunk->values[_begin_pos]; }

    // Returns a reference to the back element of the queue.
    // If the queue is empty, behaviour is undefined.
    inline T &back () { return _back_chunk->values[_back_pos]; }

    // Adds an element to the back end of the queue.
    inline void push ()
    {
        _back_chunk = _end_chunk;
        _back_pos = _end_pos;

        if (++_end_pos != N)
            return;

        // The current chunk is full: link in a new one, reusing the spare
        // chunk recycled by the reader if one is available.
        chunk_t *sc = _spare_chunk.xchg (NULL);
        if (sc) {
            _end_chunk->next = sc;
            sc->prev = _end_chunk;
        } else {
            _end_chunk->next = allocate_chunk ();
            alloc_assert (_end_chunk->next);
            _end_chunk->next->prev = _end_chunk;
        }
        _end_chunk = _end_chunk->next;
        _end_pos = 0;
    }

    // Removes an element from the back end of the queue. In other words,
    // it rolls back the last push to the queue. Take care: the caller is
    // responsible for destroying the object being unpushed.
    // The caller must also guarantee that the queue isn't empty when
    // unpush is called. It cannot be done automatically as the read
    // side of the queue can be managed by a different, completely
    // unsynchronised thread.
    inline void unpush ()
    {
        // First, move 'back' one position backwards.
        if (_back_pos)
            --_back_pos;
        else {
            _back_pos = N - 1;
            _back_chunk = _back_chunk->prev;
        }

        // Now, move the 'end' position backwards. Note that the obsolete
        // end chunk is not used as a spare chunk. Analysis shows that doing
        // so would require a free and an atomic operation per deallocated
        // chunk instead of a simple free.
        if (_end_pos)
            --_end_pos;
        else {
            _end_pos = N - 1;
            _end_chunk = _end_chunk->prev;
            free (_end_chunk->next);
            _end_chunk->next = NULL;
        }
    }

    // Removes an element from the front end of the queue.
    inline void pop ()
    {
        if (++_begin_pos == N) {
            chunk_t *o = _begin_chunk;
            _begin_chunk = _begin_chunk->next;
            _begin_chunk->prev = NULL;
            _begin_pos = 0;

            // 'o' has been more recently used than _spare_chunk,
            // so for cache reasons we'll get rid of the spare and
            // use 'o' as the spare.
            chunk_t *cs = _spare_chunk.xchg (o);
            free (cs);
        }
    }

  private:
    // Individual memory chunk to hold N elements.
    struct chunk_t
    {
        T values[N];
        chunk_t *prev;
        chunk_t *next;
    };

    static inline chunk_t *allocate_chunk ()
    {
#if defined HAVE_POSIX_MEMALIGN
        void *pv;
        if (posix_memalign (&pv, ALIGN, sizeof (chunk_t)) == 0)
            return (chunk_t *) pv;
        return NULL;
#else
        return static_cast<chunk_t *> (malloc (sizeof (chunk_t)));
#endif
    }

    // The back position may point to invalid memory if the queue is empty,
    // while the begin and end positions are always valid. The begin position
    // is accessed exclusively by the queue reader (front/pop), while the
    // back and end positions are accessed exclusively by the queue writer
    // (back/push).
    chunk_t *_begin_chunk;
    int _begin_pos;
    chunk_t *_back_chunk;
    int _back_pos;
    chunk_t *_end_chunk;
    int _end_pos;

    // People are likely to produce and consume at similar rates. In
    // this scenario holding onto the most recently freed chunk saves
    // us from having to call malloc/free.
    atomic_ptr_t<chunk_t> _spare_chunk;

    ZMQ_NON_COPYABLE_NOR_MOVABLE (yqueue_t)
};
}

#endif
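
For reference, here is a minimal single-threaded sketch of how the queue is
driven. push () grows the queue by one uninitialised slot and back () exposes
that slot so the value is written in place; front () and pop () consume from
the other end. Chunks come straight from malloc/posix_memalign, so T's
constructors and destructors are never run; libzmq instantiates the queue
with plain structs such as yqueue_t<command_t, 16> and yqueue_t<msg_t, 256>.
The int payload and the main () harness below are illustrative only, and the
sketch assumes it is compiled inside the libzmq source tree so that err.hpp
and atomic_ptr.hpp resolve. In libzmq proper the queue is wrapped by
ypipe_t, which adds the writer/reader synchronisation that yqueue_t
deliberately leaves out.

// usage_sketch.cpp (hypothetical, not part of libzmq)
#include "yqueue.hpp"

int main ()
{
    zmq::yqueue_t<int, 16> queue;

    // Writer side: extend the queue by one slot, then fill it in place.
    // Crossing a multiple of 16 pushes links in a new chunk (or reuses
    // the spare chunk recycled by the reader).
    for (int i = 0; i != 100; i++) {
        queue.push ();
        queue.back () = i;
    }

    // Reader side: read the oldest element, then discard it. The class
    // never checks for emptiness; that is the caller's responsibility.
    for (int i = 0; i != 100; i++) {
        const int value = queue.front ();
        (void) value;
        queue.pop ();
    }
    return 0;
}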