Coverage Report

Created: 2025-06-13 06:46

/src/Fast-DDS/src/cpp/rtps/DataSharing/DataSharingPayloadPool.hpp
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2020 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file DataSharingPayloadPool.hpp
17
 */
18
19
#ifndef FASTDDS_RTPS_DATASHARING__DATASHARINGPAYLOADPOOL_HPP
20
#define FASTDDS_RTPS_DATASHARING__DATASHARINGPAYLOADPOOL_HPP
21
22
#include <fastdds/rtps/common/CacheChange.hpp>
23
#include <fastdds/rtps/history/IPayloadPool.hpp>
24
#include <fastdds/dds/log/Log.hpp>
25
#include <rtps/history/PoolConfig.h>
26
#include <utils/shared_memory/SharedDir.hpp>
27
#include <utils/shared_memory/SharedMemSegment.hpp>
28
29
#include <memory>
30
31
namespace eprosima {
32
namespace fastdds {
33
namespace rtps {
34
35
class RTPSWriter;
36
37
class DataSharingPayloadPool : public IPayloadPool
38
{
39
40
protected:
41
42
    class PayloadNode;
43
44
public:
45
46
    using Segment = fastdds::rtps::SharedSegmentBase;
47
    using sharable_mutex = Segment::sharable_mutex;
48
    template <class M>
49
    using sharable_lock = Segment::sharable_lock<M>;
50
51
0
    DataSharingPayloadPool() = default;
52
53
0
    ~DataSharingPayloadPool() = default;
54
55
    virtual bool release_payload(
56
            SerializedPayload_t& payload) override;
57
58
    static std::shared_ptr<DataSharingPayloadPool> get_reader_pool(
59
            bool is_reader_volatile);
60
61
    static std::shared_ptr<DataSharingPayloadPool> get_writer_pool(
62
            const PoolConfig& config);
63
64
    static std::string get_default_directory()
65
0
    {
66
0
        std::string dir;
67
0
        fastdds::rtps::SharedDir::get_default_shared_dir(dir);
68
0
        return dir;
69
0
    }
70
71
    virtual bool init_shared_memory(
72
            const GUID_t& /*writer_guid*/,
73
            const std::string& /*shared_dir*/)
74
0
    {
75
        // Default implementation is NOP
76
        // will be overriden by children if needed
77
0
        return false;
78
0
    }
79
80
    virtual bool init_shared_memory(
81
            const RTPSWriter* /*writer*/,
82
            const std::string& /*shared_dir*/)
83
0
    {
84
        // Default implementation is NOP
85
        // will be overriden by children if needed
86
0
        return false;
87
0
    }
88
89
    constexpr static const char* domain_name()
90
0
    {
91
0
        return "fast_datasharing";
92
0
    }
93
94
    constexpr static const char* descriptor_chunk_name()
95
0
    {
96
0
        return "descriptor";
97
0
    }
98
99
    constexpr static const char* history_chunk_name()
100
0
    {
101
0
        return "history";
102
0
    }
103
104
    uint32_t history_size() const
105
0
    {
106
0
        return descriptor_->history_size;
107
0
    }
108
109
    /**
110
     * Advances an index to the history to the next position
111
     */
112
    void advance(
113
            uint64_t& index) const;
114
115
    /**
116
     * The index of the first valid position in the history
117
     */
118
    uint64_t begin() const;
119
120
    /**
121
     * The index of one past the last valid position in the history
122
     */
123
    uint64_t end() const;
124
125
    /**
126
     * Whether the history is empty or not
127
     */
128
    bool empty() const;
129
130
    const GUID_t& writer() const;
131
132
    uint32_t last_liveliness_sequence() const;
133
134
    static bool check_sequence_number(
135
            const octet* data,
136
            const SequenceNumber_t& sn);
137
138
    bool is_sample_valid(
139
            const CacheChange_t& change) const;
140
141
protected:
142
143
#pragma warning(push)
144
#pragma warning(disable:4324)
145
    class alignas (8) PayloadNode
146
    {
147
148
        struct PayloadNodeMetaData
149
        {
150
        public:
151
152
            PayloadNodeMetaData()
153
0
                : status(fastdds::rtps::ChangeKind_t::ALIVE)
154
0
                , has_been_removed(0)
155
0
                , data_length(0)
156
0
                , sequence_number(c_SequenceNumber_Unknown)
157
0
                , writer_GUID(c_Guid_Unknown)
158
0
                , instance_handle(c_InstanceHandle_Unknown)
159
0
            {
160
0
            }
161
162
            ~PayloadNodeMetaData() = default;
163
164
            // writer/instance status
165
            uint8_t status;
166
167
            // Has this payload been removed from the shared history?
168
            uint8_t has_been_removed;
169
170
            // Encapsulation of the data
171
            uint16_t encapsulation;
172
173
            // Actual data size of the payload. Must be less than the configured maximum
174
            uint32_t data_length;
175
176
            // Writer's timestamp
177
            Time_t source_timestamp;
178
179
            // Sequence number of the payload inside the writer
180
            std::atomic<SequenceNumber_t> sequence_number;
181
182
            // GUID of the writer that created the payload
183
            GUID_t writer_GUID;
184
185
            // Instance handel for the change
186
            InstanceHandle_t instance_handle;
187
188
            // Related sample identity for the change
189
            fastdds::rtps::SampleIdentity related_sample_identity;
190
191
            // Mutex for shared read / exclusive write access to the payload
192
            sharable_mutex mutex;
193
194
        };
195
196
    public:
197
198
        // Payload data comes after the metadata
199
        static constexpr size_t data_offset = sizeof(PayloadNodeMetaData);
200
201
202
0
        PayloadNode() = default;
203
204
        ~PayloadNode() = default;
205
206
        void reset()
207
0
        {
208
            // Reset the sequence number first, it signals the data is not valid anymore
209
0
            metadata_.sequence_number.store(c_SequenceNumber_Unknown, std::memory_order_relaxed);
210
0
            metadata_.status = fastdds::rtps::ChangeKind_t::ALIVE;
211
0
            metadata_.has_been_removed = 0;
212
0
            metadata_.data_length = 0;
213
0
            metadata_.writer_GUID = c_Guid_Unknown;
214
0
            metadata_.instance_handle = c_InstanceHandle_Unknown;
215
0
            metadata_.related_sample_identity = fastdds::rtps::SampleIdentity();
216
0
        }
217
218
        static const PayloadNode* get_from_data(
219
                const octet* data)
220
0
        {
221
0
            return reinterpret_cast<const PayloadNode*>(data - data_offset);
222
0
        }
223
224
        static PayloadNode* get_from_data(
225
                octet* data)
226
0
        {
227
0
            return reinterpret_cast<PayloadNode*>(data - data_offset);
228
0
        }
229
230
        octet* data()
231
0
        {
232
0
            return reinterpret_cast<octet*>(this) + data_offset;
233
0
        }
234
235
        uint32_t data_length() const
236
0
        {
237
0
            return metadata_.data_length;
238
0
        }
239
240
        void data_length(
241
                uint32_t length)
242
0
        {
243
0
            metadata_.data_length = length;
244
0
        }
245
246
        uint16_t encapsulation() const
247
0
        {
248
0
            return metadata_.encapsulation;
249
0
        }
250
251
        void encapsulation(
252
                uint16_t encapsulation)
253
0
        {
254
0
            metadata_.encapsulation = encapsulation;
255
0
        }
256
257
        GUID_t writer_GUID() const
258
0
        {
259
0
            return metadata_.writer_GUID;
260
0
        }
261
262
        void writer_GUID(
263
                const GUID_t& guid)
264
0
        {
265
0
            metadata_.writer_GUID = guid;
266
0
        }
267
268
        SequenceNumber_t sequence_number() const
269
0
        {
270
0
            SequenceNumber_t value = metadata_.sequence_number.load(std::memory_order_relaxed);
271
0
            return value;
272
0
        }
273
274
        void sequence_number(
275
                SequenceNumber_t sequence_number)
276
0
        {
277
0
            metadata_.sequence_number.store(sequence_number, std::memory_order_relaxed);
278
0
        }
279
280
        Time_t source_timestamp() const
281
0
        {
282
0
            return metadata_.source_timestamp;
283
0
        }
284
285
        void source_timestamp(
286
                Time_t timestamp)
287
0
        {
288
0
            metadata_.source_timestamp = timestamp;
289
0
        }
290
291
        InstanceHandle_t instance_handle() const
292
0
        {
293
0
            return metadata_.instance_handle;
294
0
        }
295
296
        void instance_handle(
297
                InstanceHandle_t handle)
298
0
        {
299
0
            metadata_.instance_handle = handle;
300
0
        }
301
302
        uint8_t status() const
303
0
        {
304
0
            return metadata_.status;
305
0
        }
306
307
        void status(
308
                uint8_t status)
309
0
        {
310
0
            metadata_.status = status;
311
0
        }
312
313
        bool has_been_removed() const
314
0
        {
315
0
            return metadata_.has_been_removed == 1;
316
0
        }
317
318
        void has_been_removed(
319
                bool removed)
320
0
        {
321
0
            metadata_.has_been_removed = removed ? 1 : 0;
322
0
        }
323
324
        fastdds::rtps::SampleIdentity related_sample_identity() const
325
0
        {
326
0
            return metadata_.related_sample_identity;
327
0
        }
328
329
        void related_sample_identity(
330
                fastdds::rtps::SampleIdentity identity)
331
0
        {
332
0
            metadata_.related_sample_identity = identity;
333
0
        }
334
335
    private:
336
337
        PayloadNodeMetaData metadata_;
338
339
    };
340
341
    struct alignas (8) PoolDescriptor
342
    {
343
        uint32_t history_size;          //< Number of payloads in the history
344
        uint64_t notified_begin;        //< The index of the oldest history entry already notified (ready to read)
345
        uint64_t notified_end;          //< The index of the history entry that will be notified next
346
        uint32_t liveliness_sequence;   //< The ID of the last liveliness assertion sent by the writer
347
    };
348
#pragma warning(pop)
349
350
    static std::string generate_segment_name(
351
            const std::string& shared_dir,
352
            const GUID_t& writer_guid)
353
0
    {
354
0
        std::stringstream ss;
355
0
        if (!shared_dir.empty())
356
0
        {
357
0
            ss << shared_dir << "/";
358
0
        }
359
360
0
        ss << DataSharingPayloadPool::domain_name() << "_" << writer_guid.guidPrefix << "_" << writer_guid.entityId;
361
0
        return ss.str();
362
0
    }
363
364
    static size_t node_size (
365
            size_t payload_size)
366
0
    {
367
0
        return (payload_size + PayloadNode::data_offset + alignof(PayloadNode) - 1)
368
0
               & ~(alignof(PayloadNode) - 1);
369
0
    }
370
371
    GUID_t segment_id_;         //< The ID of the segment
372
    std::string segment_name_;  //< Segment name
373
374
    std::unique_ptr<Segment> segment_;    //< Shared memory segment
375
376
    Segment::Offset* history_;      //< Offsets of the payloads that are currently in the writer's history
377
    PoolDescriptor* descriptor_;    //< Shared descriptor of the pool
378
379
};
380
381
382
}  // namespace rtps
383
}  // namespace fastdds
384
}  // namespace eprosima
385
386
#endif  // FASTDDS_RTPS_DATASHARING__DATASHARINGPAYLOADPOOL_HPP