Coverage Report

Created: 2026-04-01 06:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/Fast-DDS/include/fastdds/rtps/common/CacheChange.hpp
Line
Count
Source
1
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file CacheChange.hpp
17
 */
18
19
#ifndef FASTDDS_RTPS_COMMON__CACHECHANGE_HPP
20
#define FASTDDS_RTPS_COMMON__CACHECHANGE_HPP
21
22
#include <atomic>
23
#include <cassert>
24
#include <limits>
25
26
#include <fastdds/rtps/common/ChangeKind_t.hpp>
27
#include <fastdds/rtps/common/FragmentNumber.hpp>
28
#include <fastdds/rtps/common/InstanceHandle.hpp>
29
#include <fastdds/rtps/common/SerializedPayload.hpp>
30
#include <fastdds/rtps/common/Time_t.hpp>
31
#include <fastdds/rtps/common/Types.hpp>
32
#include <fastdds/rtps/common/VendorId_t.hpp>
33
#include <fastdds/rtps/common/WriteParams.hpp>
34
#include <fastdds/rtps/history/IPayloadPool.hpp>
35
36
namespace eprosima {
37
namespace fastdds {
38
namespace rtps {
39
40
struct CacheChange_t;
41
42
/*!
43
 * Specific information for a writer.
44
 */
45
struct CacheChangeWriterInfo_t
46
{
47
    //!Number of DATA / DATA_FRAG submessages sent to the transport (only used in Writers)
48
    size_t num_sent_submessages = 0;
49
    //! Used to link with previous node in a list. Used by FlowControllerImpl.
50
    //! Cannot be cached because there are several comparisons without locking.
51
    CacheChange_t* volatile previous = nullptr;
52
    //! Used to link with next node in a list. Used by FlowControllerImpl.
53
    //! Cannot be cached because there are several comparisons without locking.
54
    CacheChange_t* volatile next = nullptr;
55
    //! Used to know if the object is already in a list.
56
    std::atomic_bool is_linked {false};
57
    //! Last fragment number sent.
58
    FragmentNumber_t last_fragment_sent {0};
59
};
60
61
/*!
62
 * Specific information for a reader.
63
 */
64
struct CacheChangeReaderInfo_t
65
{
66
    //!Reception TimeStamp (only used in Readers)
67
    Time_t receptionTimestamp;
68
    //! Disposed generation of the instance when this entry was added to it
69
    int32_t disposed_generation_count;
70
    //! No-writers generation of the instance when this entry was added to it
71
    int32_t no_writers_generation_count;
72
    //! Ownership stregth of its writer when the sample was received.
73
    uint32_t writer_ownership_strength;
74
};
75
76
/**
77
 * Structure CacheChange_t, contains information on a specific CacheChange.
78
 * @ingroup COMMON_MODULE
79
 */
80
struct FASTDDS_EXPORTED_API CacheChange_t
81
{
82
    //!Kind of change, default value ALIVE.
83
    ChangeKind_t kind = ALIVE;
84
    //!GUID_t of the writer that generated this change.
85
    GUID_t writerGUID{};
86
    //!Handle of the data associated with this change.
87
    InstanceHandle_t instanceHandle{};
88
    //!SequenceNumber of the change
89
    SequenceNumber_t sequenceNumber{};
90
    //!Serialized Payload associated with the change.
91
    SerializedPayload_t serializedPayload{};
92
    //!CDR serialization of inlined QoS for this change.
93
    SerializedPayload_t inline_qos{};
94
    //!Indicates if the cache has been read (only used in READERS)
95
    bool isRead = false;
96
    //!Source TimeStamp
97
    Time_t sourceTimestamp{};
98
    //! Vendor Id of the writer that generated this change.
99
    fastdds::rtps::VendorId_t vendor_id = c_VendorId_Unknown;
100
101
    union
102
    {
103
        CacheChangeReaderInfo_t reader_info;
104
        CacheChangeWriterInfo_t writer_info;
105
    };
106
107
    WriteParams write_params{};
108
    bool is_untyped_ = true;
109
110
    /*!
111
     * @brief Default constructor.
112
     * Creates an empty CacheChange_t.
113
     */
114
    CacheChange_t()
115
0
        : writer_info()
116
0
    {
117
0
        inline_qos.encapsulation = DEFAULT_ENDIAN == LITTLEEND ? PL_CDR_LE : PL_CDR_BE;
118
0
    }
119
120
    CacheChange_t(
121
            const CacheChange_t&) = delete;
122
    const CacheChange_t& operator =(
123
            const CacheChange_t&) = delete;
124
125
    /**
126
     * Constructor with payload size
127
     * @param payload_size Serialized payload size
128
     * @param is_untyped Flag to mark the change as untyped.
129
     */
130
    CacheChange_t(
131
            uint32_t payload_size,
132
            bool is_untyped = false)
133
        : serializedPayload(payload_size)
134
        , is_untyped_(is_untyped)
135
0
    {
136
0
    }
137
138
    /*!
139
     * Copy a different change into this one. All the elements are copied, included the data, allocating new memory.
140
     * @param [in] ch_ptr Pointer to the change.
141
     * @return True if correct.
142
     */
143
    bool copy(
144
            const CacheChange_t* ch_ptr)
145
0
    {
146
0
        kind = ch_ptr->kind;
147
0
        writerGUID = ch_ptr->writerGUID;
148
0
        instanceHandle = ch_ptr->instanceHandle;
149
0
        sequenceNumber = ch_ptr->sequenceNumber;
150
0
        sourceTimestamp = ch_ptr->sourceTimestamp;
151
0
        reader_info.receptionTimestamp = ch_ptr->reader_info.receptionTimestamp;
152
0
        write_params = ch_ptr->write_params;
153
0
        isRead = ch_ptr->isRead;
154
0
        vendor_id = ch_ptr->vendor_id;
155
0
        fragment_size_ = ch_ptr->fragment_size_;
156
0
        fragment_count_ = ch_ptr->fragment_count_;
157
0
        first_missing_fragment_ = ch_ptr->first_missing_fragment_;
158
159
0
        return serializedPayload.copy(&ch_ptr->serializedPayload, !ch_ptr->is_untyped_);
160
0
    }
161
162
    /*!
163
     * Copy information form a different change into this one.
164
     * All the elements are copied except data.
165
     * @param [in] ch_ptr Pointer to the change.
166
     */
167
    void copy_not_memcpy(
168
            const CacheChange_t* ch_ptr)
169
0
    {
170
0
        kind = ch_ptr->kind;
171
0
        writerGUID = ch_ptr->writerGUID;
172
0
        instanceHandle = ch_ptr->instanceHandle;
173
0
        sequenceNumber = ch_ptr->sequenceNumber;
174
0
        sourceTimestamp = ch_ptr->sourceTimestamp;
175
0
        reader_info.receptionTimestamp = ch_ptr->reader_info.receptionTimestamp;
176
0
        write_params = ch_ptr->write_params;
177
0
        isRead = ch_ptr->isRead;
178
0
        vendor_id = ch_ptr->vendor_id;
179
180
        // Copy certain values from serializedPayload
181
0
        serializedPayload.encapsulation = ch_ptr->serializedPayload.encapsulation;
182
0
        serializedPayload.is_serialized_key = ch_ptr->serializedPayload.is_serialized_key;
183
184
        // Copy fragment size and calculate fragment count
185
0
        setFragmentSize(ch_ptr->fragment_size_, false);
186
0
    }
187
188
0
    virtual ~CacheChange_t() = default;
189
190
    /*!
191
     * Get the number of fragments this change is split into.
192
     * @return number of fragments.
193
     */
194
    uint32_t getFragmentCount() const
195
0
    {
196
0
        return fragment_count_;
197
0
    }
198
199
    /*!
200
     * Get the size of each fragment this change is split into.
201
     * @return size of fragment (0 means change is not fragmented).
202
     */
203
    uint16_t getFragmentSize() const
204
0
    {
205
0
        return fragment_size_;
206
0
    }
207
208
    /*!
209
     * Checks if all fragments have been received.
210
     * @return true when change is fully assembled (i.e. no missing fragments).
211
     */
212
    bool is_fully_assembled()
213
0
    {
214
0
        return first_missing_fragment_ >= fragment_count_;
215
0
    }
216
217
    /*! Checks if the first fragment is present.
218
     * @return true when it contains the first fragment. In other case, false.
219
     */
220
    bool contains_first_fragment()
221
0
    {
222
0
        return 0 < first_missing_fragment_;
223
0
    }
224
225
    /*!
226
     * Fills a FragmentNumberSet_t with the list of missing fragments.
227
     * @param [out] frag_sns FragmentNumberSet_t where result is stored.
228
     */
229
    void get_missing_fragments(
230
            FragmentNumberSet_t& frag_sns)
231
0
    {
232
        // Note: Fragment numbers are 1-based but we keep them 0 based.
233
0
        frag_sns.base(first_missing_fragment_ + 1);
234
235
        // Traverse list of missing fragments, adding them to frag_sns
236
0
        uint32_t current_frag = first_missing_fragment_;
237
0
        while (current_frag < fragment_count_)
238
0
        {
239
0
            frag_sns.add(current_frag + 1);
240
0
            current_frag = get_next_missing_fragment(current_frag);
241
0
        }
242
0
    }
243
244
    /*!
245
     * Set fragment size for this change.
246
     *
247
     * @param fragment_size Size of fragments.
248
     * @param create_fragment_list Whether to create missing fragments list or not.
249
     *
250
     * @remarks Parameter create_fragment_list should only be true when receiving the first
251
     *          fragment of a change.
252
     */
253
    void setFragmentSize(
254
            uint16_t fragment_size,
255
            bool create_fragment_list = false)
256
0
    {
257
0
        fragment_size_ = fragment_size;
258
0
        fragment_count_ = 0;
259
0
        first_missing_fragment_ = 0;
260
261
0
        if (fragment_size > 0)
262
0
        {
263
            // This follows RTPS 8.3.7.3.5
264
0
            fragment_count_ = (serializedPayload.length + fragment_size - 1) / fragment_size;
265
266
0
            if (create_fragment_list)
267
0
            {
268
                // Keep index of next fragment on the payload portion at the beginning of each fragment. Last
269
                // fragment will have fragment_count_ as 'next fragment index'
270
0
                size_t offset = 0;
271
0
                for (uint32_t i = 1; i <= fragment_count_; i++, offset += fragment_size_)
272
0
                {
273
0
                    set_next_missing_fragment(i - 1, i);  // index to next fragment in missing list
274
0
                }
275
0
            }
276
0
            else
277
0
            {
278
                // List not created. This means we are going to send this change fragmented, so it is already
279
                // assembled, and the missing list is empty (i.e. first missing points to fragment count)
280
0
                first_missing_fragment_ = fragment_count_;
281
0
            }
282
0
        }
283
0
    }
284
285
    bool add_fragments(
286
            const SerializedPayload_t& incoming_data,
287
            uint32_t fragment_starting_num,
288
            uint32_t fragments_in_submessage)
289
0
    {
290
0
        uint32_t original_offset = (fragment_starting_num - 1) * fragment_size_;
291
0
        uint32_t incoming_length = fragment_size_ * fragments_in_submessage;
292
0
        uint32_t last_fragment_index = fragment_starting_num + fragments_in_submessage - 1;
293
294
        // Validate payload types
295
0
        if (serializedPayload.is_serialized_key != incoming_data.is_serialized_key)
296
0
        {
297
0
            return false;
298
0
        }
299
300
        // Validate fragment indexes
301
0
        if (last_fragment_index > fragment_count_)
302
0
        {
303
0
            return false;
304
0
        }
305
306
        // Update incoming length for last fragment
307
0
        if (last_fragment_index == fragment_count_)
308
0
        {
309
0
            incoming_length = serializedPayload.length - original_offset;
310
0
        }
311
312
        // Validate lengths
313
0
        if (incoming_data.length < incoming_length)
314
0
        {
315
0
            return false;
316
0
        }
317
318
0
        if (original_offset + incoming_length > serializedPayload.length)
319
0
        {
320
0
            return false;
321
0
        }
322
323
0
        if (received_fragments(fragment_starting_num - 1, fragments_in_submessage))
324
0
        {
325
0
            memcpy(
326
0
                &serializedPayload.data[original_offset],
327
0
                incoming_data.data, incoming_length);
328
0
        }
329
330
0
        return is_fully_assembled();
331
0
    }
332
333
    /**
334
     * @brief Calculate the minimum required payload size to store a fragmented change.
335
     *
336
     * @param[in]  payload_size       Size of the full payload.
337
     * @param[in]  fragment_size      Size of each fragment.
338
     * @param[out] min_required_size  Minimum required size to store the fragmented payload.
339
     */
340
    static bool calculate_required_fragmented_payload_size(
341
            uint32_t payload_size,
342
            uint16_t fragment_size,
343
            uint32_t& min_required_size)
344
0
    {
345
0
        if ((0 == fragment_size) || (payload_size <= fragment_size))
346
0
        {
347
0
            min_required_size = payload_size;
348
0
            return true;
349
0
        }
350
351
        // In order to avoid overflow on the calculations, we limit the maximum payload size
352
0
        constexpr uint32_t MAX_PAYLOAD_SIZE = std::numeric_limits<uint32_t>::max() - 4u - 3u;
353
0
        if (payload_size > MAX_PAYLOAD_SIZE)
354
0
        {
355
0
            return false;
356
0
        }
357
358
        // Ensure fragment size is at least 4 bytes to store fragment index
359
0
        if (fragment_size < 4u)
360
0
        {
361
0
            return false;
362
0
        }
363
364
        // Calculate number of fragments without risk of overflow
365
0
        uint32_t fragment_count = payload_size / fragment_size;
366
0
        if (0 != (payload_size % fragment_size))
367
0
        {
368
0
            ++fragment_count;
369
0
        }
370
371
        // This cannot overflow as the result will always be <= payload_size
372
0
        uint32_t last_fragment_offset = (fragment_count - 1) * fragment_size;
373
374
        // Since we will write a fragment index at the beginning of each fragment,
375
        // we need to ensure there is space for it in the last fragment.
376
        // Note: we already imposed limits to ensure no overflow occurs.
377
0
        min_required_size = (last_fragment_offset + 3u) & ~3u; // Align last fragment size to 4 bytes
378
0
        min_required_size += 4u; // Add fragment index size
379
380
        // Ensure minimum size is at least payload size
381
0
        if (min_required_size < payload_size)
382
0
        {
383
0
            min_required_size = payload_size;
384
0
        }
385
386
0
        return true;
387
0
    }
388
389
private:
390
391
    // Fragment size
392
    uint16_t fragment_size_ = 0;
393
394
    // Number of fragments
395
    uint32_t fragment_count_ = 0;
396
397
    // First fragment in missing list
398
    uint32_t first_missing_fragment_ = 0;
399
400
    uint32_t get_next_missing_fragment(
401
            uint32_t fragment_index)
402
0
    {
403
0
        uint32_t* ptr = next_fragment_pointer(fragment_index);
404
0
        return *ptr;
405
0
    }
406
407
    void set_next_missing_fragment(
408
            uint32_t fragment_index,
409
            uint32_t next_fragment_index)
410
0
    {
411
0
        uint32_t* ptr = next_fragment_pointer(fragment_index);
412
0
        *ptr = next_fragment_index;
413
0
    }
414
415
    uint32_t* next_fragment_pointer(
416
            uint32_t fragment_index)
417
0
    {
418
0
        size_t offset = fragment_size_;
419
0
        offset *= fragment_index;
420
0
        offset = (offset + 3u) & ~3u;
421
0
        return reinterpret_cast<uint32_t*>(&serializedPayload.data[offset]);
422
0
    }
423
424
    /*!
425
     * Mark a set of consecutive fragments as received.
426
     * This will remove a set of consecutive fragments from the missing list.
427
     * Should be called BEFORE copying the received data into the serialized payload.
428
     *
429
     * @param initial_fragment Index (0-based) of first received fragment.
430
     * @param num_of_fragments Number of received fragments. Should be strictly positive.
431
     * @return true if the list of missing fragments was modified, false otherwise.
432
     */
433
    bool received_fragments(
434
            uint32_t initial_fragment,
435
            uint32_t num_of_fragments)
436
0
    {
437
0
        bool at_least_one_changed = false;
438
439
0
        if ((fragment_size_ > 0) && (initial_fragment < fragment_count_))
440
0
        {
441
0
            uint32_t last_fragment = initial_fragment + num_of_fragments;
442
0
            if (last_fragment > fragment_count_)
443
0
            {
444
0
                last_fragment = fragment_count_;
445
0
            }
446
447
0
            if (initial_fragment <= first_missing_fragment_)
448
0
            {
449
                // Perform first = *first until first >= last_received
450
0
                while (first_missing_fragment_ < last_fragment)
451
0
                {
452
0
                    first_missing_fragment_ = get_next_missing_fragment(first_missing_fragment_);
453
0
                    at_least_one_changed = true;
454
0
                }
455
0
            }
456
0
            else
457
0
            {
458
                // Find prev in missing list
459
0
                uint32_t current_frag = first_missing_fragment_;
460
0
                while (current_frag < initial_fragment)
461
0
                {
462
0
                    uint32_t next_frag = get_next_missing_fragment(current_frag);
463
0
                    if (next_frag >= initial_fragment)
464
0
                    {
465
                        // This is the fragment previous to initial_fragment.
466
                        // Find future value for next by repeating next = *next until next >= last_fragment.
467
0
                        uint32_t next_missing_fragment = next_frag;
468
0
                        while (next_missing_fragment < last_fragment)
469
0
                        {
470
0
                            next_missing_fragment = get_next_missing_fragment(next_missing_fragment);
471
0
                            at_least_one_changed = true;
472
0
                        }
473
474
                        // Update next and finish loop
475
0
                        if (at_least_one_changed)
476
0
                        {
477
0
                            set_next_missing_fragment(current_frag, next_missing_fragment);
478
0
                        }
479
0
                        break;
480
0
                    }
481
0
                    current_frag = next_frag;
482
0
                }
483
0
            }
484
0
        }
485
486
0
        return at_least_one_changed;
487
0
    }
488
489
};
490
491
} // namespace rtps
492
} // namespace fastdds
493
} // namespace eprosima
494
495
#endif // FASTDDS_RTPS_COMMON__CACHECHANGE_HPP