/src/Fast-DDS/src/cpp/fastdds/subscriber/DataReaderImpl/ReadTakeCommand.hpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2021 Proyectos y Sistemas de Mantenimiento SL (eProsima). |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | /** |
16 | | * @file ReadTakeCommand.hpp |
17 | | */ |
18 | | |
19 | | #ifndef _FASTDDS_SUBSCRIBER_DATAREADERIMPL_READTAKECOMMAND_HPP_ |
20 | | #define _FASTDDS_SUBSCRIBER_DATAREADERIMPL_READTAKECOMMAND_HPP_ |
21 | | |
22 | | #include <cassert> |
23 | | #include <cstdint> |
24 | | |
25 | | #include <fastdds/dds/core/LoanableCollection.hpp> |
26 | | #include <fastdds/dds/core/LoanableTypedCollection.hpp> |
27 | | #include <fastdds/dds/topic/TypeSupport.hpp> |
28 | | #include <fastdds/dds/subscriber/SampleInfo.hpp> |
29 | | |
30 | | #include <fastrtps/types/TypesBase.h> |
31 | | |
32 | | #include <fastdds/subscriber/DataReaderImpl.hpp> |
33 | | #include <fastdds/subscriber/DataReaderImpl/DataReaderLoanManager.hpp> |
34 | | #include <fastdds/subscriber/DataReaderImpl/StateFilter.hpp> |
35 | | #include <fastdds/subscriber/DataReaderImpl/SampleInfoPool.hpp> |
36 | | #include <fastdds/subscriber/DataReaderImpl/SampleLoanManager.hpp> |
37 | | #include <fastdds/subscriber/history/DataReaderHistory.hpp> |
38 | | |
39 | | #include <fastdds/rtps/common/CacheChange.h> |
40 | | #include <fastdds/rtps/reader/RTPSReader.h> |
41 | | |
42 | | #include <rtps/reader/WriterProxy.h> |
43 | | #include <rtps/DataSharing/DataSharingPayloadPool.hpp> |
44 | | |
45 | | |
46 | | namespace eprosima { |
47 | | namespace fastdds { |
48 | | namespace dds { |
49 | | namespace detail { |
50 | | |
51 | | struct ReadTakeCommand |
52 | | { |
53 | | using ReturnCode_t = eprosima::fastrtps::types::ReturnCode_t; |
54 | | using history_type = eprosima::fastdds::dds::detail::DataReaderHistory; |
55 | | using CacheChange_t = eprosima::fastrtps::rtps::CacheChange_t; |
56 | | using RTPSReader = eprosima::fastrtps::rtps::RTPSReader; |
57 | | using WriterProxy = eprosima::fastrtps::rtps::WriterProxy; |
58 | | using SampleInfoSeq = LoanableTypedCollection<SampleInfo>; |
59 | | using DataSharingPayloadPool = eprosima::fastrtps::rtps::DataSharingPayloadPool; |
60 | | |
61 | | ReadTakeCommand( |
62 | | DataReaderImpl& reader, |
63 | | LoanableCollection& data_values, |
64 | | SampleInfoSeq& sample_infos, |
65 | | int32_t max_samples, |
66 | | const StateFilter& states, |
67 | | const history_type::instance_info& instance, |
68 | | bool single_instance = false) |
69 | | : type_(reader.type_) |
70 | | , loan_manager_(reader.loan_manager_) |
71 | | , history_(reader.history_) |
72 | | , reader_(reader.reader_) |
73 | | , info_pool_(reader.sample_info_pool_) |
74 | | , sample_pool_(reader.sample_pool_) |
75 | | , data_values_(data_values) |
76 | | , sample_infos_(sample_infos) |
77 | | , remaining_samples_(max_samples) |
78 | | , states_(states) |
79 | | , instance_(instance) |
80 | | , handle_(instance->first) |
81 | | , single_instance_(single_instance) |
82 | 0 | { |
83 | 0 | assert(0 <= remaining_samples_); |
84 | |
|
85 | 0 | current_slot_ = data_values_.length(); |
86 | 0 | finished_ = false; |
87 | 0 | } |
88 | | |
89 | | ~ReadTakeCommand() |
90 | 0 | { |
91 | 0 | if (!data_values_.has_ownership() && ReturnCode_t::RETCODE_NO_DATA == return_value_) |
92 | 0 | { |
93 | 0 | loan_manager_.return_loan(data_values_, sample_infos_); |
94 | 0 | data_values_.unloan(); |
95 | 0 | sample_infos_.unloan(); |
96 | 0 | } |
97 | 0 | } |
98 | | |
99 | | bool add_instance( |
100 | | bool take_samples) |
101 | 0 | { |
102 | | // Advance to the first instance with a valid state |
103 | 0 | if (!go_to_first_valid_instance()) |
104 | 0 | { |
105 | 0 | return false; |
106 | 0 | } |
107 | | |
108 | | // Traverse changes on current instance |
109 | 0 | bool ret_val = false; |
110 | 0 | LoanableCollection::size_type first_slot = current_slot_; |
111 | 0 | auto it = instance_->second->cache_changes.begin(); |
112 | 0 | while (!finished_ && it != instance_->second->cache_changes.end()) |
113 | 0 | { |
114 | 0 | CacheChange_t* change = *it; |
115 | 0 | SampleStateKind check; |
116 | 0 | check = change->isRead ? SampleStateKind::READ_SAMPLE_STATE : SampleStateKind::NOT_READ_SAMPLE_STATE; |
117 | 0 | if ((check & states_.sample_states) != 0) |
118 | 0 | { |
119 | 0 | WriterProxy* wp = nullptr; |
120 | 0 | bool is_future_change = false; |
121 | 0 | bool remove_change = false; |
122 | 0 | if (reader_->begin_sample_access_nts(change, wp, is_future_change)) |
123 | 0 | { |
124 | | //Check if the payload is dirty |
125 | 0 | remove_change = !check_datasharing_validity(change, data_values_.has_ownership()); |
126 | 0 | } |
127 | 0 | else |
128 | 0 | { |
129 | 0 | remove_change = true; |
130 | 0 | } |
131 | |
|
132 | 0 | if (remove_change) |
133 | 0 | { |
134 | | // Remove from history |
135 | 0 | history_.remove_change_sub(change, it); |
136 | | |
137 | | // Current iterator will point to change next to the one removed. Avoid incrementing. |
138 | 0 | continue; |
139 | 0 | } |
140 | | |
141 | | // If the change is in the future we can skip the remaining changes in the history, as they will be |
142 | | // in the future also |
143 | 0 | if (!is_future_change) |
144 | 0 | { |
145 | | // Add sample and info to collections |
146 | 0 | ReturnCode_t previous_return_value = return_value_; |
147 | 0 | bool added = add_sample(*it, remove_change); |
148 | 0 | history_.change_was_processed_nts(change, added); |
149 | 0 | reader_->end_sample_access_nts(change, wp, added); |
150 | | |
151 | | // Check if the payload is dirty |
152 | 0 | if (added && !check_datasharing_validity(change, data_values_.has_ownership())) |
153 | 0 | { |
154 | | // Decrement length of collections |
155 | 0 | --current_slot_; |
156 | 0 | ++remaining_samples_; |
157 | 0 | data_values_.length(current_slot_); |
158 | 0 | sample_infos_.length(current_slot_); |
159 | |
|
160 | 0 | return_value_ = previous_return_value; |
161 | 0 | finished_ = false; |
162 | |
|
163 | 0 | remove_change = true; |
164 | 0 | added = false; |
165 | 0 | } |
166 | |
|
167 | 0 | if (remove_change || (added && take_samples)) |
168 | 0 | { |
169 | | // Remove from history |
170 | 0 | history_.remove_change_sub(change, it); |
171 | | |
172 | | // Current iterator will point to change next to the one removed. Avoid incrementing. |
173 | 0 | continue; |
174 | 0 | } |
175 | 0 | } |
176 | 0 | } |
177 | | |
178 | | // Go to next sample on instance |
179 | 0 | ++it; |
180 | 0 | } |
181 | |
|
182 | 0 | if (current_slot_ > first_slot) |
183 | 0 | { |
184 | 0 | history_.instance_viewed_nts(instance_->second); |
185 | 0 | ret_val = true; |
186 | | |
187 | | // complete sample infos |
188 | 0 | LoanableCollection::size_type slot = current_slot_; |
189 | 0 | LoanableCollection::size_type n = 0; |
190 | 0 | while (slot > first_slot) |
191 | 0 | { |
192 | 0 | --slot; |
193 | 0 | sample_infos_[slot].sample_rank = n; |
194 | 0 | ++n; |
195 | 0 | } |
196 | 0 | } |
197 | |
|
198 | 0 | next_instance(); |
199 | 0 | return ret_val; |
200 | 0 | } |
201 | | |
202 | | inline bool is_finished() const |
203 | 0 | { |
204 | 0 | return finished_; |
205 | 0 | } |
206 | | |
207 | | inline ReturnCode_t return_value() const |
208 | 0 | { |
209 | 0 | return return_value_; |
210 | 0 | } |
211 | | |
212 | | static void generate_info( |
213 | | SampleInfo& info, |
214 | | const DataReaderInstance& instance, |
215 | | const DataReaderCacheChange& item) |
216 | 0 | { |
217 | 0 | info.sample_state = item->isRead ? READ_SAMPLE_STATE : NOT_READ_SAMPLE_STATE; |
218 | 0 | info.instance_state = instance.instance_state; |
219 | 0 | info.view_state = instance.view_state; |
220 | 0 | info.disposed_generation_count = item->reader_info.disposed_generation_count; |
221 | 0 | info.no_writers_generation_count = item->reader_info.no_writers_generation_count; |
222 | 0 | info.sample_rank = 0; |
223 | 0 | info.generation_rank = 0; |
224 | 0 | info.absolute_generation_rank = 0; |
225 | 0 | info.source_timestamp = item->sourceTimestamp; |
226 | 0 | info.reception_timestamp = item->reader_info.receptionTimestamp; |
227 | 0 | info.instance_handle = item->instanceHandle; |
228 | 0 | info.publication_handle = InstanceHandle_t(item->writerGUID); |
229 | 0 | info.sample_identity.writer_guid(item->writerGUID); |
230 | 0 | info.sample_identity.sequence_number(item->sequenceNumber); |
231 | 0 | info.related_sample_identity = item->write_params.sample_identity(); |
232 | 0 | info.valid_data = true; |
233 | |
|
234 | 0 | switch (item->kind) |
235 | 0 | { |
236 | 0 | case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED: |
237 | 0 | case eprosima::fastrtps::rtps::NOT_ALIVE_DISPOSED_UNREGISTERED: |
238 | 0 | case eprosima::fastrtps::rtps::NOT_ALIVE_UNREGISTERED: |
239 | 0 | info.valid_data = false; |
240 | 0 | break; |
241 | 0 | case eprosima::fastrtps::rtps::ALIVE: |
242 | 0 | default: |
243 | 0 | break; |
244 | 0 | } |
245 | 0 | } |
246 | | |
247 | | private: |
248 | | |
249 | | const TypeSupport& type_; |
250 | | DataReaderLoanManager& loan_manager_; |
251 | | history_type& history_; |
252 | | RTPSReader* reader_; |
253 | | SampleInfoPool& info_pool_; |
254 | | std::shared_ptr<detail::SampleLoanManager> sample_pool_; |
255 | | LoanableCollection& data_values_; |
256 | | SampleInfoSeq& sample_infos_; |
257 | | int32_t remaining_samples_; |
258 | | StateFilter states_; |
259 | | history_type::instance_info instance_; |
260 | | InstanceHandle_t handle_; |
261 | | bool single_instance_; |
262 | | |
263 | | bool finished_ = false; |
264 | | ReturnCode_t return_value_ = ReturnCode_t::RETCODE_NO_DATA; |
265 | | |
266 | | LoanableCollection::size_type current_slot_ = 0; |
267 | | |
268 | | bool go_to_first_valid_instance() |
269 | 0 | { |
270 | 0 | while (!is_current_instance_valid()) |
271 | 0 | { |
272 | 0 | if (!next_instance()) |
273 | 0 | { |
274 | 0 | return false; |
275 | 0 | } |
276 | 0 | } |
277 | 0 | return true; |
278 | 0 | } |
279 | | |
280 | | bool is_current_instance_valid() |
281 | 0 | { |
282 | | // Check instance_state against states_.instance_states and view_state against states_.view_states |
283 | 0 | auto instance_state = instance_->second->instance_state; |
284 | 0 | auto view_state = instance_->second->view_state; |
285 | 0 | return (0 != (states_.instance_states & instance_state)) && (0 != (states_.view_states & view_state)); |
286 | 0 | } |
287 | | |
288 | | bool next_instance() |
289 | 0 | { |
290 | 0 | history_.check_and_remove_instance(instance_); |
291 | 0 | if (single_instance_) |
292 | 0 | { |
293 | 0 | finished_ = true; |
294 | 0 | return false; |
295 | 0 | } |
296 | | |
297 | 0 | auto result = history_.next_available_instance_nts(handle_, instance_); |
298 | 0 | if (!result.first) |
299 | 0 | { |
300 | 0 | finished_ = true; |
301 | 0 | return false; |
302 | 0 | } |
303 | | |
304 | 0 | instance_ = result.second; |
305 | 0 | handle_ = instance_->first; |
306 | 0 | return true; |
307 | 0 | } |
308 | | |
309 | | bool add_sample( |
310 | | const DataReaderCacheChange& item, |
311 | | bool& deserialization_error) |
312 | 0 | { |
313 | 0 | bool ret_val = false; |
314 | 0 | deserialization_error = false; |
315 | |
|
316 | 0 | if (remaining_samples_ > 0) |
317 | 0 | { |
318 | | // Increment length of collections |
319 | 0 | auto new_len = current_slot_ + 1; |
320 | 0 | data_values_.length(new_len); |
321 | 0 | sample_infos_.length(new_len); |
322 | | |
323 | | // Add information |
324 | 0 | generate_info(item); |
325 | 0 | if (sample_infos_[current_slot_].valid_data) |
326 | 0 | { |
327 | 0 | if (!deserialize_sample(item)) |
328 | 0 | { |
329 | | // Decrement length of collections |
330 | 0 | data_values_.length(current_slot_); |
331 | 0 | sample_infos_.length(current_slot_); |
332 | 0 | deserialization_error = true; |
333 | 0 | return false; |
334 | 0 | } |
335 | 0 | } |
336 | | |
337 | | // Mark that some data is available |
338 | 0 | return_value_ = ReturnCode_t::RETCODE_OK; |
339 | 0 | ++current_slot_; |
340 | 0 | --remaining_samples_; |
341 | 0 | ret_val = true; |
342 | 0 | } |
343 | | |
344 | | // Finish when there are no remaining samples |
345 | 0 | finished_ = (remaining_samples_ == 0); |
346 | 0 | return ret_val; |
347 | 0 | } |
348 | | |
349 | | bool deserialize_sample( |
350 | | CacheChange_t* change) |
351 | 0 | { |
352 | 0 | auto payload = &(change->serializedPayload); |
353 | 0 | if (data_values_.has_ownership()) |
354 | 0 | { |
355 | | // perform deserialization |
356 | 0 | return type_->deserialize(payload, data_values_.buffer()[current_slot_]); |
357 | 0 | } |
358 | 0 | else |
359 | 0 | { |
360 | | // loan |
361 | 0 | void* sample; |
362 | 0 | sample_pool_->get_loan(change, sample); |
363 | 0 | const_cast<void**>(data_values_.buffer())[current_slot_] = sample; |
364 | 0 | return true; |
365 | 0 | } |
366 | 0 | } |
367 | | |
368 | | void generate_info( |
369 | | const DataReaderCacheChange& item) |
370 | 0 | { |
371 | | // Loan when necessary |
372 | 0 | if (!sample_infos_.has_ownership()) |
373 | 0 | { |
374 | 0 | SampleInfo* pool_item = info_pool_.get_item(); |
375 | 0 | assert(pool_item != nullptr); |
376 | 0 | const_cast<void**>(sample_infos_.buffer())[current_slot_] = pool_item; |
377 | 0 | } |
378 | |
|
379 | 0 | SampleInfo& info = sample_infos_[current_slot_]; |
380 | 0 | generate_info(info, *instance_->second, item); |
381 | 0 | } |
382 | | |
383 | | bool check_datasharing_validity( |
384 | | CacheChange_t* change, |
385 | | bool has_ownership) |
386 | 0 | { |
387 | 0 | bool is_valid = true; |
388 | 0 | if (has_ownership) //< On loans the user must check the validity anyways |
389 | 0 | { |
390 | 0 | DataSharingPayloadPool* pool = dynamic_cast<DataSharingPayloadPool*>(change->payload_owner()); |
391 | 0 | if (pool) |
392 | 0 | { |
393 | | //Check if the payload is dirty |
394 | 0 | is_valid = pool->is_sample_valid(*change); |
395 | 0 | } |
396 | 0 | } |
397 | |
|
398 | 0 | if (!is_valid) |
399 | 0 | { |
400 | 0 | logWarning(RTPS_READER, |
401 | 0 | "Change " << change->sequenceNumber << " from " << change->writerGUID << " is overidden"); |
402 | 0 | return false; |
403 | 0 | } |
404 | | |
405 | 0 | return true; |
406 | 0 | } |
407 | | |
408 | | }; |
409 | | |
410 | | } /* namespace detail */ |
411 | | } /* namespace dds */ |
412 | | } /* namespace fastdds */ |
413 | | } /* namespace eprosima */ |
414 | | |
415 | | #endif // _FASTDDS_SUBSCRIBER_DATAREADERIMPL_READTAKECOMMAND_HPP_ |