Coverage Report

Created: 2022-08-24 06:19

/src/Fast-DDS/include/fastdds/rtps/writer/StatelessWriter.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file StatelessWriter.h
17
 */
18
#ifndef _FASTDDS_RTPS_STATELESSWRITER_H_
19
#define _FASTDDS_RTPS_STATELESSWRITER_H_
20
21
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
22
23
#include <fastdds/rtps/common/Time_t.h>
24
#include <fastdds/rtps/history/IChangePool.h>
25
#include <fastdds/rtps/history/IPayloadPool.h>
26
#include <fastdds/rtps/interfaces/IReaderDataFilter.hpp>
27
#include <fastdds/rtps/writer/ChangeForReader.h>
28
#include <fastdds/rtps/writer/ReaderLocator.h>
29
#include <fastdds/rtps/writer/RTPSWriter.h>
30
#include <fastrtps/utils/collections/ResourceLimitedVector.hpp>
31
32
#include <condition_variable>
33
#include <list>
34
#include <memory>
35
#include <mutex>
36
37
namespace eprosima {
38
namespace fastrtps {
39
namespace rtps {
40
41
42
/**
43
 * Class StatelessWriter, specialization of RTPSWriter that manages writers that don't keep state of the matched readers.
44
 * @ingroup WRITER_MODULE
45
 */
46
class StatelessWriter : public RTPSWriter
47
{
48
    friend class RTPSParticipantImpl;
49
50
protected:
51
52
    StatelessWriter(
53
            RTPSParticipantImpl* participant,
54
            const GUID_t& guid,
55
            const WriterAttributes& attributes,
56
            fastdds::rtps::FlowController* flow_controller,
57
            WriterHistory* history,
58
            WriterListener* listener = nullptr);
59
60
    StatelessWriter(
61
            RTPSParticipantImpl* impl,
62
            const GUID_t& guid,
63
            const WriterAttributes& att,
64
            const std::shared_ptr<IPayloadPool>& payload_pool,
65
            fastdds::rtps::FlowController* flow_controller,
66
            WriterHistory* hist,
67
            WriterListener* listen = nullptr);
68
69
    StatelessWriter(
70
            RTPSParticipantImpl* impl,
71
            const GUID_t& guid,
72
            const WriterAttributes& att,
73
            const std::shared_ptr<IPayloadPool>& payload_pool,
74
            const std::shared_ptr<IChangePool>& change_pool,
75
            fastdds::rtps::FlowController* flow_controller,
76
            WriterHistory* hist,
77
            WriterListener* listen = nullptr);
78
79
public:
80
81
    virtual ~StatelessWriter();
82
83
    /**
84
     * Add a specific change to all ReaderLocators.
85
     * @param change Pointer to the change.
86
     * @param max_blocking_time
87
     */
88
    void unsent_change_added_to_history(
89
            CacheChange_t* change,
90
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;
91
92
    /**
93
     * Indicate the writer that a change has been removed by the history due to some HistoryQos requirement.
94
     * @param change Pointer to the change that is going to be removed.
95
     * @return True if removed correctly.
96
     */
97
    bool change_removed_by_history(
98
            CacheChange_t* change) override;
99
100
    /**
101
     * Add a matched reader.
102
     * @param data Pointer to the ReaderProxyData object added.
103
     * @return True if added.
104
     */
105
    bool matched_reader_add(
106
            const ReaderProxyData& data) override;
107
108
    /**
109
     * Remove a matched reader.
110
     * @param reader_guid GUID of the reader to remove.
111
     * @return True if removed.
112
     */
113
    bool matched_reader_remove(
114
            const GUID_t& reader_guid) override;
115
116
    /**
117
     * Tells us if a specific Reader is matched against this writer
118
     * @param reader_guid GUID of the reader to check.
119
     * @return True if it was matched.
120
     */
121
    bool matched_reader_is_matched(
122
            const GUID_t& reader_guid) override;
123
124
    /**
125
     * @brief Set a content filter to perform content filtering on this writer.
126
     *
127
     * This method sets a content filter that will be used to check whether a cache change is relevant
128
     * for a reader or not.
129
     *
130
     * @param filter  The content filter to use on this writer. May be @c nullptr to remove the content filter
131
     *                (i.e. treat all samples as relevant).
132
     */
133
    void reader_data_filter(
134
            fastdds::rtps::IReaderDataFilter* filter) final
135
0
    {
136
0
        reader_data_filter_ = filter;
137
0
    }
138
139
    /**
140
     * @brief Get the content filter used to perform content filtering on this writer.
141
     *
142
     * @return The content filter used on this writer.
143
     */
144
    const fastdds::rtps::IReaderDataFilter* reader_data_filter() const final
145
0
    {
146
0
        return reader_data_filter_;
147
0
    }
148
149
    /**
150
     * Update the Attributes of the Writer.
151
     * @param att New attributes
152
     */
153
    void updateAttributes(
154
            const WriterAttributes& att) override
155
0
    {
156
0
        (void)att;
157
0
        //FOR NOW THERE IS NOTHING TO UPDATE.
158
0
    }
159
160
    bool set_fixed_locators(
161
            const LocatorList_t& locator_list);
162
163
    //!Reset the unsent changes.
164
    void unsent_changes_reset();
165
166
    bool is_acked_by_all(
167
            const CacheChange_t* change) const override;
168
169
    bool try_remove_change(
170
            const std::chrono::steady_clock::time_point&,
171
            std::unique_lock<RecursiveTimedMutex>&) override;
172
173
    bool wait_for_acknowledgement(
174
            const SequenceNumber_t& seq,
175
            const std::chrono::steady_clock::time_point& max_blocking_time_point,
176
            std::unique_lock<RecursiveTimedMutex>& lock) override;
177
178
    /**
179
     * Send a message through this interface.
180
     *
181
     * @param message Pointer to the buffer with the message already serialized.
182
     * @param locator_selector RTPSMessageSenderInterface reference uses for selecting locators. The reference has to
183
     * be a member of this RTPSWriter object.
184
     * @param max_blocking_time_point Future timepoint where blocking send should end.
185
     */
186
    bool send_nts(
187
            CDRMessage_t* message,
188
            const LocatorSelectorSender& locator_selector,
189
            std::chrono::steady_clock::time_point& max_blocking_time_point) const override;
190
191
    /**
192
     * Get the number of matched readers
193
     * @return Number of the matched readers
194
     */
195
    inline size_t getMatchedReadersSize() const
196
0
    {
197
0
        std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
198
0
        return matched_remote_readers_.size()
199
0
               + matched_local_readers_.size()
200
0
               + matched_datasharing_readers_.size();
201
0
    }
202
203
    /*!
204
     * Tells writer the sample can be sent to the network.
205
     * This function should be used by a fastdds::rtps::FlowController.
206
     *
207
     * @param cache_change Pointer to the CacheChange_t that represents the sample which can be sent.
208
     * @param group RTPSMessageGroup reference uses for generating the RTPS message.
209
     * @param locator_selector RTPSMessageSenderInterface reference uses for selecting locators. The reference has to
210
     * be a member of this RTPSWriter object.
211
     * @param max_blocking_time Future timepoint where blocking send should end.
212
     * @return Return code.
213
     * @note Must be non-thread safe.
214
     */
215
    DeliveryRetCode deliver_sample_nts(
216
            CacheChange_t* cache_change,
217
            RTPSMessageGroup& group,
218
            LocatorSelectorSender& locator_selector,
219
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;
220
221
    LocatorSelectorSender& get_general_locator_selector() override
222
0
    {
223
0
        return locator_selector_;
224
0
    }
225
226
    LocatorSelectorSender& get_async_locator_selector() override
227
0
    {
228
0
        return locator_selector_;
229
0
    }
230
231
private:
232
233
    void init(
234
            RTPSParticipantImpl* participant,
235
            const WriterAttributes& attributes);
236
237
    void get_builtin_guid();
238
239
    bool has_builtin_guid();
240
241
    void update_reader_info(
242
            bool create_sender_resources);
243
244
    bool datasharing_delivery(
245
            CacheChange_t* change);
246
247
    bool intraprocess_delivery(
248
            CacheChange_t* change,
249
            ReaderLocator& reader_locator);
250
251
    bool is_inline_qos_expected_ = false;
252
    LocatorList_t fixed_locators_;
253
    ResourceLimitedVector<std::unique_ptr<ReaderLocator>> matched_remote_readers_;
254
255
    std::condition_variable_any unsent_changes_cond_;
256
257
    uint64_t current_sequence_number_sent_ = 0;
258
259
    FragmentNumber_t current_fragment_sent_ = 0;
260
261
    uint64_t last_sequence_number_sent_ = 0;
262
263
    ResourceLimitedVector<std::unique_ptr<ReaderLocator>> matched_local_readers_;
264
    ResourceLimitedVector<std::unique_ptr<ReaderLocator>> matched_datasharing_readers_;
265
    ResourceLimitedVector<std::unique_ptr<ReaderLocator>> matched_readers_pool_;
266
267
    LocatorSelectorSender locator_selector_;
268
269
    fastdds::rtps::IReaderDataFilter* reader_data_filter_ = nullptr;
270
};
271
272
} /* namespace rtps */
273
} /* namespace fastrtps */
274
} /* namespace eprosima */
275
276
#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
277
#endif /* _FASTDDS_RTPS_STATELESSWRITER_H_ */