Coverage Report

Created: 2022-08-24 06:19

/src/Fast-DDS/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file ReadTakeCommand.hpp
17
 */
18
19
#ifndef _FASTDDS_SUBSCRIBER_DATAREADERIMPL_READTAKECOMMAND_HPP_
20
#define _FASTDDS_SUBSCRIBER_DATAREADERIMPL_READTAKECOMMAND_HPP_
21
22
#include <cassert>
23
#include <cstdint>
24
25
#include <fastdds/dds/core/LoanableCollection.hpp>
26
#include <fastdds/dds/core/LoanableTypedCollection.hpp>
27
#include <fastdds/dds/topic/TypeSupport.hpp>
28
#include <fastdds/dds/subscriber/SampleInfo.hpp>
29
30
#include <fastrtps/types/TypesBase.h>
31
32
#include <fastdds/subscriber/DataReaderImpl.hpp>
33
#include <fastdds/subscriber/DataReaderImpl/DataReaderLoanManager.hpp>
34
#include <fastdds/subscriber/DataReaderImpl/StateFilter.hpp>
35
#include <fastdds/subscriber/DataReaderImpl/SampleInfoPool.hpp>
36
#include <fastdds/subscriber/DataReaderImpl/SampleLoanManager.hpp>
37
#include <fastdds/subscriber/history/DataReaderHistory.hpp>
38
39
#include <fastdds/rtps/common/CacheChange.h>
40
#include <fastdds/rtps/reader/RTPSReader.h>
41
42
#include <rtps/reader/WriterProxy.h>
43
#include <rtps/DataSharing/DataSharingPayloadPool.hpp>
44
45
46
namespace eprosima {
47
namespace fastdds {
48
namespace dds {
49
namespace detail {
50
51
struct ReadTakeCommand
52
{
53
    using ReturnCode_t = eprosima::fastrtps::types::ReturnCode_t;
54
    using history_type = eprosima::fastdds::dds::detail::DataReaderHistory;
55
    using CacheChange_t = eprosima::fastrtps::rtps::CacheChange_t;
56
    using RTPSReader = eprosima::fastrtps::rtps::RTPSReader;
57
    using WriterProxy = eprosima::fastrtps::rtps::WriterProxy;
58
    using SampleInfoSeq = LoanableTypedCollection<SampleInfo>;
59
    using DataSharingPayloadPool = eprosima::fastrtps::rtps::DataSharingPayloadPool;
60
61
    ReadTakeCommand(
62
            DataReaderImpl& reader,
63
            LoanableCollection& data_values,
64
            SampleInfoSeq& sample_infos,
65
            int32_t max_samples,
66
            const StateFilter& states,
67
            const history_type::instance_info& instance,
68
            bool single_instance = false)
69
        : type_(reader.type_)
70
        , loan_manager_(reader.loan_manager_)
71
        , history_(reader.history_)
72
        , reader_(reader.reader_)
73
        , info_pool_(reader.sample_info_pool_)
74
        , sample_pool_(reader.sample_pool_)
75
        , data_values_(data_values)
76
        , sample_infos_(sample_infos)
77
        , remaining_samples_(max_samples)
78
        , states_(states)
79
        , instance_(instance)
80
        , handle_(instance->first)
81
        , single_instance_(single_instance)
82
0
    {
83
0
        assert(0 <= remaining_samples_);
84
85
0
        current_slot_ = data_values_.length();
86
0
        finished_ = false;
87
0
    }
88
89
    ~ReadTakeCommand()
90
0
    {
91
0
        if (!data_values_.has_ownership() && ReturnCode_t::RETCODE_NO_DATA == return_value_)
92
0
        {
93
0
            loan_manager_.return_loan(data_values_, sample_infos_);
94
0
            data_values_.unloan();
95
0
            sample_infos_.unloan();
96
0
        }
97
0
    }
98
99
    bool add_instance(
100
            bool take_samples)
101
0
    {
102
        // Advance to the first instance with a valid state
103
0
        if (!go_to_first_valid_instance())
104
0
        {
105
0
            return false;
106
0
        }
107
108
        // Traverse changes on current instance
109
0
        bool ret_val = false;
110
0
        LoanableCollection::size_type first_slot = current_slot_;
111
0
        auto it = instance_->second->cache_changes.begin();
112
0
        while (!finished_ && it != instance_->second->cache_changes.end())
113
0
        {
114
0
            CacheChange_t* change = *it;
115
0
            SampleStateKind check;
116
0
            check = change->isRead ? SampleStateKind::READ_SAMPLE_STATE : SampleStateKind::NOT_READ_SAMPLE_STATE;
117
0
            if ((check & states_.sample_states) != 0)
118
0
            {
119
0
                WriterProxy* wp = nullptr;
120
0
                bool is_future_change = false;
121
0
                bool remove_change = false;
122
0
                if (reader_->begin_sample_access_nts(change, wp, is_future_change))
123
0
                {
124
                    //Check if the payload is dirty
125
0
                    remove_change = !check_datasharing_validity(change, data_values_.has_ownership());
126
0
                }
127
0
                else
128
0
                {
129
0
                    remove_change = true;
130
0
                }
131
132
0
                if (remove_change)
133
0
                {
134
                    // Remove from history
135
0
                    history_.remove_change_sub(change, it);
136
137
                    // Current iterator will point to change next to the one removed. Avoid incrementing.
138
0
                    continue;
139
0
                }
140
141
                // If the change is in the future we can skip the remaining changes in the history, as they will be
142
                // in the future also
143
0
                if (!is_future_change)
144
0
                {
145
                    // Add sample and info to collections
146
0
                    ReturnCode_t previous_return_value = return_value_;
147
0
                    bool added = add_sample(*it, remove_change);
148
0
                    history_.change_was_processed_nts(change, added);
149
0
                    reader_->end_sample_access_nts(change, wp, added);
150
151
                    // Check if the payload is dirty
152
0
                    if (added && !check_datasharing_validity(change, data_values_.has_ownership()))
153
0
                    {
154
                        // Decrement length of collections
155
0
                        --current_slot_;
156
0
                        ++remaining_samples_;
157
0
                        data_values_.length(current_slot_);
158
0
                        sample_infos_.length(current_slot_);
159
160
0
                        return_value_ = previous_return_value;
161
0
                        finished_ = false;
162
163
0
                        remove_change = true;
164
0
                        added = false;
165
0
                    }
166
167
0
                    if (remove_change || (added && take_samples))
168
0
                    {
169
                        // Remove from history
170
0
                        history_.remove_change_sub(change, it);
171
172
                        // Current iterator will point to change next to the one removed. Avoid incrementing.
173
0
                        continue;
174
0
                    }
175
0
                }
176
0
            }
177
178
            // Go to next sample on instance
179
0
            ++it;
180
0
        }
181
182
0
        if (current_slot_ > first_slot)
183
0
        {
184
0
            history_.instance_viewed_nts(instance_->second);
185
0
            ret_val = true;
186
187
            // complete sample infos
188
0
            LoanableCollection::size_type slot = current_slot_;
189
0
            LoanableCollection::size_type n = 0;
190
0
            while (slot > first_slot)
191
0
            {
192
0
                --slot;
193
0
                sample_infos_[slot].sample_rank = n;
194
0
                ++n;
195
0
            }
196
0
        }
197
198
0
        next_instance();
199
0
        return ret_val;
200
0
    }
201
202
    inline bool is_finished() const
203
0
    {
204
0
        return finished_;
205
0
    }
206
207
    inline ReturnCode_t return_value() const
208
0
    {
209
0
        return return_value_;
210
0
    }
211
212
    static void generate_info(
213
            SampleInfo& info,
214
            const DataReaderInstance& instance,
215
            const DataReaderCacheChange& item)
216
0
    {
217
0
        info.sample_state = item->isRead ? READ_SAMPLE_STATE : NOT_READ_SAMPLE_STATE;
218
0
        info.instance_state = instance.instance_state;
219
0
        info.view_state = instance.view_state;
220
0
        info.disposed_generation_count = item->reader_info.disposed_generation_count;
221
0
        info.no_writers_generation_count = item->reader_info.no_writers_generation_count;
222
0
        info.sample_rank = 0;
223
0
        info.generation_rank = 0;
224
0
        info.absolute_generation_rank = 0;
225
0
        info.source_timestamp = item->sourceTimestamp;
226
0
        info.reception_timestamp = item->reader_info.receptionTimestamp;
227
0
        info.instance_handle = item->instanceHandle;
228
0
        info.publication_handle = InstanceHandle_t(item->writerGUID);
229
0
        info.sample_identity.writer_guid(item->writerGUID);
230
0
        info.sample_identity.sequence_number(item->sequenceNumber);
231
0
        info.related_sample_identity = item->write_params.sample_identity();
232
0
        info.valid_data = true;
233
234
0
        switch (item->kind)
235
0
        {
236
0
            case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED:
237
0
            case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED_UNREGISTERED:
238
0
            case eprosima::fastrtps::rtps::NOT_ALIVE_UNREGISTERED:
239
0
                info.valid_data = false;
240
0
                break;
241
0
            case eprosima::fastrtps::rtps::ALIVE:
242
0
            default:
243
0
                break;
244
0
        }
245
0
    }
246
247
private:
248
249
    const TypeSupport& type_;
250
    DataReaderLoanManager& loan_manager_;
251
    history_type& history_;
252
    RTPSReader* reader_;
253
    SampleInfoPool& info_pool_;
254
    std::shared_ptr<detail::SampleLoanManager> sample_pool_;
255
    LoanableCollection& data_values_;
256
    SampleInfoSeq& sample_infos_;
257
    int32_t remaining_samples_;
258
    StateFilter states_;
259
    history_type::instance_info instance_;
260
    InstanceHandle_t handle_;
261
    bool single_instance_;
262
263
    bool finished_ = false;
264
    ReturnCode_t return_value_ = ReturnCode_t::RETCODE_NO_DATA;
265
266
    LoanableCollection::size_type current_slot_ = 0;
267
268
    bool go_to_first_valid_instance()
269
0
    {
270
0
        while (!is_current_instance_valid())
271
0
        {
272
0
            if (!next_instance())
273
0
            {
274
0
                return false;
275
0
            }
276
0
        }
277
0
        return true;
278
0
    }
279
280
    bool is_current_instance_valid()
281
0
    {
282
        // Check instance_state against states_.instance_states and view_state against states_.view_states
283
0
        auto instance_state = instance_->second->instance_state;
284
0
        auto view_state = instance_->second->view_state;
285
0
        return (0 != (states_.instance_states & instance_state)) && (0 != (states_.view_states & view_state));
286
0
    }
287
288
    bool next_instance()
289
0
    {
290
0
        history_.check_and_remove_instance(instance_);
291
0
        if (single_instance_)
292
0
        {
293
0
            finished_ = true;
294
0
            return false;
295
0
        }
296
297
0
        auto result = history_.next_available_instance_nts(handle_, instance_);
298
0
        if (!result.first)
299
0
        {
300
0
            finished_ = true;
301
0
            return false;
302
0
        }
303
304
0
        instance_ = result.second;
305
0
        handle_ = instance_->first;
306
0
        return true;
307
0
    }
308
309
    bool add_sample(
310
            const DataReaderCacheChange& item,
311
            bool& deserialization_error)
312
0
    {
313
0
        bool ret_val = false;
314
0
        deserialization_error = false;
315
316
0
        if (remaining_samples_ > 0)
317
0
        {
318
            // Increment length of collections
319
0
            auto new_len = current_slot_ + 1;
320
0
            data_values_.length(new_len);
321
0
            sample_infos_.length(new_len);
322
323
            // Add information
324
0
            generate_info(item);
325
0
            if (sample_infos_[current_slot_].valid_data)
326
0
            {
327
0
                if (!deserialize_sample(item))
328
0
                {
329
                    // Decrement length of collections
330
0
                    data_values_.length(current_slot_);
331
0
                    sample_infos_.length(current_slot_);
332
0
                    deserialization_error = true;
333
0
                    return false;
334
0
                }
335
0
            }
336
337
            // Mark that some data is available
338
0
            return_value_ = ReturnCode_t::RETCODE_OK;
339
0
            ++current_slot_;
340
0
            --remaining_samples_;
341
0
            ret_val = true;
342
0
        }
343
344
        // Finish when there are no remaining samples
345
0
        finished_ = (remaining_samples_ == 0);
346
0
        return ret_val;
347
0
    }
348
349
    bool deserialize_sample(
350
            CacheChange_t* change)
351
0
    {
352
0
        auto payload = &(change->serializedPayload);
353
0
        if (data_values_.has_ownership())
354
0
        {
355
            // perform deserialization
356
0
            return type_->deserialize(payload, data_values_.buffer()[current_slot_]);
357
0
        }
358
0
        else
359
0
        {
360
            // loan
361
0
            void* sample;
362
0
            sample_pool_->get_loan(change, sample);
363
0
            const_cast<void**>(data_values_.buffer())[current_slot_] = sample;
364
0
            return true;
365
0
        }
366
0
    }
367
368
    void generate_info(
369
            const DataReaderCacheChange& item)
370
0
    {
371
        // Loan when necessary
372
0
        if (!sample_infos_.has_ownership())
373
0
        {
374
0
            SampleInfo* pool_item = info_pool_.get_item();
375
0
            assert(pool_item != nullptr);
376
0
            const_cast<void**>(sample_infos_.buffer())[current_slot_] = pool_item;
377
0
        }
378
379
0
        SampleInfo& info = sample_infos_[current_slot_];
380
0
        generate_info(info, *instance_->second, item);
381
0
    }
382
383
    bool check_datasharing_validity(
384
            CacheChange_t* change,
385
            bool has_ownership)
386
0
    {
387
0
        bool is_valid = true;
388
0
        if (has_ownership)  //< On loans the user must check the validity anyways
389
0
        {
390
0
            DataSharingPayloadPool* pool = dynamic_cast<DataSharingPayloadPool*>(change->payload_owner());
391
0
            if (pool)
392
0
            {
393
                //Check if the payload is dirty
394
0
                is_valid = pool->is_sample_valid(*change);
395
0
            }
396
0
        }
397
398
0
        if (!is_valid)
399
0
        {
400
0
            logWarning(RTPS_READER,
401
0
                    "Change " << change->sequenceNumber << " from " << change->writerGUID << " is overidden");
402
0
            return false;
403
0
        }
404
405
0
        return true;
406
0
    }
407
408
};
409
410
} /* namespace detail */
411
} /* namespace dds */
412
} /* namespace fastdds */
413
} /* namespace eprosima */
414
415
#endif  // _FASTDDS_SUBSCRIBER_DATAREADERIMPL_READTAKECOMMAND_HPP_