Coverage Report

Created: 2025-08-26 06:06

/src/libzmq/src/mailbox_safe.cpp
Line
Count
Source (jump to first uncovered line)
1
/* SPDX-License-Identifier: MPL-2.0 */
2
3
#include "precompiled.hpp"
4
#include "mailbox_safe.hpp"
5
#include "clock.hpp"
6
#include "err.hpp"
7
8
#include <algorithm>
9
10
0
zmq::mailbox_safe_t::mailbox_safe_t (mutex_t *sync_) : _sync (sync_)
11
0
{
12
    //  Get the pipe into passive state. That way, if the users starts by
13
    //  polling on the associated file descriptor it will get woken up when
14
    //  new command is posted.
15
0
    const bool ok = _cpipe.check_read ();
16
0
    zmq_assert (!ok);
17
0
}
18
19
zmq::mailbox_safe_t::~mailbox_safe_t ()
20
0
{
21
    //  TODO: Retrieve and deallocate commands inside the cpipe.
22
23
    // Work around problem that other threads might still be in our
24
    // send() method, by waiting on the mutex before disappearing.
25
0
    _sync->lock ();
26
0
    _sync->unlock ();
27
0
}
28
29
void zmq::mailbox_safe_t::add_signaler (signaler_t *signaler_)
30
0
{
31
0
    _signalers.push_back (signaler_);
32
0
}
33
34
void zmq::mailbox_safe_t::remove_signaler (signaler_t *signaler_)
35
0
{
36
    // TODO: make a copy of array and signal outside the lock
37
0
    const std::vector<zmq::signaler_t *>::iterator end = _signalers.end ();
38
0
    const std::vector<signaler_t *>::iterator it =
39
0
      std::find (_signalers.begin (), end, signaler_);
40
41
0
    if (it != end)
42
0
        _signalers.erase (it);
43
0
}
44
45
void zmq::mailbox_safe_t::clear_signalers ()
46
0
{
47
0
    _signalers.clear ();
48
0
}
49
50
void zmq::mailbox_safe_t::send (const command_t &cmd_)
51
0
{
52
0
    _sync->lock ();
53
0
    _cpipe.write (cmd_, false);
54
0
    const bool ok = _cpipe.flush ();
55
56
0
    if (!ok) {
57
0
        _cond_var.broadcast ();
58
59
0
        for (std::vector<signaler_t *>::iterator it = _signalers.begin (),
60
0
                                                 end = _signalers.end ();
61
0
             it != end; ++it) {
62
0
            (*it)->send ();
63
0
        }
64
0
    }
65
66
0
    _sync->unlock ();
67
0
}
68
69
int zmq::mailbox_safe_t::recv (command_t *cmd_, int timeout_)
70
0
{
71
    //  Try to get the command straight away.
72
0
    if (_cpipe.read (cmd_))
73
0
        return 0;
74
75
    //  If the timeout is zero, it will be quicker to release the lock, giving other a chance to send a command
76
    //  and immediately relock it.
77
0
    if (timeout_ == 0) {
78
0
        _sync->unlock ();
79
0
        _sync->lock ();
80
0
    } else {
81
        //  Wait for signal from the command sender.
82
0
        const int rc = _cond_var.wait (_sync, timeout_);
83
0
        if (rc == -1) {
84
0
            errno_assert (errno == EAGAIN || errno == EINTR);
85
0
            return -1;
86
0
        }
87
0
    }
88
89
    //  Another thread may already fetch the command
90
0
    const bool ok = _cpipe.read (cmd_);
91
92
0
    if (!ok) {
93
0
        errno = EAGAIN;
94
0
        return -1;
95
0
    }
96
97
0
    return 0;
98
0
}