Coverage Report

Created: 2022-08-24 06:19

/src/Fast-DDS/include/fastdds/rtps/reader/RTPSReader.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file RTPSReader.h
17
 */
18
19
#ifndef _FASTDDS_RTPS_READER_RTPSREADER_H_
20
#define _FASTDDS_RTPS_READER_RTPSREADER_H_
21
22
#include <functional>
23
24
#include <fastdds/rtps/Endpoint.h>
25
#include <fastdds/rtps/attributes/ReaderAttributes.h>
26
#include <fastdds/rtps/builtin/data/WriterProxyData.h>
27
#include <fastdds/rtps/common/SequenceNumber.h>
28
#include <fastdds/rtps/common/Time_t.h>
29
#include <fastdds/rtps/history/ReaderHistory.h>
30
#include <fastdds/rtps/interfaces/IReaderDataFilter.hpp>
31
#include <fastrtps/qos/LivelinessChangedStatus.h>
32
#include <fastrtps/utils/TimedConditionVariable.hpp>
33
34
#include <fastdds/statistics/rtps/StatisticsCommon.hpp>
35
36
namespace eprosima {
37
namespace fastrtps {
38
namespace rtps {
39
40
// Forward declarations
41
class LivelinessManager;
42
class ReaderListener;
43
class WriterProxy;
44
struct CacheChange_t;
45
struct ReaderHistoryState;
46
class WriterProxyData;
47
class IDataSharingListener;
48
49
/**
50
 * Class RTPSReader, manages the reception of data from its matched writers.
51
 * @ingroup READER_MODULE
52
 */
53
class RTPSReader
54
    : public Endpoint
55
    , public fastdds::statistics::StatisticsReaderImpl
56
{
57
    friend class ReaderHistory;
58
    friend class RTPSParticipantImpl;
59
    friend class MessageReceiver;
60
    friend class EDP;
61
    friend class WLP;
62
63
protected:
64
65
    RTPSReader(
66
            RTPSParticipantImpl* pimpl,
67
            const GUID_t& guid,
68
            const ReaderAttributes& att,
69
            ReaderHistory* hist,
70
            ReaderListener* listen = nullptr);
71
72
    RTPSReader(
73
            RTPSParticipantImpl* pimpl,
74
            const GUID_t& guid,
75
            const ReaderAttributes& att,
76
            const std::shared_ptr<IPayloadPool>& payload_pool,
77
            ReaderHistory* hist,
78
            ReaderListener* listen = nullptr);
79
80
    RTPSReader(
81
            RTPSParticipantImpl* pimpl,
82
            const GUID_t& guid,
83
            const ReaderAttributes& att,
84
            const std::shared_ptr<IPayloadPool>& payload_pool,
85
            const std::shared_ptr<IChangePool>& change_pool,
86
            ReaderHistory* hist,
87
            ReaderListener* listen = nullptr);
88
89
    virtual ~RTPSReader();
90
91
public:
92
93
    /**
94
     * Add a matched writer represented by its attributes.
95
     * @param wdata Attributes of the writer to add.
96
     * @return True if correctly added.
97
     */
98
    RTPS_DllAPI virtual bool matched_writer_add(
99
            const WriterProxyData& wdata) = 0;
100
101
    /**
102
     * Remove a writer represented by its attributes from the matched writers.
103
     * @param writer_guid GUID of the writer to remove.
104
     * @param removed_by_lease Whether the writer is being unmatched due to a participant drop.
105
     * @return True if correctly removed.
106
     */
107
    RTPS_DllAPI virtual bool matched_writer_remove(
108
            const GUID_t& writer_guid,
109
            bool removed_by_lease = false) = 0;
110
111
    /**
112
     * Tells us if a specific Writer is matched against this reader.
113
     * @param writer_guid GUID of the writer to check.
114
     * @return True if it is matched.
115
     */
116
    RTPS_DllAPI virtual bool matched_writer_is_matched(
117
            const GUID_t& writer_guid) = 0;
118
119
    /**
120
     * Processes a new DATA message. Previously the message must have been accepted by function acceptMsgDirectedTo.
121
     *
122
     * @param change Pointer to the CacheChange_t.
123
     * @return true if the reader accepts messages from the.
124
     */
125
    RTPS_DllAPI virtual bool processDataMsg(
126
            CacheChange_t* change) = 0;
127
128
    /**
129
     * Processes a new DATA FRAG message.
130
     *
131
     * @param change Pointer to the CacheChange_t.
132
     * @param sampleSize Size of the complete, assembled message.
133
     * @param fragmentStartingNum Starting number of this particular message.
134
     * @param fragmentsInSubmessage Number of fragments on this particular message.
135
     * @return true if the reader accepts message.
136
     */
137
    RTPS_DllAPI virtual bool processDataFragMsg(
138
            CacheChange_t* change,
139
            uint32_t sampleSize,
140
            uint32_t fragmentStartingNum,
141
            uint16_t fragmentsInSubmessage) = 0;
142
143
    /**
144
     * Processes a new HEARTBEAT message.
145
     * @param writerGUID
146
     * @param hbCount
147
     * @param firstSN
148
     * @param lastSN
149
     * @param finalFlag
150
     * @param livelinessFlag
151
     * @return true if the reader accepts messages from the.
152
     */
153
    RTPS_DllAPI virtual bool processHeartbeatMsg(
154
            const GUID_t& writerGUID,
155
            uint32_t hbCount,
156
            const SequenceNumber_t& firstSN,
157
            const SequenceNumber_t& lastSN,
158
            bool finalFlag,
159
            bool livelinessFlag) = 0;
160
161
    /**
162
     * Processes a new GAP message.
163
     * @param writerGUID
164
     * @param gapStart
165
     * @param gapList
166
     * @return true if the reader accepts messages from the.
167
     */
168
    RTPS_DllAPI virtual bool processGapMsg(
169
            const GUID_t& writerGUID,
170
            const SequenceNumber_t& gapStart,
171
            const SequenceNumberSet_t& gapList) = 0;
172
173
    /**
174
     * Method to indicate the reader that some change has been removed due to HistoryQos requirements.
175
     * @param change Pointer to the CacheChange_t.
176
     * @param prox Pointer to the WriterProxy.
177
     * @return True if correctly removed.
178
     */
179
    RTPS_DllAPI virtual bool change_removed_by_history(
180
            CacheChange_t* change,
181
            WriterProxy* prox = nullptr) = 0;
182
183
    /**
184
     * Get the associated listener, secondary attached Listener in case it is of compound type
185
     * @return Pointer to the associated reader listener.
186
     */
187
    RTPS_DllAPI ReaderListener* getListener() const;
188
189
    /**
190
     * Switch the ReaderListener kind for the Reader.
191
     * If the RTPSReader does not belong to the built-in protocols it switches out the old one.
192
     * If it belongs to the built-in protocols, it sets the new ReaderListener callbacks to be called after the
193
     * built-in ReaderListener ones.
194
     * @param target Pointed to ReaderLister to attach
195
     * @return True is correctly set.
196
     */
197
    RTPS_DllAPI bool setListener(
198
            ReaderListener* target);
199
200
    /**
201
     * Reserve a CacheChange_t.
202
     * @param change Pointer to pointer to the Cache.
203
     * @param dataCdrSerializedSize Size of the Cache.
204
     * @return True if correctly reserved.
205
     */
206
    RTPS_DllAPI bool reserveCache(
207
            CacheChange_t** change,
208
            uint32_t dataCdrSerializedSize);
209
210
    /**
211
     * Release a cacheChange.
212
     */
213
    RTPS_DllAPI void releaseCache(
214
            CacheChange_t* change);
215
216
    /**
217
     * Read the next unread CacheChange_t from the history
218
     * @param change Pointer to pointer of CacheChange_t
219
     * @param wp Pointer to pointer to the WriterProxy
220
     * @return True if read.
221
     */
222
    RTPS_DllAPI virtual bool nextUnreadCache(
223
            CacheChange_t** change,
224
            WriterProxy** wp) = 0;
225
226
    /**
227
     * Get the next CacheChange_t from the history to take.
228
     * @param change Pointer to pointer of CacheChange_t.
229
     * @param wp Pointer to pointer to the WriterProxy.
230
     * @return True if read.
231
     */
232
    RTPS_DllAPI virtual bool nextUntakenCache(
233
            CacheChange_t** change,
234
            WriterProxy** wp) = 0;
235
236
    RTPS_DllAPI bool wait_for_unread_cache(
237
            const eprosima::fastrtps::Duration_t& timeout);
238
239
    RTPS_DllAPI uint64_t get_unread_count() const;
240
241
    RTPS_DllAPI uint64_t get_unread_count(
242
            bool mark_as_read);
243
244
    /**
245
     * @return True if the reader expects Inline QOS.
246
     */
247
    RTPS_DllAPI inline bool expectsInlineQos()
248
0
    {
249
0
        return m_expectsInlineQos;
250
0
    }
251
252
    //! Returns a pointer to the associated History.
253
    RTPS_DllAPI inline ReaderHistory* getHistory()
254
0
    {
255
0
        return mp_history;
256
0
    }
257
258
    //! @return The content filter associated to this reader.
259
    RTPS_DllAPI eprosima::fastdds::rtps::IReaderDataFilter* get_content_filter() const
260
0
    {
261
0
        std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
262
0
        return data_filter_;
263
0
    }
264
265
    //! Set the content filter associated to this reader.
266
    //! @param filter Pointer to the content filter to associate to this reader.
267
    RTPS_DllAPI void set_content_filter(
268
            eprosima::fastdds::rtps::IReaderDataFilter* filter)
269
0
    {
270
0
        std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
271
0
        data_filter_ = filter;
272
0
    }
273
274
    /*!
275
     * @brief Returns there is a clean state with all Writers.
276
     * It occurs when the Reader received all samples sent by Writers. In other words,
277
     * its WriterProxies are up to date.
278
     * @return There is a clean state with all Writers.
279
     */
280
    virtual bool isInCleanState() = 0;
281
282
    //! The liveliness changed status struct as defined in the DDS
283
    LivelinessChangedStatus liveliness_changed_status_;
284
285
    inline void enableMessagesFromUnkownWriters(
286
            bool enable)
287
0
    {
288
0
        m_acceptMessagesFromUnkownWriters = enable;
289
0
    }
290
291
    void setTrustedWriter(
292
            const EntityId_t& writer)
293
0
    {
294
0
        m_acceptMessagesFromUnkownWriters = false;
295
0
        m_trustedWriterEntityId = writer;
296
0
    }
297
298
    /**
299
     * Assert the liveliness of a matched writer.
300
     * @param writer GUID of the writer to assert.
301
     */
302
    virtual void assert_writer_liveliness(
303
            const GUID_t& writer) = 0;
304
305
    /**
306
     * Called just before a change is going to be deserialized.
307
     * @param [in]  change            Pointer to the change being accessed.
308
     * @param [out] wp                Writer proxy the @c change belongs to.
309
     * @param [out] is_future_change  Whether the change is in the future (i.e. there are
310
     *                                earlier unreceived changes from the same writer).
311
     *
312
     * @return Whether the change is still valid or not.
313
     */
314
    virtual bool begin_sample_access_nts(
315
            CacheChange_t* change,
316
            WriterProxy*& wp,
317
            bool& is_future_change) = 0;
318
319
    /**
320
     * Called after the change has been deserialized.
321
     * @param [in] change        Pointer to the change being accessed.
322
     * @param [in] wp            Writer proxy the @c change belongs to.
323
     * @param [in] mark_as_read  Whether the @c change should be marked as read or not.
324
     */
325
    virtual void end_sample_access_nts(
326
            CacheChange_t* change,
327
            WriterProxy*& wp,
328
            bool mark_as_read) = 0;
329
330
    /**
331
     * Called when the user has retrieved a change from the history.
332
     * @param change Pointer to the change to ACK
333
     * @param writer Writer proxy of the \c change.
334
     * @param mark_as_read Whether the \c change should be marked as read or not
335
     */
336
    virtual void change_read_by_user(
337
            CacheChange_t* change,
338
            WriterProxy* writer,
339
            bool mark_as_read = true) = 0;
340
341
    /**
342
     * Checks whether the sample is still valid or is corrupted.
343
     *
344
     * @param data    Pointer to the sample data to check.
345
     *                If it does not belong to the payload pool passed to the
346
     *                reader on construction, it yields undefined behavior.
347
     * @param writer  GUID of the writer that sent \c data.
348
     * @param sn      Sequence number related to \c data.
349
     *
350
     * @return true if the sample is valid
351
     */
352
    RTPS_DllAPI bool is_sample_valid(
353
            const void* data,
354
            const GUID_t& writer,
355
            const SequenceNumber_t& sn) const;
356
357
    const std::unique_ptr<IDataSharingListener>& datasharing_listener() const
358
0
    {
359
0
        return datasharing_listener_;
360
0
    }
361
362
#ifdef FASTDDS_STATISTICS
363
364
    /*
365
     * Add a listener to receive statistics backend callbacks
366
     * @param listener
367
     * @return true if successfully added
368
     */
369
    RTPS_DllAPI bool add_statistics_listener(
370
            std::shared_ptr<fastdds::statistics::IListener> listener);
371
372
    /*
373
     * Remove a listener from receiving statistics backend callbacks
374
     * @param listener
375
     * @return true if successfully removed
376
     */
377
    RTPS_DllAPI bool remove_statistics_listener(
378
            std::shared_ptr<fastdds::statistics::IListener> listener);
379
380
#endif // FASTDDS_STATISTICS
381
382
protected:
383
384
    virtual bool may_remove_history_record(
385
            bool removed_by_lease);
386
387
    /*!
388
     * @brief Add a remote writer to the persistence_guid map
389
     * @param guid GUID of the remote writer
390
     * @param persistence_guid Persistence GUID of the remote writer
391
     */
392
    void add_persistence_guid(
393
            const GUID_t& guid,
394
            const GUID_t& persistence_guid);
395
396
    /*!
397
     * @brief Remove a remote writer from the persistence_guid map
398
     * @param guid GUID of the remote writer
399
     * @param persistence_guid Persistence GUID of the remote writer
400
     * @param removed_by_lease Whether the GUIDs are being removed due to a participant drop.
401
     */
402
    void remove_persistence_guid(
403
            const GUID_t& guid,
404
            const GUID_t& persistence_guid,
405
            bool removed_by_lease);
406
407
    /*!
408
     * @brief Get the last notified sequence for a RTPS guid
409
     * @param guid The RTPS guid to query
410
     * @return Last notified sequence number for input guid
411
     * @remarks Takes persistence_guid into consideration
412
     */
413
    SequenceNumber_t get_last_notified(
414
            const GUID_t& guid);
415
416
    /*!
417
     * @brief Update the last notified sequence for a RTPS guid
418
     * @param guid The RTPS guid of the writer
419
     * @param seq Max sequence number available on writer
420
     * @return Previous value of last notified sequence number for input guid
421
     * @remarks Takes persistence_guid into consideration
422
     */
423
    SequenceNumber_t update_last_notified(
424
            const GUID_t& guid,
425
            const SequenceNumber_t& seq);
426
427
    /*!
428
     * @brief Set the last notified sequence for a persistence guid
429
     * @param persistence_guid The persistence guid to update
430
     * @param seq Sequence number to set for input guid
431
     * @remarks Persistent readers will write to DB
432
     */
433
    virtual void set_last_notified(
434
            const GUID_t& persistence_guid,
435
            const SequenceNumber_t& seq);
436
437
    /*!
438
     * @brief Search if there is a CacheChange_t, giving SequenceNumber_t and writer GUID_t,
439
     * waiting to be completed because it is fragmented.
440
     * @param sequence_number SequenceNumber_t of the searched CacheChange_t.
441
     * @param writer_guid writer GUID_t of the searched CacheChange_t.
442
     * @param change If a CacheChange_t was found, this argument will fill with its pointer.
443
     * In other case nullptr is returned.
444
     * @param hint Iterator since the search will start.
445
     * Used to improve the search.
446
     * @return Iterator pointing to the position were CacheChange_t was found.
447
     * It can be used to improve next search.
448
     */
449
    History::const_iterator findCacheInFragmentedProcess(
450
            const SequenceNumber_t& sequence_number,
451
            const GUID_t& writer_guid,
452
            CacheChange_t** change,
453
            History::const_iterator hint) const;
454
455
    /**
456
     * Creates the listener for the datasharing notifications
457
     *
458
     * @param limits Resource limits for the number of matched datasharing writers
459
     */
460
    void create_datasharing_listener(
461
            ResourceLimitedContainerConfig limits);
462
463
    bool is_datasharing_compatible_with(
464
            const WriterProxyData& wdata);
465
466
    //!ReaderHistory
467
    ReaderHistory* mp_history;
468
    //!Listener
469
    ReaderListener* mp_listener;
470
    //!Accept msg to unknwon readers (default=true)
471
    bool m_acceptMessagesToUnknownReaders;
472
    //!Accept msg from unknwon writers (BE-true,RE-false)
473
    bool m_acceptMessagesFromUnkownWriters;
474
    //!Trusted writer (for Builtin)
475
    EntityId_t m_trustedWriterEntityId;
476
    //!Expects Inline Qos.
477
    bool m_expectsInlineQos;
478
479
    //!ReaderHistoryState
480
    ReaderHistoryState* history_state_;
481
482
    uint64_t total_unread_ = 0;
483
484
    TimedConditionVariable new_notification_cv_;
485
486
    //! The liveliness kind of this reader
487
    LivelinessQosPolicyKind liveliness_kind_;
488
    //! The liveliness lease duration of this reader
489
    Duration_t liveliness_lease_duration_;
490
491
    //! Whether the writer is datasharing compatible or not
492
    bool is_datasharing_compatible_ = false;
493
    //! The listener for the datasharing notifications
494
    std::unique_ptr<IDataSharingListener> datasharing_listener_;
495
496
    eprosima::fastdds::rtps::IReaderDataFilter* data_filter_ = nullptr;
497
498
private:
499
500
    RTPSReader& operator =(
501
            const RTPSReader&) = delete;
502
503
    void init(
504
            const std::shared_ptr<IPayloadPool>& payload_pool,
505
            const std::shared_ptr<IChangePool>& change_pool,
506
            const ReaderAttributes& att);
507
508
};
509
510
} /* namespace rtps */
511
} /* namespace fastrtps */
512
} /* namespace eprosima */
513
514
#endif /* _FASTDDS_RTPS_READER_RTPSREADER_H_ */