Coverage Report

Created: 2025-07-03 06:58

/src/Fast-DDS/src/cpp/fastdds/rpc/ReplierImpl.cpp
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2025 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
#include "ReplierImpl.hpp"
16
17
#include <string>
18
19
#include <fastdds/dds/builtin/topic/PublicationBuiltinTopicData.hpp>
20
#include <fastdds/dds/builtin/topic/SubscriptionBuiltinTopicData.hpp>
21
#include <fastdds/dds/core/condition/Condition.hpp>
22
#include <fastdds/dds/core/detail/DDSReturnCode.hpp>
23
#include <fastdds/dds/core/LoanableCollection.hpp>
24
#include <fastdds/dds/core/LoanableSequence.hpp>
25
#include <fastdds/dds/core/status/PublicationMatchedStatus.hpp>
26
#include <fastdds/dds/core/status/StatusMask.hpp>
27
#include <fastdds/dds/core/status/SubscriptionMatchedStatus.hpp>
28
#include <fastdds/dds/domain/qos/ReplierQos.hpp>
29
#include <fastdds/dds/log/Log.hpp>
30
#include <fastdds/dds/rpc/RequestInfo.hpp>
31
#include <fastdds/rtps/common/SampleIdentity.hpp>
32
#include <fastdds/rtps/common/WriteParams.hpp>
33
34
#include "ServiceImpl.hpp"
35
36
namespace eprosima {
37
namespace fastdds {
38
namespace dds {
39
namespace rpc {
40
41
/**
42
 * @brief Fills the related sample identity of the request.
43
 *
44
 * This will fill the related sample identity of the request with values taken from the sample identity.
45
 * Values different from unknown are preserved.
46
 *
47
 * @param info [in,out] The request information to update.
48
 */
49
static void fill_related_sample_identity(
50
        RequestInfo& info)
51
0
{
52
    // When sending a reply, the code here expects that related_sample_identity
53
    // has the sample_identity of the corresponding request.
54
55
0
    static const rtps::SampleIdentity unknown_identity = rtps::SampleIdentity::unknown();
56
57
    // If the related guid is unknown, we consider that the request is not related to a previous one,
58
    // so we set the related sample identity to the received sample identity
59
0
    if (unknown_identity.writer_guid() == info.related_sample_identity.writer_guid())
60
0
    {
61
0
        info.related_sample_identity = info.sample_identity;
62
0
        return;
63
0
    }
64
65
    // There is a special case where only the related guid is set.
66
    // This is used in ROS 2 to convey the GUID of the reply reader.
67
    // In this case we just set the sequence number of the related sample identity
68
0
    if (unknown_identity.sequence_number() == info.related_sample_identity.sequence_number())
69
0
    {
70
0
        info.related_sample_identity.sequence_number() = info.sample_identity.sequence_number();
71
0
    }
72
0
}
73
74
ReplierImpl::ReplierImpl(
75
        ServiceImpl* service,
76
        const ReplierQos& qos)
77
0
    : replier_reader_(nullptr)
78
0
    , replier_writer_(nullptr)
79
0
    , qos_(qos)
80
0
    , service_(service)
81
0
    , enabled_(false)
82
0
    , matched_status_changed_(false)
83
0
{
84
0
}
85
86
// Closes the replier and then drops the reference to the owning service.
ReplierImpl::~ReplierImpl()
{
    // The return code of close() is not inspected here.
    static_cast<void>(close());
    service_ = nullptr;
}
91
92
const std::string& ReplierImpl::get_service_name() const
93
0
{
94
0
    return service_->get_service_name();
95
0
}
96
97
/**
 * @brief Sends a reply to the requester identified by the given request information.
 *
 * Waits up to 3 seconds for the requester to be fully matched before writing.
 *
 * @param data  Reply payload, handed to the replier writer as is.
 * @param info  Information of the request being answered.
 *
 * @return RETCODE_PRECONDITION_NOT_MET if the replier is not enabled.
 * @return RETCODE_NO_DATA if the requester is unmatched once the wait finishes.
 * @return RETCODE_TIMEOUT if the requester is only partially matched once the wait finishes.
 * @return Otherwise, the value returned by the write operation of the replier writer.
 */
ReturnCode_t ReplierImpl::send_reply(
        void* data,
        const RequestInfo& info)
{
    if (!enabled_)
    {
        EPROSIMA_LOG_ERROR(REPLIER, "Trying to send a reply with a disabled replier");
        return RETCODE_PRECONDITION_NOT_MET;
    }

    // Default timeout of 3 seconds
    Time_t matching_timeout{3, 0};

    switch (wait_for_matching(matching_timeout, info))
    {
        case RequesterMatchStatus::UNMATCHED:
        {
            // The writer that sent the request has been unmatched.
            EPROSIMA_LOG_WARNING(REPLIER, "Trying to send a reply to a disconnected requester");
            return RETCODE_NO_DATA;
        }
        case RequesterMatchStatus::PARTIALLY_MATCHED:
        {
            // The writer that sent the request is still matched, but the reply topic is not.
            EPROSIMA_LOG_WARNING(REPLIER, "Trying to send a reply to a partially matched requester");
            return RETCODE_TIMEOUT;
        }
        default:
        {
            // Any other status: go ahead with the write
            break;
        }
    }

    // Propagate the correlation data of the request to the reply sample
    rtps::WriteParams reply_params;
    reply_params.related_sample_identity(info.related_sample_identity);
    reply_params.has_more_replies(info.has_more_replies);

    return replier_writer_->write(data, reply_params);
}
128
129
/**
 * @brief Takes the next request from the replier reader.
 *
 * When a request is successfully taken, its related sample identity is completed so that
 * the same @c info can be passed to send_reply().
 * When the take operation fails, @c info is not post-processed.
 *
 * @param data  [out] Storage for the request payload, handed to the replier reader as is.
 * @param info  [out] Information of the taken request.
 *
 * @return RETCODE_PRECONDITION_NOT_MET if the replier is not enabled.
 * @return Otherwise, the value returned by the take operation of the replier reader.
 */
ReturnCode_t ReplierImpl::take_request(
        void* data,
        RequestInfo& info)
{
    if (!enabled_)
    {
        EPROSIMA_LOG_ERROR(REPLIER, "Trying to take a request with a disabled replier");
        return RETCODE_PRECONDITION_NOT_MET;
    }

    const ReturnCode_t retcode = replier_reader_->take_next_sample(data, &info);

    // Only complete the identity when a sample was actually taken. Doing it after a failed
    // take would rewrite the caller's info from identity values that were never received.
    if (RETCODE_OK == retcode)
    {
        fill_related_sample_identity(info);
    }

    return retcode;
}
146
147
/**
 * @brief Takes a collection of requests from the replier reader.
 *
 * When the take operation succeeds, the related sample identity of every returned
 * information entry is completed so that the entries can be passed to send_reply().
 * When the take operation fails, the @c info sequence is not post-processed.
 *
 * @param data  [in,out] Collection receiving the request payloads.
 * @param info  [in,out] Sequence receiving the information of each taken request.
 *
 * @return RETCODE_PRECONDITION_NOT_MET if the replier is not enabled.
 * @return Otherwise, the value returned by the take operation of the replier reader.
 */
ReturnCode_t ReplierImpl::take_request(
        LoanableCollection& data,
        LoanableSequence<RequestInfo>& info)
{
    if (!enabled_)
    {
        EPROSIMA_LOG_ERROR(REPLIER, "Trying to take a request with a disabled replier");
        return RETCODE_PRECONDITION_NOT_MET;
    }

    const ReturnCode_t retcode = replier_reader_->take(data, info);

    // Fill related_sample_identity attribute, but only for entries produced by a successful
    // take. Entries the caller may have passed in are left untouched when the take fails.
    if (RETCODE_OK == retcode)
    {
        for (LoanableCollection::size_type i = 0; i < info.length(); ++i)
        {
            fill_related_sample_identity(info[i]);
        }
    }

    return retcode;
}
169
170
/**
 * @brief Returns to the replier reader a loan of request data and information.
 *
 * @param data  [in,out] Collection of request payloads to give back.
 * @param info  [in,out] Sequence of request information to give back.
 *
 * @return RETCODE_PRECONDITION_NOT_MET if the replier is not enabled.
 * @return Otherwise, the value returned by the return_loan operation of the replier reader.
 */
ReturnCode_t ReplierImpl::return_loan(
        LoanableCollection& data,
        LoanableSequence<RequestInfo>& info)
{
    if (enabled_)
    {
        return replier_reader_->return_loan(data, info);
    }

    EPROSIMA_LOG_ERROR(REPLIER, "Trying to return loan with a disabled replier");
    return RETCODE_PRECONDITION_NOT_MET;
}
182
183
/**
 * @brief Enables the replier, creating and enabling its DDS entities.
 *
 * Calling this method on an already enabled replier does nothing.
 *
 * @return RETCODE_OK if the replier is enabled when the call returns.
 * @return RETCODE_ERROR if there is no service.
 * @return RETCODE_PRECONDITION_NOT_MET if the service is not enabled.
 * @return Otherwise, the error reported while creating or enabling the DDS entities.
 */
ReturnCode_t ReplierImpl::enable()
{
    if (enabled_)
    {
        // Already enabled, nothing to do
        return RETCODE_OK;
    }

    if (!service_)
    {
        EPROSIMA_LOG_ERROR(REPLIER, "Service is nullptr");
        return RETCODE_ERROR;
    }

    if (!service_->is_enabled())
    {
        EPROSIMA_LOG_ERROR(REPLIER, "Trying to enable Replier on a disabled Service");
        return RETCODE_PRECONDITION_NOT_MET;
    }

    ReturnCode_t retcode = create_dds_entities(qos_);

    // Entities are created disabled, so they are enabled here (reader first, then writer).
    // Each step only runs if the previous one succeeded, so the first failure is reported.
    if (RETCODE_OK == retcode)
    {
        retcode = replier_reader_->enable();
    }

    if (RETCODE_OK == retcode)
    {
        retcode = replier_writer_->enable();
    }

    if (RETCODE_OK != retcode)
    {
        EPROSIMA_LOG_ERROR(REPLIER, "Unable to enable replier");
        // If any error occurs, delete the created entities
        // This is necessary to avoid keeping replier in an inconsistent state
        delete_contained_entities();
        return retcode;
    }

    enabled_ = true;
    return RETCODE_OK;
}
222
223
/**
 * @brief Disables the replier, deleting its DDS entities.
 *
 * Calling this method on a replier that is not enabled does nothing.
 *
 * @return RETCODE_OK if the replier is disabled when the call returns.
 * @return Otherwise, the error reported while deleting the DDS entities. The replier
 *         remains enabled in that case.
 */
ReturnCode_t ReplierImpl::close()
{
    if (!enabled_)
    {
        // Nothing to release
        return RETCODE_OK;
    }

    const ReturnCode_t deletion_result = delete_contained_entities();
    if (RETCODE_OK != deletion_result)
    {
        EPROSIMA_LOG_ERROR(REPLIER, "Error deleting DDS entities");
        return deletion_result;
    }

    enabled_ = false;
    return deletion_result;
}
242
243
void ReplierImpl::on_publication_matched(
244
        DataWriter* /*writer*/,
245
        const PublicationMatchedStatus& /*info*/)
246
0
{
247
248
0
    matched_status_changed_.store(true);
249
0
    cv_.notify_one();
250
0
}
251
252
void ReplierImpl::on_subscription_matched(
253
        DataReader* /*reader*/,
254
        const SubscriptionMatchedStatus& /*info*/)
255
0
{
256
0
    matched_status_changed_.store(true);
257
0
    cv_.notify_one();
258
0
}
259
260
/**
 * @brief Creates the DataWriter (reply topic) and the DataReader (request topic) of the replier.
 *
 * Both entities are created with this object as listener, and each one is registered as the
 * related entity of the other. Nothing is cleaned up here on failure: an entity created before
 * the failing step is left assigned to its member.
 *
 * @param qos  QoS providing the writer and reader QoS to use.
 *
 * @return RETCODE_OK if both entities were created.
 * @return RETCODE_ERROR if any of them could not be created.
 */
ReturnCode_t ReplierImpl::create_dds_entities(
        const ReplierQos& qos)
{
    // Entities are not autoenabled since the publisher and
    // the subscriber have autoenable_created_entities set to false.

    // Create writer for the Reply topic
    replier_writer_ =
            service_->get_publisher()->create_datawriter(
        service_->get_reply_topic(), qos.writer_qos, this, StatusMask::publication_matched());

    if (!replier_writer_)
    {
        EPROSIMA_LOG_ERROR(REPLIER, "Error creating replier writer");
        return RETCODE_ERROR;
    }

    // Create reader for the Request topic
    replier_reader_ =
            service_->get_subscriber()->create_datareader(
        service_->get_request_topic(), qos.reader_qos, this, StatusMask::subscription_matched());

    if (!replier_reader_)
    {
        EPROSIMA_LOG_ERROR(REPLIER, "Error creating replier reader");
        return RETCODE_ERROR;
    }

    // Set the related entity key on both entities
    replier_reader_->set_related_datawriter(replier_writer_);
    replier_writer_->set_related_datareader(replier_reader_);

    return RETCODE_OK;
}
293
294
/**
 * @brief Deletes the DataWriter and the DataReader of the replier, if they exist.
 *
 * Nothing is deleted unless every existing entity reports that it can be deleted.
 * Entities that were never created (null pointers) are skipped.
 *
 * @return RETCODE_PRECONDITION_NOT_MET if an existing entity cannot be deleted.
 * @return RETCODE_OK otherwise. Both pointers are null when this value is returned.
 */
ReturnCode_t ReplierImpl::delete_contained_entities()
{
    // Check if DataWriter and DataReader can be deleted.
    // If not, do nothing and return an error code
    if (replier_writer_ && !service_->get_publisher()->can_be_deleted(replier_writer_))
    {
        EPROSIMA_LOG_ERROR(REPLIER, "Replier DataWriter cannot be deleted");
        return RETCODE_PRECONDITION_NOT_MET;
    }

    if (replier_reader_ && !service_->get_subscriber()->can_be_deleted(replier_reader_))
    {
        EPROSIMA_LOG_ERROR(REPLIER, "Replier DataReader cannot be deleted");
        return RETCODE_PRECONDITION_NOT_MET;
    }

    // Delete DataWriter and DataReader.
    // This method is also used to clean up after a partially failed creation, so any of the
    // pointers may be null here. A null entity is never handed to the delete operations.
    if (replier_writer_)
    {
        if (RETCODE_OK != service_->get_publisher()->delete_datawriter(replier_writer_))
        {
            EPROSIMA_LOG_ERROR(REPLIER, "Error deleting replier DataWriter");
        }
        replier_writer_ = nullptr;
    }

    if (replier_reader_)
    {
        if (RETCODE_OK != service_->get_subscriber()->delete_datareader(replier_reader_))
        {
            EPROSIMA_LOG_ERROR(REPLIER, "Error deleting replier DataReader");
        }
        replier_reader_ = nullptr;
    }

    return RETCODE_OK;
}
324
325
/**
 * @brief Checks how far the requester that sent a request is matched with this replier.
 *
 * @param info  Information of the request whose requester is checked.
 *
 * @return UNMATCHED if the replier reader has no matched publication data for the request writer.
 * @return MATCHED if, in addition, the replier writer has matched subscription data for the
 *         reader expected to receive the reply.
 * @return PARTIALLY_MATCHED if the request writer is matched but that reader is not.
 */
ReplierImpl::RequesterMatchStatus ReplierImpl::requester_match_status(
        const RequestInfo& info) const
{
    // Check if the replier is still matched with the requester in the request topic
    PublicationBuiltinTopicData request_writer_data;
    const ReturnCode_t request_lookup =
            replier_reader_->get_matched_publication_data(request_writer_data,
                    info.sample_identity.writer_guid());
    if (RETCODE_OK != request_lookup)
    {
        return RequesterMatchStatus::UNMATCHED;
    }

    auto related = info.related_sample_identity.writer_guid();
    const bool custom_reply_reader =
            related.entityId.is_reader() && info.sample_identity.writer_guid() != related;

    SubscriptionBuiltinTopicData reply_reader_data;
    ReturnCode_t reply_lookup;
    if (custom_reply_reader)
    {
        // Custom related GUID (i.e. reply reader GUID) sent with the request.
        // Check if the replier writer is matched with that specific reader.
        reply_lookup = replier_writer_->get_matched_subscription_data(reply_reader_data, related);
    }
    else
    {
        // Take the reply reader GUID from the related_datareader_key within PublicationBuiltinTopicData
        reply_lookup = replier_writer_->get_matched_subscription_data(reply_reader_data,
                        request_writer_data.related_datareader_key);
    }

    if (RETCODE_OK == reply_lookup)
    {
        return RequesterMatchStatus::MATCHED;
    }

    return RequesterMatchStatus::PARTIALLY_MATCHED;
}
356
357
/**
 * @brief Waits until the requester of a request is fully matched, or a timeout elapses.
 *
 * The match status is evaluated once on entry. After that, it is re-evaluated only when the
 * matched-status-changed flag is found raised, which is checked on every notification of the
 * condition variable and at least every 100 milliseconds.
 *
 * @param timeout  Maximum time to wait.
 * @param info     Information of the request whose requester is checked.
 *
 * @return The last match status evaluated.
 */
ReplierImpl::RequesterMatchStatus ReplierImpl::wait_for_matching(
        const fastdds::dds::Duration_t& timeout,
        const RequestInfo& info)
{
    Time_t now;
    Time_t::now(now);
    const Time_t deadline = now + timeout;

    RequesterMatchStatus status = requester_match_status(info);

    while ((now < deadline) && (RequesterMatchStatus::MATCHED != status))
    {
        std::unique_lock<std::mutex> guard(mtx_);

        // Wait for the matched status to change, waking up at least every 100 milliseconds
        const auto status_changed = [this]()
                {
                    return matched_status_changed_.load();
                };

        if (cv_.wait_for(guard, std::chrono::milliseconds(100), status_changed))
        {
            // Reset the matched status changed flag
            matched_status_changed_.store(false);

            // Check if the requester is fully matched
            status = requester_match_status(info);
        }

        // Update the current time
        Time_t::now(now);
    }

    return status;
}
393
394
} // namespace rpc
395
} // namespace dds
396
} // namespace fastdds
397
} // namespace eprosima