/src/Fast-DDS/src/cpp/rtps/writer/StatelessWriter.hpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | /** |
16 | | * @file StatelessWriter.hpp |
17 | | */ |
18 | | #ifndef FASTDDS_RTPS_WRITER__STATELESSWRITER_HPP |
19 | | #define FASTDDS_RTPS_WRITER__STATELESSWRITER_HPP |
20 | | |
21 | | #include <condition_variable> |
22 | | #include <memory> |
23 | | #include <mutex> |
24 | | #include <vector> |
25 | | |
26 | | #include <fastdds/rtps/common/LocatorList.hpp> |
27 | | #include <fastdds/rtps/common/Time_t.hpp> |
28 | | #include <fastdds/rtps/history/IChangePool.hpp> |
29 | | #include <fastdds/rtps/history/IPayloadPool.hpp> |
30 | | #include <fastdds/rtps/interfaces/IReaderDataFilter.hpp> |
31 | | #include <fastdds/rtps/writer/RTPSWriter.hpp> |
32 | | #include <fastdds/utils/collections/ResourceLimitedVector.hpp> |
33 | | |
34 | | #include <rtps/writer/BaseWriter.hpp> |
35 | | #include <rtps/writer/ChangeForReader.hpp> |
36 | | #include <rtps/writer/ReaderLocator.hpp> |
37 | | |
38 | | namespace eprosima { |
39 | | namespace fastdds { |
40 | | namespace rtps { |
41 | | |
42 | | /** |
43 | | * Class StatelessWriter, specialization of BaseWriter that manages writers that don't keep state of the matched readers. In this header, both locator selector getters return the same member and update_attributes() ignores its argument. |
44 | | * @ingroup WRITER_MODULE |
45 | | */ |
46 | | class StatelessWriter : public BaseWriter |
47 | | { |
48 | | |
49 | | public: |
50 | | |
51 | | StatelessWriter( |
52 | | RTPSParticipantImpl* participant, |
53 | | const GUID_t& guid, |
54 | | const WriterAttributes& attributes, |
55 | | FlowController* flow_controller, |
56 | | WriterHistory* history, |
57 | | WriterListener* listener = nullptr); |
58 | | |
59 | | virtual ~StatelessWriter(); |
60 | | |
61 | | void local_actions_on_writer_removed() override; |
62 | | |
63 | | //vvvvvvvvvvvvvvvvvvvvv [Exported API] vvvvvvvvvvvvvvvvvvvvv |
64 | | |
65 | | bool matched_reader_add_edp( |
66 | | const ReaderProxyData& data) override; |
67 | | |
68 | | bool matched_reader_remove( |
69 | | const GUID_t& reader_guid) override; |
70 | | |
71 | | bool matched_reader_is_matched( |
72 | | const GUID_t& reader_guid) final; |
73 | | |
74 | | void reader_data_filter( |
75 | | IReaderDataFilter* filter) final |
76 | 0 | { |
77 | 0 | reader_data_filter_ = filter; /* stores the pointer exactly as passed; no null check or copy is made here */ |
78 | 0 | } |
79 | | |
80 | | const IReaderDataFilter* reader_data_filter() const final |
81 | 0 | { |
82 | 0 | return reader_data_filter_; /* nullptr until the setter above has been called with a filter */ |
83 | 0 | } |
84 | | |
85 | | bool has_been_fully_delivered( |
86 | | const SequenceNumber_t& seq_num) const final; |
87 | | |
88 | | bool is_acked_by_all( |
89 | | const SequenceNumber_t& seq_num) const final; |
90 | | |
91 | | bool wait_for_all_acked( |
92 | | const dds::Duration_t& max_wait) final; |
93 | | |
94 | | void update_attributes( |
95 | | const WriterAttributes& att) final |
96 | 0 | { |
97 | 0 | static_cast<void>(att); |
98 | | // For now there is nothing to update, so the received attributes are deliberately ignored. |
99 | 0 | } |
100 | | |
101 | | bool get_disable_positive_acks() const final; |
102 | | |
103 | | /** |
104 | | * @brief Fills the provided vector with the GUIDs of the matched readers. |
105 | | * |
106 | | * @param[out] guids Vector to be filled with the GUIDs of the matched readers. |
107 | | * @return True if the operation was successful. |
108 | | */ |
109 | | bool matched_readers_guids( |
110 | | std::vector<GUID_t>& guids) const final; |
111 | | |
112 | | #ifdef FASTDDS_STATISTICS |
113 | | bool get_connections( |
114 | | fastdds::statistics::rtps::ConnectionList& connection_list) final; |
115 | | #endif // FASTDDS_STATISTICS |
116 | | |
117 | | //^^^^^^^^^^^^^^^^^^^^^^ [Exported API] ^^^^^^^^^^^^^^^^^^^^^^^ |
118 | | |
119 | | //vvvvvvvvvvvvvvvvvvvvv [BaseWriter API] vvvvvvvvvvvvvvvvvvvvvv |
120 | | |
121 | | void unsent_change_added_to_history( |
122 | | CacheChange_t* change, |
123 | | const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override; |
124 | | |
125 | | bool change_removed_by_history( |
126 | | CacheChange_t* change, |
127 | | const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override; |
128 | | |
129 | | DeliveryRetCode deliver_sample_nts( |
130 | | CacheChange_t* cache_change, |
131 | | RTPSMessageGroup& group, |
132 | | LocatorSelectorSender& locator_selector, |
133 | | const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) final; |
134 | | |
135 | | LocatorSelectorSender& get_general_locator_selector() final |
136 | 0 | { |
137 | 0 | return locator_selector_; /* same member that get_async_locator_selector() returns */ |
138 | 0 | } |
139 | | |
140 | | LocatorSelectorSender& get_async_locator_selector() final |
141 | 0 | { |
142 | 0 | return locator_selector_; /* same member that get_general_locator_selector() returns */ |
143 | 0 | } |
144 | | |
145 | | bool send_nts( |
146 | | const std::vector<NetworkBuffer>& buffers, |
147 | | const uint32_t& total_bytes, |
148 | | const LocatorSelectorSender& locator_selector, |
149 | | std::chrono::steady_clock::time_point& max_blocking_time_point) const final; |
150 | | |
151 | | bool process_acknack( |
152 | | const GUID_t& writer_guid, |
153 | | const GUID_t& reader_guid, |
154 | | uint32_t ack_count, |
155 | | const SequenceNumberSet_t& sn_set, |
156 | | bool final_flag, |
157 | | bool& result, |
158 | | fastdds::rtps::VendorId_t origin_vendor_id) final; |
159 | | |
160 | | bool process_nack_frag( |
161 | | const GUID_t& writer_guid, |
162 | | const GUID_t& reader_guid, |
163 | | uint32_t ack_count, |
164 | | const SequenceNumber_t& seq_num, |
165 | | const FragmentNumberSet_t& fragments_state, |
166 | | bool& result, |
167 | | fastdds::rtps::VendorId_t origin_vendor_id) final; |
168 | | |
169 | | bool try_remove_change( |
170 | | const std::chrono::steady_clock::time_point&, |
171 | | std::unique_lock<RecursiveTimedMutex>&) final; |
172 | | |
173 | | bool wait_for_acknowledgement( |
174 | | const SequenceNumber_t& seq, |
175 | | const std::chrono::steady_clock::time_point& max_blocking_time_point, |
176 | | std::unique_lock<RecursiveTimedMutex>& lock) final; |
177 | | |
178 | | //^^^^^^^^^^^^^^^^^^^^^^ [BaseWriter API] ^^^^^^^^^^^^^^^^^^^^^^^ |
179 | | |
180 | | /** |
181 | | * Get the number of matched readers, holding mp_mutex while the count is computed. |
182 | | * @return Sum of the sizes of the remote, local and data-sharing matched reader collections |
183 | | */ |
184 | | inline size_t get_matched_readers_size() const |
185 | 0 | { |
186 | 0 | std::lock_guard<RecursiveTimedMutex> guard(mp_mutex); /* released on return, after all three sizes have been read */ |
187 | 0 | return matched_remote_readers_.size() |
188 | 0 | + matched_local_readers_.size() |
189 | 0 | + matched_datasharing_readers_.size(); |
190 | 0 | } |
191 | | |
192 | | protected: |
193 | | |
194 | | mutable LocatorList_t fixed_locators_; |
195 | | |
196 | | virtual bool send_to_fixed_locators( |
197 | | const std::vector<eprosima::fastdds::rtps::NetworkBuffer>& buffers, |
198 | | const uint32_t& total_bytes, |
199 | | std::chrono::steady_clock::time_point& max_blocking_time_point) const; |
200 | | |
201 | | private: |
202 | | |
203 | | void init( |
204 | | RTPSParticipantImpl* participant, |
205 | | const WriterAttributes& attributes); |
206 | | |
207 | | void get_builtin_guid(); |
208 | | |
209 | | bool has_builtin_guid(); |
210 | | |
211 | | void update_reader_info( |
212 | | bool create_sender_resources); |
213 | | |
214 | | bool datasharing_delivery( |
215 | | CacheChange_t* change); |
216 | | |
217 | | bool intraprocess_delivery( |
218 | | CacheChange_t* change, |
219 | | ReaderLocator& reader_locator); |
220 | | |
221 | | bool is_inline_qos_expected_ = false; |
222 | | ResourceLimitedVector<std::unique_ptr<ReaderLocator>> matched_remote_readers_; |
223 | | |
224 | | std::condition_variable_any unsent_changes_cond_; |
225 | | |
226 | | uint64_t current_sequence_number_sent_ = 0; |
227 | | |
228 | | FragmentNumber_t current_fragment_sent_ = 0; |
229 | | |
230 | | uint64_t last_sequence_number_sent_ = 0; |
231 | | |
232 | | ResourceLimitedVector<std::unique_ptr<ReaderLocator>> matched_local_readers_; |
233 | | ResourceLimitedVector<std::unique_ptr<ReaderLocator>> matched_datasharing_readers_; |
234 | | ResourceLimitedVector<std::unique_ptr<ReaderLocator>> matched_readers_pool_; /* not included in get_matched_readers_size(); presumably holds spare ReaderLocator objects for reuse - TODO confirm in the .cpp */ |
235 | | |
236 | | LocatorSelectorSender locator_selector_; |
237 | | |
238 | | IReaderDataFilter* reader_data_filter_ = nullptr; |
239 | | }; |
240 | | |
241 | | } // namespace rtps |
242 | | } // namespace fastdds |
243 | | } // namespace eprosima |
244 | | |
245 | | #endif // FASTDDS_RTPS_WRITER__STATELESSWRITER_HPP |
246 | | |