Coverage Report

Created: 2025-07-03 06:58

/src/Fast-DDS/src/cpp/rtps/writer/StatelessWriter.hpp
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file StatelessWriter.hpp
17
 */
18
#ifndef FASTDDS_RTPS_WRITER__STATELESSWRITER_HPP
19
#define FASTDDS_RTPS_WRITER__STATELESSWRITER_HPP
20
21
#include <condition_variable>
22
#include <memory>
23
#include <mutex>
24
#include <vector>
25
26
#include <fastdds/rtps/common/LocatorList.hpp>
27
#include <fastdds/rtps/common/Time_t.hpp>
28
#include <fastdds/rtps/history/IChangePool.hpp>
29
#include <fastdds/rtps/history/IPayloadPool.hpp>
30
#include <fastdds/rtps/interfaces/IReaderDataFilter.hpp>
31
#include <fastdds/rtps/writer/RTPSWriter.hpp>
32
#include <fastdds/utils/collections/ResourceLimitedVector.hpp>
33
34
#include <rtps/writer/BaseWriter.hpp>
35
#include <rtps/writer/ChangeForReader.hpp>
36
#include <rtps/writer/ReaderLocator.hpp>
37
38
namespace eprosima {
39
namespace fastdds {
40
namespace rtps {
41
42
/**
43
 * Class StatelessWriter, specialization of BaseWriter that manages writers that don't keep state of the matched readers.
44
 * @ingroup WRITER_MODULE
45
 */
46
class StatelessWriter : public BaseWriter
47
{
48
49
public:
50
51
    StatelessWriter(
52
            RTPSParticipantImpl* participant,
53
            const GUID_t& guid,
54
            const WriterAttributes& attributes,
55
            FlowController* flow_controller,
56
            WriterHistory* history,
57
            WriterListener* listener = nullptr);
58
59
    virtual ~StatelessWriter();
60
61
    void local_actions_on_writer_removed() override;
62
63
    //vvvvvvvvvvvvvvvvvvvvv [Exported API] vvvvvvvvvvvvvvvvvvvvv
64
65
    bool matched_reader_add_edp(
66
            const ReaderProxyData& data) override;
67
68
    bool matched_reader_remove(
69
            const GUID_t& reader_guid) override;
70
71
    bool matched_reader_is_matched(
72
            const GUID_t& reader_guid) final;
73
74
    void reader_data_filter(
75
            IReaderDataFilter* filter) final
76
0
    {
77
0
        reader_data_filter_ = filter;
78
0
    }
79
80
    const IReaderDataFilter* reader_data_filter() const final
81
0
    {
82
0
        return reader_data_filter_;
83
0
    }
84
85
    bool has_been_fully_delivered(
86
            const SequenceNumber_t& seq_num) const final;
87
88
    bool is_acked_by_all(
89
            const SequenceNumber_t& seq_num) const final;
90
91
    bool wait_for_all_acked(
92
            const dds::Duration_t& max_wait) final;
93
94
    void update_attributes(
95
            const WriterAttributes& att) final
96
0
    {
97
0
        static_cast<void>(att);
98
        //FOR NOW THERE IS NOTHING TO UPDATE.
99
0
    }
100
101
    bool get_disable_positive_acks() const final;
102
103
    /**
104
     * @brief Fills the provided vector with the GUIDs of the matched readers.
105
     *
106
     * @param[out] guids Vector to be filled with the GUIDs of the matched readers.
107
     * @return True if the operation was successful.
108
     */
109
    bool matched_readers_guids(
110
            std::vector<GUID_t>& guids) const final;
111
112
#ifdef FASTDDS_STATISTICS
113
    bool get_connections(
114
            fastdds::statistics::rtps::ConnectionList& connection_list) final;
115
#endif // FASTDDS_STATISTICS
116
117
    //^^^^^^^^^^^^^^^^^^^^^^ [Exported API] ^^^^^^^^^^^^^^^^^^^^^^^
118
119
    //vvvvvvvvvvvvvvvvvvvvv [BaseWriter API] vvvvvvvvvvvvvvvvvvvvvv
120
121
    void unsent_change_added_to_history(
122
            CacheChange_t* change,
123
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;
124
125
    bool change_removed_by_history(
126
            CacheChange_t* change,
127
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;
128
129
    DeliveryRetCode deliver_sample_nts(
130
            CacheChange_t* cache_change,
131
            RTPSMessageGroup& group,
132
            LocatorSelectorSender& locator_selector,
133
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) final;
134
135
    LocatorSelectorSender& get_general_locator_selector() final
136
0
    {
137
0
        return locator_selector_;
138
0
    }
139
140
    LocatorSelectorSender& get_async_locator_selector() final
141
0
    {
142
0
        return locator_selector_;
143
0
    }
144
145
    bool send_nts(
146
            const std::vector<NetworkBuffer>& buffers,
147
            const uint32_t& total_bytes,
148
            const LocatorSelectorSender& locator_selector,
149
            std::chrono::steady_clock::time_point& max_blocking_time_point) const final;
150
151
    bool process_acknack(
152
            const GUID_t& writer_guid,
153
            const GUID_t& reader_guid,
154
            uint32_t ack_count,
155
            const SequenceNumberSet_t& sn_set,
156
            bool final_flag,
157
            bool& result,
158
            fastdds::rtps::VendorId_t origin_vendor_id) final;
159
160
    bool process_nack_frag(
161
            const GUID_t& writer_guid,
162
            const GUID_t& reader_guid,
163
            uint32_t ack_count,
164
            const SequenceNumber_t& seq_num,
165
            const FragmentNumberSet_t& fragments_state,
166
            bool& result,
167
            fastdds::rtps::VendorId_t origin_vendor_id) final;
168
169
    bool try_remove_change(
170
            const std::chrono::steady_clock::time_point&,
171
            std::unique_lock<RecursiveTimedMutex>&) final;
172
173
    bool wait_for_acknowledgement(
174
            const SequenceNumber_t& seq,
175
            const std::chrono::steady_clock::time_point& max_blocking_time_point,
176
            std::unique_lock<RecursiveTimedMutex>& lock) final;
177
178
    //^^^^^^^^^^^^^^^^^^^^^^ [BaseWriter API] ^^^^^^^^^^^^^^^^^^^^^^^
179
180
    /**
181
     * Get the number of matched readers
182
     * @return Number of the matched readers
183
     */
184
    inline size_t get_matched_readers_size() const
185
0
    {
186
0
        std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
187
0
        return matched_remote_readers_.size()
188
0
               + matched_local_readers_.size()
189
0
               + matched_datasharing_readers_.size();
190
0
    }
191
192
protected:
193
194
    mutable LocatorList_t fixed_locators_;
195
196
    virtual bool send_to_fixed_locators(
197
            const std::vector<eprosima::fastdds::rtps::NetworkBuffer>& buffers,
198
            const uint32_t& total_bytes,
199
            std::chrono::steady_clock::time_point& max_blocking_time_point) const;
200
201
private:
202
203
    void init(
204
            RTPSParticipantImpl* participant,
205
            const WriterAttributes& attributes);
206
207
    void get_builtin_guid();
208
209
    bool has_builtin_guid();
210
211
    void update_reader_info(
212
            bool create_sender_resources);
213
214
    bool datasharing_delivery(
215
            CacheChange_t* change);
216
217
    bool intraprocess_delivery(
218
            CacheChange_t* change,
219
            ReaderLocator& reader_locator);
220
221
    bool is_inline_qos_expected_ = false;
222
    ResourceLimitedVector<std::unique_ptr<ReaderLocator>> matched_remote_readers_;
223
224
    std::condition_variable_any unsent_changes_cond_;
225
226
    uint64_t current_sequence_number_sent_ = 0;
227
228
    FragmentNumber_t current_fragment_sent_ = 0;
229
230
    uint64_t last_sequence_number_sent_ = 0;
231
232
    ResourceLimitedVector<std::unique_ptr<ReaderLocator>> matched_local_readers_;
233
    ResourceLimitedVector<std::unique_ptr<ReaderLocator>> matched_datasharing_readers_;
234
    ResourceLimitedVector<std::unique_ptr<ReaderLocator>> matched_readers_pool_;
235
236
    LocatorSelectorSender locator_selector_;
237
238
    IReaderDataFilter* reader_data_filter_ = nullptr;
239
};
240
241
} // namespace rtps
242
} // namespace fastdds
243
} // namespace eprosima
244
245
#endif // FASTDDS_RTPS_WRITER__STATELESSWRITER_HPP
246