/src/Fast-DDS/src/cpp/rtps/reader/WriterProxy.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | /** |
16 | | * @file WriterProxy.h |
17 | | */ |
18 | | |
19 | | #ifndef FASTRTPS_RTPS_READER_WRITERPROXY_H_ |
20 | | #define FASTRTPS_RTPS_READER_WRITERPROXY_H_ |
21 | | #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC |
22 | | |
23 | | #include <fastdds/rtps/common/Types.h> |
24 | | #include <fastdds/rtps/common/Locator.h> |
25 | | #include <fastdds/rtps/common/CacheChange.h> |
26 | | #include <fastdds/rtps/attributes/ReaderAttributes.h> |
27 | | #include <fastdds/rtps/attributes/RTPSParticipantAllocationAttributes.hpp> |
28 | | #include <fastdds/rtps/messages/RTPSMessageSenderInterface.hpp> |
29 | | #include <fastrtps/utils/collections/ResourceLimitedVector.hpp> |
30 | | #include <fastdds/rtps/builtin/data/WriterProxyData.h> |
31 | | #include <fastdds/rtps/common/LocatorSelectorEntry.hpp> |
32 | | |
33 | | #include <foonathan/memory/container.hpp> |
34 | | #include <foonathan/memory/memory_pool.hpp> |
35 | | |
36 | | #include <set> |
37 | | |
38 | | // Testing purpose |
39 | | #ifndef TEST_FRIENDS |
40 | | #define TEST_FRIENDS |
41 | | #endif // TEST_FRIENDS |
42 | | |
43 | | namespace eprosima { |
44 | | namespace fastrtps { |
45 | | namespace rtps { |
46 | | |
47 | | class RTPSParticipantImpl; |
48 | | class StatefulReader; |
49 | | class RTPSMessageGroup_t; |
50 | | class TimedEvent; |
51 | | |
52 | | /** |
53 | | * Class WriterProxy that contains the state of each matched writer for a specific reader. |
54 | | * @ingroup READER_MODULE |
55 | | */ |
56 | | class WriterProxy : public RTPSMessageSenderInterface |
57 | | { |
58 | | TEST_FRIENDS |
59 | | |
60 | | public: |
61 | | |
62 | | ~WriterProxy(); |
63 | | |
64 | | /** |
65 | | * Constructor. |
66 | | * @param reader Pointer to the StatefulReader creating this proxy. |
67 | | * @param changes_allocation Configuration for the set of changes (NOTE(review): loc_alloc is not documented; presumably the allocation limits for remote locators - confirm). |
68 | | */ |
69 | | WriterProxy( |
70 | | StatefulReader* reader, |
71 | | const RemoteLocatorsAllocationAttributes& loc_alloc, |
72 | | const ResourceLimitedContainerConfig& changes_allocation); |
73 | | |
74 | | /** |
75 | | * Activate this proxy associating it to a remote writer. |
76 | | * @param attributes WriterProxyData of the writer for which to keep state. |
77 | | * @param initial_sequence Sequence number of last acknowledged change. |
78 | | */ |
79 | | void start( |
80 | | const WriterProxyData& attributes, |
81 | | const SequenceNumber_t& initial_sequence); |
82 | | |
83 | | /** |
84 | | * Activate this proxy associating it to a remote writer. |
85 | | * @param attributes WriterProxyData of the writer for which to keep state. |
86 | | * @param initial_sequence Sequence number of last acknowledged change. |
87 | | * @param is_datasharing Whether the writer is datasharing with us or not. |
88 | | */ |
89 | | void start( |
90 | | const WriterProxyData& attributes, |
91 | | const SequenceNumber_t& initial_sequence, |
92 | | bool is_datasharing); |
93 | | |
94 | | /** |
95 | | * Update information on the remote writer. |
96 | | * @param attributes WriterProxyData with updated information of the writer. |
97 | | */ |
98 | | void update( |
99 | | const WriterProxyData& attributes); |
100 | | |
101 | | /** |
102 | | * Disable this proxy. |
103 | | */ |
104 | | void stop(); |
105 | | |
106 | | /** |
107 | | * Get the maximum sequence number received from this writer. |
108 | | * @return the maximum sequence number. |
109 | | */ |
110 | | const SequenceNumber_t available_changes_max() const; |
111 | | |
112 | | /** |
113 | | * Update the missing changes up to the provided sequence number. |
114 | | * All changes with status UNKNOWN with seq_num <= input seq_num are marked MISSING. |
115 | | * @param[in] seq_num Sequence number up to which (inclusive) UNKNOWN changes are marked MISSING. |
116 | | */ |
117 | | void missing_changes_update( |
118 | | const SequenceNumber_t& seq_num); |
119 | | |
120 | | /** |
121 | | * Update the lost changes up to the provided sequence number. |
122 | | * All changes with status UNKNOWN or MISSING with seq_num < input seq_num are marked LOST. |
123 | | * @param[in] seq_num Sequence number below which (exclusive) UNKNOWN or MISSING changes are marked LOST. |
124 | | * @return NOTE(review): not documented; presumably the number of changes newly marked LOST - confirm. */ |
125 | | int32_t lost_changes_update( |
126 | | const SequenceNumber_t& seq_num); |
127 | | |
128 | | /** |
129 | | * The provided change is marked as RECEIVED. |
130 | | * @param seq_num Sequence number of the change |
131 | | * @return True if correct. |
132 | | */ |
133 | | bool received_change_set( |
134 | | const SequenceNumber_t& seq_num); |
135 | | |
136 | | /** |
137 | | * Set a change as RECEIVED and NOT RELEVANT. |
138 | | * @param seq_num Sequence number of the change |
139 | | * @return true on success |
140 | | */ |
141 | | bool irrelevant_change_set( |
142 | | const SequenceNumber_t& seq_num); |
143 | | |
144 | | /** |
145 | | * Check if this proxy has any missing change. |
146 | | * @return true when there is at least one missing change on this proxy. |
147 | | */ |
148 | | bool are_there_missing_changes() const; |
149 | | |
150 | | /** |
151 | | * Get a SequenceNumberSet_t containing the sequence numbers of all missing changes. |
152 | | * @return Sequence number set of missing changes. |
153 | | */ |
154 | | SequenceNumberSet_t missing_changes() const; |
155 | | |
156 | | /** |
157 | | * Get the number of missing changes up to a certain sequence number. |
158 | | * @param seq_num Sequence number limiting the query. |
159 | | * Only changes with a sequence number less than this one will be considered. |
160 | | * @return the number of missing changes with a sequence number less than seq_num. |
161 | | */ |
162 | | size_t unknown_missing_changes_up_to( |
163 | | const SequenceNumber_t& seq_num) const; |
164 | | |
165 | | /** |
166 | | * Get the GUID of the writer represented by this proxy. |
167 | | * @return const reference to the GUID of the writer represented by this proxy. |
168 | | */ |
169 | | inline const GUID_t& guid() const |
170 | 0 | { |
171 | 0 | return locators_entry_.remote_guid; |
172 | 0 | } |
173 | | |
174 | | inline const GUID_t& persistence_guid() const |
175 | 0 | { |
176 | 0 | return persistence_guid_; |
177 | 0 | } |
178 | | |
179 | | inline LivelinessQosPolicyKind liveliness_kind() const |
180 | 0 | { |
181 | 0 | return liveliness_kind_; |
182 | 0 | } |
183 | | |
184 | | /** |
185 | | * Get the ownership strength of the writer represented by this proxy. |
186 | | * @return ownership strength of the writer represented by this proxy. |
187 | | */ |
188 | | inline uint32_t ownership_strength() const |
189 | 0 | { |
190 | 0 | return ownership_strength_; |
191 | 0 | } |
192 | | |
193 | | /** |
194 | | * Get the locators that should be used to send data to the writer represented by this proxy. |
195 | | * @return the unicast locators of this proxy, or its multicast locators when the unicast list is empty. |
196 | | */ |
197 | | inline const ResourceLimitedVector<Locator_t>& remote_locators_shrinked() const |
198 | 0 | { |
199 | 0 | return locators_entry_.unicast.empty() ? |
200 | 0 | locators_entry_.multicast : |
201 | 0 | locators_entry_.unicast; |
202 | 0 | } |
203 | | |
204 | | /** |
205 | | * Check if the writer is alive |
206 | | * @return true if the writer is alive |
207 | | */ |
208 | | inline bool is_alive() const |
209 | 0 | { |
210 | 0 | return is_alive_; |
211 | 0 | } |
212 | | |
213 | | /*! |
214 | | * @brief Returns number of ChangeFromWriter_t managed currently by the WriterProxy. |
215 | | * @return Number of ChangeFromWriter_t managed currently by the WriterProxy. |
216 | | */ |
217 | | size_t number_of_changes_from_writer() const; |
218 | | |
219 | | /*! |
220 | | * @brief Returns next SequenceNumber_t to be notified. |
221 | | * @return Next SequenceNumber_t to be notified, or an invalid SequenceNumber_t |
222 | | * if there is no SequenceNumber_t to be notified. |
223 | | */ |
224 | | SequenceNumber_t next_cache_change_to_be_notified(); |
225 | | |
226 | | /** |
227 | | * Checks whether a cache change was already received from this proxy. |
228 | | * @param[in] seq_num Sequence number of the cache change to check. |
229 | | * @return true if the cache change was received, false otherwise. |
230 | | */ |
231 | | bool change_was_received( |
232 | | const SequenceNumber_t& seq_num) const; |
233 | | |
234 | | /** |
235 | | * Sends a preemptive acknack to the writer represented by this proxy. |
236 | | */ |
237 | | bool perform_initial_ack_nack(); |
238 | | |
239 | | /** |
240 | | * Sends the necessary acknack and nackfrag messages to answer the last received heartbeat message. |
241 | | */ |
242 | | void perform_heartbeat_response(); |
243 | | |
244 | | /** |
245 | | * Process an incoming heartbeat from the writer represented by this proxy. |
246 | | * @param count Count field of the heartbeat message. |
247 | | * @param first_seq First sequence field of the heartbeat message. |
248 | | * @param last_seq Last sequence field of the heartbeat message. |
249 | | * @param final_flag Final flag of the heartbeat message. |
250 | | * @param liveliness_flag Liveliness flag of the heartbeat message. |
251 | | * @param disable_positive True if positive ACKs are disabled. |
252 | | * @param [out] assert_liveliness Returns true when liveliness should be asserted on this writer |
253 | | * @return true if the message is processed, false if the message is ignored. |
254 | | * NOTE(review): current_sample_lost is not documented; presumably receives the number of samples detected as lost - confirm. */ |
255 | | bool process_heartbeat( |
256 | | uint32_t count, |
257 | | const SequenceNumber_t& first_seq, |
258 | | const SequenceNumber_t& last_seq, |
259 | | bool final_flag, |
260 | | bool liveliness_flag, |
261 | | bool disable_positive, |
262 | | bool& assert_liveliness, |
263 | | int32_t& current_sample_lost); |
264 | | |
265 | | /** |
266 | | * Set a new value for the interval of the heartbeat response event. |
267 | | * @param interval New interval value. |
268 | | */ |
269 | | void update_heartbeat_response_interval( |
270 | | const Duration_t& interval); |
271 | | |
272 | | /** |
273 | | * Check if the destinations managed by this sender interface have changed. |
274 | | * |
275 | | * @return always false in this implementation. |
276 | | */ |
277 | | virtual bool destinations_have_changed() const override |
278 | 0 | { |
279 | 0 | return false; |
280 | 0 | } |
281 | | |
282 | | /** |
283 | | * Get a GUID prefix representing all destinations. |
284 | | * |
285 | | * @return When all the destinations share the same prefix (i.e. belong to the same participant) |
286 | | * that prefix is returned. When there are no destinations, or they belong to different |
287 | | * participants, c_GuidPrefix_Unknown is returned. |
288 | | * NOTE(review): the body below reads guid_prefix_as_vector_.at(0) unconditionally, so the no-destination case described above is not handled here - confirm the vector always holds one prefix. */ |
289 | | virtual GuidPrefix_t destination_guid_prefix() const override |
290 | 0 | { |
291 | 0 | return guid_prefix_as_vector_.at(0); |
292 | 0 | } |
293 | | |
294 | | /** |
295 | | * Get the GUID prefix of all the destination participants. |
296 | | * |
297 | | * @return a const reference to a vector with the GUID prefix of all destination participants. |
298 | | */ |
299 | | virtual const std::vector<GuidPrefix_t>& remote_participants() const override |
300 | 0 | { |
301 | 0 | return guid_prefix_as_vector_; |
302 | 0 | } |
303 | | |
304 | | /** |
305 | | * Get the GUID of all destinations. |
306 | | * |
307 | | * @return a const reference to a vector with the GUID of all destinations. |
308 | | */ |
309 | | virtual const std::vector<GUID_t>& remote_guids() const override |
310 | 0 | { |
311 | 0 | return guid_as_vector_; |
312 | 0 | } |
313 | | |
314 | | /** |
315 | | * Send a message through this interface. |
316 | | * |
317 | | * @param message Pointer to the buffer with the message already serialized. |
318 | | * @param max_blocking_time_point Future timepoint where blocking send should end. |
319 | | */ |
320 | | virtual bool send( |
321 | | CDRMessage_t* message, |
322 | | std::chrono::steady_clock::time_point max_blocking_time_point) const override; |
323 | | |
324 | | bool is_on_same_process() const |
325 | 0 | { |
326 | 0 | return is_on_same_process_; |
327 | 0 | } |
328 | | |
329 | | bool is_datasharing_writer() const |
330 | 0 | { |
331 | 0 | return is_datasharing_writer_; |
332 | 0 | } |
333 | | |
334 | | /* |
335 | | * Do nothing. |
336 | | * This object is always protected by the reader's mutex. |
337 | | */ |
338 | | void lock() override |
339 | 0 | { |
340 | 0 | } |
341 | | |
342 | | /* |
343 | | * Do nothing. |
344 | | * This object is always protected by the reader's mutex. |
345 | | */ |
346 | | void unlock() override |
347 | 0 | { |
348 | 0 | } |
349 | | |
350 | | private: |
351 | | |
352 | | /** |
353 | | * Set initial value for last acked sequence number. |
354 | | * @param[in] seq_num last acked sequence number. |
355 | | */ |
356 | | void loaded_from_storage( |
357 | | const SequenceNumber_t& seq_num); |
358 | | |
359 | | bool received_change_set( |
360 | | const SequenceNumber_t& seq_num, |
361 | | bool is_relevance); |
362 | | |
363 | | void cleanup(); |
364 | | |
365 | | void clear(); |
366 | | |
367 | | //! Pointer to associated StatefulReader. |
368 | | StatefulReader* reader_; |
369 | | //! Timed event to postpone the heartbeat response. |
370 | | TimedEvent* heartbeat_response_; |
371 | | //! Timed event to send initial acknack. |
372 | | TimedEvent* initial_acknack_; |
373 | | //! Last heartbeat count. |
374 | | std::atomic<uint32_t> last_heartbeat_count_; |
375 | | //! Indicates if the heartbeat has the final flag set. |
376 | | std::atomic<bool> heartbeat_final_flag_; |
377 | | //! Is the writer alive |
378 | | bool is_alive_; |
379 | | |
380 | | using pool_allocator_t = |
381 | | foonathan::memory::memory_pool<foonathan::memory::node_pool, foonathan::memory::heap_allocator>; |
382 | | |
383 | | //! Memory pool allocator for changes_received_ |
384 | | pool_allocator_t changes_pool_; |
385 | | //! Set containing the sequence numbers of the received ChangeFromWriter_t objects. |
386 | | foonathan::memory::set<SequenceNumber_t, pool_allocator_t> changes_received_; |
387 | | //! Sequence number of the highest available change |
388 | | SequenceNumber_t changes_from_writer_low_mark_; |
389 | | //! Highest sequence number informed by writer |
390 | | SequenceNumber_t max_sequence_number_; |
391 | | //! Sequence number of the last CacheChange_t notified. |
392 | | SequenceNumber_t last_notified_; |
393 | | //! To fool RTPSMessageGroup when using this proxy as single destination |
394 | | ResourceLimitedVector<GUID_t> guid_as_vector_; |
395 | | //! To fool RTPSMessageGroup when using this proxy as single destination |
396 | | ResourceLimitedVector<GuidPrefix_t> guid_prefix_as_vector_; |
397 | | //! Is the writer on the same process |
398 | | bool is_on_same_process_; |
399 | | //! Taken from QoS |
400 | | uint32_t ownership_strength_; |
401 | | //! Taken from QoS |
402 | | LivelinessQosPolicyKind liveliness_kind_; |
403 | | //! Taken from proxy data |
404 | | GUID_t persistence_guid_; |
405 | | //! Taken from proxy data |
406 | | LocatorSelectorEntry locators_entry_; |
407 | | //! Is the writer datasharing |
408 | | bool is_datasharing_writer_; |
409 | | //! Whether at least one heartbeat was received. |
410 | | bool received_at_least_one_heartbeat_; |
411 | | |
412 | | using ChangeIterator = decltype(changes_received_)::iterator; |
413 | | |
414 | | #if !defined(NDEBUG) && defined(FASTRTPS_SOURCE) && defined(__unix__) |
415 | | int get_mutex_owner() const; |
416 | | |
417 | | int get_thread_id() const; |
418 | | #endif // if !defined(NDEBUG) && defined(FASTRTPS_SOURCE) && defined(__unix__) |
419 | | }; |
420 | | |
421 | | } /* namespace rtps */ |
422 | | } /* namespace fastrtps */ |
423 | | } /* namespace eprosima */ |
424 | | |
425 | | #endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC |
426 | | #endif /* FASTRTPS_RTPS_READER_WRITERPROXY_H_ */ |