Coverage Report

Created: 2022-08-24 06:19

/src/Fast-DDS/include/fastdds/rtps/common/CacheChange.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file CacheChange.h
17
 */
18
19
#ifndef _FASTDDS_RTPS_CACHECHANGE_H_
20
#define _FASTDDS_RTPS_CACHECHANGE_H_
21
22
#include <cassert>
23
#include <atomic>
24
25
#include <fastdds/rtps/common/ChangeKind_t.hpp>
26
#include <fastdds/rtps/common/FragmentNumber.h>
27
#include <fastdds/rtps/common/InstanceHandle.h>
28
#include <fastdds/rtps/common/SerializedPayload.h>
29
#include <fastdds/rtps/common/Time_t.h>
30
#include <fastdds/rtps/common/Types.h>
31
#include <fastdds/rtps/common/WriteParams.h>
32
33
#include <fastdds/rtps/history/IPayloadPool.h>
34
35
namespace eprosima {
36
namespace fastrtps {
37
namespace rtps {
38
39
/*!
40
 * Specific information for a writer.
41
 */
42
struct CacheChangeWriterInfo_t
43
{
44
    //!Number of DATA / DATA_FRAG submessages sent to the transport (only used in Writers)
45
    size_t num_sent_submessages = 0;
46
    //! Used to link with previous node in a list. Used by FlowControllerImpl.
47
    //! Cannot be cached because there are several comparisons without locking.
48
    CacheChange_t* volatile previous = nullptr;
49
    //! Used to link with next node in a list. Used by FlowControllerImpl.
50
    //! Cannot be cached because there are several comparisons without locking.
51
    CacheChange_t* volatile next = nullptr;
52
    //! Used to know if the object is already in a list.
53
    std::atomic_bool is_linked {false};
54
};
55
56
/*!
57
 * Specific information for a reader.
58
 */
59
struct CacheChangeReaderInfo_t
60
{
61
    //! Reception timestamp (only used in readers).
62
    Time_t receptionTimestamp;
63
    //! Disposed generation of the instance when this entry was added to it.
    //! No default value is given in this declaration.
64
    int32_t disposed_generation_count;
65
    //! No-writers generation of the instance when this entry was added to it.
    //! No default value is given in this declaration.
66
    int32_t no_writers_generation_count;
67
};
68
69
/**
70
 * Structure CacheChange_t, contains information on a specific CacheChange.
71
 * @ingroup COMMON_MODULE
72
 */
73
struct RTPS_DllAPI CacheChange_t
74
{
75
    //!Kind of change, default value ALIVE.
76
    ChangeKind_t kind = ALIVE;
77
    //!GUID_t of the writer that generated this change.
78
    GUID_t writerGUID{};
79
    //!Handle of the data associated with this change.
80
    InstanceHandle_t instanceHandle{};
81
    //!SequenceNumber of the change
82
    SequenceNumber_t sequenceNumber{};
83
    //!Serialized Payload associated with the change.
84
    SerializedPayload_t serializedPayload{};
85
    //!CDR serialization of inlined QoS for this change.
86
    SerializedPayload_t inline_qos{};
87
    //!Indicates if the cache has been read (only used in READERS)
88
    bool isRead = false;
89
    //!Source TimeStamp
90
    Time_t sourceTimestamp{};
91
    union
92
    {
93
        CacheChangeReaderInfo_t reader_info;
94
        CacheChangeWriterInfo_t writer_info;
95
    };
96
97
    WriteParams write_params{};
98
    bool is_untyped_ = true;
99
100
    /*!
101
     * @brief Default constructor.
102
     * Creates an empty CacheChange_t.
103
     */
104
    CacheChange_t()
105
        : writer_info()
106
0
    {
107
0
        inline_qos.encapsulation = DEFAULT_ENDIAN == LITTLEEND ? PL_CDR_LE : PL_CDR_BE;
108
0
    }
Unexecuted instantiation: eprosima::fastrtps::rtps::CacheChange_t::CacheChange_t()
Unexecuted instantiation: eprosima::fastrtps::rtps::CacheChange_t::CacheChange_t()
109
110
    // Copy construction is disabled for this type.
    CacheChange_t(
111
            const CacheChange_t&) = delete;
112
    // Copy assignment is disabled for this type.
    const CacheChange_t& operator =(
113
            const CacheChange_t&) = delete;
114
115
    /**
116
     * Constructor with payload size
117
     * @param payload_size Serialized payload size
118
     * @param is_untyped Flag to mark the change as untyped.
119
     */
120
    // The payload is constructed from payload_size and the untyped flag is stored.
    // NOTE(review): unlike the default constructor, this one names neither union member
    // (reader_info / writer_info) in its initializer list and does not assign
    // inline_qos.encapsulation - confirm this is intended.
    CacheChange_t(
121
            uint32_t payload_size,
122
            bool is_untyped = false)
123
        : serializedPayload(payload_size)
124
        , is_untyped_(is_untyped)
125
0
    {
126
0
    }
127
128
    /*!
129
     * Copy a different change into this one. All the elements are copied, including the data, allocating new memory.
130
     * @param[in] ch_ptr Pointer to the change.
131
     * @return True if correct.
132
     */
133
    bool copy(
134
            const CacheChange_t* ch_ptr)
135
0
    {
136
0
        kind = ch_ptr->kind;
137
0
        writerGUID = ch_ptr->writerGUID;
138
0
        instanceHandle = ch_ptr->instanceHandle;
139
0
        sequenceNumber = ch_ptr->sequenceNumber;
140
0
        sourceTimestamp = ch_ptr->sourceTimestamp;
141
0
        reader_info.receptionTimestamp = ch_ptr->reader_info.receptionTimestamp;
142
0
        write_params = ch_ptr->write_params;
143
0
        isRead = ch_ptr->isRead;
144
0
        fragment_size_ = ch_ptr->fragment_size_;
145
0
        fragment_count_ = ch_ptr->fragment_count_;
146
0
        first_missing_fragment_ = ch_ptr->first_missing_fragment_;
147
0
148
0
        return serializedPayload.copy(&ch_ptr->serializedPayload, !ch_ptr->is_untyped_);
149
0
    }
150
151
    /*!
152
     * Copy information from a different change into this one.
153
     * All the elements are copied except data.
154
     * @param[in] ch_ptr Pointer to the change.
155
     */
156
    void copy_not_memcpy(
157
            const CacheChange_t* ch_ptr)
158
0
    {
159
0
        kind = ch_ptr->kind;
160
0
        writerGUID = ch_ptr->writerGUID;
161
0
        instanceHandle = ch_ptr->instanceHandle;
162
0
        sequenceNumber = ch_ptr->sequenceNumber;
163
0
        sourceTimestamp = ch_ptr->sourceTimestamp;
164
0
        reader_info.receptionTimestamp = ch_ptr->reader_info.receptionTimestamp;
165
0
        write_params = ch_ptr->write_params;
166
0
        isRead = ch_ptr->isRead;
167
0
168
0
        // Copy certain values from serializedPayload
169
0
        serializedPayload.encapsulation = ch_ptr->serializedPayload.encapsulation;
170
0
171
0
        // Copy fragment size and calculate fragment count
172
0
        setFragmentSize(ch_ptr->fragment_size_, false);
173
0
    }
174
175
    virtual ~CacheChange_t()
    {
        // When a pool is registered as owner of the payload, ask it to release the payload.
        if (nullptr != payload_owner_)
        {
            payload_owner_->release_payload(*this);
        }

        // At this point no owner may remain registered (checked in builds with assertions enabled).
        assert(nullptr == payload_owner_);
    }
183
184
    /*!
185
     * Get the number of fragments this change is split into.
186
     * @return number of fragments.
187
     */
188
    // Plain accessor: returns fragment_count_ as last computed by setFragmentSize() or taken over by copy().
    uint32_t getFragmentCount() const
189
0
    {
190
0
        return fragment_count_;
191
0
    }
192
193
    /*!
194
     * Get the size of each fragment this change is split into.
195
     * @return size of fragment (0 means change is not fragmented).
196
     */
197
    // Plain accessor: returns fragment_size_ as last stored by setFragmentSize() or taken over by copy().
    uint16_t getFragmentSize() const
198
0
    {
199
0
        return fragment_size_;
200
0
    }
201
202
    /*!
203
     * Checks if all fragments have been received.
204
     * @return true when change is fully assembled (i.e. no missing fragments).
205
     */
206
    bool is_fully_assembled()
207
0
    {
208
0
        return first_missing_fragment_ >= fragment_count_;
209
0
    }
210
211
    /*! Checks if the first fragment is present.
212
     * @return true when it contains the first fragment. In other case, false.
213
     */
214
    bool contains_first_fragment()
215
0
    {
216
0
        return 0 < first_missing_fragment_;
217
0
    }
218
219
    /*!
220
     * Fills a FragmentNumberSet_t with the list of missing fragments.
221
     * @param [out] frag_sns FragmentNumberSet_t where result is stored.
222
     */
223
    void get_missing_fragments(
224
            FragmentNumberSet_t& frag_sns)
225
0
    {
226
0
        // Note: Fragment numbers are 1-based but we keep them 0 based.
227
0
        frag_sns.base(first_missing_fragment_ + 1);
228
0
229
0
        // Traverse list of missing fragments, adding them to frag_sns
230
0
        uint32_t current_frag = first_missing_fragment_;
231
0
        while (current_frag < fragment_count_)
232
0
        {
233
0
            frag_sns.add(current_frag + 1);
234
0
            current_frag = get_next_missing_fragment(current_frag);
235
0
        }
236
0
    }
237
238
    /*!
239
     * Set fragment size for this change.
240
     *
241
     * @param fragment_size Size of fragments.
242
     * @param create_fragment_list Whether to create missing fragments list or not.
243
     *
244
     * @remarks Parameter create_fragment_list should only be true when receiving the first
245
     *          fragment of a change.
246
     */
247
    void setFragmentSize(
248
            uint16_t fragment_size,
249
            bool create_fragment_list = false)
250
0
    {
251
0
        fragment_size_ = fragment_size;
252
0
        fragment_count_ = 0;
253
0
        first_missing_fragment_ = 0;
254
0
255
0
        if (fragment_size > 0)
256
0
        {
257
0
            // This follows RTPS 8.3.7.3.5
258
0
            fragment_count_ = (serializedPayload.length + fragment_size - 1) / fragment_size;
259
0
260
0
            if (create_fragment_list)
261
0
            {
262
0
                // Keep index of next fragment on the payload portion at the beginning of each fragment. Last
263
0
                // fragment will have fragment_count_ as 'next fragment index'
264
0
                size_t offset = 0;
265
0
                for (uint32_t i = 1; i <= fragment_count_; i++, offset += fragment_size_)
266
0
                {
267
0
                    set_next_missing_fragment(i - 1, i);  // index to next fragment in missing list
268
0
                }
269
0
            }
270
0
            else
271
0
            {
272
0
                // List not created. This means we are going to send this change fragmented, so it is already
273
0
                // assembled, and the missing list is empty (i.e. first missing points to fragment count)
274
0
                first_missing_fragment_ = fragment_count_;
275
0
            }
276
0
        }
277
0
    }
278
279
    bool add_fragments(
280
            const SerializedPayload_t& incoming_data,
281
            uint32_t fragment_starting_num,
282
            uint32_t fragments_in_submessage)
283
0
    {
284
0
        uint32_t original_offset = (fragment_starting_num - 1) * fragment_size_;
285
0
        uint32_t incoming_length = fragment_size_ * fragments_in_submessage;
286
0
        uint32_t last_fragment_index = fragment_starting_num + fragments_in_submessage - 1;
287
0
288
0
        // Validate fragment indexes
289
0
        if (last_fragment_index > fragment_count_)
290
0
        {
291
0
            return false;
292
0
        }
293
0
294
0
        // validate lengths
295
0
        if (last_fragment_index < fragment_count_)
296
0
        {
297
0
            if (incoming_data.length < incoming_length)
298
0
            {
299
0
                return false;
300
0
            }
301
0
        }
302
0
        else
303
0
        {
304
0
            incoming_length = serializedPayload.length - original_offset;
305
0
        }
306
0
307
0
        if (original_offset + incoming_length > serializedPayload.length)
308
0
        {
309
0
            return false;
310
0
        }
311
0
312
0
        if (received_fragments(fragment_starting_num - 1, fragments_in_submessage))
313
0
        {
314
0
            memcpy(
315
0
                &serializedPayload.data[original_offset],
316
0
                incoming_data.data, incoming_length);
317
0
        }
318
0
319
0
        return is_fully_assembled();
320
0
    }
321
322
    // Read-only access to the pool registered as owner of the payload (nullptr when none is registered).
    IPayloadPool const* payload_owner() const
323
0
    {
324
0
        return payload_owner_;
325
0
    }
326
327
    // Mutable access to the pool registered as owner of the payload (nullptr when none is registered).
    IPayloadPool* payload_owner()
328
0
    {
329
0
        return payload_owner_;
330
0
    }
331
332
    // Registers (or clears, when nullptr is given) the pool that owns the payload.
    // The pointer is stored as-is; the destructor calls release_payload() on it when it is not null.
    void payload_owner(
333
            IPayloadPool* owner)
334
0
    {
335
0
        payload_owner_ = owner;
336
0
    }
337
338
private:
339
340
    // Fragment size
341
    uint16_t fragment_size_ = 0;
342
343
    // Number of fragments
344
    uint32_t fragment_count_ = 0;
345
346
    // First fragment in missing list
347
    uint32_t first_missing_fragment_ = 0;
348
349
    // Pool that created the payload of this cache change
350
    IPayloadPool* payload_owner_ = nullptr;
351
352
    uint32_t get_next_missing_fragment(
353
            uint32_t fragment_index)
354
0
    {
355
0
        uint32_t* ptr = next_fragment_pointer(fragment_index);
356
0
        return *ptr;
357
0
    }
358
359
    void set_next_missing_fragment(
360
            uint32_t fragment_index,
361
            uint32_t next_fragment_index)
362
0
    {
363
0
        uint32_t* ptr = next_fragment_pointer(fragment_index);
364
0
        *ptr = next_fragment_index;
365
0
    }
366
367
    uint32_t* next_fragment_pointer(
368
            uint32_t fragment_index)
369
0
    {
370
0
        size_t offset = fragment_size_;
371
0
        offset *= fragment_index;
372
0
        offset = (offset + 3u) & ~3u;
373
0
        return reinterpret_cast<uint32_t*>(&serializedPayload.data[offset]);
374
0
    }
375
376
    /*!
377
     * Mark a set of consecutive fragments as received.
378
     * This will remove a set of consecutive fragments from the missing list.
379
     * Should be called BEFORE copying the received data into the serialized payload.
380
     *
381
     * @param initial_fragment Index (0-based) of first received fragment.
382
     * @param num_of_fragments Number of received fragments. Should be strictly positive.
383
     * @return true if the list of missing fragments was modified, false otherwise.
384
     */
385
    bool received_fragments(
386
            uint32_t initial_fragment,
387
            uint32_t num_of_fragments)
388
0
    {
389
0
        bool at_least_one_changed = false;
390
0
391
0
        if ((fragment_size_ > 0) && (initial_fragment < fragment_count_))
392
0
        {
393
0
            uint32_t last_fragment = initial_fragment + num_of_fragments;
394
0
            if (last_fragment > fragment_count_)
395
0
            {
396
0
                last_fragment = fragment_count_;
397
0
            }
398
0
399
0
            if (initial_fragment <= first_missing_fragment_)
400
0
            {
401
0
                // Perform first = *first until first >= last_received
402
0
                while (first_missing_fragment_ < last_fragment)
403
0
                {
404
0
                    first_missing_fragment_ = get_next_missing_fragment(first_missing_fragment_);
405
0
                    at_least_one_changed = true;
406
0
                }
407
0
            }
408
0
            else
409
0
            {
410
0
                // Find prev in missing list
411
0
                uint32_t current_frag = first_missing_fragment_;
412
0
                while (current_frag < initial_fragment)
413
0
                {
414
0
                    uint32_t next_frag = get_next_missing_fragment(current_frag);
415
0
                    if (next_frag >= initial_fragment)
416
0
                    {
417
0
                        // This is the fragment previous to initial_fragment.
418
0
                        // Find future value for next by repeating next = *next until next >= last_fragment.
419
0
                        uint32_t next_missing_fragment = next_frag;
420
0
                        while (next_missing_fragment < last_fragment)
421
0
                        {
422
0
                            next_missing_fragment = get_next_missing_fragment(next_missing_fragment);
423
0
                            at_least_one_changed = true;
424
0
                        }
425
0
426
0
                        // Update next and finish loop
427
0
                        if (at_least_one_changed)
428
0
                        {
429
0
                            set_next_missing_fragment(current_frag, next_missing_fragment);
430
0
                        }
431
0
                        break;
432
0
                    }
433
0
                    current_frag = next_frag;
434
0
                }
435
0
            }
436
0
        }
437
0
438
0
        return at_least_one_changed;
439
0
    }
440
441
};
442
443
} // namespace rtps
444
} // namespace fastrtps
445
} // namespace eprosima
446
447
#endif /* _FASTDDS_RTPS_CACHECHANGE_H_ */