/src/Fast-DDS/include/fastdds/rtps/writer/ReaderProxy.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2016-2019 Proyectos y Sistemas de Mantenimiento SL (eProsima). |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | /** |
16 | | * @file ReaderProxy.h |
17 | | */ |
18 | | #ifndef _FASTDDS_RTPS_WRITER_READERPROXY_H_ |
19 | | #define _FASTDDS_RTPS_WRITER_READERPROXY_H_ |
20 | | |
21 | | #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC |
22 | | |
23 | | #include <fastdds/rtps/attributes/WriterAttributes.h> |
24 | | #include <fastdds/rtps/attributes/RTPSParticipantAllocationAttributes.hpp> |
25 | | |
26 | | #include <fastdds/rtps/builtin/data/ReaderProxyData.h> |
27 | | |
28 | | #include <fastdds/rtps/common/Types.h> |
29 | | #include <fastdds/rtps/common/Locator.h> |
30 | | #include <fastdds/rtps/common/SequenceNumber.h> |
31 | | #include <fastdds/rtps/common/CacheChange.h> |
32 | | #include <fastdds/rtps/common/FragmentNumber.h> |
33 | | |
34 | | #include <fastdds/rtps/writer/ChangeForReader.h> |
35 | | #include <fastdds/rtps/writer/ReaderLocator.h> |
36 | | |
37 | | #include <fastrtps/utils/collections/ResourceLimitedVector.hpp> |
38 | | |
39 | | #include <algorithm> |
40 | | #include <mutex> |
41 | | #include <set> |
42 | | #include <atomic> |
43 | | |
44 | | namespace eprosima { |
45 | | namespace fastrtps { |
46 | | namespace rtps { |
47 | | |
48 | | class StatefulWriter; |
49 | | class TimedEvent; |
50 | | class RTPSReader; |
51 | | class IDataSharingNotifier; |
52 | | class RTPSGapBuilder; |
53 | | |
54 | | /** |
55 | | * ReaderProxy class that helps to keep the state of a specific Reader with respect to the RTPSWriter. |
56 | | * @ingroup WRITER_MODULE |
57 | | */ |
58 | | class ReaderProxy |
59 | | { |
60 | | public: |
61 | | |
62 | | ~ReaderProxy(); |
63 | | |
64 | | /** |
65 | | * Constructor. |
66 | | * @param times WriterTimes to use in the ReaderProxy. |
67 | | * @param loc_alloc Maximum number of remote locators to keep in the ReaderProxy. |
68 | | * @param writer Pointer to the StatefulWriter creating the reader proxy. |
69 | | */ |
70 | | ReaderProxy( |
71 | | const WriterTimes& times, |
72 | | const RemoteLocatorsAllocationAttributes& loc_alloc, |
73 | | StatefulWriter* writer); |
74 | | |
75 | | /** |
76 | | * Activate this proxy associating it to a remote reader. |
77 | | * @param reader_attributes ReaderProxyData of the reader for which to keep state. |
78 | | * @param is_datasharing whether the reader is datasharing compatible with the writer or not. |
79 | | */ |
80 | | void start( |
81 | | const ReaderProxyData& reader_attributes, |
82 | | bool is_datasharing = false); |
83 | | |
84 | | /** |
85 | | * Update information about the remote reader. |
86 | | * @param reader_attributes ReaderProxyData with updated information of the reader. |
87 | | * @return true if data was modified, false otherwise. |
88 | | */ |
89 | | bool update( |
90 | | const ReaderProxyData& reader_attributes); |
91 | | |
92 | | /** |
93 | | * Disable this proxy. |
94 | | */ |
95 | | void stop(); |
96 | | |
97 | | /** |
98 | | * Called when a change is added to the writer's history. |
99 | | * @param change Information regarding the change added. |
100 | | * @param is_relevant Specify if change is relevant for this remote reader. |
101 | | * @param restart_nack_supression Whether nack-supression event should be restarted. |
102 | | */ |
103 | | void add_change( |
104 | | const ChangeForReader_t& change, |
105 | | bool is_relevant, |
106 | | bool restart_nack_supression); |
107 | | |
108 | | void add_change( |
109 | | const ChangeForReader_t& change, |
110 | | bool is_relevant, |
111 | | bool restart_nack_supression, |
112 | | const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time); |
113 | | |
114 | | /** |
115 | | * Check if there are changes pending for this reader. |
116 | | * @return true when there are pending changes, false otherwise. |
117 | | */ |
118 | | bool has_changes() const; |
119 | | |
120 | | /** |
121 | | * Check if a specific change has been already acknowledged for this reader. |
122 | | * @param seq_num Sequence number of the change to be checked. |
123 | | * @return true when the change is irrelevant or has been already acknowledged, false otherwise. |
124 | | */ |
125 | | bool change_is_acked( |
126 | | const SequenceNumber_t& seq_num) const; |
127 | | |
128 | | /** |
129 | | * Check if a specific change is marked to be sent to this reader. |
130 | | * |
131 | | * @param[in] seq_num Sequence number of the change to be checked. |
132 | | * @param[out] next_unsent_frag Return next fragment to be sent. |
133 | | * @param[out] gap_seq Return, when it is its first delivery (should be relevant seq_num), the sequence number of |
134 | | * the first sequence of the gap [first, seq_num). Otherwise return SequenceNumber_t::unknown(). |
135 | | * @param[in] min_seq Minimum sequence number managed by the History. It could be SequenceNumber_t::unknown() if |
136 | | * history is empty. |
137 | | * @param[out] need_reactivate_periodic_heartbeat Indicates if the heartbeat period event has to be restarted. |
138 | | * |
139 | | * @return true if the change is marked to be sent. False otherwise. |
140 | | */ |
141 | | bool change_is_unsent( |
142 | | const SequenceNumber_t& seq_num, |
143 | | FragmentNumber_t& next_unsent_frag, |
144 | | SequenceNumber_t& gap_seq, |
145 | | const SequenceNumber_t& min_seq, |
146 | | bool& need_reactivate_periodic_heartbeat) const; |
147 | | |
148 | | /** |
149 | | * Mark all changes up to the one indicated by seq_num as Acknowledged. |
150 | | * For instance, when seq_num is 30, changes 1-29 are marked as acknowledged. |
151 | | * @param seq_num Sequence number of the first change not to be marked as acknowledged. |
152 | | */ |
153 | | void acked_changes_set( |
154 | | const SequenceNumber_t& seq_num); |
155 | | |
156 | | /** |
157 | | * Mark all changes in the vector as requested. |
158 | | * @param seq_num_set Bitmap of sequence numbers. |
159 | | * @param gap_builder RTPSGapBuilder reference uses for adding each requested change that is irrelevant for the |
160 | | * requester. |
161 | | * @param[in] min_seq_in_history Minimum SequenceNumber_t in the writer's history. If writer's history is empty, |
162 | | * SequenceNumber_t::unknown() is expected. |
163 | | * @return true if at least one change has been marked as REQUESTED, false otherwise. |
164 | | */ |
165 | | bool requested_changes_set( |
166 | | const SequenceNumberSet_t& seq_num_set, |
167 | | RTPSGapBuilder& gap_builder, |
168 | | const SequenceNumber_t& min_seq_in_history); |
169 | | |
170 | | /** |
171 | | * Performs processing of preemptive acknack |
172 | | * @param func functor called, if the requester is a local reader, for each changes moved to UNSENT status. |
173 | | * @return true if a heartbeat should be sent, false otherwise. |
174 | | */ |
175 | | bool process_initial_acknack( |
176 | | const std::function<void(ChangeForReader_t& change)>& func); |
177 | | |
178 | | /*! |
179 | | * @brief Sets a change to a particular status (if present in the ReaderProxy) |
180 | | * @param seq_num Sequence number of the change to update. |
181 | | * @param status Status to apply. |
182 | | * @param restart_nack_supression Whether nack supression event should be restarted or not. |
183 | | * @param delivered true if change was able to be delivered to its addressees. false otherwise. |
184 | | */ |
185 | | void from_unsent_to_status( |
186 | | const SequenceNumber_t& seq_num, |
187 | | ChangeForReaderStatus_t status, |
188 | | bool restart_nack_supression, |
189 | | bool delivered = true); |
190 | | |
191 | | /** |
192 | | * @brief Mark a particular fragment as sent. |
193 | | * @param[in] seq_num Sequence number of the change to update. |
194 | | * @param[in] frag_num Fragment number to mark as sent. |
195 | | * @param[out] was_last_fragment Indicates if the fragment was the last one pending. |
196 | | * @return true when the change was found, false otherwise. |
197 | | */ |
198 | | bool mark_fragment_as_sent_for_change( |
199 | | const SequenceNumber_t& seq_num, |
200 | | FragmentNumber_t frag_num, |
201 | | bool& was_last_fragment); |
202 | | |
203 | | /** |
204 | | * Turns all UNDERWAY changes into UNACKNOWLEDGED. |
205 | | * |
206 | | * @return true if at least one change changed its status, false otherwise. |
207 | | */ |
208 | | bool perform_nack_supression(); |
209 | | |
210 | | /** |
211 | | * Turns all REQUESTED changes into UNSENT. |
212 | | * |
213 | | * @param func Function executed for each change which changes its status. |
214 | | * @return the number of changes that changed its status. |
215 | | */ |
216 | | uint32_t perform_acknack_response( |
217 | | const std::function<void(ChangeForReader_t& change)>& func); |
218 | | |
219 | | /** |
220 | | * Call this to inform a change was removed from history. |
221 | | * @param seq_num Sequence number of the removed change. |
222 | | */ |
223 | | void change_has_been_removed( |
224 | | const SequenceNumber_t& seq_num); |
225 | | |
226 | | /*! |
227 | | * @brief Returns there is some UNACKNOWLEDGED change. |
228 | | * @param first_seq_in_history Minimum sequence number in the writer history. |
229 | | * @return There is some UNACKNOWLEDGED change. |
230 | | */ |
231 | | bool has_unacknowledged( |
232 | | const SequenceNumber_t& first_seq_in_history) const; |
233 | | |
234 | | /** |
235 | | * Get the GUID of the reader represented by this proxy. |
236 | | * @return the GUID of the reader represented by this proxy. |
237 | | */ |
238 | | inline const GUID_t& guid() const |
239 | 0 | { |
240 | 0 | return locator_info_.remote_guid(); |
241 | 0 | } |
242 | | |
243 | | /** |
244 | | * Get the durability of the reader represented by this proxy. |
245 | | * @return the durability of the reader represented by this proxy. |
246 | | */ |
247 | | inline DurabilityKind_t durability_kind() const |
248 | 0 | { |
249 | 0 | return durability_kind_; |
250 | 0 | } |
251 | | |
252 | | /** |
253 | | * Check if the reader represented by this proxy expexts inline QOS to be received. |
254 | | * @return true if the reader represented by this proxy expexts inline QOS to be received. |
255 | | */ |
256 | | inline bool expects_inline_qos() const |
257 | 0 | { |
258 | 0 | return expects_inline_qos_; |
259 | 0 | } |
260 | | |
261 | | /** |
262 | | * Check if the reader represented by this proxy is reliable. |
263 | | * @return true if the reader represented by this proxy is reliable. |
264 | | */ |
265 | | inline bool is_reliable() const |
266 | 0 | { |
267 | 0 | return is_reliable_; |
268 | 0 | } |
269 | | |
270 | | inline bool disable_positive_acks() const |
271 | 0 | { |
272 | 0 | return disable_positive_acks_; |
273 | 0 | } |
274 | | |
275 | | /** |
276 | | * Check if the reader represented by this proxy is remote and reliable. |
277 | | * @return true if the reader represented by this proxy is remote and reliable. |
278 | | */ |
279 | | inline bool is_remote_and_reliable() const |
280 | 0 | { |
281 | 0 | return !locator_info_.is_local_reader() && !locator_info_.is_datasharing_reader() && is_reliable_; |
282 | 0 | } |
283 | | |
284 | | /** |
285 | | * Check if the reader is on the same process. |
286 | | * @return true if the reader is no the same process. |
287 | | */ |
288 | | inline bool is_local_reader() |
289 | 0 | { |
290 | 0 | return locator_info_.is_local_reader(); |
291 | 0 | } |
292 | | |
293 | | /** |
294 | | * Get the local reader on the same process (if any). |
295 | | * @return The local reader on the same process. |
296 | | */ |
297 | | inline RTPSReader* local_reader() |
298 | 0 | { |
299 | 0 | return locator_info_.local_reader(); |
300 | 0 | } |
301 | | |
302 | | /** |
303 | | * Called when an ACKNACK is received to set a new value for the count of the last received ACKNACK. |
304 | | * @param acknack_count The count of the received ACKNACK. |
305 | | * @return true if internal count changed (i.e. new ACKNACK is accepted) |
306 | | */ |
307 | | bool check_and_set_acknack_count( |
308 | | uint32_t acknack_count) |
309 | 0 | { |
310 | 0 | if (last_acknack_count_ < acknack_count) |
311 | 0 | { |
312 | 0 | last_acknack_count_ = acknack_count; |
313 | 0 | return true; |
314 | 0 | } |
315 | | |
316 | 0 | return false; |
317 | 0 | } |
318 | | |
319 | | /** |
320 | | * Process an incoming NACKFRAG submessage. |
321 | | * @param reader_guid Destination guid of the submessage. |
322 | | * @param nack_count Counter field of the submessage. |
323 | | * @param seq_num Sequence number field of the submessage. |
324 | | * @param fragments_state Bitmap indicating the requested fragments. |
325 | | * @return true if a change was modified, false otherwise. |
326 | | */ |
327 | | bool process_nack_frag( |
328 | | const GUID_t& reader_guid, |
329 | | uint32_t nack_count, |
330 | | const SequenceNumber_t& seq_num, |
331 | | const FragmentNumberSet_t& fragments_state); |
332 | | |
333 | | /** |
334 | | * Filter a CacheChange_t using the StatefulWriter's IReaderDataFilter. |
335 | | * @param change |
336 | | * @return true if the change is relevant, false otherwise. |
337 | | */ |
338 | | bool rtps_is_relevant( |
339 | | CacheChange_t* change) const; |
340 | | |
341 | | /** |
342 | | * Get the highest fully acknowledged sequence number. |
343 | | * @return the highest fully acknowledged sequence number. |
344 | | */ |
345 | | SequenceNumber_t changes_low_mark() const |
346 | 0 | { |
347 | 0 | return changes_low_mark_; |
348 | 0 | } |
349 | | |
350 | | /** |
351 | | * Change the interval of nack-supression event. |
352 | | * @param interval Time from data sending to acknack processing. |
353 | | */ |
354 | | void update_nack_supression_interval( |
355 | | const Duration_t& interval); |
356 | | |
357 | | LocatorSelectorEntry* general_locator_selector_entry() |
358 | 0 | { |
359 | 0 | return locator_info_.general_locator_selector_entry(); |
360 | 0 | } |
361 | | |
362 | | LocatorSelectorEntry* async_locator_selector_entry() |
363 | 0 | { |
364 | 0 | return locator_info_.async_locator_selector_entry(); |
365 | 0 | } |
366 | | |
367 | | RTPSMessageSenderInterface* message_sender() |
368 | 0 | { |
369 | 0 | return &locator_info_; |
370 | 0 | } |
371 | | |
372 | | bool is_datasharing_reader() const |
373 | 0 | { |
374 | 0 | return locator_info_.is_datasharing_reader(); |
375 | 0 | } |
376 | | |
377 | | IDataSharingNotifier* datasharing_notifier() |
378 | 0 | { |
379 | 0 | return locator_info_.datasharing_notifier(); |
380 | 0 | } |
381 | | |
382 | | const IDataSharingNotifier* datasharing_notifier() const |
383 | 0 | { |
384 | 0 | return locator_info_.datasharing_notifier(); |
385 | 0 | } |
386 | | |
387 | | void datasharing_notify() |
388 | 0 | { |
389 | 0 | locator_info_.datasharing_notify(); |
390 | 0 | } |
391 | | |
392 | | size_t locators_size() const |
393 | 0 | { |
394 | 0 | return locator_info_.locators_size(); |
395 | 0 | } |
396 | | |
397 | | bool active() const |
398 | 0 | { |
399 | 0 | return active_; |
400 | 0 | } |
401 | | |
402 | | void active( |
403 | | bool active) |
404 | 0 | { |
405 | 0 | active_ = active; |
406 | 0 | } |
407 | | |
408 | | private: |
409 | | |
410 | | //!Is this proxy active? I.e. does it have a remote reader associated? |
411 | | bool is_active_; |
412 | | //!Reader locator information |
413 | | ReaderLocator locator_info_; |
414 | | //!Taken from QoS |
415 | | DurabilityKind_t durability_kind_; |
416 | | //!Taken from QoS |
417 | | bool expects_inline_qos_; |
418 | | //!Taken from QoS |
419 | | bool is_reliable_; |
420 | | //!Taken from QoS |
421 | | bool disable_positive_acks_; |
422 | | //!Pointer to the associated StatefulWriter. |
423 | | StatefulWriter* writer_; |
424 | | //!Set of the changes and its state. |
425 | | ResourceLimitedVector<ChangeForReader_t, std::true_type> changes_for_reader_; |
426 | | //! Timed Event to manage the delay to mark a change as UNACKED after sending it. |
427 | | TimedEvent* nack_supression_event_; |
428 | | TimedEvent* initial_heartbeat_event_; |
429 | | //! Are timed events enabled? |
430 | | std::atomic_bool timers_enabled_; |
431 | | //! Last ack/nack count |
432 | | uint32_t last_acknack_count_; |
433 | | //! Last NACKFRAG count. |
434 | | uint32_t last_nackfrag_count_; |
435 | | |
436 | | SequenceNumber_t changes_low_mark_; |
437 | | |
438 | | bool active_ = false; |
439 | | |
440 | | using ChangeIterator = ResourceLimitedVector<ChangeForReader_t, std::true_type>::iterator; |
441 | | using ChangeConstIterator = ResourceLimitedVector<ChangeForReader_t, std::true_type>::const_iterator; |
442 | | |
443 | | void disable_timers(); |
444 | | |
445 | | /* |
446 | | * Converts all changes with a given status to a different status. |
447 | | * @param previous Status to change. |
448 | | * @param next Status to adopt. |
449 | | * @param func Function executed for each change which changes its status. |
450 | | * @return the number of changes that have been modified. |
451 | | */ |
452 | | uint32_t convert_status_on_all_changes( |
453 | | ChangeForReaderStatus_t previous, |
454 | | ChangeForReaderStatus_t next, |
455 | | const std::function<void(ChangeForReader_t& change)>& func = {}); |
456 | | |
457 | | /*! |
458 | | * @brief Adds requested fragments. These fragments will be sent in next NackResponseDelay. |
459 | | * @param[in] seq_num Sequence number to be paired with the requested fragments. |
460 | | * @param[in] frag_set set containing the requested fragments to be sent. |
461 | | * @return True if there is at least one requested fragment. False in other case. |
462 | | */ |
463 | | bool requested_fragment_set( |
464 | | const SequenceNumber_t& seq_num, |
465 | | const FragmentNumberSet_t& frag_set); |
466 | | |
467 | | void add_change( |
468 | | const ChangeForReader_t& change, |
469 | | bool is_relevant); |
470 | | |
471 | | /** |
472 | | * @brief Find a change with the specified sequence number. |
473 | | * @param seq_num Sequence number to find. |
474 | | * @param exact When false, the first change with a sequence number not less than seq_num will be returned. |
475 | | * When true, the change with a sequence number value of seq_num will be returned. |
476 | | * @return Iterator pointing to the change, changes_for_reader_.end() if not found. |
477 | | */ |
478 | | ChangeIterator find_change( |
479 | | const SequenceNumber_t& seq_num, |
480 | | bool exact); |
481 | | |
482 | | /** |
483 | | * @brief Find a change with the specified sequence number. |
484 | | * @param seq_num Sequence number to find. |
485 | | * @return Iterator pointing to the change, changes_for_reader_.end() if not found. |
486 | | */ |
487 | | ChangeConstIterator find_change( |
488 | | const SequenceNumber_t& seq_num) const; |
489 | | }; |
490 | | |
491 | | } /* namespace rtps */ |
492 | | } /* namespace fastrtps */ |
493 | | } /* namespace eprosima */ |
494 | | |
495 | | #endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC |
496 | | #endif /* _FASTDDS_RTPS_WRITER_READERPROXY_H_ */ |