Coverage Report

Created: 2025-07-03 06:58

/src/Fast-DDS/src/cpp/rtps/history/TopicPayloadPool.hpp
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2020 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file TopicPayloadPool.hpp
17
 */
18
19
#ifndef RTPS_HISTORY_TOPICPAYLOADPOOL_HPP
20
#define RTPS_HISTORY_TOPICPAYLOADPOOL_HPP
21
22
#include <fastdds/rtps/attributes/ResourceManagement.hpp>
23
#include <fastdds/rtps/common/SerializedPayload.hpp>
24
#include <fastdds/rtps/history/IPayloadPool.hpp>
25
#include <fastdds/dds/log/Log.hpp>
26
#include <rtps/history/PoolConfig.h>
27
#include <rtps/history/ITopicPayloadPool.h>
28
29
#include <atomic>
30
#include <cstddef>
31
#include <memory>
32
#include <mutex>
33
#include <vector>
34
#include <cassert>
35
36
namespace eprosima {
37
namespace fastdds {
38
namespace rtps {
39
40
class TopicPayloadPool : public ITopicPayloadPool
41
{
42
43
public:
44
45
0
    TopicPayloadPool() = default;
46
47
    virtual ~TopicPayloadPool()
48
0
    {
49
0
        EPROSIMA_LOG_INFO(RTPS_UTILS, "PayloadPool destructor");
50
51
0
        for (PayloadNode* payload : all_payloads_)
52
0
        {
53
0
            delete payload;
54
0
        }
55
0
    }
56
57
    bool get_payload(
58
            uint32_t size,
59
            SerializedPayload_t& payload) override;
60
61
    bool get_payload(
62
            const SerializedPayload_t& data,
63
            SerializedPayload_t& payload) override;
64
65
    bool release_payload(
66
            SerializedPayload_t& payload) override;
67
68
    /**
69
     * @brief Ensures the pool has capacity to fullfill the requirements of a new history.
70
     *
71
     * @param [in]  config              The new history's pool requirements.
72
     * @param [in]  is_reader_history   True if the new history is for a reader. False otherwise.
73
     * @return Whether the operation was successful or not.
74
     *
75
     * @pre
76
     *   - Current pool is configured for the same memory policy as @c config.memory_policy.
77
     *
78
     * @post
79
     *   - If @c config.maximum_size is not zero
80
     *     - The maximum size of the pool is increased by @c config.maximum_size.
81
     *   - else
82
     *     - The maximum size of the pool is set to the largest representable value.
83
     *   - If the pool is configured for PREALLOCATED or PREALLOCATED WITH REALLOC memory policy:
84
     *     - The pool has at least as many elements allocated (including elements already in use)
85
     *       as the sum of the @c config.initial_size for all reserved writer histories
86
     *       plus the maximum of the @c config.initial_size for all reserved reader histories.
87
     */
88
    bool reserve_history(
89
            const PoolConfig& config,
90
            bool is_reader) override;
91
92
    /**
93
     * @brief Informs the pool that some history requirements are not longer active.
94
     *
95
     * The pool can release some resources that are not needed any longer.
96
     *
97
     * @param [in]  config              The old history's pool requirements, which are no longer active.
98
     * @param [in]  is_reader_history   True if the history was for a reader. False otherwise.
99
     * @return Whether the operation was successful or not.
100
     *
101
     * @pre
102
     *   - Current pool is configured for the same memory policy as @c config.memory_policy.
103
     *   - If all remaining histories were reserved with non zero @c config.maximum_size
104
     *      - The number of elements in use is less than
105
     *        the sum of the @c config.maximum_size for all remaining histories
106
     *
107
     * @post
108
     *   - If all remaining histories were reserved with non zero @c config.maximum_size
109
     *      - The maximum size of the pool is set to
110
     *        the sum of the @c config.maximum_size for all remaining histories
111
     *   - else
112
     *     - The maximum size of the pool remains the largest representable value.
113
     *   - If the number of allocated elements is greater than the new maximum size,
114
     *     the excess of elements are freed until the number of allocated elemets is equal to the new maximum.
115
     */
116
    bool release_history(
117
            const PoolConfig& config,
118
            bool is_reader) override;
119
120
    size_t payload_pool_allocated_size() const override
121
0
    {
122
0
        return all_payloads_.size();
123
0
    }
124
125
    size_t payload_pool_available_size() const override
126
0
    {
127
0
        return free_payloads_.size();
128
0
    }
129
130
    static std::unique_ptr<ITopicPayloadPool> get(
131
            const BasicPoolConfig& config);
132
133
protected:
134
135
    class PayloadNode
136
    {
137
    public:
138
139
        explicit PayloadNode(
140
                uint32_t size)
141
0
        {
142
0
            if (!size)
143
0
            {
144
                //! At least, we need this to allocate space for a NodeInfo.
145
                //! In order to be able to place-construct later
146
0
                buffer = (octet*)calloc(sizeof(NodeInfo), sizeof(octet));
147
0
            }
148
0
            else
149
0
            {
150
0
                buffer = (octet*)calloc(size + sizeof(NodeInfo) - 1, sizeof(octet));
151
0
            }
152
153
0
            if (buffer == nullptr)
154
0
            {
155
0
                throw std::bad_alloc();
156
0
            }
157
158
            // The atomic may need some initialization depending on the platform
159
0
            new (buffer) NodeInfo();
160
0
            data_size(size);
161
0
        }
162
163
        ~PayloadNode()
164
0
        {
165
0
            info().~NodeInfo();
166
0
            free(buffer);
167
0
        }
168
169
        bool resize (
170
                uint32_t size)
171
0
        {
172
0
            assert(size > data_size());
173
174
0
            octet* old_buffer = buffer;
175
0
            buffer = (octet*)realloc(buffer, size + data_offset);
176
0
            if (!buffer)
177
0
            {
178
0
                buffer = old_buffer;
179
0
                return false;
180
0
            }
181
0
            memset(buffer + data_offset + data_size(), 0, (size - data_size()) * sizeof(octet));
182
0
            data_size(size);
183
0
            return true;
184
0
        }
185
186
        uint32_t data_size() const
187
0
        {
188
0
            return info().data_size;
189
0
        }
190
191
        static uint32_t data_size(
192
                octet* data)
193
0
        {
194
0
            return info(data).data_size;
195
0
        }
196
197
        void data_size(
198
                uint32_t size)
199
0
        {
200
0
            info().data_size = size;
201
0
        }
202
203
        uint32_t data_index() const
204
0
        {
205
0
            return info().data_index;
206
0
        }
207
208
        static uint32_t data_index(
209
                octet* data)
210
0
        {
211
0
            return info(data).data_index;
212
0
        }
213
214
        void data_index(
215
                uint32_t index)
216
0
        {
217
0
            info().data_index = index;
218
0
        }
219
220
        octet* data() const
221
0
        {
222
0
            return info().data;
223
0
        }
224
225
        void reference()
226
0
        {
227
0
            info().ref_counter.fetch_add(1, std::memory_order_relaxed);
228
0
        }
229
230
        bool dereference()
231
0
        {
232
0
            return (info().ref_counter.fetch_sub(1, std::memory_order_acq_rel) == 1);
233
0
        }
234
235
        static void reference(
236
                octet* data)
237
0
        {
238
0
            info(data).ref_counter.fetch_add(1, std::memory_order_relaxed);
239
0
        }
240
241
        static bool dereference(
242
                octet* data)
243
0
        {
244
0
            return (info(data).ref_counter.fetch_sub(1, std::memory_order_acq_rel) == 1);
245
0
        }
246
247
    private:
248
249
        struct NodeInfo
250
        {
251
            std::atomic<uint32_t> ref_counter{ 0 };
252
            uint32_t data_size = 0;
253
            uint32_t data_index = 0;
254
            octet data[1];
255
        };
256
257
        octet* buffer = nullptr;
258
259
        // Payload data comes after the metadata
260
        static constexpr size_t data_offset = offsetof(NodeInfo, data);
261
262
        NodeInfo& info() const
263
0
        {
264
0
            return *reinterpret_cast<NodeInfo*>(buffer);
265
0
        }
266
267
        static NodeInfo& info(
268
                octet* data)
269
0
        {
270
0
            return *reinterpret_cast<NodeInfo*>(data - data_offset);
271
0
        }
272
273
    };
274
275
    /**
276
     * Adds a new payload in the pool, but does not add it to the list of free payloads
277
     *
278
     * @param [IN] size  Minimum size required for the payload data
279
     * @return The node representing the newly allocated payload.
280
     *
281
     * @post
282
     *   - @c payload_pool_allocated_size() increases by one
283
     *   - @c payload_pool_available_size() does not change
284
     */
285
    virtual PayloadNode* allocate(
286
            uint32_t size);
287
288
    PayloadNode* do_allocate(
289
            uint32_t size);
290
291
    virtual void update_maximum_size(
292
            const PoolConfig& config,
293
            bool is_reserve);
294
295
    /**
296
     * Ensures the pool has capacity for at least @c num_payloads elements.
297
     *
298
     * @param [IN] min_num_payloads Minimum number of payloads reserved in the pool
299
     * @param [IN] size             Size to allocate for the payloads that need to be added to the pool
300
     *
301
     * @pre
302
     *   - @c min_num_payloads <= @c max_pool_size_
303
     * @post
304
     *   - @c payload_pool_allocated_size() >= @c min_num_payloads
305
     */
306
    virtual void reserve (
307
            uint32_t min_num_payloads,
308
            uint32_t size);
309
310
    /**
311
     * Ensures the pool has capacity for at most @c num_payloads elements.
312
     *
313
     * @param [IN] max_num_payloads Maximum number of payloads reserved in the pool
314
     *
315
     * @return @c true on success, @c false otherwise
316
     *
317
     * @post
318
     *   - On success, payload_pool_allocated_size() <= max_num_payloads
319
     *   - On failure, memory for some payloads may have been released, but payload_pool_allocated_size() > min_num_payloads
320
     */
321
    bool shrink (
322
            uint32_t max_num_payloads);
323
324
    /**
325
     * @brief Get a serialized payload for a new sample.
326
     *
327
     * If the payload is recycled from the pool, @c resizable controls whether it can
328
     * be reallocated to accomodate larger sizes.
329
     * If @c resizable is false and there is at least one free payload in the pool, that payload will
330
     * be returned even though it may not reach the requested size.
331
     *
332
     * If @c resizable is true and the reallocation fails, the operation returns false and
333
     * the payload is returned to the pool.
334
     *
335
     * @param [in]     size          Number of bytes required for the serialized payload
336
     * @param [in,out] cache_change  Cache change to assign the payload to
337
     * @param [in]     resizable     Whether payloads recycled from the pool are resizable to accomodate larger sizes
338
     *
339
     * @returns whether the operation succeeded or not
340
     *
341
     * @post
342
     *   On success:
343
     *     @li Field @c cache_change.payload_owner equals this
344
     *     @li Field @c serializedPayload.data points to a buffer of at least @c size bytes
345
     *     @li Field @c serializedPayload.max_size is greater than or equal to @c size
346
     */
347
    virtual bool do_get_payload(
348
            uint32_t size,
349
            SerializedPayload_t& payload,
350
            bool resizeable);
351
352
    virtual MemoryManagementPolicy_t memory_policy() const = 0;
353
354
    uint32_t max_pool_size_             = 0;  //< Maximum size of the pool
355
    uint32_t infinite_histories_count_  = 0;  //< Number of infinite histories reserved
356
    uint32_t finite_max_pool_size_      = 0;  //< Maximum size of the pool if no infinite histories were reserved
357
358
    std::vector<PayloadNode*> free_payloads_; //< Payloads that are free
359
    std::vector<PayloadNode*> all_payloads_;  //< All payloads
360
361
    std::mutex mutex_;
362
363
};
364
365
366
}  // namespace rtps
367
}  // namespace fastdds
368
}  // namespace eprosima
369
370
#endif  // RTPS_HISTORY_TOPICPAYLOADPOOL_HPP