Coverage Report

Created: 2025-07-12 06:05

/src/libzmq/src/ypipe_conflate.hpp
Line
Count
Source (jump to first uncovered line)
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#ifndef __ZMQ_YPIPE_CONFLATE_HPP_INCLUDED__
4
#define __ZMQ_YPIPE_CONFLATE_HPP_INCLUDED__
5
6
#include "platform.hpp"
7
#include "dbuffer.hpp"
8
#include "ypipe_base.hpp"
9
10
namespace zmq
11
{
12
//  Adapter for dbuffer, to plug it in instead of a queue for the sake
13
//  of implementing the conflate socket option, which, if set, makes
14
//  the receiving side discard all incoming messages but the last one.
15
//
16
//  reader_awake flag is needed here to mimic ypipe's delicate behaviour
17
//  around the reader being asleep (see 'c' pointer being NULL in ypipe.hpp)
18
19
template <typename T> class ypipe_conflate_t ZMQ_FINAL : public ypipe_base_t<T>
{
  public:
    //  Creates the pipe; the reader starts out flagged as not awake.
    ypipe_conflate_t () : reader_awake (false) {}

    //  Note that write () below intentionally copies uninitialised data
    //  when it is used with zmq_msg: initialising the VSM body of
    //  non-VSM messages only to avoid that would cost performance.

#ifdef ZMQ_HAVE_OPENVMS
#pragma message save
#pragma message disable(UNINIT)
#endif
    //  Passes value_ on to the underlying dbuffer. The incomplete_ flag
    //  is accepted but ignored.
    void write (const T &value_, bool incomplete_)
    {
        dbuffer.write (value_);
        static_cast<void> (incomplete_);
    }

#ifdef ZMQ_HAVE_OPENVMS
#pragma message restore
#endif

    //  A conflate ypipe never holds incomplete items, so there is never
    //  anything to unwrite; always returns false.
    bool unwrite (T *)
    {
        return false;
    }

    //  Flushing does no work for a conflate ypipe; only the "reader
    //  asleep" signalling of the usual ypipe is kept.
    //  A false result means the reader thread is sleeping and the caller
    //  has to wake it up before using the pipe again.
    //  NOTE(review): nothing visible in this class ever sets reader_awake
    //  to true, so this looks like it always returns false - confirm that
    //  this is intended.
    bool flush ()
    {
        return reader_awake;
    }

    //  Tells whether an item is available for reading. When there is
    //  none, the reader is flagged as not awake.
    bool check_read ()
    {
        if (dbuffer.check_read ())
            return true;

        reader_awake = false;
        return false;
    }

    //  Reads an item from the pipe into value_. Returns false if there is
    //  no value available.
    bool read (T *value_)
    {
        if (check_read ())
            return dbuffer.read (value_);

        return false;
    }

    //  Runs fn_ on the first element in the pipe and hands back whatever
    //  fn_ returned.
    //  The pipe mustn't be empty or the function crashes.
    bool probe (bool (*fn_) (const T &))
    {
        return dbuffer.probe (fn_);
    }

  protected:
    //  Buffer that actually stores the items.
    dbuffer_t<T> dbuffer;
    //  Reader state reported by flush (); cleared by check_read ().
    bool reader_awake;

    ZMQ_NON_COPYABLE_NOR_MOVABLE (ypipe_conflate_t)
};
84
}
85
86
#endif