Coverage Report

Created: 2026-02-14 07:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/Fast-DDS/src/cpp/rtps/history/TopicPayloadPool.cpp
Line
Count
Source
1
// Copyright 2020 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file TopicPayloadPool.cpp
17
 */
18
19
#include <rtps/history/ITopicPayloadPool.h>
20
21
#include "./TopicPayloadPool.hpp"
22
#include "./TopicPayloadPool_impl/Preallocated.hpp"
23
#include "./TopicPayloadPool_impl/PreallocatedWithRealloc.hpp"
24
#include "./TopicPayloadPool_impl/Dynamic.hpp"
25
#include "./TopicPayloadPool_impl/DynamicReusable.hpp"
26
27
#include <memory>
28
29
namespace eprosima {
30
namespace fastdds {
31
namespace rtps {
32
33
bool TopicPayloadPool::get_payload(
34
        uint32_t size,
35
        SerializedPayload_t& payload)
36
0
{
37
0
    return do_get_payload(size, payload, false);
38
0
}
39
40
bool TopicPayloadPool::do_get_payload(
41
        uint32_t size,
42
        SerializedPayload_t& payload,
43
        bool resizeable)
44
0
{
45
0
    PayloadNode* payload_node = nullptr;
46
47
0
    std::unique_lock<std::mutex> lock(mutex_);
48
0
    if (free_payloads_.empty())
49
0
    {
50
0
        payload_node = allocate(size); //Allocates a single payload
51
0
        if (payload_node == nullptr)
52
0
        {
53
0
            lock.unlock();
54
0
            payload.data = nullptr;
55
0
            payload.max_size = 0;
56
0
            payload.payload_owner = nullptr;
57
0
            return false;
58
0
        }
59
0
    }
60
0
    else
61
0
    {
62
0
        payload_node = free_payloads_.back();
63
0
        free_payloads_.pop_back();
64
0
    }
65
66
    // Resize if needed
67
0
    if (resizeable && size > payload_node->data_size())
68
0
    {
69
0
        if (!payload_node->resize(size))
70
0
        {
71
            // Failed to resize, but we can still keep it for later.
72
0
            free_payloads_.push_back(payload_node);
73
0
            lock.unlock();
74
0
            EPROSIMA_LOG_ERROR(RTPS_HISTORY, "Failed to resize the payload");
75
76
0
            payload.data = nullptr;
77
0
            payload.max_size = 0;
78
0
            payload.payload_owner = nullptr;
79
0
            return false;
80
0
        }
81
0
    }
82
83
0
    lock.unlock();
84
0
    payload_node->reference();
85
0
    payload.data = payload_node->data();
86
0
    payload.max_size = payload_node->data_size();
87
0
    payload.payload_owner = this;
88
89
0
    return true;
90
0
}
91
92
bool TopicPayloadPool::get_payload(
93
        const SerializedPayload_t& data,
94
        SerializedPayload_t& payload)
95
0
{
96
0
    if (data.payload_owner == this)
97
0
    {
98
0
        PayloadNode::reference(data.data);
99
100
0
        payload.data = data.data;
101
0
        payload.length = data.length;
102
0
        payload.max_size = PayloadNode::data_size(data.data);
103
0
        payload.is_serialized_key = data.is_serialized_key;
104
0
        payload.payload_owner = this;
105
0
        return true;
106
0
    }
107
0
    else
108
0
    {
109
0
        if (get_payload(data.length, payload))
110
0
        {
111
0
            if (!payload.copy(&data, true))
112
0
            {
113
0
                release_payload(payload);
114
0
                return false;
115
0
            }
116
117
0
            return true;
118
0
        }
119
0
    }
120
121
0
    return false;
122
0
}
123
124
bool TopicPayloadPool::release_payload(
125
        SerializedPayload_t& payload)
126
0
{
127
0
    assert(payload.payload_owner == this);
128
129
0
    if (PayloadNode::dereference(payload.data))
130
0
    {
131
0
        std::lock_guard<std::mutex> lock(mutex_);
132
0
        PayloadNode* payload_node = all_payloads_.at(PayloadNode::data_index(payload.data));
133
0
        free_payloads_.push_back(payload_node);
134
0
    }
135
136
0
    payload.length = 0;
137
0
    payload.pos = 0;
138
0
    payload.max_size = 0;
139
0
    payload.data = nullptr;
140
0
    payload.is_serialized_key = false;
141
0
    payload.payload_owner = nullptr;
142
0
    return true;
143
0
}
144
145
bool TopicPayloadPool::reserve_history(
146
        const PoolConfig& config,
147
        bool /*is_reader*/)
148
0
{
149
0
    assert(config.memory_policy == memory_policy());
150
151
0
    std::lock_guard<std::mutex> lock(mutex_);
152
0
    update_maximum_size(config, true);
153
0
    return true;
154
0
}
155
156
bool TopicPayloadPool::release_history(
157
        const PoolConfig& config,
158
        bool /*is_reader*/)
159
0
{
160
0
    assert(config.memory_policy == memory_policy());
161
162
0
    std::lock_guard<std::mutex> lock(mutex_);
163
0
    update_maximum_size(config, false);
164
165
0
    return shrink(max_pool_size_);
166
0
}
167
168
TopicPayloadPool::PayloadNode* TopicPayloadPool::allocate(
169
        uint32_t size)
170
0
{
171
0
    if (all_payloads_.size() >= max_pool_size_)
172
0
    {
173
0
        EPROSIMA_LOG_WARNING(RTPS_HISTORY, "Maximum number of allowed reserved payloads reached");
174
0
        return nullptr;
175
0
    }
176
177
0
    return do_allocate(size);
178
0
}
179
180
TopicPayloadPool::PayloadNode* TopicPayloadPool::do_allocate(
181
        uint32_t size)
182
0
{
183
0
    PayloadNode* payload = new (std::nothrow) PayloadNode(size);
184
185
0
    if (payload != nullptr)
186
0
    {
187
0
        payload->data_index(static_cast<uint32_t>(all_payloads_.size()));
188
0
        all_payloads_.push_back(payload);
189
0
    }
190
0
    else
191
0
    {
192
0
        EPROSIMA_LOG_WARNING(RTPS_HISTORY, "Failure to create a new payload ");
193
0
    }
194
195
0
    return payload;
196
0
}
197
198
void TopicPayloadPool::update_maximum_size(
199
        const PoolConfig& config,
200
        bool is_reserve)
201
0
{
202
0
    if (is_reserve)
203
0
    {
204
0
        if (config.maximum_size == 0)
205
0
        {
206
0
            max_pool_size_ = (std::numeric_limits<uint32_t>::max)();
207
0
            ++infinite_histories_count_;
208
0
        }
209
0
        else
210
0
        {
211
0
            finite_max_pool_size_ += std::max(config.initial_size, config.maximum_size);
212
0
            if (infinite_histories_count_ == 0)
213
0
            {
214
0
                max_pool_size_ = finite_max_pool_size_;
215
0
            }
216
0
        }
217
0
    }
218
0
    else
219
0
    {
220
0
        if (config.maximum_size == 0)
221
0
        {
222
0
            --infinite_histories_count_;
223
0
        }
224
0
        else
225
0
        {
226
0
            finite_max_pool_size_ -= (config.initial_size > config.maximum_size ?
227
0
                    config.initial_size : config.maximum_size);
228
0
        }
229
0
        if (infinite_histories_count_ == 0)
230
0
        {
231
0
            max_pool_size_ = finite_max_pool_size_;
232
0
        }
233
0
    }
234
0
}
235
236
void TopicPayloadPool::reserve (
237
        uint32_t min_num_payloads,
238
        uint32_t size)
239
0
{
240
0
    assert (min_num_payloads <= max_pool_size_);
241
242
0
    for (size_t i = all_payloads_.size(); i < min_num_payloads; ++i)
243
0
    {
244
0
        PayloadNode* payload = do_allocate(size);
245
246
0
        if (payload != nullptr)
247
0
        {
248
0
            free_payloads_.push_back(payload);
249
0
        }
250
0
    }
251
0
}
252
253
bool TopicPayloadPool::shrink (
254
        uint32_t max_num_payloads)
255
0
{
256
0
    assert(payload_pool_allocated_size() - payload_pool_available_size() <= max_num_payloads);
257
258
0
    while (max_num_payloads < all_payloads_.size())
259
0
    {
260
0
        PayloadNode* payload = free_payloads_.back();
261
0
        free_payloads_.pop_back();
262
263
        // Find data in allPayloads, remove element, then delete it
264
0
        all_payloads_.at(payload->data_index()) = all_payloads_.back();
265
0
        all_payloads_.back()->data_index(payload->data_index());
266
0
        all_payloads_.pop_back();
267
0
        delete payload;
268
0
    }
269
270
0
    return true;
271
0
}
272
273
std::unique_ptr<ITopicPayloadPool> TopicPayloadPool::get(
274
        const BasicPoolConfig& config)
275
0
{
276
0
    if (config.payload_initial_size == 0u)
277
0
    {
278
0
        return nullptr;
279
0
    }
280
281
0
    ITopicPayloadPool* ret_val = nullptr;
282
283
0
    switch (config.memory_policy)
284
0
    {
285
0
        case PREALLOCATED_MEMORY_MODE:
286
0
            ret_val = new PreallocatedTopicPayloadPool(config.payload_initial_size);
287
0
            break;
288
0
        case PREALLOCATED_WITH_REALLOC_MEMORY_MODE:
289
0
            ret_val = new PreallocatedReallocTopicPayloadPool(config.payload_initial_size);
290
0
            break;
291
0
        case DYNAMIC_RESERVE_MEMORY_MODE:
292
0
            ret_val = new DynamicTopicPayloadPool();
293
0
            break;
294
0
        case DYNAMIC_REUSABLE_MEMORY_MODE:
295
0
            ret_val = new DynamicReusableTopicPayloadPool();
296
0
            break;
297
0
    }
298
299
0
    return std::unique_ptr<ITopicPayloadPool>(ret_val);
300
0
}
301
302
}  // namespace rtps
303
}  // namespace fastdds
304
}  // namespace eprosima