Coverage Report

Created: 2025-11-11 06:57

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/libzmq/src/yqueue.hpp
Line
Count
Source
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#ifndef __ZMQ_YQUEUE_HPP_INCLUDED__
4
#define __ZMQ_YQUEUE_HPP_INCLUDED__
5
6
#include <stdlib.h>
7
#include <stddef.h>
8
9
#include "err.hpp"
10
#include "atomic_ptr.hpp"
11
#include "platform.hpp"
12
13
namespace zmq
14
{
15
//  yqueue is an efficient queue implementation. The main goal is
16
//  to minimise number of allocations/deallocations needed. Thus yqueue
17
//  allocates/deallocates elements in batches of N.
18
//
19
//  yqueue allows one thread to use push/back function and another one
20
//  to use pop/front functions. However, user must ensure that there's no
21
//  pop on the empty queue and that both threads don't access the same
22
//  element in unsynchronised manner.
23
//
24
//  T is the type of the object in the queue.
25
//  N is granularity of the queue (how many pushes have to be done till
26
//  actual memory allocation is required).
27
#if defined HAVE_POSIX_MEMALIGN
28
// ALIGN is the memory alignment size to use in the case where we have
29
// posix_memalign available. Default value is 64, this alignment will
30
// prevent two queue chunks from occupying the same CPU cache line on
31
// architectures where cache lines are <= 64 bytes (e.g. most things
32
// except POWER). It is detected at build time to try to account for other
33
// platforms like POWER and s390x.
34
template <typename T, int N, size_t ALIGN = ZMQ_CACHELINE_SIZE> class yqueue_t
35
#else
36
template <typename T, int N> class yqueue_t
37
#endif
38
{
39
  public:
40
    //  Create the queue.
41
    inline yqueue_t ()
42
0
    {
43
0
        _begin_chunk = allocate_chunk ();
44
0
        alloc_assert (_begin_chunk);
45
0
        _begin_pos = 0;
46
0
        _back_chunk = NULL;
47
0
        _back_pos = 0;
48
0
        _end_chunk = _begin_chunk;
49
0
        _end_pos = 0;
50
0
    }
Unexecuted instantiation: zmq::yqueue_t<zmq::command_t, 16, 64ul>::yqueue_t()
Unexecuted instantiation: zmq::yqueue_t<zmq::msg_t, 256, 64ul>::yqueue_t()
51
52
    //  Destroy the queue.
53
    inline ~yqueue_t ()
54
0
    {
55
0
        while (true) {
56
0
            if (_begin_chunk == _end_chunk) {
57
0
                free (_begin_chunk);
58
0
                break;
59
0
            }
60
0
            chunk_t *o = _begin_chunk;
61
0
            _begin_chunk = _begin_chunk->next;
62
0
            free (o);
63
0
        }
64
65
0
        chunk_t *sc = _spare_chunk.xchg (NULL);
66
0
        free (sc);
67
0
    }
Unexecuted instantiation: zmq::yqueue_t<zmq::command_t, 16, 64ul>::~yqueue_t()
Unexecuted instantiation: zmq::yqueue_t<zmq::msg_t, 256, 64ul>::~yqueue_t()
68
69
    //  Returns reference to the front element of the queue.
70
    //  If the queue is empty, behaviour is undefined.
71
0
    inline T &front () { return _begin_chunk->values[_begin_pos]; }
Unexecuted instantiation: zmq::yqueue_t<zmq::command_t, 16, 64ul>::front()
Unexecuted instantiation: zmq::yqueue_t<zmq::msg_t, 256, 64ul>::front()
72
73
    //  Returns reference to the back element of the queue.
74
    //  If the queue is empty, behaviour is undefined.
75
0
    inline T &back () { return _back_chunk->values[_back_pos]; }
Unexecuted instantiation: zmq::yqueue_t<zmq::command_t, 16, 64ul>::back()
Unexecuted instantiation: zmq::yqueue_t<zmq::msg_t, 256, 64ul>::back()
76
77
    //  Adds an element to the back end of the queue.
78
    inline void push ()
79
0
    {
80
0
        _back_chunk = _end_chunk;
81
0
        _back_pos = _end_pos;
82
83
0
        if (++_end_pos != N)
84
0
            return;
85
86
0
        chunk_t *sc = _spare_chunk.xchg (NULL);
87
0
        if (sc) {
88
0
            _end_chunk->next = sc;
89
0
            sc->prev = _end_chunk;
90
0
        } else {
91
0
            _end_chunk->next = allocate_chunk ();
92
0
            alloc_assert (_end_chunk->next);
93
0
            _end_chunk->next->prev = _end_chunk;
94
0
        }
95
0
        _end_chunk = _end_chunk->next;
96
0
        _end_pos = 0;
97
0
    }
Unexecuted instantiation: zmq::yqueue_t<zmq::command_t, 16, 64ul>::push()
Unexecuted instantiation: zmq::yqueue_t<zmq::msg_t, 256, 64ul>::push()
98
99
    //  Removes element from the back end of the queue. In other words
100
    //  it rollbacks last push to the queue. Take care: Caller is
101
    //  responsible for destroying the object being unpushed.
102
    //  The caller must also guarantee that the queue isn't empty when
103
    //  unpush is called. It cannot be done automatically as the read
104
    //  side of the queue can be managed by different, completely
105
    //  unsynchronised thread.
106
    inline void unpush ()
107
0
    {
108
        //  First, move 'back' one position backwards.
109
0
        if (_back_pos)
110
0
            --_back_pos;
111
0
        else {
112
0
            _back_pos = N - 1;
113
0
            _back_chunk = _back_chunk->prev;
114
0
        }
115
116
        //  Now, move 'end' position backwards. Note that obsolete end chunk
117
        //  is not used as a spare chunk. The analysis shows that doing so
118
        //  would require free and atomic operation per chunk deallocated
119
        //  instead of a simple free.
120
0
        if (_end_pos)
121
0
            --_end_pos;
122
0
        else {
123
0
            _end_pos = N - 1;
124
0
            _end_chunk = _end_chunk->prev;
125
0
            free (_end_chunk->next);
126
0
            _end_chunk->next = NULL;
127
0
        }
128
0
    }
Unexecuted instantiation: zmq::yqueue_t<zmq::command_t, 16, 64ul>::unpush()
Unexecuted instantiation: zmq::yqueue_t<zmq::msg_t, 256, 64ul>::unpush()
129
130
    //  Removes an element from the front end of the queue.
131
    inline void pop ()
132
0
    {
133
0
        if (++_begin_pos == N) {
134
0
            chunk_t *o = _begin_chunk;
135
0
            _begin_chunk = _begin_chunk->next;
136
0
            _begin_chunk->prev = NULL;
137
0
            _begin_pos = 0;
138
139
            //  'o' has been more recently used than _spare_chunk,
140
            //  so for cache reasons we'll get rid of the spare and
141
            //  use 'o' as the spare.
142
0
            chunk_t *cs = _spare_chunk.xchg (o);
143
0
            free (cs);
144
0
        }
145
0
    }
Unexecuted instantiation: zmq::yqueue_t<zmq::command_t, 16, 64ul>::pop()
Unexecuted instantiation: zmq::yqueue_t<zmq::msg_t, 256, 64ul>::pop()
146
147
  private:
148
    //  Individual memory chunk to hold N elements.
149
    struct chunk_t
150
    {
151
        T values[N];
152
        chunk_t *prev;
153
        chunk_t *next;
154
    };
155
156
    static inline chunk_t *allocate_chunk ()
157
0
    {
158
0
#if defined HAVE_POSIX_MEMALIGN
159
0
        void *pv;
160
0
        if (posix_memalign (&pv, ALIGN, sizeof (chunk_t)) == 0)
161
0
            return (chunk_t *) pv;
162
0
        return NULL;
163
#else
164
        return static_cast<chunk_t *> (malloc (sizeof (chunk_t)));
165
#endif
166
0
    }
Unexecuted instantiation: zmq::yqueue_t<zmq::command_t, 16, 64ul>::allocate_chunk()
Unexecuted instantiation: zmq::yqueue_t<zmq::msg_t, 256, 64ul>::allocate_chunk()
167
168
    //  Back position may point to invalid memory if the queue is empty,
169
    //  while begin & end positions are always valid. Begin position is
170
    //  accessed exclusively be queue reader (front/pop), while back and
171
    //  end positions are accessed exclusively by queue writer (back/push).
172
    chunk_t *_begin_chunk;
173
    int _begin_pos;
174
    chunk_t *_back_chunk;
175
    int _back_pos;
176
    chunk_t *_end_chunk;
177
    int _end_pos;
178
179
    //  People are likely to produce and consume at similar rates.  In
180
    //  this scenario holding onto the most recently freed chunk saves
181
    //  us from having to call malloc/free.
182
    atomic_ptr_t<chunk_t> _spare_chunk;
183
184
    ZMQ_NON_COPYABLE_NOR_MOVABLE (yqueue_t)
185
};
186
}
187
188
#endif