/src/Fast-DDS/include/fastdds/rtps/reader/RTPSReader.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | /** |
16 | | * @file RTPSReader.h |
17 | | */ |
18 | | |
19 | | #ifndef _FASTDDS_RTPS_READER_RTPSREADER_H_ |
20 | | #define _FASTDDS_RTPS_READER_RTPSREADER_H_ |
21 | | |
22 | | #include <functional> |
23 | | |
24 | | #include <fastdds/rtps/Endpoint.h> |
25 | | #include <fastdds/rtps/attributes/ReaderAttributes.h> |
26 | | #include <fastdds/rtps/builtin/data/WriterProxyData.h> |
27 | | #include <fastdds/rtps/common/SequenceNumber.h> |
28 | | #include <fastdds/rtps/common/Time_t.h> |
29 | | #include <fastdds/rtps/history/ReaderHistory.h> |
30 | | #include <fastdds/rtps/interfaces/IReaderDataFilter.hpp> |
31 | | #include <fastrtps/qos/LivelinessChangedStatus.h> |
32 | | #include <fastrtps/utils/TimedConditionVariable.hpp> |
33 | | |
34 | | #include <fastdds/statistics/rtps/StatisticsCommon.hpp> |
35 | | |
36 | | namespace eprosima { |
37 | | namespace fastrtps { |
38 | | namespace rtps { |
39 | | |
40 | | // Forward declarations |
41 | | class LivelinessManager; |
42 | | class ReaderListener; |
43 | | class WriterProxy; |
44 | | struct CacheChange_t; |
45 | | struct ReaderHistoryState; |
46 | | class WriterProxyData; |
47 | | class IDataSharingListener; |
48 | | |
49 | | /** |
50 | | * Class RTPSReader, manages the reception of data from its matched writers. |
51 | | * @ingroup READER_MODULE |
52 | | */ |
53 | | class RTPSReader |
54 | | : public Endpoint |
55 | | , public fastdds::statistics::StatisticsReaderImpl |
56 | | { |
57 | | friend class ReaderHistory; |
58 | | friend class RTPSParticipantImpl; |
59 | | friend class MessageReceiver; |
60 | | friend class EDP; |
61 | | friend class WLP; |
62 | | |
63 | | protected: |
64 | | |
65 | | RTPSReader( |
66 | | RTPSParticipantImpl* pimpl, |
67 | | const GUID_t& guid, |
68 | | const ReaderAttributes& att, |
69 | | ReaderHistory* hist, |
70 | | ReaderListener* listen = nullptr); |
71 | | |
72 | | RTPSReader( |
73 | | RTPSParticipantImpl* pimpl, |
74 | | const GUID_t& guid, |
75 | | const ReaderAttributes& att, |
76 | | const std::shared_ptr<IPayloadPool>& payload_pool, |
77 | | ReaderHistory* hist, |
78 | | ReaderListener* listen = nullptr); |
79 | | |
80 | | RTPSReader( |
81 | | RTPSParticipantImpl* pimpl, |
82 | | const GUID_t& guid, |
83 | | const ReaderAttributes& att, |
84 | | const std::shared_ptr<IPayloadPool>& payload_pool, |
85 | | const std::shared_ptr<IChangePool>& change_pool, |
86 | | ReaderHistory* hist, |
87 | | ReaderListener* listen = nullptr); |
88 | | |
89 | | virtual ~RTPSReader(); |
90 | | |
91 | | public: |
92 | | |
93 | | /** |
94 | | * Add a matched writer represented by its attributes. |
95 | | * @param wdata Attributes of the writer to add. |
96 | | * @return True if correctly added. |
97 | | */ |
98 | | RTPS_DllAPI virtual bool matched_writer_add( |
99 | | const WriterProxyData& wdata) = 0; |
100 | | |
101 | | /** |
102 | | * Remove a writer represented by its attributes from the matched writers. |
103 | | * @param writer_guid GUID of the writer to remove. |
104 | | * @param removed_by_lease Whether the writer is being unmatched due to a participant drop. |
105 | | * @return True if correctly removed. |
106 | | */ |
107 | | RTPS_DllAPI virtual bool matched_writer_remove( |
108 | | const GUID_t& writer_guid, |
109 | | bool removed_by_lease = false) = 0; |
110 | | |
111 | | /** |
112 | | * Tells us if a specific Writer is matched against this reader. |
113 | | * @param writer_guid GUID of the writer to check. |
114 | | * @return True if it is matched. |
115 | | */ |
116 | | RTPS_DllAPI virtual bool matched_writer_is_matched( |
117 | | const GUID_t& writer_guid) = 0; |
118 | | |
119 | | /** |
120 | | * Processes a new DATA message. Previously the message must have been accepted by function acceptMsgDirectedTo. |
121 | | * |
122 | | * @param change Pointer to the CacheChange_t. |
123 | | * @return true if the reader accepts messages from the. |
124 | | */ |
125 | | RTPS_DllAPI virtual bool processDataMsg( |
126 | | CacheChange_t* change) = 0; |
127 | | |
128 | | /** |
129 | | * Processes a new DATA FRAG message. |
130 | | * |
131 | | * @param change Pointer to the CacheChange_t. |
132 | | * @param sampleSize Size of the complete, assembled message. |
133 | | * @param fragmentStartingNum Starting number of this particular message. |
134 | | * @param fragmentsInSubmessage Number of fragments on this particular message. |
135 | | * @return true if the reader accepts message. |
136 | | */ |
137 | | RTPS_DllAPI virtual bool processDataFragMsg( |
138 | | CacheChange_t* change, |
139 | | uint32_t sampleSize, |
140 | | uint32_t fragmentStartingNum, |
141 | | uint16_t fragmentsInSubmessage) = 0; |
142 | | |
143 | | /** |
144 | | * Processes a new HEARTBEAT message. |
145 | | * @param writerGUID |
146 | | * @param hbCount |
147 | | * @param firstSN |
148 | | * @param lastSN |
149 | | * @param finalFlag |
150 | | * @param livelinessFlag |
151 | | * @return true if the reader accepts messages from the. |
152 | | */ |
153 | | RTPS_DllAPI virtual bool processHeartbeatMsg( |
154 | | const GUID_t& writerGUID, |
155 | | uint32_t hbCount, |
156 | | const SequenceNumber_t& firstSN, |
157 | | const SequenceNumber_t& lastSN, |
158 | | bool finalFlag, |
159 | | bool livelinessFlag) = 0; |
160 | | |
161 | | /** |
162 | | * Processes a new GAP message. |
163 | | * @param writerGUID |
164 | | * @param gapStart |
165 | | * @param gapList |
166 | | * @return true if the reader accepts messages from the. |
167 | | */ |
168 | | RTPS_DllAPI virtual bool processGapMsg( |
169 | | const GUID_t& writerGUID, |
170 | | const SequenceNumber_t& gapStart, |
171 | | const SequenceNumberSet_t& gapList) = 0; |
172 | | |
173 | | /** |
174 | | * Method to indicate the reader that some change has been removed due to HistoryQos requirements. |
175 | | * @param change Pointer to the CacheChange_t. |
176 | | * @param prox Pointer to the WriterProxy. |
177 | | * @return True if correctly removed. |
178 | | */ |
179 | | RTPS_DllAPI virtual bool change_removed_by_history( |
180 | | CacheChange_t* change, |
181 | | WriterProxy* prox = nullptr) = 0; |
182 | | |
183 | | /** |
184 | | * Get the associated listener, secondary attached Listener in case it is of compound type |
185 | | * @return Pointer to the associated reader listener. |
186 | | */ |
187 | | RTPS_DllAPI ReaderListener* getListener() const; |
188 | | |
189 | | /** |
190 | | * Switch the ReaderListener kind for the Reader. |
191 | | * If the RTPSReader does not belong to the built-in protocols it switches out the old one. |
192 | | * If it belongs to the built-in protocols, it sets the new ReaderListener callbacks to be called after the |
193 | | * built-in ReaderListener ones. |
194 | | * @param target Pointed to ReaderLister to attach |
195 | | * @return True is correctly set. |
196 | | */ |
197 | | RTPS_DllAPI bool setListener( |
198 | | ReaderListener* target); |
199 | | |
200 | | /** |
201 | | * Reserve a CacheChange_t. |
202 | | * @param change Pointer to pointer to the Cache. |
203 | | * @param dataCdrSerializedSize Size of the Cache. |
204 | | * @return True if correctly reserved. |
205 | | */ |
206 | | RTPS_DllAPI bool reserveCache( |
207 | | CacheChange_t** change, |
208 | | uint32_t dataCdrSerializedSize); |
209 | | |
210 | | /** |
211 | | * Release a cacheChange. |
212 | | */ |
213 | | RTPS_DllAPI void releaseCache( |
214 | | CacheChange_t* change); |
215 | | |
216 | | /** |
217 | | * Read the next unread CacheChange_t from the history |
218 | | * @param change Pointer to pointer of CacheChange_t |
219 | | * @param wp Pointer to pointer to the WriterProxy |
220 | | * @return True if read. |
221 | | */ |
222 | | RTPS_DllAPI virtual bool nextUnreadCache( |
223 | | CacheChange_t** change, |
224 | | WriterProxy** wp) = 0; |
225 | | |
226 | | /** |
227 | | * Get the next CacheChange_t from the history to take. |
228 | | * @param change Pointer to pointer of CacheChange_t. |
229 | | * @param wp Pointer to pointer to the WriterProxy. |
230 | | * @return True if read. |
231 | | */ |
232 | | RTPS_DllAPI virtual bool nextUntakenCache( |
233 | | CacheChange_t** change, |
234 | | WriterProxy** wp) = 0; |
235 | | |
236 | | RTPS_DllAPI bool wait_for_unread_cache( |
237 | | const eprosima::fastrtps::Duration_t& timeout); |
238 | | |
239 | | RTPS_DllAPI uint64_t get_unread_count() const; |
240 | | |
241 | | RTPS_DllAPI uint64_t get_unread_count( |
242 | | bool mark_as_read); |
243 | | |
244 | | /** |
245 | | * @return True if the reader expects Inline QOS. |
246 | | */ |
247 | | RTPS_DllAPI inline bool expectsInlineQos() |
248 | 0 | { |
249 | 0 | return m_expectsInlineQos; |
250 | 0 | } |
251 | | |
252 | | //! Returns a pointer to the associated History. |
253 | | RTPS_DllAPI inline ReaderHistory* getHistory() |
254 | 0 | { |
255 | 0 | return mp_history; |
256 | 0 | } |
257 | | |
258 | | //! @return The content filter associated to this reader. |
259 | | RTPS_DllAPI eprosima::fastdds::rtps::IReaderDataFilter* get_content_filter() const |
260 | 0 | { |
261 | 0 | std::lock_guard<RecursiveTimedMutex> guard(mp_mutex); |
262 | 0 | return data_filter_; |
263 | 0 | } |
264 | | |
265 | | //! Set the content filter associated to this reader. |
266 | | //! @param filter Pointer to the content filter to associate to this reader. |
267 | | RTPS_DllAPI void set_content_filter( |
268 | | eprosima::fastdds::rtps::IReaderDataFilter* filter) |
269 | 0 | { |
270 | 0 | std::lock_guard<RecursiveTimedMutex> guard(mp_mutex); |
271 | 0 | data_filter_ = filter; |
272 | 0 | } |
273 | | |
274 | | /*! |
275 | | * @brief Returns there is a clean state with all Writers. |
276 | | * It occurs when the Reader received all samples sent by Writers. In other words, |
277 | | * its WriterProxies are up to date. |
278 | | * @return There is a clean state with all Writers. |
279 | | */ |
280 | | virtual bool isInCleanState() = 0; |
281 | | |
282 | | //! The liveliness changed status struct as defined in the DDS |
283 | | LivelinessChangedStatus liveliness_changed_status_; |
284 | | |
285 | | inline void enableMessagesFromUnkownWriters( |
286 | | bool enable) |
287 | 0 | { |
288 | 0 | m_acceptMessagesFromUnkownWriters = enable; |
289 | 0 | } |
290 | | |
291 | | void setTrustedWriter( |
292 | | const EntityId_t& writer) |
293 | 0 | { |
294 | 0 | m_acceptMessagesFromUnkownWriters = false; |
295 | 0 | m_trustedWriterEntityId = writer; |
296 | 0 | } |
297 | | |
298 | | /** |
299 | | * Assert the liveliness of a matched writer. |
300 | | * @param writer GUID of the writer to assert. |
301 | | */ |
302 | | virtual void assert_writer_liveliness( |
303 | | const GUID_t& writer) = 0; |
304 | | |
305 | | /** |
306 | | * Called just before a change is going to be deserialized. |
307 | | * @param [in] change Pointer to the change being accessed. |
308 | | * @param [out] wp Writer proxy the @c change belongs to. |
309 | | * @param [out] is_future_change Whether the change is in the future (i.e. there are |
310 | | * earlier unreceived changes from the same writer). |
311 | | * |
312 | | * @return Whether the change is still valid or not. |
313 | | */ |
314 | | virtual bool begin_sample_access_nts( |
315 | | CacheChange_t* change, |
316 | | WriterProxy*& wp, |
317 | | bool& is_future_change) = 0; |
318 | | |
319 | | /** |
320 | | * Called after the change has been deserialized. |
321 | | * @param [in] change Pointer to the change being accessed. |
322 | | * @param [in] wp Writer proxy the @c change belongs to. |
323 | | * @param [in] mark_as_read Whether the @c change should be marked as read or not. |
324 | | */ |
325 | | virtual void end_sample_access_nts( |
326 | | CacheChange_t* change, |
327 | | WriterProxy*& wp, |
328 | | bool mark_as_read) = 0; |
329 | | |
330 | | /** |
331 | | * Called when the user has retrieved a change from the history. |
332 | | * @param change Pointer to the change to ACK |
333 | | * @param writer Writer proxy of the \c change. |
334 | | * @param mark_as_read Whether the \c change should be marked as read or not |
335 | | */ |
336 | | virtual void change_read_by_user( |
337 | | CacheChange_t* change, |
338 | | WriterProxy* writer, |
339 | | bool mark_as_read = true) = 0; |
340 | | |
341 | | /** |
342 | | * Checks whether the sample is still valid or is corrupted. |
343 | | * |
344 | | * @param data Pointer to the sample data to check. |
345 | | * If it does not belong to the payload pool passed to the |
346 | | * reader on construction, it yields undefined behavior. |
347 | | * @param writer GUID of the writer that sent \c data. |
348 | | * @param sn Sequence number related to \c data. |
349 | | * |
350 | | * @return true if the sample is valid |
351 | | */ |
352 | | RTPS_DllAPI bool is_sample_valid( |
353 | | const void* data, |
354 | | const GUID_t& writer, |
355 | | const SequenceNumber_t& sn) const; |
356 | | |
357 | | const std::unique_ptr<IDataSharingListener>& datasharing_listener() const |
358 | 0 | { |
359 | 0 | return datasharing_listener_; |
360 | 0 | } |
361 | | |
362 | | #ifdef FASTDDS_STATISTICS |
363 | | |
364 | | /* |
365 | | * Add a listener to receive statistics backend callbacks |
366 | | * @param listener |
367 | | * @return true if successfully added |
368 | | */ |
369 | | RTPS_DllAPI bool add_statistics_listener( |
370 | | std::shared_ptr<fastdds::statistics::IListener> listener); |
371 | | |
372 | | /* |
373 | | * Remove a listener from receiving statistics backend callbacks |
374 | | * @param listener |
375 | | * @return true if successfully removed |
376 | | */ |
377 | | RTPS_DllAPI bool remove_statistics_listener( |
378 | | std::shared_ptr<fastdds::statistics::IListener> listener); |
379 | | |
380 | | #endif // FASTDDS_STATISTICS |
381 | | |
382 | | protected: |
383 | | |
384 | | virtual bool may_remove_history_record( |
385 | | bool removed_by_lease); |
386 | | |
387 | | /*! |
388 | | * @brief Add a remote writer to the persistence_guid map |
389 | | * @param guid GUID of the remote writer |
390 | | * @param persistence_guid Persistence GUID of the remote writer |
391 | | */ |
392 | | void add_persistence_guid( |
393 | | const GUID_t& guid, |
394 | | const GUID_t& persistence_guid); |
395 | | |
396 | | /*! |
397 | | * @brief Remove a remote writer from the persistence_guid map |
398 | | * @param guid GUID of the remote writer |
399 | | * @param persistence_guid Persistence GUID of the remote writer |
400 | | * @param removed_by_lease Whether the GUIDs are being removed due to a participant drop. |
401 | | */ |
402 | | void remove_persistence_guid( |
403 | | const GUID_t& guid, |
404 | | const GUID_t& persistence_guid, |
405 | | bool removed_by_lease); |
406 | | |
407 | | /*! |
408 | | * @brief Get the last notified sequence for a RTPS guid |
409 | | * @param guid The RTPS guid to query |
410 | | * @return Last notified sequence number for input guid |
411 | | * @remarks Takes persistence_guid into consideration |
412 | | */ |
413 | | SequenceNumber_t get_last_notified( |
414 | | const GUID_t& guid); |
415 | | |
416 | | /*! |
417 | | * @brief Update the last notified sequence for a RTPS guid |
418 | | * @param guid The RTPS guid of the writer |
419 | | * @param seq Max sequence number available on writer |
420 | | * @return Previous value of last notified sequence number for input guid |
421 | | * @remarks Takes persistence_guid into consideration |
422 | | */ |
423 | | SequenceNumber_t update_last_notified( |
424 | | const GUID_t& guid, |
425 | | const SequenceNumber_t& seq); |
426 | | |
427 | | /*! |
428 | | * @brief Set the last notified sequence for a persistence guid |
429 | | * @param persistence_guid The persistence guid to update |
430 | | * @param seq Sequence number to set for input guid |
431 | | * @remarks Persistent readers will write to DB |
432 | | */ |
433 | | virtual void set_last_notified( |
434 | | const GUID_t& persistence_guid, |
435 | | const SequenceNumber_t& seq); |
436 | | |
437 | | /*! |
438 | | * @brief Search if there is a CacheChange_t, giving SequenceNumber_t and writer GUID_t, |
439 | | * waiting to be completed because it is fragmented. |
440 | | * @param sequence_number SequenceNumber_t of the searched CacheChange_t. |
441 | | * @param writer_guid writer GUID_t of the searched CacheChange_t. |
442 | | * @param change If a CacheChange_t was found, this argument will fill with its pointer. |
443 | | * In other case nullptr is returned. |
444 | | * @param hint Iterator since the search will start. |
445 | | * Used to improve the search. |
446 | | * @return Iterator pointing to the position were CacheChange_t was found. |
447 | | * It can be used to improve next search. |
448 | | */ |
449 | | History::const_iterator findCacheInFragmentedProcess( |
450 | | const SequenceNumber_t& sequence_number, |
451 | | const GUID_t& writer_guid, |
452 | | CacheChange_t** change, |
453 | | History::const_iterator hint) const; |
454 | | |
455 | | /** |
456 | | * Creates the listener for the datasharing notifications |
457 | | * |
458 | | * @param limits Resource limits for the number of matched datasharing writers |
459 | | */ |
460 | | void create_datasharing_listener( |
461 | | ResourceLimitedContainerConfig limits); |
462 | | |
463 | | bool is_datasharing_compatible_with( |
464 | | const WriterProxyData& wdata); |
465 | | |
466 | | //!ReaderHistory |
467 | | ReaderHistory* mp_history; |
468 | | //!Listener |
469 | | ReaderListener* mp_listener; |
470 | | //!Accept msg to unknwon readers (default=true) |
471 | | bool m_acceptMessagesToUnknownReaders; |
472 | | //!Accept msg from unknwon writers (BE-true,RE-false) |
473 | | bool m_acceptMessagesFromUnkownWriters; |
474 | | //!Trusted writer (for Builtin) |
475 | | EntityId_t m_trustedWriterEntityId; |
476 | | //!Expects Inline Qos. |
477 | | bool m_expectsInlineQos; |
478 | | |
479 | | //!ReaderHistoryState |
480 | | ReaderHistoryState* history_state_; |
481 | | |
482 | | uint64_t total_unread_ = 0; |
483 | | |
484 | | TimedConditionVariable new_notification_cv_; |
485 | | |
486 | | //! The liveliness kind of this reader |
487 | | LivelinessQosPolicyKind liveliness_kind_; |
488 | | //! The liveliness lease duration of this reader |
489 | | Duration_t liveliness_lease_duration_; |
490 | | |
491 | | //! Whether the writer is datasharing compatible or not |
492 | | bool is_datasharing_compatible_ = false; |
493 | | //! The listener for the datasharing notifications |
494 | | std::unique_ptr<IDataSharingListener> datasharing_listener_; |
495 | | |
496 | | eprosima::fastdds::rtps::IReaderDataFilter* data_filter_ = nullptr; |
497 | | |
498 | | private: |
499 | | |
500 | | RTPSReader& operator =( |
501 | | const RTPSReader&) = delete; |
502 | | |
503 | | void init( |
504 | | const std::shared_ptr<IPayloadPool>& payload_pool, |
505 | | const std::shared_ptr<IChangePool>& change_pool, |
506 | | const ReaderAttributes& att); |
507 | | |
508 | | }; |
509 | | |
510 | | } /* namespace rtps */ |
511 | | } /* namespace fastrtps */ |
512 | | } /* namespace eprosima */ |
513 | | |
514 | | #endif /* _FASTDDS_RTPS_READER_RTPSREADER_H_ */ |