/src/Fast-DDS/include/fastdds/rtps/writer/StatefulWriter.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | /** |
16 | | * @file StatefulWriter.h |
17 | | */ |
18 | | |
19 | | #ifndef _FASTDDS_RTPS_STATEFULWRITER_H_ |
20 | | #define _FASTDDS_RTPS_STATEFULWRITER_H_ |
21 | | |
22 | | #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC |
23 | | |
24 | | #include <fastdds/rtps/writer/RTPSWriter.h> |
25 | | #include <fastdds/rtps/interfaces/IReaderDataFilter.hpp> |
26 | | #include <fastdds/rtps/history/IChangePool.h> |
27 | | #include <fastdds/rtps/history/IPayloadPool.h> |
28 | | #include <fastrtps/utils/collections/ResourceLimitedVector.hpp> |
29 | | #include <condition_variable> |
30 | | #include <mutex> |
31 | | |
32 | | namespace eprosima { |
33 | | namespace fastrtps { |
34 | | namespace rtps { |
35 | | |
36 | | class ReaderProxy; |
37 | | class TimedEvent; |
38 | | |
39 | | /** |
40 | | * Class StatefulWriter, specialization of RTPSWriter that maintains information of each matched Reader. |
41 | | * @ingroup WRITER_MODULE |
42 | | */ |
43 | | class StatefulWriter : public RTPSWriter |
44 | | { |
45 | | friend class RTPSParticipantImpl; |
46 | | friend class ReaderProxy; |
47 | | |
48 | | public: |
49 | | |
50 | | //!Destructor |
51 | | virtual ~StatefulWriter(); |
52 | | |
53 | | protected: |
54 | | |
55 | | //!Constructor |
56 | | StatefulWriter( |
57 | | RTPSParticipantImpl* impl, |
58 | | const GUID_t& guid, |
59 | | const WriterAttributes& att, |
60 | | fastdds::rtps::FlowController* flow_controller, |
61 | | WriterHistory* hist, |
62 | | WriterListener* listen = nullptr); |
63 | | |
64 | | StatefulWriter( |
65 | | RTPSParticipantImpl* impl, |
66 | | const GUID_t& guid, |
67 | | const WriterAttributes& att, |
68 | | const std::shared_ptr<IPayloadPool>& payload_pool, |
69 | | fastdds::rtps::FlowController* flow_controller, |
70 | | WriterHistory* hist, |
71 | | WriterListener* listen = nullptr); |
72 | | |
73 | | StatefulWriter( |
74 | | RTPSParticipantImpl* impl, |
75 | | const GUID_t& guid, |
76 | | const WriterAttributes& att, |
77 | | const std::shared_ptr<IPayloadPool>& payload_pool, |
78 | | const std::shared_ptr<IChangePool>& change_pool, |
79 | | fastdds::rtps::FlowController* flow_controller, |
80 | | WriterHistory* hist, |
81 | | WriterListener* listen = nullptr); |
82 | | |
83 | | void rebuild_status_after_load(); |
84 | | |
85 | | virtual void print_inconsistent_acknack( |
86 | | const GUID_t& writer_guid, |
87 | | const GUID_t& reader_guid, |
88 | | const SequenceNumber_t& min_requested_sequence_number, |
89 | | const SequenceNumber_t& max_requested_sequence_number, |
90 | | const SequenceNumber_t& next_sequence_number); |
91 | | |
92 | | private: |
93 | | |
94 | | void init( |
95 | | RTPSParticipantImpl* pimpl, |
96 | | const WriterAttributes& att); |
97 | | |
98 | | //!Timed Event to manage the periodic HB to the Reader. |
99 | | TimedEvent* periodic_hb_event_; |
100 | | |
101 | | //! Timed Event to manage the Acknack response delay. |
102 | | TimedEvent* nack_response_event_; |
103 | | |
104 | | //! A timed event to mark samples as acknowledget (used only if disable positive ACKs QoS is enabled) |
105 | | TimedEvent* ack_event_; |
106 | | |
107 | | //!Count of the sent heartbeats. |
108 | | Count_t m_heartbeatCount; |
109 | | //!WriterTimes |
110 | | WriterTimes m_times; |
111 | | |
112 | | //! Vector containing all the remote ReaderProxies. |
113 | | ResourceLimitedVector<ReaderProxy*> matched_remote_readers_; |
114 | | //! Vector containing all the inactive, ready for reuse, ReaderProxies. |
115 | | ResourceLimitedVector<ReaderProxy*> matched_readers_pool_; |
116 | | |
117 | | using ReaderProxyIterator = ResourceLimitedVector<ReaderProxy*>::iterator; |
118 | | using ReaderProxyConstIterator = ResourceLimitedVector<ReaderProxy*>::const_iterator; |
119 | | |
120 | | //!To avoid notifying twice of the same sequence number |
121 | | SequenceNumber_t next_all_acked_notify_sequence_; |
122 | | SequenceNumber_t min_readers_low_mark_; |
123 | | |
124 | | // TODO Join this mutex when main mutex would not be recursive. |
125 | | std::mutex all_acked_mutex_; |
126 | | std::condition_variable all_acked_cond_; |
127 | | // TODO Also remove when main mutex not recursive. |
128 | | bool all_acked_; |
129 | | std::condition_variable_any may_remove_change_cond_; |
130 | | unsigned int may_remove_change_; |
131 | | |
132 | | public: |
133 | | |
134 | | /** |
135 | | * Add a specific change to all ReaderLocators. |
136 | | * @param p Pointer to the change. |
137 | | * @param max_blocking_time |
138 | | */ |
139 | | void unsent_change_added_to_history( |
140 | | CacheChange_t* p, |
141 | | const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override; |
142 | | |
143 | | /** |
144 | | * Indicate the writer that a change has been removed by the history due to some HistoryQos requirement. |
145 | | * @param a_change Pointer to the change that is going to be removed. |
146 | | * @return True if removed correctly. |
147 | | */ |
148 | | bool change_removed_by_history( |
149 | | CacheChange_t* a_change) override; |
150 | | |
151 | | /** |
152 | | * Sends a change directly to a intraprocess reader. |
153 | | */ |
154 | | bool intraprocess_delivery( |
155 | | CacheChange_t* change, |
156 | | ReaderProxy* reader_proxy); |
157 | | |
158 | | bool intraprocess_gap( |
159 | | ReaderProxy* reader_proxy, |
160 | | const SequenceNumber_t& seq_num) |
161 | 0 | { |
162 | 0 | SequenceNumber_t last_seq = seq_num + 1; |
163 | 0 | return intraprocess_gap(reader_proxy, seq_num, last_seq); |
164 | 0 | } |
165 | | |
166 | | bool intraprocess_gap( |
167 | | ReaderProxy* reader_proxy, |
168 | | const SequenceNumber_t& first_seq, |
169 | | const SequenceNumber_t& last_seq); |
170 | | |
171 | | bool intraprocess_heartbeat( |
172 | | ReaderProxy* reader_proxy, |
173 | | bool liveliness = false); |
174 | | |
175 | | //!Increment the HB count. |
176 | | inline void incrementHBCount() |
177 | 0 | { |
178 | 0 | on_heartbeat(++m_heartbeatCount); |
179 | 0 | } |
180 | | |
181 | | /** |
182 | | * Add a matched reader. |
183 | | * @param data Pointer to the ReaderProxyData object added. |
184 | | * @return True if added. |
185 | | */ |
186 | | bool matched_reader_add( |
187 | | const ReaderProxyData& data) override; |
188 | | |
189 | | /** |
190 | | * Remove a matched reader. |
191 | | * @param reader_guid GUID of the reader to remove. |
192 | | * @return True if removed. |
193 | | */ |
194 | | bool matched_reader_remove( |
195 | | const GUID_t& reader_guid) override; |
196 | | |
197 | | /** |
198 | | * Tells us if a specific Reader is matched against this writer |
199 | | * @param reader_guid GUID of the reader to check. |
200 | | * @return True if it was matched. |
201 | | */ |
202 | | bool matched_reader_is_matched( |
203 | | const GUID_t& reader_guid) override; |
204 | | |
205 | | bool is_acked_by_all( |
206 | | const CacheChange_t* a_change) const override; |
207 | | |
208 | | template <typename Function> |
209 | | Function for_each_reader_proxy( |
210 | | Function f) const |
211 | 0 | { |
212 | | // we cannot directly pass iterators neither const_iterators to matched_readers_ because then the functor would |
213 | | // be able to modify ReaderProxy elements |
214 | 0 | for ( const ReaderProxy* rp : matched_local_readers_ ) |
215 | 0 | { |
216 | 0 | f(rp); |
217 | 0 | } |
218 | 0 | for ( const ReaderProxy* rp : matched_datasharing_readers_ ) |
219 | 0 | { |
220 | 0 | f(rp); |
221 | 0 | } |
222 | 0 | for ( const ReaderProxy* rp : matched_remote_readers_ ) |
223 | 0 | { |
224 | 0 | f(rp); |
225 | 0 | } |
226 | |
|
227 | 0 | return f; |
228 | 0 | } |
229 | | |
230 | | bool wait_for_all_acked( |
231 | | const Duration_t& max_wait) override; |
232 | | |
233 | | bool all_readers_updated(); |
234 | | |
235 | | /** |
236 | | * Remove the change with the minimum SequenceNumber |
237 | | * @return True if removed. |
238 | | */ |
239 | | bool try_remove_change( |
240 | | const std::chrono::steady_clock::time_point& max_blocking_time_point, |
241 | | std::unique_lock<RecursiveTimedMutex>& lock) override; |
242 | | |
243 | | bool wait_for_acknowledgement( |
244 | | const SequenceNumber_t& seq, |
245 | | const std::chrono::steady_clock::time_point& max_blocking_time_point, |
246 | | std::unique_lock<RecursiveTimedMutex>& lock) override; |
247 | | |
248 | | /** |
249 | | * Update the Attributes of the Writer. |
250 | | * @param att New attributes |
251 | | */ |
252 | | void updateAttributes( |
253 | | const WriterAttributes& att) override; |
254 | | |
255 | | /** |
256 | | * Find a Reader Proxy in this writer. |
257 | | * @param[in] readerGuid The GUID_t of the reader. |
258 | | * @param[out] RP Pointer to pointer to return the ReaderProxy. |
259 | | * @return True if correct. |
260 | | */ |
261 | | bool matched_reader_lookup( |
262 | | GUID_t& readerGuid, |
263 | | ReaderProxy** RP); |
264 | | |
265 | | /** Get count of heartbeats |
266 | | * @return count of heartbeats |
267 | | */ |
268 | | inline Count_t getHeartbeatCount() const |
269 | 0 | { |
270 | 0 | return this->m_heartbeatCount; |
271 | 0 | } |
272 | | |
273 | | /** |
274 | | * Get the RTPS participant |
275 | | * @return RTPS participant |
276 | | */ |
277 | | inline RTPSParticipantImpl* getRTPSParticipant() const |
278 | 0 | { |
279 | 0 | return mp_RTPSParticipant; |
280 | 0 | } |
281 | | |
282 | | /** |
283 | | * Get the number of matched readers |
284 | | * @return Number of the matched readers |
285 | | */ |
286 | | inline size_t getMatchedReadersSize() const |
287 | 0 | { |
288 | 0 | std::lock_guard<RecursiveTimedMutex> guard(mp_mutex); |
289 | 0 | return matched_remote_readers_.size() |
290 | 0 | + matched_local_readers_.size() |
291 | 0 | + matched_datasharing_readers_.size(); |
292 | 0 | } |
293 | | |
294 | | /** |
295 | | * @brief Returns true if disable positive ACKs QoS is enabled |
296 | | * |
297 | | * @return True if positive acks are disabled, false otherwise |
298 | | */ |
299 | | inline bool get_disable_positive_acks() const |
300 | 0 | { |
301 | 0 | return disable_positive_acks_; |
302 | 0 | } |
303 | | |
304 | | /** |
305 | | * Update the WriterTimes attributes of all associated ReaderProxy. |
306 | | * @param times WriterTimes parameter. |
307 | | */ |
308 | | void updateTimes( |
309 | | const WriterTimes& times); |
310 | | |
311 | | SequenceNumber_t next_sequence_number() const; |
312 | | |
313 | | /** |
314 | | * @brief Sends a periodic heartbeat |
315 | | * |
316 | | * @param final Final flag |
317 | | * @param liveliness Liveliness flag |
318 | | * |
319 | | * @return True on success |
320 | | */ |
321 | | bool send_periodic_heartbeat( |
322 | | bool final = false, |
323 | | bool liveliness = false); |
324 | | |
325 | | /*! |
326 | | * @brief Sends a heartbeat to a remote reader. |
327 | | * |
328 | | * @remarks This function is non thread-safe. |
329 | | */ |
330 | | void send_heartbeat_to_nts( |
331 | | ReaderProxy& remoteReaderProxy, |
332 | | bool liveliness = false, |
333 | | bool force = false); |
334 | | |
335 | | void perform_nack_response(); |
336 | | |
337 | | void perform_nack_supression( |
338 | | const GUID_t& reader_guid); |
339 | | |
340 | | /** |
341 | | * Process an incoming ACKNACK submessage. |
342 | | * @param[in] writer_guid GUID of the writer the submessage is directed to. |
343 | | * @param[in] reader_guid GUID of the reader originating the submessage. |
344 | | * @param[in] ack_count Count field of the submessage. |
345 | | * @param[in] sn_set Sequence number bitmap field of the submessage. |
346 | | * @param[in] final_flag Final flag field of the submessage. |
347 | | * @param[out] result true if the writer could process the submessage. |
348 | | * Only valid when returned value is true. |
349 | | * @return true when the submessage was destinated to this writer, false otherwise. |
350 | | */ |
351 | | bool process_acknack( |
352 | | const GUID_t& writer_guid, |
353 | | const GUID_t& reader_guid, |
354 | | uint32_t ack_count, |
355 | | const SequenceNumberSet_t& sn_set, |
356 | | bool final_flag, |
357 | | bool& result) override; |
358 | | |
359 | | /** |
360 | | * Process an incoming NACKFRAG submessage. |
361 | | * @param[in] writer_guid GUID of the writer the submessage is directed to. |
362 | | * @param[in] reader_guid GUID of the reader originating the submessage. |
363 | | * @param[in] ack_count Count field of the submessage. |
364 | | * @param[in] seq_num Sequence number field of the submessage. |
365 | | * @param[in] fragments_state Sequence number field of the submessage. |
366 | | * @param[out] result true if the writer could process the submessage. |
367 | | * Only valid when returned value is true. |
368 | | * @return true when the submessage was destinated to this writer, false otherwise. |
369 | | */ |
370 | | virtual bool process_nack_frag( |
371 | | const GUID_t& writer_guid, |
372 | | const GUID_t& reader_guid, |
373 | | uint32_t ack_count, |
374 | | const SequenceNumber_t& seq_num, |
375 | | const FragmentNumberSet_t fragments_state, |
376 | | bool& result) override; |
377 | | |
378 | | /** |
379 | | * @brief Set a content filter to perform content filtering on this writer. |
380 | | * |
381 | | * This method sets a content filter that will be used to check whether a cache change is relevant |
382 | | * for a reader or not. |
383 | | * |
384 | | * @param filter The content filter to use on this writer. May be @c nullptr to remove the content filter |
385 | | * (i.e. treat all samples as relevant). |
386 | | */ |
387 | | void reader_data_filter( |
388 | | fastdds::rtps::IReaderDataFilter* filter) final; |
389 | | |
390 | | /** |
391 | | * @brief Get the content filter used to perform content filtering on this writer. |
392 | | * |
393 | | * @return The content filter used on this writer. |
394 | | */ |
395 | | const fastdds::rtps::IReaderDataFilter* reader_data_filter() const final; |
396 | | |
397 | | /*! |
398 | | * Tells writer the sample can be sent to the network. |
399 | | * This function should be used by a fastdds::rtps::FlowController. |
400 | | * |
401 | | * @param cache_change Pointer to the CacheChange_t that represents the sample which can be sent. |
402 | | * @param group RTPSMessageGroup reference uses for generating the RTPS message. |
403 | | * @param locator_selector RTPSMessageSenderInterface reference uses for selecting locators. The reference has to |
404 | | * be a member of this RTPSWriter object. |
405 | | * @param max_blocking_time Future timepoint where blocking send should end. |
406 | | * @return Return code. |
407 | | * @note Must be non-thread safe. |
408 | | */ |
409 | | DeliveryRetCode deliver_sample_nts( |
410 | | CacheChange_t* cache_change, |
411 | | RTPSMessageGroup& group, |
412 | | LocatorSelectorSender& locator_selector, |
413 | | const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override; |
414 | | |
415 | | LocatorSelectorSender& get_general_locator_selector() override |
416 | 0 | { |
417 | 0 | return locator_selector_general_; |
418 | 0 | } |
419 | | |
420 | | LocatorSelectorSender& get_async_locator_selector() override |
421 | 0 | { |
422 | 0 | return locator_selector_async_; |
423 | 0 | } |
424 | | |
425 | | private: |
426 | | |
427 | | bool is_acked_by_all( |
428 | | const SequenceNumber_t seq) const; |
429 | | |
430 | | void update_reader_info( |
431 | | LocatorSelectorSender& locator_selector, |
432 | | bool create_sender_resources); |
433 | | |
434 | | void send_heartbeat_piggyback_nts_( |
435 | | ReaderProxy* reader, |
436 | | RTPSMessageGroup& message_group, |
437 | | LocatorSelectorSender& locator_selector, |
438 | | uint32_t& last_bytes_processed); |
439 | | |
440 | | void send_heartbeat_nts_( |
441 | | size_t number_of_readers, |
442 | | RTPSMessageGroup& message_group, |
443 | | bool final, |
444 | | bool liveliness = false); |
445 | | |
446 | | void check_acked_status(); |
447 | | |
448 | | /** |
449 | | * @brief A method called when the ack timer expires |
450 | | * |
451 | | * @details Only used if disable positive ACKs QoS is enabled |
452 | | */ |
453 | | bool ack_timer_expired(); |
454 | | |
455 | | void send_heartbeat_to_all_readers(); |
456 | | |
457 | | void deliver_sample_to_intraprocesses( |
458 | | CacheChange_t* change); |
459 | | |
460 | | void deliver_sample_to_datasharing( |
461 | | CacheChange_t* change); |
462 | | |
463 | | DeliveryRetCode deliver_sample_to_network( |
464 | | CacheChange_t* change, |
465 | | RTPSMessageGroup& group, |
466 | | LocatorSelectorSender& locator_selector, |
467 | | const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time); |
468 | | |
469 | | void prepare_datasharing_delivery( |
470 | | CacheChange_t* change); |
471 | | |
472 | | //! True to disable piggyback heartbeats |
473 | | bool disable_heartbeat_piggyback_; |
474 | | //! True to disable positive ACKs |
475 | | bool disable_positive_acks_; |
476 | | //! Keep duration for disable positive ACKs QoS, in microseconds |
477 | | std::chrono::duration<double, std::ratio<1, 1000000>> keep_duration_us_; |
478 | | //! Last acknowledged cache change (only used if using disable positive ACKs QoS) |
479 | | SequenceNumber_t last_sequence_number_; |
480 | | //! Biggest sequence number removed from history |
481 | | SequenceNumber_t biggest_removed_sequence_number_; |
482 | | |
483 | | const uint32_t sendBufferSize_; |
484 | | |
485 | | int32_t currentUsageSendBufferSize_; |
486 | | |
487 | | bool there_are_remote_readers_ = false; |
488 | | bool there_are_local_readers_ = false; |
489 | | |
490 | | StatefulWriter& operator =( |
491 | | const StatefulWriter&) = delete; |
492 | | |
493 | | //! The filter for the reader |
494 | | fastdds::rtps::IReaderDataFilter* reader_data_filter_ = nullptr; |
495 | | //! Vector containing all the active ReaderProxies for intraprocess delivery. |
496 | | ResourceLimitedVector<ReaderProxy*> matched_local_readers_; |
497 | | //! Vector containing all the active ReaderProxies for datasharing delivery. |
498 | | ResourceLimitedVector<ReaderProxy*> matched_datasharing_readers_; |
499 | | bool there_are_datasharing_readers_ = false; |
500 | | |
501 | | LocatorSelectorSender locator_selector_general_; |
502 | | |
503 | | LocatorSelectorSender locator_selector_async_; |
504 | | }; |
505 | | |
506 | | } /* namespace rtps */ |
507 | | } /* namespace fastrtps */ |
508 | | } /* namespace eprosima */ |
509 | | |
510 | | #endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC |
511 | | #endif /* _FASTDDS_RTPS_STATEFULWRITER_H_ */ |