/src/Fast-DDS/include/fastdds/rtps/writer/StatelessWriter.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | /** |
16 | | * @file StatelessWriter.h |
17 | | */ |
18 | | #ifndef _FASTDDS_RTPS_STATELESSWRITER_H_ |
19 | | #define _FASTDDS_RTPS_STATELESSWRITER_H_ |
20 | | |
21 | | #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC |
22 | | |
23 | | #include <fastdds/rtps/common/Time_t.h> |
24 | | #include <fastdds/rtps/history/IChangePool.h> |
25 | | #include <fastdds/rtps/history/IPayloadPool.h> |
26 | | #include <fastdds/rtps/interfaces/IReaderDataFilter.hpp> |
27 | | #include <fastdds/rtps/writer/ChangeForReader.h> |
28 | | #include <fastdds/rtps/writer/ReaderLocator.h> |
29 | | #include <fastdds/rtps/writer/RTPSWriter.h> |
30 | | #include <fastrtps/utils/collections/ResourceLimitedVector.hpp> |
31 | | |
32 | | #include <condition_variable> |
33 | | #include <list> |
34 | | #include <memory> |
35 | | #include <mutex> |
36 | | |
37 | | namespace eprosima { |
38 | | namespace fastrtps { |
39 | | namespace rtps { |
40 | | |
41 | | |
42 | | /** |
43 | | * Class StatelessWriter, specialization of RTPSWriter that manages writers that don't keep state of the matched readers. |
44 | | * @ingroup WRITER_MODULE |
45 | | */ |
46 | | class StatelessWriter : public RTPSWriter |
47 | | { |
48 | | friend class RTPSParticipantImpl; |
49 | | |
50 | | protected: |
51 | | |
52 | | StatelessWriter( |
53 | | RTPSParticipantImpl* participant, |
54 | | const GUID_t& guid, |
55 | | const WriterAttributes& attributes, |
56 | | fastdds::rtps::FlowController* flow_controller, |
57 | | WriterHistory* history, |
58 | | WriterListener* listener = nullptr); |
59 | | |
60 | | StatelessWriter( |
61 | | RTPSParticipantImpl* impl, |
62 | | const GUID_t& guid, |
63 | | const WriterAttributes& att, |
64 | | const std::shared_ptr<IPayloadPool>& payload_pool, |
65 | | fastdds::rtps::FlowController* flow_controller, |
66 | | WriterHistory* hist, |
67 | | WriterListener* listen = nullptr); |
68 | | |
69 | | StatelessWriter( |
70 | | RTPSParticipantImpl* impl, |
71 | | const GUID_t& guid, |
72 | | const WriterAttributes& att, |
73 | | const std::shared_ptr<IPayloadPool>& payload_pool, |
74 | | const std::shared_ptr<IChangePool>& change_pool, |
75 | | fastdds::rtps::FlowController* flow_controller, |
76 | | WriterHistory* hist, |
77 | | WriterListener* listen = nullptr); |
78 | | |
79 | | public: |
80 | | |
81 | | virtual ~StatelessWriter(); |
82 | | |
83 | | /** |
84 | | * Add a specific change to all ReaderLocators. |
85 | | * @param change Pointer to the change. |
86 | | * @param max_blocking_time |
87 | | */ |
88 | | void unsent_change_added_to_history( |
89 | | CacheChange_t* change, |
90 | | const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override; |
91 | | |
92 | | /** |
93 | | * Indicate the writer that a change has been removed by the history due to some HistoryQos requirement. |
94 | | * @param change Pointer to the change that is going to be removed. |
95 | | * @return True if removed correctly. |
96 | | */ |
97 | | bool change_removed_by_history( |
98 | | CacheChange_t* change) override; |
99 | | |
100 | | /** |
101 | | * Add a matched reader. |
102 | | * @param data Pointer to the ReaderProxyData object added. |
103 | | * @return True if added. |
104 | | */ |
105 | | bool matched_reader_add( |
106 | | const ReaderProxyData& data) override; |
107 | | |
108 | | /** |
109 | | * Remove a matched reader. |
110 | | * @param reader_guid GUID of the reader to remove. |
111 | | * @return True if removed. |
112 | | */ |
113 | | bool matched_reader_remove( |
114 | | const GUID_t& reader_guid) override; |
115 | | |
116 | | /** |
117 | | * Tells us if a specific Reader is matched against this writer |
118 | | * @param reader_guid GUID of the reader to check. |
119 | | * @return True if it was matched. |
120 | | */ |
121 | | bool matched_reader_is_matched( |
122 | | const GUID_t& reader_guid) override; |
123 | | |
124 | | /** |
125 | | * @brief Set a content filter to perform content filtering on this writer. |
126 | | * |
127 | | * This method sets a content filter that will be used to check whether a cache change is relevant |
128 | | * for a reader or not. |
129 | | * |
130 | | * @param filter The content filter to use on this writer. May be @c nullptr to remove the content filter |
131 | | * (i.e. treat all samples as relevant). |
132 | | */ |
133 | | void reader_data_filter( |
134 | | fastdds::rtps::IReaderDataFilter* filter) final |
135 | 0 | { |
136 | 0 | reader_data_filter_ = filter; |
137 | 0 | } |
138 | | |
139 | | /** |
140 | | * @brief Get the content filter used to perform content filtering on this writer. |
141 | | * |
142 | | * @return The content filter used on this writer. |
143 | | */ |
144 | | const fastdds::rtps::IReaderDataFilter* reader_data_filter() const final |
145 | 0 | { |
146 | 0 | return reader_data_filter_; |
147 | 0 | } |
148 | | |
149 | | /** |
150 | | * Update the Attributes of the Writer. |
151 | | * @param att New attributes |
152 | | */ |
153 | | void updateAttributes( |
154 | | const WriterAttributes& att) override |
155 | 0 | { |
156 | 0 | (void)att; |
157 | 0 | //FOR NOW THERE IS NOTHING TO UPDATE. |
158 | 0 | } |
159 | | |
160 | | bool set_fixed_locators( |
161 | | const LocatorList_t& locator_list); |
162 | | |
163 | | //!Reset the unsent changes. |
164 | | void unsent_changes_reset(); |
165 | | |
166 | | bool is_acked_by_all( |
167 | | const CacheChange_t* change) const override; |
168 | | |
169 | | bool try_remove_change( |
170 | | const std::chrono::steady_clock::time_point&, |
171 | | std::unique_lock<RecursiveTimedMutex>&) override; |
172 | | |
173 | | bool wait_for_acknowledgement( |
174 | | const SequenceNumber_t& seq, |
175 | | const std::chrono::steady_clock::time_point& max_blocking_time_point, |
176 | | std::unique_lock<RecursiveTimedMutex>& lock) override; |
177 | | |
178 | | /** |
179 | | * Send a message through this interface. |
180 | | * |
181 | | * @param message Pointer to the buffer with the message already serialized. |
182 | | * @param locator_selector RTPSMessageSenderInterface reference uses for selecting locators. The reference has to |
183 | | * be a member of this RTPSWriter object. |
184 | | * @param max_blocking_time_point Future timepoint where blocking send should end. |
185 | | */ |
186 | | bool send_nts( |
187 | | CDRMessage_t* message, |
188 | | const LocatorSelectorSender& locator_selector, |
189 | | std::chrono::steady_clock::time_point& max_blocking_time_point) const override; |
190 | | |
191 | | /** |
192 | | * Get the number of matched readers |
193 | | * @return Number of the matched readers |
194 | | */ |
195 | | inline size_t getMatchedReadersSize() const |
196 | 0 | { |
197 | 0 | std::lock_guard<RecursiveTimedMutex> guard(mp_mutex); |
198 | 0 | return matched_remote_readers_.size() |
199 | 0 | + matched_local_readers_.size() |
200 | 0 | + matched_datasharing_readers_.size(); |
201 | 0 | } |
202 | | |
203 | | /*! |
204 | | * Tells writer the sample can be sent to the network. |
205 | | * This function should be used by a fastdds::rtps::FlowController. |
206 | | * |
207 | | * @param cache_change Pointer to the CacheChange_t that represents the sample which can be sent. |
208 | | * @param group RTPSMessageGroup reference uses for generating the RTPS message. |
209 | | * @param locator_selector RTPSMessageSenderInterface reference uses for selecting locators. The reference has to |
210 | | * be a member of this RTPSWriter object. |
211 | | * @param max_blocking_time Future timepoint where blocking send should end. |
212 | | * @return Return code. |
213 | | * @note Must be non-thread safe. |
214 | | */ |
215 | | DeliveryRetCode deliver_sample_nts( |
216 | | CacheChange_t* cache_change, |
217 | | RTPSMessageGroup& group, |
218 | | LocatorSelectorSender& locator_selector, |
219 | | const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override; |
220 | | |
221 | | LocatorSelectorSender& get_general_locator_selector() override |
222 | 0 | { |
223 | 0 | return locator_selector_; |
224 | 0 | } |
225 | | |
226 | | LocatorSelectorSender& get_async_locator_selector() override |
227 | 0 | { |
228 | 0 | return locator_selector_; |
229 | 0 | } |
230 | | |
231 | | private: |
232 | | |
233 | | void init( |
234 | | RTPSParticipantImpl* participant, |
235 | | const WriterAttributes& attributes); |
236 | | |
237 | | void get_builtin_guid(); |
238 | | |
239 | | bool has_builtin_guid(); |
240 | | |
241 | | void update_reader_info( |
242 | | bool create_sender_resources); |
243 | | |
244 | | bool datasharing_delivery( |
245 | | CacheChange_t* change); |
246 | | |
247 | | bool intraprocess_delivery( |
248 | | CacheChange_t* change, |
249 | | ReaderLocator& reader_locator); |
250 | | |
251 | | bool is_inline_qos_expected_ = false; |
252 | | LocatorList_t fixed_locators_; |
253 | | ResourceLimitedVector<std::unique_ptr<ReaderLocator>> matched_remote_readers_; |
254 | | |
255 | | std::condition_variable_any unsent_changes_cond_; |
256 | | |
257 | | uint64_t current_sequence_number_sent_ = 0; |
258 | | |
259 | | FragmentNumber_t current_fragment_sent_ = 0; |
260 | | |
261 | | uint64_t last_sequence_number_sent_ = 0; |
262 | | |
263 | | ResourceLimitedVector<std::unique_ptr<ReaderLocator>> matched_local_readers_; |
264 | | ResourceLimitedVector<std::unique_ptr<ReaderLocator>> matched_datasharing_readers_; |
265 | | ResourceLimitedVector<std::unique_ptr<ReaderLocator>> matched_readers_pool_; |
266 | | |
267 | | LocatorSelectorSender locator_selector_; |
268 | | |
269 | | fastdds::rtps::IReaderDataFilter* reader_data_filter_ = nullptr; |
270 | | }; |
271 | | |
272 | | } /* namespace rtps */ |
273 | | } /* namespace fastrtps */ |
274 | | } /* namespace eprosima */ |
275 | | |
276 | | #endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC |
277 | | #endif /* _FASTDDS_RTPS_STATELESSWRITER_H_ */ |