/src/Fast-DDS/src/cpp/rtps/writer/ReaderProxy.cpp
Line | Count | Source |
1 | | // Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | /** |
16 | | * @file ReaderProxy.cpp |
17 | | * |
18 | | */ |
19 | | |
20 | | #include <rtps/writer/ReaderProxy.hpp> |
21 | | |
22 | | #include <algorithm> |
23 | | #include <cassert> |
24 | | #include <cstdint> |
25 | | #include <mutex> |
26 | | |
27 | | #include <fastdds/dds/log/Log.hpp> |
28 | | #include <fastdds/rtps/history/WriterHistory.hpp> |
29 | | #include <fastdds/rtps/common/LocatorListComparisons.hpp> |
30 | | |
31 | | #include <rtps/DataSharing/DataSharingNotifier.hpp> |
32 | | #include <rtps/history/HistoryAttributesExtension.hpp> |
33 | | #include <rtps/messages/RTPSGapBuilder.hpp> |
34 | | #include <rtps/participant/RTPSParticipantImpl.hpp> |
35 | | #include <rtps/resources/TimedEvent.h> |
36 | | #include <rtps/writer/StatefulWriter.hpp> |
37 | | #include <rtps/writer/StatefulWriterListener.hpp> |
38 | | #include <utils/TimeConversion.hpp> |
39 | | |
40 | | |
41 | | namespace eprosima { |
42 | | namespace fastdds { |
43 | | namespace rtps { |
44 | | |
45 | | ReaderProxy::ReaderProxy( |
46 | | const WriterTimes& times, |
47 | | const RemoteLocatorsAllocationAttributes& loc_alloc, |
48 | | StatefulWriter* writer, |
49 | | StatefulWriterListener* stateful_listener) |
50 | 0 | : is_active_(false) |
51 | 0 | , locator_info_( |
52 | 0 | writer, loc_alloc.max_unicast_locators, |
53 | 0 | loc_alloc.max_multicast_locators) |
54 | 0 | , durability_kind_(VOLATILE) |
55 | 0 | , expects_inline_qos_(false) |
56 | 0 | , is_reliable_(false) |
57 | 0 | , disable_positive_acks_(false) |
58 | 0 | , writer_(writer) |
59 | 0 | , changes_for_reader_(resource_limits_from_history(writer->get_history()->m_att, 0)) |
60 | 0 | , nack_supression_event_(nullptr) |
61 | 0 | , initial_heartbeat_event_(nullptr) |
62 | 0 | , timers_enabled_(false) |
63 | 0 | , next_expected_acknack_count_(0) |
64 | 0 | , last_nackfrag_count_(0) |
65 | 0 | , stateful_writer_listener_(stateful_listener) |
66 | 0 | { |
67 | 0 | auto participant = writer_->get_participant_impl(); |
68 | 0 | if (nullptr != participant) |
69 | 0 | { |
70 | 0 | nack_supression_event_ = new TimedEvent(participant->getEventResource(), |
71 | 0 | [&]() -> bool |
72 | 0 | { |
73 | 0 | writer_->perform_nack_supression(guid()); |
74 | 0 | return false; |
75 | 0 | }, |
76 | 0 | fastdds::rtps::TimeConv::Time_t2MilliSecondsDouble(times.nack_supression_duration)); |
77 | |
|
78 | 0 | initial_heartbeat_event_ = new TimedEvent(participant->getEventResource(), |
79 | 0 | [&]() -> bool |
80 | 0 | { |
81 | 0 | writer_->intraprocess_heartbeat(this); |
82 | 0 | return false; |
83 | 0 | }, 0); |
84 | 0 | } |
85 | |
|
86 | 0 | stop(); |
87 | 0 | } |
88 | | |
89 | | bool ReaderProxy::rtps_is_relevant( |
90 | | CacheChange_t* change) const |
91 | 0 | { |
92 | 0 | auto filter = writer_->reader_data_filter(); |
93 | 0 | if (nullptr != filter) |
94 | 0 | { |
95 | 0 | bool ret = filter->is_relevant(*change, guid()); |
96 | 0 | EPROSIMA_LOG_INFO(RTPS_READER_PROXY, |
97 | 0 | "Change " << change->instanceHandle << " is relevant for reader " << guid() << "? " << ret); |
98 | 0 | return ret; |
99 | 0 | } |
100 | 0 | return true; |
101 | 0 | } |
102 | | |
103 | | ReaderProxy::~ReaderProxy() |
104 | 0 | { |
105 | 0 | if (nack_supression_event_) |
106 | 0 | { |
107 | 0 | delete(nack_supression_event_); |
108 | 0 | nack_supression_event_ = nullptr; |
109 | 0 | } |
110 | |
|
111 | 0 | if (initial_heartbeat_event_) |
112 | 0 | { |
113 | 0 | delete(initial_heartbeat_event_); |
114 | 0 | initial_heartbeat_event_ = nullptr; |
115 | 0 | } |
116 | 0 | } |
117 | | |
118 | | void ReaderProxy::start( |
119 | | const ReaderProxyData& reader_attributes, |
120 | | bool is_datasharing) |
121 | 0 | { |
122 | 0 | locator_info_.start( |
123 | 0 | reader_attributes.guid, |
124 | 0 | reader_attributes.remote_locators.unicast, |
125 | 0 | reader_attributes.remote_locators.multicast, |
126 | 0 | reader_attributes.expects_inline_qos, |
127 | 0 | is_datasharing); |
128 | |
|
129 | 0 | is_active_ = true; |
130 | 0 | durability_kind_ = reader_attributes.durability.durabilityKind(); |
131 | 0 | expects_inline_qos_ = reader_attributes.expects_inline_qos; |
132 | 0 | is_reliable_ = reader_attributes.reliability.kind != dds::BEST_EFFORT_RELIABILITY_QOS; |
133 | 0 | disable_positive_acks_ = reader_attributes.disable_positive_acks_enabled(); |
134 | 0 | if (durability_kind_ == DurabilityKind_t::VOLATILE) |
135 | 0 | { |
136 | 0 | SequenceNumber_t min_sequence = writer_->get_seq_num_min(); |
137 | 0 | changes_low_mark_ = (min_sequence == SequenceNumber_t::unknown()) ? |
138 | 0 | writer_->next_sequence_number() - 1 : min_sequence - 1; |
139 | 0 | } |
140 | 0 | else |
141 | 0 | { |
142 | 0 | acked_changes_set(SequenceNumber_t()); // Simulate initial acknack to set low mark |
143 | 0 | } |
144 | |
|
145 | 0 | timers_enabled_.store(is_remote_and_reliable()); |
146 | 0 | if (is_local_reader() && initial_heartbeat_event_) |
147 | 0 | { |
148 | 0 | initial_heartbeat_event_->restart_timer(); |
149 | 0 | } |
150 | |
|
151 | 0 | EPROSIMA_LOG_INFO(RTPS_READER_PROXY, "Reader Proxy started"); |
152 | 0 | } |
153 | | |
154 | | bool ReaderProxy::update( |
155 | | const ReaderProxyData& reader_attributes) |
156 | 0 | { |
157 | 0 | durability_kind_ = reader_attributes.durability.durabilityKind(); |
158 | 0 | expects_inline_qos_ = reader_attributes.expects_inline_qos; |
159 | 0 | is_reliable_ = reader_attributes.reliability.kind != dds::BEST_EFFORT_RELIABILITY_QOS; |
160 | 0 | disable_positive_acks_ = reader_attributes.disable_positive_acks_enabled(); |
161 | |
|
162 | 0 | locator_info_.update( |
163 | 0 | reader_attributes.remote_locators.unicast, |
164 | 0 | reader_attributes.remote_locators.multicast, |
165 | 0 | reader_attributes.expects_inline_qos); |
166 | |
|
167 | 0 | return true; |
168 | 0 | } |
169 | | |
170 | | void ReaderProxy::stop() |
171 | 0 | { |
172 | 0 | locator_info_.stop(); |
173 | 0 | is_active_ = false; |
174 | 0 | disable_timers(); |
175 | |
|
176 | 0 | changes_for_reader_.clear(); |
177 | 0 | next_expected_acknack_count_ = 0; |
178 | 0 | last_nackfrag_count_ = 0; |
179 | 0 | changes_low_mark_ = SequenceNumber_t(); |
180 | |
|
181 | 0 | first_irrelevant_removed_ = SequenceNumber_t::unknown(); |
182 | 0 | last_irrelevant_removed_ = SequenceNumber_t::unknown(); |
183 | 0 | } |
184 | | |
185 | | void ReaderProxy::disable_timers() |
186 | 0 | { |
187 | 0 | if (timers_enabled_.exchange(false) && nack_supression_event_) |
188 | 0 | { |
189 | 0 | nack_supression_event_->cancel_timer(); |
190 | 0 | } |
191 | 0 | if (initial_heartbeat_event_) |
192 | 0 | { |
193 | 0 | initial_heartbeat_event_->cancel_timer(); |
194 | 0 | } |
195 | 0 | } |
196 | | |
197 | | void ReaderProxy::update_nack_supression_interval( |
198 | | const dds::Duration_t& interval) |
199 | 0 | { |
200 | 0 | if (nack_supression_event_) |
201 | 0 | { |
202 | 0 | nack_supression_event_->update_interval(interval); |
203 | 0 | } |
204 | 0 | } |
205 | | |
206 | | void ReaderProxy::add_change( |
207 | | const ChangeForReader_t& change, |
208 | | bool is_relevant, |
209 | | bool restart_nack_supression) |
210 | 0 | { |
211 | 0 | if (restart_nack_supression && timers_enabled_.load() && nack_supression_event_) |
212 | 0 | { |
213 | 0 | nack_supression_event_->restart_timer(); |
214 | 0 | } |
215 | |
|
216 | 0 | add_change(change, is_relevant); |
217 | 0 | } |
218 | | |
219 | | void ReaderProxy::add_change( |
220 | | const ChangeForReader_t& change, |
221 | | bool is_relevant, |
222 | | bool restart_nack_supression, |
223 | | const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) |
224 | 0 | { |
225 | 0 | if (restart_nack_supression && timers_enabled_ && nack_supression_event_) |
226 | 0 | { |
227 | 0 | nack_supression_event_->restart_timer(max_blocking_time); |
228 | 0 | } |
229 | |
|
230 | 0 | add_change(change, is_relevant); |
231 | 0 | } |
232 | | |
233 | | void ReaderProxy::add_change( |
234 | | const ChangeForReader_t& change, |
235 | | bool is_relevant) |
236 | 0 | { |
237 | 0 | SequenceNumber_t seq_num {change.getSequenceNumber()}; |
238 | 0 | assert(seq_num > changes_low_mark_); |
239 | 0 | assert(changes_for_reader_.empty() ? true : |
240 | 0 | seq_num > changes_for_reader_.back().getSequenceNumber()); |
241 | | |
242 | | // Irrelevant changes are not added to the collection |
243 | 0 | if (!is_relevant) |
244 | 0 | { |
245 | 0 | if (is_reliable_) |
246 | 0 | { |
247 | 0 | if (!is_local_reader()) |
248 | 0 | { |
249 | 0 | if (SequenceNumber_t::unknown() == first_irrelevant_removed_) |
250 | 0 | { |
251 | 0 | first_irrelevant_removed_ = seq_num; |
252 | 0 | last_irrelevant_removed_ = seq_num; |
253 | 0 | } |
254 | 0 | else if (seq_num == last_irrelevant_removed_ + 1) |
255 | 0 | { |
256 | 0 | last_irrelevant_removed_ = seq_num; |
257 | 0 | } |
258 | 0 | } |
259 | 0 | } |
260 | 0 | else if (changes_low_mark_ + 1 == seq_num) |
261 | 0 | { |
262 | 0 | changes_low_mark_ = seq_num; |
263 | 0 | } |
264 | 0 | return; |
265 | 0 | } |
266 | | |
267 | 0 | if (changes_for_reader_.push_back(change) == nullptr) |
268 | 0 | { |
269 | | // This should never happen |
270 | 0 | EPROSIMA_LOG_ERROR(RTPS_READER_PROXY, "Error adding change " << seq_num |
271 | 0 | << " to reader proxy " << guid()); |
272 | 0 | eprosima::fastdds::dds::Log::Flush(); |
273 | 0 | assert(false); |
274 | 0 | } |
275 | 0 | } |
276 | | |
277 | | bool ReaderProxy::has_changes() const |
278 | 0 | { |
279 | 0 | return !changes_for_reader_.empty(); |
280 | 0 | } |
281 | | |
282 | | bool ReaderProxy::change_is_acked( |
283 | | const SequenceNumber_t& seq_num) const |
284 | 0 | { |
285 | 0 | if (seq_num <= changes_low_mark_ || changes_for_reader_.empty()) |
286 | 0 | { |
287 | 0 | return true; |
288 | 0 | } |
289 | | |
290 | 0 | ChangeConstIterator chit = find_change(seq_num); |
291 | 0 | if (chit == changes_for_reader_.end()) |
292 | 0 | { |
293 | | // There is a hole in changes_for_reader_ |
294 | | // This means a change was removed, or was not relevant. |
295 | 0 | return true; |
296 | 0 | } |
297 | | |
298 | 0 | return chit->getStatus() == ACKNOWLEDGED; |
299 | 0 | } |
300 | | |
301 | | bool ReaderProxy::change_is_unsent( |
302 | | const SequenceNumber_t& seq_num, |
303 | | FragmentNumber_t& next_unsent_frag, |
304 | | SequenceNumber_t& gap_seq, |
305 | | const SequenceNumber_t& min_seq, |
306 | | bool& need_reactivate_periodic_heartbeat) |
307 | 0 | { |
308 | 0 | if (seq_num <= changes_low_mark_ || changes_for_reader_.empty()) |
309 | 0 | { |
310 | 0 | return false; |
311 | 0 | } |
312 | | |
313 | 0 | ChangeConstIterator chit = find_change(seq_num); |
314 | 0 | if (chit == changes_for_reader_.end()) |
315 | 0 | { |
316 | | // There is a hole in changes_for_reader_ |
317 | | // This means a change was removed. |
318 | 0 | return false; |
319 | 0 | } |
320 | | |
321 | 0 | bool returned_value = chit->getStatus() == UNSENT; |
322 | |
|
323 | 0 | if (returned_value) |
324 | 0 | { |
325 | 0 | next_unsent_frag = chit->get_next_unsent_fragment(); |
326 | 0 | gap_seq = SequenceNumber_t::unknown(); |
327 | |
|
328 | 0 | if (is_reliable_ && !chit->has_been_delivered()) |
329 | 0 | { |
330 | 0 | need_reactivate_periodic_heartbeat |= true; |
331 | 0 | SequenceNumber_t prev = |
332 | 0 | (changes_for_reader_.begin() != chit ? |
333 | 0 | std::prev(chit)->getSequenceNumber() : |
334 | 0 | changes_low_mark_ |
335 | 0 | ) + 1; |
336 | |
|
337 | 0 | if (prev != chit->getSequenceNumber()) |
338 | 0 | { |
339 | 0 | gap_seq = prev; |
340 | | |
341 | | // Verify the calculated gap_seq in ReaderProxy is a real hole in the history. |
342 | 0 | if (gap_seq < min_seq) // Several samples of the hole are not really already available. |
343 | 0 | { |
344 | 0 | if (min_seq < seq_num) |
345 | 0 | { |
346 | 0 | gap_seq = min_seq; |
347 | 0 | } |
348 | 0 | else |
349 | 0 | { |
350 | 0 | gap_seq = SequenceNumber_t::unknown(); |
351 | 0 | } |
352 | 0 | } |
353 | |
|
354 | 0 | if (SequenceNumber_t::unknown() != first_irrelevant_removed_ && |
355 | 0 | SequenceNumber_t::unknown() != gap_seq) |
356 | 0 | { |
357 | | // Check if the hole is due to irrelevant changes removed without informing the reader |
358 | 0 | if (first_irrelevant_removed_ <= gap_seq ) |
359 | 0 | { |
360 | 0 | if (gap_seq == first_irrelevant_removed_) |
361 | 0 | { |
362 | 0 | first_irrelevant_removed_ = SequenceNumber_t::unknown(); |
363 | 0 | last_irrelevant_removed_ = SequenceNumber_t::unknown(); |
364 | 0 | } |
365 | 0 | else if (gap_seq < last_irrelevant_removed_) |
366 | 0 | { |
367 | 0 | last_irrelevant_removed_ = gap_seq - 1; |
368 | 0 | } |
369 | 0 | } |
370 | 0 | } |
371 | 0 | } |
372 | 0 | } |
373 | 0 | } |
374 | |
|
375 | 0 | return returned_value; |
376 | 0 | } |
377 | | |
378 | | void ReaderProxy::acked_changes_set( |
379 | | const SequenceNumber_t& seq_num) |
380 | 0 | { |
381 | 0 | SequenceNumber_t future_low_mark = seq_num; |
382 | |
|
383 | 0 | if (seq_num > changes_low_mark_) |
384 | 0 | { |
385 | 0 | ChangeIterator chit = find_change(seq_num, false); |
386 | | // continue advancing until next change is not acknowledged |
387 | 0 | while (chit != changes_for_reader_.end() |
388 | 0 | && chit->getSequenceNumber() == future_low_mark |
389 | 0 | && chit->getStatus() == ACKNOWLEDGED) |
390 | 0 | { |
391 | 0 | ++chit; |
392 | 0 | ++future_low_mark; |
393 | 0 | } |
394 | 0 | if (stateful_writer_listener_ != nullptr) |
395 | 0 | { |
396 | 0 | for (auto it = changes_for_reader_.begin(); it != chit; ++it) |
397 | 0 | { |
398 | 0 | notify_acknowledged(*it); |
399 | 0 | } |
400 | 0 | } |
401 | 0 | changes_for_reader_.erase(changes_for_reader_.begin(), chit); |
402 | 0 | } |
403 | 0 | else |
404 | 0 | { |
405 | 0 | future_low_mark = changes_low_mark_ + 1; |
406 | |
|
407 | 0 | if (seq_num == SequenceNumber_t() && durability_kind_ != DurabilityKind_t::VOLATILE) |
408 | 0 | { |
409 | | // Special case. Currently only used on Builtin StatefulWriters |
410 | | // after losing lease duration, and on late joiners to set |
411 | | // changes_low_mark_ to match that of the writer. |
412 | 0 | SequenceNumber_t min_sequence = writer_->get_seq_num_min(); |
413 | 0 | if (min_sequence != SequenceNumber_t::unknown()) |
414 | 0 | { |
415 | 0 | SequenceNumber_t current_sequence = seq_num; |
416 | 0 | if (seq_num < min_sequence) |
417 | 0 | { |
418 | 0 | current_sequence = min_sequence; |
419 | 0 | } |
420 | 0 | future_low_mark = current_sequence; |
421 | |
|
422 | 0 | bool should_sort = false; |
423 | 0 | for (; current_sequence <= changes_low_mark_; ++current_sequence) |
424 | 0 | { |
425 | | // Skip all consecutive changes already in the collection |
426 | 0 | ChangeConstIterator it = find_change(current_sequence); |
427 | 0 | while ( it != changes_for_reader_.end() && |
428 | 0 | current_sequence <= changes_low_mark_ && |
429 | 0 | it->getSequenceNumber() == current_sequence) |
430 | 0 | { |
431 | 0 | ++current_sequence; |
432 | 0 | ++it; |
433 | 0 | } |
434 | |
|
435 | 0 | if (current_sequence <= changes_low_mark_) |
436 | 0 | { |
437 | 0 | CacheChange_t* change = nullptr; |
438 | 0 | if (writer_->get_history()->get_change(current_sequence, writer_->getGuid(), &change)) |
439 | 0 | { |
440 | 0 | should_sort = true; |
441 | 0 | ChangeForReader_t cr(change); |
442 | 0 | cr.setStatus(UNACKNOWLEDGED); |
443 | 0 | changes_for_reader_.push_back(cr); |
444 | 0 | } |
445 | 0 | } |
446 | 0 | } |
447 | | // Keep changes sorted by sequence number |
448 | 0 | if (should_sort) |
449 | 0 | { |
450 | 0 | std::sort(changes_for_reader_.begin(), changes_for_reader_.end(), ChangeForReaderCmp()); |
451 | 0 | } |
452 | 0 | } |
453 | 0 | else if (!is_local_reader()) |
454 | 0 | { |
455 | 0 | future_low_mark = writer_->next_sequence_number(); |
456 | 0 | } |
457 | 0 | } |
458 | 0 | } |
459 | 0 | changes_low_mark_ = future_low_mark - 1; |
460 | 0 | } |
461 | | |
462 | | bool ReaderProxy::requested_changes_set( |
463 | | const SequenceNumberSet_t& seq_num_set, |
464 | | RTPSGapBuilder& gap_builder, |
465 | | const SequenceNumber_t& min_seq_in_history) |
466 | 0 | { |
467 | 0 | bool isSomeoneWasSetRequested = false; |
468 | |
|
469 | 0 | if (SequenceNumber_t::unknown() != min_seq_in_history) |
470 | 0 | { |
471 | 0 | seq_num_set.for_each([&](SequenceNumber_t sit) |
472 | 0 | { |
473 | 0 | ChangeIterator chit = find_change(sit, true); |
474 | 0 | if (chit != changes_for_reader_.end()) |
475 | 0 | { |
476 | 0 | if (UNACKNOWLEDGED == chit->getStatus()) |
477 | 0 | { |
478 | 0 | chit->setStatus(REQUESTED); |
479 | 0 | chit->markAllFragmentsAsUnsent(); |
480 | 0 | isSomeoneWasSetRequested = true; |
481 | 0 | } |
482 | 0 | } |
483 | 0 | else if ((sit >= min_seq_in_history) && (sit > changes_low_mark_)) |
484 | 0 | { |
485 | 0 | gap_builder.add(sit); |
486 | |
|
487 | 0 | if (SequenceNumber_t::unknown() != first_irrelevant_removed_) |
488 | 0 | { |
489 | | // Check if the hole is due to irrelevant changes removed without informing the reader |
490 | 0 | if (first_irrelevant_removed_ <= sit ) |
491 | 0 | { |
492 | 0 | if (sit == first_irrelevant_removed_) |
493 | 0 | { |
494 | 0 | first_irrelevant_removed_ = SequenceNumber_t::unknown(); |
495 | 0 | last_irrelevant_removed_ = SequenceNumber_t::unknown(); |
496 | 0 | } |
497 | 0 | else if (sit < last_irrelevant_removed_) |
498 | 0 | { |
499 | 0 | last_irrelevant_removed_ = sit - 1; |
500 | 0 | } |
501 | 0 | } |
502 | 0 | } |
503 | 0 | } |
504 | 0 | }); |
505 | 0 | } |
506 | |
|
507 | 0 | if (isSomeoneWasSetRequested) |
508 | 0 | { |
509 | 0 | EPROSIMA_LOG_INFO(RTPS_READER_PROXY, "Requested Changes: " << seq_num_set); |
510 | 0 | } |
511 | |
|
512 | 0 | return isSomeoneWasSetRequested; |
513 | 0 | } |
514 | | |
515 | | bool ReaderProxy::process_initial_acknack( |
516 | | const std::function<void(ChangeForReader_t& change)>& func) |
517 | 0 | { |
518 | 0 | if (is_local_reader()) |
519 | 0 | { |
520 | 0 | return 0 != convert_status_on_all_changes(UNACKNOWLEDGED, UNSENT, false, func); |
521 | 0 | } |
522 | | |
523 | 0 | return true; |
524 | 0 | } |
525 | | |
526 | | void ReaderProxy::from_unsent_to_status( |
527 | | const SequenceNumber_t& seq_num, |
528 | | ChangeForReaderStatus_t status, |
529 | | bool restart_nack_supression, |
530 | | bool delivered) |
531 | 0 | { |
532 | | // This function must not be called by a best-effort reader. |
533 | | // It will use acked_changes_set(). |
534 | 0 | assert(is_reliable_); |
535 | |
|
536 | 0 | if (restart_nack_supression && is_remote_and_reliable() && nack_supression_event_) |
537 | 0 | { |
538 | 0 | assert(timers_enabled_.load()); |
539 | 0 | nack_supression_event_->restart_timer(); |
540 | 0 | } |
541 | | |
542 | | // Called when delivering an UNSENT sample, the seq_number must exists in the ReaderProxy. |
543 | 0 | assert(seq_num > changes_low_mark_); |
544 | 0 | ChangeIterator it = find_change(seq_num, true); |
545 | 0 | assert(changes_for_reader_.end() != it); |
546 | 0 | assert(UNSENT == it->getStatus()); |
547 | 0 | assert(UNSENT != status); |
548 | |
|
549 | 0 | if (ACKNOWLEDGED == status && seq_num == changes_low_mark_ + 1) |
550 | 0 | { |
551 | 0 | assert(changes_for_reader_.begin() == it); |
552 | 0 | notify_acknowledged(*it); |
553 | 0 | changes_for_reader_.erase(it); |
554 | 0 | acked_changes_set(seq_num + 1); |
555 | 0 | return; |
556 | 0 | } |
557 | | |
558 | 0 | it->setStatus(status); |
559 | |
|
560 | 0 | if (delivered) |
561 | 0 | { |
562 | 0 | it->set_delivered(); |
563 | 0 | } |
564 | 0 | } |
565 | | |
566 | | bool ReaderProxy::mark_fragment_as_sent_for_change( |
567 | | const SequenceNumber_t& seq_num, |
568 | | FragmentNumber_t frag_num, |
569 | | bool& was_last_fragment) |
570 | 0 | { |
571 | 0 | was_last_fragment = false; |
572 | |
|
573 | 0 | if (seq_num <= changes_low_mark_) |
574 | 0 | { |
575 | 0 | return false; |
576 | 0 | } |
577 | | |
578 | 0 | bool change_found = false; |
579 | 0 | ChangeIterator it = find_change(seq_num, true); |
580 | |
|
581 | 0 | if (it != changes_for_reader_.end()) |
582 | 0 | { |
583 | 0 | change_found = true; |
584 | 0 | it->markFragmentsAsSent(frag_num); |
585 | 0 | was_last_fragment = it->getUnsentFragments().empty(); |
586 | 0 | } |
587 | |
|
588 | 0 | return change_found; |
589 | 0 | } |
590 | | |
591 | | bool ReaderProxy::perform_nack_supression() |
592 | 0 | { |
593 | 0 | return 0 != convert_status_on_all_changes(UNDERWAY, UNACKNOWLEDGED, false); |
594 | 0 | } |
595 | | |
596 | | uint32_t ReaderProxy::perform_acknack_response( |
597 | | const std::function<void(ChangeForReader_t& change)>& func) |
598 | 0 | { |
599 | 0 | return convert_status_on_all_changes(REQUESTED, UNSENT, nullptr != stateful_writer_listener_, func); |
600 | 0 | } |
601 | | |
602 | | uint32_t ReaderProxy::convert_status_on_all_changes( |
603 | | ChangeForReaderStatus_t previous, |
604 | | ChangeForReaderStatus_t next, |
605 | | bool notify_resend, |
606 | | const std::function<void(ChangeForReader_t& change)>& func) |
607 | 0 | { |
608 | 0 | assert(previous > next); |
609 | | |
610 | | // NOTE: This is only called for REQUESTED=>UNSENT (acknack response) or |
611 | | // UNDERWAY=>UNACKNOWLEDGED (nack supression) |
612 | |
|
613 | 0 | uint32_t changed = 0; |
614 | 0 | for (ChangeForReader_t& change : changes_for_reader_) |
615 | 0 | { |
616 | 0 | if (change.getStatus() == previous) |
617 | 0 | { |
618 | 0 | ++changed; |
619 | 0 | change.setStatus(next); |
620 | |
|
621 | 0 | if (notify_resend) |
622 | 0 | { |
623 | 0 | notify_resent(change); |
624 | 0 | } |
625 | |
|
626 | 0 | if (func) |
627 | 0 | { |
628 | 0 | func(change); |
629 | 0 | } |
630 | 0 | } |
631 | 0 | } |
632 | |
|
633 | 0 | return changed; |
634 | 0 | } |
635 | | |
636 | | void ReaderProxy::change_has_been_removed( |
637 | | const SequenceNumber_t& seq_num) |
638 | 0 | { |
639 | | // Check sequence number is in the container, because it was not clean up. |
640 | 0 | if (changes_for_reader_.empty() || seq_num < changes_for_reader_.begin()->getSequenceNumber()) |
641 | 0 | { |
642 | 0 | return; |
643 | 0 | } |
644 | | |
645 | 0 | auto chit = find_change(seq_num); |
646 | |
|
647 | 0 | if (chit == this->changes_for_reader_.end()) |
648 | 0 | { |
649 | | // No change for this sequence number |
650 | 0 | return; |
651 | 0 | } |
652 | | |
653 | | // In intraprocess, if there is an UNACKNOWLEDGED, a GAP has to be send because there is no reliable mechanism. |
654 | 0 | if (is_local_reader() && ACKNOWLEDGED > chit->getStatus()) |
655 | 0 | { |
656 | 0 | writer_->intraprocess_gap(this, seq_num); |
657 | 0 | } |
658 | | |
659 | | // Element may not be in the container when marked as irrelevant. |
660 | 0 | changes_for_reader_.erase(chit); |
661 | | |
662 | | // When removing the next-to-be-acknowledged, we should auto-acknowledge it. |
663 | 0 | if ((changes_low_mark_ + 1) == seq_num) |
664 | 0 | { |
665 | 0 | acked_changes_set(seq_num + 1); |
666 | 0 | } |
667 | 0 | } |
668 | | |
669 | | bool ReaderProxy::has_unacknowledged( |
670 | | const SequenceNumber_t& first_seq_in_history) const |
671 | 0 | { |
672 | 0 | if (first_seq_in_history > changes_low_mark_) |
673 | 0 | { |
674 | 0 | return true; |
675 | 0 | } |
676 | | |
677 | 0 | for (const ChangeForReader_t& it : changes_for_reader_) |
678 | 0 | { |
679 | 0 | if (it.getStatus() == UNACKNOWLEDGED) |
680 | 0 | { |
681 | 0 | return true; |
682 | 0 | } |
683 | 0 | } |
684 | | |
685 | 0 | return false; |
686 | 0 | } |
687 | | |
688 | | bool ReaderProxy::requested_fragment_set( |
689 | | const SequenceNumber_t& seq_num, |
690 | | const FragmentNumberSet_t& frag_set) |
691 | 0 | { |
692 | | // Locate the outbound change referenced by the NACK_FRAG |
693 | 0 | ChangeIterator changeIter = find_change(seq_num, true); |
694 | 0 | if (changeIter == changes_for_reader_.end()) |
695 | 0 | { |
696 | 0 | return false; |
697 | 0 | } |
698 | | |
699 | 0 | changeIter->markFragmentsAsUnsent(frag_set); |
700 | | |
701 | | // If it was UNSENT, we shouldn't switch back to REQUESTED to prevent stalling. |
702 | 0 | if (changeIter->getStatus() != UNSENT) |
703 | 0 | { |
704 | 0 | changeIter->setStatus(REQUESTED); |
705 | 0 | } |
706 | |
|
707 | 0 | return true; |
708 | 0 | } |
709 | | |
710 | | bool ReaderProxy::process_nack_frag( |
711 | | const GUID_t& reader_guid, |
712 | | uint32_t nack_count, |
713 | | const SequenceNumber_t& seq_num, |
714 | | const FragmentNumberSet_t& fragments_state) |
715 | 0 | { |
716 | 0 | if (guid() == reader_guid) |
717 | 0 | { |
718 | 0 | if (last_nackfrag_count_ < nack_count) |
719 | 0 | { |
720 | 0 | last_nackfrag_count_ = nack_count; |
721 | 0 | if (requested_fragment_set(seq_num, fragments_state)) |
722 | 0 | { |
723 | 0 | return true; |
724 | 0 | } |
725 | 0 | } |
726 | 0 | } |
727 | | |
728 | 0 | return false; |
729 | 0 | } |
730 | | |
731 | | static bool change_less_than_sequence( |
732 | | const ChangeForReader_t& change, |
733 | | const SequenceNumber_t& seq_num) |
734 | 0 | { |
735 | 0 | return change.getSequenceNumber() < seq_num; |
736 | 0 | } |
737 | | |
738 | | ReaderProxy::ChangeIterator ReaderProxy::find_change( |
739 | | const SequenceNumber_t& seq_num, |
740 | | bool exact) |
741 | 0 | { |
742 | 0 | ReaderProxy::ChangeIterator it; |
743 | 0 | ReaderProxy::ChangeIterator end = changes_for_reader_.end(); |
744 | 0 | it = std::lower_bound(changes_for_reader_.begin(), end, seq_num, change_less_than_sequence); |
745 | |
|
746 | 0 | return (!exact) |
747 | 0 | ? it |
748 | 0 | : it == end |
749 | 0 | ? it |
750 | 0 | : it->getSequenceNumber() == seq_num ? it : end; |
751 | 0 | } |
752 | | |
753 | | ReaderProxy::ChangeConstIterator ReaderProxy::find_change( |
754 | | const SequenceNumber_t& seq_num) const |
755 | 0 | { |
756 | 0 | ReaderProxy::ChangeConstIterator it; |
757 | 0 | ReaderProxy::ChangeConstIterator end = changes_for_reader_.end(); |
758 | 0 | it = std::lower_bound(changes_for_reader_.begin(), end, seq_num, change_less_than_sequence); |
759 | |
|
760 | 0 | return it == end |
761 | 0 | ? it |
762 | 0 | : it->getSequenceNumber() == seq_num ? it : end; |
763 | 0 | } |
764 | | |
765 | | bool ReaderProxy::has_been_delivered( |
766 | | const SequenceNumber_t& seq_number, |
767 | | bool& found) const |
768 | 0 | { |
769 | 0 | if (seq_number <= changes_low_mark_) |
770 | 0 | { |
771 | | // Change has already been acknowledged, so it has been delivered |
772 | 0 | return true; |
773 | 0 | } |
774 | | |
775 | 0 | ChangeConstIterator it = find_change(seq_number); |
776 | 0 | if (it != changes_for_reader_.end()) |
777 | 0 | { |
778 | 0 | found = true; |
779 | 0 | return it->has_been_delivered(); |
780 | 0 | } |
781 | | |
782 | 0 | return false; |
783 | 0 | } |
784 | | |
785 | | void ReaderProxy::notify_acknowledged( |
786 | | const ChangeForReader_t& change) const |
787 | 0 | { |
788 | 0 | if (stateful_writer_listener_ != nullptr) |
789 | 0 | { |
790 | 0 | CacheChange_t* sample = change.getChange(); |
791 | 0 | uint32_t payload_length = sample ? sample->serializedPayload.length : 0; |
792 | 0 | stateful_writer_listener_->on_writer_data_acknowledged( |
793 | 0 | writer_->getGuid(), |
794 | 0 | guid(), |
795 | 0 | change.getSequenceNumber(), |
796 | 0 | payload_length, |
797 | 0 | change.time_since_first_send(), |
798 | 0 | locator_info_.general_locator_selector_entry()); |
799 | 0 | } |
800 | 0 | } |
801 | | |
802 | | void ReaderProxy::notify_resent( |
803 | | const ChangeForReader_t& change) const |
804 | 0 | { |
805 | 0 | assert (stateful_writer_listener_ != nullptr); |
806 | |
|
807 | 0 | const CacheChange_t* sample = change.getChange(); |
808 | 0 | assert(sample != nullptr); |
809 | | |
810 | | // Calc number of bytes to resend |
811 | 0 | uint64_t resent_bytes = 0; |
812 | 0 | const FragmentNumberSet_t& fragments = change.getUnsentFragments(); |
813 | 0 | if (fragments.empty()) |
814 | 0 | { |
815 | 0 | resent_bytes = sample->serializedPayload.length; |
816 | 0 | } |
817 | 0 | else |
818 | 0 | { |
819 | | // Calculate number of bytes to resend |
820 | 0 | uint64_t num_fragments = fragments.count(); |
821 | 0 | uint32_t fragment_size = sample->getFragmentSize(); |
822 | 0 | if (fragments.max() < sample->getFragmentCount()) |
823 | 0 | { |
824 | 0 | resent_bytes = num_fragments * fragment_size; |
825 | 0 | } |
826 | 0 | else |
827 | 0 | { |
828 | | // Last fragment may be smaller |
829 | 0 | resent_bytes = (num_fragments - 1) * fragment_size; |
830 | 0 | uint32_t last_fragment_size = sample->serializedPayload.length % fragment_size; |
831 | 0 | if (last_fragment_size > 0) |
832 | 0 | { |
833 | 0 | resent_bytes += last_fragment_size; |
834 | 0 | } |
835 | 0 | else |
836 | 0 | { |
837 | 0 | resent_bytes += fragment_size; |
838 | 0 | } |
839 | 0 | } |
840 | 0 | } |
841 | | |
842 | | // Notify to participant |
843 | | assert(resent_bytes <= sample->serializedPayload.length); |
844 | 0 | stateful_writer_listener_->on_writer_resend_data( |
845 | 0 | writer_->getGuid(), |
846 | 0 | guid(), |
847 | 0 | sample->sequenceNumber, |
848 | 0 | static_cast<uint32_t>(resent_bytes), |
849 | 0 | locator_info_.general_locator_selector_entry()); |
850 | 0 | } |
851 | | |
852 | | } // namespace rtps |
853 | | } // namespace fastdds |
854 | | } // namespace eprosima |