/src/Fast-DDS/include/fastdds/rtps/writer/RTPSWriter.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | /** |
16 | | * @file RTPSWriter.h |
17 | | */ |
18 | | |
19 | | #ifndef _FASTDDS_RTPS_RTPSWRITER_H_ |
20 | | #define _FASTDDS_RTPS_RTPSWRITER_H_ |
21 | | |
22 | | #include <chrono> |
23 | | #include <functional> |
24 | | #include <memory> |
25 | | #include <mutex> |
26 | | #include <vector> |
27 | | |
28 | | #include <fastdds/rtps/Endpoint.h> |
29 | | #include <fastdds/rtps/attributes/HistoryAttributes.h> |
30 | | #include <fastdds/rtps/attributes/WriterAttributes.h> |
31 | | #include <fastdds/rtps/builtin/data/ReaderProxyData.h> |
32 | | #include <fastdds/rtps/interfaces/IReaderDataFilter.hpp> |
33 | | #include <fastdds/rtps/messages/RTPSMessageGroup.h> |
34 | | #include "DeliveryRetCode.hpp" |
35 | | #include "LocatorSelectorSender.hpp" |
36 | | #include <fastrtps/qos/LivelinessLostStatus.h> |
37 | | |
38 | | #include <fastdds/statistics/rtps/StatisticsCommon.hpp> |
39 | | |
40 | | namespace eprosima { |
41 | | |
42 | | namespace fastdds { |
43 | | namespace rtps { |
44 | | |
45 | | class FlowController; |
46 | | |
47 | | } // namespace rtps |
48 | | |
49 | | namespace dds { |
50 | | |
51 | | class DataWriterImpl; |
52 | | |
53 | | } // namespace dds |
54 | | } // namespace fastdds |
55 | | |
56 | | namespace fastrtps { |
57 | | namespace rtps { |
58 | | |
59 | | class WriterListener; |
60 | | class WriterHistory; |
61 | | class DataSharingNotifier; |
62 | | struct CacheChange_t; |
63 | | |
64 | | /** |
65 | | * Class RTPSWriter, manages the sending of data to the readers. Is always associated with a HistoryCache. |
66 | | * @ingroup WRITER_MODULE |
67 | | */ |
68 | | class RTPSWriter |
69 | | : public Endpoint |
70 | | , public fastdds::statistics::StatisticsWriterImpl |
71 | | { |
72 | | friend class WriterHistory; |
73 | | friend class RTPSParticipantImpl; |
74 | | friend class RTPSMessageGroup; |
75 | | friend class fastdds::dds::DataWriterImpl; |
76 | | |
77 | | protected: |
78 | | |
79 | | RTPSWriter( |
80 | | RTPSParticipantImpl* impl, |
81 | | const GUID_t& guid, |
82 | | const WriterAttributes& att, |
83 | | fastdds::rtps::FlowController* flow_controller, |
84 | | WriterHistory* hist, |
85 | | WriterListener* listen = nullptr); |
86 | | |
87 | | RTPSWriter( |
88 | | RTPSParticipantImpl* impl, |
89 | | const GUID_t& guid, |
90 | | const WriterAttributes& att, |
91 | | const std::shared_ptr<IPayloadPool>& payload_pool, |
92 | | fastdds::rtps::FlowController* flow_controller, |
93 | | WriterHistory* hist, |
94 | | WriterListener* listen = nullptr); |
95 | | |
96 | | RTPSWriter( |
97 | | RTPSParticipantImpl* impl, |
98 | | const GUID_t& guid, |
99 | | const WriterAttributes& att, |
100 | | const std::shared_ptr<IPayloadPool>& payload_pool, |
101 | | const std::shared_ptr<IChangePool>& change_pool, |
102 | | fastdds::rtps::FlowController* flow_controller, |
103 | | WriterHistory* hist, |
104 | | WriterListener* listen = nullptr); |
105 | | |
106 | | virtual ~RTPSWriter(); |
107 | | |
108 | | public: |
109 | | |
110 | | /** |
111 | | * Create a new change based with the provided changeKind. |
112 | | * @param data Data of the change. |
113 | | * @param changeKind The type of change. |
114 | | * @param handle InstanceHandle to assign. |
115 | | * @return Pointer to the CacheChange or nullptr if incorrect. |
116 | | */ |
117 | | template<typename T> |
118 | | CacheChange_t* new_change( |
119 | | T& data, |
120 | | ChangeKind_t changeKind, |
121 | | InstanceHandle_t handle = c_InstanceHandle_Unknown) |
122 | | { |
123 | | return new_change([data]() -> uint32_t |
124 | | { |
125 | | return (uint32_t)T::getCdrSerializedSize(data); |
126 | | }, changeKind, handle); |
127 | | } |
128 | | |
129 | | RTPS_DllAPI CacheChange_t* new_change( |
130 | | const std::function<uint32_t()>& dataCdrSerializedSize, |
131 | | ChangeKind_t changeKind, |
132 | | InstanceHandle_t handle = c_InstanceHandle_Unknown); |
133 | | |
134 | | RTPS_DllAPI CacheChange_t* new_change( |
135 | | ChangeKind_t changeKind, |
136 | | InstanceHandle_t handle = c_InstanceHandle_Unknown); |
137 | | |
138 | | /** |
139 | | * Release a change when it is not being used anymore. |
140 | | * |
141 | | * @param change Pointer to the cache change to be released. |
142 | | * |
143 | | * @returns whether the operation succeeded or not |
144 | | * |
145 | | * @pre |
146 | | * @li @c change is not @c nullptr |
147 | | * @li @c change points to a cache change obtained from a call to @c this->new_change |
148 | | * |
149 | | * @post memory pointed to by @c change is not accessed |
150 | | */ |
151 | | RTPS_DllAPI bool release_change( |
152 | | CacheChange_t* change); |
153 | | |
154 | | /** |
155 | | * Add a matched reader. |
156 | | * @param data Pointer to the ReaderProxyData object added. |
157 | | * @return True if added. |
158 | | */ |
159 | | RTPS_DllAPI virtual bool matched_reader_add( |
160 | | const ReaderProxyData& data) = 0; |
161 | | |
162 | | /** |
163 | | * Remove a matched reader. |
164 | | * @param reader_guid GUID of the reader to remove. |
165 | | * @return True if removed. |
166 | | */ |
167 | | RTPS_DllAPI virtual bool matched_reader_remove( |
168 | | const GUID_t& reader_guid) = 0; |
169 | | |
170 | | /** |
171 | | * Tells us if a specific Reader is matched against this writer. |
172 | | * @param reader_guid GUID of the reader to check. |
173 | | * @return True if it was matched. |
174 | | */ |
175 | | RTPS_DllAPI virtual bool matched_reader_is_matched( |
176 | | const GUID_t& reader_guid) = 0; |
177 | | |
178 | | /** |
179 | | * @brief Set a content filter to perform content filtering on this writer. |
180 | | * |
181 | | * This method sets a content filter that will be used to check whether a cache change is relevant |
182 | | * for a reader or not. |
183 | | * |
184 | | * @param filter The content filter to use on this writer. May be @c nullptr to remove the content filter |
185 | | * (i.e. treat all samples as relevant). |
186 | | */ |
187 | | RTPS_DllAPI virtual void reader_data_filter( |
188 | | fastdds::rtps::IReaderDataFilter* filter) = 0; |
189 | | |
190 | | /** |
191 | | * @brief Get the content filter used to perform content filtering on this writer. |
192 | | * |
193 | | * @return The content filter used on this writer. |
194 | | */ |
195 | | RTPS_DllAPI virtual const fastdds::rtps::IReaderDataFilter* reader_data_filter() const = 0; |
196 | | |
197 | | /** |
198 | | * Check if a specific change has been acknowledged by all Readers. |
199 | | * Is only useful in reliable Writer. In BE Writers returns false when pending to be sent. |
200 | | * @return True if acknowledged by all. |
201 | | */ |
202 | | RTPS_DllAPI virtual bool is_acked_by_all( |
203 | | const CacheChange_t* /*a_change*/) const |
204 | 0 | { |
205 | 0 | return false; |
206 | 0 | } |
207 | | |
208 | | /** |
209 | | * Waits until all changes were acknowledged or max_wait. |
210 | | * @return True if all were acknowledged. |
211 | | */ |
212 | | RTPS_DllAPI virtual bool wait_for_all_acked( |
213 | | const Duration_t& /*max_wait*/) |
214 | 0 | { |
215 | 0 | return true; |
216 | 0 | } |
217 | | |
218 | | /** |
219 | | * Update the Attributes of the Writer. |
220 | | * @param att New attributes |
221 | | */ |
222 | | RTPS_DllAPI virtual void updateAttributes( |
223 | | const WriterAttributes& att) = 0; |
224 | | |
225 | | /** |
226 | | * Get Min Seq Num in History. |
227 | | * @return Minimum sequence number in history |
228 | | */ |
229 | | RTPS_DllAPI SequenceNumber_t get_seq_num_min(); |
230 | | |
231 | | /** |
232 | | * Get Max Seq Num in History. |
233 | | * @return Maximum sequence number in history |
234 | | */ |
235 | | RTPS_DllAPI SequenceNumber_t get_seq_num_max(); |
236 | | |
237 | | /** |
238 | | * Get maximum size of the serialized type |
239 | | * @return Maximum size of the serialized type |
240 | | */ |
241 | | RTPS_DllAPI uint32_t getTypeMaxSerialized(); |
242 | | |
243 | | //!Get maximum size of the data |
244 | | uint32_t getMaxDataSize(); |
245 | | |
246 | | //! Calculates the maximum size of the data |
247 | | uint32_t calculateMaxDataSize( |
248 | | uint32_t length); |
249 | | |
250 | | /** |
251 | | * Get listener |
252 | | * @return Listener |
253 | | */ |
254 | | RTPS_DllAPI inline WriterListener* getListener() |
255 | 0 | { |
256 | 0 | return mp_listener; |
257 | 0 | } |
258 | | |
259 | | RTPS_DllAPI inline bool set_listener( |
260 | | WriterListener* listener) |
261 | 0 | { |
262 | 0 | mp_listener = listener; |
263 | 0 | return true; |
264 | 0 | } |
265 | | |
266 | | /** |
267 | | * Get the publication mode |
268 | | * @return publication mode |
269 | | */ |
270 | | RTPS_DllAPI inline bool isAsync() const |
271 | 0 | { |
272 | 0 | return is_async_; |
273 | 0 | } |
274 | | |
275 | | /** |
276 | | * Remove an specified max number of changes |
277 | | * @param max Maximum number of changes to remove. |
278 | | * @return at least one change has been removed |
279 | | */ |
280 | | RTPS_DllAPI bool remove_older_changes( |
281 | | unsigned int max = 0); |
282 | | |
283 | | /** |
284 | | * Tries to remove a change waiting a maximum of the provided microseconds. |
285 | | * @param max_blocking_time_point Maximum time to wait for. |
286 | | * @param lock Lock of the Change list. |
287 | | * @return at least one change has been removed |
288 | | */ |
289 | | virtual bool try_remove_change( |
290 | | const std::chrono::steady_clock::time_point& max_blocking_time_point, |
291 | | std::unique_lock<RecursiveTimedMutex>& lock) = 0; |
292 | | |
293 | | /** |
294 | | * Waits till a change has been acknowledged. |
295 | | * @param seq Sequence number to wait for acknowledgement. |
296 | | * @param max_blocking_time_point Maximum time to wait for. |
297 | | * @param lock Lock of the Change list. |
298 | | * @return true when change was acknowledged, false when timeout is reached. |
299 | | */ |
300 | | virtual bool wait_for_acknowledgement( |
301 | | const SequenceNumber_t& seq, |
302 | | const std::chrono::steady_clock::time_point& max_blocking_time_point, |
303 | | std::unique_lock<RecursiveTimedMutex>& lock) = 0; |
304 | | |
305 | | #ifdef FASTDDS_STATISTICS |
306 | | |
307 | | /* |
308 | | * Add a listener to receive statistics backend callbacks |
309 | | * @param listener |
310 | | * @return true if successfully added |
311 | | */ |
312 | | RTPS_DllAPI bool add_statistics_listener( |
313 | | std::shared_ptr<fastdds::statistics::IListener> listener); |
314 | | |
315 | | /* |
316 | | * Remove a listener from receiving statistics backend callbacks |
317 | | * @param listener |
318 | | * @return true if successfully removed |
319 | | */ |
320 | | RTPS_DllAPI bool remove_statistics_listener( |
321 | | std::shared_ptr<fastdds::statistics::IListener> listener); |
322 | | |
323 | | #endif // FASTDDS_STATISTICS |
324 | | |
325 | | /** |
326 | | * Get RTPS participant |
327 | | * @return RTPS participant |
328 | | */ |
329 | | inline RTPSParticipantImpl* getRTPSParticipant() const |
330 | 0 | { |
331 | 0 | return mp_RTPSParticipant; |
332 | 0 | } |
333 | | |
334 | | /** |
335 | | * Enable or disable sending data to readers separately |
336 | | * NOTE: This will only work for synchronous writers |
337 | | * @param enable If separate sending should be enabled |
338 | | */ |
339 | | void set_separate_sending ( |
340 | | bool enable) |
341 | 0 | { |
342 | 0 | m_separateSendingEnabled = enable; |
343 | 0 | } |
344 | | |
345 | | /** |
346 | | * Inform if data is sent to readers separately |
347 | | * @return true if separate sending is enabled |
348 | | */ |
349 | | bool get_separate_sending () const |
350 | 0 | { |
351 | 0 | return m_separateSendingEnabled; |
352 | 0 | } |
353 | | |
354 | | /** |
355 | | * Process an incoming ACKNACK submessage. |
356 | | * @param[in] writer_guid GUID of the writer the submessage is directed to. |
357 | | * @param[in] reader_guid GUID of the reader originating the submessage. |
358 | | * @param[in] ack_count Count field of the submessage. |
359 | | * @param[in] sn_set Sequence number bitmap field of the submessage. |
360 | | * @param[in] final_flag Final flag field of the submessage. |
361 | | * @param[out] result true if the writer could process the submessage. |
362 | | * Only valid when returned value is true. |
363 | | * @return true when the submessage was destinated to this writer, false otherwise. |
364 | | */ |
365 | | virtual bool process_acknack( |
366 | | const GUID_t& writer_guid, |
367 | | const GUID_t& reader_guid, |
368 | | uint32_t ack_count, |
369 | | const SequenceNumberSet_t& sn_set, |
370 | | bool final_flag, |
371 | | bool& result) |
372 | 0 | { |
373 | 0 | (void)reader_guid; (void)ack_count; (void)sn_set; (void)final_flag; |
374 | 0 |
|
375 | 0 | result = false; |
376 | 0 | return writer_guid == m_guid; |
377 | 0 | } |
378 | | |
379 | | /** |
380 | | * Process an incoming NACKFRAG submessage. |
381 | | * @param[in] writer_guid GUID of the writer the submessage is directed to. |
382 | | * @param[in] reader_guid GUID of the reader originating the submessage. |
383 | | * @param[in] ack_count Count field of the submessage. |
384 | | * @param[in] seq_num Sequence number field of the submessage. |
385 | | * @param[in] fragments_state Fragment number bitmap field of the submessage. |
386 | | * @param[out] result true if the writer could process the submessage. |
387 | | * Only valid when returned value is true. |
388 | | * @return true when the submessage was destinated to this writer, false otherwise. |
389 | | */ |
390 | | virtual bool process_nack_frag( |
391 | | const GUID_t& writer_guid, |
392 | | const GUID_t& reader_guid, |
393 | | uint32_t ack_count, |
394 | | const SequenceNumber_t& seq_num, |
395 | | const FragmentNumberSet_t fragments_state, |
396 | | bool& result) |
397 | 0 | { |
398 | 0 | (void)reader_guid; (void)ack_count; (void)seq_num; (void)fragments_state; |
399 | 0 |
|
400 | 0 | result = false; |
401 | 0 | return writer_guid == m_guid; |
402 | 0 | } |
403 | | |
404 | | /** |
405 | | * @brief A method to retrieve the liveliness kind |
406 | | * |
407 | | * @return Liveliness kind |
408 | | */ |
409 | | const LivelinessQosPolicyKind& get_liveliness_kind() const; |
410 | | |
411 | | /** |
412 | | * @brief A method to retrieve the liveliness lease duration |
413 | | * |
414 | | * @return Lease duration |
415 | | */ |
416 | | const Duration_t& get_liveliness_lease_duration() const; |
417 | | |
418 | | /** |
419 | | * @brief A method to return the liveliness announcement period |
420 | | * |
421 | | * @return The announcement period |
422 | | */ |
423 | | const Duration_t& get_liveliness_announcement_period() const; |
424 | | |
425 | | //! Liveliness lost status of this writer |
426 | | LivelinessLostStatus liveliness_lost_status_; |
427 | | |
428 | | /** |
429 | | * @return Whether the writer is data sharing compatible or not |
430 | | */ |
431 | | bool is_datasharing_compatible() const; |
432 | | |
433 | | /*! |
434 | | * Tells writer the sample can be sent to the network. |
435 | | * This function should be used by a fastdds::rtps::FlowController. |
436 | | * |
437 | | * @param cache_change Pointer to the CacheChange_t that represents the sample which can be sent. |
438 | | * @param group RTPSMessageGroup reference uses for generating the RTPS message. |
439 | | * @param locator_selector RTPSMessageSenderInterface reference uses for selecting locators. The reference has to |
440 | | * be a member of this RTPSWriter object. |
441 | | * @param max_blocking_time Future timepoint where blocking send should end. |
442 | | * @return Return code. |
443 | | * @note Must be non-thread safe. |
444 | | */ |
445 | | virtual DeliveryRetCode deliver_sample_nts( |
446 | | CacheChange_t* cache_change, |
447 | | RTPSMessageGroup& group, |
448 | | LocatorSelectorSender& locator_selector, |
449 | | const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) = 0; |
450 | | |
451 | | virtual LocatorSelectorSender& get_general_locator_selector() = 0; |
452 | | |
453 | | virtual LocatorSelectorSender& get_async_locator_selector() = 0; |
454 | | |
455 | | /** |
456 | | * Send a message through this interface. |
457 | | * |
458 | | * @param message Pointer to the buffer with the message already serialized. |
459 | | * @param locator_selector RTPSMessageSenderInterface reference uses for selecting locators. The reference has to |
460 | | * be a member of this RTPSWriter object. |
461 | | * @param max_blocking_time_point Future timepoint where blocking send should end. |
462 | | */ |
463 | | virtual bool send_nts( |
464 | | CDRMessage_t* message, |
465 | | const LocatorSelectorSender& locator_selector, |
466 | | std::chrono::steady_clock::time_point& max_blocking_time_point) const; |
467 | | |
468 | | protected: |
469 | | |
470 | | //!Is the data sent directly or announced by HB and THEN sent to the ones who ask for it?. |
471 | | bool m_pushMode = true; |
472 | | |
473 | | //! Flow controller. |
474 | | fastdds::rtps::FlowController* flow_controller_; |
475 | | |
476 | | //!WriterHistory |
477 | | WriterHistory* mp_history = nullptr; |
478 | | //!Listener |
479 | | WriterListener* mp_listener = nullptr; |
480 | | //!Asynchronous publication activated |
481 | | bool is_async_ = false; |
482 | | //!Separate sending activated |
483 | | bool m_separateSendingEnabled = false; |
484 | | |
485 | | //! The liveliness kind of this writer |
486 | | LivelinessQosPolicyKind liveliness_kind_; |
487 | | //! The liveliness lease duration of this writer |
488 | | Duration_t liveliness_lease_duration_; |
489 | | //! The liveliness announcement period |
490 | | Duration_t liveliness_announcement_period_; |
491 | | |
492 | | void add_guid( |
493 | | LocatorSelectorSender& locator_selector, |
494 | | const GUID_t& remote_guid); |
495 | | |
496 | | void compute_selected_guids( |
497 | | LocatorSelectorSender& locator_selector); |
498 | | |
499 | | void update_cached_info_nts( |
500 | | LocatorSelectorSender& locator_selector); |
501 | | |
502 | | /** |
503 | | * Add a change to the unsent list. |
504 | | * @param change Pointer to the change to add. |
505 | | * @param max_blocking_time |
506 | | */ |
507 | | virtual void unsent_change_added_to_history( |
508 | | CacheChange_t* change, |
509 | | const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) = 0; |
510 | | |
511 | | /** |
512 | | * Indicate the writer that a change has been removed by the history due to some HistoryQos requirement. |
513 | | * @param a_change Pointer to the change that is going to be removed. |
514 | | * @return True if removed correctly. |
515 | | */ |
516 | | virtual bool change_removed_by_history( |
517 | | CacheChange_t* a_change) = 0; |
518 | | |
519 | | bool is_datasharing_compatible_with( |
520 | | const ReaderProxyData& rdata) const; |
521 | | |
522 | | bool is_pool_initialized() const; |
523 | | |
524 | | template<typename Functor> |
525 | | bool send_data_or_fragments( |
526 | | RTPSMessageGroup& group, |
527 | | CacheChange_t* change, |
528 | | bool inline_qos, |
529 | | Functor sent_fun) |
530 | | { |
531 | | bool sent_ok = true; |
532 | | |
533 | | uint32_t n_fragments = change->getFragmentCount(); |
534 | | if (n_fragments > 0) |
535 | | { |
536 | | for (FragmentNumber_t frag = 1; frag <= n_fragments; frag++) |
537 | | { |
538 | | sent_ok &= group.add_data_frag(*change, frag, inline_qos); |
539 | | if (sent_ok) |
540 | | { |
541 | | sent_fun(change, frag); |
542 | | } |
543 | | else |
544 | | { |
545 | | logError(RTPS_WRITER, "Error sending fragment (" << change->sequenceNumber << ", " << frag << ")"); |
546 | | break; |
547 | | } |
548 | | } |
549 | | } |
550 | | else |
551 | | { |
552 | | sent_ok = group.add_data(*change, inline_qos); |
553 | | if (sent_ok) |
554 | | { |
555 | | sent_fun(change, 0); |
556 | | } |
557 | | else |
558 | | { |
559 | | logError(RTPS_WRITER, "Error sending change " << change->sequenceNumber); |
560 | | } |
561 | | } |
562 | | |
563 | | return sent_ok; |
564 | | } |
565 | | |
566 | | void add_statistics_sent_submessage( |
567 | | CacheChange_t* change, |
568 | | size_t num_locators); |
569 | | |
570 | | void deinit(); |
571 | | |
572 | | private: |
573 | | |
574 | | RecursiveTimedMutex& get_mutex() |
575 | 0 | { |
576 | 0 | return mp_mutex; |
577 | 0 | } |
578 | | |
579 | | RTPSWriter& operator =( |
580 | | const RTPSWriter&) = delete; |
581 | | |
582 | | void init( |
583 | | const std::shared_ptr<IPayloadPool>& payload_pool, |
584 | | const std::shared_ptr<IChangePool>& change_pool, |
585 | | const WriterAttributes& att); |
586 | | |
587 | | |
588 | | RTPSWriter* next_[2] = { nullptr, nullptr }; |
589 | | }; |
590 | | |
591 | | } /* namespace rtps */ |
592 | | } /* namespace fastrtps */ |
593 | | } /* namespace eprosima */ |
594 | | |
595 | | #endif /* _FASTDDS_RTPS_RTPSWRITER_H_ */ |