Coverage Report

Created: 2026-05-04 06:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/Fast-DDS/src/cpp/rtps/messages/SendBuffersManager.cpp
Line
Count
Source
1
// Copyright 2019 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file SendBuffersManager.cpp
17
 */
18
19
#include "SendBuffersManager.hpp"
20
21
#include "RTPSMessageGroup.hpp"
22
#include "../participant/RTPSParticipantImpl.hpp"
23
24
namespace eprosima {
25
namespace fastdds {
26
namespace rtps {
27
28
SendBuffersManager::SendBuffersManager(
29
        size_t reserved_size,
30
        bool allow_growing,
31
        ResourceLimitedContainerConfig network_buffers_config)
32
0
    : allow_growing_(allow_growing)
33
0
    , network_buffers_config_(network_buffers_config)
34
0
{
35
0
    pool_.reserve(reserved_size);
36
0
}
37
38
void SendBuffersManager::init(
39
        const RTPSParticipantImpl* participant)
40
0
{
41
0
    std::lock_guard<TimedMutex> guard(mutex_);
42
43
0
    if (n_created_ < pool_.capacity())
44
0
    {
45
0
        const GuidPrefix_t& guid_prefix = participant->getGuid().guidPrefix;
46
47
        // Single allocation for the data of all the buffers.
48
        // We align the payload size to the size of a pointer, so all buffers will
49
        // be aligned as if directly allocated.
50
0
        constexpr size_t align_size = sizeof(octet*) - 1;
51
0
        uint32_t payload_size = participant->getMaxMessageSize();
52
0
        assert(payload_size > 0u);
53
0
        payload_size = (payload_size + align_size) & ~align_size;
54
0
        size_t advance = payload_size;
55
#if HAVE_SECURITY
56
        bool secure = participant->is_secure();
57
        advance *= secure ? 3 : 2;
58
#else
59
0
        advance *= 2;
60
0
#endif // if HAVE_SECURITY
61
0
        size_t data_size = advance * (pool_.capacity() - n_created_);
62
0
        common_buffer_.assign(data_size, 0);
63
64
0
        octet* raw_buffer = common_buffer_.data();
65
0
        while (n_created_ < pool_.capacity())
66
0
        {
67
0
            pool_.emplace_back(new RTPSMessageGroup_t(
68
0
                        raw_buffer,
69
#if HAVE_SECURITY
70
                        secure,
71
#endif // if HAVE_SECURITY
72
0
                        payload_size, guid_prefix, network_buffers_config_
73
0
                        ));
74
0
            raw_buffer += advance;
75
0
            ++n_created_;
76
0
        }
77
0
    }
78
0
}
79
80
std::unique_ptr<RTPSMessageGroup_t> SendBuffersManager::get_buffer(
81
        const RTPSParticipantImpl* participant,
82
        const std::chrono::steady_clock::time_point& max_blocking_time)
83
0
{
84
#if HAVE_STRICT_REALTIME
85
    std::unique_lock<TimedMutex> lock(mutex_, std::defer_lock);
86
    if (!lock.try_lock_until(max_blocking_time))
87
    {
88
        throw RTPSMessageGroup::timeout();
89
    }
90
#else
91
0
    std::unique_lock<TimedMutex> lock(mutex_);
92
0
#endif // if HAVE_STRICT_REALTIME
93
94
0
    std::unique_ptr<RTPSMessageGroup_t> ret_val;
95
96
0
    while (pool_.empty())
97
0
    {
98
0
        if (allow_growing_ || n_created_ < pool_.capacity())
99
0
        {
100
0
            add_one_buffer(participant);
101
0
        }
102
0
        else
103
0
        {
104
0
            EPROSIMA_LOG_INFO(RTPS_PARTICIPANT, "Waiting for send buffer");
105
0
            if (std::cv_status::timeout == available_cv_.wait_until(lock, max_blocking_time))
106
0
            {
107
0
                throw RTPSMessageGroup::timeout();
108
0
            }
109
0
        }
110
0
    }
111
112
0
    ret_val = std::move(pool_.back());
113
0
    pool_.pop_back();
114
115
0
    return ret_val;
116
0
}
117
118
void SendBuffersManager::return_buffer(
119
        std::unique_ptr<RTPSMessageGroup_t>&& buffer)
120
0
{
121
0
    std::lock_guard<TimedMutex> guard(mutex_);
122
0
    pool_.push_back(std::move(buffer));
123
0
    available_cv_.notify_one();
124
0
}
125
126
void SendBuffersManager::add_one_buffer(
127
        const RTPSParticipantImpl* participant)
128
0
{
129
0
    RTPSMessageGroup_t* new_item = new RTPSMessageGroup_t(
130
#if HAVE_SECURITY
131
        participant->is_secure(),
132
#endif // if HAVE_SECURITY
133
0
        participant->getMaxMessageSize(), participant->getGuid().guidPrefix);
134
0
    pool_.emplace_back(new_item);
135
0
    ++n_created_;
136
0
}
137
138
} /* namespace rtps */
139
} /* namespace fastdds */
140
} /* namespace eprosima */