Coverage Report

Created: 2022-08-24 06:19

/src/Fast-DDS/include/fastdds/rtps/reader/StatefulReader.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file StatefulReader.h
17
 */
18
19
#ifndef _FASTDDS_RTPS_READER_STATEFULREADER_H_
20
#define _FASTDDS_RTPS_READER_STATEFULREADER_H_
21
22
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
23
24
#include <fastdds/rtps/reader/RTPSReader.h>
25
#include <fastrtps/utils/collections/ResourceLimitedVector.hpp>
26
#include <fastdds/rtps/common/CDRMessage_t.h>
27
#include <fastdds/rtps/messages/RTPSMessageGroup.h>
28
29
#include <mutex>
30
31
namespace eprosima {
32
namespace fastrtps {
33
namespace rtps {
34
35
class WriterProxy;
36
class RTPSMessageSenderInterface;
37
38
/**
39
 * Class StatefulReader, specialization of RTPSReader than stores the state of the matched writers.
40
 * @ingroup READER_MODULE
41
 */
42
class StatefulReader : public RTPSReader
43
{
44
public:
45
46
    friend class RTPSParticipantImpl;
47
48
    virtual ~StatefulReader();
49
50
protected:
51
52
    StatefulReader(
53
            RTPSParticipantImpl* pimpl,
54
            const GUID_t& guid,
55
            const ReaderAttributes& att,
56
            ReaderHistory* hist,
57
            ReaderListener* listen = nullptr);
58
59
    StatefulReader(
60
            RTPSParticipantImpl* pimpl,
61
            const GUID_t& guid,
62
            const ReaderAttributes& att,
63
            const std::shared_ptr<IPayloadPool>& payload_pool,
64
            ReaderHistory* hist,
65
            ReaderListener* listen = nullptr);
66
67
    StatefulReader(
68
            RTPSParticipantImpl* pimpl,
69
            const GUID_t& guid,
70
            const ReaderAttributes& att,
71
            const std::shared_ptr<IPayloadPool>& payload_pool,
72
            const std::shared_ptr<IChangePool>& change_pool,
73
            ReaderHistory* hist,
74
            ReaderListener* listen = nullptr);
75
76
public:
77
78
    /**
79
     * Add a matched writer represented by its attributes.
80
     * @param wdata Attributes of the writer to add.
81
     * @return True if correctly added.
82
     */
83
    bool matched_writer_add(
84
            const WriterProxyData& wdata) override;
85
86
    /**
87
     * Remove a WriterProxyData from the matached writers.
88
     * @param writer_guid GUID of the writer to remove.
89
     * @param removed_by_lease true it the writer was removed due to lease duration.
90
     * @return True if correct.
91
     */
92
    bool matched_writer_remove(
93
            const GUID_t& writer_guid,
94
            bool removed_by_lease = false) override;
95
96
    /**
97
     * Tells us if a specific Writer is matched against this reader.
98
     * @param writer_guid GUID of the writer to check.
99
     * @return True if it is matched.
100
     */
101
    bool matched_writer_is_matched(
102
            const GUID_t& writer_guid) override;
103
104
    /**
105
     * Look for a specific WriterProxy.
106
     * @param writerGUID GUID_t of the writer we are looking for.
107
     * @param WP Pointer to pointer to a WriterProxy.
108
     * @return True if found.
109
     */
110
    bool matched_writer_lookup(
111
            const GUID_t& writerGUID,
112
            WriterProxy** WP);
113
114
    /**
115
     * Processes a new DATA message.
116
     * @param change Pointer to the CacheChange_t.
117
     * @return true if the reader accepts messages.
118
     */
119
    bool processDataMsg(
120
            CacheChange_t* change) override;
121
122
    /**
123
     * Processes a new DATA FRAG message.
124
     *
125
     * @param change Pointer to the CacheChange_t.
126
     * @param sampleSize Size of the complete, assembled message.
127
     * @param fragmentStartingNum Starting number of this particular message.
128
     * @param fragmentsInSubmessage Number of fragments on this particular message.
129
     * @return true if the reader accepts message.
130
     */
131
    bool processDataFragMsg(
132
            CacheChange_t* change,
133
            uint32_t sampleSize,
134
            uint32_t fragmentStartingNum,
135
            uint16_t fragmentsInSubmessage) override;
136
137
    /**
138
     * Processes a new HEARTBEAT message.
139
     *
140
     * @return true if the reader accepts messages.
141
     */
142
    bool processHeartbeatMsg(
143
            const GUID_t& writerGUID,
144
            uint32_t hbCount,
145
            const SequenceNumber_t& firstSN,
146
            const SequenceNumber_t& lastSN,
147
            bool finalFlag,
148
            bool livelinessFlag) override;
149
150
    bool processGapMsg(
151
            const GUID_t& writerGUID,
152
            const SequenceNumber_t& gapStart,
153
            const SequenceNumberSet_t& gapList) override;
154
155
    /**
156
     * Method to indicate the reader that some change has been removed due to HistoryQos requirements.
157
     * @param change Pointer to the CacheChange_t.
158
     * @param prox Pointer to the WriterProxy.
159
     * @return True if correctly removed.
160
     */
161
    bool change_removed_by_history(
162
            CacheChange_t* change,
163
            WriterProxy* prox = nullptr) override;
164
165
    /**
166
     * This method is called when a new change is received. This method calls the received_change of the History
167
     * and depending on the implementation performs different actions.
168
     * @param a_change Pointer of the change to add.
169
     * @param prox Pointer to the WriterProxy that adds the Change.
170
     * @param unknown_missing_changes_up_to The number of changes from the same writer with a lower sequence number that
171
     *                                      could potentially be received in the future.
172
     * @return True if added.
173
     */
174
    bool change_received(
175
            CacheChange_t* a_change,
176
            WriterProxy* prox,
177
            size_t unknown_missing_changes_up_to);
178
179
    /**
180
     * Get the RTPS participant
181
     * @return Associated RTPS participant
182
     */
183
    inline RTPSParticipantImpl* getRTPSParticipant() const
184
0
    {
185
0
        return mp_RTPSParticipant;
186
0
    }
187
188
    /**
189
     * Read the next unread CacheChange_t from the history
190
     * @param change Pointer to pointer of CacheChange_t
191
     * @param wpout Pointer to pointer the matched writer proxy
192
     * @return True if read.
193
     */
194
    bool nextUnreadCache(
195
            CacheChange_t** change,
196
            WriterProxy** wpout = nullptr) override;
197
198
    /**
199
     * Take the next CacheChange_t from the history;
200
     * @param change Pointer to pointer of CacheChange_t
201
     * @param wpout Pointer to pointer the matched writer proxy
202
     * @return True if read.
203
     */
204
    bool nextUntakenCache(
205
            CacheChange_t** change,
206
            WriterProxy** wpout = nullptr) override;
207
208
    /**
209
     * Update the times parameters of the Reader.
210
     * @param times ReaderTimes reference.
211
     * @return True if correctly updated.
212
     */
213
    bool updateTimes(
214
            const ReaderTimes& times);
215
216
    /**
217
     *
218
     * @return Reference to the ReaderTimes.
219
     */
220
    inline ReaderTimes& getTimes()
221
0
    {
222
0
        return times_;
223
0
    }
224
225
    /**
226
     * Get the number of matched writers
227
     * @return Number of matched writers
228
     */
229
    inline size_t getMatchedWritersSize() const
230
0
    {
231
0
        return matched_writers_.size();
232
0
    }
233
234
    /*!
235
     * @brief Returns there is a clean state with all Writers.
236
     * It occurs when the Reader received all samples sent by Writers. In other words,
237
     * its WriterProxies are up to date.
238
     * @return There is a clean state with all Writers.
239
     */
240
    bool isInCleanState() override;
241
242
    /**
243
     * Sends an acknack message from this reader.
244
     * @param writer Pointer to the info of the remote writer.
245
     * @param sns Sequence number bitmap with the acknack information.
246
     * @param sender Message sender interface.
247
     * @param is_final Value for final flag.
248
     */
249
    void send_acknack(
250
            const WriterProxy* writer,
251
            const SequenceNumberSet_t& sns,
252
            RTPSMessageSenderInterface* sender,
253
            bool is_final);
254
255
    /**
256
     * Sends an acknack message from this reader in response to a heartbeat.
257
     * @param writer Pointer to the proxy representing the writer to send the acknack to.
258
     * @param sender Message sender interface.
259
     * @param heartbeat_was_final Final flag of the last received heartbeat.
260
     */
261
    void send_acknack(
262
            const WriterProxy* writer,
263
            RTPSMessageSenderInterface* sender,
264
            bool heartbeat_was_final);
265
266
    /**
267
     * Use the participant of this reader to send a message to certain locator.
268
     * @param message Message to be sent.
269
     * @param locators_begin Destination locators iterator begin.
270
     * @param locators_end Destination locators iterator end.
271
     * @param max_blocking_time_point Future time point where any blocking should end.
272
     */
273
    bool send_sync_nts(
274
            CDRMessage_t* message,
275
            const Locators& locators_begin,
276
            const Locators& locators_end,
277
            std::chrono::steady_clock::time_point& max_blocking_time_point);
278
279
    /**
280
     * Assert the livelines of a matched writer.
281
     * @param writer GUID of the writer to assert.
282
     */
283
    void assert_writer_liveliness(
284
            const GUID_t& writer) override;
285
286
    /**
287
     * Called just before a change is going to be deserialized.
288
     * @param [in]  change            Pointer to the change being accessed.
289
     * @param [out] wp                Writer proxy the @c change belongs to.
290
     * @param [out] is_future_change  Whether the change is in the future (i.e. there are
291
     *                                earlier unreceived changes from the same writer).
292
     *
293
     * @return Whether the change is still valid or not.
294
     */
295
    bool begin_sample_access_nts(
296
            CacheChange_t* change,
297
            WriterProxy*& wp,
298
            bool& is_future_change) override;
299
300
    /**
301
     * Called after the change has been deserialized.
302
     * @param [in] change        Pointer to the change being accessed.
303
     * @param [in] wp            Writer proxy the @c change belongs to.
304
     * @param [in] mark_as_read  Whether the @c change should be marked as read or not.
305
     */
306
    void end_sample_access_nts(
307
            CacheChange_t* change,
308
            WriterProxy*& wp,
309
            bool mark_as_read) override;
310
311
    /**
312
     * Called when the user has retrieved a change from the history.
313
     * @param change Pointer to the change to ACK
314
     * @param writer Writer proxy of the \c change.
315
     * @param mark_as_read Whether the \c change should be marked as read or not
316
     */
317
    void change_read_by_user(
318
            CacheChange_t* change,
319
            WriterProxy* writer,
320
            bool mark_as_read = true) override;
321
322
private:
323
324
    void init(
325
            RTPSParticipantImpl* pimpl,
326
            const ReaderAttributes& att);
327
328
    bool acceptMsgFrom(
329
            const GUID_t& entityGUID,
330
            WriterProxy** wp) const;
331
332
    /*!
333
     * @remarks Non thread-safe.
334
     */
335
    bool findWriterProxy(
336
            const GUID_t& writerGUID,
337
            WriterProxy** wp) const;
338
339
    void NotifyChanges(
340
            WriterProxy* wp);
341
342
    void remove_changes_from(
343
            const GUID_t& writerGUID,
344
            bool is_payload_pool_lost = false);
345
346
    //! Acknack Count
347
    uint32_t acknack_count_;
348
    //! NACKFRAG Count
349
    uint32_t nackfrag_count_;
350
    //!ReaderTimes of the StatefulReader.
351
    ReaderTimes times_;
352
    //! Vector containing pointers to all the active WriterProxies.
353
    ResourceLimitedVector<WriterProxy*> matched_writers_;
354
    //! Vector containing pointers to all the inactive, ready for reuse, WriterProxies.
355
    ResourceLimitedVector<WriterProxy*> matched_writers_pool_;
356
    //!
357
    ResourceLimitedContainerConfig proxy_changes_config_;
358
    //! True to disable positive ACKs
359
    bool disable_positive_acks_;
360
    //! False when being destroyed
361
    bool is_alive_;
362
};
363
364
} /* namespace rtps */
365
} /* namespace fastrtps */
366
} /* namespace eprosima */
367
368
#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
369
370
#endif // _FASTDDS_RTPS_READER_STATEFULREADER_H_