Coverage Report

Created: 2026-02-14 07:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/Fast-DDS/src/cpp/fastdds/domain/DomainParticipantImpl.cpp
Line
Count
Source
1
// Copyright 2019 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file DomainParticipantImpl.cpp
17
 *
18
 */
19
20
#include <fastdds/domain/DomainParticipantImpl.hpp>
21
22
#include <algorithm>
23
#include <chrono>
24
#include <string>
25
26
#include <asio.hpp>
27
28
#include <fastdds/core/policy/QosPolicyUtils.hpp>
29
#include <fastdds/dds/builtin/topic/ParticipantBuiltinTopicData.hpp>
30
#include <fastdds/dds/core/ReturnCode.hpp>
31
#include <fastdds/dds/domain/DomainParticipant.hpp>
32
#include <fastdds/dds/domain/DomainParticipantFactory.hpp>
33
#include <fastdds/dds/domain/DomainParticipantListener.hpp>
34
#include <fastdds/dds/log/Log.hpp>
35
#include <fastdds/dds/publisher/DataWriter.hpp>
36
#include <fastdds/dds/publisher/Publisher.hpp>
37
#include <fastdds/dds/subscriber/DataReader.hpp>
38
#include <fastdds/dds/subscriber/Subscriber.hpp>
39
#include <fastdds/dds/topic/IContentFilterFactory.hpp>
40
#include <fastdds/dds/topic/TypeSupport.hpp>
41
#include <fastdds/dds/xtypes/dynamic_types/DynamicPubSubType.hpp>
42
#include <fastdds/dds/xtypes/dynamic_types/DynamicType.hpp>
43
#include <fastdds/rtps/attributes/PropertyPolicy.hpp>
44
#include <fastdds/rtps/attributes/RTPSParticipantAttributes.hpp>
45
#include <fastdds/rtps/common/Guid.hpp>
46
#include <fastdds/rtps/common/GuidPrefix_t.hpp>
47
#include <fastdds/rtps/participant/ParticipantDiscoveryInfo.hpp>
48
#include <fastdds/rtps/participant/RTPSParticipant.hpp>
49
#include <fastdds/rtps/reader/ReaderDiscoveryStatus.hpp>
50
#include <fastdds/rtps/RTPSDomain.hpp>
51
#include <fastdds/rtps/writer/WriterDiscoveryStatus.hpp>
52
53
#include <fastdds/builtin/type_lookup_service/TypeLookupManager.hpp>
54
#include <fastdds/core/policy/QosPolicyUtils.hpp>
55
#include <fastdds/publisher/DataWriterImpl.hpp>
56
#include <fastdds/publisher/PublisherImpl.hpp>
57
#include <fastdds/subscriber/SubscriberImpl.hpp>
58
#include <fastdds/topic/ContentFilteredTopicImpl.hpp>
59
#include <fastdds/topic/TopicImpl.hpp>
60
#include <fastdds/topic/TopicProxy.hpp>
61
#include <fastdds/topic/TopicProxyFactory.hpp>
62
#include <fastdds/utils/QosConverters.hpp>
63
#include <fastdds/utils/TypePropagation.hpp>
64
#include <rtps/builtin/liveliness/WLP.hpp>
65
#include <rtps/domain/RTPSDomainImpl.hpp>
66
#include <utils/SystemInfo.hpp>
67
#include <xmlparser/attributes/PublisherAttributes.hpp>
68
#include <xmlparser/attributes/ReplierAttributes.hpp>
69
#include <xmlparser/attributes/RequesterAttributes.hpp>
70
#include <xmlparser/attributes/SubscriberAttributes.hpp>
71
#include <xmlparser/attributes/TopicAttributes.hpp>
72
#include <xmlparser/XMLProfileManager.h>
73
74
#include "../rpc/ReplierImpl.hpp"
75
#include "../rpc/RequesterImpl.hpp"
76
#include "../rpc/ServiceImpl.hpp"
77
78
namespace eprosima {
79
namespace fastdds {
80
namespace dds {
81
82
using xmlparser::XMLProfileManager;
83
using xmlparser::XMLP_ret;
84
using rtps::RTPSDomain;
85
using rtps::RTPSDomainImpl;
86
using rtps::RTPSParticipant;
87
using xmlparser::XMLP_ret;
88
using xmlparser::XMLProfileManager;
89
#if HAVE_SECURITY
90
using rtps::ParticipantAuthenticationInfo;
91
#endif // if HAVE_SECURITY
92
using rtps::EndpointKind_t;
93
using rtps::ReaderDiscoveryStatus;
94
using rtps::ResourceEvent;
95
using rtps::WriterDiscoveryStatus;
96
97
DomainParticipantImpl::DomainParticipantImpl(
98
        DomainParticipant* dp,
99
        DomainId_t did,
100
        const DomainParticipantQos& qos,
101
        DomainParticipantListener* listen)
102
0
    : domain_id_(did)
103
0
    , next_instance_id_(0)
104
0
    , qos_(qos)
105
0
    , rtps_participant_(nullptr)
106
0
    , participant_(dp)
107
0
    , listener_(listen)
108
0
    , default_pub_qos_(PUBLISHER_QOS_DEFAULT)
109
0
    , default_sub_qos_(SUBSCRIBER_QOS_DEFAULT)
110
0
    , default_topic_qos_(TOPIC_QOS_DEFAULT)
111
0
    , id_counter_(0)
112
#pragma warning (disable : 4355 )
113
0
    , rtps_listener_(this)
114
0
{
115
0
    participant_->impl_ = this;
116
117
0
    xmlparser::PublisherAttributes pub_attr;
118
0
    XMLProfileManager::getDefaultPublisherAttributes(pub_attr);
119
0
    utils::set_qos_from_attributes(default_pub_qos_, pub_attr);
120
121
0
    xmlparser::SubscriberAttributes sub_attr;
122
0
    XMLProfileManager::getDefaultSubscriberAttributes(sub_attr);
123
0
    utils::set_qos_from_attributes(default_sub_qos_, sub_attr);
124
125
0
    xmlparser::TopicAttributes top_attr;
126
0
    XMLProfileManager::getDefaultTopicAttributes(top_attr);
127
0
    utils::set_qos_from_attributes(default_topic_qos_, top_attr);
128
129
    // Pre calculate participant id and generated guid
130
0
    participant_id_ = qos_.wire_protocol().participant_id;
131
0
    if (!eprosima::fastdds::rtps::RTPSDomainImpl::get_instance()->create_participant_guid(participant_id_, guid_))
132
0
    {
133
0
        EPROSIMA_LOG_ERROR(DOMAIN_PARTICIPANT, "Error generating GUID for participant");
134
0
    }
135
0
    handle_ = guid_;
136
137
    /* Fill physical data properties if they are found and empty */
138
0
    std::string* property_value = fastdds::rtps::PropertyPolicyHelper::find_property(
139
0
        qos_.properties(), parameter_policy_physical_data_host);
140
0
    if (nullptr != property_value && property_value->empty())
141
0
    {
142
0
        if (SystemInfo::instance().machine_id().size() > 0)
143
0
        {
144
0
            property_value->assign(SystemInfo::instance().machine_id().to_string());
145
0
        }
146
0
        else
147
0
        {
148
0
            property_value->assign(asio::ip::host_name() + ":" + std::to_string(utils::default_domain_id()));
149
0
        }
150
0
    }
151
152
0
    property_value = fastdds::rtps::PropertyPolicyHelper::find_property(
153
0
        qos_.properties(), parameter_policy_physical_data_user);
154
0
    if (nullptr != property_value && property_value->empty())
155
0
    {
156
0
        std::string username = "unknown";
157
0
        if (RETCODE_OK == SystemInfo::get_username(username))
158
0
        {
159
0
            property_value->assign(username);
160
0
        }
161
0
    }
162
163
0
    property_value = fastdds::rtps::PropertyPolicyHelper::find_property(
164
0
        qos_.properties(), parameter_policy_physical_data_process);
165
0
    if (nullptr != property_value && property_value->empty())
166
0
    {
167
0
        property_value->assign(std::to_string(SystemInfo::instance().process_id()));
168
0
    }
169
0
}
170
171
void DomainParticipantImpl::disable()
172
0
{
173
0
    DomainParticipant* participant = get_participant();
174
0
    if (participant)
175
0
    {
176
0
        participant->set_listener(nullptr);
177
0
    }
178
179
    // The function to disable the DomainParticipantImpl is called from
180
    // DomainParticipantFactory::delete_participant() and DomainParticipantFactory destructor.
181
0
    auto rtps_participant = get_rtps_participant();
182
0
    if (rtps_participant != nullptr)
183
0
    {
184
0
        rtps_participant->set_listener(nullptr);
185
186
0
        {
187
0
            std::lock_guard<std::mutex> lock(mtx_pubs_);
188
0
            for (auto pub_it = publishers_.begin(); pub_it != publishers_.end(); ++pub_it)
189
0
            {
190
0
                pub_it->second->disable();
191
0
            }
192
0
        }
193
194
0
        {
195
0
            std::lock_guard<std::mutex> lock(mtx_subs_);
196
0
            for (auto sub_it = subscribers_.begin(); sub_it != subscribers_.end(); ++sub_it)
197
0
            {
198
0
                sub_it->second->disable();
199
0
            }
200
0
        }
201
0
    }
202
0
}
203
204
DomainParticipantImpl::~DomainParticipantImpl()
205
0
{
206
0
    {
207
0
        std::lock_guard<std::mutex> lock(mtx_services_);
208
0
        for (auto service_it = services_.begin(); service_it != services_.end(); ++service_it)
209
0
        {
210
0
            delete service_it->second;
211
0
        }
212
0
        services_.clear();
213
0
    }
214
215
0
    services_publisher_ = {};
216
0
    services_subscriber_ = {};
217
218
0
    {
219
0
        std::lock_guard<std::mutex> lock(mtx_pubs_);
220
0
        for (auto pub_it = publishers_.begin(); pub_it != publishers_.end(); ++pub_it)
221
0
        {
222
0
            delete pub_it->second;
223
0
        }
224
0
        publishers_.clear();
225
0
        publishers_by_handle_.clear();
226
0
    }
227
228
0
    {
229
0
        std::lock_guard<std::mutex> lock(mtx_subs_);
230
231
0
        for (auto sub_it = subscribers_.begin(); sub_it != subscribers_.end(); ++sub_it)
232
0
        {
233
0
            delete sub_it->second;
234
0
        }
235
0
        subscribers_.clear();
236
0
        subscribers_by_handle_.clear();
237
0
    }
238
239
0
    {
240
0
        std::lock_guard<std::mutex> lock(mtx_topics_);
241
242
0
        filtered_topics_.clear();
243
244
0
        for (auto topic_it = topics_.begin(); topic_it != topics_.end(); ++topic_it)
245
0
        {
246
0
            delete topic_it->second;
247
0
        }
248
0
        topics_.clear();
249
0
        topics_by_handle_.clear();
250
0
    }
251
252
0
    auto rtps_participant = get_rtps_participant();
253
0
    if (rtps_participant != nullptr)
254
0
    {
255
0
        RTPSDomain::removeRTPSParticipant(rtps_participant);
256
0
    }
257
258
0
    {
259
0
        std::lock_guard<std::mutex> lock(mtx_types_);
260
0
        types_.clear();
261
0
    }
262
263
0
    std::lock_guard<std::mutex> _(mtx_gs_);
264
265
    // Assert no callbacks are being executed.
266
    // Note that this should never occur since reception and events threads joined when removing rtps_participant.
267
0
    assert(!(rtps_listener_.callback_counter_ > 0));
268
269
0
    if (participant_)
270
0
    {
271
0
        participant_->impl_ = nullptr;
272
0
        delete participant_;
273
0
        participant_ = nullptr;
274
0
    }
275
0
}
276
277
ReturnCode_t DomainParticipantImpl::enable()
278
0
{
279
    // Should not have been previously enabled
280
0
    assert(get_rtps_participant() == nullptr);
281
    // Should not have failed assigning the GUID
282
0
    assert (guid_ != fastdds::rtps::GUID_t::unknown());
283
284
0
    auto qos_check = check_qos(qos_);
285
286
0
    if (RETCODE_OK != qos_check)
287
0
    {
288
0
        return qos_check;
289
0
    }
290
291
0
    fastdds::rtps::RTPSParticipantAttributes rtps_attr;
292
0
    utils::set_attributes_from_qos(rtps_attr, qos_);
293
0
    rtps_attr.participantID = participant_id_;
294
295
0
    RTPSParticipant* part = RTPSDomain::createParticipant(
296
0
        domain_id_,
297
0
        false,
298
0
        rtps_attr,
299
0
        &rtps_listener_);
300
301
0
    if (part == nullptr)
302
0
    {
303
0
        EPROSIMA_LOG_ERROR(DOMAIN_PARTICIPANT, "Problem creating RTPSParticipant");
304
0
        return RETCODE_ERROR;
305
0
    }
306
307
0
    guid_ = part->getGuid();
308
0
    handle_ = guid_;
309
310
0
    {
311
0
        std::lock_guard<std::mutex> _(mtx_gs_);
312
313
0
        rtps_participant_ = part;
314
315
0
        part->set_check_type_function(
316
0
            [this](const std::string& type_name) -> bool
317
0
            {
318
0
                return find_type(type_name).get() != nullptr;
319
0
            });
320
0
    }
321
322
0
    if (qos_.entity_factory().autoenable_created_entities)
323
0
    {
324
        // Enable topics first
325
0
        {
326
0
            std::lock_guard<std::mutex> lock(mtx_topics_);
327
328
0
            for (auto topic : topics_)
329
0
            {
330
0
                topic.second->enable_topic();
331
0
            }
332
0
        }
333
334
        // Enable publishers
335
0
        {
336
0
            std::lock_guard<std::mutex> lock(mtx_pubs_);
337
0
            for (auto pub : publishers_)
338
0
            {
339
0
                pub.second->rtps_participant_ = part;
340
0
                pub.second->user_publisher_->enable();
341
0
            }
342
0
        }
343
344
        // Enable subscribers
345
0
        {
346
0
            std::lock_guard<std::mutex> lock(mtx_subs_);
347
348
0
            for (auto sub : subscribers_)
349
0
            {
350
0
                sub.second->rtps_participant_ = part;
351
0
                sub.second->user_subscriber_->enable();
352
0
            }
353
0
        }
354
0
    }
355
356
0
    part->enable();
357
358
0
    return RETCODE_OK;
359
0
}
360
361
ReturnCode_t DomainParticipantImpl::set_qos(
362
        const DomainParticipantQos& qos)
363
0
{
364
0
    bool enabled = false;
365
0
    bool qos_should_be_updated = false;
366
0
    fastdds::rtps::RTPSParticipantAttributes patt;
367
0
    fastdds::rtps::RTPSParticipant* rtps_participant = nullptr;
368
369
0
    {
370
0
        std::lock_guard<std::mutex> _(mtx_gs_);
371
372
0
        rtps_participant = rtps_participant_;
373
0
        enabled = rtps_participant != nullptr;
374
0
        const DomainParticipantQos& qos_to_set = (&qos == &PARTICIPANT_QOS_DEFAULT) ?
375
0
                DomainParticipantFactory::get_instance()->get_default_participant_qos() : qos;
376
377
0
        if (&qos != &PARTICIPANT_QOS_DEFAULT)
378
0
        {
379
0
            ReturnCode_t ret_val = check_qos(qos_to_set);
380
0
            if (RETCODE_OK != ret_val)
381
0
            {
382
0
                return ret_val;
383
0
            }
384
0
        }
385
386
0
        if (enabled && !can_qos_be_updated(qos_, qos_to_set))
387
0
        {
388
0
            return RETCODE_IMMUTABLE_POLICY;
389
0
        }
390
391
0
        qos_should_be_updated = set_qos(qos_, qos_to_set, !enabled);
392
0
        if (enabled)
393
0
        {
394
0
            if (qos_should_be_updated)
395
0
            {
396
                // Notify the participant that there is a QoS update
397
0
                utils::set_attributes_from_qos(patt, qos_);
398
0
            }
399
0
            else
400
0
            {
401
                // Trigger update of network interfaces by calling update_attributes with current attributes
402
0
                patt = rtps_participant->get_attributes();
403
0
            }
404
0
        }
405
0
    }
406
407
0
    if (enabled)
408
0
    {
409
0
        rtps_participant->update_attributes(patt);
410
0
    }
411
412
0
    return RETCODE_OK;
413
0
}
414
415
ReturnCode_t DomainParticipantImpl::get_qos(
416
        DomainParticipantQos& qos) const
417
0
{
418
0
    std::lock_guard<std::mutex> _(mtx_gs_);
419
0
    qos = qos_;
420
0
    return RETCODE_OK;
421
0
}
422
423
const DomainParticipantQos& DomainParticipantImpl::get_qos() const
424
0
{
425
0
    std::lock_guard<std::mutex> _(mtx_gs_);
426
0
    return qos_;
427
0
}
428
429
ReturnCode_t DomainParticipantImpl::delete_publisher(
430
        const Publisher* pub)
431
0
{
432
0
    if (get_participant() != pub->get_participant())
433
0
    {
434
0
        return RETCODE_PRECONDITION_NOT_MET;
435
0
    }
436
0
    std::lock_guard<std::mutex> lock(mtx_pubs_);
437
0
    auto pit = publishers_.find(const_cast<Publisher*>(pub));
438
439
0
    if (pit != publishers_.end())
440
0
    {
441
0
        assert(pub->get_instance_handle() == pit->second->get_instance_handle()
442
0
                && "The publisher instance handle does not match the publisher implementation instance handle");
443
0
        if (pub->has_datawriters())
444
0
        {
445
0
            return RETCODE_PRECONDITION_NOT_MET;
446
0
        }
447
0
        pit->second->set_listener(nullptr);
448
0
        publishers_by_handle_.erase(publishers_by_handle_.find(pit->second->get_instance_handle()));
449
0
        delete pit->second;
450
0
        publishers_.erase(pit);
451
0
        return RETCODE_OK;
452
0
    }
453
454
0
    return RETCODE_ERROR;
455
0
}
456
457
ReturnCode_t DomainParticipantImpl::delete_subscriber(
458
        const Subscriber* sub)
459
0
{
460
0
    if (get_participant() != sub->get_participant())
461
0
    {
462
0
        return RETCODE_PRECONDITION_NOT_MET;
463
0
    }
464
0
    std::lock_guard<std::mutex> lock(mtx_subs_);
465
0
    auto sit = subscribers_.find(const_cast<Subscriber*>(sub));
466
467
0
    if (sit != subscribers_.end())
468
0
    {
469
0
        assert(sub->get_instance_handle() == sit->second->get_instance_handle()
470
0
                && "The subscriber instance handle does not match the subscriber implementation instance handle");
471
0
        if (sub->has_datareaders())
472
0
        {
473
0
            return RETCODE_PRECONDITION_NOT_MET;
474
0
        }
475
0
        sit->second->set_listener(nullptr);
476
0
        subscribers_by_handle_.erase(subscribers_by_handle_.find(sit->second->get_instance_handle()));
477
0
        delete sit->second;
478
0
        subscribers_.erase(sit);
479
0
        return RETCODE_OK;
480
0
    }
481
482
0
    return RETCODE_ERROR;
483
0
}
484
485
Topic* DomainParticipantImpl::find_topic(
486
        const std::string& topic_name,
487
        const fastdds::dds::Duration_t& timeout)
488
0
{
489
0
    auto find_fn = [this, &topic_name]()
490
0
            {
491
0
                return topics_.count(topic_name) > 0;
492
0
            };
493
494
0
    std::unique_lock<std::mutex> lock(mtx_topics_);
495
0
    if (timeout.is_infinite())
496
0
    {
497
0
        cond_topics_.wait(lock, find_fn);
498
0
    }
499
0
    else
500
0
    {
501
0
        auto duration = std::chrono::seconds(timeout.seconds) + std::chrono::nanoseconds(timeout.nanosec);
502
0
        if (!cond_topics_.wait_for(lock, duration, find_fn))
503
0
        {
504
0
            return nullptr;
505
0
        }
506
0
    }
507
508
0
    Topic* ret_val = topics_[topic_name]->create_topic()->get_topic();
509
510
0
    InstanceHandle_t topic_handle;
511
0
    create_instance_handle(topic_handle);
512
0
    ret_val->set_instance_handle(topic_handle);
513
0
    topics_by_handle_[topic_handle] = ret_val;
514
515
0
    return ret_val;
516
0
}
517
518
void DomainParticipantImpl::set_topic_listener(
519
        const TopicProxyFactory* factory,
520
        TopicImpl* impl,
521
        TopicListener* listener,
522
        const StatusMask& mask)
523
0
{
524
0
    std::lock_guard<std::mutex> lock(mtx_topics_);
525
0
    impl->set_listener(listener);
526
0
    factory->for_each([mask](const std::unique_ptr<TopicProxy>& proxy)
527
0
            {
528
0
                proxy->get_topic()->status_mask_ = mask;
529
0
            });
530
0
}
531
532
ReturnCode_t DomainParticipantImpl::delete_topic(
533
        const Topic* topic)
534
0
{
535
0
    if (topic == nullptr)
536
0
    {
537
0
        return RETCODE_BAD_PARAMETER;
538
0
    }
539
540
0
    std::lock_guard<std::mutex> lock(mtx_topics_);
541
0
    auto handle_it = std::find_if(topics_by_handle_.begin(), topics_by_handle_.end(),
542
0
                    [topic](const decltype(topics_by_handle_)::value_type& item)
543
0
                    {
544
0
                        return item.second == topic;
545
0
                    });
546
0
    if (handle_it != topics_by_handle_.end())
547
0
    {
548
0
        auto it = topics_.find(topic->get_name());
549
0
        assert(it != topics_.end() && "Topic found by handle but factory not found");
550
0
        InstanceHandle_t handle = topic->get_instance_handle();
551
552
0
        TopicProxy* proxy = dynamic_cast<TopicProxy*>(topic->get_impl());
553
0
        assert(nullptr != proxy);
554
0
        auto ret_code = it->second->delete_topic(proxy);
555
0
        if (RETCODE_OK == ret_code)
556
0
        {
557
0
            topics_by_handle_.erase(handle);
558
559
0
            if (it->second->can_be_deleted())
560
0
            {
561
0
                auto factory = it->second;
562
0
                topics_.erase(it);
563
0
                delete factory;
564
0
            }
565
0
        }
566
0
        return ret_code;
567
0
    }
568
569
0
    return RETCODE_PRECONDITION_NOT_MET;
570
0
}
571
572
ContentFilteredTopic* DomainParticipantImpl::create_contentfilteredtopic(
573
        const std::string& name,
574
        Topic* related_topic,
575
        const std::string& filter_expression,
576
        const std::vector<std::string>& expression_parameters,
577
        const char* filter_class_name)
578
0
{
579
0
    if ((nullptr == related_topic) || (nullptr == filter_class_name))
580
0
    {
581
0
        return nullptr;
582
0
    }
583
584
0
    std::lock_guard<std::mutex> lock(mtx_topics_);
585
586
    // Check there is no Topic with the same name
587
0
    if ((topics_.find(name) != topics_.end()) ||
588
0
            (filtered_topics_.find(name) != filtered_topics_.end()))
589
0
    {
590
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Topic with name : " << name << " already exists");
591
0
        return nullptr;
592
0
    }
593
594
0
    if (related_topic->get_participant() != get_participant())
595
0
    {
596
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Creating ContentFilteredTopic with name " << name <<
597
0
                ": related_topic not from this participant");
598
0
        return nullptr;
599
0
    }
600
601
0
    IContentFilterFactory* filter_factory = find_content_filter_factory(filter_class_name);
602
0
    if (nullptr == filter_factory)
603
0
    {
604
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Could not find factory for filter class " << filter_class_name);
605
0
        return nullptr;
606
0
    }
607
608
0
    if (expression_parameters.size() > qos_.allocation().content_filter.expression_parameters.maximum)
609
0
    {
610
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Number of expression parameters exceeds maximum allocation limit: "
611
0
                << expression_parameters.size() << " > "
612
0
                << qos_.allocation().content_filter.expression_parameters.maximum);
613
0
        return nullptr;
614
0
    }
615
616
0
    if (expression_parameters.size() > 100)
617
0
    {
618
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Number of expression parameters exceeds maximum protocol limit: "
619
0
                << expression_parameters.size() << " > 100");
620
0
        return nullptr;
621
0
    }
622
623
0
    TopicProxy* topic_impl = dynamic_cast<TopicProxy*>(related_topic->get_impl());
624
0
    assert(nullptr != topic_impl);
625
0
    const TypeSupport& type = topic_impl->get_type();
626
0
    LoanableSequence<const char*>::size_type n_params;
627
0
    n_params = static_cast<LoanableSequence<const char*>::size_type>(expression_parameters.size());
628
0
    LoanableSequence<const char*> filter_parameters(n_params);
629
0
    filter_parameters.length(n_params);
630
0
    while (n_params > 0)
631
0
    {
632
0
        n_params--;
633
0
        filter_parameters[n_params] = expression_parameters[n_params].c_str();
634
0
    }
635
636
    // Tell filter factory to compile the expression
637
0
    IContentFilter* filter_instance = nullptr;
638
0
    if (RETCODE_OK !=
639
0
            filter_factory->create_content_filter(filter_class_name, related_topic->get_type_name().c_str(),
640
0
            type.get(), filter_expression.c_str(), filter_parameters, filter_instance))
641
0
    {
642
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Could not create filter of class " << filter_class_name << " for expression \"" <<
643
0
                filter_expression);
644
0
        return nullptr;
645
0
    }
646
647
0
    ContentFilteredTopic* topic;
648
0
    topic = new ContentFilteredTopic(name, related_topic, filter_expression, expression_parameters);
649
0
    ContentFilteredTopicImpl* content_topic_impl = static_cast<ContentFilteredTopicImpl*>(topic->get_impl());
650
0
    content_topic_impl->filter_property.filter_class_name = filter_class_name;
651
0
    content_topic_impl->filter_factory = filter_factory;
652
0
    content_topic_impl->filter_instance = filter_instance;
653
0
    content_topic_impl->update_signature();
654
655
    // Save the topic into the map
656
0
    filtered_topics_.emplace(std::make_pair(name, std::unique_ptr<ContentFilteredTopic>(topic)));
657
658
0
    return topic;
659
0
}
660
661
ReturnCode_t DomainParticipantImpl::delete_contentfilteredtopic(
662
        const ContentFilteredTopic* topic)
663
0
{
664
0
    if (topic == nullptr)
665
0
    {
666
0
        return RETCODE_BAD_PARAMETER;
667
0
    }
668
669
0
    std::lock_guard<std::mutex> lock(mtx_topics_);
670
0
    auto it = filtered_topics_.find(topic->get_name());
671
672
0
    if (it != filtered_topics_.end())
673
0
    {
674
0
        if (it->second->get_impl()->is_referenced())
675
0
        {
676
0
            return RETCODE_PRECONDITION_NOT_MET;
677
0
        }
678
0
        filtered_topics_.erase(it);
679
0
        return RETCODE_OK;
680
0
    }
681
682
0
    return RETCODE_PRECONDITION_NOT_MET;
683
0
}
684
685
ReturnCode_t DomainParticipantImpl::register_content_filter_factory(
686
        const char* filter_class_name,
687
        IContentFilterFactory* const filter_factory)
688
0
{
689
0
    if (nullptr == filter_factory || nullptr == filter_class_name || strlen(filter_class_name) > 255)
690
0
    {
691
0
        return RETCODE_BAD_PARAMETER;
692
0
    }
693
694
0
    std::lock_guard<std::mutex> lock(mtx_topics_);
695
0
    auto it = filter_factories_.find(filter_class_name);
696
0
    if ((it != filter_factories_.end()) || (0 == strcmp(filter_class_name, FASTDDS_SQLFILTER_NAME)))
697
0
    {
698
0
        return RETCODE_PRECONDITION_NOT_MET;
699
0
    }
700
701
0
    filter_factories_[filter_class_name] = filter_factory;
702
0
    return RETCODE_OK;
703
0
}
704
705
IContentFilterFactory* DomainParticipantImpl::lookup_content_filter_factory(
706
        const char* filter_class_name)
707
0
{
708
0
    if (nullptr == filter_class_name)
709
0
    {
710
0
        return nullptr;
711
0
    }
712
713
0
    std::lock_guard<std::mutex> lock(mtx_topics_);
714
0
    auto it = filter_factories_.find(filter_class_name);
715
0
    if ((it == filter_factories_.end()) || (it->first == FASTDDS_SQLFILTER_NAME))
716
0
    {
717
0
        return nullptr;
718
0
    }
719
0
    return it->second;
720
0
}
721
722
ReturnCode_t DomainParticipantImpl::unregister_content_filter_factory(
723
        const char* filter_class_name)
724
0
{
725
0
    if (nullptr == filter_class_name)
726
0
    {
727
0
        return RETCODE_BAD_PARAMETER;
728
0
    }
729
730
0
    std::lock_guard<std::mutex> lock(mtx_topics_);
731
0
    auto it = filter_factories_.find(filter_class_name);
732
0
    if ((it == filter_factories_.end()) || (it->first == FASTDDS_SQLFILTER_NAME))
733
0
    {
734
0
        return RETCODE_PRECONDITION_NOT_MET;
735
0
    }
736
737
0
    for (auto& topic : filtered_topics_)
738
0
    {
739
0
        if (topic.second->impl_->filter_property.filter_class_name == filter_class_name)
740
0
        {
741
0
            return RETCODE_PRECONDITION_NOT_MET;
742
0
        }
743
0
    }
744
745
0
    for (auto& pub : publishers_)
746
0
    {
747
0
        for (auto& topic : pub.second->writers_)
748
0
        {
749
0
            for (auto& wr : topic.second)
750
0
            {
751
0
                wr->filter_is_being_removed(filter_class_name);
752
0
            }
753
0
        }
754
0
    }
755
756
0
    filter_factories_.erase(it);
757
758
0
    return RETCODE_OK;
759
0
}
760
761
IContentFilterFactory* DomainParticipantImpl::find_content_filter_factory(
762
        const char* filter_class_name)
763
0
{
764
0
    auto it = filter_factories_.find(filter_class_name);
765
0
    if (it != filter_factories_.end())
766
0
    {
767
0
        return it->second;
768
0
    }
769
770
0
    if (0 != strcmp(filter_class_name, FASTDDS_SQLFILTER_NAME))
771
0
    {
772
0
        return nullptr;
773
0
    }
774
775
0
    return &dds_sql_filter_factory_;
776
0
}
777
778
InstanceHandle_t DomainParticipantImpl::get_instance_handle() const
779
0
{
780
0
    return handle_;
781
0
}
782
783
const fastdds::rtps::GUID_t& DomainParticipantImpl::guid() const
784
0
{
785
0
    return guid_;
786
0
}
787
788
Publisher* DomainParticipantImpl::create_publisher(
789
        const PublisherQos& qos,
790
        PublisherListener* listener,
791
        const StatusMask& mask)
792
0
{
793
0
    return create_publisher(qos, nullptr, listener, mask);
794
0
}
795
796
Publisher* DomainParticipantImpl::create_publisher(
797
        const PublisherQos& qos,
798
        PublisherImpl** impl,
799
        PublisherListener* listener,
800
        const StatusMask& mask)
801
0
{
802
0
    if (RETCODE_OK != PublisherImpl::check_qos(qos))
803
0
    {
804
        // The PublisherImpl::check_qos() function is not yet implemented and always returns RETCODE_OK.
805
        // It will be implemented in future releases of Fast DDS.
806
        // EPROSIMA_LOG_ERROR(PARTICIPANT, "PublisherQos inconsistent or not supported");
807
        // return nullptr;
808
0
    }
809
810
    //TODO CONSTRUIR LA IMPLEMENTACION DENTRO DEL OBJETO DEL USUARIO.
811
0
    PublisherImpl* pubimpl = create_publisher_impl(qos, listener);
812
0
    Publisher* pub = new Publisher(pubimpl, mask);
813
0
    pubimpl->user_publisher_ = pub;
814
0
    pubimpl->rtps_participant_ = get_rtps_participant();
815
0
    bool enabled = get_rtps_participant() != nullptr;
816
817
    // Create InstanceHandle for the new publisher
818
0
    InstanceHandle_t pub_handle;
819
0
    create_instance_handle(pub_handle);
820
0
    pubimpl->handle_ = pub_handle;
821
822
    //SAVE THE PUBLISHER INTO MAPS
823
0
    std::lock_guard<std::mutex> lock(mtx_pubs_);
824
0
    publishers_by_handle_[pub_handle] = pub;
825
0
    publishers_[pub] = pubimpl;
826
827
    // Enable publisher if appropriate
828
0
    if (enabled && qos_.entity_factory().autoenable_created_entities)
829
0
    {
830
0
        ReturnCode_t ret_publisher_enable = pub->enable();
831
0
        assert(RETCODE_OK == ret_publisher_enable);
832
0
        (void)ret_publisher_enable;
833
0
    }
834
835
0
    if (impl)
836
0
    {
837
0
        *impl = pubimpl;
838
0
    }
839
840
0
    return pub;
841
0
}
842
843
Publisher* DomainParticipantImpl::create_publisher_with_profile(
844
        const std::string& profile_name,
845
        PublisherListener* listener,
846
        const StatusMask& mask)
847
0
{
848
    // TODO (ILG): Change when we have full XML support for DDS QoS profiles
849
0
    xmlparser::PublisherAttributes attr;
850
0
    if (XMLP_ret::XML_OK == XMLProfileManager::fillPublisherAttributes(profile_name, attr))
851
0
    {
852
0
        PublisherQos qos = default_pub_qos_;
853
0
        utils::set_qos_from_attributes(qos, attr);
854
0
        return create_publisher(qos, listener, mask);
855
0
    }
856
857
0
    return nullptr;
858
0
}
859
860
PublisherImpl* DomainParticipantImpl::create_publisher_impl(
861
        const PublisherQos& qos,
862
        PublisherListener* listener)
863
0
{
864
0
    return new PublisherImpl(this, qos, listener);
865
0
}
866
867
/* TODO
868
   Subscriber* DomainParticipantImpl::get_builtin_subscriber()
869
   {
870
    EPROSIMA_LOG_ERROR(PARTICIPANT, "Not implemented.");
871
    return nullptr;
872
   }
873
 */
874
875
ReturnCode_t DomainParticipantImpl::ignore_participant(
876
        const InstanceHandle_t& handle)
877
0
{
878
0
    return (nullptr == rtps_participant_) ? RETCODE_NOT_ENABLED :
879
0
           rtps_participant_->ignore_participant(iHandle2GUID(handle).guidPrefix) ? RETCODE_OK :
880
0
           RETCODE_BAD_PARAMETER;
881
0
}
882
883
/* TODO
884
   bool DomainParticipantImpl::ignore_topic(
885
        const InstanceHandle_t& handle)
886
   {
887
    (void)handle;
888
    EPROSIMA_LOG_ERROR(PARTICIPANT, "Not implemented.");
889
    return false;
890
   }
891
 */
892
893
bool DomainParticipantImpl::ignore_publication(
894
        const InstanceHandle_t& handle)
895
0
{
896
0
    static_cast<void>(handle);
897
0
    EPROSIMA_LOG_ERROR(PARTICIPANT, "Not implemented.");
898
0
    return false;
899
0
}
900
901
bool DomainParticipantImpl::ignore_subscription(
902
        const InstanceHandle_t& handle)
903
0
{
904
0
    static_cast<void>(handle);
905
0
    EPROSIMA_LOG_ERROR(PARTICIPANT, "Not implemented.");
906
0
    return false;
907
0
}
908
909
DomainId_t DomainParticipantImpl::get_domain_id() const
910
0
{
911
0
    return domain_id_;
912
0
}
913
914
ReturnCode_t DomainParticipantImpl::delete_contained_entities()
915
0
{
916
0
    bool can_be_deleted = true;
917
918
0
    std::lock_guard<std::mutex> lock_subscribers(mtx_subs_);
919
920
0
    for (auto subscriber : subscribers_)
921
0
    {
922
0
        can_be_deleted = subscriber.second->can_be_deleted();
923
0
        if (!can_be_deleted)
924
0
        {
925
0
            return RETCODE_PRECONDITION_NOT_MET;
926
0
        }
927
0
    }
928
929
0
    std::lock_guard<std::mutex> lock_publishers(mtx_pubs_);
930
931
932
933
0
    for (auto publisher : publishers_)
934
0
    {
935
0
        can_be_deleted = publisher.second->can_be_deleted();
936
0
        if (!can_be_deleted)
937
0
        {
938
0
            return RETCODE_PRECONDITION_NOT_MET;
939
0
        }
940
0
    }
941
942
0
    ReturnCode_t ret_code = RETCODE_OK;
943
944
0
    for (auto& subscriber : subscribers_)
945
0
    {
946
0
        ret_code = subscriber.first->delete_contained_entities();
947
0
        if (RETCODE_OK != ret_code)
948
0
        {
949
0
            return RETCODE_ERROR;
950
0
        }
951
0
    }
952
953
0
    auto it_subs = subscribers_.begin();
954
0
    while (it_subs != subscribers_.end())
955
0
    {
956
0
        it_subs->second->set_listener(nullptr);
957
0
        subscribers_by_handle_.erase(it_subs->second->get_subscriber()->get_instance_handle());
958
0
        delete it_subs->second;
959
0
        it_subs = subscribers_.erase(it_subs);
960
0
    }
961
962
0
    for (auto& publisher : publishers_)
963
0
    {
964
0
        ret_code = publisher.first->delete_contained_entities();
965
0
        if (RETCODE_OK != ret_code)
966
0
        {
967
0
            return RETCODE_ERROR;
968
0
        }
969
0
    }
970
971
0
    auto it_pubs = publishers_.begin();
972
0
    while (it_pubs != publishers_.end())
973
0
    {
974
0
        it_pubs->second->set_listener(nullptr);
975
0
        publishers_by_handle_.erase(it_pubs->second->get_publisher()->get_instance_handle());
976
0
        delete it_pubs->second;
977
0
        it_pubs = publishers_.erase(it_pubs);
978
0
    }
979
980
0
    services_publisher_ = {};
981
0
    services_subscriber_ = {};
982
983
0
    std::lock_guard<std::mutex> lock_topics(mtx_topics_);
984
985
0
    filtered_topics_.clear();
986
0
    topics_by_handle_.clear();
987
988
0
    auto it_topics = topics_.begin();
989
0
    while (it_topics != topics_.end())
990
0
    {
991
0
        delete it_topics->second;
992
0
        it_topics = topics_.erase(it_topics);
993
0
    }
994
995
0
    return RETCODE_OK;
996
0
}
997
998
ReturnCode_t DomainParticipantImpl::assert_liveliness()
999
0
{
1000
0
    fastdds::rtps::RTPSParticipant* rtps_participant = get_rtps_participant();
1001
0
    if (rtps_participant == nullptr)
1002
0
    {
1003
0
        return RETCODE_NOT_ENABLED;
1004
0
    }
1005
1006
0
    if (rtps_participant->wlp() != nullptr)
1007
0
    {
1008
0
        if (rtps_participant->wlp()->assert_liveliness_manual_by_participant())
1009
0
        {
1010
0
            return RETCODE_OK;
1011
0
        }
1012
0
    }
1013
0
    else
1014
0
    {
1015
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Invalid WLP, cannot assert liveliness of participant");
1016
0
    }
1017
0
    return RETCODE_ERROR;
1018
0
}
1019
1020
ReturnCode_t DomainParticipantImpl::set_default_publisher_qos(
1021
        const PublisherQos& qos)
1022
0
{
1023
0
    if (&qos == &PUBLISHER_QOS_DEFAULT)
1024
0
    {
1025
0
        reset_default_publisher_qos();
1026
0
        return RETCODE_OK;
1027
0
    }
1028
1029
0
    ReturnCode_t ret_val = PublisherImpl::check_qos(qos);
1030
0
    if (RETCODE_OK != ret_val)
1031
0
    {
1032
        // The PublisherImpl::check_qos() function is not yet implemented and always returns RETCODE_OK.
1033
        // It will be implemented in future releases of Fast DDS.
1034
        // return ret_val;
1035
0
    }
1036
0
    PublisherImpl::set_qos(default_pub_qos_, qos, true);
1037
0
    return RETCODE_OK;
1038
0
}
1039
1040
void DomainParticipantImpl::reset_default_publisher_qos()
1041
0
{
1042
    // TODO (ILG): Change when we have full XML support for DDS QoS profiles
1043
0
    PublisherImpl::set_qos(default_pub_qos_, PUBLISHER_QOS_DEFAULT, true);
1044
0
    xmlparser::PublisherAttributes attr;
1045
0
    XMLProfileManager::getDefaultPublisherAttributes(attr);
1046
0
    utils::set_qos_from_attributes(default_pub_qos_, attr);
1047
0
}
1048
1049
const PublisherQos& DomainParticipantImpl::get_default_publisher_qos() const
1050
0
{
1051
0
    return default_pub_qos_;
1052
0
}
1053
1054
ReturnCode_t DomainParticipantImpl::get_publisher_qos_from_profile(
1055
        const std::string& profile_name,
1056
        PublisherQos& qos) const
1057
0
{
1058
0
    xmlparser::PublisherAttributes attr;
1059
0
    if (XMLP_ret::XML_OK == XMLProfileManager::fillPublisherAttributes(profile_name, attr))
1060
0
    {
1061
0
        qos = default_pub_qos_;
1062
0
        utils::set_qos_from_attributes(qos, attr);
1063
0
        return RETCODE_OK;
1064
0
    }
1065
1066
0
    return RETCODE_BAD_PARAMETER;
1067
0
}
1068
1069
ReturnCode_t DomainParticipantImpl::get_publisher_qos_from_xml(
1070
        const std::string& xml,
1071
        PublisherQos& qos) const
1072
0
{
1073
0
    xmlparser::PublisherAttributes attr;
1074
0
    if (XMLP_ret::XML_OK == XMLProfileManager::fill_publisher_attributes_from_xml(xml, attr, false))
1075
0
    {
1076
0
        qos = default_pub_qos_;
1077
0
        utils::set_qos_from_attributes(qos, attr);
1078
0
        return RETCODE_OK;
1079
0
    }
1080
1081
0
    return RETCODE_BAD_PARAMETER;
1082
0
}
1083
1084
ReturnCode_t DomainParticipantImpl::get_publisher_qos_from_xml(
1085
        const std::string& xml,
1086
        PublisherQos& qos,
1087
        const std::string& profile_name) const
1088
0
{
1089
0
    if (profile_name.empty())
1090
0
    {
1091
0
        EPROSIMA_LOG_ERROR(DOMAIN_PARTICIPANT, "Provided profile name must be non-empty");
1092
0
        return RETCODE_BAD_PARAMETER;
1093
0
    }
1094
1095
0
    xmlparser::PublisherAttributes attr;
1096
0
    if (XMLP_ret::XML_OK == XMLProfileManager::fill_publisher_attributes_from_xml(xml, attr, true, profile_name))
1097
0
    {
1098
0
        qos = default_pub_qos_;
1099
0
        utils::set_qos_from_attributes(qos, attr);
1100
0
        return RETCODE_OK;
1101
0
    }
1102
1103
0
    return RETCODE_BAD_PARAMETER;
1104
0
}
1105
1106
ReturnCode_t DomainParticipantImpl::get_default_publisher_qos_from_xml(
1107
        const std::string& xml,
1108
        PublisherQos& qos) const
1109
0
{
1110
0
    xmlparser::PublisherAttributes attr;
1111
0
    if (XMLP_ret::XML_OK == XMLProfileManager::fill_default_publisher_attributes_from_xml(xml, attr, true))
1112
0
    {
1113
0
        qos = default_pub_qos_;
1114
0
        utils::set_qos_from_attributes(qos, attr);
1115
0
        return RETCODE_OK;
1116
0
    }
1117
1118
0
    return RETCODE_BAD_PARAMETER;
1119
0
}
1120
1121
ReturnCode_t DomainParticipantImpl::set_default_subscriber_qos(
1122
        const SubscriberQos& qos)
1123
0
{
1124
0
    if (&qos == &SUBSCRIBER_QOS_DEFAULT)
1125
0
    {
1126
0
        reset_default_subscriber_qos();
1127
0
        return RETCODE_OK;
1128
0
    }
1129
0
    ReturnCode_t check_result = SubscriberImpl::check_qos(qos);
1130
0
    if (RETCODE_OK != check_result)
1131
0
    {
1132
        // The SubscriberImpl::check_qos() function is not yet implemented and always returns RETCODE_OK.
1133
        // It will be implemented in future releases of Fast DDS.
1134
        // return check_result;
1135
0
    }
1136
0
    SubscriberImpl::set_qos(default_sub_qos_, qos, true);
1137
0
    return RETCODE_OK;
1138
0
}
1139
1140
void DomainParticipantImpl::reset_default_subscriber_qos()
1141
0
{
1142
    // TODO (ILG): Change when we have full XML support for DDS QoS profiles
1143
0
    SubscriberImpl::set_qos(default_sub_qos_, SUBSCRIBER_QOS_DEFAULT, true);
1144
0
    xmlparser::SubscriberAttributes attr;
1145
0
    XMLProfileManager::getDefaultSubscriberAttributes(attr);
1146
0
    utils::set_qos_from_attributes(default_sub_qos_, attr);
1147
0
}
1148
1149
const SubscriberQos& DomainParticipantImpl::get_default_subscriber_qos() const
1150
0
{
1151
0
    return default_sub_qos_;
1152
0
}
1153
1154
ReturnCode_t DomainParticipantImpl::get_subscriber_qos_from_profile(
1155
        const std::string& profile_name,
1156
        SubscriberQos& qos) const
1157
0
{
1158
0
    xmlparser::SubscriberAttributes attr;
1159
0
    if (XMLP_ret::XML_OK == XMLProfileManager::fillSubscriberAttributes(profile_name, attr))
1160
0
    {
1161
0
        qos = default_sub_qos_;
1162
0
        utils::set_qos_from_attributes(qos, attr);
1163
0
        return RETCODE_OK;
1164
0
    }
1165
1166
0
    return RETCODE_BAD_PARAMETER;
1167
0
}
1168
1169
ReturnCode_t DomainParticipantImpl::get_subscriber_qos_from_xml(
1170
        const std::string& xml,
1171
        SubscriberQos& qos) const
1172
0
{
1173
0
    xmlparser::SubscriberAttributes attr;
1174
0
    if (XMLP_ret::XML_OK == XMLProfileManager::fill_subscriber_attributes_from_xml(xml, attr, false))
1175
0
    {
1176
0
        qos = default_sub_qos_;
1177
0
        utils::set_qos_from_attributes(qos, attr);
1178
0
        return RETCODE_OK;
1179
0
    }
1180
1181
0
    return RETCODE_BAD_PARAMETER;
1182
0
}
1183
1184
ReturnCode_t DomainParticipantImpl::get_subscriber_qos_from_xml(
1185
        const std::string& xml,
1186
        SubscriberQos& qos,
1187
        const std::string& profile_name) const
1188
0
{
1189
0
    if (profile_name.empty())
1190
0
    {
1191
0
        EPROSIMA_LOG_ERROR(DOMAIN_PARTICIPANT, "Provided profile name must be non-empty");
1192
0
        return RETCODE_BAD_PARAMETER;
1193
0
    }
1194
1195
0
    xmlparser::SubscriberAttributes attr;
1196
0
    if (XMLP_ret::XML_OK == XMLProfileManager::fill_subscriber_attributes_from_xml(xml, attr, true, profile_name))
1197
0
    {
1198
0
        qos = default_sub_qos_;
1199
0
        utils::set_qos_from_attributes(qos, attr);
1200
0
        return RETCODE_OK;
1201
0
    }
1202
1203
0
    return RETCODE_BAD_PARAMETER;
1204
0
}
1205
1206
ReturnCode_t DomainParticipantImpl::get_default_subscriber_qos_from_xml(
1207
        const std::string& xml,
1208
        SubscriberQos& qos) const
1209
0
{
1210
0
    xmlparser::SubscriberAttributes attr;
1211
0
    if (XMLP_ret::XML_OK == XMLProfileManager::fill_default_subscriber_attributes_from_xml(xml, attr, true))
1212
0
    {
1213
0
        qos = default_sub_qos_;
1214
0
        utils::set_qos_from_attributes(qos, attr);
1215
0
        return RETCODE_OK;
1216
0
    }
1217
1218
0
    return RETCODE_BAD_PARAMETER;
1219
0
}
1220
1221
ReturnCode_t DomainParticipantImpl::set_default_topic_qos(
1222
        const TopicQos& qos)
1223
0
{
1224
0
    if (&qos == &TOPIC_QOS_DEFAULT)
1225
0
    {
1226
0
        reset_default_topic_qos();
1227
0
        return RETCODE_OK;
1228
0
    }
1229
1230
0
    ReturnCode_t ret_val = TopicImpl::check_qos(qos);
1231
0
    if (RETCODE_OK != ret_val)
1232
0
    {
1233
0
        return ret_val;
1234
0
    }
1235
1236
0
    TopicImpl::set_qos(default_topic_qos_, qos, true);
1237
0
    return RETCODE_OK;
1238
0
}
1239
1240
void DomainParticipantImpl::reset_default_topic_qos()
1241
0
{
1242
    // TODO (ILG): Change when we have full XML support for DDS QoS profiles
1243
0
    TopicImpl::set_qos(default_topic_qos_, TOPIC_QOS_DEFAULT, true);
1244
0
    xmlparser::TopicAttributes attr;
1245
0
    XMLProfileManager::getDefaultTopicAttributes(attr);
1246
0
    utils::set_qos_from_attributes(default_topic_qos_, attr);
1247
0
}
1248
1249
const TopicQos& DomainParticipantImpl::get_default_topic_qos() const
1250
0
{
1251
0
    return default_topic_qos_;
1252
0
}
1253
1254
ReturnCode_t DomainParticipantImpl::get_topic_qos_from_profile(
1255
        const std::string& profile_name,
1256
        TopicQos& qos) const
1257
0
{
1258
0
    std::string _topic_name, _topic_data_type;
1259
0
    return get_topic_qos_from_profile(profile_name, qos, _topic_name, _topic_data_type);
1260
0
}
1261
1262
ReturnCode_t DomainParticipantImpl::get_topic_qos_from_profile(
1263
        const std::string& profile_name,
1264
        TopicQos& qos,
1265
        std::string& topic_name,
1266
        std::string& topic_data_type) const
1267
0
{
1268
0
    xmlparser::TopicAttributes attr;
1269
0
    if (XMLP_ret::XML_OK == XMLProfileManager::fillTopicAttributes(profile_name, attr))
1270
0
    {
1271
0
        qos = default_topic_qos_;
1272
0
        utils::set_qos_from_attributes(qos, attr);
1273
0
        topic_name = attr.getTopicName();
1274
0
        topic_data_type = attr.getTopicDataType();
1275
0
        return RETCODE_OK;
1276
0
    }
1277
1278
0
    return RETCODE_BAD_PARAMETER;
1279
0
}
1280
1281
ReturnCode_t DomainParticipantImpl::get_topic_qos_from_xml(
1282
        const std::string& xml,
1283
        TopicQos& qos) const
1284
0
{
1285
0
    std::string _topic_name, _topic_data_type;
1286
0
    return get_topic_qos_from_xml(xml, qos, _topic_name, _topic_data_type);
1287
0
}
1288
1289
ReturnCode_t DomainParticipantImpl::get_topic_qos_from_xml(
1290
        const std::string& xml,
1291
        TopicQos& qos,
1292
        std::string& topic_name,
1293
        std::string& topic_data_type) const
1294
0
{
1295
0
    xmlparser::TopicAttributes attr;
1296
0
    if (XMLP_ret::XML_OK == XMLProfileManager::fill_topic_attributes_from_xml(xml, attr, false))
1297
0
    {
1298
0
        qos = default_topic_qos_;
1299
0
        utils::set_qos_from_attributes(qos, attr);
1300
0
        topic_name = attr.getTopicName();
1301
0
        topic_data_type = attr.getTopicDataType();
1302
0
        return RETCODE_OK;
1303
0
    }
1304
1305
0
    return RETCODE_BAD_PARAMETER;
1306
0
}
1307
1308
ReturnCode_t DomainParticipantImpl::get_topic_qos_from_xml(
1309
        const std::string& xml,
1310
        TopicQos& qos,
1311
        const std::string& profile_name) const
1312
0
{
1313
0
    std::string _topic_name, _topic_data_type;
1314
0
    return get_topic_qos_from_xml(xml, qos, _topic_name, _topic_data_type, profile_name);
1315
0
}
1316
1317
ReturnCode_t DomainParticipantImpl::get_topic_qos_from_xml(
1318
        const std::string& xml,
1319
        TopicQos& qos,
1320
        std::string& topic_name,
1321
        std::string& topic_data_type,
1322
        const std::string& profile_name) const
1323
0
{
1324
0
    if (profile_name.empty())
1325
0
    {
1326
0
        EPROSIMA_LOG_ERROR(DOMAIN_PARTICIPANT, "Provided profile name must be non-empty");
1327
0
        return RETCODE_BAD_PARAMETER;
1328
0
    }
1329
1330
0
    xmlparser::TopicAttributes attr;
1331
0
    if (XMLP_ret::XML_OK == XMLProfileManager::fill_topic_attributes_from_xml(xml, attr, true, profile_name))
1332
0
    {
1333
0
        qos = default_topic_qos_;
1334
0
        utils::set_qos_from_attributes(qos, attr);
1335
0
        topic_name = attr.getTopicName();
1336
0
        topic_data_type = attr.getTopicDataType();
1337
0
        return RETCODE_OK;
1338
0
    }
1339
1340
0
    return RETCODE_BAD_PARAMETER;
1341
0
}
1342
1343
ReturnCode_t DomainParticipantImpl::get_default_topic_qos_from_xml(
1344
        const std::string& xml,
1345
        TopicQos& qos) const
1346
0
{
1347
0
    std::string _topic_name, _topic_data_type;
1348
0
    return get_default_topic_qos_from_xml(xml, qos, _topic_name, _topic_data_type);
1349
0
}
1350
1351
ReturnCode_t DomainParticipantImpl::get_default_topic_qos_from_xml(
1352
        const std::string& xml,
1353
        TopicQos& qos,
1354
        std::string& topic_name,
1355
        std::string& topic_data_type) const
1356
0
{
1357
0
    xmlparser::TopicAttributes attr;
1358
0
    if (XMLP_ret::XML_OK == XMLProfileManager::fill_default_topic_attributes_from_xml(xml, attr, true))
1359
0
    {
1360
0
        qos = default_topic_qos_;
1361
0
        utils::set_qos_from_attributes(qos, attr);
1362
0
        topic_name = attr.getTopicName();
1363
0
        topic_data_type = attr.getTopicDataType();
1364
0
        return RETCODE_OK;
1365
0
    }
1366
1367
0
    return RETCODE_BAD_PARAMETER;
1368
0
}
1369
1370
ReturnCode_t DomainParticipantImpl::get_replier_qos_from_profile(
1371
        const std::string& profile_name,
1372
        ReplierQos& qos) const
1373
0
{
1374
0
    xmlparser::ReplierAttributes attr;
1375
0
    if (XMLP_ret::XML_OK == XMLProfileManager::fillReplierAttributes(profile_name, attr))
1376
0
    {
1377
0
        utils::set_qos_from_attributes(qos, attr);
1378
0
        return RETCODE_OK;
1379
0
    }
1380
1381
0
    return RETCODE_BAD_PARAMETER;
1382
0
}
1383
1384
ReturnCode_t DomainParticipantImpl::get_replier_qos_from_xml(
1385
        const std::string& xml,
1386
        ReplierQos& qos) const
1387
0
{
1388
0
    xmlparser::ReplierAttributes attr;
1389
0
    if (XMLP_ret::XML_OK == XMLProfileManager::fill_replier_attributes_from_xml(xml, attr, false))
1390
0
    {
1391
0
        utils::set_qos_from_attributes(qos, attr);
1392
0
        return RETCODE_OK;
1393
0
    }
1394
1395
0
    return RETCODE_BAD_PARAMETER;
1396
0
}
1397
1398
ReturnCode_t DomainParticipantImpl::get_replier_qos_from_xml(
1399
        const std::string& xml,
1400
        ReplierQos& qos,
1401
        const std::string& profile_name) const
1402
0
{
1403
0
    if (profile_name.empty())
1404
0
    {
1405
0
        EPROSIMA_LOG_ERROR(DOMAIN_PARTICIPANT, "Provided profile name must be non-empty");
1406
0
        return RETCODE_BAD_PARAMETER;
1407
0
    }
1408
1409
0
    xmlparser::ReplierAttributes attr;
1410
0
    if (XMLP_ret::XML_OK == XMLProfileManager::fill_replier_attributes_from_xml(xml, attr, true, profile_name))
1411
0
    {
1412
0
        utils::set_qos_from_attributes(qos, attr);
1413
0
        return RETCODE_OK;
1414
0
    }
1415
1416
0
    return RETCODE_BAD_PARAMETER;
1417
0
}
1418
1419
ReturnCode_t DomainParticipantImpl::get_default_replier_qos_from_xml(
1420
        const std::string& xml,
1421
        ReplierQos& qos) const
1422
0
{
1423
0
    xmlparser::ReplierAttributes attr;
1424
0
    if (XMLP_ret::XML_OK == XMLProfileManager::fill_default_replier_attributes_from_xml(xml, attr, true))
1425
0
    {
1426
0
        utils::set_qos_from_attributes(qos, attr);
1427
0
        return RETCODE_OK;
1428
0
    }
1429
1430
0
    return RETCODE_BAD_PARAMETER;
1431
0
}
1432
1433
ReturnCode_t DomainParticipantImpl::get_requester_qos_from_profile(
1434
        const std::string& profile_name,
1435
        RequesterQos& qos) const
1436
0
{
1437
0
    xmlparser::RequesterAttributes attr;
1438
0
    if (XMLP_ret::XML_OK == XMLProfileManager::fillRequesterAttributes(profile_name, attr))
1439
0
    {
1440
0
        utils::set_qos_from_attributes(qos, attr);
1441
0
        return RETCODE_OK;
1442
0
    }
1443
1444
0
    return RETCODE_BAD_PARAMETER;
1445
0
}
1446
1447
ReturnCode_t DomainParticipantImpl::get_requester_qos_from_xml(
1448
        const std::string& xml,
1449
        RequesterQos& qos) const
1450
0
{
1451
0
    xmlparser::RequesterAttributes attr;
1452
0
    if (XMLP_ret::XML_OK == XMLProfileManager::fill_requester_attributes_from_xml(xml, attr, false))
1453
0
    {
1454
0
        utils::set_qos_from_attributes(qos, attr);
1455
0
        return RETCODE_OK;
1456
0
    }
1457
1458
0
    return RETCODE_BAD_PARAMETER;
1459
0
}
1460
1461
ReturnCode_t DomainParticipantImpl::get_requester_qos_from_xml(
1462
        const std::string& xml,
1463
        RequesterQos& qos,
1464
        const std::string& profile_name) const
1465
0
{
1466
0
    if (profile_name.empty())
1467
0
    {
1468
0
        EPROSIMA_LOG_ERROR(DOMAIN_PARTICIPANT, "Provided profile name must be non-empty");
1469
0
        return RETCODE_BAD_PARAMETER;
1470
0
    }
1471
1472
0
    xmlparser::RequesterAttributes attr;
1473
0
    if (XMLP_ret::XML_OK == XMLProfileManager::fill_requester_attributes_from_xml(xml, attr, true, profile_name))
1474
0
    {
1475
0
        utils::set_qos_from_attributes(qos, attr);
1476
0
        return RETCODE_OK;
1477
0
    }
1478
1479
0
    return RETCODE_BAD_PARAMETER;
1480
0
}
1481
1482
ReturnCode_t DomainParticipantImpl::get_default_requester_qos_from_xml(
1483
        const std::string& xml,
1484
        RequesterQos& qos) const
1485
0
{
1486
0
    xmlparser::RequesterAttributes attr;
1487
0
    if (XMLP_ret::XML_OK == XMLProfileManager::fill_default_requester_attributes_from_xml(xml, attr, true))
1488
0
    {
1489
0
        utils::set_qos_from_attributes(qos, attr);
1490
0
        return RETCODE_OK;
1491
0
    }
1492
1493
0
    return RETCODE_BAD_PARAMETER;
1494
0
}
1495
1496
/* TODO
1497
   bool DomainParticipantImpl::get_discovered_participants(
1498
        std::vector<InstanceHandle_t>& participant_handles) const
1499
   {
1500
    (void)participant_handles;
1501
    EPROSIMA_LOG_ERROR(PARTICIPANT, "Not implemented.");
1502
    return false;
1503
   }
1504
 */
1505
1506
/* TODO
1507
   bool DomainParticipantImpl::get_discovered_topics(
1508
        std::vector<InstanceHandle_t>& topic_handles) const
1509
   {
1510
    (void)topic_handles;
1511
    EPROSIMA_LOG_ERROR(PARTICIPANT, "Not implemented.");
1512
    return false;
1513
   }
1514
 */
1515
1516
bool DomainParticipantImpl::contains_entity(
1517
        const InstanceHandle_t& handle,
1518
        bool recursive) const
1519
0
{
1520
    // Look for publishers
1521
0
    {
1522
0
        std::lock_guard<std::mutex> lock(mtx_pubs_);
1523
0
        if (publishers_by_handle_.find(handle) != publishers_by_handle_.end())
1524
0
        {
1525
0
            return true;
1526
0
        }
1527
0
    }
1528
1529
    // Look for subscribers
1530
0
    {
1531
0
        std::lock_guard<std::mutex> lock(mtx_subs_);
1532
0
        if (subscribers_by_handle_.find(handle) != subscribers_by_handle_.end())
1533
0
        {
1534
0
            return true;
1535
0
        }
1536
0
    }
1537
1538
    // Look for topics
1539
0
    {
1540
0
        std::lock_guard<std::mutex> lock(mtx_topics_);
1541
0
        if (topics_by_handle_.find(handle) != topics_by_handle_.end())
1542
0
        {
1543
0
            return true;
1544
0
        }
1545
0
    }
1546
1547
0
    if (recursive)
1548
0
    {
1549
        // Look into publishers
1550
0
        {
1551
0
            std::lock_guard<std::mutex> lock(mtx_pubs_);
1552
0
            for (auto pit : publishers_)
1553
0
            {
1554
0
                if (pit.second->contains_entity(handle))
1555
0
                {
1556
0
                    return true;
1557
0
                }
1558
0
            }
1559
0
        }
1560
1561
        // Look into subscribers
1562
0
        {
1563
0
            std::lock_guard<std::mutex> lock(mtx_subs_);
1564
0
            for (auto sit : subscribers_)
1565
0
            {
1566
0
                if (sit.second->contains_entity(handle))
1567
0
                {
1568
0
                    return true;
1569
0
                }
1570
0
            }
1571
0
        }
1572
0
    }
1573
1574
0
    return false;
1575
0
}
1576
1577
ReturnCode_t DomainParticipantImpl::get_current_time(
1578
        fastdds::dds::Time_t& current_time) const
1579
0
{
1580
0
    fastdds::dds::Time_t::now(current_time);
1581
1582
0
    return RETCODE_OK;
1583
0
}
1584
1585
std::vector<std::string> DomainParticipantImpl::get_participant_names() const
1586
0
{
1587
0
    std::lock_guard<std::mutex> _(mtx_gs_);
1588
0
    return rtps_participant_ == nullptr ?
1589
0
           std::vector<std::string> {}
1590
0
           :
1591
0
           rtps_participant_->getParticipantNames();
1592
0
}
1593
1594
Subscriber* DomainParticipantImpl::create_subscriber(
1595
        const SubscriberQos& qos,
1596
        SubscriberListener* listener,
1597
        const StatusMask& mask)
1598
0
{
1599
0
    if (RETCODE_OK != SubscriberImpl::check_qos(qos))
1600
0
    {
1601
        // The SubscriberImpl::check_qos() function is not yet implemented and always returns RETCODE_OK.
1602
        // It will be implemented in future releases of Fast DDS.
1603
        // EPROSIMA_LOG_ERROR(PARTICIPANT, "SubscriberQos inconsistent or not supported");
1604
        // return nullptr;
1605
0
    }
1606
1607
    //TODO CONSTRUIR LA IMPLEMENTACION DENTRO DEL OBJETO DEL USUARIO.
1608
0
    SubscriberImpl* subimpl = create_subscriber_impl(qos, listener);
1609
0
    Subscriber* sub = new Subscriber(subimpl, mask);
1610
0
    subimpl->user_subscriber_ = sub;
1611
0
    subimpl->rtps_participant_ = get_rtps_participant();
1612
1613
    // Create InstanceHandle for the new subscriber
1614
0
    InstanceHandle_t sub_handle;
1615
0
    bool enabled = get_rtps_participant() != nullptr;
1616
1617
    // Create InstanceHandle for the new subscriber
1618
0
    create_instance_handle(sub_handle);
1619
0
    subimpl->handle_ = sub_handle;
1620
1621
    //SAVE THE PUBLISHER INTO MAPS
1622
0
    std::lock_guard<std::mutex> lock(mtx_subs_);
1623
0
    subscribers_by_handle_[sub_handle] = sub;
1624
0
    subscribers_[sub] = subimpl;
1625
1626
    // Enable subscriber if appropriate
1627
0
    if (enabled && qos_.entity_factory().autoenable_created_entities)
1628
0
    {
1629
0
        ReturnCode_t ret_subscriber_enable = sub->enable();
1630
0
        assert(RETCODE_OK == ret_subscriber_enable);
1631
0
        (void)ret_subscriber_enable;
1632
0
    }
1633
1634
0
    return sub;
1635
0
}
1636
1637
Subscriber* DomainParticipantImpl::create_subscriber_with_profile(
1638
        const std::string& profile_name,
1639
        SubscriberListener* listener,
1640
        const StatusMask& mask)
1641
0
{
1642
    // TODO (ILG): Change when we have full XML support for DDS QoS profiles
1643
0
    xmlparser::SubscriberAttributes attr;
1644
0
    if (XMLP_ret::XML_OK == XMLProfileManager::fillSubscriberAttributes(profile_name, attr))
1645
0
    {
1646
0
        SubscriberQos qos = default_sub_qos_;
1647
0
        utils::set_qos_from_attributes(qos, attr);
1648
0
        return create_subscriber(qos, listener, mask);
1649
0
    }
1650
1651
0
    return nullptr;
1652
0
}
1653
1654
SubscriberImpl* DomainParticipantImpl::create_subscriber_impl(
1655
        const SubscriberQos& qos,
1656
        SubscriberListener* listener)
1657
0
{
1658
0
    return new SubscriberImpl(this, qos, listener);
1659
0
}
1660
1661
Topic* DomainParticipantImpl::create_topic(
1662
        const std::string& topic_name,
1663
        const std::string& type_name,
1664
        const TopicQos& qos,
1665
        TopicListener* listener,
1666
        const StatusMask& mask)
1667
0
{
1668
    //Look for the correct type registration
1669
0
    TypeSupport type_support = find_type(type_name);
1670
0
    if (type_support.empty())
1671
0
    {
1672
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Type : " << type_name << " Not Registered");
1673
0
        return nullptr;
1674
0
    }
1675
1676
0
    if (RETCODE_OK != TopicImpl::check_qos_including_resource_limits(qos, type_support))
1677
0
    {
1678
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "TopicQos inconsistent or not supported");
1679
0
        return nullptr;
1680
0
    }
1681
1682
0
    bool enabled = get_rtps_participant() != nullptr;
1683
1684
0
    std::lock_guard<std::mutex> lock(mtx_topics_);
1685
1686
    // Check there is no Topic with the same name
1687
0
    if ((topics_.find(topic_name) != topics_.end()) ||
1688
0
            (filtered_topics_.find(topic_name) != filtered_topics_.end()))
1689
0
    {
1690
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Topic with name : " << topic_name << " already exists");
1691
0
        return nullptr;
1692
0
    }
1693
1694
0
    InstanceHandle_t topic_handle;
1695
0
    create_instance_handle(topic_handle);
1696
1697
0
    TopicProxyFactory* factory = new TopicProxyFactory(this, topic_name, type_name, mask, type_support, qos, listener);
1698
0
    TopicProxy* proxy = factory->create_topic();
1699
0
    Topic* topic = proxy->get_topic();
1700
0
    topic->set_instance_handle(topic_handle);
1701
1702
    //SAVE THE TOPIC INTO MAPS
1703
0
    topics_by_handle_[topic_handle] = topic;
1704
0
    topics_[topic_name] = factory;
1705
1706
    // Enable topic if appropriate
1707
0
    if (enabled && qos_.entity_factory().autoenable_created_entities)
1708
0
    {
1709
0
        ReturnCode_t ret_topic_enable = topic->enable();
1710
0
        assert(RETCODE_OK == ret_topic_enable);
1711
0
        (void)ret_topic_enable;
1712
0
    }
1713
1714
0
    cond_topics_.notify_all();
1715
1716
0
    return topic;
1717
0
}
1718
1719
Topic* DomainParticipantImpl::create_topic_with_profile(
1720
        const std::string& topic_name,
1721
        const std::string& type_name,
1722
        const std::string& profile_name,
1723
        TopicListener* listener,
1724
        const StatusMask& mask)
1725
0
{
1726
    // TODO (ILG): Change when we have full XML support for DDS QoS profiles
1727
0
    xmlparser::TopicAttributes attr;
1728
0
    if (XMLP_ret::XML_OK == XMLProfileManager::fillTopicAttributes(profile_name, attr))
1729
0
    {
1730
0
        TopicQos qos = default_topic_qos_;
1731
0
        utils::set_qos_from_attributes(qos, attr);
1732
0
        return create_topic(topic_name, type_name, qos, listener, mask);
1733
0
    }
1734
1735
0
    return nullptr;
1736
0
}
1737
1738
TopicDescription* DomainParticipantImpl::lookup_topicdescription(
1739
        const std::string& topic_name) const
1740
0
{
1741
0
    std::lock_guard<std::mutex> lock(mtx_topics_);
1742
1743
0
    auto it = topics_.find(topic_name);
1744
0
    if (it != topics_.end())
1745
0
    {
1746
0
        return it->second->get_topic()->get_topic();
1747
0
    }
1748
1749
0
    auto filtered_it = filtered_topics_.find(topic_name);
1750
0
    if (filtered_it != filtered_topics_.end())
1751
0
    {
1752
0
        return filtered_it->second.get();
1753
0
    }
1754
1755
0
    return nullptr;
1756
0
}
1757
1758
const TypeSupport DomainParticipantImpl::find_type(
1759
        const std::string& type_name) const
1760
0
{
1761
0
    std::lock_guard<std::mutex> lock(mtx_types_);
1762
1763
0
    auto type_it = types_.find(type_name);
1764
1765
0
    if (type_it != types_.end())
1766
0
    {
1767
0
        return type_it->second;
1768
0
    }
1769
1770
0
    return TypeSupport(nullptr);
1771
0
}
1772
1773
ReturnCode_t DomainParticipantImpl::register_type(
1774
        const TypeSupport type,
1775
        const std::string& type_name)
1776
0
{
1777
0
    if (type_name.size() <= 0)
1778
0
    {
1779
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Registered Type must have a name");
1780
0
        return RETCODE_BAD_PARAMETER;
1781
0
    }
1782
1783
    /*
1784
     * The type object registration sets the TypeIdentifiers in the type support's underlying TopicDataType.
1785
     * This means that we need need to trigger the registration of the type object representation
1786
     * (idempotent operation) before finding the type in the registry.
1787
     * Otherwise, registering two TypeSupport instances with the same underlying TopicDataType will fail upon
1788
     * the second registration, as the TypeIdentifiers of the retrieved type from the registry would not be equal
1789
     * to those of the incoming type support.
1790
     */
1791
0
    type.get()->register_type_object_representation();
1792
1793
0
    TypeSupport t = find_type(type_name);
1794
1795
0
    if (!t.empty())
1796
0
    {
1797
0
        if (t == type)
1798
0
        {
1799
0
            return RETCODE_OK;
1800
0
        }
1801
1802
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Another type with the same name '" << type_name << "' is already registered.");
1803
0
        return RETCODE_PRECONDITION_NOT_MET;
1804
0
    }
1805
1806
0
    EPROSIMA_LOG_INFO(PARTICIPANT, "Type " << type_name << " registered.");
1807
0
    std::lock_guard<std::mutex> lock(mtx_types_);
1808
0
    types_.insert(std::make_pair(type_name, type));
1809
1810
0
    return RETCODE_OK;
1811
0
}
1812
1813
ReturnCode_t DomainParticipantImpl::unregister_type(
1814
        const std::string& type_name)
1815
0
{
1816
0
    if (type_name.size() <= 0)
1817
0
    {
1818
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Registered Type must have a name");
1819
0
        return RETCODE_BAD_PARAMETER;
1820
0
    }
1821
1822
0
    TypeSupport t = find_type(type_name);
1823
1824
0
    if (t.empty())
1825
0
    {
1826
0
        return RETCODE_OK; // Not registered, so unregistering complete.
1827
0
    }
1828
1829
0
    {
1830
        // Check is any subscriber is using the type
1831
0
        std::lock_guard<std::mutex> lock(mtx_subs_);
1832
1833
0
        for (auto sit : subscribers_)
1834
0
        {
1835
0
            if (sit.second->type_in_use(type_name))
1836
0
            {
1837
0
                EPROSIMA_LOG_WARNING(PARTICIPANT, "Type '" << type_name << "' is in use");
1838
0
                return RETCODE_PRECONDITION_NOT_MET;
1839
0
            }
1840
0
        }
1841
0
    }
1842
1843
0
    {
1844
        // Check is any publisher is using the type
1845
0
        std::lock_guard<std::mutex> lock(mtx_pubs_);
1846
1847
0
        for (auto pit : publishers_)
1848
0
        {
1849
0
            if (pit.second->type_in_use(type_name))
1850
0
            {
1851
0
                EPROSIMA_LOG_WARNING(PARTICIPANT, "Type '" << type_name << "' is in use");
1852
0
                return RETCODE_PRECONDITION_NOT_MET;
1853
0
            }
1854
0
        }
1855
0
    }
1856
1857
0
    std::lock_guard<std::mutex> lock(mtx_types_);
1858
0
    types_.erase(type_name);
1859
1860
0
    return RETCODE_OK;
1861
0
}
1862
1863
const rpc::ServiceTypeSupport DomainParticipantImpl::find_service_type(
1864
        const std::string& service_type_name) const
1865
0
{
1866
0
    std::lock_guard<std::mutex> lock(mtx_service_types_);
1867
1868
0
    auto service_type_it = service_types_.find(service_type_name);
1869
1870
0
    if (service_type_it != service_types_.end())
1871
0
    {
1872
0
        return service_type_it->second;
1873
0
    }
1874
1875
0
    return rpc::ServiceTypeSupport(TypeSupport(nullptr), TypeSupport(nullptr));
1876
0
}
1877
1878
ReturnCode_t DomainParticipantImpl::register_service_type(
1879
        rpc::ServiceTypeSupport service_type,
1880
        const std::string& service_type_name)
1881
0
{
1882
0
    if (service_type_name.size() <= 0)
1883
0
    {
1884
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Registered Service Type must have a name");
1885
0
        return RETCODE_BAD_PARAMETER;
1886
0
    }
1887
1888
0
    if (service_type.empty_types())
1889
0
    {
1890
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Trying to register a Service Type with empty request/reply types");
1891
0
        return RETCODE_BAD_PARAMETER;
1892
0
    }
1893
1894
    // Check if the service type is already registered
1895
0
    rpc::ServiceTypeSupport t = find_service_type(service_type_name);
1896
1897
0
    if (!t.empty_types())
1898
0
    {
1899
0
        if (t == service_type)
1900
0
        {
1901
0
            return RETCODE_OK;
1902
0
        }
1903
1904
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Another service type with the same name '" << service_type_name <<
1905
0
                "' is already registered.");
1906
0
        return RETCODE_PRECONDITION_NOT_MET;
1907
0
    }
1908
1909
    // Register request/reply types
1910
0
    ReturnCode_t ret_code;
1911
0
    ret_code = register_type(service_type.request_type(), service_type_name + "_Request");
1912
1913
0
    if (RETCODE_OK != ret_code)
1914
0
    {
1915
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Error registering Request Type for Service Type " << service_type_name);
1916
0
        return ret_code;
1917
0
    }
1918
1919
0
    ret_code = register_type(service_type.reply_type(), service_type_name + "_Reply");
1920
1921
0
    if (RETCODE_OK != ret_code)
1922
0
    {
1923
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Error registering Request Type for Service Type " << service_type_name);
1924
0
        return ret_code;
1925
0
    }
1926
1927
0
    EPROSIMA_LOG_INFO(PARTICIPANT, "Service Type " << service_type_name << " registered.");
1928
0
    std::lock_guard<std::mutex> lock(mtx_service_types_);
1929
0
    service_types_.insert(std::make_pair(service_type_name, service_type));
1930
1931
0
    return RETCODE_OK;
1932
0
}
1933
1934
ReturnCode_t DomainParticipantImpl::unregister_service_type(
1935
        const std::string& service_type_name)
1936
0
{
1937
0
    if (service_type_name.size() <= 0)
1938
0
    {
1939
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Registered Service Type must have a name");
1940
0
        return RETCODE_BAD_PARAMETER;
1941
0
    }
1942
1943
0
    rpc::ServiceTypeSupport t = find_service_type(service_type_name);
1944
0
    if (t.empty_types())
1945
0
    {
1946
0
        return RETCODE_OK; // Not registered, so unregistering complete.
1947
0
    }
1948
1949
    // Iterate over all active services and check if any of them is using the service type
1950
0
    {
1951
0
        std::lock_guard<std::mutex> lock(mtx_services_);
1952
0
        for (auto service_it : services_)
1953
0
        {
1954
0
            if (service_it.second->service_type_in_use(service_type_name))
1955
0
            {
1956
0
                EPROSIMA_LOG_ERROR(PARTICIPANT, "Service Type " << service_type_name << " is in use by service " <<
1957
0
                        service_it.first);
1958
0
                return RETCODE_PRECONDITION_NOT_MET;
1959
0
            }
1960
0
        }
1961
0
    }
1962
1963
    // Unregister request/reply types
1964
0
    ReturnCode_t ret_code;
1965
0
    ret_code = unregister_type(service_type_name + "_Request");
1966
1967
0
    if (RETCODE_OK != ret_code)
1968
0
    {
1969
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Error unregistering Request Type for Service Type " << service_type_name);
1970
0
        return ret_code;
1971
0
    }
1972
1973
0
    ret_code = unregister_type(service_type_name + "_Reply");
1974
1975
0
    if (RETCODE_OK != ret_code)
1976
0
    {
1977
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Error unregistering Reply Type for Service Type " << service_type_name);
1978
1979
        // Request topic type was unregistered, so register it again to avoid leaving the participant in an inconsistent state
1980
0
        register_type(t.request_type(), service_type_name + "_Request");
1981
1982
0
        return ret_code;
1983
0
    }
1984
1985
0
    {
1986
0
        std::lock_guard<std::mutex> lock(mtx_service_types_);
1987
0
        service_types_.erase(service_type_name);
1988
0
    }
1989
1990
0
    return RETCODE_OK;
1991
0
}
1992
1993
rpc::Service* DomainParticipantImpl::create_service(
1994
        const std::string& service_name,
1995
        const std::string& service_type_name)
1996
0
{
1997
0
    if (service_name.empty())
1998
0
    {
1999
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Service name cannot be empty.");
2000
0
        return nullptr;
2001
0
    }
2002
2003
0
    if (service_type_name.empty())
2004
0
    {
2005
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Service type name cannot be empty.");
2006
0
        return nullptr;
2007
0
    }
2008
2009
    // Check if the service has already been created
2010
0
    {
2011
0
        std::lock_guard<std::mutex> lock(mtx_services_);
2012
0
        auto it = services_.find(service_name);
2013
2014
0
        if (it != services_.end())
2015
0
        {
2016
0
            EPROSIMA_LOG_ERROR(PARTICIPANT, "Service with name '" << service_name << "' already exists.");
2017
0
            return nullptr;
2018
0
        }
2019
0
    }
2020
2021
    // Check if the request/reply content filter factory is registered. If not, register it.
2022
0
    IContentFilterFactory* factory = find_content_filter_factory(rpc::RequestReplyContentFilterFactory::FILTER_NAME);
2023
2024
0
    if (!factory)
2025
0
    {
2026
0
        factory = &req_rep_filter_factory_;
2027
0
        ReturnCode_t ret_code =
2028
0
                register_content_filter_factory(rpc::RequestReplyContentFilterFactory::FILTER_NAME, factory);
2029
2030
0
        if (RETCODE_OK != ret_code)
2031
0
        {
2032
0
            EPROSIMA_LOG_ERROR(PARTICIPANT, "Error registering Request/Reply Content Filter Factory.");
2033
0
            return nullptr;
2034
0
        }
2035
0
    }
2036
2037
    // Before creating the service, create the publisher and subscriber
2038
    // that it will use for DDS endpoints creation, if it is necessary
2039
0
    if (!services_publisher_.second)
2040
0
    {
2041
        // Disable automatic enabling of created entities
2042
        // This is necessary to previously set the related_entity_key
2043
0
        PublisherQos pub_qos = PUBLISHER_QOS_DEFAULT;
2044
0
        pub_qos.entity_factory().autoenable_created_entities = false;
2045
2046
0
        Publisher* pub = create_publisher(pub_qos);
2047
0
        if (!pub)
2048
0
        {
2049
0
            EPROSIMA_LOG_ERROR(PARTICIPANT, "Error creating Services publisher.");
2050
0
            return nullptr;
2051
0
        }
2052
0
        services_publisher_ = std::make_pair(pub, publishers_[pub]);
2053
0
    }
2054
2055
0
    if (!services_subscriber_.second)
2056
0
    {
2057
        // Disable automatic enabling of created entities
2058
        // This is necessary to previously set the related_entity_key
2059
0
        SubscriberQos sub_qos = SUBSCRIBER_QOS_DEFAULT;
2060
0
        sub_qos.entity_factory().autoenable_created_entities = false;
2061
2062
0
        Subscriber* sub = create_subscriber(sub_qos);
2063
0
        if (!sub)
2064
0
        {
2065
0
            EPROSIMA_LOG_ERROR(PARTICIPANT, "Error creating Services subscriber.");
2066
0
            return nullptr;
2067
0
        }
2068
0
        services_subscriber_ = std::make_pair(sub, subscribers_[sub]);
2069
0
    }
2070
    // Create and store the service (Internally, this will create the required DDS Request/Reply Topics))
2071
0
    rpc::ServiceImpl* service(nullptr);
2072
2073
0
    try
2074
0
    {
2075
0
        service = new rpc::ServiceImpl(
2076
0
            service_name, service_type_name, this, services_publisher_.second, services_subscriber_.second);
2077
0
    }
2078
0
    catch (const std::exception& e)
2079
0
    {
2080
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Error creating service: " << e.what());
2081
0
        return nullptr;
2082
0
    }
2083
2084
    // Try to enable the created service
2085
0
    if (RETCODE_OK != service->enable())
2086
0
    {
2087
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Error enabling service.");
2088
0
        delete service;
2089
0
        return nullptr;
2090
0
    }
2091
2092
0
    {
2093
        // Register the service in the participant
2094
0
        std::lock_guard<std::mutex> lock(mtx_services_);
2095
0
        services_[service_name] = service;
2096
0
    }
2097
2098
0
    return service;
2099
0
}
2100
2101
rpc::Service* DomainParticipantImpl::find_service(
2102
        const std::string& service_name) const
2103
0
{
2104
0
    std::lock_guard<std::mutex> lock(mtx_services_);
2105
0
    auto it = services_.find(service_name);
2106
2107
0
    if (it != services_.end())
2108
0
    {
2109
0
        return it->second;
2110
0
    }
2111
0
    return nullptr;
2112
0
}
2113
2114
ReturnCode_t DomainParticipantImpl::delete_service(
2115
        const rpc::Service* service)
2116
0
{
2117
0
    if (!service)
2118
0
    {
2119
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Service is nullptr.");
2120
0
        return RETCODE_BAD_PARAMETER;
2121
0
    }
2122
2123
0
    const rpc::ServiceImpl* service_impl = dynamic_cast<const rpc::ServiceImpl*>(service);
2124
0
    assert(service_impl != nullptr);
2125
2126
0
    std::lock_guard<std::mutex> lock(mtx_services_);
2127
0
    auto it = services_.find(service->get_service_name());
2128
2129
0
    if (it != services_.end())
2130
0
    {
2131
0
        if (service_impl != it->second)
2132
0
        {
2133
0
            EPROSIMA_LOG_ERROR(PARTICIPANT, "Service mismatch.");
2134
0
            return RETCODE_PRECONDITION_NOT_MET;
2135
0
        }
2136
2137
        // Check that the service is disabled
2138
0
        if (it->second->is_enabled())
2139
0
        {
2140
0
            EPROSIMA_LOG_INFO(PARTICIPANT, "Trying to delete an enabled service.");
2141
0
            ReturnCode_t retcode = it->second->close();
2142
0
            if (RETCODE_OK != retcode)
2143
0
            {
2144
0
                EPROSIMA_LOG_ERROR(PARTICIPANT, "Error closing service: " << retcode);
2145
0
                return retcode;
2146
0
            }
2147
0
        }
2148
2149
0
        delete it->second;
2150
0
        services_.erase(it);
2151
0
        return RETCODE_OK;
2152
0
    }
2153
2154
    // The service was not found in this participant
2155
0
    EPROSIMA_LOG_ERROR(PARTICIPANT, "Service with name '" << service->get_service_name() << "' not found.");
2156
0
    return RETCODE_PRECONDITION_NOT_MET;
2157
0
}
2158
2159
rpc::Requester* DomainParticipantImpl::create_service_requester(
2160
        rpc::Service* service,
2161
        const RequesterQos& qos)
2162
0
{
2163
    // Check if the service is valid and registered in participant
2164
0
    if (!service)
2165
0
    {
2166
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Service is nullptr.");
2167
0
        return nullptr;
2168
0
    }
2169
2170
0
    const rpc::ServiceImpl* service_impl = dynamic_cast<const rpc::ServiceImpl*>(service);
2171
0
    assert(service_impl != nullptr);
2172
2173
0
    std::lock_guard<std::mutex> lock(mtx_services_);
2174
0
    auto it = services_.find(service->get_service_name());
2175
0
    if (it == services_.end())
2176
0
    {
2177
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Service with name '" << service->get_service_name() << "' not found.");
2178
0
        return nullptr;
2179
0
    }
2180
2181
    // Make sure that both services are the same
2182
0
    if (it->second != service_impl)
2183
0
    {
2184
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Service mismatch.");
2185
0
        return nullptr;
2186
0
    }
2187
2188
    // Create the requester and register it in the service
2189
0
    return it->second->create_requester(qos);
2190
0
}
2191
2192
ReturnCode_t DomainParticipantImpl::delete_service_requester(
2193
        const std::string& service_name,
2194
        rpc::Requester* requester)
2195
0
{
2196
2197
0
    rpc::RequesterImpl* requester_impl = dynamic_cast<rpc::RequesterImpl*>(requester);
2198
0
    assert(requester_impl != nullptr);
2199
2200
    // Get registered service implementation
2201
0
    std::lock_guard<std::mutex> lock(mtx_services_);
2202
0
    auto it = services_.find(service_name);
2203
2204
0
    if (it == services_.end())
2205
0
    {
2206
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Service with name '" << service_name << "' not registered.");
2207
0
        return RETCODE_PRECONDITION_NOT_MET;
2208
0
    }
2209
2210
0
    return it->second->remove_requester(requester_impl);
2211
0
}
2212
2213
rpc::Replier* DomainParticipantImpl::create_service_replier(
2214
        rpc::Service* service,
2215
        const ReplierQos& qos)
2216
0
{
2217
    // Check if the service is valid and registered in participant
2218
0
    if (!service)
2219
0
    {
2220
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Service is nullptr.");
2221
0
        return nullptr;
2222
0
    }
2223
2224
0
    const rpc::ServiceImpl* service_impl = dynamic_cast<const rpc::ServiceImpl*>(service);
2225
0
    assert(service_impl != nullptr);
2226
2227
0
    std::lock_guard<std::mutex> lock(mtx_services_);
2228
0
    auto it = services_.find(service->get_service_name());
2229
0
    if (it == services_.end())
2230
0
    {
2231
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Service with name '" << service->get_service_name() << "' not found.");
2232
0
        return nullptr;
2233
0
    }
2234
2235
    // Make sure that both services are the same
2236
0
    if (it->second != service_impl)
2237
0
    {
2238
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Service mismatch.");
2239
0
        return nullptr;
2240
0
    }
2241
2242
    // Create the replier and register it in the service
2243
0
    return it->second->create_replier(qos);
2244
0
}
2245
2246
ReturnCode_t DomainParticipantImpl::delete_service_replier(
2247
        const std::string& service_name,
2248
        rpc::Replier* replier)
2249
0
{
2250
0
    rpc::ReplierImpl* replier_impl = dynamic_cast<rpc::ReplierImpl*>(replier);
2251
0
    assert(replier_impl != nullptr);
2252
2253
    // Get registered service implementation
2254
0
    std::lock_guard<std::mutex> lock(mtx_services_);
2255
0
    auto it = services_.find(service_name);
2256
2257
0
    if (it == services_.end())
2258
0
    {
2259
0
        EPROSIMA_LOG_ERROR(PARTICIPANT, "Service with name '" << service_name << "' not registered.");
2260
0
        return RETCODE_PRECONDITION_NOT_MET;
2261
0
    }
2262
2263
0
    return it->second->remove_replier(replier_impl);
2264
0
}
2265
2266
void DomainParticipantImpl::MyRTPSParticipantListener::on_participant_discovery(
2267
        RTPSParticipant*,
2268
        eprosima::fastdds::rtps::ParticipantDiscoveryStatus reason,
2269
        const ParticipantBuiltinTopicData& info,
2270
        bool& should_be_ignored)
2271
0
{
2272
0
    should_be_ignored = false;
2273
0
    Sentry sentinel(this);
2274
0
    if (sentinel)
2275
0
    {
2276
0
        participant_->listener_->on_participant_discovery(participant_->participant_, reason, std::move(info),
2277
0
                should_be_ignored);
2278
0
    }
2279
0
}
2280
2281
#if HAVE_SECURITY
2282
void DomainParticipantImpl::MyRTPSParticipantListener::onParticipantAuthentication(
2283
        RTPSParticipant*,
2284
        ParticipantAuthenticationInfo&& info)
2285
{
2286
    Sentry sentinel(this);
2287
    if (sentinel)
2288
    {
2289
        participant_->listener_->onParticipantAuthentication(participant_->participant_, std::move(info));
2290
    }
2291
}
2292
2293
#endif // if HAVE_SECURITY
2294
2295
void DomainParticipantImpl::MyRTPSParticipantListener::on_reader_discovery(
2296
        RTPSParticipant*,
2297
        ReaderDiscoveryStatus reason,
2298
        const SubscriptionBuiltinTopicData& info,
2299
        bool& should_be_ignored)
2300
0
{
2301
0
    should_be_ignored = false;
2302
2303
0
    Sentry sentinel(this);
2304
0
    if (sentinel)
2305
0
    {
2306
0
        DomainParticipantListener* listener = participant_->listener_;
2307
0
        if (nullptr != listener)
2308
0
        {
2309
0
            listener->on_data_reader_discovery(participant_->participant_, reason, info, should_be_ignored);
2310
0
        }
2311
0
    }
2312
0
}
2313
2314
void DomainParticipantImpl::MyRTPSParticipantListener::on_writer_discovery(
2315
        RTPSParticipant*,
2316
        WriterDiscoveryStatus reason,
2317
        const PublicationBuiltinTopicData& info,
2318
        bool& should_be_ignored)
2319
0
{
2320
0
    should_be_ignored = false;
2321
2322
0
    Sentry sentinel(this);
2323
0
    if (sentinel)
2324
0
    {
2325
0
        DomainParticipantListener* listener = participant_->listener_;
2326
0
        if (nullptr != listener)
2327
0
        {
2328
0
            listener->on_data_writer_discovery(participant_->participant_, reason, info, should_be_ignored);
2329
0
        }
2330
0
    }
2331
0
}
2332
2333
bool DomainParticipantImpl::new_remote_endpoint_discovered(
2334
        const fastdds::rtps::GUID_t& partguid,
2335
        uint16_t endpointId,
2336
        EndpointKind_t kind)
2337
0
{
2338
0
    if (get_rtps_participant() != nullptr)
2339
0
    {
2340
0
        if (kind == fastdds::rtps::WRITER)
2341
0
        {
2342
0
            return get_rtps_participant()->newRemoteWriterDiscovered(partguid, static_cast<int16_t>(endpointId));
2343
0
        }
2344
0
        else
2345
0
        {
2346
0
            return get_rtps_participant()->newRemoteReaderDiscovered(partguid, static_cast<int16_t>(endpointId));
2347
0
        }
2348
0
    }
2349
2350
0
    return false;
2351
0
}
2352
2353
ResourceEvent& DomainParticipantImpl::get_resource_event() const
2354
0
{
2355
0
    assert(nullptr != get_rtps_participant());
2356
0
    return get_rtps_participant()->get_resource_event();
2357
0
}
2358
2359
ReturnCode_t DomainParticipantImpl::register_dynamic_type(
2360
        DynamicType::_ref_type dyn_type)
2361
0
{
2362
0
    TypeSupport type(new DynamicPubSubType(dyn_type));
2363
0
    return get_participant()->register_type(type);
2364
0
}
2365
2366
bool DomainParticipantImpl::has_active_entities()
2367
0
{
2368
0
    if (!publishers_.empty())
2369
0
    {
2370
0
        return true;
2371
0
    }
2372
0
    if (!subscribers_.empty())
2373
0
    {
2374
0
        return true;
2375
0
    }
2376
0
    if (!topics_.empty())
2377
0
    {
2378
0
        return true;
2379
0
    }
2380
0
    return false;
2381
0
}
2382
2383
bool DomainParticipantImpl::set_qos(
2384
        DomainParticipantQos& to,
2385
        const DomainParticipantQos& from,
2386
        bool first_time)
2387
0
{
2388
0
    bool qos_should_be_updated = false;
2389
2390
0
    if (!(to.entity_factory() == from.entity_factory()))
2391
0
    {
2392
0
        to.entity_factory() = from.entity_factory();
2393
0
    }
2394
0
    if (!(to.user_data() == from.user_data()))
2395
0
    {
2396
0
        to.user_data() = from.user_data();
2397
0
        if (!first_time)
2398
0
        {
2399
0
            qos_should_be_updated = true;
2400
0
        }
2401
0
    }
2402
0
    if (first_time && !(to.allocation() == from.allocation()))
2403
0
    {
2404
0
        to.allocation() = from.allocation();
2405
0
    }
2406
0
    if (first_time && (to.properties() != from.properties()))
2407
0
    {
2408
0
        to.properties() = from.properties();
2409
0
    }
2410
0
    if (!(to.wire_protocol() == from.wire_protocol()))
2411
0
    {
2412
0
        to.wire_protocol() = from.wire_protocol();
2413
0
        if (!first_time)
2414
0
        {
2415
0
            qos_should_be_updated = true;
2416
0
        }
2417
0
    }
2418
0
    if (first_time && !(to.transport() == from.transport()))
2419
0
    {
2420
0
        to.transport() = from.transport();
2421
0
    }
2422
0
    if (first_time && to.name() != from.name())
2423
0
    {
2424
0
        to.name() = from.name();
2425
0
    }
2426
2427
0
    return qos_should_be_updated;
2428
0
}
2429
2430
ReturnCode_t DomainParticipantImpl::check_qos(
2431
        const DomainParticipantQos& qos)
2432
0
{
2433
0
    ReturnCode_t ret_val = RETCODE_OK;
2434
2435
0
    if (qos.allocation().data_limits.max_user_data != 0 &&
2436
0
            qos.allocation().data_limits.max_user_data <= qos.user_data().getValue().size())
2437
0
    {
2438
0
        ret_val = RETCODE_INCONSISTENT_POLICY;
2439
0
    }
2440
2441
0
    if (RETCODE_OK == ret_val)
2442
0
    {
2443
        // Check participant's type propagation policy
2444
0
        using utils::to_type_propagation;
2445
0
        using utils::TypePropagation;
2446
2447
0
        auto type_propagation = to_type_propagation(qos.properties());
2448
2449
0
        if (TypePropagation::TYPEPROPAGATION_UNKNOWN == type_propagation)
2450
0
        {
2451
0
            EPROSIMA_LOG_ERROR(RTPS_QOS_CHECK, "Invalid value for property " << parameter_policy_type_propagation);
2452
0
            return RETCODE_INCONSISTENT_POLICY;
2453
0
        }
2454
0
    }
2455
2456
    // Check participant's wire protocol (builtin flow controller) configuration
2457
0
    if (RETCODE_OK == ret_val)
2458
0
    {
2459
0
        const std::string& builtin_flow_controller_name = qos.wire_protocol().builtin.flow_controller_name;
2460
2461
0
        if (!builtin_flow_controller_name.empty())
2462
0
        {
2463
            // Get the list of flow controllers
2464
0
            auto flow_controllers = qos.flow_controllers();
2465
2466
            // Check if any flow controller matches the builtin flow controller name
2467
0
            bool found = std::any_of(flow_controllers.begin(), flow_controllers.end(),
2468
0
                            [&builtin_flow_controller_name](const std::shared_ptr<fastdds::rtps::
2469
0
                                    FlowControllerDescriptor>& fc)
2470
0
                            {
2471
0
                                return fc && fc->name == builtin_flow_controller_name;
2472
0
                            });
2473
2474
0
            if (!found)
2475
0
            {
2476
0
                EPROSIMA_LOG_ERROR(RTPS_QOS_CHECK, "Flow controller name not found in flow controllers list");
2477
0
                return RETCODE_INCONSISTENT_POLICY;
2478
0
            }
2479
0
        }
2480
0
    }
2481
2482
2483
0
    return ret_val;
2484
0
}
2485
2486
bool DomainParticipantImpl::can_qos_be_updated(
2487
        const DomainParticipantQos& to,
2488
        const DomainParticipantQos& from)
2489
0
{
2490
0
    bool updatable = true;
2491
0
    if (!(to.allocation() == from.allocation()))
2492
0
    {
2493
0
        updatable = false;
2494
0
        EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK,
2495
0
                "ParticipantResourceLimitsQos cannot be changed after the participant is enabled");
2496
0
    }
2497
0
    if ((to.properties() != from.properties()))
2498
0
    {
2499
0
        updatable = false;
2500
0
        EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK, "PropertyPolicyQos cannot be changed after the participant is enabled");
2501
0
    }
2502
0
    if (!(to.wire_protocol() == from.wire_protocol()))
2503
0
    {
2504
        // Check that the only modification was in wire_protocol().discovery_config.m_DiscoveryServers
2505
0
        if ((to.wire_protocol().builtin.discovery_config.m_DiscoveryServers ==
2506
0
                from.wire_protocol().builtin.discovery_config.m_DiscoveryServers) ||
2507
0
                (!(to.wire_protocol().builtin.discovery_config.m_DiscoveryServers ==
2508
0
                from.wire_protocol().builtin.discovery_config.m_DiscoveryServers) &&
2509
0
                (!(to.wire_protocol().prefix == from.wire_protocol().prefix) ||
2510
0
                !(to.wire_protocol().participant_id == from.wire_protocol().participant_id) ||
2511
0
                !(to.wire_protocol().port == from.wire_protocol().port) ||
2512
0
                !(to.wire_protocol().default_unicast_locator_list ==
2513
0
                from.wire_protocol().default_unicast_locator_list) ||
2514
0
                !(to.wire_protocol().default_multicast_locator_list ==
2515
0
                from.wire_protocol().default_multicast_locator_list) ||
2516
0
                !(to.wire_protocol().default_external_unicast_locators ==
2517
0
                from.wire_protocol().default_external_unicast_locators) ||
2518
0
                !(to.wire_protocol().ignore_non_matching_locators ==
2519
0
                from.wire_protocol().ignore_non_matching_locators) ||
2520
0
                !(to.wire_protocol().builtin.use_WriterLivelinessProtocol ==
2521
0
                from.wire_protocol().builtin.use_WriterLivelinessProtocol) ||
2522
0
                !(to.wire_protocol().builtin.network_configuration ==
2523
0
                from.wire_protocol().builtin.network_configuration) ||
2524
0
                !(to.wire_protocol().builtin.metatrafficUnicastLocatorList ==
2525
0
                from.wire_protocol().builtin.metatrafficUnicastLocatorList) ||
2526
0
                !(to.wire_protocol().builtin.metatrafficMulticastLocatorList ==
2527
0
                from.wire_protocol().builtin.metatrafficMulticastLocatorList) ||
2528
0
                !(to.wire_protocol().builtin.metatraffic_external_unicast_locators ==
2529
0
                from.wire_protocol().builtin.metatraffic_external_unicast_locators) ||
2530
0
                !(to.wire_protocol().builtin.initialPeersList == from.wire_protocol().builtin.initialPeersList) ||
2531
0
                !(to.wire_protocol().builtin.readerHistoryMemoryPolicy ==
2532
0
                from.wire_protocol().builtin.readerHistoryMemoryPolicy) ||
2533
0
                !(to.wire_protocol().builtin.readerPayloadSize == from.wire_protocol().builtin.readerPayloadSize) ||
2534
0
                !(to.wire_protocol().builtin.writerHistoryMemoryPolicy ==
2535
0
                from.wire_protocol().builtin.writerHistoryMemoryPolicy) ||
2536
0
                !(to.wire_protocol().builtin.writerPayloadSize == from.wire_protocol().builtin.writerPayloadSize) ||
2537
0
                !(to.wire_protocol().builtin.mutation_tries == from.wire_protocol().builtin.mutation_tries) ||
2538
0
                !(to.wire_protocol().builtin.flow_controller_name ==
2539
0
                from.wire_protocol().builtin.flow_controller_name) ||
2540
0
                !(to.wire_protocol().builtin.avoid_builtin_multicast ==
2541
0
                from.wire_protocol().builtin.avoid_builtin_multicast) ||
2542
0
                !(to.wire_protocol().builtin.discovery_config.discoveryProtocol ==
2543
0
                from.wire_protocol().builtin.discovery_config.discoveryProtocol) ||
2544
0
                !(to.wire_protocol().builtin.discovery_config.use_SIMPLE_EndpointDiscoveryProtocol ==
2545
0
                from.wire_protocol().builtin.discovery_config.use_SIMPLE_EndpointDiscoveryProtocol) ||
2546
0
                !(to.wire_protocol().builtin.discovery_config.use_STATIC_EndpointDiscoveryProtocol ==
2547
0
                from.wire_protocol().builtin.discovery_config.use_STATIC_EndpointDiscoveryProtocol) ||
2548
0
                !(to.wire_protocol().builtin.discovery_config.discoveryServer_client_syncperiod ==
2549
0
                from.wire_protocol().builtin.discovery_config.discoveryServer_client_syncperiod) ||
2550
0
                !(to.wire_protocol().builtin.discovery_config.m_PDPfactory ==
2551
0
                from.wire_protocol().builtin.discovery_config.m_PDPfactory) ||
2552
0
                !(to.wire_protocol().builtin.discovery_config.leaseDuration ==
2553
0
                from.wire_protocol().builtin.discovery_config.leaseDuration) ||
2554
0
                !(to.wire_protocol().builtin.discovery_config.leaseDuration_announcementperiod ==
2555
0
                from.wire_protocol().builtin.discovery_config.leaseDuration_announcementperiod) ||
2556
0
                !(to.wire_protocol().builtin.discovery_config.initial_announcements ==
2557
0
                from.wire_protocol().builtin.discovery_config.initial_announcements) ||
2558
0
                !(to.wire_protocol().builtin.discovery_config.m_simpleEDP ==
2559
0
                from.wire_protocol().builtin.discovery_config.m_simpleEDP) ||
2560
0
                !(strcmp(to.wire_protocol().builtin.discovery_config.static_edp_xml_config(),
2561
0
                from.wire_protocol().builtin.discovery_config.static_edp_xml_config()) == 0) ||
2562
0
                !(to.wire_protocol().builtin.discovery_config.ignoreParticipantFlags ==
2563
0
                from.wire_protocol().builtin.discovery_config.ignoreParticipantFlags))))
2564
0
        {
2565
0
            updatable = false;
2566
0
            EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK, "WireProtocolConfigQos cannot be changed after the participant is enabled, "
2567
0
                    << "with the exception of builtin.discovery_config.m_DiscoveryServers");
2568
0
        }
2569
0
    }
2570
0
    if (!(to.transport() == from.transport()))
2571
0
    {
2572
0
        updatable = false;
2573
0
        EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK, "TransportConfigQos cannot be changed after the participant is enabled");
2574
0
    }
2575
0
    if (!(to.name() == from.name()))
2576
0
    {
2577
0
        updatable = false;
2578
0
        EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK, "Participant name cannot be changed after the participant is enabled");
2579
0
    }
2580
0
    if (!(to.builtin_controllers_sender_thread() == from.builtin_controllers_sender_thread()))
2581
0
    {
2582
0
        updatable = false;
2583
0
        EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK,
2584
0
                "Participant builtin_controllers_sender_thread cannot be changed after the participant is enabled");
2585
0
    }
2586
0
    if (!(to.timed_events_thread() == from.timed_events_thread()))
2587
0
    {
2588
0
        updatable = false;
2589
0
        EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK,
2590
0
                "Participant timed_events_thread cannot be changed after the participant is enabled");
2591
0
    }
2592
0
    if (!(to.discovery_server_thread() == from.discovery_server_thread()))
2593
0
    {
2594
0
        updatable = false;
2595
0
        EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK,
2596
0
                "Participant discovery_server_thread cannot be changed after the participant is enabled");
2597
0
    }
2598
0
    if (!(to.typelookup_service_thread() == from.typelookup_service_thread()))
2599
0
    {
2600
0
        updatable = false;
2601
0
        EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK,
2602
0
                "Participant typelookup_service_thread cannot be changed after the participant is enabled");
2603
0
    }
2604
#if HAVE_SECURITY
2605
    if (!(to.security_log_thread() == from.security_log_thread()))
2606
    {
2607
        updatable = false;
2608
        EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK,
2609
                "Participant security_log_thread cannot be changed after the participant is enabled");
2610
    }
2611
#endif // if HAVE_SECURITY
2612
0
    return updatable;
2613
0
}
2614
2615
void DomainParticipantImpl::create_instance_handle(
2616
        InstanceHandle_t& handle)
2617
0
{
2618
0
    using rtps::octet;
2619
2620
0
    uint32_t id = ++next_instance_id_;
2621
0
    handle = guid_;
2622
0
    handle.value[15] = 0x01; // Vendor specific;
2623
0
    handle.value[14] = static_cast<octet>(id & 0xFF);
2624
0
    handle.value[13] = static_cast<octet>((id >> 8) & 0xFF);
2625
0
    handle.value[12] = static_cast<octet>((id >> 16) & 0xFF);
2626
0
}
2627
2628
DomainParticipantListener* DomainParticipantImpl::get_listener_for(
2629
        const StatusMask& status)
2630
0
{
2631
0
    if (get_participant()->get_status_mask().is_active(status))
2632
0
    {
2633
0
        return get_listener();
2634
0
    }
2635
0
    return nullptr;
2636
0
}
2637
2638
bool DomainParticipantImpl::fill_type_information(
2639
        const TypeSupport& type,
2640
        xtypes::TypeInformationParameter& type_information)
2641
0
{
2642
0
    using utils::to_type_propagation;
2643
0
    using utils::TypePropagation;
2644
2645
0
    auto properties = qos_.properties();
2646
0
    auto type_propagation = to_type_propagation(properties);
2647
0
    bool should_assign_type_information =
2648
0
            (TypePropagation::TYPEPROPAGATION_ENABLED == type_propagation) ||
2649
0
            (TypePropagation::TYPEPROPAGATION_MINIMAL_BANDWIDTH == type_propagation);
2650
2651
0
    if (should_assign_type_information && (xtypes::TK_NONE != type->type_identifiers().type_identifier1()._d()))
2652
0
    {
2653
0
        xtypes::TypeInformation type_info;
2654
2655
0
        if (RETCODE_OK ==
2656
0
                fastdds::rtps::RTPSDomainImpl::get_instance()->type_object_registry_observer().get_type_information(
2657
0
                    type->type_identifiers(), type_info))
2658
0
        {
2659
0
            switch (type_propagation)
2660
0
            {
2661
0
                case TypePropagation::TYPEPROPAGATION_ENABLED:
2662
0
                {
2663
                    // Use both complete and minimal type information
2664
0
                    type_information.type_information = type_info;
2665
0
                    break;
2666
0
                }
2667
0
                case TypePropagation::TYPEPROPAGATION_MINIMAL_BANDWIDTH:
2668
0
                {
2669
                    // Use minimal type information only
2670
0
                    type_information.type_information.minimal() = type_info.minimal();
2671
0
                    break;
2672
0
                }
2673
0
                default:
2674
                    // This should never happen as other cases are protected by should_assign_type_information
2675
0
                    assert(false);
2676
0
                    break;
2677
0
            }
2678
2679
0
            type_information.assigned(true);
2680
0
            return true;
2681
0
        }
2682
0
    }
2683
2684
0
    return false;
2685
0
}
2686
2687
}  // namespace dds
2688
}  // namespace fastdds
2689
}  // namespace eprosima