/src/Fast-DDS/src/cpp/rtps/writer/ReaderProxy.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | /** |
16 | | * @file ReaderProxy.cpp |
17 | | * |
18 | | */ |
19 | | |
20 | | |
21 | | #include <fastdds/dds/log/Log.hpp> |
22 | | #include <fastdds/rtps/history/WriterHistory.h> |
23 | | #include <fastdds/rtps/writer/ReaderProxy.h> |
24 | | #include <fastdds/rtps/writer/StatefulWriter.h> |
25 | | #include <fastdds/rtps/resources/TimedEvent.h> |
26 | | #include <fastrtps/utils/TimeConversion.h> |
27 | | #include <fastdds/rtps/common/LocatorListComparisons.hpp> |
28 | | |
29 | | #include <rtps/participant/RTPSParticipantImpl.h> |
30 | | #include <rtps/history/HistoryAttributesExtension.hpp> |
31 | | |
32 | | #include "rtps/messages/RTPSGapBuilder.hpp" |
33 | | #include <rtps/DataSharing/DataSharingNotifier.hpp> |
34 | | |
35 | | #include <mutex> |
36 | | #include <cassert> |
37 | | #include <algorithm> |
38 | | |
39 | | namespace eprosima { |
40 | | namespace fastrtps { |
41 | | namespace rtps { |
42 | | |
43 | | ReaderProxy::ReaderProxy( |
44 | | const WriterTimes& times, |
45 | | const RemoteLocatorsAllocationAttributes& loc_alloc, |
46 | | StatefulWriter* writer) |
47 | | : is_active_(false) |
48 | | , locator_info_( |
49 | | writer, loc_alloc.max_unicast_locators, |
50 | | loc_alloc.max_multicast_locators) |
51 | | , durability_kind_(VOLATILE) |
52 | | , expects_inline_qos_(false) |
53 | | , is_reliable_(false) |
54 | | , disable_positive_acks_(false) |
55 | | , writer_(writer) |
56 | | , changes_for_reader_(resource_limits_from_history(writer->mp_history->m_att, 0)) |
57 | | , nack_supression_event_(nullptr) |
58 | | , initial_heartbeat_event_(nullptr) |
59 | | , timers_enabled_(false) |
60 | | , last_acknack_count_(0) |
61 | | , last_nackfrag_count_(0) |
62 | 0 | { |
63 | 0 | nack_supression_event_ = new TimedEvent(writer_->getRTPSParticipant()->getEventResource(), |
64 | 0 | [&]() -> bool |
65 | 0 | { |
66 | 0 | writer_->perform_nack_supression(guid()); |
67 | 0 | return false; |
68 | 0 | }, |
69 | 0 | TimeConv::Time_t2MilliSecondsDouble(times.nackSupressionDuration)); |
70 | |
|
71 | 0 | initial_heartbeat_event_ = new TimedEvent(writer_->getRTPSParticipant()->getEventResource(), |
72 | 0 | [&]() -> bool |
73 | 0 | { |
74 | 0 | writer_->intraprocess_heartbeat(this); |
75 | 0 | return false; |
76 | 0 | }, 0); |
77 | |
|
78 | 0 | stop(); |
79 | 0 | } |
80 | | |
81 | | bool ReaderProxy::rtps_is_relevant( |
82 | | CacheChange_t* change) const |
83 | 0 | { |
84 | 0 | auto filter = writer_->reader_data_filter(); |
85 | 0 | if (nullptr != filter) |
86 | 0 | { |
87 | 0 | bool ret = filter->is_relevant(*change, guid()); |
88 | 0 | logInfo(RTPS_READER_PROXY, |
89 | 0 | "Change " << change->instanceHandle << " is relevant for reader " << guid() << "? " << ret); |
90 | 0 | return ret; |
91 | 0 | } |
92 | 0 | return true; |
93 | 0 | } |
94 | | |
95 | | ReaderProxy::~ReaderProxy() |
96 | 0 | { |
97 | 0 | if (nack_supression_event_) |
98 | 0 | { |
99 | 0 | delete(nack_supression_event_); |
100 | 0 | nack_supression_event_ = nullptr; |
101 | 0 | } |
102 | |
|
103 | 0 | if (initial_heartbeat_event_) |
104 | 0 | { |
105 | 0 | delete(initial_heartbeat_event_); |
106 | 0 | initial_heartbeat_event_ = nullptr; |
107 | 0 | } |
108 | 0 | } |
109 | | |
110 | | void ReaderProxy::start( |
111 | | const ReaderProxyData& reader_attributes, |
112 | | bool is_datasharing) |
113 | 0 | { |
114 | 0 | locator_info_.start( |
115 | 0 | reader_attributes.guid(), |
116 | 0 | reader_attributes.remote_locators().unicast, |
117 | 0 | reader_attributes.remote_locators().multicast, |
118 | 0 | reader_attributes.m_expectsInlineQos, |
119 | 0 | is_datasharing); |
120 | |
|
121 | 0 | is_active_ = true; |
122 | 0 | durability_kind_ = reader_attributes.m_qos.m_durability.durabilityKind(); |
123 | 0 | expects_inline_qos_ = reader_attributes.m_expectsInlineQos; |
124 | 0 | is_reliable_ = reader_attributes.m_qos.m_reliability.kind != BEST_EFFORT_RELIABILITY_QOS; |
125 | 0 | disable_positive_acks_ = reader_attributes.disable_positive_acks(); |
126 | 0 | if (durability_kind_ == DurabilityKind_t::VOLATILE) |
127 | 0 | { |
128 | 0 | SequenceNumber_t min_sequence = writer_->get_seq_num_min(); |
129 | 0 | changes_low_mark_ = (min_sequence == SequenceNumber_t::unknown()) ? |
130 | 0 | writer_->next_sequence_number() - 1 : min_sequence - 1; |
131 | 0 | } |
132 | 0 | else |
133 | 0 | { |
134 | 0 | acked_changes_set(SequenceNumber_t()); // Simulate initial acknack to set low mark |
135 | 0 | } |
136 | |
|
137 | 0 | timers_enabled_.store(is_remote_and_reliable()); |
138 | 0 | if (is_local_reader()) |
139 | 0 | { |
140 | 0 | initial_heartbeat_event_->restart_timer(); |
141 | 0 | } |
142 | |
|
143 | 0 | logInfo(RTPS_READER_PROXY, "Reader Proxy started"); |
144 | 0 | } |
145 | | |
146 | | bool ReaderProxy::update( |
147 | | const ReaderProxyData& reader_attributes) |
148 | 0 | { |
149 | 0 | durability_kind_ = reader_attributes.m_qos.m_durability.durabilityKind(); |
150 | 0 | expects_inline_qos_ = reader_attributes.m_expectsInlineQos; |
151 | 0 | is_reliable_ = reader_attributes.m_qos.m_reliability.kind != BEST_EFFORT_RELIABILITY_QOS; |
152 | 0 | disable_positive_acks_ = reader_attributes.disable_positive_acks(); |
153 | |
|
154 | 0 | locator_info_.update( |
155 | 0 | reader_attributes.remote_locators().unicast, |
156 | 0 | reader_attributes.remote_locators().multicast, |
157 | 0 | reader_attributes.m_expectsInlineQos); |
158 | |
|
159 | 0 | return true; |
160 | 0 | } |
161 | | |
162 | | void ReaderProxy::stop() |
163 | 0 | { |
164 | 0 | locator_info_.stop(); |
165 | 0 | is_active_ = false; |
166 | 0 | disable_timers(); |
167 | |
|
168 | 0 | changes_for_reader_.clear(); |
169 | 0 | last_acknack_count_ = 0; |
170 | 0 | last_nackfrag_count_ = 0; |
171 | 0 | changes_low_mark_ = SequenceNumber_t(); |
172 | 0 | } |
173 | | |
174 | | void ReaderProxy::disable_timers() |
175 | 0 | { |
176 | 0 | if (timers_enabled_.exchange(false)) |
177 | 0 | { |
178 | 0 | nack_supression_event_->cancel_timer(); |
179 | 0 | } |
180 | 0 | initial_heartbeat_event_->cancel_timer(); |
181 | 0 | } |
182 | | |
183 | | void ReaderProxy::update_nack_supression_interval( |
184 | | const Duration_t& interval) |
185 | 0 | { |
186 | 0 | nack_supression_event_->update_interval(interval); |
187 | 0 | } |
188 | | |
189 | | void ReaderProxy::add_change( |
190 | | const ChangeForReader_t& change, |
191 | | bool is_relevant, |
192 | | bool restart_nack_supression) |
193 | 0 | { |
194 | 0 | if (restart_nack_supression && timers_enabled_.load()) |
195 | 0 | { |
196 | 0 | nack_supression_event_->restart_timer(); |
197 | 0 | } |
198 | |
|
199 | 0 | add_change(change, is_relevant); |
200 | 0 | } |
201 | | |
202 | | void ReaderProxy::add_change( |
203 | | const ChangeForReader_t& change, |
204 | | bool is_relevant, |
205 | | bool restart_nack_supression, |
206 | | const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) |
207 | 0 | { |
208 | 0 | if (restart_nack_supression && timers_enabled_) |
209 | 0 | { |
210 | 0 | nack_supression_event_->restart_timer(max_blocking_time); |
211 | 0 | } |
212 | |
|
213 | 0 | add_change(change, is_relevant); |
214 | 0 | } |
215 | | |
216 | | void ReaderProxy::add_change( |
217 | | const ChangeForReader_t& change, |
218 | | bool is_relevant) |
219 | 0 | { |
220 | 0 | assert(change.getSequenceNumber() > changes_low_mark_); |
221 | 0 | assert(changes_for_reader_.empty() ? true : |
222 | 0 | change.getSequenceNumber() > changes_for_reader_.back().getSequenceNumber()); |
223 | | |
224 | | // Irrelevant changes are not added to the collection |
225 | 0 | if (!is_relevant) |
226 | 0 | { |
227 | 0 | if ( !is_reliable_ && |
228 | 0 | changes_low_mark_ + 1 == change.getSequenceNumber()) |
229 | 0 | { |
230 | 0 | changes_low_mark_ = change.getSequenceNumber(); |
231 | 0 | } |
232 | 0 | return; |
233 | 0 | } |
234 | | |
235 | 0 | if (changes_for_reader_.push_back(change) == nullptr) |
236 | 0 | { |
237 | | // This should never happen |
238 | 0 | logError(RTPS_READER_PROXY, "Error adding change " << change.getSequenceNumber() |
239 | 0 | << " to reader proxy " << guid()); |
240 | 0 | eprosima::fastdds::dds::Log::Flush(); |
241 | 0 | assert(false); |
242 | 0 | } |
243 | 0 | } |
244 | | |
245 | | bool ReaderProxy::has_changes() const |
246 | 0 | { |
247 | 0 | return !changes_for_reader_.empty(); |
248 | 0 | } |
249 | | |
250 | | bool ReaderProxy::change_is_acked( |
251 | | const SequenceNumber_t& seq_num) const |
252 | 0 | { |
253 | 0 | if (seq_num <= changes_low_mark_ || changes_for_reader_.empty()) |
254 | 0 | { |
255 | 0 | return true; |
256 | 0 | } |
257 | | |
258 | 0 | ChangeConstIterator chit = find_change(seq_num); |
259 | 0 | if (chit == changes_for_reader_.end()) |
260 | 0 | { |
261 | | // There is a hole in changes_for_reader_ |
262 | | // This means a change was removed, or was not relevant. |
263 | 0 | return true; |
264 | 0 | } |
265 | | |
266 | 0 | return chit->getStatus() == ACKNOWLEDGED; |
267 | 0 | } |
268 | | |
269 | | bool ReaderProxy::change_is_unsent( |
270 | | const SequenceNumber_t& seq_num, |
271 | | FragmentNumber_t& next_unsent_frag, |
272 | | SequenceNumber_t& gap_seq, |
273 | | const SequenceNumber_t& min_seq, |
274 | | bool& need_reactivate_periodic_heartbeat) const |
275 | 0 | { |
276 | 0 | if (seq_num <= changes_low_mark_ || changes_for_reader_.empty()) |
277 | 0 | { |
278 | 0 | return false; |
279 | 0 | } |
280 | | |
281 | 0 | ChangeConstIterator chit = find_change(seq_num); |
282 | 0 | if (chit == changes_for_reader_.end()) |
283 | 0 | { |
284 | | // There is a hole in changes_for_reader_ |
285 | | // This means a change was removed. |
286 | 0 | return false; |
287 | 0 | } |
288 | | |
289 | 0 | bool returned_value = chit->getStatus() == UNSENT; |
290 | |
|
291 | 0 | if (returned_value) |
292 | 0 | { |
293 | 0 | next_unsent_frag = chit->get_next_unsent_fragment(); |
294 | 0 | gap_seq = SequenceNumber_t::unknown(); |
295 | |
|
296 | 0 | if (is_reliable_ && !chit->has_been_delivered()) |
297 | 0 | { |
298 | 0 | need_reactivate_periodic_heartbeat |= true; |
299 | 0 | SequenceNumber_t prev = |
300 | 0 | (changes_for_reader_.begin() != chit ? |
301 | 0 | std::prev(chit)->getSequenceNumber() : |
302 | 0 | changes_low_mark_ |
303 | 0 | ) + 1; |
304 | |
|
305 | 0 | if (prev != chit->getSequenceNumber()) |
306 | 0 | { |
307 | 0 | gap_seq = prev; |
308 | | |
309 | | // Verify the calculated gap_seq in ReaderProxy is a real hole in the history. |
310 | 0 | if (gap_seq < min_seq) // Several samples of the hole are not really already available. |
311 | 0 | { |
312 | 0 | if (min_seq < seq_num) |
313 | 0 | { |
314 | 0 | gap_seq = min_seq; |
315 | 0 | } |
316 | 0 | else |
317 | 0 | { |
318 | 0 | gap_seq = SequenceNumber_t::unknown(); |
319 | 0 | } |
320 | 0 | } |
321 | 0 | } |
322 | 0 | } |
323 | 0 | } |
324 | |
|
325 | 0 | return returned_value; |
326 | 0 | } |
327 | | |
328 | | void ReaderProxy::acked_changes_set( |
329 | | const SequenceNumber_t& seq_num) |
330 | 0 | { |
331 | 0 | SequenceNumber_t future_low_mark = seq_num; |
332 | |
|
333 | 0 | if (seq_num > changes_low_mark_) |
334 | 0 | { |
335 | 0 | ChangeIterator chit = find_change(seq_num, false); |
336 | | // continue advancing until next change is not acknowledged |
337 | 0 | while (chit != changes_for_reader_.end() |
338 | 0 | && chit->getSequenceNumber() == future_low_mark |
339 | 0 | && chit->getStatus() == ACKNOWLEDGED) |
340 | 0 | { |
341 | 0 | ++chit; |
342 | 0 | ++future_low_mark; |
343 | 0 | } |
344 | 0 | changes_for_reader_.erase(changes_for_reader_.begin(), chit); |
345 | 0 | } |
346 | 0 | else |
347 | 0 | { |
348 | 0 | future_low_mark = changes_low_mark_ + 1; |
349 | |
|
350 | 0 | if (seq_num == SequenceNumber_t() && durability_kind_ != DurabilityKind_t::VOLATILE) |
351 | 0 | { |
352 | | // Special case. Currently only used on Builtin StatefulWriters |
353 | | // after losing lease duration, and on late joiners to set |
354 | | // changes_low_mark_ to match that of the writer. |
355 | 0 | SequenceNumber_t min_sequence = writer_->get_seq_num_min(); |
356 | 0 | if (min_sequence != SequenceNumber_t::unknown()) |
357 | 0 | { |
358 | 0 | SequenceNumber_t current_sequence = seq_num; |
359 | 0 | if (seq_num < min_sequence) |
360 | 0 | { |
361 | 0 | current_sequence = min_sequence; |
362 | 0 | } |
363 | 0 | future_low_mark = current_sequence; |
364 | |
|
365 | 0 | bool should_sort = false; |
366 | 0 | for (; current_sequence <= changes_low_mark_; ++current_sequence) |
367 | 0 | { |
368 | | // Skip all consecutive changes already in the collection |
369 | 0 | ChangeConstIterator it = find_change(current_sequence); |
370 | 0 | while ( it != changes_for_reader_.end() && |
371 | 0 | current_sequence <= changes_low_mark_ && |
372 | 0 | it->getSequenceNumber() == current_sequence) |
373 | 0 | { |
374 | 0 | ++current_sequence; |
375 | 0 | ++it; |
376 | 0 | } |
377 | |
|
378 | 0 | if (current_sequence <= changes_low_mark_) |
379 | 0 | { |
380 | 0 | CacheChange_t* change = nullptr; |
381 | 0 | if (writer_->mp_history->get_change(current_sequence, writer_->getGuid(), &change)) |
382 | 0 | { |
383 | 0 | should_sort = true; |
384 | 0 | ChangeForReader_t cr(change); |
385 | 0 | cr.setStatus(UNACKNOWLEDGED); |
386 | 0 | changes_for_reader_.push_back(cr); |
387 | 0 | } |
388 | 0 | } |
389 | 0 | } |
390 | | // Keep changes sorted by sequence number |
391 | 0 | if (should_sort) |
392 | 0 | { |
393 | 0 | std::sort(changes_for_reader_.begin(), changes_for_reader_.end(), ChangeForReaderCmp()); |
394 | 0 | } |
395 | 0 | } |
396 | 0 | else if (!is_local_reader()) |
397 | 0 | { |
398 | 0 | future_low_mark = writer_->next_sequence_number(); |
399 | 0 | } |
400 | 0 | } |
401 | 0 | } |
402 | 0 | changes_low_mark_ = future_low_mark - 1; |
403 | 0 | } |
404 | | |
405 | | bool ReaderProxy::requested_changes_set( |
406 | | const SequenceNumberSet_t& seq_num_set, |
407 | | RTPSGapBuilder& gap_builder, |
408 | | const SequenceNumber_t& min_seq_in_history) |
409 | 0 | { |
410 | 0 | bool isSomeoneWasSetRequested = false; |
411 | |
|
412 | 0 | if (SequenceNumber_t::unknown() != min_seq_in_history) |
413 | 0 | { |
414 | 0 | seq_num_set.for_each([&](SequenceNumber_t sit) |
415 | 0 | { |
416 | 0 | ChangeIterator chit = find_change(sit, true); |
417 | 0 | if (chit != changes_for_reader_.end()) |
418 | 0 | { |
419 | 0 | if (UNACKNOWLEDGED == chit->getStatus()) |
420 | 0 | { |
421 | 0 | chit->setStatus(REQUESTED); |
422 | 0 | chit->markAllFragmentsAsUnsent(); |
423 | 0 | isSomeoneWasSetRequested = true; |
424 | 0 | } |
425 | 0 | } |
426 | 0 | else if ((sit >= min_seq_in_history) && (sit > changes_low_mark_)) |
427 | 0 | { |
428 | 0 | gap_builder.add(sit); |
429 | 0 | } |
430 | 0 | }); |
431 | 0 | } |
432 | |
|
433 | 0 | if (isSomeoneWasSetRequested) |
434 | 0 | { |
435 | 0 | logInfo(RTPS_READER_PROXY, "Requested Changes: " << seq_num_set); |
436 | 0 | } |
437 | |
|
438 | 0 | return isSomeoneWasSetRequested; |
439 | 0 | } |
440 | | |
441 | | bool ReaderProxy::process_initial_acknack( |
442 | | const std::function<void(ChangeForReader_t& change)>& func) |
443 | 0 | { |
444 | 0 | if (is_local_reader()) |
445 | 0 | { |
446 | 0 | return 0 != convert_status_on_all_changes(UNACKNOWLEDGED, UNSENT, func); |
447 | 0 | } |
448 | | |
449 | 0 | return true; |
450 | 0 | } |
451 | | |
452 | | void ReaderProxy::from_unsent_to_status( |
453 | | const SequenceNumber_t& seq_num, |
454 | | ChangeForReaderStatus_t status, |
455 | | bool restart_nack_supression, |
456 | | bool delivered) |
457 | 0 | { |
458 | | // This function must not be called by a best-effort reader. |
459 | | // It will use acked_changes_set(). |
460 | 0 | assert(is_reliable_); |
461 | |
|
462 | 0 | if (restart_nack_supression && is_remote_and_reliable()) |
463 | 0 | { |
464 | 0 | assert(timers_enabled_.load()); |
465 | 0 | nack_supression_event_->restart_timer(); |
466 | 0 | } |
467 | | |
468 | | // Called when delivering an UNSENT sample, the seq_number must exists in the ReaderProxy. |
469 | 0 | assert(seq_num > changes_low_mark_); |
470 | 0 | ChangeIterator it = find_change(seq_num, true); |
471 | 0 | assert(changes_for_reader_.end() != it); |
472 | 0 | assert(UNSENT == it->getStatus()); |
473 | 0 | assert(UNSENT != status); |
474 | |
|
475 | 0 | if (ACKNOWLEDGED == status && seq_num == changes_low_mark_ + 1) |
476 | 0 | { |
477 | 0 | assert(changes_for_reader_.begin() == it); |
478 | 0 | changes_for_reader_.erase(it); |
479 | 0 | acked_changes_set(seq_num + 1); |
480 | 0 | return; |
481 | 0 | } |
482 | | |
483 | 0 | it->setStatus(status); |
484 | |
|
485 | 0 | if (delivered) |
486 | 0 | { |
487 | 0 | it->set_delivered(); |
488 | 0 | } |
489 | 0 | } |
490 | | |
491 | | bool ReaderProxy::mark_fragment_as_sent_for_change( |
492 | | const SequenceNumber_t& seq_num, |
493 | | FragmentNumber_t frag_num, |
494 | | bool& was_last_fragment) |
495 | 0 | { |
496 | 0 | was_last_fragment = false; |
497 | |
|
498 | 0 | if (seq_num <= changes_low_mark_) |
499 | 0 | { |
500 | 0 | return false; |
501 | 0 | } |
502 | | |
503 | 0 | bool change_found = false; |
504 | 0 | ChangeIterator it = find_change(seq_num, true); |
505 | |
|
506 | 0 | if (it != changes_for_reader_.end()) |
507 | 0 | { |
508 | 0 | change_found = true; |
509 | 0 | it->markFragmentsAsSent(frag_num); |
510 | 0 | was_last_fragment = it->getUnsentFragments().empty(); |
511 | 0 | } |
512 | |
|
513 | 0 | return change_found; |
514 | 0 | } |
515 | | |
516 | | bool ReaderProxy::perform_nack_supression() |
517 | 0 | { |
518 | 0 | return 0 != convert_status_on_all_changes(UNDERWAY, UNACKNOWLEDGED); |
519 | 0 | } |
520 | | |
521 | | uint32_t ReaderProxy::perform_acknack_response( |
522 | | const std::function<void(ChangeForReader_t& change)>& func) |
523 | 0 | { |
524 | 0 | return convert_status_on_all_changes(REQUESTED, UNSENT, func); |
525 | 0 | } |
526 | | |
527 | | uint32_t ReaderProxy::convert_status_on_all_changes( |
528 | | ChangeForReaderStatus_t previous, |
529 | | ChangeForReaderStatus_t next, |
530 | | const std::function<void(ChangeForReader_t& change)>& func) |
531 | 0 | { |
532 | 0 | assert(previous > next); |
533 | | |
534 | | // NOTE: This is only called for REQUESTED=>UNSENT (acknack response) or |
535 | | // UNDERWAY=>UNACKNOWLEDGED (nack supression) |
536 | |
|
537 | 0 | uint32_t changed = 0; |
538 | 0 | for (ChangeForReader_t& change : changes_for_reader_) |
539 | 0 | { |
540 | 0 | if (change.getStatus() == previous) |
541 | 0 | { |
542 | 0 | ++changed; |
543 | 0 | change.setStatus(next); |
544 | |
|
545 | 0 | if (func) |
546 | 0 | { |
547 | 0 | func(change); |
548 | 0 | } |
549 | 0 | } |
550 | 0 | } |
551 | |
|
552 | 0 | return changed; |
553 | 0 | } |
554 | | |
555 | | void ReaderProxy::change_has_been_removed( |
556 | | const SequenceNumber_t& seq_num) |
557 | 0 | { |
558 | | // Check sequence number is in the container, because it was not clean up. |
559 | 0 | if (changes_for_reader_.empty() || seq_num < changes_for_reader_.begin()->getSequenceNumber()) |
560 | 0 | { |
561 | 0 | return; |
562 | 0 | } |
563 | | |
564 | 0 | auto chit = find_change(seq_num); |
565 | |
|
566 | 0 | if (chit == this->changes_for_reader_.end()) |
567 | 0 | { |
568 | | // No change for this sequence number |
569 | 0 | return; |
570 | 0 | } |
571 | | |
572 | | // In intraprocess, if there is an UNACKNOWLEDGED, a GAP has to be send because there is no reliable mechanism. |
573 | 0 | if (is_local_reader() && ACKNOWLEDGED > chit->getStatus()) |
574 | 0 | { |
575 | 0 | writer_->intraprocess_gap(this, seq_num); |
576 | 0 | } |
577 | | |
578 | | // Element may not be in the container when marked as irrelevant. |
579 | 0 | changes_for_reader_.erase(chit); |
580 | | |
581 | | // When removing the next-to-be-acknowledged, we should auto-acknowledge it. |
582 | 0 | if ((changes_low_mark_ + 1) == seq_num) |
583 | 0 | { |
584 | 0 | acked_changes_set(seq_num + 1); |
585 | 0 | } |
586 | 0 | } |
587 | | |
588 | | bool ReaderProxy::has_unacknowledged( |
589 | | const SequenceNumber_t& first_seq_in_history) const |
590 | 0 | { |
591 | 0 | if (first_seq_in_history > changes_low_mark_) |
592 | 0 | { |
593 | 0 | return true; |
594 | 0 | } |
595 | | |
596 | 0 | for (const ChangeForReader_t& it : changes_for_reader_) |
597 | 0 | { |
598 | 0 | if (it.getStatus() == UNACKNOWLEDGED) |
599 | 0 | { |
600 | 0 | return true; |
601 | 0 | } |
602 | 0 | } |
603 | | |
604 | 0 | return false; |
605 | 0 | } |
606 | | |
607 | | bool ReaderProxy::requested_fragment_set( |
608 | | const SequenceNumber_t& seq_num, |
609 | | const FragmentNumberSet_t& frag_set) |
610 | 0 | { |
611 | | // Locate the outbound change referenced by the NACK_FRAG |
612 | 0 | ChangeIterator changeIter = find_change(seq_num, true); |
613 | 0 | if (changeIter == changes_for_reader_.end()) |
614 | 0 | { |
615 | 0 | return false; |
616 | 0 | } |
617 | | |
618 | 0 | changeIter->markFragmentsAsUnsent(frag_set); |
619 | | |
620 | | // If it was UNSENT, we shouldn't switch back to REQUESTED to prevent stalling. |
621 | 0 | if (changeIter->getStatus() != UNSENT) |
622 | 0 | { |
623 | 0 | changeIter->setStatus(REQUESTED); |
624 | 0 | } |
625 | |
|
626 | 0 | return true; |
627 | 0 | } |
628 | | |
629 | | bool ReaderProxy::process_nack_frag( |
630 | | const GUID_t& reader_guid, |
631 | | uint32_t nack_count, |
632 | | const SequenceNumber_t& seq_num, |
633 | | const FragmentNumberSet_t& fragments_state) |
634 | 0 | { |
635 | 0 | if (guid() == reader_guid) |
636 | 0 | { |
637 | 0 | if (last_nackfrag_count_ < nack_count) |
638 | 0 | { |
639 | 0 | last_nackfrag_count_ = nack_count; |
640 | 0 | if (requested_fragment_set(seq_num, fragments_state)) |
641 | 0 | { |
642 | 0 | return true; |
643 | 0 | } |
644 | 0 | } |
645 | 0 | } |
646 | | |
647 | 0 | return false; |
648 | 0 | } |
649 | | |
650 | | static bool change_less_than_sequence( |
651 | | const ChangeForReader_t& change, |
652 | | const SequenceNumber_t& seq_num) |
653 | 0 | { |
654 | 0 | return change.getSequenceNumber() < seq_num; |
655 | 0 | } |
656 | | |
657 | | ReaderProxy::ChangeIterator ReaderProxy::find_change( |
658 | | const SequenceNumber_t& seq_num, |
659 | | bool exact) |
660 | 0 | { |
661 | 0 | ReaderProxy::ChangeIterator it; |
662 | 0 | ReaderProxy::ChangeIterator end = changes_for_reader_.end(); |
663 | 0 | it = std::lower_bound(changes_for_reader_.begin(), end, seq_num, change_less_than_sequence); |
664 | |
|
665 | 0 | return (!exact) |
666 | 0 | ? it |
667 | 0 | : it == end |
668 | 0 | ? it |
669 | 0 | : it->getSequenceNumber() == seq_num ? it : end; |
670 | 0 | } |
671 | | |
672 | | ReaderProxy::ChangeConstIterator ReaderProxy::find_change( |
673 | | const SequenceNumber_t& seq_num) const |
674 | 0 | { |
675 | 0 | ReaderProxy::ChangeConstIterator it; |
676 | 0 | ReaderProxy::ChangeConstIterator end = changes_for_reader_.end(); |
677 | 0 | it = std::lower_bound(changes_for_reader_.begin(), end, seq_num, change_less_than_sequence); |
678 | |
|
679 | 0 | return it == end |
680 | 0 | ? it |
681 | 0 | : it->getSequenceNumber() == seq_num ? it : end; |
682 | 0 | } |
683 | | |
684 | | } // namespace rtps |
685 | | } // namespace fastrtps |
686 | | } // namespace eprosima |