Coverage Report

Created: 2025-06-13 06:46

/src/Fast-DDS/include/fastdds/rtps/common/CacheChange.hpp
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file CacheChange.hpp
17
 */
18
19
#ifndef FASTDDS_RTPS_COMMON__CACHECHANGE_HPP
20
#define FASTDDS_RTPS_COMMON__CACHECHANGE_HPP
21
22
#include <atomic>
23
#include <cassert>
24
25
#include <fastdds/rtps/common/ChangeKind_t.hpp>
26
#include <fastdds/rtps/common/FragmentNumber.hpp>
27
#include <fastdds/rtps/common/InstanceHandle.hpp>
28
#include <fastdds/rtps/common/SerializedPayload.hpp>
29
#include <fastdds/rtps/common/Time_t.hpp>
30
#include <fastdds/rtps/common/Types.hpp>
31
#include <fastdds/rtps/common/VendorId_t.hpp>
32
#include <fastdds/rtps/common/WriteParams.hpp>
33
#include <fastdds/rtps/history/IPayloadPool.hpp>
34
35
namespace eprosima {
36
namespace fastdds {
37
namespace rtps {
38
39
struct CacheChange_t;
40
41
/*!
42
 * Specific information for a writer.
43
 */
44
struct CacheChangeWriterInfo_t
45
{
46
    //!Number of DATA / DATA_FRAG submessages sent to the transport (only used in Writers)
47
    size_t num_sent_submessages = 0;
48
    //! Used to link with previous node in a list. Used by FlowControllerImpl.
49
    //! Cannot be cached because there are several comparisons without locking.
50
    CacheChange_t* volatile previous = nullptr;
51
    //! Used to link with next node in a list. Used by FlowControllerImpl.
52
    //! Cannot be cached because there are several comparisons without locking.
53
    CacheChange_t* volatile next = nullptr;
54
    //! Used to know if the object is already in a list.
55
    std::atomic_bool is_linked {false};
56
};
57
58
/*!
59
 * Specific information for a reader.
60
 */
61
struct CacheChangeReaderInfo_t
62
{
63
    //!Reception TimeStamp (only used in Readers)
64
    Time_t receptionTimestamp;
65
    //! Disposed generation of the instance when this entry was added to it
66
    int32_t disposed_generation_count;
67
    //! No-writers generation of the instance when this entry was added to it
68
    int32_t no_writers_generation_count;
69
    //! Ownership stregth of its writer when the sample was received.
70
    uint32_t writer_ownership_strength;
71
};
72
73
/**
74
 * Structure CacheChange_t, contains information on a specific CacheChange.
75
 * @ingroup COMMON_MODULE
76
 */
77
struct FASTDDS_EXPORTED_API CacheChange_t
78
{
79
    //!Kind of change, default value ALIVE.
80
    ChangeKind_t kind = ALIVE;
81
    //!GUID_t of the writer that generated this change.
82
    GUID_t writerGUID{};
83
    //!Handle of the data associated with this change.
84
    InstanceHandle_t instanceHandle{};
85
    //!SequenceNumber of the change
86
    SequenceNumber_t sequenceNumber{};
87
    //!Serialized Payload associated with the change.
88
    SerializedPayload_t serializedPayload{};
89
    //!CDR serialization of inlined QoS for this change.
90
    SerializedPayload_t inline_qos{};
91
    //!Indicates if the cache has been read (only used in READERS)
92
    bool isRead = false;
93
    //!Source TimeStamp
94
    Time_t sourceTimestamp{};
95
    //! Vendor Id of the writer that generated this change.
96
    fastdds::rtps::VendorId_t vendor_id = c_VendorId_Unknown;
97
98
    union
99
    {
100
        CacheChangeReaderInfo_t reader_info;
101
        CacheChangeWriterInfo_t writer_info;
102
    };
103
104
    WriteParams write_params{};
105
    bool is_untyped_ = true;
106
107
    /*!
108
     * @brief Default constructor.
109
     * Creates an empty CacheChange_t.
110
     */
111
    CacheChange_t()
112
0
        : writer_info()
113
0
    {
114
0
        inline_qos.encapsulation = DEFAULT_ENDIAN == LITTLEEND ? PL_CDR_LE : PL_CDR_BE;
115
0
    }
116
117
    CacheChange_t(
118
            const CacheChange_t&) = delete;
119
    const CacheChange_t& operator =(
120
            const CacheChange_t&) = delete;
121
122
    /**
123
     * Constructor with payload size
124
     * @param payload_size Serialized payload size
125
     * @param is_untyped Flag to mark the change as untyped.
126
     */
127
    CacheChange_t(
128
            uint32_t payload_size,
129
            bool is_untyped = false)
130
        : serializedPayload(payload_size)
131
        , is_untyped_(is_untyped)
132
0
    {
133
0
    }
134
135
    /*!
136
     * Copy a different change into this one. All the elements are copied, included the data, allocating new memory.
137
     * @param [in] ch_ptr Pointer to the change.
138
     * @return True if correct.
139
     */
140
    bool copy(
141
            const CacheChange_t* ch_ptr)
142
0
    {
143
0
        kind = ch_ptr->kind;
144
0
        writerGUID = ch_ptr->writerGUID;
145
0
        instanceHandle = ch_ptr->instanceHandle;
146
0
        sequenceNumber = ch_ptr->sequenceNumber;
147
0
        sourceTimestamp = ch_ptr->sourceTimestamp;
148
0
        reader_info.receptionTimestamp = ch_ptr->reader_info.receptionTimestamp;
149
0
        write_params = ch_ptr->write_params;
150
0
        isRead = ch_ptr->isRead;
151
0
        vendor_id = ch_ptr->vendor_id;
152
0
        fragment_size_ = ch_ptr->fragment_size_;
153
0
        fragment_count_ = ch_ptr->fragment_count_;
154
0
        first_missing_fragment_ = ch_ptr->first_missing_fragment_;
155
156
0
        return serializedPayload.copy(&ch_ptr->serializedPayload, !ch_ptr->is_untyped_);
157
0
    }
158
159
    /*!
160
     * Copy information form a different change into this one.
161
     * All the elements are copied except data.
162
     * @param [in] ch_ptr Pointer to the change.
163
     */
164
    void copy_not_memcpy(
165
            const CacheChange_t* ch_ptr)
166
0
    {
167
0
        kind = ch_ptr->kind;
168
0
        writerGUID = ch_ptr->writerGUID;
169
0
        instanceHandle = ch_ptr->instanceHandle;
170
0
        sequenceNumber = ch_ptr->sequenceNumber;
171
0
        sourceTimestamp = ch_ptr->sourceTimestamp;
172
0
        reader_info.receptionTimestamp = ch_ptr->reader_info.receptionTimestamp;
173
0
        write_params = ch_ptr->write_params;
174
0
        isRead = ch_ptr->isRead;
175
0
        vendor_id = ch_ptr->vendor_id;
176
177
        // Copy certain values from serializedPayload
178
0
        serializedPayload.encapsulation = ch_ptr->serializedPayload.encapsulation;
179
180
        // Copy fragment size and calculate fragment count
181
0
        setFragmentSize(ch_ptr->fragment_size_, false);
182
0
    }
183
184
0
    virtual ~CacheChange_t() = default;
185
186
    /*!
187
     * Get the number of fragments this change is split into.
188
     * @return number of fragments.
189
     */
190
    uint32_t getFragmentCount() const
191
0
    {
192
0
        return fragment_count_;
193
0
    }
194
195
    /*!
196
     * Get the size of each fragment this change is split into.
197
     * @return size of fragment (0 means change is not fragmented).
198
     */
199
    uint16_t getFragmentSize() const
200
0
    {
201
0
        return fragment_size_;
202
0
    }
203
204
    /*!
205
     * Checks if all fragments have been received.
206
     * @return true when change is fully assembled (i.e. no missing fragments).
207
     */
208
    bool is_fully_assembled()
209
0
    {
210
0
        return first_missing_fragment_ >= fragment_count_;
211
0
    }
212
213
    /*! Checks if the first fragment is present.
214
     * @return true when it contains the first fragment. In other case, false.
215
     */
216
    bool contains_first_fragment()
217
0
    {
218
0
        return 0 < first_missing_fragment_;
219
0
    }
220
221
    /*!
222
     * Fills a FragmentNumberSet_t with the list of missing fragments.
223
     * @param [out] frag_sns FragmentNumberSet_t where result is stored.
224
     */
225
    void get_missing_fragments(
226
            FragmentNumberSet_t& frag_sns)
227
0
    {
228
        // Note: Fragment numbers are 1-based but we keep them 0 based.
229
0
        frag_sns.base(first_missing_fragment_ + 1);
230
231
        // Traverse list of missing fragments, adding them to frag_sns
232
0
        uint32_t current_frag = first_missing_fragment_;
233
0
        while (current_frag < fragment_count_)
234
0
        {
235
0
            frag_sns.add(current_frag + 1);
236
0
            current_frag = get_next_missing_fragment(current_frag);
237
0
        }
238
0
    }
239
240
    /*!
241
     * Set fragment size for this change.
242
     *
243
     * @param fragment_size Size of fragments.
244
     * @param create_fragment_list Whether to create missing fragments list or not.
245
     *
246
     * @remarks Parameter create_fragment_list should only be true when receiving the first
247
     *          fragment of a change.
248
     */
249
    void setFragmentSize(
250
            uint16_t fragment_size,
251
            bool create_fragment_list = false)
252
0
    {
253
0
        fragment_size_ = fragment_size;
254
0
        fragment_count_ = 0;
255
0
        first_missing_fragment_ = 0;
256
257
0
        if (fragment_size > 0)
258
0
        {
259
            // This follows RTPS 8.3.7.3.5
260
0
            fragment_count_ = (serializedPayload.length + fragment_size - 1) / fragment_size;
261
262
0
            if (create_fragment_list)
263
0
            {
264
                // Keep index of next fragment on the payload portion at the beginning of each fragment. Last
265
                // fragment will have fragment_count_ as 'next fragment index'
266
0
                size_t offset = 0;
267
0
                for (uint32_t i = 1; i <= fragment_count_; i++, offset += fragment_size_)
268
0
                {
269
0
                    set_next_missing_fragment(i - 1, i);  // index to next fragment in missing list
270
0
                }
271
0
            }
272
0
            else
273
0
            {
274
                // List not created. This means we are going to send this change fragmented, so it is already
275
                // assembled, and the missing list is empty (i.e. first missing points to fragment count)
276
0
                first_missing_fragment_ = fragment_count_;
277
0
            }
278
0
        }
279
0
    }
280
281
    bool add_fragments(
282
            const SerializedPayload_t& incoming_data,
283
            uint32_t fragment_starting_num,
284
            uint32_t fragments_in_submessage)
285
0
    {
286
0
        uint32_t original_offset = (fragment_starting_num - 1) * fragment_size_;
287
0
        uint32_t incoming_length = fragment_size_ * fragments_in_submessage;
288
0
        uint32_t last_fragment_index = fragment_starting_num + fragments_in_submessage - 1;
289
290
        // Validate fragment indexes
291
0
        if (last_fragment_index > fragment_count_)
292
0
        {
293
0
            return false;
294
0
        }
295
296
        // validate lengths
297
0
        if (last_fragment_index < fragment_count_)
298
0
        {
299
0
            if (incoming_data.length < incoming_length)
300
0
            {
301
0
                return false;
302
0
            }
303
0
        }
304
0
        else
305
0
        {
306
0
            incoming_length = serializedPayload.length - original_offset;
307
0
        }
308
309
0
        if (original_offset + incoming_length > serializedPayload.length)
310
0
        {
311
0
            return false;
312
0
        }
313
314
0
        if (received_fragments(fragment_starting_num - 1, fragments_in_submessage))
315
0
        {
316
0
            memcpy(
317
0
                &serializedPayload.data[original_offset],
318
0
                incoming_data.data, incoming_length);
319
0
        }
320
321
0
        return is_fully_assembled();
322
0
    }
323
324
private:
325
326
    // Fragment size
327
    uint16_t fragment_size_ = 0;
328
329
    // Number of fragments
330
    uint32_t fragment_count_ = 0;
331
332
    // First fragment in missing list
333
    uint32_t first_missing_fragment_ = 0;
334
335
    uint32_t get_next_missing_fragment(
336
            uint32_t fragment_index)
337
0
    {
338
0
        uint32_t* ptr = next_fragment_pointer(fragment_index);
339
0
        return *ptr;
340
0
    }
341
342
    void set_next_missing_fragment(
343
            uint32_t fragment_index,
344
            uint32_t next_fragment_index)
345
0
    {
346
0
        uint32_t* ptr = next_fragment_pointer(fragment_index);
347
0
        *ptr = next_fragment_index;
348
0
    }
349
350
    uint32_t* next_fragment_pointer(
351
            uint32_t fragment_index)
352
0
    {
353
0
        size_t offset = fragment_size_;
354
0
        offset *= fragment_index;
355
0
        offset = (offset + 3u) & ~3u;
356
0
        return reinterpret_cast<uint32_t*>(&serializedPayload.data[offset]);
357
0
    }
358
359
    /*!
360
     * Mark a set of consecutive fragments as received.
361
     * This will remove a set of consecutive fragments from the missing list.
362
     * Should be called BEFORE copying the received data into the serialized payload.
363
     *
364
     * @param initial_fragment Index (0-based) of first received fragment.
365
     * @param num_of_fragments Number of received fragments. Should be strictly positive.
366
     * @return true if the list of missing fragments was modified, false otherwise.
367
     */
368
    bool received_fragments(
369
            uint32_t initial_fragment,
370
            uint32_t num_of_fragments)
371
0
    {
372
0
        bool at_least_one_changed = false;
373
374
0
        if ((fragment_size_ > 0) && (initial_fragment < fragment_count_))
375
0
        {
376
0
            uint32_t last_fragment = initial_fragment + num_of_fragments;
377
0
            if (last_fragment > fragment_count_)
378
0
            {
379
0
                last_fragment = fragment_count_;
380
0
            }
381
382
0
            if (initial_fragment <= first_missing_fragment_)
383
0
            {
384
                // Perform first = *first until first >= last_received
385
0
                while (first_missing_fragment_ < last_fragment)
386
0
                {
387
0
                    first_missing_fragment_ = get_next_missing_fragment(first_missing_fragment_);
388
0
                    at_least_one_changed = true;
389
0
                }
390
0
            }
391
0
            else
392
0
            {
393
                // Find prev in missing list
394
0
                uint32_t current_frag = first_missing_fragment_;
395
0
                while (current_frag < initial_fragment)
396
0
                {
397
0
                    uint32_t next_frag = get_next_missing_fragment(current_frag);
398
0
                    if (next_frag >= initial_fragment)
399
0
                    {
400
                        // This is the fragment previous to initial_fragment.
401
                        // Find future value for next by repeating next = *next until next >= last_fragment.
402
0
                        uint32_t next_missing_fragment = next_frag;
403
0
                        while (next_missing_fragment < last_fragment)
404
0
                        {
405
0
                            next_missing_fragment = get_next_missing_fragment(next_missing_fragment);
406
0
                            at_least_one_changed = true;
407
0
                        }
408
409
                        // Update next and finish loop
410
0
                        if (at_least_one_changed)
411
0
                        {
412
0
                            set_next_missing_fragment(current_frag, next_missing_fragment);
413
0
                        }
414
0
                        break;
415
0
                    }
416
0
                    current_frag = next_frag;
417
0
                }
418
0
            }
419
0
        }
420
421
0
        return at_least_one_changed;
422
0
    }
423
424
};
425
426
} // namespace rtps
427
} // namespace fastdds
428
} // namespace eprosima
429
430
#endif // FASTDDS_RTPS_COMMON__CACHECHANGE_HPP