/src/Fast-DDS/src/cpp/fastdds/rpc/ReplierImpl.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2025 Proyectos y Sistemas de Mantenimiento SL (eProsima). |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | #include "ReplierImpl.hpp" |
16 | | |
17 | | #include <string> |
18 | | |
19 | | #include <fastdds/dds/builtin/topic/PublicationBuiltinTopicData.hpp> |
20 | | #include <fastdds/dds/builtin/topic/SubscriptionBuiltinTopicData.hpp> |
21 | | #include <fastdds/dds/core/condition/Condition.hpp> |
22 | | #include <fastdds/dds/core/detail/DDSReturnCode.hpp> |
23 | | #include <fastdds/dds/core/LoanableCollection.hpp> |
24 | | #include <fastdds/dds/core/LoanableSequence.hpp> |
25 | | #include <fastdds/dds/core/status/PublicationMatchedStatus.hpp> |
26 | | #include <fastdds/dds/core/status/StatusMask.hpp> |
27 | | #include <fastdds/dds/core/status/SubscriptionMatchedStatus.hpp> |
28 | | #include <fastdds/dds/domain/qos/ReplierQos.hpp> |
29 | | #include <fastdds/dds/log/Log.hpp> |
30 | | #include <fastdds/dds/rpc/RequestInfo.hpp> |
31 | | #include <fastdds/rtps/common/SampleIdentity.hpp> |
32 | | #include <fastdds/rtps/common/WriteParams.hpp> |
33 | | |
34 | | #include "ServiceImpl.hpp" |
35 | | |
36 | | namespace eprosima { |
37 | | namespace fastdds { |
38 | | namespace dds { |
39 | | namespace rpc { |
40 | | |
41 | | /** |
42 | | * @brief Fills the related sample identity of the request. |
43 | | * |
44 | | * This will fill the related sample identity of the request with values taken from the sample identity. |
45 | | * Values different from unknown are preserved. |
46 | | * |
47 | | * @param info [in,out] The request information to update. |
48 | | */ |
49 | | static void fill_related_sample_identity( |
50 | | RequestInfo& info) |
51 | 0 | { |
52 | | // When sending a reply, the code here expects that related_sample_identity |
53 | | // has the sample_identity of the corresponding request. |
54 | |
|
55 | 0 | static const rtps::SampleIdentity unknown_identity = rtps::SampleIdentity::unknown(); |
56 | | |
57 | | // If the related guid is unknown, we consider that the request is not related to a previous one, |
58 | | // so we set the related sample identity to the received sample identity |
59 | 0 | if (unknown_identity.writer_guid() == info.related_sample_identity.writer_guid()) |
60 | 0 | { |
61 | 0 | info.related_sample_identity = info.sample_identity; |
62 | 0 | return; |
63 | 0 | } |
64 | | |
65 | | // There is a special case where only the related guid is set. |
66 | | // This is used in ROS 2 to convey the GUID of the reply reader. |
67 | | // In this case we just set the sequence number of the related sample identity |
68 | 0 | if (unknown_identity.sequence_number() == info.related_sample_identity.sequence_number()) |
69 | 0 | { |
70 | 0 | info.related_sample_identity.sequence_number() = info.sample_identity.sequence_number(); |
71 | 0 | } |
72 | 0 | } |
73 | | |
74 | | ReplierImpl::ReplierImpl( |
75 | | ServiceImpl* service, |
76 | | const ReplierQos& qos) |
77 | 0 | : replier_reader_(nullptr) |
78 | 0 | , replier_writer_(nullptr) |
79 | 0 | , qos_(qos) |
80 | 0 | , service_(service) |
81 | 0 | , enabled_(false) |
82 | 0 | , matched_status_changed_(false) |
83 | 0 | { |
84 | 0 | } |
85 | | |
86 | | ReplierImpl::~ReplierImpl() |
87 | 0 | { |
88 | 0 | close(); |
89 | 0 | service_ = nullptr; |
90 | 0 | } |
91 | | |
92 | | const std::string& ReplierImpl::get_service_name() const |
93 | 0 | { |
94 | 0 | return service_->get_service_name(); |
95 | 0 | } |
96 | | |
97 | | ReturnCode_t ReplierImpl::send_reply( |
98 | | void* data, |
99 | | const RequestInfo& info) |
100 | 0 | { |
101 | 0 | if (!enabled_) |
102 | 0 | { |
103 | 0 | EPROSIMA_LOG_ERROR(REPLIER, "Trying to send a reply with a disabled replier"); |
104 | 0 | return RETCODE_PRECONDITION_NOT_MET; |
105 | 0 | } |
106 | | |
107 | 0 | Time_t timeout{3, 0}; // Default timeout of 3 seconds |
108 | 0 | auto match_status = wait_for_matching(timeout, info); |
109 | 0 | if (RequesterMatchStatus::UNMATCHED == match_status) |
110 | 0 | { |
111 | | // The writer that sent the request has been unmatched. |
112 | 0 | EPROSIMA_LOG_WARNING(REPLIER, "Trying to send a reply to a disconnected requester"); |
113 | 0 | return RETCODE_NO_DATA; |
114 | 0 | } |
115 | 0 | else if (RequesterMatchStatus::PARTIALLY_MATCHED == match_status) |
116 | 0 | { |
117 | | // The writer that sent the request is still matched, but the reply topic is not. |
118 | 0 | EPROSIMA_LOG_WARNING(REPLIER, "Trying to send a reply to a partially matched requester"); |
119 | 0 | return RETCODE_TIMEOUT; |
120 | 0 | } |
121 | | |
122 | 0 | rtps::WriteParams wparams; |
123 | 0 | wparams.related_sample_identity(info.related_sample_identity); |
124 | 0 | wparams.has_more_replies(info.has_more_replies); |
125 | |
|
126 | 0 | return replier_writer_->write(data, wparams); |
127 | 0 | } |
128 | | |
129 | | ReturnCode_t ReplierImpl::take_request( |
130 | | void* data, |
131 | | RequestInfo& info) |
132 | 0 | { |
133 | 0 | ReturnCode_t retcode; |
134 | |
|
135 | 0 | if (!enabled_) |
136 | 0 | { |
137 | 0 | EPROSIMA_LOG_ERROR(REPLIER, "Trying to take a request with a disabled replier"); |
138 | 0 | return RETCODE_PRECONDITION_NOT_MET; |
139 | 0 | } |
140 | | |
141 | 0 | retcode = replier_reader_->take_next_sample(data, &info); |
142 | 0 | fill_related_sample_identity(info); |
143 | |
|
144 | 0 | return retcode; |
145 | 0 | } |
146 | | |
147 | | ReturnCode_t ReplierImpl::take_request( |
148 | | LoanableCollection& data, |
149 | | LoanableSequence<RequestInfo>& info) |
150 | 0 | { |
151 | 0 | ReturnCode_t retcode; |
152 | |
|
153 | 0 | if (!enabled_) |
154 | 0 | { |
155 | 0 | EPROSIMA_LOG_ERROR(REPLIER, "Trying to take a request with a disabled replier"); |
156 | 0 | return RETCODE_PRECONDITION_NOT_MET; |
157 | 0 | } |
158 | | |
159 | 0 | retcode = replier_reader_->take(data, info); |
160 | | |
161 | | // Fill related_sample_identity attribute |
162 | 0 | for (LoanableCollection::size_type i = 0; i < info.length(); ++i) |
163 | 0 | { |
164 | 0 | fill_related_sample_identity(info[i]); |
165 | 0 | } |
166 | |
|
167 | 0 | return retcode; |
168 | 0 | } |
169 | | |
170 | | ReturnCode_t ReplierImpl::return_loan( |
171 | | LoanableCollection& data, |
172 | | LoanableSequence<RequestInfo>& info) |
173 | 0 | { |
174 | 0 | if (!enabled_) |
175 | 0 | { |
176 | 0 | EPROSIMA_LOG_ERROR(REPLIER, "Trying to return loan with a disabled replier"); |
177 | 0 | return RETCODE_PRECONDITION_NOT_MET; |
178 | 0 | } |
179 | | |
180 | 0 | return replier_reader_->return_loan(data, info); |
181 | 0 | } |
182 | | |
183 | | ReturnCode_t ReplierImpl::enable() |
184 | 0 | { |
185 | 0 | ReturnCode_t retcode = RETCODE_OK; |
186 | |
|
187 | 0 | if (!enabled_) |
188 | 0 | { |
189 | 0 | if (!service_) |
190 | 0 | { |
191 | 0 | EPROSIMA_LOG_ERROR(REPLIER, "Service is nullptr"); |
192 | 0 | return RETCODE_ERROR; |
193 | 0 | } |
194 | | |
195 | 0 | if (!service_->is_enabled()) |
196 | 0 | { |
197 | 0 | EPROSIMA_LOG_ERROR(REPLIER, "Trying to enable Replier on a disabled Service"); |
198 | 0 | return RETCODE_PRECONDITION_NOT_MET; |
199 | 0 | } |
200 | | |
201 | 0 | retcode = create_dds_entities(qos_); |
202 | |
|
203 | 0 | if (RETCODE_OK != retcode) |
204 | 0 | { |
205 | 0 | EPROSIMA_LOG_ERROR(REPLIER, "Unable to enable replier"); |
206 | | // If any error occurs, delete the created entities |
207 | | // This is necessary to avoid keeping requester in an inconsistent state |
208 | 0 | delete_contained_entities(); |
209 | 0 | return retcode; |
210 | 0 | } |
211 | 0 | else |
212 | 0 | { |
213 | 0 | replier_reader_->enable(); |
214 | 0 | replier_writer_->enable(); |
215 | 0 | } |
216 | | |
217 | 0 | enabled_ = true; |
218 | 0 | } |
219 | | |
220 | 0 | return retcode; |
221 | 0 | } |
222 | | |
223 | | ReturnCode_t ReplierImpl::close() |
224 | 0 | { |
225 | 0 | ReturnCode_t retcode = RETCODE_OK; |
226 | |
|
227 | 0 | if (enabled_) |
228 | 0 | { |
229 | 0 | retcode = delete_contained_entities(); |
230 | |
|
231 | 0 | if (RETCODE_OK != retcode) |
232 | 0 | { |
233 | 0 | EPROSIMA_LOG_ERROR(REPLIER, "Error deleting DDS entities"); |
234 | 0 | return retcode; |
235 | 0 | } |
236 | | |
237 | 0 | enabled_ = false; |
238 | 0 | } |
239 | | |
240 | 0 | return retcode; |
241 | 0 | } |
242 | | |
243 | | void ReplierImpl::on_publication_matched( |
244 | | DataWriter* /*writer*/, |
245 | | const PublicationMatchedStatus& /*info*/) |
246 | 0 | { |
247 | |
|
248 | 0 | matched_status_changed_.store(true); |
249 | 0 | cv_.notify_one(); |
250 | 0 | } |
251 | | |
252 | | void ReplierImpl::on_subscription_matched( |
253 | | DataReader* /*reader*/, |
254 | | const SubscriptionMatchedStatus& /*info*/) |
255 | 0 | { |
256 | 0 | matched_status_changed_.store(true); |
257 | 0 | cv_.notify_one(); |
258 | 0 | } |
259 | | |
260 | | ReturnCode_t ReplierImpl::create_dds_entities( |
261 | | const ReplierQos& qos) |
262 | 0 | { |
263 | | // Entities are not autoenabled since the publisher and |
264 | | // the subscriber have autoenable_created_entities set to false. |
265 | | |
266 | | // Create writer for the Reply topic |
267 | 0 | replier_writer_ = |
268 | 0 | service_->get_publisher()->create_datawriter( |
269 | 0 | service_->get_reply_topic(), qos.writer_qos, this, StatusMask::publication_matched()); |
270 | |
|
271 | 0 | if (!replier_writer_) |
272 | 0 | { |
273 | 0 | EPROSIMA_LOG_ERROR(REPLIER, "Error creating replier writer"); |
274 | 0 | return RETCODE_ERROR; |
275 | 0 | } |
276 | | |
277 | 0 | replier_reader_ = |
278 | 0 | service_->get_subscriber()->create_datareader( |
279 | 0 | service_->get_request_topic(), qos.reader_qos, this, StatusMask::subscription_matched()); |
280 | |
|
281 | 0 | if (!replier_reader_) |
282 | 0 | { |
283 | 0 | EPROSIMA_LOG_ERROR(REPLIER, "Error creating reply reader"); |
284 | 0 | return RETCODE_ERROR; |
285 | 0 | } |
286 | | |
287 | | // Set the related entity key on both entities |
288 | 0 | replier_reader_->set_related_datawriter(replier_writer_); |
289 | 0 | replier_writer_->set_related_datareader(replier_reader_); |
290 | |
|
291 | 0 | return RETCODE_OK; |
292 | 0 | } |
293 | | |
294 | | ReturnCode_t ReplierImpl::delete_contained_entities() |
295 | 0 | { |
296 | | // Check if DataWriter and DataReader can be deleted. |
297 | | // If not, do nothing and return an error code |
298 | 0 | if (replier_writer_) |
299 | 0 | { |
300 | 0 | if (!service_->get_publisher()->can_be_deleted(replier_writer_)) |
301 | 0 | { |
302 | 0 | EPROSIMA_LOG_ERROR(REPLIER, "Replier DataWriter cannot be deleted"); |
303 | 0 | return RETCODE_PRECONDITION_NOT_MET; |
304 | 0 | } |
305 | 0 | } |
306 | | |
307 | 0 | if (replier_reader_) |
308 | 0 | { |
309 | 0 | if (!service_->get_subscriber()->can_be_deleted(replier_reader_)) |
310 | 0 | { |
311 | 0 | EPROSIMA_LOG_ERROR(REPLIER, "Replier DataReader cannot be deleted"); |
312 | 0 | return RETCODE_PRECONDITION_NOT_MET; |
313 | 0 | } |
314 | 0 | } |
315 | | |
316 | | // Delete DataWriter and DataReader |
317 | 0 | service_->get_publisher()->delete_datawriter(replier_writer_); |
318 | 0 | replier_writer_ = nullptr; |
319 | 0 | service_->get_subscriber()->delete_datareader(replier_reader_); |
320 | 0 | replier_reader_ = nullptr; |
321 | |
|
322 | 0 | return RETCODE_OK; |
323 | 0 | } |
324 | | |
325 | | ReplierImpl::RequesterMatchStatus ReplierImpl::requester_match_status( |
326 | | const RequestInfo& info) const |
327 | 0 | { |
328 | | // Check if the replier is still matched with the requester in the request topic |
329 | 0 | PublicationBuiltinTopicData pub_data; |
330 | 0 | if (RETCODE_OK != replier_reader_->get_matched_publication_data(pub_data, info.sample_identity.writer_guid())) |
331 | 0 | { |
332 | 0 | return RequesterMatchStatus::UNMATCHED; |
333 | 0 | } |
334 | | |
335 | 0 | auto related_guid = info.related_sample_identity.writer_guid(); |
336 | 0 | bool reply_topic_matched = false; |
337 | 0 | if (related_guid.entityId.is_reader() && info.sample_identity.writer_guid() != related_guid) |
338 | 0 | { |
339 | | // Custom related GUID (i.e. reply reader GUID) sent with the request. |
340 | | // Check if the replier writer is matched with that specific reader. |
341 | 0 | SubscriptionBuiltinTopicData sub_data; |
342 | 0 | reply_topic_matched = RETCODE_OK == replier_writer_->get_matched_subscription_data(sub_data, related_guid); |
343 | 0 | } |
344 | 0 | else |
345 | 0 | { |
346 | | // Take the replier reader GUID from the related_datareader_key within PublicationBuiltinTopicData |
347 | 0 | SubscriptionBuiltinTopicData related_reader_sub_data; |
348 | 0 | reply_topic_matched = RETCODE_OK == replier_writer_->get_matched_subscription_data(related_reader_sub_data, |
349 | 0 | pub_data.related_datareader_key); |
350 | 0 | } |
351 | |
|
352 | 0 | return reply_topic_matched ? |
353 | 0 | RequesterMatchStatus::MATCHED : |
354 | 0 | RequesterMatchStatus::PARTIALLY_MATCHED; |
355 | 0 | } |
356 | | |
357 | | ReplierImpl::RequesterMatchStatus ReplierImpl::wait_for_matching( |
358 | | const fastdds::dds::Duration_t& timeout, |
359 | | const RequestInfo& info) |
360 | 0 | { |
361 | 0 | Time_t current_time; |
362 | 0 | Time_t::now(current_time); |
363 | 0 | Time_t finish_time = current_time + timeout; |
364 | |
|
365 | 0 | RequesterMatchStatus requester_status = requester_match_status(info); |
366 | 0 | while ((RequesterMatchStatus::MATCHED != requester_status) && current_time < finish_time) |
367 | 0 | { |
368 | | // Wait for the matched status to change |
369 | | // Or every 100 milliseconds. |
370 | 0 | std::unique_lock<std::mutex> lock(mtx_); |
371 | 0 | bool res = cv_.wait_for(lock, |
372 | 0 | std::chrono::milliseconds(100), |
373 | 0 | [this]() |
374 | 0 | { |
375 | 0 | return matched_status_changed_.load(); |
376 | 0 | }); |
377 | |
|
378 | 0 | if (res) |
379 | 0 | { |
380 | | // Reset the matched status changed flag |
381 | 0 | matched_status_changed_.store(false); |
382 | | |
383 | | // Check if the requester is fully matched |
384 | 0 | requester_status = requester_match_status(info); |
385 | 0 | } |
386 | | |
387 | | // Update the current time |
388 | 0 | Time_t::now(current_time); |
389 | 0 | } |
390 | |
|
391 | 0 | return requester_status; |
392 | 0 | } |
393 | | |
394 | | } // namespace rpc |
395 | | } // namespace dds |
396 | | } // namespace fastdds |
397 | | } // namespace eprosima |