Coverage Report

Created: 2025-07-03 06:58

/src/Fast-DDS/src/cpp/rtps/RTPSDomain.cpp
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/*
16
 * @file RTPSDomain.cpp
17
 */
18
19
#include <fastdds/rtps/RTPSDomain.hpp>
20
21
#include <chrono>
22
#include <cstdlib>
23
#include <fstream>
24
#include <memory>
25
#include <regex>
26
#include <string>
27
#include <thread>
28
29
#include <fastdds/dds/log/Log.hpp>
30
#include <fastdds/LibrarySettings.hpp>
31
#include <fastdds/rtps/history/WriterHistory.hpp>
32
#include <fastdds/rtps/participant/RTPSParticipant.hpp>
33
#include <fastdds/rtps/reader/RTPSReader.hpp>
34
#include <fastdds/rtps/writer/RTPSWriter.hpp>
35
#include <fastdds/utils/IPFinder.hpp>
36
#include <fastdds/utils/IPLocator.hpp>
37
#include <fastdds/utils/md5.hpp>
38
39
#include <rtps/attributes/ServerAttributes.hpp>
40
#include <rtps/common/GuidUtils.hpp>
41
#include <rtps/network/utils/external_locators.hpp>
42
#include <rtps/participant/RTPSParticipantImpl.hpp>
43
#include <rtps/reader/BaseReader.hpp>
44
#include <rtps/reader/LocalReaderPointer.hpp>
45
#include <rtps/RTPSDomainImpl.hpp>
46
#include <rtps/transport/TCPv4Transport.h>
47
#include <rtps/transport/TCPv6Transport.h>
48
#include <rtps/transport/test_UDPv4Transport.h>
49
#include <rtps/transport/UDPv4Transport.h>
50
#include <rtps/transport/UDPv6Transport.h>
51
#include <rtps/writer/BaseWriter.hpp>
52
#include <utils/Host.hpp>
53
#include <utils/SystemCommandBuilder.hpp>
54
#include <utils/SystemInfo.hpp>
55
#include <xmlparser/XMLProfileManager.h>
56
57
namespace eprosima {
58
namespace fastdds {
59
namespace rtps {
60
61
const char* EASY_MODE_SERVICE_PROFILE =
62
        "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>\n"
63
        "<dds xmlns=\"http://www.eprosima.com/XMLSchemas/fastRTPS_Profiles\">\n"
64
        "    <profiles>\n"
65
        "        <data_writer profile_name=\"service\">\n"
66
        "            <qos>\n"
67
        "                <reliability>\n"
68
        "                    <max_blocking_time>\n"
69
        "                        <sec>1</sec>\n"
70
        "                        <nanosec>0</nanosec>\n"
71
        "                    </max_blocking_time>\n"
72
        "                </reliability>\n"
73
        "            </qos>\n"
74
        "        </data_writer>\n"
75
        "    </profiles>\n"
76
        "</dds>\n";
77
78
template<typename _Descriptor>
79
bool has_user_transport(
80
        const RTPSParticipantAttributes& att)
81
0
{
82
0
    const auto& transports = att.userTransports;
83
0
    const auto end_it = transports.end();
84
0
    return end_it != std::find_if(transports.begin(), end_it,
85
0
                   [](const std::shared_ptr<eprosima::fastdds::rtps::TransportDescriptorInterface>& item)
86
0
                   {
87
0
                       return nullptr != dynamic_cast<_Descriptor*>(item.get());
88
0
                   });
Unexecuted instantiation: eprosima::fastdds::rtps::has_user_transport<eprosima::fastdds::rtps::UDPv6TransportDescriptor>(eprosima::fastdds::rtps::RTPSParticipantAttributes const&)::{lambda(std::__1::shared_ptr<eprosima::fastdds::rtps::TransportDescriptorInterface> const&)#1}::operator()(std::__1::shared_ptr<eprosima::fastdds::rtps::TransportDescriptorInterface> const&) const
Unexecuted instantiation: eprosima::fastdds::rtps::has_user_transport<eprosima::fastdds::rtps::TCPv4TransportDescriptor>(eprosima::fastdds::rtps::RTPSParticipantAttributes const&)::{lambda(std::__1::shared_ptr<eprosima::fastdds::rtps::TransportDescriptorInterface> const&)#1}::operator()(std::__1::shared_ptr<eprosima::fastdds::rtps::TransportDescriptorInterface> const&) const
Unexecuted instantiation: eprosima::fastdds::rtps::has_user_transport<eprosima::fastdds::rtps::TCPv6TransportDescriptor>(eprosima::fastdds::rtps::RTPSParticipantAttributes const&)::{lambda(std::__1::shared_ptr<eprosima::fastdds::rtps::TransportDescriptorInterface> const&)#1}::operator()(std::__1::shared_ptr<eprosima::fastdds::rtps::TransportDescriptorInterface> const&) const
89
0
}
Unexecuted instantiation: bool eprosima::fastdds::rtps::has_user_transport<eprosima::fastdds::rtps::UDPv6TransportDescriptor>(eprosima::fastdds::rtps::RTPSParticipantAttributes const&)
Unexecuted instantiation: bool eprosima::fastdds::rtps::has_user_transport<eprosima::fastdds::rtps::TCPv4TransportDescriptor>(eprosima::fastdds::rtps::RTPSParticipantAttributes const&)
Unexecuted instantiation: bool eprosima::fastdds::rtps::has_user_transport<eprosima::fastdds::rtps::TCPv6TransportDescriptor>(eprosima::fastdds::rtps::RTPSParticipantAttributes const&)
90
91
static void guid_prefix_create(
92
        uint32_t ID,
93
        GuidPrefix_t& guidP)
94
0
{
95
0
    eprosima::fastdds::rtps::GuidUtils::instance().guid_prefix_create(ID, guidP);
96
0
}
97
98
std::shared_ptr<RTPSDomainImpl> RTPSDomainImpl::get_instance()
99
0
{
100
0
    static std::shared_ptr<RTPSDomainImpl> instance = std::make_shared<RTPSDomainImpl>();
101
0
    return instance;
102
0
}
103
104
void RTPSDomain::set_filewatch_thread_config(
105
        const fastdds::rtps::ThreadSettings& watch_thread,
106
        const fastdds::rtps::ThreadSettings& callback_thread)
107
0
{
108
0
    RTPSDomainImpl::set_filewatch_thread_config(watch_thread, callback_thread);
109
0
}
110
111
void RTPSDomain::stopAll()
112
0
{
113
0
    RTPSDomainImpl::stopAll();
114
0
}
115
116
RTPSParticipant* RTPSDomain::createParticipant(
117
        uint32_t domain_id,
118
        const RTPSParticipantAttributes& attrs,
119
        RTPSParticipantListener* listen)
120
0
{
121
0
    return RTPSDomain::createParticipant(domain_id, true, attrs, listen);
122
0
}
123
124
RTPSParticipant* RTPSDomain::createParticipant(
125
        uint32_t domain_id,
126
        bool enabled,
127
        const RTPSParticipantAttributes& attrs,
128
        RTPSParticipantListener* listen)
129
0
{
130
0
    RTPSParticipant* part = nullptr;
131
132
    // Try to create a participant with the default server-client setup.
133
0
    part = RTPSDomainImpl::create_client_server_participant(domain_id, enabled, attrs, listen);
134
135
0
    if (!part)
136
0
    {
137
        // Try to create the participant with the input attributes if the auto server-client setup failed
138
        // or was omitted.
139
0
        part = RTPSDomainImpl::createParticipant(domain_id, enabled, attrs, listen);
140
0
        if (!part)
141
0
        {
142
0
            EPROSIMA_LOG_ERROR(RTPS_DOMAIN, "Unable to create the participant");
143
0
        }
144
0
    }
145
0
    else
146
0
    {
147
0
        EPROSIMA_LOG_INFO(RTPS_DOMAIN, "Auto default server-client setup: Client created.");
148
0
    }
149
150
0
    return part;
151
0
}
152
153
RTPSParticipant* RTPSDomain::create_client_server_participant(
154
        uint32_t domain_id,
155
        bool enabled,
156
        const RTPSParticipantAttributes& attrs,
157
        RTPSParticipantListener* plisten /* = nullptr */)
158
0
{
159
0
    return RTPSDomainImpl::create_client_server_participant(domain_id, enabled, attrs, plisten);
160
0
}
161
162
bool RTPSDomain::removeRTPSParticipant(
163
        RTPSParticipant* p)
164
0
{
165
0
    return RTPSDomainImpl::removeRTPSParticipant(p);
166
0
}
167
168
void RTPSDomainImpl::stopAll()
169
0
{
170
0
    auto instance = get_instance();
171
0
    std::unique_lock<std::mutex> lock(instance->m_mutex);
172
0
    EPROSIMA_LOG_INFO(RTPS_PARTICIPANT, "DELETING ALL ENDPOINTS IN THIS DOMAIN");
173
174
    // Stop monitoring environment file
175
0
    SystemInfo::stop_watching_file(instance->file_watch_handle_);
176
177
0
    while (instance->m_RTPSParticipants.size() > 0)
178
0
    {
179
0
        t_p_RTPSParticipant participant = instance->m_RTPSParticipants.back();
180
0
        instance->m_RTPSParticipantIDs.erase(participant.second->getRTPSParticipantID());
181
0
        instance->m_RTPSParticipants.pop_back();
182
183
0
        lock.unlock();
184
0
        instance->removeRTPSParticipant_nts(participant);
185
0
        lock.lock();
186
0
    }
187
188
0
    xmlparser::XMLProfileManager::DeleteInstance();
189
190
0
    EPROSIMA_LOG_INFO(RTPS_PARTICIPANT, "RTPSParticipants deleted correctly ");
191
0
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
192
0
}
193
194
RTPSParticipant* RTPSDomainImpl::createParticipant(
195
        uint32_t domain_id,
196
        bool enabled,
197
        const RTPSParticipantAttributes& attrs,
198
        RTPSParticipantListener* listen)
199
0
{
200
0
    EPROSIMA_LOG_INFO(RTPS_PARTICIPANT, "");
201
202
0
    RTPSParticipantAttributes PParam = attrs;
203
204
0
    if (PParam.builtin.discovery_config.leaseDuration < dds::c_TimeInfinite &&
205
0
            PParam.builtin.discovery_config.leaseDuration <=
206
0
            PParam.builtin.discovery_config.leaseDuration_announcementperiod)
207
0
    {
208
0
        EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT,
209
0
                "RTPSParticipant Attributes: LeaseDuration should be >= leaseDuration announcement period");
210
0
        return nullptr;
211
0
    }
212
213
    // Only the first time, initialize environment file watch if the corresponding environment variable is set
214
0
    auto instance = get_instance();
215
0
    if (!instance->file_watch_handle_)
216
0
    {
217
0
        std::string filename = SystemInfo::get_environment_file();
218
0
        if (!filename.empty() && SystemInfo::file_exists(filename))
219
0
        {
220
0
            std::lock_guard<std::mutex> guard(instance->m_mutex);
221
            // Create filewatch
222
0
            instance->file_watch_handle_ = SystemInfo::watch_file(filename, RTPSDomainImpl::file_watch_callback,
223
0
                            instance->watch_thread_config_, instance->callback_thread_config_);
224
0
        }
225
0
        else if (!filename.empty())
226
0
        {
227
0
            EPROSIMA_LOG_WARNING(RTPS_PARTICIPANT, filename + " does not exist. File watching not initialized.");
228
0
        }
229
0
    }
230
231
0
    uint32_t ID;
232
0
    if (!instance->prepare_participant_id(PParam.participantID, ID))
233
0
    {
234
0
        return nullptr;
235
0
    }
236
237
0
    if (!PParam.defaultUnicastLocatorList.isValid())
238
0
    {
239
0
        EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, "Default Unicast Locator List contains invalid Locator");
240
0
        return nullptr;
241
0
    }
242
0
    if (!PParam.defaultMulticastLocatorList.isValid())
243
0
    {
244
0
        EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, "Default Multicast Locator List contains invalid Locator");
245
0
        return nullptr;
246
0
    }
247
248
0
    PParam.participantID = ID;
249
250
    // Generate a new GuidPrefix_t
251
0
    GuidPrefix_t guidP;
252
0
    guid_prefix_create(instance->get_id_for_prefix(ID), guidP);
253
0
    if (!PParam.builtin.metatraffic_external_unicast_locators.empty())
254
0
    {
255
0
        fastdds::rtps::LocatorList locators;
256
0
        fastdds::rtps::IPFinder::getIP4Address(&locators);
257
0
        fastdds::rtps::network::external_locators::add_external_locators(locators,
258
0
                PParam.builtin.metatraffic_external_unicast_locators);
259
0
        uint16_t host_id = Host::compute_id(locators);
260
0
        guidP.value[2] = static_cast<octet>(host_id & 0xFF);
261
0
        guidP.value[3] = static_cast<octet>((host_id >> 8) & 0xFF);
262
0
    }
263
264
0
    RTPSParticipant* p = new RTPSParticipant(nullptr);
265
0
    RTPSParticipantImpl* pimpl = nullptr;
266
267
    // If we force the participant to have a specific prefix we must define a different persistence GuidPrefix_t that
268
    // would ensure builtin endpoints are able to differentiate between a communication loss and a participant recovery
269
0
    if (PParam.prefix != c_GuidPrefix_Unknown)
270
0
    {
271
0
        pimpl = new RTPSParticipantImpl(domain_id, PParam, PParam.prefix, guidP, p, listen);
272
0
    }
273
0
    else
274
0
    {
275
0
        if (PParam.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::BACKUP)
276
0
        {
277
0
            EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, "Specifying a GUID prefix is mandatory for BACKUP Discovery Servers.");
278
0
            return nullptr;
279
0
        }
280
0
        pimpl = new RTPSParticipantImpl(domain_id, PParam, guidP, p, listen);
281
0
    }
282
283
    // Check implementation was correctly initialized
284
0
    if (!pimpl->is_initialized())
285
0
    {
286
0
        EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, "Cannot create participant due to initialization error");
287
0
        delete pimpl;
288
0
        return nullptr;
289
0
    }
290
291
    // Above constructors create the sender resources. If a given listening port cannot be allocated an iterative
292
    // mechanism will allocate another by default. Change the default listening port is unacceptable for
293
    // discovery server Participant.
294
0
    if ((PParam.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::SERVER
295
0
            || PParam.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::BACKUP)
296
0
            && pimpl->did_mutation_took_place_on_meta(
297
0
                PParam.builtin.metatrafficMulticastLocatorList,
298
0
                PParam.builtin.metatrafficUnicastLocatorList))
299
0
    {
300
0
        if (PParam.builtin.metatrafficMulticastLocatorList.empty() &&
301
0
                PParam.builtin.metatrafficUnicastLocatorList.empty())
302
0
        {
303
0
            EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, "Discovery Server requires to specify a listening address.");
304
0
        }
305
0
        else
306
0
        {
307
0
            EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT,
308
0
                    "Discovery Server wasn't able to allocate the specified listening port.");
309
0
        }
310
311
0
        delete pimpl;
312
0
        return nullptr;
313
0
    }
314
315
    // Check there is at least one transport registered.
316
0
    if (!pimpl->networkFactoryHasRegisteredTransports())
317
0
    {
318
0
        EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, "Cannot create participant, because there is any transport");
319
0
        delete pimpl;
320
0
        return nullptr;
321
0
    }
322
323
0
    {
324
0
        std::lock_guard<std::mutex> guard(instance->m_mutex);
325
0
        instance->m_RTPSParticipants.push_back(t_p_RTPSParticipant(p, pimpl));
326
0
        instance->m_RTPSParticipantIDs[ID].used = true;
327
0
        instance->m_RTPSParticipantIDs[ID].reserved = true;
328
0
    }
329
330
    // Check the environment file in case it was modified during participant creation leading to a missed callback.
331
0
    if ((PParam.builtin.discovery_config.discoveryProtocol != DiscoveryProtocol::CLIENT) &&
332
0
            instance->file_watch_handle_)
333
0
    {
334
0
        pimpl->environment_file_has_changed();
335
0
    }
336
337
0
    if (enabled)
338
0
    {
339
        // Start protocols
340
0
        pimpl->enable();
341
0
    }
342
0
    return p;
343
0
}
344
345
RTPSParticipant* RTPSDomainImpl::create_client_server_participant(
346
        uint32_t domain_id,
347
        bool enabled,
348
        const RTPSParticipantAttributes& attrs,
349
        RTPSParticipantListener* plisten)
350
0
{
351
0
    RTPSParticipant* part = nullptr;
352
0
    RTPSParticipantAttributes env_attrs = attrs;
353
354
    // Fill participant attributes using set environment variables.
355
    // Note: If ROS2_EASY_MODE is configured and it is not set in the input participant attributes, it will be set.
356
    // In other case, the previous easy_mode_ip value will be kept and ROS2_EASY_MODE will be ignored.
357
0
    if (!client_server_environment_attributes_override(domain_id, env_attrs))
358
0
    {
359
0
        EPROSIMA_LOG_INFO(RTPS_DOMAIN,
360
0
                "ParticipantAttributes not overriden. Skipping auto server-client default setup.");
361
0
        return nullptr;
362
0
    }
363
364
0
    part = createParticipant(domain_id, enabled, env_attrs, plisten);
365
366
0
    if (!part)
367
0
    {
368
        // Unable to create auto server-client default participants
369
0
        EPROSIMA_LOG_ERROR(RTPS_DOMAIN, "Auto default server-client setup: Unable to create the client.");
370
0
        return nullptr;
371
0
    }
372
373
    // Launch the discovery server daemon if Easy Mode is enabled
374
0
    if (!env_attrs.easy_mode_ip.empty())
375
0
    {
376
0
        if (!run_easy_mode_discovery_server(domain_id, env_attrs.easy_mode_ip))
377
0
        {
378
0
            EPROSIMA_LOG_ERROR(RTPS_DOMAIN, "Error launching Easy Mode discovery server daemon");
379
            // Remove the client participant
380
0
            removeRTPSParticipant(part);
381
0
            part = nullptr;
382
0
            return nullptr;
383
0
        }
384
385
0
        EPROSIMA_LOG_INFO(RTPS_DOMAIN, "Easy Mode discovery server launched successfully");
386
0
    }
387
388
0
    EPROSIMA_LOG_INFO(RTPS_DOMAIN, "Auto default server-client setup: Default client created.");
389
390
    // At this point, Discovery Protocol has changed from SIMPLE to CLIENT or SUPER_CLIENT.
391
    // Set client_override_ flag to true (Simple Participant turned into a Client Participant).
392
0
    part->mp_impl->client_override(true);
393
394
0
    return part;
395
0
}
396
397
bool RTPSDomainImpl::removeRTPSParticipant(
398
        RTPSParticipant* p)
399
0
{
400
0
    if (p != nullptr)
401
0
    {
402
0
        assert((p->mp_impl != nullptr) && "This participant has been previously invalidated");
403
404
0
        auto instance = get_instance();
405
0
        std::unique_lock<std::mutex> lock(instance->m_mutex);
406
0
        for (auto it = instance->m_RTPSParticipants.begin(); it != instance->m_RTPSParticipants.end(); ++it)
407
0
        {
408
0
            if (it->second->getGuid().guidPrefix == p->getGuid().guidPrefix)
409
0
            {
410
0
                RTPSDomainImpl::t_p_RTPSParticipant participant = *it;
411
0
                instance->m_RTPSParticipants.erase(it);
412
0
                uint32_t participant_id = participant.second->getRTPSParticipantID();
413
0
                instance->m_RTPSParticipantIDs[participant_id].used = false;
414
0
                instance->m_RTPSParticipantIDs[participant_id].reserved = false;
415
0
                lock.unlock();
416
0
                instance->removeRTPSParticipant_nts(participant);
417
0
                return true;
418
0
            }
419
0
        }
420
0
    }
421
0
    EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, "RTPSParticipant not valid or not recognized");
422
0
    return false;
423
0
}
424
425
void RTPSDomainImpl::removeRTPSParticipant_nts(
426
        RTPSDomainImpl::t_p_RTPSParticipant& participant)
427
0
{
428
0
    participant.second->disable();
429
    // The destructor of RTPSParticipantImpl already deletes the associated RTPSParticipant and sets
430
    // its pointer to the RTPSParticipant to nullptr, so there is no need to do it here manually.
431
0
    delete(participant.second);
432
0
}
433
434
RTPSWriter* RTPSDomain::createRTPSWriter(
435
        RTPSParticipant* p,
436
        WriterAttributes& watt,
437
        WriterHistory* hist,
438
        WriterListener* listen)
439
0
{
440
0
    RTPSParticipantImpl* impl = RTPSDomainImpl::find_local_participant(p->getGuid());
441
0
    if (impl)
442
0
    {
443
0
        RTPSWriter* ret_val = nullptr;
444
0
        if (impl->createWriter(&ret_val, watt, hist, listen))
445
0
        {
446
0
            return ret_val;
447
0
        }
448
0
    }
449
450
0
    return nullptr;
451
0
}
452
453
RTPSWriter* RTPSDomain::createRTPSWriter(
454
        RTPSParticipant* p,
455
        const EntityId_t& entity_id,
456
        WriterAttributes& watt,
457
        WriterHistory* hist,
458
        WriterListener* listen)
459
0
{
460
0
    RTPSParticipantImpl* impl = RTPSDomainImpl::find_local_participant(p->getGuid());
461
0
    if (impl)
462
0
    {
463
0
        RTPSWriter* ret_val = nullptr;
464
0
        if (impl->createWriter(&ret_val, watt, hist, listen, entity_id))
465
0
        {
466
0
            return ret_val;
467
0
        }
468
0
    }
469
470
0
    return nullptr;
471
0
}
472
473
RTPSWriter* RTPSDomainImpl::create_rtps_writer(
474
        RTPSParticipant* p,
475
        const EntityId_t& entity_id,
476
        WriterAttributes& watt,
477
        WriterHistory* hist,
478
        WriterListener* listen)
479
0
{
480
0
    RTPSParticipantImpl* impl = RTPSDomainImpl::find_local_participant(p->getGuid());
481
0
    if (impl)
482
0
    {
483
0
        RTPSWriter* ret_val = nullptr;
484
0
        if (impl->create_writer(&ret_val, watt, hist, listen, entity_id, false))
485
0
        {
486
0
            return ret_val;
487
0
        }
488
0
    }
489
490
0
    return nullptr;
491
0
}
492
493
bool RTPSDomain::removeRTPSWriter(
494
        RTPSWriter* writer)
495
0
{
496
0
    return RTPSDomainImpl::removeRTPSWriter(writer);
497
0
}
498
499
bool RTPSDomainImpl::removeRTPSWriter(
500
        RTPSWriter* writer)
501
0
{
502
0
    if (writer != nullptr)
503
0
    {
504
0
        auto instance = get_instance();
505
0
        std::unique_lock<std::mutex> lock(instance->m_mutex);
506
0
        for (auto it = instance->m_RTPSParticipants.begin(); it != instance->m_RTPSParticipants.end(); ++it)
507
0
        {
508
0
            if (it->first->getGuid().guidPrefix == writer->getGuid().guidPrefix)
509
0
            {
510
0
                t_p_RTPSParticipant participant = *it;
511
0
                lock.unlock();
512
0
                return participant.second->deleteUserEndpoint(writer->getGuid());
513
0
            }
514
0
        }
515
0
    }
516
0
    return false;
517
0
}
518
519
RTPSReader* RTPSDomain::createRTPSReader(
520
        RTPSParticipant* p,
521
        ReaderAttributes& ratt,
522
        ReaderHistory* rhist,
523
        ReaderListener* rlisten)
524
0
{
525
0
    RTPSParticipantImpl* impl = RTPSDomainImpl::find_local_participant(p->getGuid());
526
0
    if (impl)
527
0
    {
528
0
        RTPSReader* reader;
529
0
        if (impl->createReader(&reader, ratt, rhist, rlisten))
530
0
        {
531
0
            return reader;
532
0
        }
533
0
    }
534
0
    return nullptr;
535
0
}
536
537
RTPSReader* RTPSDomain::createRTPSReader(
538
        RTPSParticipant* p,
539
        ReaderAttributes& ratt,
540
        const std::shared_ptr<IPayloadPool>& payload_pool,
541
        ReaderHistory* rhist,
542
        ReaderListener* rlisten)
543
0
{
544
0
    RTPSParticipantImpl* impl = RTPSDomainImpl::find_local_participant(p->getGuid());
545
0
    if (impl)
546
0
    {
547
0
        RTPSReader* reader;
548
0
        if (impl->createReader(&reader, ratt, payload_pool, rhist, rlisten))
549
0
        {
550
0
            return reader;
551
0
        }
552
0
    }
553
0
    return nullptr;
554
0
}
555
556
RTPSReader* RTPSDomain::createRTPSReader(
557
        RTPSParticipant* p,
558
        const EntityId_t& entity_id,
559
        ReaderAttributes& ratt,
560
        const std::shared_ptr<IPayloadPool>& payload_pool,
561
        ReaderHistory* rhist,
562
        ReaderListener* rlisten)
563
0
{
564
0
    RTPSParticipantImpl* impl = p->mp_impl;
565
0
    if (impl)
566
0
    {
567
0
        RTPSReader* reader;
568
0
        if (impl->createReader(&reader, ratt, payload_pool, rhist, rlisten, entity_id))
569
0
        {
570
0
            return reader;
571
0
        }
572
0
    }
573
0
    return nullptr;
574
0
}
575
576
bool RTPSDomain::removeRTPSReader(
577
        RTPSReader* reader)
578
0
{
579
0
    return RTPSDomainImpl::removeRTPSReader(reader);
580
0
}
581
582
bool RTPSDomainImpl::removeRTPSReader(
583
        RTPSReader* reader)
584
0
{
585
0
    if (reader !=  nullptr)
586
0
    {
587
0
        auto instance = get_instance();
588
0
        std::unique_lock<std::mutex> lock(instance->m_mutex);
589
0
        for (auto it = instance->m_RTPSParticipants.begin(); it != instance->m_RTPSParticipants.end(); ++it)
590
0
        {
591
0
            if (it->first->getGuid().guidPrefix == reader->getGuid().guidPrefix)
592
0
            {
593
0
                t_p_RTPSParticipant participant = *it;
594
0
                lock.unlock();
595
0
                return participant.second->deleteUserEndpoint(reader->getGuid());
596
0
            }
597
0
        }
598
0
    }
599
0
    return false;
600
0
}
601
602
bool RTPSDomainImpl::client_server_environment_attributes_override(
603
        uint32_t domain_id,
604
        RTPSParticipantAttributes& att)
605
0
{
606
    // Check the specified discovery protocol: if other than simple it has priority over ros environment variable
607
0
    if (att.builtin.discovery_config.discoveryProtocol != DiscoveryProtocol::SIMPLE)
608
0
    {
609
0
        EPROSIMA_LOG_INFO(RTPS_DOMAIN, "Detected non simple discovery protocol attributes."
610
0
                << " Ignoring auto default client-server setup.");
611
0
        return false;
612
0
    }
613
614
    // We only make the attributes copy when we are sure is worth
615
    // Is up to the caller guarantee the att argument is not modified during the call
616
0
    RTPSParticipantAttributes client_att(att);
617
618
    /* Check whether we need to initialize in easy mode */
619
620
    // Get the IP of the remote discovery server.
621
    // 1. Check if it is configured in RTPSParticipantAttributes
622
    // 2. If not, check if it is configured in the environment variable
623
0
    std::string ros_easy_mode_ip_env = ros_easy_mode_env();
624
625
0
    if (!att.easy_mode_ip.empty())
626
0
    {
627
0
        if (!ros_easy_mode_ip_env.empty())
628
0
        {
629
0
            EPROSIMA_LOG_WARNING(RTPSDOMAIN, "Easy mode IP is configured both in RTPSParticipantAttributes and "
630
0
                    << ROS2_EASY_MODE_URI << " environment variable, ignoring the latter.");
631
0
        }
632
0
        client_att.easy_mode_ip = att.easy_mode_ip;
633
0
    }
634
0
    else
635
0
    {
636
0
        client_att.easy_mode_ip = ros_easy_mode_ip_env;
637
0
    }
638
639
0
    if (client_att.easy_mode_ip.empty())
640
0
    {
641
        // Retrieve the info from the environment variable
642
0
        LocatorList_t& server_list = client_att.builtin.discovery_config.m_DiscoveryServers;
643
0
        if (load_environment_server_info(server_list) && server_list.empty())
644
0
        {
645
            // It's not an error, the environment variable may not be set. Any issue with environment
646
            // variable syntax is EPROSIMA_LOG_ERROR already
647
0
            return false;
648
0
        }
649
650
        // Check if some address requires the UDPv6, TCPv4 or TCPv6 transport
651
0
        if (server_list.has_kind<LOCATOR_KIND_UDPv6>() &&
652
0
                !has_user_transport<fastdds::rtps::UDPv6TransportDescriptor>(client_att))
653
0
        {
654
            // Extend builtin transports with the UDPv6 transport
655
0
            auto descriptor = std::make_shared<fastdds::rtps::UDPv6TransportDescriptor>();
656
0
            descriptor->sendBufferSize = client_att.sendSocketBufferSize;
657
0
            descriptor->receiveBufferSize = client_att.listenSocketBufferSize;
658
0
            client_att.userTransports.push_back(std::move(descriptor));
659
0
        }
660
0
        if (server_list.has_kind<LOCATOR_KIND_TCPv4>() &&
661
0
                !has_user_transport<fastdds::rtps::TCPv4TransportDescriptor>(client_att))
662
0
        {
663
            // Extend builtin transports with the TCPv4 transport
664
0
            auto descriptor = std::make_shared<fastdds::rtps::TCPv4TransportDescriptor>();
665
            // Add automatic port
666
0
            descriptor->add_listener_port(0);
667
0
            descriptor->sendBufferSize = client_att.sendSocketBufferSize;
668
0
            descriptor->receiveBufferSize = client_att.listenSocketBufferSize;
669
0
            client_att.userTransports.push_back(std::move(descriptor));
670
0
        }
671
0
        if (server_list.has_kind<LOCATOR_KIND_TCPv6>() &&
672
0
                !has_user_transport<fastdds::rtps::TCPv6TransportDescriptor>(client_att))
673
0
        {
674
            // Extend builtin transports with the TCPv6 transport
675
0
            auto descriptor = std::make_shared<fastdds::rtps::TCPv6TransportDescriptor>();
676
            // Add automatic port
677
0
            descriptor->add_listener_port(0);
678
0
            descriptor->sendBufferSize = client_att.sendSocketBufferSize;
679
0
            descriptor->receiveBufferSize = client_att.listenSocketBufferSize;
680
0
            client_att.userTransports.push_back(std::move(descriptor));
681
0
        }
682
683
0
        EPROSIMA_LOG_INFO(RTPS_DOMAIN, "Detected auto client-server environment variable."
684
0
                << "Trying to create client with the default server setup: "
685
0
                << client_att.builtin.discovery_config.m_DiscoveryServers);
686
687
0
        client_att.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::CLIENT;
688
        // RemoteServerAttributes already fill in above
689
690
        // Check if the client must become a super client
691
0
        if (ros_super_client_env())
692
0
        {
693
0
            client_att.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::SUPER_CLIENT;
694
0
        }
695
0
    }
696
0
    else
697
0
    {
698
        // SUPER_CLIENT
699
0
        client_att.builtin.discovery_config.discoveryProtocol = DiscoveryProtocol::SUPER_CLIENT;
700
701
        // P2P transport. Similar to LARGE_DATA, but with UDPv4 unicast
702
0
        client_att.useBuiltinTransports = false;
703
0
        client_att.setup_transports(BuiltinTransports::P2P);
704
705
        // Ignore initialpeers
706
0
        client_att.builtin.initialPeersList = LocatorList();
707
708
0
        eprosima::fastdds::rtps::PortParameters port_params;
709
710
0
        auto domain_port = port_params.get_discovery_server_port(domain_id);
711
712
        // Add user traffic TCP
713
0
        eprosima::fastdds::rtps::Locator_t locator_tcp;
714
0
        locator_tcp.kind = LOCATOR_KIND_TCPv4;
715
716
0
        IPLocator::setPhysicalPort(locator_tcp, 0);
717
0
        IPLocator::setLogicalPort(locator_tcp, 0);
718
        // Initialize to the wan interface
719
0
        IPLocator::setIPv4(locator_tcp, "0.0.0.0");
720
721
0
        client_att.defaultUnicastLocatorList.push_back(locator_tcp);
722
723
        // Add remote DS based on port
724
0
        eprosima::fastdds::rtps::Locator_t locator_udp;
725
0
        locator_udp.kind = LOCATOR_KIND_UDPv4;
726
727
0
        locator_udp.port = domain_port;
728
0
        IPLocator::setIPv4(locator_udp, 127, 0, 0, 1);
729
730
        // Point to the well known DS port in the corresponding domain
731
0
        client_att.builtin.discovery_config.m_DiscoveryServers.push_back(locator_udp);
732
733
        // Load the 'service' profile for ROS2_EASY_MODE services if there is no existing profile yet
734
0
        xmlparser::PublisherAttributes attr;
735
0
        auto ret_if = xmlparser::XMLProfileManager::fillPublisherAttributes("service", attr, false);
736
0
        if (ret_if == xmlparser::XMLP_ret::XML_ERROR)
737
0
        {
738
            // An XML_ERROR is returned if there is no existing profile for the given name
739
0
            xmlparser::XMLProfileManager::loadXMLString(EASY_MODE_SERVICE_PROFILE, strlen(EASY_MODE_SERVICE_PROFILE));
740
0
            EPROSIMA_LOG_INFO(RTPS_DOMAIN, "Loaded service profile for ROS2_EASY_MODE servers");
741
0
        }
742
0
        else
743
0
        {
744
            // There is already a profile with the given name. Do not overwrite it
745
0
            EPROSIMA_LOG_WARNING(RTPS_DOMAIN, "An XML profile for 'service' was found. When using ROS2_EASY_MODE, please ensure"
746
0
                    " the max_blocking_time is configured with a value higher than the default.");
747
0
        }
748
0
    }
749
750
0
    att = client_att;
751
752
0
    return true;
753
0
}
754
755
uint32_t RTPSDomainImpl::getNewId()
756
0
{
757
    // Get the smallest available participant ID.
758
    // Settings like maxInitialPeersRange control how many participants a peer
759
    // will look for on this host.
760
    // Choosing the smallest value ensures peers using unicast discovery will
761
    // find this participant as long as the total number of participants has
762
    // not exceeded the number of peers they will look for.
763
0
    uint32_t i = 0;
764
0
    while (m_RTPSParticipantIDs[i].reserved || m_RTPSParticipantIDs[i].used)
765
0
    {
766
0
        ++i;
767
0
    }
768
0
    m_RTPSParticipantIDs[i].reserved = true;
769
0
    return i;
770
0
}
771
772
bool RTPSDomainImpl::prepare_participant_id(
773
        int32_t input_id,
774
        uint32_t& participant_id)
775
0
{
776
0
    std::lock_guard<std::mutex> guard(m_mutex);
777
0
    if (input_id < 0)
778
0
    {
779
0
        participant_id = getNewId();
780
0
    }
781
0
    else
782
0
    {
783
0
        participant_id = input_id;
784
0
        if (m_RTPSParticipantIDs[participant_id].used == true)
785
0
        {
786
0
            EPROSIMA_LOG_ERROR(RTPS_PARTICIPANT, "RTPSParticipant with the same ID already exists");
787
0
            return false;
788
0
        }
789
0
    }
790
0
    return true;
791
0
}
792
793
uint32_t RTPSDomainImpl::get_id_for_prefix(
794
        uint32_t participant_id)
795
0
{
796
0
    uint32_t ret = participant_id;
797
0
    if (ret < 0x10000)
798
0
    {
799
0
        std::lock_guard<std::mutex> guard(m_mutex);
800
0
        ret |= m_RTPSParticipantIDs[participant_id].counter;
801
0
        m_RTPSParticipantIDs[participant_id].counter += 0x10000;
802
0
    }
803
804
0
    return ret;
805
0
}
806
807
bool RTPSDomainImpl::reserve_participant_id(
808
        int32_t& participant_id)
809
0
{
810
0
    std::lock_guard<std::mutex> guard(m_mutex);
811
0
    if (participant_id < 0)
812
0
    {
813
0
        participant_id = getNewId();
814
0
    }
815
0
    else
816
0
    {
817
0
        if (m_RTPSParticipantIDs[participant_id].reserved == true)
818
0
        {
819
0
            return false;
820
0
        }
821
0
        m_RTPSParticipantIDs[participant_id].reserved = true;
822
0
    }
823
824
0
    return true;
825
0
}
826
827
bool RTPSDomainImpl::create_participant_guid(
828
        int32_t& participant_id,
829
        GUID_t& guid)
830
0
{
831
0
    bool ret_value = get_instance()->reserve_participant_id(participant_id);
832
833
0
    if (ret_value)
834
0
    {
835
0
        guid_prefix_create(participant_id, guid.guidPrefix);
836
0
        guid.entityId = c_EntityId_RTPSParticipant;
837
0
    }
838
839
0
    return ret_value;
840
0
}
841
842
RTPSParticipantImpl* RTPSDomainImpl::find_local_participant(
843
        const GUID_t& guid)
844
0
{
845
0
    auto instance = get_instance();
846
0
    std::lock_guard<std::mutex> guard(instance->m_mutex);
847
0
    for (const t_p_RTPSParticipant& participant : instance->m_RTPSParticipants)
848
0
    {
849
0
        if (participant.second->getGuid().guidPrefix == guid.guidPrefix)
850
0
        {
851
            // Participant found, forward the query
852
0
            return participant.second;
853
0
        }
854
0
    }
855
856
0
    return nullptr;
857
0
}
858
859
void RTPSDomainImpl::find_local_reader(
860
        std::shared_ptr<LocalReaderPointer>& local_reader,
861
        const GUID_t& reader_guid)
862
0
{
863
0
    auto instance = get_instance();
864
0
    std::lock_guard<std::mutex> guard(instance->m_mutex);
865
0
    if (!local_reader)
866
0
    {
867
0
        for (const t_p_RTPSParticipant& participant : instance->m_RTPSParticipants)
868
0
        {
869
0
            if (participant.second->getGuid().guidPrefix == reader_guid.guidPrefix)
870
0
            {
871
                // Participant found, forward the query
872
0
                local_reader = participant.second->find_local_reader(reader_guid);
873
0
                break;
874
0
            }
875
0
        }
876
        // If the reader was not found, local_reader will remain nullptr
877
0
    }
878
0
}
879
880
BaseWriter* RTPSDomainImpl::find_local_writer(
881
        const GUID_t& writer_guid)
882
0
{
883
0
    auto instance = get_instance();
884
0
    std::lock_guard<std::mutex> guard(instance->m_mutex);
885
0
    for (const t_p_RTPSParticipant& participant : instance->m_RTPSParticipants)
886
0
    {
887
0
        if (participant.second->getGuid().guidPrefix == writer_guid.guidPrefix)
888
0
        {
889
            // Participant found, forward the query
890
0
            return participant.second->find_local_writer(writer_guid);
891
0
        }
892
0
    }
893
894
0
    return nullptr;
895
0
}
896
897
/**
898
 * Check whether intraprocess delivery should be used between two GUIDs.
899
 *
900
 * @param local_guid    GUID of the local endpoint performing the query.
901
 * @param matched_guid  GUID being queried about.
902
 *
903
 * @returns true when intraprocess delivery is enabled, false otherwise.
904
 */
905
bool RTPSDomainImpl::should_intraprocess_between(
906
        const GUID_t& local_guid,
907
        const GUID_t& matched_guid)
908
0
{
909
0
    if (!local_guid.is_on_same_process_as(matched_guid))
910
0
    {
911
        // Not on the same process, should not use intraprocess mechanism.
912
0
        return false;
913
0
    }
914
915
0
    if (local_guid.entityId == c_EntityId_SPDPWriter || local_guid.entityId == c_EntityId_SPDPReader)
916
0
    {
917
        // Always disabled for PDP, to avoid inter-domain communications.
918
0
        return false;
919
0
    }
920
921
0
    switch (xmlparser::XMLProfileManager::library_settings().intraprocess_delivery)
922
0
    {
923
0
        case fastdds::IntraprocessDeliveryType::INTRAPROCESS_FULL:
924
0
            return true;
925
926
0
        case fastdds::IntraprocessDeliveryType::INTRAPROCESS_USER_DATA_ONLY:
927
0
            return !matched_guid.is_builtin();
928
929
0
        case fastdds::IntraprocessDeliveryType::INTRAPROCESS_OFF:
930
0
        default:
931
0
            break;
932
0
    }
933
934
0
    return false;
935
0
}
936
937
void RTPSDomainImpl::file_watch_callback()
938
0
{
939
0
    auto _1s = std::chrono::seconds(1);
940
941
    // Ensure that all changes have been saved by the OS
942
0
    SystemInfo::wait_for_file_closure(SystemInfo::get_environment_file(), _1s);
943
944
    // For all RTPSParticipantImpl registered in the RTPSDomain, call RTPSParticipantImpl::environment_file_has_changed
945
0
    auto instance = get_instance();
946
0
    std::lock_guard<std::mutex> guard(instance->m_mutex);
947
0
    for (auto participant : instance->m_RTPSParticipants)
948
0
    {
949
0
        participant.second->environment_file_has_changed();
950
0
    }
951
0
}
952
953
void RTPSDomainImpl::set_filewatch_thread_config(
954
        const fastdds::rtps::ThreadSettings& watch_thread,
955
        const fastdds::rtps::ThreadSettings& callback_thread)
956
0
{
957
0
    auto instance = get_instance();
958
0
    std::lock_guard<std::mutex> guard(instance->m_mutex);
959
0
    instance->watch_thread_config_ = watch_thread;
960
0
    instance->callback_thread_config_ = callback_thread;
961
0
}
962
963
bool RTPSDomain::get_library_settings(
964
        fastdds::LibrarySettings& library_settings)
965
0
{
966
0
    return RTPSDomainImpl::get_library_settings(library_settings);
967
0
}
968
969
bool RTPSDomainImpl::get_library_settings(
970
        fastdds::LibrarySettings& library_settings)
971
0
{
972
0
    library_settings = xmlparser::XMLProfileManager::library_settings();
973
0
    return true;
974
0
}
975
976
bool RTPSDomain::set_library_settings(
977
        const fastdds::LibrarySettings& library_settings)
978
0
{
979
0
    return RTPSDomainImpl::set_library_settings(library_settings);
980
0
}
981
982
bool RTPSDomainImpl::set_library_settings(
983
        const fastdds::LibrarySettings& library_settings)
984
0
{
985
0
    if (!get_instance()->m_RTPSParticipants.empty())
986
0
    {
987
0
        return false;
988
0
    }
989
0
    xmlparser::XMLProfileManager::library_settings(library_settings);
990
0
    return true;
991
0
}
992
993
fastdds::dds::xtypes::ITypeObjectRegistry& RTPSDomainImpl::type_object_registry()
994
0
{
995
0
    return get_instance()->type_object_registry_;
996
0
}
997
998
fastdds::dds::xtypes::TypeObjectRegistry& RTPSDomainImpl::type_object_registry_observer()
999
0
{
1000
0
    return get_instance()->type_object_registry_;
1001
0
}
1002
1003
bool RTPSDomainImpl::run_easy_mode_discovery_server(
1004
        uint32_t domain_id,
1005
        const std::string& ip)
1006
0
{
1007
0
    SystemCommandBuilder sys_command;
1008
0
    int res = sys_command.executable(FAST_DDS_DEFAULT_CLI_SCRIPT_NAME)
1009
0
                    .verb(FAST_DDS_DEFAULT_CLI_DISCOVERY_VERB)
1010
0
                    .verb(FAST_DDS_DEFAULT_CLI_AUTO_VERB)
1011
0
                    .arg("-d")
1012
0
                    .value(std::to_string(domain_id))
1013
0
                    .value(ip + ":" + std::to_string(domain_id))
1014
0
                    .build_and_call();
1015
0
#ifndef _WIN32
1016
    // Adecuate Python subprocess return
1017
0
    res = WEXITSTATUS(res);
1018
0
#endif // _WIN32
1019
1020
0
    if (res != SystemCommandBuilder::SystemCommandResult::SUCCESS)
1021
0
    {
1022
0
        if (res == SystemCommandBuilder::SystemCommandResult::BAD_PARAM)
1023
0
        {
1024
0
            EPROSIMA_LOG_ERROR("DOMAIN", "ROS2_EASY_MODE IP connection conflicts with a previous one.");
1025
0
        }
1026
0
        else
1027
0
        {
1028
0
            EPROSIMA_LOG_ERROR(DOMAIN, "Auto discovery server client setup. Unable to spawn daemon.");
1029
0
        }
1030
0
        return false;
1031
0
    }
1032
1033
0
    return true;
1034
0
}
1035
1036
} // namespace rtps
1037
} // namespace fastdds
1038
} // namespace eprosima