Coverage Report

Created: 2025-11-09 06:32

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/libzmq/src/ypipe.hpp
Line
Count
Source
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#ifndef __ZMQ_YPIPE_HPP_INCLUDED__
4
#define __ZMQ_YPIPE_HPP_INCLUDED__
5
6
#include "atomic_ptr.hpp"
7
#include "yqueue.hpp"
8
#include "ypipe_base.hpp"
9
10
namespace zmq
11
{
12
//  Lock-free queue implementation.
13
//  Only a single thread can read from the pipe at any specific moment.
14
//  Only a single thread can write to the pipe at any specific moment.
15
//  T is the type of the object in the queue.
16
//  N is granularity of the pipe, i.e. how many items are needed to
17
//  perform next memory allocation.
18
19
template <typename T, int N> class ypipe_t ZMQ_FINAL : public ypipe_base_t<T>
20
{
21
  public:
22
    //  Initialises the pipe.
23
    ypipe_t ()
24
0
    {
25
        //  Insert terminator element into the queue.
26
0
        _queue.push ();
27
28
        //  Let all the pointers to point to the terminator.
29
        //  (unless pipe is dead, in which case c is set to NULL).
30
0
        _r = _w = _f = &_queue.back ();
31
0
        _c.set (&_queue.back ());
32
0
    }
Unexecuted instantiation: zmq::ypipe_t<zmq::command_t, 16>::ypipe_t()
Unexecuted instantiation: zmq::ypipe_t<zmq::msg_t, 256>::ypipe_t()
33
34
    //  Following function (write) deliberately copies uninitialised data
35
    //  when used with zmq_msg. Initialising the VSM body for
36
    //  non-VSM messages won't be good for performance.
37
38
#ifdef ZMQ_HAVE_OPENVMS
39
#pragma message save
40
#pragma message disable(UNINIT)
41
#endif
42
43
    //  Write an item to the pipe.  Don't flush it yet. If incomplete is
44
    //  set to true the item is assumed to be continued by items
45
    //  subsequently written to the pipe. Incomplete items are never
46
    //  flushed down the stream.
47
    void write (const T &value_, bool incomplete_)
48
0
    {
49
        //  Place the value to the queue, add new terminator element.
50
0
        _queue.back () = value_;
51
0
        _queue.push ();
52
53
        //  Move the "flush up to here" pointer.
54
0
        if (!incomplete_)
55
0
            _f = &_queue.back ();
56
0
    }
Unexecuted instantiation: zmq::ypipe_t<zmq::command_t, 16>::write(zmq::command_t const&, bool)
Unexecuted instantiation: zmq::ypipe_t<zmq::msg_t, 256>::write(zmq::msg_t const&, bool)
57
58
#ifdef ZMQ_HAVE_OPENVMS
59
#pragma message restore
60
#endif
61
62
    //  Pop an incomplete item from the pipe. Returns true if such
63
    //  item exists, false otherwise.
64
    bool unwrite (T *value_)
65
0
    {
66
0
        if (_f == &_queue.back ())
67
0
            return false;
68
0
        _queue.unpush ();
69
0
        *value_ = _queue.back ();
70
0
        return true;
71
0
    }
Unexecuted instantiation: zmq::ypipe_t<zmq::command_t, 16>::unwrite(zmq::command_t*)
Unexecuted instantiation: zmq::ypipe_t<zmq::msg_t, 256>::unwrite(zmq::msg_t*)
72
73
    //  Flush all the completed items into the pipe. Returns false if
74
    //  the reader thread is sleeping. In that case, caller is obliged to
75
    //  wake the reader up before using the pipe again.
76
    bool flush ()
77
0
    {
78
        //  If there are no un-flushed items, do nothing.
79
0
        if (_w == _f)
80
0
            return true;
81
82
        //  Try to set 'c' to 'f'.
83
0
        if (_c.cas (_w, _f) != _w) {
84
            //  Compare-and-swap was unsuccessful because 'c' is NULL.
85
            //  This means that the reader is asleep. Therefore we don't
86
            //  care about thread-safeness and update c in non-atomic
87
            //  manner. We'll return false to let the caller know
88
            //  that reader is sleeping.
89
0
            _c.set (_f);
90
0
            _w = _f;
91
0
            return false;
92
0
        }
93
94
        //  Reader is alive. Nothing special to do now. Just move
95
        //  the 'first un-flushed item' pointer to 'f'.
96
0
        _w = _f;
97
0
        return true;
98
0
    }
Unexecuted instantiation: zmq::ypipe_t<zmq::command_t, 16>::flush()
Unexecuted instantiation: zmq::ypipe_t<zmq::msg_t, 256>::flush()
99
100
    //  Check whether item is available for reading.
101
    bool check_read ()
102
0
    {
103
        //  Was the value prefetched already? If so, return.
104
0
        if (&_queue.front () != _r && _r)
105
0
            return true;
106
107
        //  There's no prefetched value, so let us prefetch more values.
108
        //  Prefetching is to simply retrieve the
109
        //  pointer from c in atomic fashion. If there are no
110
        //  items to prefetch, set c to NULL (using compare-and-swap).
111
0
        _r = _c.cas (&_queue.front (), NULL);
112
113
        //  If there are no elements prefetched, exit.
114
        //  During pipe's lifetime r should never be NULL, however,
115
        //  it can happen during pipe shutdown when items
116
        //  are being deallocated.
117
0
        if (&_queue.front () == _r || !_r)
118
0
            return false;
119
120
        //  There was at least one value prefetched.
121
0
        return true;
122
0
    }
Unexecuted instantiation: zmq::ypipe_t<zmq::command_t, 16>::check_read()
Unexecuted instantiation: zmq::ypipe_t<zmq::msg_t, 256>::check_read()
123
124
    //  Reads an item from the pipe. Returns false if there is no value.
125
    //  available.
126
    bool read (T *value_)
127
0
    {
128
        //  Try to prefetch a value.
129
0
        if (!check_read ())
130
0
            return false;
131
132
        //  There was at least one value prefetched.
133
        //  Return it to the caller.
134
0
        *value_ = _queue.front ();
135
0
        _queue.pop ();
136
0
        return true;
137
0
    }
Unexecuted instantiation: zmq::ypipe_t<zmq::command_t, 16>::read(zmq::command_t*)
Unexecuted instantiation: zmq::ypipe_t<zmq::msg_t, 256>::read(zmq::msg_t*)
138
139
    //  Applies the function fn to the first element in the pipe
140
    //  and returns the value returned by the fn.
141
    //  The pipe mustn't be empty or the function crashes.
142
    bool probe (bool (*fn_) (const T &))
143
0
    {
144
0
        const bool rc = check_read ();
145
0
        zmq_assert (rc);
146
147
0
        return (*fn_) (_queue.front ());
148
0
    }
Unexecuted instantiation: zmq::ypipe_t<zmq::command_t, 16>::probe(bool (*)(zmq::command_t const&))
Unexecuted instantiation: zmq::ypipe_t<zmq::msg_t, 256>::probe(bool (*)(zmq::msg_t const&))
149
150
  protected:
151
    //  Allocation-efficient queue to store pipe items.
152
    //  Front of the queue points to the first prefetched item, back of
153
    //  the pipe points to last un-flushed item. Front is used only by
154
    //  reader thread, while back is used only by writer thread.
155
    yqueue_t<T, N> _queue;
156
157
    //  Points to the first un-flushed item. This variable is used
158
    //  exclusively by writer thread.
159
    T *_w;
160
161
    //  Points to the first un-prefetched item. This variable is used
162
    //  exclusively by reader thread.
163
    T *_r;
164
165
    //  Points to the first item to be flushed in the future.
166
    T *_f;
167
168
    //  The single point of contention between writer and reader thread.
169
    //  Points past the last flushed item. If it is NULL,
170
    //  reader is asleep. This pointer should be always accessed using
171
    //  atomic operations.
172
    atomic_ptr_t<T> _c;
173
174
    ZMQ_NON_COPYABLE_NOR_MOVABLE (ypipe_t)
175
};
176
}
177
178
#endif