Coverage Report

Created: 2026-05-04 06:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/Fast-DDS/src/cpp/rtps/DataSharing/WriterPool.hpp
Line
Count
Source
1
// Copyright 2020 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file WriterPool.hpp
17
 */
18
19
#ifndef RTPS_DATASHARING_WRITERPOOL_HPP
20
#define RTPS_DATASHARING_WRITERPOOL_HPP
21
22
#include <fastdds/rtps/attributes/ResourceManagement.hpp>
23
#include <fastdds/rtps/common/CacheChange.hpp>
24
#include <fastdds/rtps/writer/RTPSWriter.hpp>
25
#include <fastdds/dds/log/Log.hpp>
26
#include <rtps/DataSharing/DataSharingPayloadPool.hpp>
27
#include <utils/collections/FixedSizeQueue.hpp>
28
29
#include <memory>
30
31
namespace eprosima {
32
namespace fastdds {
33
namespace rtps {
34
35
class WriterPool : public DataSharingPayloadPool
36
{
37
38
public:
39
40
    WriterPool(
41
            uint32_t pool_size,
42
            uint32_t payload_size)
43
0
        : max_data_size_(payload_size)
44
0
        , pool_size_(pool_size)
45
0
        , free_history_size_(0)
46
0
    {
47
0
    }
48
49
    ~WriterPool()
50
0
    {
51
0
        EPROSIMA_LOG_INFO(DATASHARING_PAYLOADPOOL, "DataSharingPayloadPool::WriterPool destructor");
52
53
        // We cannot destroy the objects in the SHM, as the Reader may still be using them.
54
        // We just remove the segment, and when the Reader closes it, it will be removed from the system.
55
0
        if (segment_)
56
0
        {
57
0
            segment_->remove();
58
0
        }
59
0
    }
60
61
    bool get_payload(
62
            uint32_t /*size*/,
63
            SerializedPayload_t& payload) override
64
0
    {
65
0
        if (free_payloads_.empty())
66
0
        {
67
0
            return false;
68
0
        }
69
70
0
        PayloadNode* payload_node = free_payloads_.front();
71
0
        free_payloads_.pop_front();
72
        // Reset all the metadata to signal the reader that the payload is dirty
73
0
        payload_node->reset();
74
75
0
        payload.data = payload_node->data();
76
0
        payload.max_size = max_data_size_;
77
0
        payload.payload_owner = this;
78
79
0
        return true;
80
0
    }
81
82
    bool get_payload(
83
            const SerializedPayload_t& data,
84
            SerializedPayload_t& payload) override
85
0
    {
86
0
        if (data.payload_owner == this)
87
0
        {
88
0
            payload.data = data.data;
89
0
            payload.length = data.length;
90
0
            payload.max_size = data.length;
91
0
            payload.is_serialized_key = data.is_serialized_key;
92
0
            payload.payload_owner = this;
93
0
            return true;
94
0
        }
95
0
        else
96
0
        {
97
0
            if (get_payload(data.length, payload))
98
0
            {
99
0
                if (!payload.copy(&data, true))
100
0
                {
101
0
                    release_payload(payload);
102
0
                    return false;
103
0
                }
104
105
0
                return true;
106
0
            }
107
0
        }
108
109
0
        return false;
110
0
    }
111
112
    bool release_payload(
113
            SerializedPayload_t& payload) override
114
0
    {
115
0
        assert(payload.payload_owner == this);
116
117
        // Payloads are reset on the `get` operation, the `release` leaves the data to give more chances to the reader
118
0
        PayloadNode* payload_node = PayloadNode::get_from_data(payload.data);
119
0
        if (payload_node->has_been_removed())
120
0
        {
121
0
            advance_till_first_non_removed();
122
0
        }
123
0
        else
124
0
        {
125
0
            free_payloads_.push_back(payload_node);
126
0
        }
127
0
        EPROSIMA_LOG_INFO(DATASHARING_PAYLOADPOOL, "Serialized payload released.");
128
129
0
        return DataSharingPayloadPool::release_payload(payload);
130
0
    }
131
132
    template<typename T>
133
    bool init_shared_segment(
134
            const RTPSWriter* writer,
135
            const std::string& shared_dir)
136
0
    {
137
0
        segment_id_ = writer->getGuid();
138
0
        segment_name_ = generate_segment_name(shared_dir, segment_id_);
139
0
        std::unique_ptr<T> local_segment;
140
0
        size_t payload_size;
141
0
        uint64_t estimated_size_for_payloads_pool;
142
0
        uint64_t estimated_size_for_history;
143
0
        uint32_t size_for_payloads_pool;
144
145
0
        try
146
0
        {
147
            // We need to reserve the whole segment at once, and the underlying classes use uint32_t as size type.
148
            // In order to avoid overflows, we will calculate using uint64 and check the casting
149
0
            bool overflow = false;
150
0
            size_t per_allocation_extra_size = T::compute_per_allocation_extra_size(
151
0
                alignof(PayloadNode), DataSharingPayloadPool::domain_name());
152
0
            payload_size = DataSharingPayloadPool::node_size(max_data_size_);
153
154
0
            estimated_size_for_payloads_pool = pool_size_ * payload_size;
155
0
            overflow |= (estimated_size_for_payloads_pool != static_cast<uint32_t>(estimated_size_for_payloads_pool));
156
0
            size_for_payloads_pool = static_cast<uint32_t>(estimated_size_for_payloads_pool);
157
158
            //Reserve one extra to avoid pointer overlapping
159
0
            estimated_size_for_history = (pool_size_ + 1) * sizeof(Segment::Offset);
160
0
            overflow |= (estimated_size_for_history != static_cast<uint32_t>(estimated_size_for_history));
161
0
            uint32_t size_for_history = static_cast<uint32_t>(estimated_size_for_history);
162
163
0
            uint32_t descriptor_size = static_cast<uint32_t>(sizeof(PoolDescriptor));
164
0
            uint64_t estimated_segment_size = size_for_payloads_pool + per_allocation_extra_size +
165
0
                    size_for_history + per_allocation_extra_size +
166
0
                    descriptor_size + per_allocation_extra_size;
167
0
            overflow |= (estimated_segment_size != static_cast<uint32_t>(estimated_segment_size));
168
0
            uint32_t segment_size = static_cast<uint32_t>(estimated_segment_size);
169
170
0
            if (overflow)
171
0
            {
172
0
                EPROSIMA_LOG_ERROR(DATASHARING_PAYLOADPOOL, "Failed to create segment " << segment_name_
173
0
                                                                                        <<
174
0
                        ": Segment size is too large: " << estimated_size_for_payloads_pool
175
0
                                                                                        << " (max is "
176
0
                                                                                        << (std::numeric_limits<uint32_t>
177
0
                        ::max)() << ")."
178
0
                                                                                        <<
179
0
                        " Please reduce the maximum size of the history");
180
0
                return false;
181
0
            }
182
183
            //Open the segment
184
0
            T::remove(segment_name_);
185
186
0
            local_segment.reset(
187
0
                new T(boost::interprocess::create_only,
188
0
                segment_name_,
189
0
                segment_size + T::EXTRA_SEGMENT_SIZE));
190
0
        }
191
0
        catch (const std::exception& e)
192
0
        {
193
0
            EPROSIMA_LOG_ERROR(DATASHARING_PAYLOADPOOL, "Failed to create segment " << segment_name_
194
0
                                                                                    << ": " << e.what());
195
0
            return false;
196
0
        }
197
198
0
        try
199
0
        {
200
            // Alloc the memory for the pool
201
            // Cannot use 'construct' because we need to reserve extra space for the data,
202
            // which is not considered in sizeof(PayloadNode).
203
0
            payloads_pool_ = static_cast<octet*>(local_segment->get().allocate(size_for_payloads_pool));
204
205
            // Initialize each node in the pool
206
0
            free_payloads_.init(pool_size_);
207
0
            octet* payload = payloads_pool_;
208
0
            for (uint32_t i = 0; i < pool_size_; ++i)
209
0
            {
210
0
                new (payload) PayloadNode();
211
212
                // All payloads are free
213
0
                free_payloads_.push_back(reinterpret_cast<PayloadNode*>(payload));
214
215
0
                payload += (ptrdiff_t)payload_size;
216
0
            }
217
218
            //Alloc the memory for the history
219
0
            history_ = local_segment->get().template construct<Segment::Offset>(history_chunk_name())[pool_size_ + 1]();
220
221
            //Alloc the memory for the descriptor
222
0
            descriptor_ = local_segment->get().template construct<PoolDescriptor>(descriptor_chunk_name())();
223
224
            // Initialize the data in the descriptor
225
0
            descriptor_->history_size = pool_size_ + 1;
226
0
            descriptor_->notified_begin = 0u;
227
0
            descriptor_->notified_end = 0u;
228
0
            descriptor_->liveliness_sequence = 0u;
229
230
0
            free_history_size_ = pool_size_;
231
0
        }
232
0
        catch (std::exception& e)
233
0
        {
234
0
            T::remove(segment_name_);
235
236
0
            EPROSIMA_LOG_ERROR(DATASHARING_PAYLOADPOOL, "Failed to initialize segment " << segment_name_
237
0
                                                                                        << ": " << e.what());
238
0
            return false;
239
0
        }
240
241
0
        segment_ = std::move(local_segment);
242
0
        is_initialized_ = true;
243
0
        return true;
244
0
    }
Unexecuted instantiation: bool eprosima::fastdds::rtps::WriterPool::init_shared_segment<eprosima::fastdds::rtps::SharedSegment<boost::interprocess::basic_managed_shared_memory<char, boost::interprocess::rbtree_best_fit<boost::interprocess::mutex_family, boost::interprocess::offset_ptr<void, unsigned int, unsigned long, 0ul>, 0ul>, boost::interprocess::iset_index>, boost::interprocess::shared_memory_object> >(eprosima::fastdds::rtps::RTPSWriter const*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&)
Unexecuted instantiation: bool eprosima::fastdds::rtps::WriterPool::init_shared_segment<eprosima::fastdds::rtps::SharedSegment<boost::interprocess::basic_managed_mapped_file<char, boost::interprocess::rbtree_best_fit<boost::interprocess::mutex_family, boost::interprocess::offset_ptr<void, unsigned int, unsigned long, 0ul>, 0ul>, boost::interprocess::iset_index>, boost::interprocess::file_mapping> >(eprosima::fastdds::rtps::RTPSWriter const*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&)
245
246
    bool init_shared_memory(
247
            const RTPSWriter* writer,
248
            const std::string& shared_dir) override
249
0
    {
250
0
        if (shared_dir.empty())
251
0
        {
252
0
            return init_shared_segment<fastdds::rtps::SharedMemSegment>(writer, shared_dir);
253
0
        }
254
0
        else
255
0
        {
256
0
            return init_shared_segment<fastdds::rtps::SharedFileSegment>(writer, shared_dir);
257
0
        }
258
0
    }
259
260
    /**
261
     * Fills the metadata of the shared payload from the cache change information
262
     * and adds the payload's offset to the shared history
263
     */
264
    void add_to_shared_history(
265
            const CacheChange_t* cache_change)
266
0
    {
267
0
        assert(cache_change);
268
0
        assert(cache_change->serializedPayload.data);
269
0
        assert(cache_change->serializedPayload.payload_owner == this);
270
0
        assert(free_history_size_ > 0);
271
272
        // Fill the payload metadata with the change info
273
0
        PayloadNode* node = PayloadNode::get_from_data(cache_change->serializedPayload.data);
274
0
        node->status(ALIVE);
275
0
        node->data_length(cache_change->serializedPayload.length);
276
0
        node->source_timestamp(cache_change->sourceTimestamp);
277
0
        node->writer_GUID(cache_change->writerGUID);
278
0
        node->instance_handle(cache_change->instanceHandle);
279
0
        if (cache_change->write_params.related_sample_identity() != SampleIdentity::unknown())
280
0
        {
281
0
            node->related_sample_identity(cache_change->write_params.related_sample_identity());
282
0
        }
283
284
        // Set the sequence number last, it signals the data is ready
285
0
        node->sequence_number(cache_change->sequenceNumber);
286
287
        // Add it to the history
288
0
        history_[static_cast<uint32_t>(descriptor_->notified_end)] = segment_->get_offset_from_address(node);
289
0
        EPROSIMA_LOG_INFO(DATASHARING_PAYLOADPOOL, "Change added to shared history"
290
0
                << " with SN " << cache_change->sequenceNumber);
291
0
        advance(descriptor_->notified_end);
292
0
        --free_history_size_;
293
0
    }
294
295
    /**
296
     * Removes the payload's offset from the shared history
297
     *
298
     * Payloads don't need to be removed from the history in the same order
299
     * they where added, but a payload will not be available through @ref get_payload until all
300
     * payloads preceding it have been removed from the shared history.
301
     */
302
    void remove_from_shared_history(
303
            const CacheChange_t* cache_change)
304
0
    {
305
0
        assert(cache_change);
306
0
        assert(cache_change->serializedPayload.data);
307
0
        assert(cache_change->serializedPayload.payload_owner == this);
308
0
        assert(descriptor_->notified_end != descriptor_->notified_begin);
309
0
        assert(free_history_size_ < descriptor_->history_size);
310
311
0
        EPROSIMA_LOG_INFO(DATASHARING_PAYLOADPOOL, "Change removed from shared history"
312
0
                << " with SN " << cache_change->sequenceNumber);
313
314
0
        PayloadNode* payload = PayloadNode::get_from_data(cache_change->serializedPayload.data);
315
0
        payload->has_been_removed(true);
316
0
    }
317
318
    void advance_till_first_non_removed()
319
0
    {
320
0
        while (descriptor_->notified_begin != descriptor_->notified_end)
321
0
        {
322
0
            auto offset = history_[static_cast<uint32_t>(descriptor_->notified_begin)];
323
0
            auto payload = static_cast<PayloadNode*>(segment_->get_address_from_offset(offset));
324
0
            if (!payload->has_been_removed())
325
0
            {
326
0
                break;
327
0
            }
328
329
0
            payload->has_been_removed(false);
330
0
            free_payloads_.push_back(payload);
331
0
            advance(descriptor_->notified_begin);
332
0
            ++free_history_size_;
333
0
        }
334
0
    }
335
336
    void assert_liveliness()
337
0
    {
338
0
        ++descriptor_->liveliness_sequence;
339
0
    }
340
341
    bool is_initialized() const
342
0
    {
343
0
        return is_initialized_;
344
0
    }
345
346
private:
347
348
    using DataSharingPayloadPool::init_shared_memory;
349
350
    octet* payloads_pool_;          //< Shared pool of payloads
351
352
    uint32_t max_data_size_;        //< Maximum size of the serialized payload data
353
    uint32_t pool_size_;            //< Number of payloads in the pool
354
    uint32_t free_history_size_;    //< Number of elements currently unused in the shared history
355
356
    FixedSizeQueue<PayloadNode*> free_payloads_;    //< Pointers to the free payloads in the pool
357
358
    bool is_initialized_ = false;   //< Whether the pool has been initialized on shared memory
359
360
};
361
362
363
}  // namespace rtps
364
}  // namespace fastdds
365
}  // namespace eprosima
366
367
#endif  // RTPS_DATASHARING_WRITERPOOL_HPP