/src/Fast-DDS/src/cpp/rtps/writer/ReaderProxy.hpp
Line | Count | Source |
1 | | // Copyright 2016-2019 Proyectos y Sistemas de Mantenimiento SL (eProsima). |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | /** |
16 | | * @file ReaderProxy.hpp |
17 | | */ |
18 | | #ifndef RTPS_WRITER__READERPROXY_HPP |
19 | | #define RTPS_WRITER__READERPROXY_HPP |
20 | | |
21 | | #include <algorithm> |
22 | | #include <atomic> |
23 | | #include <mutex> |
24 | | #include <set> |
25 | | |
26 | | #include <fastdds/rtps/attributes/RTPSParticipantAllocationAttributes.hpp> |
27 | | #include <fastdds/rtps/attributes/WriterAttributes.hpp> |
28 | | #include <fastdds/rtps/common/CacheChange.hpp> |
29 | | #include <fastdds/rtps/common/FragmentNumber.hpp> |
30 | | #include <fastdds/rtps/common/Locator.hpp> |
31 | | #include <fastdds/rtps/common/SequenceNumber.hpp> |
32 | | #include <fastdds/rtps/common/Types.hpp> |
33 | | #include <fastdds/utils/collections/ResourceLimitedVector.hpp> |
34 | | |
35 | | #include <rtps/builtin/data/ReaderProxyData.hpp> |
36 | | #include <rtps/writer/ChangeForReader.hpp> |
37 | | #include <rtps/writer/ReaderLocator.hpp> |
38 | | |
39 | | namespace eprosima { |
40 | | namespace fastdds { |
41 | | namespace rtps { |
42 | | |
43 | | class BaseReader; |
44 | | class StatefulWriter; |
45 | | class StatefulWriterListener; |
46 | | class TimedEvent; |
47 | | class RTPSReader; |
48 | | class IDataSharingNotifier; |
49 | | class RTPSGapBuilder; |
50 | | |
51 | | /** |
52 | | * ReaderProxy class that helps to keep the state of a specific Reader with respect to the RTPSWriter. |
53 | | * @ingroup WRITER_MODULE |
54 | | */ |
55 | | class ReaderProxy |
56 | | { |
57 | | public: |
58 | | |
59 | | ~ReaderProxy(); |
60 | | |
61 | | /** |
62 | | * Constructor. |
63 | | * @param times WriterTimes to use in the ReaderProxy. |
64 | | * @param loc_alloc Maximum number of remote locators to keep in the ReaderProxy. |
65 | | * @param writer Pointer to the StatefulWriter creating the reader proxy. |
66 | | * @param stateful_listener Pointer to the StatefulWriterListener associated to the writer. |
67 | | */ |
68 | | ReaderProxy( |
69 | | const WriterTimes& times, |
70 | | const RemoteLocatorsAllocationAttributes& loc_alloc, |
71 | | StatefulWriter* writer, |
72 | | StatefulWriterListener* stateful_listener); |
73 | | |
74 | | /** |
75 | | * Activate this proxy associating it to a remote reader. |
76 | | * @param reader_attributes ReaderProxyData of the reader for which to keep state. |
77 | | * @param is_datasharing whether the reader is datasharing compatible with the writer or not. |
78 | | */ |
79 | | void start( |
80 | | const ReaderProxyData& reader_attributes, |
81 | | bool is_datasharing = false); |
82 | | |
83 | | /** |
84 | | * Update information about the remote reader. |
85 | | * @param reader_attributes ReaderProxyData with updated information of the reader. |
86 | | * @return true if data was modified, false otherwise. |
87 | | */ |
88 | | bool update( |
89 | | const ReaderProxyData& reader_attributes); |
90 | | |
91 | | /** |
92 | | * Disable this proxy. |
93 | | */ |
94 | | void stop(); |
95 | | |
96 | | /** |
97 | | * Called when a change is added to the writer's history. |
98 | | * @param change Information regarding the change added. |
99 | | * @param is_relevant Specify if change is relevant for this remote reader. |
100 | | * @param restart_nack_supression Whether the nack-suppression event should be restarted. |
101 | | */ |
102 | | void add_change( |
103 | | const ChangeForReader_t& change, |
104 | | bool is_relevant, |
105 | | bool restart_nack_supression); |
106 | | |
107 | | void add_change( |
108 | | const ChangeForReader_t& change, |
109 | | bool is_relevant, |
110 | | bool restart_nack_supression, |
111 | | const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time); |
112 | | |
113 | | /** |
114 | | * Check if there are changes pending for this reader. |
115 | | * @return true when there are pending changes, false otherwise. |
116 | | */ |
117 | | bool has_changes() const; |
118 | | |
119 | | /** |
120 | | * Check if a specific change has been already acknowledged for this reader. |
121 | | * @param seq_num Sequence number of the change to be checked. |
122 | | * @return true when the change is irrelevant or has been already acknowledged, false otherwise. |
123 | | */ |
124 | | bool change_is_acked( |
125 | | const SequenceNumber_t& seq_num) const; |
126 | | |
127 | | /** |
128 | | * Check if a specific change is marked to be sent to this reader. |
129 | | * |
130 | | * @param [in] seq_num Sequence number of the change to be checked. |
131 | | * @param [out] next_unsent_frag Return next fragment to be sent. |
132 | | * @param [out] gap_seq Return, when it is its first delivery (should be relevant seq_num), the sequence number of |
133 | | * the first sequence of the gap [first, seq_num). Otherwise return SequenceNumber_t::unknown(). |
134 | | * @param [in] min_seq Minimum sequence number managed by the History. It could be SequenceNumber_t::unknown() if |
135 | | * history is empty. |
136 | | * @param [out] need_reactivate_periodic_heartbeat Indicates if the heartbeat period event has to be restarted. |
137 | | * |
138 | | * @return true if the change is marked to be sent. False otherwise. |
139 | | */ |
140 | | bool change_is_unsent( |
141 | | const SequenceNumber_t& seq_num, |
142 | | FragmentNumber_t& next_unsent_frag, |
143 | | SequenceNumber_t& gap_seq, |
144 | | const SequenceNumber_t& min_seq, |
145 | | bool& need_reactivate_periodic_heartbeat); |
146 | | |
147 | | /** |
148 | | * Mark all changes up to the one indicated by seq_num as Acknowledged. |
149 | | * For instance, when seq_num is 30, changes 1-29 are marked as acknowledged. |
150 | | * @param seq_num Sequence number of the first change not to be marked as acknowledged. |
151 | | */ |
152 | | void acked_changes_set( |
153 | | const SequenceNumber_t& seq_num); |
154 | | |
155 | | /** |
156 | | * Mark all changes in the vector as requested. |
157 | | * @param seq_num_set Bitmap of sequence numbers. |
158 | | * @param gap_builder RTPSGapBuilder reference used for adding each requested change that is irrelevant for the |
159 | | * requester. |
160 | | * @param [in] min_seq_in_history Minimum SequenceNumber_t in the writer's history. If writer's history is empty, |
161 | | * SequenceNumber_t::unknown() is expected. |
162 | | * @return true if at least one change has been marked as REQUESTED, false otherwise. |
163 | | */ |
164 | | bool requested_changes_set( |
165 | | const SequenceNumberSet_t& seq_num_set, |
166 | | RTPSGapBuilder& gap_builder, |
167 | | const SequenceNumber_t& min_seq_in_history); |
168 | | |
169 | | /** |
170 | | * Performs processing of preemptive acknack. |
171 | | * @param func functor called, if the requester is a local reader, for each change moved to UNSENT status. |
172 | | * @return true if a heartbeat should be sent, false otherwise. |
173 | | */ |
174 | | bool process_initial_acknack( |
175 | | const std::function<void(ChangeForReader_t& change)>& func); |
176 | | |
177 | | /*! |
178 | | * @brief Sets a change to a particular status (if present in the ReaderProxy) |
179 | | * @param seq_num Sequence number of the change to update. |
180 | | * @param status Status to apply. |
181 | | * @param restart_nack_supression Whether the nack-suppression event should be restarted or not. |
182 | | * @param delivered true if change was able to be delivered to its addressees. false otherwise. |
183 | | */ |
184 | | void from_unsent_to_status( |
185 | | const SequenceNumber_t& seq_num, |
186 | | ChangeForReaderStatus_t status, |
187 | | bool restart_nack_supression, |
188 | | bool delivered = true); |
189 | | |
190 | | /** |
191 | | * @brief Mark a particular fragment as sent. |
192 | | * @param [in] seq_num Sequence number of the change to update. |
193 | | * @param [in] frag_num Fragment number to mark as sent. |
194 | | * @param [out] was_last_fragment Indicates if the fragment was the last one pending. |
195 | | * @return true when the change was found, false otherwise. |
196 | | */ |
197 | | bool mark_fragment_as_sent_for_change( |
198 | | const SequenceNumber_t& seq_num, |
199 | | FragmentNumber_t frag_num, |
200 | | bool& was_last_fragment); |
201 | | |
202 | | /** |
203 | | * Turns all UNDERWAY changes into UNACKNOWLEDGED. |
204 | | * |
205 | | * @return true if at least one change changed its status, false otherwise. |
206 | | */ |
207 | | bool perform_nack_supression(); |
208 | | |
209 | | /** |
210 | | * Turns all REQUESTED changes into UNSENT. |
211 | | * |
212 | | * @param func Function executed for each change which changes its status. |
213 | | * @return the number of changes that changed their status. |
214 | | */ |
215 | | uint32_t perform_acknack_response( |
216 | | const std::function<void(ChangeForReader_t& change)>& func); |
217 | | |
218 | | /** |
219 | | * Call this to inform a change was removed from history. |
220 | | * @param seq_num Sequence number of the removed change. |
221 | | */ |
222 | | void change_has_been_removed( |
223 | | const SequenceNumber_t& seq_num); |
224 | | |
225 | | /*! |
226 | | * @brief Returns whether there is some UNACKNOWLEDGED change. |
227 | | * @param first_seq_in_history Minimum sequence number in the writer history. |
228 | | * @return true if there is some UNACKNOWLEDGED change, false otherwise. |
229 | | */ |
230 | | bool has_unacknowledged( |
231 | | const SequenceNumber_t& first_seq_in_history) const; |
232 | | |
233 | | /** |
234 | | * Get the GUID of the reader represented by this proxy. |
235 | | * @return the GUID of the reader represented by this proxy. |
236 | | */ |
237 | | inline const GUID_t& guid() const |
238 | 0 | { |
239 | 0 | return locator_info_.remote_guid(); |
240 | 0 | } |
241 | | |
242 | | /** |
243 | | * Get the durability of the reader represented by this proxy. |
244 | | * @return the durability of the reader represented by this proxy. |
245 | | */ |
246 | | inline DurabilityKind_t durability_kind() const |
247 | 0 | { |
248 | 0 | return durability_kind_; |
249 | 0 | } |
250 | | |
251 | | /** |
252 | | * Check if the reader represented by this proxy expects inline QOS to be received. |
253 | | * @return true if the reader represented by this proxy expects inline QOS to be received. |
254 | | */ |
255 | | inline bool expects_inline_qos() const |
256 | 0 | { |
257 | 0 | return expects_inline_qos_; |
258 | 0 | } |
259 | | |
260 | | /** |
261 | | * Check if the reader represented by this proxy is reliable. |
262 | | * @return true if the reader represented by this proxy is reliable. |
263 | | */ |
264 | | inline bool is_reliable() const |
265 | 0 | { |
266 | 0 | return is_reliable_; |
267 | 0 | } |
268 | | |
269 | | inline bool disable_positive_acks() const |
270 | 0 | { |
271 | 0 | return disable_positive_acks_; |
272 | 0 | } |
273 | | |
274 | | /** |
275 | | * Check if the reader represented by this proxy is remote and reliable. |
276 | | * @return true if the reader represented by this proxy is remote and reliable. |
277 | | */ |
278 | | inline bool is_remote_and_reliable() const |
279 | 0 | { |
280 | 0 | return !locator_info_.is_local_reader() && !locator_info_.is_datasharing_reader() && is_reliable_; |
281 | 0 | } |
282 | | |
283 | | /** |
284 | | * Check if the reader is on the same process. |
285 | | * @return true if the reader is on the same process. |
286 | | */ |
287 | | inline bool is_local_reader() const |
288 | 0 | { |
289 | 0 | return locator_info_.is_local_reader(); |
290 | 0 | } |
291 | | |
292 | | /** |
293 | | * Get the local reader on the same process (if any). |
294 | | * @return The local reader on the same process. |
295 | | */ |
296 | | inline LocalReaderPointer::Instance local_reader() |
297 | 0 | { |
298 | 0 | return locator_info_.local_reader(); |
299 | 0 | } |
300 | | |
301 | | /** |
302 | | * Called when an ACKNACK is received to set a new value for the minimum count accepted for following received |
303 | | * ACKNACKs. |
304 | | * |
305 | | * @param acknack_count The count of the received ACKNACK. |
306 | | * @return true if internal count changed (i.e. received ACKNACK is accepted) |
307 | | */ |
308 | | bool check_and_set_acknack_count( |
309 | | uint32_t acknack_count) |
310 | 0 | { |
311 | 0 | if (acknack_count >= next_expected_acknack_count_) |
312 | 0 | { |
313 | 0 | next_expected_acknack_count_ = acknack_count; |
314 | 0 | ++next_expected_acknack_count_; |
315 | 0 | return true; |
316 | 0 | } |
317 | | |
318 | 0 | return false; |
319 | 0 | } |
320 | | |
321 | | /** |
322 | | * Process an incoming NACKFRAG submessage. |
323 | | * @param reader_guid Destination guid of the submessage. |
324 | | * @param nack_count Counter field of the submessage. |
325 | | * @param seq_num Sequence number field of the submessage. |
326 | | * @param fragments_state Bitmap indicating the requested fragments. |
327 | | * @return true if a change was modified, false otherwise. |
328 | | */ |
329 | | bool process_nack_frag( |
330 | | const GUID_t& reader_guid, |
331 | | uint32_t nack_count, |
332 | | const SequenceNumber_t& seq_num, |
333 | | const FragmentNumberSet_t& fragments_state); |
334 | | |
335 | | /** |
336 | | * Filter a CacheChange_t using the StatefulWriter's IReaderDataFilter. |
337 | | * @param change |
338 | | * @return true if the change is relevant, false otherwise. |
339 | | */ |
340 | | bool rtps_is_relevant( |
341 | | CacheChange_t* change) const; |
342 | | |
343 | | /** |
344 | | * Get the highest fully acknowledged sequence number. |
345 | | * @return the highest fully acknowledged sequence number. |
346 | | */ |
347 | | inline SequenceNumber_t changes_low_mark() const |
348 | 0 | { |
349 | 0 | return changes_low_mark_; |
350 | 0 | } |
351 | | |
352 | | /*! |
353 | | * Get the first sequence number not relevant that was removed without reader being informed. |
354 | | * @return First sequence number. |
355 | | */ |
356 | | inline SequenceNumber_t first_irrelevant_removed() const |
357 | 0 | { |
358 | 0 | return first_irrelevant_removed_; |
359 | 0 | } |
360 | | |
361 | | /*! |
362 | | * Get the last sequence number not relevant that was removed without reader being informed. |
363 | | * @return last sequence number. |
364 | | */ |
365 | | inline SequenceNumber_t last_irrelevant_removed() const |
366 | 0 | { |
367 | 0 | return last_irrelevant_removed_; |
368 | 0 | } |
369 | | |
370 | | /*! |
371 | | * Reset the interval of sequence numbers not relevant that were removed without reader being informed. |
372 | | */ |
373 | | inline void reset_irrelevant_removed() |
374 | 0 | { |
375 | 0 | first_irrelevant_removed_ = SequenceNumber_t::unknown(); |
376 | 0 | last_irrelevant_removed_ = SequenceNumber_t::unknown(); |
377 | 0 | } |
378 | | |
379 | | /** |
380 | | * Change the interval of the nack-suppression event. |
381 | | * @param interval Time from data sending to acknack processing. |
382 | | */ |
383 | | void update_nack_supression_interval( |
384 | | const dds::Duration_t& interval); |
385 | | |
386 | | LocatorSelectorEntry* general_locator_selector_entry() |
387 | 0 | { |
388 | 0 | return locator_info_.general_locator_selector_entry(); |
389 | 0 | } |
390 | | |
391 | | LocatorSelectorEntry* async_locator_selector_entry() |
392 | 0 | { |
393 | 0 | return locator_info_.async_locator_selector_entry(); |
394 | 0 | } |
395 | | |
396 | | RTPSMessageSenderInterface* message_sender() |
397 | 0 | { |
398 | 0 | return &locator_info_; |
399 | 0 | } |
400 | | |
401 | | bool is_datasharing_reader() const |
402 | 0 | { |
403 | 0 | return locator_info_.is_datasharing_reader(); |
404 | 0 | } |
405 | | |
406 | | IDataSharingNotifier* datasharing_notifier() |
407 | 0 | { |
408 | 0 | return locator_info_.datasharing_notifier(); |
409 | 0 | } |
410 | | |
411 | | const IDataSharingNotifier* datasharing_notifier() const |
412 | 0 | { |
413 | 0 | return locator_info_.datasharing_notifier(); |
414 | 0 | } |
415 | | |
416 | | void datasharing_notify() |
417 | 0 | { |
418 | 0 | locator_info_.datasharing_notify(); |
419 | 0 | } |
420 | | |
421 | | size_t locators_size() const |
422 | 0 | { |
423 | 0 | return locator_info_.locators_size(); |
424 | 0 | } |
425 | | |
426 | | bool active() const |
427 | 0 | { |
428 | 0 | return active_; |
429 | 0 | } |
430 | | |
431 | | void active( |
432 | | bool active) |
433 | 0 | { |
434 | 0 | active_ = active; |
435 | 0 | } |
436 | | |
437 | | /** |
438 | | * @brief Check if the sequence number given has been delivered at least once to the transport layer. |
439 | | * |
440 | | * @param seq_number Sequence number of the change to check. |
441 | | * @param found The sequence number has been found in the list of changes pending to be sent/ack. |
442 | | * This flag allows to differentiate the case when the change is not found from the one that is found |
443 | | * but it has not been delivered yet. |
444 | | * @return true if the change has been delivered. |
445 | | * @return false otherwise. |
446 | | */ |
447 | | bool has_been_delivered( |
448 | | const SequenceNumber_t& seq_number, |
449 | | bool& found) const; |
450 | | |
451 | | private: |
452 | | |
453 | | //!Is this proxy active? I.e. does it have a remote reader associated? |
454 | | bool is_active_; |
455 | | //!Reader locator information |
456 | | ReaderLocator locator_info_; |
457 | | //!Taken from QoS |
458 | | DurabilityKind_t durability_kind_; |
459 | | //!Taken from QoS |
460 | | bool expects_inline_qos_; |
461 | | //!Taken from QoS |
462 | | bool is_reliable_; |
463 | | //!Taken from QoS |
464 | | bool disable_positive_acks_; |
465 | | //!Pointer to the associated StatefulWriter. |
466 | | StatefulWriter* writer_; |
467 | | //!Set of the changes and its state. |
468 | | ResourceLimitedVector<ChangeForReader_t, std::true_type> changes_for_reader_; |
469 | | //! Timed Event to manage the delay to mark a change as UNACKED after sending it. |
470 | | TimedEvent* nack_supression_event_; |
471 | | TimedEvent* initial_heartbeat_event_; |
472 | | //! Are timed events enabled? |
473 | | std::atomic_bool timers_enabled_; |
474 | | //! Next expected ack/nack count |
475 | | uint32_t next_expected_acknack_count_; |
476 | | //! Last NACKFRAG count. |
477 | | uint32_t last_nackfrag_count_; |
478 | | |
479 | | //! Sequence number of the lowest change not fully acknowledged. |
480 | | SequenceNumber_t changes_low_mark_; |
481 | | |
482 | | //! First sequence number not relevant that was removed without reader being informed. |
483 | | SequenceNumber_t first_irrelevant_removed_ {SequenceNumber_t::unknown()}; |
484 | | //! Last sequence number not relevant that was removed without reader being informed. |
485 | | SequenceNumber_t last_irrelevant_removed_ {SequenceNumber_t::unknown()}; |
486 | | |
487 | | bool active_ = false; |
488 | | |
489 | | //! Listener to notify about data acknowledgements and resends. |
490 | | StatefulWriterListener* const stateful_writer_listener_ = nullptr; |
491 | | |
492 | | using ChangeIterator = ResourceLimitedVector<ChangeForReader_t, std::true_type>::iterator; |
493 | | using ChangeConstIterator = ResourceLimitedVector<ChangeForReader_t, std::true_type>::const_iterator; |
494 | | |
495 | | void disable_timers(); |
496 | | |
497 | | /** |
498 | | * Converts all changes with a given status to a different status. |
499 | | * @param previous Status to change. |
500 | | * @param next Status to adopt. |
501 | | * @param func Function executed for each change which changes its status. |
502 | | * @return the number of changes that have been modified. |
503 | | */ |
504 | | uint32_t convert_status_on_all_changes( |
505 | | ChangeForReaderStatus_t previous, |
506 | | ChangeForReaderStatus_t next, |
507 | | bool notify_resend, |
508 | | const std::function<void(ChangeForReader_t& change)>& func = {}); |
509 | | |
510 | | /*! |
511 | | * @brief Adds requested fragments. These fragments will be sent in next NackResponseDelay. |
512 | | * @param [in] seq_num Sequence number to be paired with the requested fragments. |
513 | | * @param [in] frag_set set containing the requested fragments to be sent. |
514 | | * @return True if there is at least one requested fragment. False in other case. |
515 | | */ |
516 | | bool requested_fragment_set( |
517 | | const SequenceNumber_t& seq_num, |
518 | | const FragmentNumberSet_t& frag_set); |
519 | | |
520 | | void add_change( |
521 | | const ChangeForReader_t& change, |
522 | | bool is_relevant); |
523 | | |
524 | | /** |
525 | | * @brief Find a change with the specified sequence number. |
526 | | * @param seq_num Sequence number to find. |
527 | | * @param exact When false, the first change with a sequence number not less than seq_num will be returned. |
528 | | * When true, the change with a sequence number value of seq_num will be returned. |
529 | | * @return Iterator pointing to the change, changes_for_reader_.end() if not found. |
530 | | */ |
531 | | ChangeIterator find_change( |
532 | | const SequenceNumber_t& seq_num, |
533 | | bool exact); |
534 | | |
535 | | /** |
536 | | * @brief Find a change with the specified sequence number. |
537 | | * @param seq_num Sequence number to find. |
538 | | * @return Iterator pointing to the change, changes_for_reader_.end() if not found. |
539 | | */ |
540 | | ChangeConstIterator find_change( |
541 | | const SequenceNumber_t& seq_num) const; |
542 | | |
543 | | /** |
544 | | * @brief Notifies that a change has been acknowledged by this ReaderProxy. |
545 | | * |
546 | | * @param change Reference to the ChangeForReader_t that has been acknowledged. |
547 | | */ |
548 | | void notify_acknowledged( |
549 | | const ChangeForReader_t& change) const; |
550 | | |
551 | | /** |
552 | | * @brief Notifies that a change has been resent to this ReaderProxy. |
553 | | * |
554 | | * @param change Reference to the ChangeForReader_t that has been resent. |
555 | | */ |
556 | | void notify_resent( |
557 | | const ChangeForReader_t& change) const; |
558 | | |
559 | | }; |
560 | | |
561 | | } /* namespace rtps */ |
562 | | } /* namespace fastdds */ |
563 | | } /* namespace eprosima */ |
564 | | |
565 | | #endif // RTPS_WRITER__READERPROXY_HPP |