/src/Fast-DDS/src/cpp/rtps/writer/BaseWriter.hpp
Line | Count | Source |
1 | | // Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | /** |
16 | | * @file BaseWriter.hpp |
17 | | */ |
18 | | |
19 | | #ifndef RTPS_WRITER__BASEWRITER_HPP |
20 | | #define RTPS_WRITER__BASEWRITER_HPP |
21 | | |
22 | | #include <atomic> |
23 | | #include <chrono> |
24 | | #include <cstdint> |
25 | | #include <memory> |
26 | | #include <mutex> |
27 | | #include <vector> |
28 | | |
29 | | #include <fastdds/fastdds_dll.hpp> |
30 | | #include <fastdds/dds/core/policy/QosPolicies.hpp> |
31 | | #include <fastdds/dds/core/status/BaseStatus.hpp> |
32 | | #include <fastdds/rtps/Endpoint.hpp> |
33 | | #include <fastdds/rtps/common/FragmentNumber.hpp> |
34 | | #include <fastdds/rtps/common/SequenceNumber.hpp> |
35 | | #include <fastdds/rtps/common/VendorId_t.hpp> |
36 | | #include <fastdds/rtps/common/Time_t.hpp> |
37 | | #include <fastdds/rtps/transport/NetworkBuffer.hpp> |
38 | | #include <fastdds/rtps/writer/RTPSWriter.hpp> |
39 | | #include <fastdds/statistics/IListeners.hpp> |
40 | | #include <fastdds/statistics/rtps/StatisticsCommon.hpp> |
41 | | #include <fastdds/statistics/rtps/monitor_service/connections_fwd.hpp> |
42 | | #include <fastdds/utils/TimedMutex.hpp> |
43 | | |
44 | | #include <rtps/builtin/data/ReaderProxyData.hpp> |
45 | | #include <rtps/writer/DeliveryRetCode.hpp> |
46 | | #include <rtps/writer/LocatorSelectorSender.hpp> |
47 | | |
48 | | namespace eprosima { |
49 | | namespace fastdds { |
50 | | namespace rtps { |
51 | | |
52 | | struct CacheChange_t; |
53 | | class DataSharingNotifier; |
54 | | class FlowController; |
55 | | struct GUID_t; |
56 | | class ICacheChangePool; |
57 | | class IPayloadPool; |
58 | | class RTPSMessageGroup; |
59 | | class RTPSParticipantImpl; |
60 | | class WriterAttributes; |
61 | | class WriterHistory; |
62 | | class WriterListener; |
63 | | |
64 | | class BaseWriter |
65 | | : public fastdds::rtps::RTPSWriter |
66 | | , public fastdds::statistics::StatisticsWriterImpl |
67 | | { |
68 | | |
69 | | public: |
70 | | |
71 | | //vvvvvvvvvvvvvvvvvvvvv [Exported API] vvvvvvvvvvvvvvvvvvvvv |
72 | | |
73 | | bool matched_reader_add( |
74 | | const SubscriptionBuiltinTopicData& rqos) final; |
75 | | |
76 | | WriterListener* get_listener() const final; |
77 | | |
78 | | bool set_listener( |
79 | | WriterListener* listener) final; |
80 | | |
81 | | bool is_async() const final; |
82 | | |
83 | | int32_t get_transport_priority() const final; |
84 | | |
85 | | void update_attributes( |
86 | | const WriterAttributes& att) override; |
87 | | |
88 | | virtual void local_actions_on_writer_removed(); |
89 | | |
90 | | #ifdef FASTDDS_STATISTICS |
91 | | |
92 | | bool add_statistics_listener( |
93 | | std::shared_ptr<fastdds::statistics::IListener> listener) final; |
94 | | |
95 | | bool remove_statistics_listener( |
96 | | std::shared_ptr<fastdds::statistics::IListener> listener) final; |
97 | | |
98 | | void set_enabled_statistics_writers_mask( |
99 | | uint32_t enabled_writers) final; |
100 | | |
101 | | #endif // FASTDDS_STATISTICS |
102 | | |
103 | | //^^^^^^^^^^^^^^^^^^^^^^^ [Exported API] ^^^^^^^^^^^^^^^^^^^^^^^ |
104 | | |
105 | | //vvvvvvvvvvvvvvvvvvvv [Implementation API] vvvvvvvvvvvvvvvvvvvv |
106 | | |
107 | | /** |
108 | | * Add a matched reader. |
109 | | * @param data Pointer to the ReaderProxyData object added. |
110 | | * @return True if added. |
111 | | */ |
112 | | virtual bool matched_reader_add_edp( |
113 | | const ReaderProxyData& data) = 0; |
114 | | |
115 | | /** |
116 | | * Add a change to the unsent list. |
117 | | * @param change Pointer to the change to add. |
118 | | * @param [in] max_blocking_time Maximum time this method has to complete the task. |
119 | | */ |
120 | | virtual void unsent_change_added_to_history( |
121 | | CacheChange_t* change, |
122 | | const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) = 0; |
123 | | |
124 | | /** |
125 | | * Indicate the writer that a change has been removed by the history due to some HistoryQos requirement. |
126 | | * @param a_change Pointer to the change that is going to be removed. |
127 | | * @param [in] max_blocking_time Maximum time this method has to complete the task. |
128 | | * @return True if removed correctly. |
129 | | */ |
130 | | virtual bool change_removed_by_history( |
131 | | CacheChange_t* a_change, |
132 | | const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) = 0; |
133 | | |
134 | | /** |
135 | | * Tells writer the sample can be sent to the network. |
136 | | * This function should be used by a fastdds::rtps::FlowController. |
137 | | * |
138 | | * @param cache_change Pointer to the CacheChange_t that represents the sample which can be sent. |
139 | | * @param group RTPSMessageGroup reference uses for generating the RTPS message. |
140 | | * @param locator_selector RTPSMessageSenderInterface reference uses for selecting locators. The reference has to |
141 | | * be a member of this RTPSWriter object. |
142 | | * @param max_blocking_time Future timepoint where blocking send should end. |
143 | | * @return Return code. |
144 | | * @note Must be non-thread safe. |
145 | | */ |
146 | | virtual DeliveryRetCode deliver_sample_nts( |
147 | | CacheChange_t* cache_change, |
148 | | RTPSMessageGroup& group, |
149 | | LocatorSelectorSender& locator_selector, |
150 | | const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) = 0; |
151 | | |
152 | | /** |
153 | | * @brief Get the general locator selector. |
154 | | * |
155 | | * @return Reference to the general locator selector. |
156 | | */ |
157 | | virtual LocatorSelectorSender& get_general_locator_selector() = 0; |
158 | | |
159 | | /** |
160 | | * @brief Get the async locator selector. |
161 | | * |
162 | | * @return Reference to the async locator selector. |
163 | | */ |
164 | | virtual LocatorSelectorSender& get_async_locator_selector() = 0; |
165 | | |
166 | | /** |
167 | | * Send a message through this interface. |
168 | | * |
169 | | * @param buffers Vector of NetworkBuffers to send with data already serialized. |
170 | | * @param total_bytes Total number of bytes to send. Should be equal to the sum of the @c size field of all buffers. |
171 | | * @param locator_selector RTPSMessageSenderInterface reference uses for selecting locators. The reference has to |
172 | | * be a member of this RTPSWriter object. |
173 | | * @param max_blocking_time_point Future timepoint where blocking send should end. |
174 | | */ |
175 | | virtual bool send_nts( |
176 | | const std::vector<eprosima::fastdds::rtps::NetworkBuffer>& buffers, |
177 | | const uint32_t& total_bytes, |
178 | | const LocatorSelectorSender& locator_selector, |
179 | | std::chrono::steady_clock::time_point& max_blocking_time_point) const; |
180 | | |
181 | | /** |
182 | | * Process an incoming ACKNACK submessage. |
183 | | * @param [in] writer_guid GUID of the writer the submessage is directed to. |
184 | | * @param [in] reader_guid GUID of the reader originating the submessage. |
185 | | * @param [in] ack_count Count field of the submessage. |
186 | | * @param [in] sn_set Sequence number bitmap field of the submessage. |
187 | | * @param [in] final_flag Final flag field of the submessage. |
188 | | * @param [out] result true if the writer could process the submessage. |
189 | | * Only valid when returned value is true. |
190 | | * @param [in] origin_vendor_id VendorId of the source participant from which the message was received |
191 | | * @return true when the submessage was destinated to this writer, false otherwise. |
192 | | */ |
193 | | virtual bool process_acknack( |
194 | | const GUID_t& writer_guid, |
195 | | const GUID_t& reader_guid, |
196 | | uint32_t ack_count, |
197 | | const SequenceNumberSet_t& sn_set, |
198 | | bool final_flag, |
199 | | bool& result, |
200 | | fastdds::rtps::VendorId_t origin_vendor_id) = 0; |
201 | | |
202 | | /** |
203 | | * Process an incoming NACKFRAG submessage. |
204 | | * @param [in] writer_guid GUID of the writer the submessage is directed to. |
205 | | * @param [in] reader_guid GUID of the reader originating the submessage. |
206 | | * @param [in] ack_count Count field of the submessage. |
207 | | * @param [in] seq_num Sequence number field of the submessage. |
208 | | * @param [in] fragments_state Fragment number bitmap field of the submessage. |
209 | | * @param [out] result true if the writer could process the submessage. |
210 | | * Only valid when returned value is true. |
211 | | * @param [in] origin_vendor_id VendorId of the source participant from which the message was received |
212 | | * @return true when the submessage was destinated to this writer, false otherwise. |
213 | | */ |
214 | | virtual bool process_nack_frag( |
215 | | const GUID_t& writer_guid, |
216 | | const GUID_t& reader_guid, |
217 | | uint32_t ack_count, |
218 | | const SequenceNumber_t& seq_num, |
219 | | const FragmentNumberSet_t& fragments_state, |
220 | | bool& result, |
221 | | fastdds::rtps::VendorId_t origin_vendor_id) = 0; |
222 | | |
223 | | /** |
224 | | * Tries to remove a change waiting a maximum of the provided microseconds. |
225 | | * @param max_blocking_time_point Maximum time to wait for. |
226 | | * @param lock Lock of the Change list. |
227 | | * @return at least one change has been removed |
228 | | */ |
229 | | virtual bool try_remove_change( |
230 | | const std::chrono::steady_clock::time_point& max_blocking_time_point, |
231 | | std::unique_lock<RecursiveTimedMutex>& lock) = 0; |
232 | | |
233 | | /** |
234 | | * Waits till a change has been acknowledged. |
235 | | * @param seq Sequence number to wait for acknowledgement. |
236 | | * @param max_blocking_time_point Maximum time to wait for. |
237 | | * @param lock Lock of the Change list. |
238 | | * @return true when change was acknowledged, false when timeout is reached. |
239 | | */ |
240 | | virtual bool wait_for_acknowledgement( |
241 | | const SequenceNumber_t& seq, |
242 | | const std::chrono::steady_clock::time_point& max_blocking_time_point, |
243 | | std::unique_lock<RecursiveTimedMutex>& lock) = 0; |
244 | | |
245 | | //^^^^^^^^^^^^^^^^^^^^ [Implementation API] ^^^^^^^^^^^^^^^^^^^^ |
246 | | |
247 | | /** |
248 | | * @brief Get the WriterHistory associated with this writer. |
249 | | * |
250 | | * @return pointer to the WriterHistory associated with this writer. |
251 | | */ |
252 | | inline WriterHistory* get_history() const |
253 | 0 | { |
254 | 0 | return history_; |
255 | 0 | } |
256 | | |
257 | | /** |
258 | | * @brief Get biggest output payload size allowed by this writer. |
259 | | * |
260 | | * @return Maximum number of bytes allowed for the payload. |
261 | | */ |
262 | | uint32_t get_max_allowed_payload_size(); |
263 | | |
264 | | /** |
265 | | * @brief Get the RTPS participant that this writer belongs to. |
266 | | * |
267 | | * @return pointer to the RTPSParticipantImpl object that created this writer. |
268 | | */ |
269 | | inline RTPSParticipantImpl* get_participant_impl() const |
270 | 0 | { |
271 | 0 | return mp_RTPSParticipant; |
272 | 0 | } |
273 | | |
274 | | /** |
275 | | * @brief Inform if data is sent to readers separately. |
276 | | * |
277 | | * @return true if separate sending is enabled |
278 | | */ |
279 | | inline bool get_separate_sending() const |
280 | 0 | { |
281 | 0 | return separate_sending_enabled_; |
282 | 0 | } |
283 | | |
284 | | /** |
285 | | * @brief A method to retrieve the liveliness kind |
286 | | * |
287 | | * @return Liveliness kind |
288 | | */ |
289 | | const dds::LivelinessQosPolicyKind& get_liveliness_kind() const; |
290 | | |
291 | | /** |
292 | | * @brief A method to retrieve the liveliness lease duration |
293 | | * |
294 | | * @return Lease duration |
295 | | */ |
296 | | const dds::Duration_t& get_liveliness_lease_duration() const; |
297 | | |
298 | | /** |
299 | | * @brief A method to return the liveliness announcement period |
300 | | * |
301 | | * @return The announcement period |
302 | | */ |
303 | | const dds::Duration_t& get_liveliness_announcement_period() const; |
304 | | |
305 | | /** |
306 | | * @brief Notify the writer that it has lost liveliness |
307 | | */ |
308 | | void liveliness_lost(); |
309 | | |
310 | | /** |
311 | | * @return Whether the writer is data sharing compatible or not |
312 | | */ |
313 | | bool is_datasharing_compatible() const; |
314 | | |
315 | | bool is_datasharing_compatible_with( |
316 | | const dds::DataSharingQosPolicy& qos) const; |
317 | | |
318 | | /** |
319 | | * Get Min Seq Num in History. |
320 | | * @return Minimum sequence number in history |
321 | | */ |
322 | | SequenceNumber_t get_seq_num_min(); |
323 | | |
324 | | /** |
325 | | * Get Max Seq Num in History. |
326 | | * @return Maximum sequence number in history |
327 | | */ |
328 | | SequenceNumber_t get_seq_num_max(); |
329 | | |
330 | | /** |
331 | | * @brief Get a pointer to a BaseWriter object from a RTPSWriter pointer. |
332 | | * |
333 | | * @param writer Pointer to the RTPSWriter object. |
334 | | * |
335 | | * @return Pointer to the BaseWriter object. |
336 | | */ |
337 | | static BaseWriter* downcast( |
338 | | RTPSWriter* writer); |
339 | | |
340 | | /** |
341 | | * @brief Get a pointer to a BaseWriter object from a Endpoint pointer. |
342 | | * |
343 | | * @param endpoint Pointer to the Endpoint object. |
344 | | * |
345 | | * @return Pointer to the BaseWriter object. |
346 | | */ |
347 | | static BaseWriter* downcast( |
348 | | Endpoint* endpoint); |
349 | | |
350 | | virtual ~BaseWriter(); |
351 | | |
352 | | protected: |
353 | | |
354 | | BaseWriter( |
355 | | RTPSParticipantImpl* impl, |
356 | | const GUID_t& guid, |
357 | | const WriterAttributes& att, |
358 | | FlowController* flow_controller, |
359 | | WriterHistory* hist, |
360 | | WriterListener* listen = nullptr); |
361 | | |
362 | | void init( |
363 | | const WriterAttributes& att); |
364 | | |
365 | | void add_guid( |
366 | | LocatorSelectorSender& locator_selector, |
367 | | const GUID_t& remote_guid); |
368 | | |
369 | | void compute_selected_guids( |
370 | | LocatorSelectorSender& locator_selector); |
371 | | |
372 | | void update_cached_info_nts( |
373 | | LocatorSelectorSender& locator_selector); |
374 | | |
375 | | void add_statistics_sent_submessage( |
376 | | CacheChange_t* change, |
377 | | size_t num_locators); |
378 | | |
379 | | /// Liveliness lost status of this writer |
380 | | LivelinessLostStatus liveliness_lost_status_; |
381 | | /// Is the data sent directly or announced by HB and THEN sent to the ones who ask for it?. |
382 | | bool push_mode_ = true; |
383 | | |
384 | | /// Flow controller. |
385 | | FlowController* flow_controller_; |
386 | | /// Maximum number of bytes allowed for an RTPS datagram generated by this writer. |
387 | | uint32_t max_output_message_size_ = std::numeric_limits<uint32_t>::max(); |
388 | | |
389 | | /// WriterHistory |
390 | | WriterHistory* history_ = nullptr; |
391 | | /// Listener |
392 | | WriterListener* listener_ = nullptr; |
393 | | /// Asynchronous publication activated |
394 | | bool is_async_ = false; |
395 | | /// Separate sending activated |
396 | | bool separate_sending_enabled_ = false; |
397 | | |
398 | | /// The liveliness kind of this writer |
399 | | dds::LivelinessQosPolicyKind liveliness_kind_; |
400 | | /// The liveliness lease duration of this writer |
401 | | dds::Duration_t liveliness_lease_duration_; |
402 | | /// The liveliness announcement period |
403 | | dds::Duration_t liveliness_announcement_period_; |
404 | | /// The transport priority of this writer |
405 | | std::atomic<int32_t> transport_priority_; |
406 | | |
407 | | private: |
408 | | |
409 | | /** |
410 | | * @brief Calculate the maximum payload size that can be sent in a single datagram. |
411 | | * |
412 | | * @param datagram_length Length of the datagram. |
413 | | * |
414 | | * @return Maximum payload size. |
415 | | */ |
416 | | uint32_t calculate_max_payload_size( |
417 | | uint32_t datagram_length); |
418 | | |
419 | | }; |
420 | | |
421 | | } // namespace rtps |
422 | | } // namespace fastdds |
423 | | } // namespace eprosima |
424 | | |
425 | | #endif // RTPS_WRITER__BASEWRITER_HPP |