Coverage Report

Created: 2022-08-24 06:19

/src/Fast-DDS/include/fastdds/rtps/reader/StatelessReader.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file StatelessReader.h
17
 */
18
19
20
#ifndef _FASTDDS_RTPS_READER_STATELESSREADER_H_
21
#define _FASTDDS_RTPS_READER_STATELESSREADER_H_
22
23
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
24
25
#include <fastdds/rtps/reader/RTPSReader.h>
26
#include <fastrtps/utils/collections/ResourceLimitedVector.hpp>
27
28
#include <mutex>
29
#include <map>
30
31
namespace eprosima {
32
namespace fastrtps {
33
namespace rtps {
34
35
/**
36
 * Class StatelessReader, specialization of the RTPSReader for Best Effort Readers.
37
 * @ingroup READER_MODULE
38
 */
39
class StatelessReader : public RTPSReader
40
{
41
    friend class RTPSParticipantImpl;
42
43
public:
44
45
    virtual ~StatelessReader();
46
47
protected:
48
49
    StatelessReader(
50
            RTPSParticipantImpl* pimpl,
51
            const GUID_t& guid,
52
            const ReaderAttributes& att,
53
            ReaderHistory* hist,
54
            ReaderListener* listen = nullptr);
55
56
    StatelessReader(
57
            RTPSParticipantImpl* pimpl,
58
            const GUID_t& guid,
59
            const ReaderAttributes& att,
60
            const std::shared_ptr<IPayloadPool>& payload_pool,
61
            ReaderHistory* hist,
62
            ReaderListener* listen = nullptr);
63
64
    StatelessReader(
65
            RTPSParticipantImpl* pimpl,
66
            const GUID_t& guid,
67
            const ReaderAttributes& att,
68
            const std::shared_ptr<IPayloadPool>& payload_pool,
69
            const std::shared_ptr<IChangePool>& change_pool,
70
            ReaderHistory* hist,
71
            ReaderListener* listen = nullptr);
72
73
public:
74
75
    /**
76
     * Add a matched writer represented by a WriterProxyData object.
77
     * @param wdata Reference to the WriterProxyData object to add.
78
     * @return True if correctly added.
79
     */
80
    bool matched_writer_add(
81
            const WriterProxyData& wdata) override;
82
83
    /**
84
     * Remove a WriterProxyData from the matched writers.
85
     * @param writer_guid GUID of the writer to remove.
86
     * @param removed_by_lease true if the writer was removed due to lease duration.
87
     * @return True if correct.
88
     */
89
    bool matched_writer_remove(
90
            const GUID_t& writer_guid,
91
            bool removed_by_lease = false) override;
92
93
    /**
94
     * Tells us if a specific Writer is matched against this reader.
95
     * @param writer_guid GUID of the writer to check.
96
     * @return True if it is matched.
97
     */
98
    bool matched_writer_is_matched(
99
            const GUID_t& writer_guid) override;
100
101
    /**
102
     * Method to indicate to the reader that some change has been removed due to HistoryQos requirements.
103
     * @param change Pointer to the CacheChange_t.
104
     * @param prox Pointer to the WriterProxy.
105
     * @return True if correctly removed.
106
     */
107
    bool change_removed_by_history(
108
            CacheChange_t* change,
109
            WriterProxy* prox = nullptr) override;
110
111
    /**
112
     * Processes a new DATA message.
113
     *
114
     * @param change Pointer to the CacheChange_t.
115
     * @return true if the reader accepts messages from the writer of the change.
116
     */
117
    bool processDataMsg(
118
            CacheChange_t* change) override;
119
120
    /**
121
     * Processes a new DATA FRAG message.
122
     *
123
     * @param change Pointer to the CacheChange_t.
124
     * @param sampleSize Size of the complete, assembled message.
125
     * @param fragmentStartingNum Starting number of this particular message.
126
     * @param fragmentsInSubmessage Number of fragments on this particular message.
127
     * @return true if the reader accepts the message.
128
     */
129
    bool processDataFragMsg(
130
            CacheChange_t* change,
131
            uint32_t sampleSize,
132
            uint32_t fragmentStartingNum,
133
            uint16_t fragmentsInSubmessage) override;
134
135
    /**
136
     * Processes a new HEARTBEAT message.
137
     *
138
     * @return true if the reader accepts messages from the writer.
139
     */
140
    bool processHeartbeatMsg(
141
            const GUID_t& writerGUID,
142
            uint32_t hbCount,
143
            const SequenceNumber_t& firstSN,
144
            const SequenceNumber_t& lastSN,
145
            bool finalFlag,
146
            bool livelinessFlag) override;
147
148
    bool processGapMsg(
149
            const GUID_t& writerGUID,
150
            const SequenceNumber_t& gapStart,
151
            const SequenceNumberSet_t& gapList) override;
152
153
    /**
154
     * This method is called when a new change is received. This method calls the received_change of the History
155
     * and depending on the implementation performs different actions.
156
     * @param a_change Pointer of the change to add.
157
     * @return True if added.
158
     */
159
    bool change_received(
160
            CacheChange_t* a_change);
161
162
    /**
163
     * Read the next unread CacheChange_t from the history
164
     * @param change Pointer to pointer of CacheChange_t
165
     * @param wpout Pointer to pointer of the matched writer proxy
166
     * @return True if read.
167
     */
168
    bool nextUnreadCache(
169
            CacheChange_t** change,
170
            WriterProxy** wpout = nullptr) override;
171
172
    /**
173
     * Take the next CacheChange_t from the history.
174
     * @param change Pointer to pointer of CacheChange_t
175
     * @param wpout Pointer to pointer of the matched writer proxy
176
     * @return True if read.
177
     */
178
    bool nextUntakenCache(
179
            CacheChange_t** change,
180
            WriterProxy** wpout = nullptr) override;
181
182
    /**
183
     * Get the number of matched writers.
184
     * @return Current number of elements held in matched_writers_.
185
     */
186
    inline size_t getMatchedWritersSize() const
187
0
    {
188
0
        return matched_writers_.size();
189
0
    }
190
191
    /*!
192
     * @brief Returns whether there is a clean state with all Writers.
193
     * StatelessReader always returns true.
194
     * @return true
195
     */
196
    bool isInCleanState() override
197
0
    {
198
0
        return true;
199
0
    }
200
201
    /**
202
     * Get the RTPS participant.
203
     * @return The mp_RTPSParticipant pointer (not declared in this class; presumably inherited from RTPSReader - confirm).
204
     */
205
    inline RTPSParticipantImpl* getRTPSParticipant() const
206
0
    {
207
0
        return mp_RTPSParticipant;
208
0
    }
209
210
    /**
211
     * @brief Assert liveliness of remote writer
212
     * @param guid The guid of the remote writer
213
     */
214
    void assert_writer_liveliness(
215
            const GUID_t& guid) override;
216
217
    /**
218
     * Called just before a change is going to be deserialized.
219
     * @param [in]  change            Pointer to the change being accessed.
220
     * @param [out] wp                Writer proxy the @c change belongs to.
221
     * @param [out] is_future_change  Whether the change is in the future (i.e. there are
222
     *                                earlier unreceived changes from the same writer).
223
     *
224
     * @return Whether the change is still valid or not.
225
     */
226
    bool begin_sample_access_nts(
227
            CacheChange_t* change,
228
            WriterProxy*& wp,
229
            bool& is_future_change) override;
230
231
    /**
232
     * Called after the change has been deserialized.
233
     * @param [in] change        Pointer to the change being accessed.
234
     * @param [in] wp            Writer proxy the @c change belongs to.
235
     * @param [in] mark_as_read  Whether the @c change should be marked as read or not.
236
     */
237
    void end_sample_access_nts(
238
            CacheChange_t* change,
239
            WriterProxy*& wp,
240
            bool mark_as_read) override;
241
242
    /**
243
     * Called when the user has retrieved a change from the history.
244
     * @param change Pointer to the change to ACK
245
     * @param writer Writer proxy of the \c change.
246
     * @param mark_as_read Whether the \c change should be marked as read or not
247
     */
248
    void change_read_by_user(
249
            CacheChange_t* change,
250
            WriterProxy* writer,
251
            bool mark_as_read = true) override;
252
253
private:
254
255
    struct RemoteWriterInfo_t // Element type stored in matched_writers_.
256
    {
257
        GUID_t guid;
258
        GUID_t persistence_guid;
259
        bool has_manual_topic_liveliness = false; // NOTE(review): presumably what writer_has_manual_liveliness() reports - confirm.
260
        CacheChange_t* fragmented_change = nullptr; // Null by default. NOTE(review): presumably the change being reassembled from DATA_FRAG messages - confirm.
261
        bool is_datasharing = false;
262
    };
263
264
    bool acceptMsgFrom(
265
            const GUID_t& entityId,
266
            ChangeKind_t change_kind);
267
268
    bool thereIsUpperRecordOf(
269
            const GUID_t& guid,
270
            const SequenceNumber_t& seq);
271
272
    /**
273
     * @brief A method to check if a matched writer has manual_by_topic liveliness
274
     * @param guid The guid of the remote writer
275
     * @return True if writer has manual_by_topic liveliness
276
     */
277
    bool writer_has_manual_liveliness(
278
            const GUID_t& guid);
279
280
    void remove_changes_from(
281
            const GUID_t& writerGUID,
282
            bool is_payload_pool_lost = false);
283
284
285
    //!List of RemoteWriterInfo_t entries for the matched writers.
286
    //!Is only used in the Discovery, to correctly notify the user using SubscriptionListener::onSubscriptionMatched();
287
    ResourceLimitedVector<RemoteWriterInfo_t> matched_writers_;
288
};
289
290
} /* namespace rtps */
291
} /* namespace fastrtps */
292
} /* namespace eprosima */
293
294
#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
295
296
#endif /* _FASTDDS_RTPS_READER_STATELESSREADER_H_ */