Coverage Report

Created: 2022-08-24 06:19

/src/Fast-DDS/src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file RTPSParticipant.cpp
17
 *
18
 */
19
20
#include <rtps/participant/RTPSParticipantImpl.h>
21
22
#include <algorithm>
23
#include <functional>
24
#include <memory>
25
#include <mutex>
26
#include <sstream>
27
28
#include <fastdds/dds/log/Log.hpp>
29
#include <fastdds/rtps/attributes/ServerAttributes.h>
30
#include <fastdds/rtps/builtin/BuiltinProtocols.h>
31
#include <fastdds/rtps/builtin/discovery/endpoint/EDP.h>
32
#include <fastdds/rtps/builtin/discovery/participant/PDPSimple.h>
33
#include <fastdds/rtps/builtin/data/ParticipantProxyData.h>
34
#include <fastdds/rtps/builtin/liveliness/WLP.h>
35
#include <fastdds/rtps/history/WriterHistory.h>
36
#include <fastdds/rtps/messages/MessageReceiver.h>
37
#include <fastdds/rtps/participant/RTPSParticipant.h>
38
#include <fastdds/rtps/reader/StatelessReader.h>
39
#include <fastdds/rtps/reader/StatefulReader.h>
40
#include <fastdds/rtps/reader/StatelessPersistentReader.h>
41
#include <fastdds/rtps/reader/StatefulPersistentReader.h>
42
#include <fastdds/rtps/RTPSDomain.h>
43
#include <fastdds/rtps/transport/UDPv4TransportDescriptor.h>
44
#include <fastdds/rtps/transport/TCPv4TransportDescriptor.h>
45
#include <fastdds/rtps/transport/TCPv6TransportDescriptor.h>
46
#include <fastdds/rtps/transport/shared_mem/SharedMemTransportDescriptor.h>
47
#include <fastdds/rtps/writer/StatelessWriter.h>
48
#include <fastdds/rtps/writer/StatefulWriter.h>
49
#include <fastdds/rtps/writer/StatelessPersistentWriter.h>
50
#include <fastdds/rtps/writer/StatefulPersistentWriter.h>
51
52
#include <fastrtps/utils/IPFinder.h>
53
#include <fastrtps/utils/Semaphore.h>
54
#include <fastrtps/xmlparser/XMLProfileManager.h>
55
56
#include <rtps/builtin/discovery/participant/PDPServer.hpp>
57
#include <rtps/builtin/discovery/participant/PDPClient.h>
58
#include <rtps/history/BasicPayloadPool.hpp>
59
#include <rtps/persistence/PersistenceService.h>
60
#include <statistics/rtps/GuidUtils.hpp>
61
62
namespace eprosima {
63
namespace fastrtps {
64
namespace rtps {
65
66
using UDPv4TransportDescriptor = fastdds::rtps::UDPv4TransportDescriptor;
67
using TCPTransportDescriptor = fastdds::rtps::TCPTransportDescriptor;
68
using SharedMemTransportDescriptor = fastdds::rtps::SharedMemTransportDescriptor;
69
70
thread_local RTPSParticipantImpl* RTPSParticipantImpl::collections_mutex_owner_ = nullptr;
71
72
static EntityId_t TrustedWriter(
73
        const EntityId_t& reader)
74
0
{
75
0
    return
76
0
        (reader == c_EntityId_SPDPReader) ? c_EntityId_SPDPWriter :
77
0
        (reader == c_EntityId_SEDPPubReader) ? c_EntityId_SEDPPubWriter :
78
0
        (reader == c_EntityId_SEDPSubReader) ? c_EntityId_SEDPSubWriter :
79
0
        (reader == c_EntityId_ReaderLiveliness) ? c_EntityId_WriterLiveliness :
80
0
        c_EntityId_Unknown;
81
0
}
82
83
static bool should_be_intraprocess_only(
84
        const RTPSParticipantAttributes& att)
85
0
{
86
0
    return
87
0
        xmlparser::XMLProfileManager::library_settings().intraprocess_delivery == INTRAPROCESS_FULL &&
88
0
        att.builtin.discovery_config.ignoreParticipantFlags ==
89
0
        (ParticipantFilteringFlags::FILTER_DIFFERENT_HOST | ParticipantFilteringFlags::FILTER_DIFFERENT_PROCESS);
90
0
}
91
92
static bool get_unique_flows_parameters(
93
        const RTPSParticipantAttributes& part_att,
94
        const EndpointAttributes& att,
95
        bool& unique_flows,
96
        uint16_t& initial_port,
97
        uint16_t& final_port)
98
0
{
99
0
    const std::string* value = PropertyPolicyHelper::find_property(att.properties, "fastdds.unique_network_flows");
100
101
0
    unique_flows = (nullptr != value);
102
0
    if (unique_flows)
103
0
    {
104
        // TODO (Miguel C): parse value to get port range
105
0
        final_port = part_att.port.portBase;
106
0
        initial_port = part_att.port.portBase - 400;
107
0
    }
108
109
0
    return true;
110
0
}
111
112
Locator_t& RTPSParticipantImpl::applyLocatorAdaptRule(
113
        Locator_t& loc)
114
0
{
115
    // This is a completely made up rule
116
    // It is transport responsibility to interpret this new port.
117
0
    loc.port += m_att.port.participantIDGain;
118
0
    return loc;
119
0
}
120
121
RTPSParticipantImpl::RTPSParticipantImpl(
122
        uint32_t domain_id,
123
        const RTPSParticipantAttributes& PParam,
124
        const GuidPrefix_t& guidP,
125
        const GuidPrefix_t& persistence_guid,
126
        RTPSParticipant* par,
127
        RTPSParticipantListener* plisten)
128
    : domain_id_(domain_id)
129
    , m_att(PParam)
130
    , m_guid(guidP, c_EntityId_RTPSParticipant)
131
    , mp_builtinProtocols(nullptr)
132
    , mp_ResourceSemaphore(new Semaphore(0))
133
    , IdCounter(0)
134
    , type_check_fn_(nullptr)
135
    , client_override_(false)
136
    , internal_metatraffic_locators_(false)
137
    , internal_default_locators_(false)
138
#if HAVE_SECURITY
139
    , m_security_manager(this)
140
#endif // if HAVE_SECURITY
141
    , mp_participantListener(plisten)
142
    , mp_userParticipant(par)
143
    , mp_mutex(new std::recursive_mutex())
144
    , is_intraprocess_only_(should_be_intraprocess_only(PParam))
145
    , has_shm_transport_(false)
146
0
{
147
0
    if (c_GuidPrefix_Unknown != persistence_guid)
148
0
    {
149
0
        m_persistence_guid = GUID_t(persistence_guid, c_EntityId_RTPSParticipant);
150
0
    }
151
    // Builtin transports by default
152
0
    if (PParam.useBuiltinTransports)
153
0
    {
154
0
        UDPv4TransportDescriptor descriptor;
155
0
        descriptor.sendBufferSize = m_att.sendSocketBufferSize;
156
0
        descriptor.receiveBufferSize = m_att.listenSocketBufferSize;
157
0
        m_network_Factory.RegisterTransport(&descriptor, &m_att.properties);
158
159
0
#ifdef SHM_TRANSPORT_BUILTIN
160
0
        SharedMemTransportDescriptor shm_transport;
161
        // We assume (Linux) UDP doubles the user socket buffer size in kernel, so
162
        // the equivalent segment size in SHM would be socket buffer size x 2
163
0
        auto segment_size_udp_equivalent =
164
0
                std::max(m_att.sendSocketBufferSize, m_att.listenSocketBufferSize) * 2;
165
0
        shm_transport.segment_size(segment_size_udp_equivalent);
166
        // Use same default max_message_size on both UDP and SHM
167
0
        shm_transport.max_message_size(descriptor.max_message_size());
168
0
        has_shm_transport_ |= m_network_Factory.RegisterTransport(&shm_transport);
169
0
#endif // ifdef SHM_TRANSPORT_BUILTIN
170
0
    }
171
172
    // BACKUP servers guid is its persistence one
173
0
    if (PParam.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::BACKUP)
174
0
    {
175
0
        m_persistence_guid = m_guid;
176
0
    }
177
178
    // Store the Guid in string format.
179
0
    std::stringstream guid_sstr;
180
0
    guid_sstr << m_guid;
181
0
    guid_str_ = guid_sstr.str();
182
183
    // Client-server discovery protocol requires that every TCP transport has a listening port
184
0
    switch (PParam.builtin.discovery_config.discoveryProtocol)
185
0
    {
186
0
        case DiscoveryProtocol::BACKUP:
187
0
        case DiscoveryProtocol::CLIENT:
188
0
        case DiscoveryProtocol::SERVER:
189
0
        case DiscoveryProtocol::SUPER_CLIENT:
190
            // Verify if listening ports are provided
191
0
            for (auto& transportDescriptor : PParam.userTransports)
192
0
            {
193
0
                TCPTransportDescriptor* pT = dynamic_cast<TCPTransportDescriptor*>(transportDescriptor.get());
194
0
                if (pT && pT->listening_ports.empty())
195
0
                {
196
0
                    logInfo(RTPS_PARTICIPANT,
197
0
                            "Participant " << m_att.getName() << " with GUID " << m_guid <<
198
0
                            " tries to use discovery server over TCP without providing a proper listening port.");
199
0
                }
200
0
            }
201
0
        default:
202
0
            break;
203
0
    }
204
205
206
    // User defined transports
207
0
    for (const auto& transportDescriptor : PParam.userTransports)
208
0
    {
209
0
        if (m_network_Factory.RegisterTransport(transportDescriptor.get(), &m_att.properties))
210
0
        {
211
0
            has_shm_transport_ |=
212
0
                    (dynamic_cast<fastdds::rtps::SharedMemTransportDescriptor*>(transportDescriptor.get()) != nullptr);
213
0
        }
214
0
        else
215
0
        {
216
            // SHM transport could be disabled
217
0
            if ((dynamic_cast<fastdds::rtps::SharedMemTransportDescriptor*>(transportDescriptor.get()) != nullptr))
218
0
            {
219
0
                logError(RTPS_PARTICIPANT,
220
0
                        "Unable to Register SHM Transport. SHM Transport is not supported in"
221
0
                        " the current platform.");
222
0
            }
223
0
            else
224
0
            {
225
0
                logError(RTPS_PARTICIPANT,
226
0
                        "User transport failed to register.");
227
0
            }
228
229
0
        }
230
0
    }
231
232
0
    mp_userParticipant->mp_impl = this;
233
0
    mp_event_thr.init_thread();
234
235
0
    if (!networkFactoryHasRegisteredTransports())
236
0
    {
237
0
        return;
238
0
    }
239
240
    /* If metatrafficMulticastLocatorList is empty, add mandatory default Locators
241
       Else -> Take them */
242
243
    // Creation of metatraffic locator and receiver resources
244
0
    uint32_t metatraffic_multicast_port = m_att.port.getMulticastPort(domain_id_);
245
0
    uint32_t metatraffic_unicast_port = m_att.port.getUnicastPort(domain_id_,
246
0
                    static_cast<uint32_t>(m_att.participantID));
247
0
    uint32_t meta_multicast_port_for_check = metatraffic_multicast_port;
248
249
    /* INSERT DEFAULT MANDATORY MULTICAST LOCATORS HERE */
250
0
    if (m_att.builtin.metatrafficMulticastLocatorList.empty() && m_att.builtin.metatrafficUnicastLocatorList.empty())
251
0
    {
252
0
        get_default_metatraffic_locators();
253
0
        internal_metatraffic_locators_ = true;
254
0
    }
255
0
    else
256
0
    {
257
0
        if (0 < m_att.builtin.metatrafficMulticastLocatorList.size() &&
258
0
                0 !=  m_att.builtin.metatrafficMulticastLocatorList.begin()->port)
259
0
        {
260
0
            meta_multicast_port_for_check = m_att.builtin.metatrafficMulticastLocatorList.begin()->port;
261
0
        }
262
0
        std::for_each(m_att.builtin.metatrafficMulticastLocatorList.begin(),
263
0
                m_att.builtin.metatrafficMulticastLocatorList.end(), [&](Locator_t& locator)
264
0
                {
265
0
                    m_network_Factory.fillMetatrafficMulticastLocator(locator, metatraffic_multicast_port);
266
0
                });
267
0
        m_network_Factory.NormalizeLocators(m_att.builtin.metatrafficMulticastLocatorList);
268
269
0
        std::for_each(m_att.builtin.metatrafficUnicastLocatorList.begin(),
270
0
                m_att.builtin.metatrafficUnicastLocatorList.end(), [&](Locator_t& locator)
271
0
                {
272
0
                    m_network_Factory.fillMetatrafficUnicastLocator(locator, metatraffic_unicast_port);
273
0
                });
274
0
        m_network_Factory.NormalizeLocators(m_att.builtin.metatrafficUnicastLocatorList);
275
0
    }
276
277
    // Initial peers
278
0
    if (m_att.builtin.initialPeersList.empty())
279
0
    {
280
0
        m_att.builtin.initialPeersList = m_att.builtin.metatrafficMulticastLocatorList;
281
0
    }
282
0
    else
283
0
    {
284
0
        LocatorList_t initial_peers;
285
0
        initial_peers.swap(m_att.builtin.initialPeersList);
286
287
0
        std::for_each(initial_peers.begin(), initial_peers.end(),
288
0
                [&](Locator_t& locator)
289
0
                {
290
0
                    m_network_Factory.configureInitialPeerLocator(domain_id_, locator, m_att);
291
0
                });
292
0
    }
293
294
    // Creation of user locator and receiver resources
295
    //If no default locators are defined we define some.
296
    /* The reasoning here is the following.
297
       If the parameters of the RTPS Participant don't hold default listening locators for the creation
298
       of Endpoints, we make some for Unicast only.
299
       If there is at least one listen locator of any kind, we do not create any default ones.
300
       If there are no sending locators defined, we create default ones for the transports we implement.
301
     */
302
0
    if (m_att.defaultUnicastLocatorList.empty() && m_att.defaultMulticastLocatorList.empty())
303
0
    {
304
        //Default Unicast Locators in case they have not been provided
305
        /* INSERT DEFAULT UNICAST LOCATORS FOR THE PARTICIPANT */
306
0
        get_default_unicast_locators();
307
0
        internal_default_locators_ = true;
308
0
        logInfo(RTPS_PARTICIPANT, m_att.getName() << " Created with NO default Unicast Locator List, adding Locators:"
309
0
                                                  << m_att.defaultUnicastLocatorList);
310
0
    }
311
0
    else
312
0
    {
313
        // Locator with port 0, calculate port.
314
0
        std::for_each(m_att.defaultUnicastLocatorList.begin(), m_att.defaultUnicastLocatorList.end(),
315
0
                [&](Locator_t& loc)
316
0
                {
317
0
                    m_network_Factory.fill_default_locator_port(domain_id_, loc, m_att, false);
318
0
                });
319
0
        m_network_Factory.NormalizeLocators(m_att.defaultUnicastLocatorList);
320
321
0
        std::for_each(m_att.defaultMulticastLocatorList.begin(), m_att.defaultMulticastLocatorList.end(),
322
0
                [&](Locator_t& loc)
323
0
                {
324
0
                    m_network_Factory.fill_default_locator_port(domain_id_, loc, m_att, true);
325
0
                });
326
0
    }
327
328
#if HAVE_SECURITY
329
    // Start security
330
    if (!m_security_manager.init(
331
                security_attributes_,
332
                PParam.properties))
333
    {
334
        // Participant will be deleted, no need to allocate buffers or create builtin endpoints
335
        return;
336
    }
337
#endif // if HAVE_SECURITY
338
339
0
    if (is_intraprocess_only())
340
0
    {
341
0
        m_att.builtin.metatrafficUnicastLocatorList.clear();
342
0
        m_att.defaultUnicastLocatorList.clear();
343
0
        m_att.defaultMulticastLocatorList.clear();
344
0
    }
345
346
0
    createReceiverResources(m_att.builtin.metatrafficMulticastLocatorList, true, false);
347
0
    createReceiverResources(m_att.builtin.metatrafficUnicastLocatorList, true, false);
348
0
    createReceiverResources(m_att.defaultUnicastLocatorList, true, false);
349
0
    createReceiverResources(m_att.defaultMulticastLocatorList, true, false);
350
351
    // Check metatraffic multicast port
352
0
    if (0 < m_att.builtin.metatrafficMulticastLocatorList.size() &&
353
0
            m_att.builtin.metatrafficMulticastLocatorList.begin()->port != meta_multicast_port_for_check)
354
0
    {
355
0
        logWarning(RTPS_PARTICIPANT,
356
0
                "Metatraffic multicast port " << meta_multicast_port_for_check << " cannot be opened."
357
0
                " It may is opened by another application. Discovery may fail.");
358
0
    }
359
360
0
    bool allow_growing_buffers = m_att.allocation.send_buffers.dynamic;
361
0
    size_t num_send_buffers = m_att.allocation.send_buffers.preallocated_number;
362
0
    if (num_send_buffers == 0)
363
0
    {
364
        // Two buffers (user, events)
365
0
        num_send_buffers = 2;
366
        // Add one buffer per reception thread
367
0
        num_send_buffers += m_receiverResourcelist.size();
368
0
    }
369
370
    // Create buffer pool
371
0
    send_buffers_.reset(new SendBuffersManager(num_send_buffers, allow_growing_buffers));
372
0
    send_buffers_->init(this);
373
374
    // Initialize flow controller factory.
375
    // This must be done after initiate network layer.
376
0
    flow_controller_factory_.init(this);
377
378
    // Support old API
379
0
    if (PParam.throughputController.bytesPerPeriod != UINT32_MAX && PParam.throughputController.periodMillisecs != 0)
380
0
    {
381
0
        fastdds::rtps::FlowControllerDescriptor old_descriptor;
382
0
        old_descriptor.name = guid_str_.c_str();
383
0
        old_descriptor.max_bytes_per_period = PParam.throughputController.bytesPerPeriod;
384
0
        old_descriptor.period_ms = PParam.throughputController.periodMillisecs;
385
0
        flow_controller_factory_.register_flow_controller(old_descriptor);
386
0
    }
387
388
    // Register user's flow controllers.
389
0
    for (auto flow_controller_desc : m_att.flow_controllers)
390
0
    {
391
0
        flow_controller_factory_.register_flow_controller(*flow_controller_desc.get());
392
0
    }
393
394
395
#if HAVE_SECURITY
396
    if (m_security_manager.is_security_active())
397
    {
398
        if (!m_security_manager.create_entities())
399
        {
400
            return;
401
        }
402
    }
403
#endif // if HAVE_SECURITY
404
405
0
    mp_builtinProtocols = new BuiltinProtocols();
406
407
    // Initialize builtin protocols
408
0
    if (!mp_builtinProtocols->initBuiltinProtocols(this, m_att.builtin))
409
0
    {
410
0
        logError(RTPS_PARTICIPANT, "The builtin protocols were not correctly initialized");
411
0
        return;
412
0
    }
413
414
0
    if (c_GuidPrefix_Unknown != persistence_guid)
415
0
    {
416
0
        logInfo(RTPS_PARTICIPANT, "RTPSParticipant \"" << m_att.getName() << "\" with guidPrefix: " << m_guid.guidPrefix
417
0
                                                       << " and persistence guid: " << persistence_guid);
418
0
    }
419
0
    else
420
0
    {
421
0
        logInfo(RTPS_PARTICIPANT,
422
0
                "RTPSParticipant \"" << m_att.getName() << "\" with guidPrefix: " << m_guid.guidPrefix);
423
0
    }
424
425
0
    initialized_ = true;
426
0
}
427
428
RTPSParticipantImpl::RTPSParticipantImpl(
429
        uint32_t domain_id,
430
        const RTPSParticipantAttributes& PParam,
431
        const GuidPrefix_t& guidP,
432
        RTPSParticipant* par,
433
        RTPSParticipantListener* plisten)
434
    : RTPSParticipantImpl(domain_id, PParam, guidP, c_GuidPrefix_Unknown, par, plisten)
435
0
{
436
0
}
437
438
void RTPSParticipantImpl::enable()
439
0
{
440
0
    mp_builtinProtocols->enable();
441
442
    //Start reception
443
0
    for (auto& receiver : m_receiverResourcelist)
444
0
    {
445
0
        receiver.Receiver->RegisterReceiver(receiver.mp_receiver);
446
0
    }
447
0
}
448
449
void RTPSParticipantImpl::disable()
450
0
{
451
    // Disabling event thread also disables participant announcement, so there is no need to call
452
    // stopRTPSParticipantAnnouncement()
453
0
    mp_event_thr.stop_thread();
454
455
    // Disable Retries on Transports
456
0
    m_network_Factory.Shutdown();
457
458
    // Safely abort threads.
459
0
    for (auto& block : m_receiverResourcelist)
460
0
    {
461
0
        block.Receiver->UnregisterReceiver(block.mp_receiver);
462
0
        block.disable();
463
0
    }
464
465
0
    deleteAllUserEndpoints();
466
467
0
    if (nullptr != mp_builtinProtocols)
468
0
    {
469
0
        delete(mp_builtinProtocols);
470
0
        mp_builtinProtocols = nullptr;
471
0
    }
472
0
}
473
474
const std::vector<RTPSWriter*>& RTPSParticipantImpl::getAllWriters() const
475
0
{
476
0
    return m_allWriterList;
477
0
}
478
479
const std::vector<RTPSReader*>& RTPSParticipantImpl::getAllReaders() const
480
0
{
481
0
    return m_allReaderList;
482
0
}
483
484
RTPSParticipantImpl::~RTPSParticipantImpl()
485
0
{
486
0
    disable();
487
488
#if HAVE_SECURITY
489
    m_security_manager.destroy();
490
#endif // if HAVE_SECURITY
491
492
    // Destruct message receivers
493
0
    for (auto& block : m_receiverResourcelist)
494
0
    {
495
0
        delete block.mp_receiver;
496
0
    }
497
0
    m_receiverResourcelist.clear();
498
499
0
    delete mp_ResourceSemaphore;
500
0
    delete mp_userParticipant;
501
0
    mp_userParticipant = nullptr;
502
0
    send_resource_list_.clear();
503
504
0
    delete mp_mutex;
505
0
}
506
507
template <EndpointKind_t kind, octet no_key, octet with_key>
508
bool RTPSParticipantImpl::preprocess_endpoint_attributes(
509
        const EntityId_t& entity_id,
510
        uint32_t& id_counter,
511
        EndpointAttributes& att,
512
        EntityId_t& entId)
513
0
{
514
0
    const char* debug_label = (att.endpointKind == WRITER ? "writer" : "reader");
515
516
0
    if (!att.unicastLocatorList.isValid())
517
0
    {
518
0
        logError(RTPS_PARTICIPANT, "Unicast Locator List for " << debug_label << " contains invalid Locator");
519
0
        return false;
520
0
    }
521
0
    if (!att.multicastLocatorList.isValid())
522
0
    {
523
0
        logError(RTPS_PARTICIPANT, "Multicast Locator List for " << debug_label << " contains invalid Locator");
524
0
        return false;
525
0
    }
526
0
    if (!att.remoteLocatorList.isValid())
527
0
    {
528
0
        logError(RTPS_PARTICIPANT, "Remote Locator List for " << debug_label << " contains invalid Locator");
529
0
        return false;
530
0
    }
531
532
0
    if (entity_id == c_EntityId_Unknown)
533
0
    {
534
0
        if (att.topicKind == NO_KEY)
535
0
        {
536
0
            entId.value[3] = (-2 == att.getUserDefinedID() && 0 < att.getEntityID()) ? (0x60) | no_key : no_key;
537
0
        }
538
0
        else if (att.topicKind == WITH_KEY)
539
0
        {
540
0
            entId.value[3] = (-2 == att.getUserDefinedID() && 0 < att.getEntityID()) ? (0x60) | with_key : with_key;
541
0
        }
542
0
        uint32_t idnum;
543
0
        if (att.getEntityID() > 0)
544
0
        {
545
0
            idnum = static_cast<uint32_t>(att.getEntityID());
546
0
        }
547
0
        else
548
0
        {
549
0
            idnum = ++id_counter;
550
0
        }
551
552
0
        entId.value[2] = octet(idnum);
553
0
        entId.value[1] = octet(idnum >> 8);
554
0
        entId.value[0] = octet(idnum >> 16);
555
0
    }
556
0
    else
557
0
    {
558
0
        entId = entity_id;
559
0
    }
560
561
0
    if (att.persistence_guid == c_Guid_Unknown)
562
0
    {
563
        // Try to load persistence_guid from property
564
0
        const std::string* persistence_guid_property = PropertyPolicyHelper::find_property(
565
0
            att.properties, "dds.persistence.guid");
566
0
        if (persistence_guid_property != nullptr)
567
0
        {
568
            // Load persistence_guid from property
569
0
            std::istringstream(persistence_guid_property->c_str()) >> att.persistence_guid;
570
0
            if (att.persistence_guid == c_Guid_Unknown)
571
0
            {
572
                // Wrongly configured property
573
0
                logError(RTPS_PARTICIPANT, "Cannot configure " << debug_label << "'s persistence GUID from '"
574
0
                                                               << persistence_guid_property->c_str()
575
0
                                                               << "'. Wrong input");
576
0
                return false;
577
0
            }
578
0
        }
579
0
    }
580
581
    // Error log level can be disable. Avoid unused warning
582
0
    static_cast<void>(debug_label);
583
584
0
    return true;
585
0
}
Unexecuted instantiation: bool eprosima::fastrtps::rtps::RTPSParticipantImpl::preprocess_endpoint_attributes<(eprosima::fastrtps::rtps::EndpointKind_t)1, (unsigned char)3, (unsigned char)2>(eprosima::fastrtps::rtps::EntityId_t const&, unsigned int&, eprosima::fastrtps::rtps::EndpointAttributes&, eprosima::fastrtps::rtps::EntityId_t&)
Unexecuted instantiation: bool eprosima::fastrtps::rtps::RTPSParticipantImpl::preprocess_endpoint_attributes<(eprosima::fastrtps::rtps::EndpointKind_t)0, (unsigned char)4, (unsigned char)7>(eprosima::fastrtps::rtps::EntityId_t const&, unsigned int&, eprosima::fastrtps::rtps::EndpointAttributes&, eprosima::fastrtps::rtps::EntityId_t&)
586
587
template<typename Functor>
588
bool RTPSParticipantImpl::create_writer(
589
        RTPSWriter** writer_out,
590
        WriterAttributes& param,
591
        const EntityId_t& entity_id,
592
        bool is_builtin,
593
        const Functor& callback)
594
0
{
595
0
    std::string type = (param.endpoint.reliabilityKind == RELIABLE) ? "RELIABLE" : "BEST_EFFORT";
596
0
    logInfo(RTPS_PARTICIPANT, "Creating writer of type " << type);
597
0
    EntityId_t entId;
598
0
    if (!preprocess_endpoint_attributes<WRITER, 0x03, 0x02>(entity_id, IdCounter, param.endpoint, entId))
599
0
    {
600
0
        return false;
601
0
    }
602
603
0
    if (existsEntityId(entId, WRITER))
604
0
    {
605
0
        logError(RTPS_PARTICIPANT,
606
0
                "A writer with the same entityId already exists in this RTPSParticipant");
607
0
        return false;
608
0
    }
609
610
0
    GUID_t guid(m_guid.guidPrefix, entId);
611
0
    fastdds::rtps::FlowController* flow_controller = nullptr;
612
0
    const char* flow_controller_name = param.flow_controller_name;
613
614
    // Support of old flow controller style.
615
0
    if (param.throughputController.bytesPerPeriod != UINT32_MAX && param.throughputController.periodMillisecs != 0)
616
0
    {
617
0
        flow_controller_name = guid_str_.c_str();
618
0
        if (ASYNCHRONOUS_WRITER == param.mode)
619
0
        {
620
0
            fastdds::rtps::FlowControllerDescriptor old_descriptor;
621
0
            old_descriptor.name = guid_str_.c_str();
622
0
            old_descriptor.max_bytes_per_period = param.throughputController.bytesPerPeriod;
623
0
            old_descriptor.period_ms = param.throughputController.periodMillisecs;
624
0
            flow_controller_factory_.register_flow_controller(old_descriptor);
625
0
            flow_controller =  flow_controller_factory_.retrieve_flow_controller(guid_str_.c_str(), param);
626
0
        }
627
0
        else
628
0
        {
629
0
            logWarning(RTPS_PARTICIPANT,
630
0
                    "Throughput flow controller was configured while writer's publish mode is configured as synchronous." \
631
0
                    "Throughput flow controller configuration is not taken into account.")
632
633
0
        }
634
0
    }
635
0
    if (m_att.throughputController.bytesPerPeriod != UINT32_MAX && m_att.throughputController.periodMillisecs != 0)
636
0
    {
637
0
        if (ASYNCHRONOUS_WRITER == param.mode && nullptr == flow_controller)
638
0
        {
639
0
            flow_controller_name = guid_str_.c_str();
640
0
            flow_controller = flow_controller_factory_.retrieve_flow_controller(guid_str_, param);
641
0
        }
642
0
        else
643
0
        {
644
0
            logWarning(RTPS_PARTICIPANT,
645
0
                    "Throughput flow controller was configured while writer's publish mode is configured as synchronous." \
646
0
                    "Throughput flow controller configuration is not taken into account.")
647
0
        }
648
0
    }
649
650
    // Retrieve flow controller.
651
    // If not default flow controller, publish_mode must be asynchronously.
652
0
    if (nullptr == flow_controller &&
653
0
            (fastdds::rtps::FASTDDS_FLOW_CONTROLLER_DEFAULT == flow_controller_name ||
654
0
            ASYNCHRONOUS_WRITER == param.mode))
655
0
    {
656
0
        flow_controller = flow_controller_factory_.retrieve_flow_controller(flow_controller_name, param);
657
0
    }
658
659
0
    if (nullptr == flow_controller)
660
0
    {
661
0
        if (fastdds::rtps::FASTDDS_FLOW_CONTROLLER_DEFAULT != flow_controller_name &&
662
0
                SYNCHRONOUS_WRITER == param.mode)
663
0
        {
664
0
            logError(RTPS_PARTICIPANT, "Cannot use a flow controller in synchronously publication mode.");
665
0
        }
666
0
        else
667
0
        {
668
0
            logError(RTPS_PARTICIPANT, "Cannot create the writer. Couldn't find flow controller "
669
0
                    << flow_controller_name << " for writer.");
670
0
        }
671
0
        return false;
672
0
    }
673
674
    // Check for unique_network_flows feature
675
0
    if (nullptr != PropertyPolicyHelper::find_property(param.endpoint.properties, "fastdds.unique_network_flows"))
676
0
    {
677
0
        logError(RTPS_PARTICIPANT, "Unique network flows not supported on writers");
678
0
        return false;
679
0
    }
680
681
    // Special case for DiscoveryProtocol::BACKUP, which abuses persistence guid
682
0
    GUID_t former_persistence_guid = param.endpoint.persistence_guid;
683
0
    if (param.endpoint.persistence_guid == c_Guid_Unknown)
684
0
    {
685
0
        if (m_persistence_guid != c_Guid_Unknown)
686
0
        {
687
            // Generate persistence guid from participant persistence guid
688
0
            param.endpoint.persistence_guid = GUID_t(
689
0
                m_persistence_guid.guidPrefix,
690
0
                entity_id);
691
0
        }
692
0
    }
693
694
    // Get persistence service
695
0
    IPersistenceService* persistence = nullptr;
696
0
    if (!get_persistence_service(is_builtin, param.endpoint, persistence))
697
0
    {
698
0
        return false;
699
0
    }
700
701
0
    normalize_endpoint_locators(param.endpoint);
702
703
0
    RTPSWriter* SWriter = nullptr;
704
0
    SWriter = callback(guid, param, flow_controller, persistence, param.endpoint.reliabilityKind == RELIABLE);
705
706
    // restore attributes
707
0
    param.endpoint.persistence_guid = former_persistence_guid;
708
709
0
    if (SWriter == nullptr)
710
0
    {
711
0
        return false;
712
0
    }
713
714
0
    if (!SWriter->is_pool_initialized())
715
0
    {
716
0
        delete(SWriter);
717
0
        return false;
718
0
    }
719
720
#if HAVE_SECURITY
721
    if (!is_builtin)
722
    {
723
        if (!m_security_manager.register_local_writer(SWriter->getGuid(),
724
                param.endpoint.properties, SWriter->getAttributes().security_attributes()))
725
        {
726
            delete(SWriter);
727
            return false;
728
        }
729
    }
730
    else
731
    {
732
        if (!m_security_manager.register_local_builtin_writer(SWriter->getGuid(),
733
                SWriter->getAttributes().security_attributes()))
734
        {
735
            delete(SWriter);
736
            return false;
737
        }
738
    }
739
#endif // if HAVE_SECURITY
740
741
0
    createSendResources(SWriter);
742
0
    if (param.endpoint.reliabilityKind == RELIABLE)
743
0
    {
744
0
        if (!createAndAssociateReceiverswithEndpoint(SWriter))
745
0
        {
746
0
            delete(SWriter);
747
0
            return false;
748
0
        }
749
0
    }
750
751
0
    {
752
0
        std::lock_guard<shared_mutex> _(endpoints_list_mutex);
753
0
        m_allWriterList.push_back(SWriter);
754
755
0
        if (!is_builtin)
756
0
        {
757
0
            m_userWriterList.push_back(SWriter);
758
0
        }
759
0
    }
760
0
    *writer_out = SWriter;
761
762
#ifdef FASTDDS_STATISTICS
763
764
    if (!is_builtin)
765
    {
766
        // Register all compatible statistical listeners
767
        for_each_listener([this, &guid](Key listener)
768
                {
769
                    if (are_writers_involved(listener->mask()))
770
                    {
771
                        register_in_writer(listener->get_shared_ptr(), guid);
772
                    }
773
                });
774
    }
775
776
#endif // FASTDDS_STATISTICS
777
778
0
    return true;
779
0
}
Unexecuted instantiation: RTPSParticipantImpl.cpp:bool eprosima::fastrtps::rtps::RTPSParticipantImpl::create_writer<eprosima::fastrtps::rtps::RTPSParticipantImpl::createWriter(eprosima::fastrtps::rtps::RTPSWriter**, eprosima::fastrtps::rtps::WriterAttributes&, eprosima::fastrtps::rtps::WriterHistory*, eprosima::fastrtps::rtps::WriterListener*, eprosima::fastrtps::rtps::EntityId_t const&, bool)::$_5>(eprosima::fastrtps::rtps::RTPSWriter**, eprosima::fastrtps::rtps::WriterAttributes&, eprosima::fastrtps::rtps::EntityId_t const&, bool, eprosima::fastrtps::rtps::RTPSParticipantImpl::createWriter(eprosima::fastrtps::rtps::RTPSWriter**, eprosima::fastrtps::rtps::WriterAttributes&, eprosima::fastrtps::rtps::WriterHistory*, eprosima::fastrtps::rtps::WriterListener*, eprosima::fastrtps::rtps::EntityId_t const&, bool)::$_5 const&)
Unexecuted instantiation: RTPSParticipantImpl.cpp:bool eprosima::fastrtps::rtps::RTPSParticipantImpl::create_writer<eprosima::fastrtps::rtps::RTPSParticipantImpl::createWriter(eprosima::fastrtps::rtps::RTPSWriter**, eprosima::fastrtps::rtps::WriterAttributes&, std::__1::shared_ptr<eprosima::fastrtps::rtps::IPayloadPool> const&, eprosima::fastrtps::rtps::WriterHistory*, eprosima::fastrtps::rtps::WriterListener*, eprosima::fastrtps::rtps::EntityId_t const&, bool)::$_6>(eprosima::fastrtps::rtps::RTPSWriter**, eprosima::fastrtps::rtps::WriterAttributes&, eprosima::fastrtps::rtps::EntityId_t const&, bool, eprosima::fastrtps::rtps::RTPSParticipantImpl::createWriter(eprosima::fastrtps::rtps::RTPSWriter**, eprosima::fastrtps::rtps::WriterAttributes&, std::__1::shared_ptr<eprosima::fastrtps::rtps::IPayloadPool> const&, eprosima::fastrtps::rtps::WriterHistory*, eprosima::fastrtps::rtps::WriterListener*, eprosima::fastrtps::rtps::EntityId_t const&, bool)::$_6 const&)
Unexecuted instantiation: RTPSParticipantImpl.cpp:bool eprosima::fastrtps::rtps::RTPSParticipantImpl::create_writer<eprosima::fastrtps::rtps::RTPSParticipantImpl::create_writer(eprosima::fastrtps::rtps::RTPSWriter**, eprosima::fastrtps::rtps::WriterAttributes&, std::__1::shared_ptr<eprosima::fastrtps::rtps::IPayloadPool> const&, std::__1::shared_ptr<eprosima::fastrtps::rtps::IChangePool> const&, eprosima::fastrtps::rtps::WriterHistory*, eprosima::fastrtps::rtps::WriterListener*, eprosima::fastrtps::rtps::EntityId_t const&, bool)::$_7>(eprosima::fastrtps::rtps::RTPSWriter**, eprosima::fastrtps::rtps::WriterAttributes&, eprosima::fastrtps::rtps::EntityId_t const&, bool, eprosima::fastrtps::rtps::RTPSParticipantImpl::create_writer(eprosima::fastrtps::rtps::RTPSWriter**, eprosima::fastrtps::rtps::WriterAttributes&, std::__1::shared_ptr<eprosima::fastrtps::rtps::IPayloadPool> const&, std::__1::shared_ptr<eprosima::fastrtps::rtps::IChangePool> const&, eprosima::fastrtps::rtps::WriterHistory*, eprosima::fastrtps::rtps::WriterListener*, eprosima::fastrtps::rtps::EntityId_t const&, bool)::$_7 const&)
780
781
template <typename Functor>
782
bool RTPSParticipantImpl::create_reader(
783
        RTPSReader** reader_out,
784
        ReaderAttributes& param,
785
        const EntityId_t& entity_id,
786
        bool is_builtin,
787
        bool enable,
788
        const Functor& callback)
789
0
{
790
0
    std::string type = (param.endpoint.reliabilityKind == RELIABLE) ? "RELIABLE" : "BEST_EFFORT";
791
0
    logInfo(RTPS_PARTICIPANT, "Creating reader of type " << type);
792
0
    EntityId_t entId;
793
0
    if (!preprocess_endpoint_attributes<READER, 0x04, 0x07>(entity_id, IdCounter, param.endpoint, entId))
794
0
    {
795
0
        return false;
796
0
    }
797
798
0
    if (existsEntityId(entId, READER))
799
0
    {
800
0
        logError(RTPS_PARTICIPANT,
801
0
                "A reader with the same entityId already exists in this RTPSParticipant");
802
0
        return false;
803
0
    }
804
805
    // Special case for DiscoveryProtocol::BACKUP, which abuses persistence guid
806
0
    GUID_t former_persistence_guid = param.endpoint.persistence_guid;
807
0
    if (param.endpoint.persistence_guid == c_Guid_Unknown)
808
0
    {
809
0
        if (m_persistence_guid != c_Guid_Unknown)
810
0
        {
811
            // Generate persistence guid from participant persistence guid
812
0
            param.endpoint.persistence_guid = GUID_t(
813
0
                m_persistence_guid.guidPrefix,
814
0
                entity_id);
815
0
        }
816
0
    }
817
818
    // Get persistence service
819
0
    IPersistenceService* persistence = nullptr;
820
0
    if (!get_persistence_service(is_builtin, param.endpoint, persistence))
821
0
    {
822
0
        return false;
823
0
    }
824
825
    // Check for unique_network_flows feature
826
0
    bool request_unique_flows = false;
827
0
    uint16_t initial_port = 0;
828
0
    uint16_t final_port = 0;
829
0
    if (!get_unique_flows_parameters(m_att, param.endpoint, request_unique_flows, initial_port, final_port))
830
0
    {
831
0
        return false;
832
0
    }
833
834
0
    normalize_endpoint_locators(param.endpoint);
835
836
0
    RTPSReader* SReader = nullptr;
837
0
    GUID_t guid(m_guid.guidPrefix, entId);
838
0
    SReader = callback(guid, param, persistence, param.endpoint.reliabilityKind == RELIABLE);
839
840
    // restore attributes
841
0
    param.endpoint.persistence_guid = former_persistence_guid;
842
843
0
    if (SReader == nullptr)
844
0
    {
845
0
        return false;
846
0
    }
847
848
#if HAVE_SECURITY
849
850
    if (!is_builtin)
851
    {
852
        if (!m_security_manager.register_local_reader(SReader->getGuid(),
853
                param.endpoint.properties, SReader->getAttributes().security_attributes()))
854
        {
855
            delete(SReader);
856
            return false;
857
        }
858
    }
859
    else
860
    {
861
        if (!m_security_manager.register_local_builtin_reader(SReader->getGuid(),
862
                SReader->getAttributes().security_attributes()))
863
        {
864
            delete(SReader);
865
            return false;
866
        }
867
    }
868
#endif // if HAVE_SECURITY
869
870
0
    if (param.endpoint.reliabilityKind == RELIABLE)
871
0
    {
872
0
        createSendResources(SReader);
873
0
    }
874
875
0
    if (is_builtin)
876
0
    {
877
0
        SReader->setTrustedWriter(TrustedWriter(SReader->getGuid().entityId));
878
0
    }
879
880
0
    if (enable)
881
0
    {
882
0
        if (!createAndAssociateReceiverswithEndpoint(SReader, request_unique_flows, initial_port, final_port))
883
0
        {
884
0
            delete(SReader);
885
0
            return false;
886
0
        }
887
0
    }
888
889
0
    {
890
0
        std::lock_guard<shared_mutex> _(endpoints_list_mutex);
891
892
0
        m_allReaderList.push_back(SReader);
893
894
0
        if (!is_builtin)
895
0
        {
896
0
            m_userReaderList.push_back(SReader);
897
0
        }
898
0
    }
899
0
    *reader_out = SReader;
900
901
#ifdef FASTDDS_STATISTICS
902
903
    if (!is_builtin)
904
    {
905
        // Register all compatible statistical listeners
906
        for_each_listener([this, &guid](Key listener)
907
                {
908
                    if (are_readers_involved(listener->mask()))
909
                    {
910
                        register_in_reader(listener->get_shared_ptr(), guid);
911
                    }
912
                });
913
    }
914
915
#endif // FASTDDS_STATISTICS
916
917
0
    return true;
918
0
}
Unexecuted instantiation: RTPSParticipantImpl.cpp:bool eprosima::fastrtps::rtps::RTPSParticipantImpl::create_reader<eprosima::fastrtps::rtps::RTPSParticipantImpl::createReader(eprosima::fastrtps::rtps::RTPSReader**, eprosima::fastrtps::rtps::ReaderAttributes&, eprosima::fastrtps::rtps::ReaderHistory*, eprosima::fastrtps::rtps::ReaderListener*, eprosima::fastrtps::rtps::EntityId_t const&, bool, bool)::$_8>(eprosima::fastrtps::rtps::RTPSReader**, eprosima::fastrtps::rtps::ReaderAttributes&, eprosima::fastrtps::rtps::EntityId_t const&, bool, bool, eprosima::fastrtps::rtps::RTPSParticipantImpl::createReader(eprosima::fastrtps::rtps::RTPSReader**, eprosima::fastrtps::rtps::ReaderAttributes&, eprosima::fastrtps::rtps::ReaderHistory*, eprosima::fastrtps::rtps::ReaderListener*, eprosima::fastrtps::rtps::EntityId_t const&, bool, bool)::$_8 const&)
Unexecuted instantiation: RTPSParticipantImpl.cpp:bool eprosima::fastrtps::rtps::RTPSParticipantImpl::create_reader<eprosima::fastrtps::rtps::RTPSParticipantImpl::createReader(eprosima::fastrtps::rtps::RTPSReader**, eprosima::fastrtps::rtps::ReaderAttributes&, std::__1::shared_ptr<eprosima::fastrtps::rtps::IPayloadPool> const&, eprosima::fastrtps::rtps::ReaderHistory*, eprosima::fastrtps::rtps::ReaderListener*, eprosima::fastrtps::rtps::EntityId_t const&, bool, bool)::$_9>(eprosima::fastrtps::rtps::RTPSReader**, eprosima::fastrtps::rtps::ReaderAttributes&, eprosima::fastrtps::rtps::EntityId_t const&, bool, bool, eprosima::fastrtps::rtps::RTPSParticipantImpl::createReader(eprosima::fastrtps::rtps::RTPSReader**, eprosima::fastrtps::rtps::ReaderAttributes&, std::__1::shared_ptr<eprosima::fastrtps::rtps::IPayloadPool> const&, eprosima::fastrtps::rtps::ReaderHistory*, eprosima::fastrtps::rtps::ReaderListener*, eprosima::fastrtps::rtps::EntityId_t const&, bool, bool)::$_9 const&)
919
920
/*
921
 *
922
 * MAIN RTPSParticipant IMPL API
923
 *
924
 */
925
bool RTPSParticipantImpl::createWriter(
926
        RTPSWriter** WriterOut,
927
        WriterAttributes& param,
928
        WriterHistory* hist,
929
        WriterListener* listen,
930
        const EntityId_t& entityId,
931
        bool isBuiltin)
932
0
{
933
0
    auto callback = [hist, listen, this]
934
0
                (const GUID_t& guid, WriterAttributes& param, fastdds::rtps::FlowController* flow_controller,
935
0
                    IPersistenceService* persistence, bool is_reliable) -> RTPSWriter*
936
0
            {
937
0
                if (is_reliable)
938
0
                {
939
0
                    if (persistence != nullptr)
940
0
                    {
941
0
                        return new StatefulPersistentWriter(this, guid, param, flow_controller,
942
0
                                       hist, listen, persistence);
943
0
                    }
944
0
                    else
945
0
                    {
946
0
                        return new StatefulWriter(this, guid, param, flow_controller,
947
0
                                       hist, listen);
948
0
                    }
949
0
                }
950
0
                else
951
0
                {
952
0
                    if (persistence != nullptr)
953
0
                    {
954
0
                        return new StatelessPersistentWriter(this, guid, param, flow_controller,
955
0
                                       hist, listen, persistence);
956
0
                    }
957
0
                    else
958
0
                    {
959
0
                        return new StatelessWriter(this, guid, param, flow_controller,
960
0
                                       hist, listen);
961
0
                    }
962
0
                }
963
0
            };
964
0
    return create_writer(WriterOut, param, entityId, isBuiltin, callback);
965
0
}
966
967
bool RTPSParticipantImpl::createWriter(
968
        RTPSWriter** WriterOut,
969
        WriterAttributes& param,
970
        const std::shared_ptr<IPayloadPool>& payload_pool,
971
        WriterHistory* hist,
972
        WriterListener* listen,
973
        const EntityId_t& entityId,
974
        bool isBuiltin)
975
0
{
976
0
    if (!payload_pool)
977
0
    {
978
0
        logError(RTPS_PARTICIPANT, "Trying to create writer with null payload pool");
979
0
        return false;
980
0
    }
981
982
0
    auto callback = [hist, listen, &payload_pool, this]
983
0
                (const GUID_t& guid, WriterAttributes& param, fastdds::rtps::FlowController* flow_controller,
984
0
                    IPersistenceService* persistence, bool is_reliable) -> RTPSWriter*
985
0
            {
986
0
                if (is_reliable)
987
0
                {
988
0
                    if (persistence != nullptr)
989
0
                    {
990
0
                        return new StatefulPersistentWriter(this, guid, param, payload_pool, flow_controller,
991
0
                                       hist, listen, persistence);
992
0
                    }
993
0
                    else
994
0
                    {
995
0
                        return new StatefulWriter(this, guid, param, payload_pool, flow_controller,
996
0
                                       hist, listen);
997
0
                    }
998
0
                }
999
0
                else
1000
0
                {
1001
0
                    if (persistence != nullptr)
1002
0
                    {
1003
0
                        return new StatelessPersistentWriter(this, guid, param, payload_pool, flow_controller,
1004
0
                                       hist, listen, persistence);
1005
0
                    }
1006
0
                    else
1007
0
                    {
1008
0
                        return new StatelessWriter(this, guid, param, payload_pool, flow_controller,
1009
0
                                       hist, listen);
1010
0
                    }
1011
0
                }
1012
0
            };
1013
0
    return create_writer(WriterOut, param, entityId, isBuiltin, callback);
1014
0
}
1015
1016
bool RTPSParticipantImpl::create_writer(
1017
        RTPSWriter** WriterOut,
1018
        WriterAttributes& watt,
1019
        const std::shared_ptr<IPayloadPool>& payload_pool,
1020
        const std::shared_ptr<IChangePool>& change_pool,
1021
        WriterHistory* hist,
1022
        WriterListener* listen,
1023
        const EntityId_t& entityId,
1024
        bool isBuiltin)
1025
0
{
1026
0
    if (!payload_pool)
1027
0
    {
1028
0
        logError(RTPS_PARTICIPANT, "Trying to create writer with null payload pool");
1029
0
        return false;
1030
0
    }
1031
1032
0
    auto callback = [hist, listen, &payload_pool, &change_pool, this]
1033
0
                (const GUID_t& guid, WriterAttributes& watt, fastdds::rtps::FlowController* flow_controller,
1034
0
                    IPersistenceService* persistence, bool is_reliable) -> RTPSWriter*
1035
0
            {
1036
0
                if (is_reliable)
1037
0
                {
1038
0
                    if (persistence != nullptr)
1039
0
                    {
1040
0
                        return new StatefulPersistentWriter(this, guid, watt, payload_pool, change_pool,
1041
0
                                       flow_controller, hist, listen, persistence);
1042
0
                    }
1043
0
                    else
1044
0
                    {
1045
0
                        return new StatefulWriter(this, guid, watt, payload_pool, change_pool,
1046
0
                                       flow_controller, hist, listen);
1047
0
                    }
1048
0
                }
1049
0
                else
1050
0
                {
1051
0
                    if (persistence != nullptr)
1052
0
                    {
1053
0
                        return new StatelessPersistentWriter(this, guid, watt, payload_pool, change_pool,
1054
0
                                       flow_controller, hist, listen, persistence);
1055
0
                    }
1056
0
                    else
1057
0
                    {
1058
0
                        return new StatelessWriter(this, guid, watt, payload_pool, change_pool,
1059
0
                                       flow_controller, hist, listen);
1060
0
                    }
1061
0
                }
1062
0
            };
1063
0
    return create_writer(WriterOut, watt, entityId, isBuiltin, callback);
1064
0
}
1065
1066
bool RTPSParticipantImpl::createReader(
1067
        RTPSReader** ReaderOut,
1068
        ReaderAttributes& param,
1069
        ReaderHistory* hist,
1070
        ReaderListener* listen,
1071
        const EntityId_t& entityId,
1072
        bool isBuiltin,
1073
        bool enable)
1074
0
{
1075
0
    auto callback = [hist, listen, this]
1076
0
                (const GUID_t& guid, ReaderAttributes& param, IPersistenceService* persistence,
1077
0
                    bool is_reliable) -> RTPSReader*
1078
0
            {
1079
0
                if (is_reliable)
1080
0
                {
1081
0
                    if (persistence != nullptr)
1082
0
                    {
1083
0
                        return new StatefulPersistentReader(this, guid, param, hist, listen, persistence);
1084
0
                    }
1085
0
                    else
1086
0
                    {
1087
0
                        return new StatefulReader(this, guid, param, hist, listen);
1088
0
                    }
1089
0
                }
1090
0
                else
1091
0
                {
1092
0
                    if (persistence != nullptr)
1093
0
                    {
1094
0
                        return new StatelessPersistentReader(this, guid, param, hist, listen, persistence);
1095
0
                    }
1096
0
                    else
1097
0
                    {
1098
0
                        return new StatelessReader(this, guid, param, hist, listen);
1099
0
                    }
1100
0
                }
1101
0
            };
1102
0
    return create_reader(ReaderOut, param, entityId, isBuiltin, enable, callback);
1103
0
}
1104
1105
bool RTPSParticipantImpl::createReader(
1106
        RTPSReader** ReaderOut,
1107
        ReaderAttributes& param,
1108
        const std::shared_ptr<IPayloadPool>& payload_pool,
1109
        ReaderHistory* hist,
1110
        ReaderListener* listen,
1111
        const EntityId_t& entityId,
1112
        bool isBuiltin,
1113
        bool enable)
1114
0
{
1115
0
    if (!payload_pool)
1116
0
    {
1117
0
        logError(RTPS_PARTICIPANT, "Trying to create reader with null payload pool");
1118
0
        return false;
1119
0
    }
1120
1121
0
    auto callback = [hist, listen, &payload_pool, this]
1122
0
                (const GUID_t& guid, ReaderAttributes& param, IPersistenceService* persistence,
1123
0
                    bool is_reliable) -> RTPSReader*
1124
0
            {
1125
0
                if (is_reliable)
1126
0
                {
1127
0
                    if (persistence != nullptr)
1128
0
                    {
1129
0
                        return new StatefulPersistentReader(this, guid, param, payload_pool, hist, listen, persistence);
1130
0
                    }
1131
0
                    else
1132
0
                    {
1133
0
                        return new StatefulReader(this, guid, param, payload_pool, hist, listen);
1134
0
                    }
1135
0
                }
1136
0
                else
1137
0
                {
1138
0
                    if (persistence != nullptr)
1139
0
                    {
1140
0
                        return new StatelessPersistentReader(this, guid, param, payload_pool, hist, listen,
1141
0
                                       persistence);
1142
0
                    }
1143
0
                    else
1144
0
                    {
1145
0
                        return new StatelessReader(this, guid, param, payload_pool, hist, listen);
1146
0
                    }
1147
0
                }
1148
0
            };
1149
0
    return create_reader(ReaderOut, param, entityId, isBuiltin, enable, callback);
1150
0
}
1151
1152
RTPSReader* RTPSParticipantImpl::find_local_reader(
1153
        const GUID_t& reader_guid)
1154
0
{
1155
0
    shared_lock<shared_mutex> _(endpoints_list_mutex);
1156
1157
0
    for (auto reader : m_allReaderList)
1158
0
    {
1159
0
        if (reader->getGuid() == reader_guid)
1160
0
        {
1161
0
            return reader;
1162
0
        }
1163
0
    }
1164
1165
0
    return nullptr;
1166
0
}
1167
1168
RTPSWriter* RTPSParticipantImpl::find_local_writer(
1169
        const GUID_t& writer_guid)
1170
0
{
1171
0
    shared_lock<shared_mutex> _(endpoints_list_mutex);
1172
1173
0
    for (auto writer : m_allWriterList)
1174
0
    {
1175
0
        if (writer->getGuid() == writer_guid)
1176
0
        {
1177
0
            return writer;
1178
0
        }
1179
0
    }
1180
1181
0
    return nullptr;
1182
0
}
1183
1184
bool RTPSParticipantImpl::enableReader(
1185
        RTPSReader* reader)
1186
0
{
1187
0
    if (!assignEndpointListenResources(reader))
1188
0
    {
1189
0
        return false;
1190
0
    }
1191
0
    return true;
1192
0
}
1193
1194
// Avoid to receive PDPSimple reader a DATA while calling ~PDPSimple and EDP was destroy already.
1195
void RTPSParticipantImpl::disableReader(
1196
        RTPSReader* reader)
1197
0
{
1198
0
    m_receiverResourcelistMutex.lock();
1199
0
    for (auto it = m_receiverResourcelist.begin(); it != m_receiverResourcelist.end(); ++it)
1200
0
    {
1201
0
        it->mp_receiver->removeEndpoint(reader);
1202
0
    }
1203
0
    m_receiverResourcelistMutex.unlock();
1204
0
}
1205
1206
bool RTPSParticipantImpl::registerWriter(
1207
        RTPSWriter* Writer,
1208
        const TopicAttributes& topicAtt,
1209
        const WriterQos& wqos)
1210
0
{
1211
0
    return this->mp_builtinProtocols->addLocalWriter(Writer, topicAtt, wqos);
1212
0
}
1213
1214
bool RTPSParticipantImpl::registerReader(
1215
        RTPSReader* reader,
1216
        const TopicAttributes& topicAtt,
1217
        const ReaderQos& rqos,
1218
        const fastdds::rtps::ContentFilterProperty* content_filter)
1219
0
{
1220
0
    return this->mp_builtinProtocols->addLocalReader(reader, topicAtt, rqos, content_filter);
1221
0
}
1222
1223
void RTPSParticipantImpl::update_attributes(
1224
        const RTPSParticipantAttributes& patt)
1225
0
{
1226
0
    bool update_pdp = false;
1227
    // Check if new interfaces have been added
1228
0
    if (internal_metatraffic_locators_)
1229
0
    {
1230
0
        LocatorList_t metatraffic_unicast_locator_list = m_att.builtin.metatrafficUnicastLocatorList;
1231
0
        get_default_metatraffic_locators();
1232
0
        if (!(metatraffic_unicast_locator_list == m_att.builtin.metatrafficUnicastLocatorList))
1233
0
        {
1234
0
            update_pdp = true;
1235
0
            m_network_Factory.update_network_interfaces();
1236
0
            logInfo(RTPS_PARTICIPANT, m_att.getName() << " updated its metatraffic locators");
1237
0
        }
1238
0
    }
1239
0
    if (internal_default_locators_)
1240
0
    {
1241
0
        LocatorList_t default_unicast_locator_list = m_att.defaultUnicastLocatorList;
1242
0
        get_default_unicast_locators();
1243
0
        if (!(default_unicast_locator_list == m_att.defaultUnicastLocatorList))
1244
0
        {
1245
0
            update_pdp = true;
1246
0
            logInfo(RTPS_PARTICIPANT, m_att.getName() << " updated default unicast locator list, current locators: "
1247
0
                                                      << m_att.defaultUnicastLocatorList);
1248
0
        }
1249
0
    }
1250
1251
0
    auto pdp = mp_builtinProtocols->mp_PDP;
1252
1253
    // Check if there are changes
1254
0
    if (patt.builtin.discovery_config.m_DiscoveryServers != m_att.builtin.discovery_config.m_DiscoveryServers
1255
0
            || patt.userData != m_att.userData
1256
0
            || update_pdp)
1257
0
    {
1258
0
        update_pdp = true;
1259
0
        std::vector<GUID_t> modified_servers;
1260
0
        LocatorList_t modified_locators;
1261
1262
        // Update RTPSParticipantAttributes member
1263
0
        m_att.userData = patt.userData;
1264
1265
        // If there's no PDP don't process Discovery-related attributes.
1266
0
        if (!pdp)
1267
0
        {
1268
0
            return;
1269
0
        }
1270
1271
        // Check that the remote servers list is consistent: all the already known remote servers must be included in
1272
        // the list and either new remote servers are added or remote server listening locator is modified.
1273
0
        for (auto existing_server : m_att.builtin.discovery_config.m_DiscoveryServers)
1274
0
        {
1275
0
            bool contained = false;
1276
0
            for (auto incoming_server : patt.builtin.discovery_config.m_DiscoveryServers)
1277
0
            {
1278
0
                if (existing_server.guidPrefix == incoming_server.guidPrefix)
1279
0
                {
1280
0
                    for (auto incoming_locator : incoming_server.metatrafficUnicastLocatorList)
1281
0
                    {
1282
0
                        bool locator_contained = false;
1283
0
                        for (auto existing_locator : existing_server.metatrafficUnicastLocatorList)
1284
0
                        {
1285
0
                            if (incoming_locator == existing_locator)
1286
0
                            {
1287
0
                                locator_contained = true;
1288
0
                                break;
1289
0
                            }
1290
0
                        }
1291
0
                        if (!locator_contained)
1292
0
                        {
1293
0
                            modified_servers.emplace_back(incoming_server.GetParticipant());
1294
0
                            modified_locators.push_back(incoming_locator);
1295
0
                            logInfo(RTPS_QOS_CHECK,
1296
0
                                    "DS Server: " << incoming_server.guidPrefix << " has modified its locators: "
1297
0
                                                  << incoming_locator << " being added")
1298
0
                        }
1299
0
                    }
1300
0
                    contained = true;
1301
0
                    break;
1302
0
                }
1303
0
            }
1304
0
            if (!contained)
1305
0
            {
1306
0
                logWarning(RTPS_QOS_CHECK,
1307
0
                        "Discovery Servers cannot be removed from the list; they can only be added");
1308
0
                return;
1309
0
            }
1310
0
        }
1311
1312
0
        {
1313
0
            std::lock_guard<std::recursive_mutex> lock(*pdp->getMutex());
1314
1315
            // Update user data
1316
0
            auto local_participant_proxy_data = pdp->getLocalParticipantProxyData();
1317
0
            local_participant_proxy_data->m_userData.data_vec(m_att.userData);
1318
1319
            // Update metatraffic locators
1320
0
            for (auto locator : m_att.builtin.metatrafficMulticastLocatorList)
1321
0
            {
1322
0
                local_participant_proxy_data->metatraffic_locators.add_multicast_locator(locator);
1323
0
            }
1324
0
            for (auto locator : m_att.builtin.metatrafficUnicastLocatorList)
1325
0
            {
1326
0
                local_participant_proxy_data->metatraffic_locators.add_unicast_locator(locator);
1327
0
            }
1328
1329
            // Update default locators
1330
0
            for (auto locator : m_att.defaultUnicastLocatorList)
1331
0
            {
1332
0
                local_participant_proxy_data->default_locators.add_unicast_locator(locator);
1333
0
            }
1334
1335
0
            createSenderResources(m_att.builtin.metatrafficMulticastLocatorList);
1336
0
            createSenderResources(m_att.builtin.metatrafficUnicastLocatorList);
1337
0
            createSenderResources(m_att.defaultUnicastLocatorList);
1338
0
            if (!modified_locators.empty())
1339
0
            {
1340
0
                createSenderResources(modified_locators);
1341
0
            }
1342
1343
            // Update remote servers list
1344
0
            if (m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::CLIENT ||
1345
0
                    m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::SUPER_CLIENT ||
1346
0
                    m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::SERVER ||
1347
0
                    m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::BACKUP)
1348
0
            {
1349
                // Add incoming servers iff we don't know about them already or the listening locator has been modified
1350
0
                for (auto incoming_server : patt.builtin.discovery_config.m_DiscoveryServers)
1351
0
                {
1352
0
                    eprosima::fastdds::rtps::RemoteServerList_t::iterator server_it;
1353
0
                    for (server_it = m_att.builtin.discovery_config.m_DiscoveryServers.begin();
1354
0
                            server_it != m_att.builtin.discovery_config.m_DiscoveryServers.end(); server_it++)
1355
0
                    {
1356
0
                        if (server_it->guidPrefix == incoming_server.guidPrefix)
1357
0
                        {
1358
                            // Check if the listening locators have been modified
1359
0
                            for (auto guid : modified_servers)
1360
0
                            {
1361
0
                                if (guid == incoming_server.GetParticipant())
1362
0
                                {
1363
0
                                    server_it->metatrafficUnicastLocatorList =
1364
0
                                            incoming_server.metatrafficUnicastLocatorList;
1365
0
                                    break;
1366
0
                                }
1367
0
                            }
1368
0
                            break;
1369
0
                        }
1370
0
                    }
1371
0
                    if (server_it == m_att.builtin.discovery_config.m_DiscoveryServers.end())
1372
0
                    {
1373
0
                        m_att.builtin.discovery_config.m_DiscoveryServers.push_back(incoming_server);
1374
0
                    }
1375
0
                }
1376
1377
                // Update the servers list in builtin protocols
1378
0
                {
1379
0
                    std::unique_lock<eprosima::shared_mutex> disc_lock(mp_builtinProtocols->getDiscoveryMutex());
1380
0
                    mp_builtinProtocols->m_DiscoveryServers = m_att.builtin.discovery_config.m_DiscoveryServers;
1381
0
                }
1382
1383
                // Notify PDPServer
1384
0
                if (m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::SERVER ||
1385
0
                        m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::BACKUP)
1386
0
                {
1387
0
                    fastdds::rtps::PDPServer* pdp_server = static_cast<fastdds::rtps::PDPServer*>(pdp);
1388
0
                    pdp_server->update_remote_servers_list();
1389
0
                    for (auto remote_server : modified_servers)
1390
0
                    {
1391
0
                        pdp_server->remove_remote_participant(remote_server,
1392
0
                                ParticipantDiscoveryInfo::DISCOVERY_STATUS::DROPPED_PARTICIPANT);
1393
0
                    }
1394
0
                }
1395
                // Notify PDPClient
1396
0
                else if (m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::CLIENT ||
1397
0
                        m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::SUPER_CLIENT)
1398
0
                {
1399
0
                    fastdds::rtps::PDPClient* pdp_client = static_cast<fastdds::rtps::PDPClient*>(pdp);
1400
0
                    pdp_client->update_remote_servers_list();
1401
0
                    for (auto remote_server : modified_servers)
1402
0
                    {
1403
0
                        pdp_client->remove_remote_participant(remote_server,
1404
0
                                ParticipantDiscoveryInfo::DISCOVERY_STATUS::DROPPED_PARTICIPANT);
1405
0
                    }
1406
0
                }
1407
0
            }
1408
0
        }
1409
0
    }
1410
1411
0
    if (update_pdp)
1412
0
    {
1413
        // Send DATA(P)
1414
0
        pdp->announceParticipantState(true);
1415
0
    }
1416
0
}
1417
1418
bool RTPSParticipantImpl::updateLocalWriter(
1419
        RTPSWriter* Writer,
1420
        const TopicAttributes& topicAtt,
1421
        const WriterQos& wqos)
1422
0
{
1423
0
    return this->mp_builtinProtocols->updateLocalWriter(Writer, topicAtt, wqos);
1424
0
}
1425
1426
bool RTPSParticipantImpl::updateLocalReader(
1427
        RTPSReader* reader,
1428
        const TopicAttributes& topicAtt,
1429
        const ReaderQos& rqos,
1430
        const fastdds::rtps::ContentFilterProperty* content_filter)
1431
0
{
1432
0
    return this->mp_builtinProtocols->updateLocalReader(reader, topicAtt, rqos, content_filter);
1433
0
}
1434
1435
/*
1436
 *
1437
 * AUXILIARY METHODS
1438
 *
1439
 *
1440
 */
1441
1442
1443
bool RTPSParticipantImpl::existsEntityId(
1444
        const EntityId_t& ent,
1445
        EndpointKind_t kind) const
1446
0
{
1447
1448
0
    auto check = [&ent](Endpoint* e)
1449
0
            {
1450
0
                return ent == e->getGuid().entityId;
1451
0
            };
1452
1453
0
    shared_lock<shared_mutex> _(endpoints_list_mutex);
1454
1455
0
    if (kind == WRITER)
1456
0
    {
1457
0
        return std::any_of(m_userWriterList.begin(), m_userWriterList.end(), check);
1458
0
    }
1459
0
    else
1460
0
    {
1461
0
        return std::any_of(m_userReaderList.begin(), m_userReaderList.end(), check);
1462
0
    }
1463
1464
0
    return false;
1465
0
}
1466
1467
/*
1468
 *
1469
 * RECEIVER RESOURCE METHODS
1470
 *
1471
 */
1472
bool RTPSParticipantImpl::assignEndpointListenResources(
1473
        Endpoint* endp)
1474
0
{
1475
    //Tag the endpoint with the ReceiverResources
1476
0
    bool valid = true;
1477
1478
    /* No need to check for emptiness on the lists, as it was already done on part function
1479
       In case are using the default list of Locators they have already been embedded to the parameters
1480
     */
1481
1482
    //UNICAST
1483
0
    assignEndpoint2LocatorList(endp, endp->getAttributes().unicastLocatorList);
1484
    //MULTICAST
1485
0
    assignEndpoint2LocatorList(endp, endp->getAttributes().multicastLocatorList);
1486
0
    return valid;
1487
0
}
1488
1489
bool RTPSParticipantImpl::createAndAssociateReceiverswithEndpoint(
1490
        Endpoint* pend,
1491
        bool unique_flows,
1492
        uint16_t initial_unique_port,
1493
        uint16_t final_unique_port)
1494
0
{
1495
    /*  This function...
1496
        - Asks the network factory for new resources
1497
        - Encapsulates the new resources within the ReceiverControlBlock list
1498
        - Associated the endpoint to the new elements in the list
1499
        - Launches the listener thread
1500
     */
1501
1502
0
    if (unique_flows)
1503
0
    {
1504
0
        pend->getAttributes().multicastLocatorList.clear();
1505
0
        pend->getAttributes().unicastLocatorList = m_att.defaultUnicastLocatorList;
1506
1507
0
        uint16_t port = initial_unique_port;
1508
0
        while (port < final_unique_port)
1509
0
        {
1510
            // Set port on unicast locators
1511
0
            for (Locator_t& loc : pend->getAttributes().unicastLocatorList)
1512
0
            {
1513
0
                loc.port = port;
1514
0
            }
1515
1516
            // Try creating receiver resources
1517
0
            if (createReceiverResources(pend->getAttributes().unicastLocatorList, false, true))
1518
0
            {
1519
0
                break;
1520
0
            }
1521
1522
            // Try with next port
1523
0
            ++port;
1524
0
        }
1525
1526
        // Fail when unique ports are exhausted
1527
0
        if (port >= final_unique_port)
1528
0
        {
1529
0
            logError(RTPS_PARTICIPANT, "Unique flows requested but exhausted. Port range: "
1530
0
                    << initial_unique_port << "-" << final_unique_port);
1531
0
            return false;
1532
0
        }
1533
0
    }
1534
0
    else
1535
0
    {
1536
        // 1 - Ask the network factory to generate the elements that do still not exist
1537
        //Iterate through the list of unicast and multicast locators the endpoint has... unless its empty
1538
        //In that case, just use the standard
1539
0
        if (pend->getAttributes().unicastLocatorList.empty() && pend->getAttributes().multicastLocatorList.empty())
1540
0
        {
1541
            // Take default locators from the participant.
1542
0
            pend->getAttributes().unicastLocatorList = m_att.defaultUnicastLocatorList;
1543
0
            pend->getAttributes().multicastLocatorList = m_att.defaultMulticastLocatorList;
1544
0
        }
1545
0
        createReceiverResources(pend->getAttributes().unicastLocatorList, false, true);
1546
0
        createReceiverResources(pend->getAttributes().multicastLocatorList, false, true);
1547
0
    }
1548
1549
    // Associate the Endpoint with ReceiverControlBlock
1550
0
    assignEndpointListenResources(pend);
1551
0
    return true;
1552
0
}
1553
1554
bool RTPSParticipantImpl::assignEndpoint2LocatorList(
1555
        Endpoint* endp,
1556
        LocatorList_t& list)
1557
0
{
1558
    /* Note:
1559
       The previous version of this function associated (or created) ListenResources and added the endpoint to them.
1560
       It then requested the list of Locators the Listener is listening to and appended to the LocatorList_t from the parameters.
1561
1562
       This has been removed because it is considered redundant. For ReceiveResources that listen on multiple interfaces, only
1563
       one of the supported Locators is needed to make the match, and the case of new ListenResources being created has been removed
1564
       since its the NetworkFactory the one that takes care of Resource creation.
1565
     */
1566
0
    for (auto lit = list.begin(); lit != list.end(); ++lit)
1567
0
    {
1568
        //Iteration of all Locators within the Locator list passed down as argument
1569
0
        std::lock_guard<std::mutex> guard(m_receiverResourcelistMutex);
1570
        //Check among ReceiverResources whether the locator is supported or not
1571
0
        for (auto it = m_receiverResourcelist.begin(); it != m_receiverResourcelist.end(); ++it)
1572
0
        {
1573
            //Take mutex for the resource since we are going to interact with shared resources
1574
            //std::lock_guard<std::mutex> guard((*it).mtx);
1575
0
            if (it->Receiver->SupportsLocator(*lit))
1576
0
            {
1577
                //Supported! Take mutex and update lists - We maintain reader/writer discrimination just in case
1578
0
                it->mp_receiver->associateEndpoint(endp);
1579
                // end association between reader/writer and the receive resources
1580
0
            }
1581
1582
0
        }
1583
        //Finished iteratig through all ListenResources for a single Locator (from the parameter list).
1584
        //Since this function is called after checking with NetFactory we do not have to create any more resource.
1585
0
    }
1586
0
    return true;
1587
0
}
1588
1589
bool RTPSParticipantImpl::createSendResources(
1590
        Endpoint* pend)
1591
0
{
1592
0
    if (pend->m_att.remoteLocatorList.empty())
1593
0
    {
1594
        // Adds the default locators of every registered transport.
1595
0
        m_network_Factory.GetDefaultOutputLocators(pend->m_att.remoteLocatorList);
1596
0
    }
1597
1598
0
    std::lock_guard<std::timed_mutex> guard(m_send_resources_mutex_);
1599
1600
    //Output locators have been specified, create them
1601
0
    for (auto it = pend->m_att.remoteLocatorList.begin(); it != pend->m_att.remoteLocatorList.end(); ++it)
1602
0
    {
1603
0
        if (!m_network_Factory.build_send_resources(send_resource_list_, (*it)))
1604
0
        {
1605
0
            logWarning(RTPS_PARTICIPANT, "Cannot create send resource for endpoint remote locator (" <<
1606
0
                    pend->getGuid() << ", " << (*it) << ")");
1607
0
        }
1608
0
    }
1609
1610
0
    return true;
1611
0
}
1612
1613
bool RTPSParticipantImpl::createReceiverResources(
1614
        LocatorList_t& Locator_list,
1615
        bool ApplyMutation,
1616
        bool RegisterReceiver)
1617
0
{
1618
0
    std::vector<std::shared_ptr<ReceiverResource>> newItemsBuffer;
1619
0
    bool ret_val = Locator_list.empty();
1620
1621
#if HAVE_SECURITY
1622
    // An auxilary buffer is needed in the ReceiverResource to to decrypt the message,
1623
    // that imposes a limit in the received messages size even if the transport allows (uint32_t) messages size.
1624
    uint32_t max_receiver_buffer_size =
1625
            is_secure() ? std::numeric_limits<uint16_t>::max() : std::numeric_limits<uint32_t>::max();
1626
#else
1627
0
    uint32_t max_receiver_buffer_size = std::numeric_limits<uint32_t>::max();
1628
0
#endif // if HAVE_SECURITY
1629
1630
0
    for (auto it_loc = Locator_list.begin(); it_loc != Locator_list.end(); ++it_loc)
1631
0
    {
1632
0
        bool ret = m_network_Factory.BuildReceiverResources(*it_loc, newItemsBuffer, max_receiver_buffer_size);
1633
0
        if (!ret && ApplyMutation)
1634
0
        {
1635
0
            uint32_t tries = 0;
1636
0
            while (!ret && (tries < m_att.builtin.mutation_tries))
1637
0
            {
1638
0
                tries++;
1639
0
                *it_loc = applyLocatorAdaptRule(*it_loc);
1640
0
                ret = m_network_Factory.BuildReceiverResources(*it_loc, newItemsBuffer, max_receiver_buffer_size);
1641
0
            }
1642
0
        }
1643
1644
0
        ret_val |= !newItemsBuffer.empty();
1645
1646
0
        for (auto it_buffer = newItemsBuffer.begin(); it_buffer != newItemsBuffer.end(); ++it_buffer)
1647
0
        {
1648
0
            std::lock_guard<std::mutex> lock(m_receiverResourcelistMutex);
1649
            //Push the new items into the ReceiverResource buffer
1650
0
            m_receiverResourcelist.emplace_back(*it_buffer);
1651
            //Create and init the MessageReceiver
1652
0
            auto mr = new MessageReceiver(this, (*it_buffer)->max_message_size());
1653
0
            m_receiverResourcelist.back().mp_receiver = mr;
1654
            //Start reception
1655
0
            if (RegisterReceiver)
1656
0
            {
1657
0
                m_receiverResourcelist.back().Receiver->RegisterReceiver(mr);
1658
0
            }
1659
0
        }
1660
0
        newItemsBuffer.clear();
1661
0
    }
1662
1663
0
    return ret_val;
1664
0
}
1665
1666
void RTPSParticipantImpl::createSenderResources(
1667
        const LocatorList_t& locator_list)
1668
0
{
1669
0
    std::lock_guard<std::timed_mutex> lock(m_send_resources_mutex_);
1670
1671
0
    for (auto it_loc = locator_list.begin(); it_loc != locator_list.end(); ++it_loc)
1672
0
    {
1673
0
        m_network_Factory.build_send_resources(send_resource_list_, *it_loc);
1674
0
    }
1675
0
}
1676
1677
void RTPSParticipantImpl::createSenderResources(
1678
        const Locator_t& locator)
1679
0
{
1680
0
    std::lock_guard<std::timed_mutex> lock(m_send_resources_mutex_);
1681
1682
0
    m_network_Factory.build_send_resources(send_resource_list_, locator);
1683
0
}
1684
1685
bool RTPSParticipantImpl::deleteUserEndpoint(
1686
        const GUID_t& endpoint)
1687
0
{
1688
0
    if ( getGuid().guidPrefix != endpoint.guidPrefix)
1689
0
    {
1690
0
        return false;
1691
0
    }
1692
1693
0
    bool found = false, found_in_users = false;
1694
0
    Endpoint* p_endpoint = nullptr;
1695
1696
0
    if (endpoint.entityId.is_writer())
1697
0
    {
1698
0
        std::lock_guard<shared_mutex> _(endpoints_list_mutex);
1699
1700
0
        for (auto wit = m_userWriterList.begin(); wit != m_userWriterList.end(); ++wit)
1701
0
        {
1702
0
            if ((*wit)->getGuid().entityId == endpoint.entityId) //Found it
1703
0
            {
1704
0
                m_userWriterList.erase(wit);
1705
0
                found_in_users = true;
1706
0
                break;
1707
0
            }
1708
0
        }
1709
1710
0
        for (auto wit = m_allWriterList.begin(); wit != m_allWriterList.end(); ++wit)
1711
0
        {
1712
0
            if ((*wit)->getGuid().entityId == endpoint.entityId) //Found it
1713
0
            {
1714
0
                p_endpoint = *wit;
1715
0
                m_allWriterList.erase(wit);
1716
0
                found = true;
1717
0
                break;
1718
0
            }
1719
0
        }
1720
0
    }
1721
0
    else
1722
0
    {
1723
0
        std::lock_guard<shared_mutex> _(endpoints_list_mutex);
1724
1725
0
        for (auto rit = m_userReaderList.begin(); rit != m_userReaderList.end(); ++rit)
1726
0
        {
1727
0
            if ((*rit)->getGuid().entityId == endpoint.entityId) //Found it
1728
0
            {
1729
0
                m_userReaderList.erase(rit);
1730
0
                found_in_users = true;
1731
0
                break;
1732
0
            }
1733
0
        }
1734
1735
0
        for (auto rit = m_allReaderList.begin(); rit != m_allReaderList.end(); ++rit)
1736
0
        {
1737
0
            if ((*rit)->getGuid().entityId == endpoint.entityId) //Found it
1738
0
            {
1739
0
                p_endpoint = *rit;
1740
0
                m_allReaderList.erase(rit);
1741
0
                found = true;
1742
0
                break;
1743
0
            }
1744
0
        }
1745
0
    }
1746
1747
0
    if (!found)
1748
0
    {
1749
0
        return false;
1750
0
    }
1751
1752
0
    {
1753
0
        std::lock_guard<std::mutex> _(m_receiverResourcelistMutex);
1754
1755
0
        for (auto& rb : m_receiverResourcelist)
1756
0
        {
1757
0
            auto receiver = rb.mp_receiver;
1758
0
            if (receiver)
1759
0
            {
1760
0
                receiver->removeEndpoint(p_endpoint);
1761
0
            }
1762
0
        }
1763
0
    }
1764
1765
    //REMOVE FROM BUILTIN PROTOCOLS
1766
0
    if (p_endpoint->getAttributes().endpointKind == WRITER)
1767
0
    {
1768
0
        if (found_in_users)
1769
0
        {
1770
0
            mp_builtinProtocols->removeLocalWriter(static_cast<RTPSWriter*>(p_endpoint));
1771
0
        }
1772
1773
#if HAVE_SECURITY
1774
        if (p_endpoint->getAttributes().security_attributes().is_submessage_protected ||
1775
                p_endpoint->getAttributes().security_attributes().is_payload_protected)
1776
        {
1777
            m_security_manager.unregister_local_writer(p_endpoint->getGuid());
1778
        }
1779
#endif // if HAVE_SECURITY
1780
0
    }
1781
0
    else
1782
0
    {
1783
0
        if (found_in_users)
1784
0
        {
1785
0
            mp_builtinProtocols->removeLocalReader(static_cast<RTPSReader*>(p_endpoint));
1786
0
        }
1787
1788
#if HAVE_SECURITY
1789
        if (p_endpoint->getAttributes().security_attributes().is_submessage_protected ||
1790
                p_endpoint->getAttributes().security_attributes().is_payload_protected)
1791
        {
1792
            m_security_manager.unregister_local_reader(p_endpoint->getGuid());
1793
        }
1794
#endif // if HAVE_SECURITY
1795
0
    }
1796
1797
0
    delete(p_endpoint);
1798
0
    return true;
1799
0
}
1800
1801
void RTPSParticipantImpl::deleteAllUserEndpoints()
1802
0
{
1803
0
    std::vector<Endpoint*> tmp(0);
1804
1805
0
    {
1806
0
        using namespace std;
1807
1808
0
        lock_guard<shared_mutex> _(endpoints_list_mutex);
1809
1810
        // move the collections to a local
1811
0
        tmp.resize(m_userWriterList.size() + m_userReaderList.size());
1812
0
        auto it = move(m_userWriterList.begin(), m_userWriterList.end(), tmp.begin());
1813
0
        it = move(m_userReaderList.begin(), m_userReaderList.end(), it);
1814
1815
        // check we have copied all elements
1816
0
        assert(tmp.end() == it);
1817
1818
        // now update the all collections by removing the user elements
1819
0
        sort(m_userWriterList.begin(), m_userWriterList.end());
1820
0
        sort(m_userReaderList.begin(), m_userReaderList.end());
1821
0
        sort(m_allWriterList.begin(), m_allWriterList.end());
1822
0
        sort(m_allReaderList.begin(), m_allReaderList.end());
1823
1824
0
        vector<RTPSWriter*> writers;
1825
0
        set_difference(m_allWriterList.begin(), m_allWriterList.end(),
1826
0
                m_userWriterList.begin(), m_userWriterList.end(),
1827
0
                back_inserter(writers));
1828
0
        swap(writers, m_allWriterList);
1829
1830
0
        vector<RTPSReader*> readers;
1831
0
        set_difference(m_allReaderList.begin(), m_allReaderList.end(),
1832
0
                m_userReaderList.begin(), m_userReaderList.end(),
1833
0
                back_inserter(readers));
1834
0
        swap(readers, m_allReaderList);
1835
1836
        // remove dangling references
1837
0
        m_userWriterList.clear();
1838
0
        m_userReaderList.clear();
1839
0
    }
1840
1841
    // unlink the transport receiver blocks from the endpoints
1842
0
    for ( auto endpoint : tmp)
1843
0
    {
1844
0
        std::lock_guard<std::mutex> _(m_receiverResourcelistMutex);
1845
1846
0
        for (auto& rb : m_receiverResourcelist)
1847
0
        {
1848
0
            auto receiver = rb.mp_receiver;
1849
0
            if (receiver)
1850
0
            {
1851
0
                receiver->removeEndpoint(endpoint);
1852
0
            }
1853
0
        }
1854
0
    }
1855
1856
    // Remove from builtin protocols
1857
0
    auto removeEndpoint = [this](EndpointKind_t kind, Endpoint* p)
1858
0
            {
1859
0
                return kind == WRITER
1860
0
               ? mp_builtinProtocols->removeLocalWriter((RTPSWriter*)p)
1861
0
               : mp_builtinProtocols->removeLocalReader((RTPSReader*)p);
1862
0
            };
1863
1864
#if HAVE_SECURITY
1865
    bool (eprosima::fastrtps::rtps::security::SecurityManager::* unregister_endpoint[2])(
1866
            const GUID_t& writer_guid);
1867
    unregister_endpoint[WRITER] = &security::SecurityManager::unregister_local_writer;
1868
    unregister_endpoint[READER] = &security::SecurityManager::unregister_local_reader;
1869
#endif // if HAVE_SECURITY
1870
1871
0
    for ( auto endpoint : tmp)
1872
0
    {
1873
0
        auto kind = endpoint->getAttributes().endpointKind;
1874
0
        removeEndpoint(kind, endpoint);
1875
1876
#if HAVE_SECURITY
1877
        if (endpoint->getAttributes().security_attributes().is_submessage_protected ||
1878
                endpoint->getAttributes().security_attributes().is_payload_protected)
1879
        {
1880
            (m_security_manager.*unregister_endpoint[kind])(endpoint->getGuid());
1881
        }
1882
#endif // if HAVE_SECURITY
1883
1884
        // remove the endpoints
1885
0
        delete(endpoint);
1886
0
    }
1887
0
}
1888
1889
void RTPSParticipantImpl::normalize_endpoint_locators(
1890
        EndpointAttributes& endpoint_att)
1891
0
{
1892
    // Locators with port 0, calculate port.
1893
0
    for (Locator_t& loc : endpoint_att.unicastLocatorList)
1894
0
    {
1895
0
        m_network_Factory.fill_default_locator_port(domain_id_, loc, m_att, false);
1896
0
    }
1897
0
    for (Locator_t& loc : endpoint_att.multicastLocatorList)
1898
0
    {
1899
0
        m_network_Factory.fill_default_locator_port(domain_id_, loc, m_att, true);
1900
0
    }
1901
1902
    // Normalize unicast locators
1903
0
    if (!endpoint_att.unicastLocatorList.empty())
1904
0
    {
1905
0
        m_network_Factory.NormalizeLocators(endpoint_att.unicastLocatorList);
1906
0
    }
1907
0
}
1908
1909
std::vector<std::string> RTPSParticipantImpl::getParticipantNames() const
1910
0
{
1911
0
    std::vector<std::string> participant_names;
1912
0
    auto pdp = mp_builtinProtocols->mp_PDP;
1913
0
    for (auto it = pdp->ParticipantProxiesBegin(); it != pdp->ParticipantProxiesEnd(); ++it)
1914
0
    {
1915
0
        participant_names.emplace_back((*it)->m_participantName.to_string());
1916
0
    }
1917
0
    return participant_names;
1918
0
}
1919
1920
void RTPSParticipantImpl::setGuid(
1921
        GUID_t& guid)
1922
0
{
1923
0
    m_guid = guid;
1924
0
}
1925
1926
void RTPSParticipantImpl::announceRTPSParticipantState()
1927
0
{
1928
0
    return mp_builtinProtocols->announceRTPSParticipantState();
1929
0
}
1930
1931
void RTPSParticipantImpl::stopRTPSParticipantAnnouncement()
1932
0
{
1933
0
    return mp_builtinProtocols->stopRTPSParticipantAnnouncement();
1934
0
}
1935
1936
void RTPSParticipantImpl::resetRTPSParticipantAnnouncement()
1937
0
{
1938
0
    return mp_builtinProtocols->resetRTPSParticipantAnnouncement();
1939
0
}
1940
1941
void RTPSParticipantImpl::loose_next_change()
1942
0
{
1943
    //NOTE: This is replaced by the test transport
1944
    //this->mp_send_thr->loose_next_change();
1945
0
}
1946
1947
bool RTPSParticipantImpl::newRemoteEndpointDiscovered(
1948
        const GUID_t& pguid,
1949
        int16_t userDefinedId,
1950
        EndpointKind_t kind)
1951
0
{
1952
0
    if (m_att.builtin.discovery_config.discoveryProtocol != DiscoveryProtocol::SIMPLE ||
1953
0
            m_att.builtin.discovery_config.use_STATIC_EndpointDiscoveryProtocol == false)
1954
0
    {
1955
0
        logWarning(RTPS_PARTICIPANT,
1956
0
                "Remote Endpoints can only be activated with static discovery protocol over PDP simple protocol");
1957
0
        return false;
1958
0
    }
1959
1960
0
    if (PDPSimple* pS = dynamic_cast<PDPSimple*>(mp_builtinProtocols->mp_PDP))
1961
0
    {
1962
0
        return pS->newRemoteEndpointStaticallyDiscovered(pguid, userDefinedId, kind);
1963
0
    }
1964
1965
0
    return false;
1966
0
}
1967
1968
void RTPSParticipantImpl::ResourceSemaphorePost()
1969
0
{
1970
0
    if (mp_ResourceSemaphore != nullptr)
1971
0
    {
1972
0
        mp_ResourceSemaphore->post();
1973
0
    }
1974
0
}
1975
1976
void RTPSParticipantImpl::ResourceSemaphoreWait()
1977
0
{
1978
0
    if (mp_ResourceSemaphore != nullptr)
1979
0
    {
1980
0
        mp_ResourceSemaphore->wait();
1981
0
    }
1982
0
}
1983
1984
void RTPSParticipantImpl::assert_remote_participant_liveliness(
1985
        const GuidPrefix_t& remote_guid)
1986
0
{
1987
0
    if (mp_builtinProtocols && mp_builtinProtocols->mp_PDP)
1988
0
    {
1989
0
        mp_builtinProtocols->mp_PDP->assert_remote_participant_liveliness(remote_guid);
1990
0
    }
1991
0
}
1992
1993
/**
1994
 * Get the list of locators from which this publisher may send data.
1995
 *
1996
 * @param [out] locators  LocatorList_t where the list of locators will be stored.
1997
 */
1998
void RTPSParticipantImpl::get_sending_locators(
1999
        rtps::LocatorList_t& locators) const
2000
0
{
2001
0
    locators.clear();
2002
2003
    // Traverse the sender list and query
2004
0
    for (const auto& send_resource : send_resource_list_)
2005
0
    {
2006
0
        send_resource->add_locators_to_list(locators);
2007
0
    }
2008
0
}
2009
2010
uint32_t RTPSParticipantImpl::getMaxMessageSize() const
2011
0
{
2012
#if HAVE_SECURITY
2013
    // An auxilary buffer is needed in the ReceiverResource to to decrypt the message,
2014
    // that imposes a limit in the received messages size even if the transport allows (uint32_t) messages size.
2015
    // So the sender limits also its size.
2016
    uint32_t max_receiver_buffer_size =
2017
            is_secure() ? std::numeric_limits<uint16_t>::max() : std::numeric_limits<uint32_t>::max();
2018
#else
2019
0
    uint32_t max_receiver_buffer_size = std::numeric_limits<uint32_t>::max();
2020
0
#endif // if HAVE_SECURITY
2021
2022
0
    return (std::min)(
2023
0
        m_network_Factory.get_max_message_size_between_transports(),
2024
0
        max_receiver_buffer_size);
2025
0
}
2026
2027
uint32_t RTPSParticipantImpl::getMaxDataSize()
2028
0
{
2029
0
    return calculateMaxDataSize(getMaxMessageSize());
2030
0
}
2031
2032
uint32_t RTPSParticipantImpl::calculateMaxDataSize(
2033
        uint32_t length)
2034
0
{
2035
0
    uint32_t maxDataSize = length;
2036
2037
#if HAVE_SECURITY
2038
    // If there is rtps messsage protection, reduce max size for messages,
2039
    // because extra data is added on encryption.
2040
    if (security_attributes_.is_rtps_protected)
2041
    {
2042
        maxDataSize -= m_security_manager.calculate_extra_size_for_rtps_message();
2043
    }
2044
#endif // if HAVE_SECURITY
2045
2046
    // RTPS header
2047
0
    maxDataSize -= RTPSMESSAGE_HEADER_SIZE;
2048
0
    return maxDataSize;
2049
0
}
2050
2051
bool RTPSParticipantImpl::networkFactoryHasRegisteredTransports() const
2052
0
{
2053
0
    return m_network_Factory.numberOfRegisteredTransports() > 0;
2054
0
}
2055
2056
#if HAVE_SECURITY
2057
bool RTPSParticipantImpl::pairing_remote_reader_with_local_writer_after_security(
2058
        const GUID_t& local_writer,
2059
        const ReaderProxyData& remote_reader_data)
2060
{
2061
    bool return_value;
2062
2063
    return_value = mp_builtinProtocols->mp_PDP->getEDP()->pairing_remote_reader_with_local_writer_after_security(
2064
        local_writer, remote_reader_data);
2065
    if (!return_value && mp_builtinProtocols->mp_WLP != nullptr)
2066
    {
2067
        return_value = mp_builtinProtocols->mp_WLP->pairing_remote_reader_with_local_writer_after_security(
2068
            local_writer, remote_reader_data);
2069
    }
2070
2071
    return return_value;
2072
}
2073
2074
bool RTPSParticipantImpl::pairing_remote_writer_with_local_reader_after_security(
2075
        const GUID_t& local_reader,
2076
        const WriterProxyData& remote_writer_data)
2077
{
2078
    bool return_value;
2079
2080
    return_value = mp_builtinProtocols->mp_PDP->getEDP()->pairing_remote_writer_with_local_reader_after_security(
2081
        local_reader, remote_writer_data);
2082
    if (!return_value && mp_builtinProtocols->mp_WLP != nullptr)
2083
    {
2084
        return_value = mp_builtinProtocols->mp_WLP->pairing_remote_writer_with_local_reader_after_security(
2085
            local_reader, remote_writer_data);
2086
    }
2087
2088
    return return_value;
2089
}
2090
2091
bool RTPSParticipantImpl::is_security_enabled_for_writer(
2092
        const WriterAttributes& writer_attributes)
2093
{
2094
    if (!is_initialized() || !is_secure())
2095
    {
2096
        return false;
2097
    }
2098
2099
    if (security_attributes().is_rtps_protected)
2100
    {
2101
        return true;
2102
    }
2103
2104
    security::EndpointSecurityAttributes security_attributes;
2105
    if (security_manager().get_datawriter_sec_attributes(writer_attributes.endpoint.properties, security_attributes))
2106
    {
2107
        return (security_attributes.is_payload_protected == true ||
2108
               security_attributes.is_submessage_protected == true);
2109
    }
2110
2111
    return false;
2112
}
2113
2114
bool RTPSParticipantImpl::is_security_enabled_for_reader(
2115
        const ReaderAttributes& reader_attributes)
2116
{
2117
    if (!is_initialized() || !is_secure())
2118
    {
2119
        return false;
2120
    }
2121
2122
    if (security_attributes().is_rtps_protected)
2123
    {
2124
        return true;
2125
    }
2126
2127
    security::EndpointSecurityAttributes security_attributes;
2128
    if (security_manager().get_datareader_sec_attributes(reader_attributes.endpoint.properties, security_attributes))
2129
    {
2130
        return (security_attributes.is_payload_protected == true ||
2131
               security_attributes.is_submessage_protected == true);
2132
    }
2133
2134
    return false;
2135
}
2136
2137
#endif // if HAVE_SECURITY
2138
2139
PDPSimple* RTPSParticipantImpl::pdpsimple()
2140
0
{
2141
0
    return dynamic_cast<PDPSimple*>(mp_builtinProtocols->mp_PDP);
2142
0
}
2143
2144
WLP* RTPSParticipantImpl::wlp()
2145
0
{
2146
0
    return mp_builtinProtocols->mp_WLP;
2147
0
}
2148
2149
fastdds::dds::builtin::TypeLookupManager* RTPSParticipantImpl::typelookup_manager() const
2150
0
{
2151
0
    return mp_builtinProtocols->tlm_;
2152
0
}
2153
2154
IPersistenceService* RTPSParticipantImpl::get_persistence_service(
2155
        const EndpointAttributes& param)
2156
0
{
2157
0
    IPersistenceService* ret_val;
2158
2159
0
    ret_val = PersistenceFactory::create_persistence_service(param.properties);
2160
0
    return ret_val != nullptr ?
2161
0
           ret_val :
2162
0
           PersistenceFactory::create_persistence_service(m_att.properties);
2163
0
}
2164
2165
bool RTPSParticipantImpl::get_persistence_service(
2166
        bool is_builtin,
2167
        const EndpointAttributes& param,
2168
        IPersistenceService*& service)
2169
0
{
2170
0
    service = nullptr;
2171
2172
0
    const char* debug_label = (param.endpointKind == WRITER ? "writer" : "reader");
2173
2174
    // Check if also support persistence with TRANSIENT_LOCAL.
2175
0
    DurabilityKind_t durability_red_line = get_persistence_durability_red_line(is_builtin);
2176
0
    if (param.durabilityKind >= durability_red_line)
2177
0
    {
2178
0
        if (param.persistence_guid == c_Guid_Unknown)
2179
0
        {
2180
0
            logError(RTPS_PARTICIPANT, "Cannot create persistence service. Persistence GUID not specified");
2181
0
            return false;
2182
0
        }
2183
0
        service = get_persistence_service(param);
2184
0
        if (service == nullptr)
2185
0
        {
2186
0
            logError(RTPS_PARTICIPANT,
2187
0
                    "Couldn't create writer persistence service for transient/persistent " << debug_label);
2188
0
            return false;
2189
0
        }
2190
0
    }
2191
2192
    // Error log level can be disable. Avoid unused warning
2193
0
    static_cast<void>(debug_label);
2194
2195
0
    return true;
2196
0
}
2197
2198
bool RTPSParticipantImpl::get_new_entity_id(
2199
        EntityId_t& entityId)
2200
0
{
2201
0
    if (entityId == c_EntityId_Unknown)
2202
0
    {
2203
0
        uint32_t idnum = ++IdCounter;
2204
0
        octet* c = reinterpret_cast<octet*>(&idnum);
2205
0
        entityId.value[2] = c[0];
2206
0
        entityId.value[1] = c[1];
2207
0
        entityId.value[0] = c[2];
2208
0
        entityId.value[3] = 0x01; // Vendor specific
2209
0
    }
2210
0
    else
2211
0
    {
2212
0
        return !existsEntityId(entityId, READER) && !existsEntityId(entityId, WRITER);
2213
0
    }
2214
2215
0
    return true;
2216
0
}
2217
2218
void RTPSParticipantImpl::set_check_type_function(
2219
        std::function<bool(const std::string&)>&& check_type)
2220
0
{
2221
0
    type_check_fn_ = std::move(check_type);
2222
0
}
2223
2224
std::unique_ptr<RTPSMessageGroup_t> RTPSParticipantImpl::get_send_buffer()
2225
0
{
2226
0
    return send_buffers_->get_buffer(this);
2227
0
}
2228
2229
void RTPSParticipantImpl::return_send_buffer(
2230
        std::unique_ptr <RTPSMessageGroup_t>&& buffer)
2231
0
{
2232
0
    send_buffers_->return_buffer(std::move(buffer));
2233
0
}
2234
2235
uint32_t RTPSParticipantImpl::get_domain_id() const
2236
0
{
2237
0
    return domain_id_;
2238
0
}
2239
2240
//!Compare metatraffic locators list searching for mutations
2241
bool RTPSParticipantImpl::did_mutation_took_place_on_meta(
2242
        const LocatorList_t& MulticastLocatorList,
2243
        const LocatorList_t& UnicastLocatorList) const
2244
0
{
2245
0
    using namespace std;
2246
0
    using namespace eprosima::fastdds::rtps;
2247
2248
0
    if (m_att.builtin.metatrafficMulticastLocatorList == MulticastLocatorList
2249
0
            && m_att.builtin.metatrafficUnicastLocatorList == UnicastLocatorList)
2250
0
    {
2251
        // no mutation
2252
0
        return false;
2253
0
    }
2254
2255
    // If one of the locators is 0.0.0.0 we must replace it by all local interfaces like the framework does
2256
0
    list<Locator_t> unicast_real_locators;
2257
0
    LocatorListConstIterator it = UnicastLocatorList.begin(), old_it;
2258
0
    LocatorList_t locals;
2259
2260
0
    do
2261
0
    {
2262
        // copy ordinary locators till the first ANY
2263
0
        old_it = it;
2264
0
        it = find_if(it, UnicastLocatorList.end(), IPLocator::isAny);
2265
2266
        // copy ordinary locators
2267
0
        copy(old_it, it, back_inserter(unicast_real_locators));
2268
2269
        // transform new ones if needed
2270
0
        if (it != UnicastLocatorList.end())
2271
0
        {
2272
0
            const Locator_t& an_any = *it;
2273
2274
            // load interfaces if needed
2275
0
            if (locals.empty())
2276
0
            {
2277
0
                IPFinder::getIP4Address(&locals);
2278
0
            }
2279
2280
            // add a locator for each local
2281
0
            transform(locals.begin(),
2282
0
                    locals.end(),
2283
0
                    back_inserter(unicast_real_locators),
2284
0
                    [&an_any](const Locator_t& loc) -> Locator_t
2285
0
                    {
2286
0
                        Locator_t specific(loc);
2287
0
                        specific.port = an_any.port;
2288
0
                        specific.kind = an_any.kind;
2289
0
                        return specific;
2290
0
                    });
2291
2292
            // search for the next if any
2293
0
            ++it;
2294
0
        }
2295
0
    } while (it != UnicastLocatorList.end());
2296
2297
    // TCP is a special case because physical ports are taken from the TransportDescriptors
2298
    // besides WAN address may be added by the transport
2299
0
    struct ResetLogical
2300
0
    {
2301
        // use of unary_function to introduce the following aliases is deprecated
2302
        // using argument_type = Locator_t;
2303
        // using result_type   = Locator_t&;
2304
2305
0
        using Transports = vector<shared_ptr<TransportDescriptorInterface>>;
2306
2307
0
        ResetLogical(
2308
0
                const Transports& tp)
2309
0
            : Transports_(tp)
2310
0
        {
2311
0
            for (auto desc : Transports_)
2312
0
            {
2313
0
                if (nullptr == tcp4)
2314
0
                {
2315
0
                    tcp4 = dynamic_pointer_cast<TCPv4TransportDescriptor>(desc);
2316
0
                }
2317
2318
0
                if (nullptr == tcp6)
2319
0
                {
2320
0
                    tcp6 = dynamic_pointer_cast<TCPv6TransportDescriptor>(desc);
2321
0
                }
2322
0
            }
2323
0
        }
2324
2325
0
        uint16_t Tcp4ListeningPort() const
2326
0
        {
2327
0
            return tcp4 ? ( tcp4->listening_ports.empty() ? 0 : tcp4->listening_ports[0]) : 0;
2328
0
        }
2329
2330
0
        uint16_t Tcp6ListeningPort() const
2331
0
        {
2332
0
            return tcp6 ? ( tcp6->listening_ports.empty() ? 0 : tcp6->listening_ports[0]) : 0;
2333
0
        }
2334
2335
0
        void set_wan_address(
2336
0
                Locator_t& loc) const
2337
0
        {
2338
0
            if (tcp4)
2339
0
            {
2340
0
                assert(LOCATOR_KIND_TCPv4 == loc.kind);
2341
0
                auto& ip = tcp4->wan_addr;
2342
0
                IPLocator::setWan(loc, ip[0], ip[1], ip[2], ip[3]);
2343
0
            }
2344
0
        }
2345
2346
0
        Locator_t operator ()(
2347
0
                const Locator_t& loc) const
2348
0
        {
2349
0
            Locator_t ret(loc);
2350
0
            switch (loc.kind)
2351
0
            {
2352
0
                case LOCATOR_KIND_TCPv4:
2353
0
                    set_wan_address(ret);
2354
0
                    IPLocator::setPhysicalPort(ret, Tcp4ListeningPort());
2355
0
                    break;
2356
0
                case LOCATOR_KIND_TCPv6:
2357
0
                    IPLocator::setPhysicalPort(ret, Tcp6ListeningPort());
2358
0
                    break;
2359
0
            }
2360
0
            return ret;
2361
0
        }
2362
2363
        // reference to the transports
2364
0
        const Transports& Transports_;
2365
0
        shared_ptr<TCPv4TransportDescriptor> tcp4;
2366
0
        shared_ptr<TCPv6TransportDescriptor> tcp6;
2367
2368
0
    }
2369
0
    transform_functor(m_att.userTransports);
2370
2371
    // transform-copy
2372
0
    set<Locator_t> update_attributes;
2373
2374
0
    transform(m_att.builtin.metatrafficMulticastLocatorList.begin(),
2375
0
            m_att.builtin.metatrafficMulticastLocatorList.end(),
2376
0
            inserter(update_attributes, update_attributes.begin()),
2377
0
            transform_functor);
2378
2379
0
    transform(m_att.builtin.metatrafficUnicastLocatorList.begin(),
2380
0
            m_att.builtin.metatrafficUnicastLocatorList.end(),
2381
0
            inserter(update_attributes, update_attributes.begin()),
2382
0
            transform_functor);
2383
2384
0
    set<Locator_t> original_ones;
2385
2386
0
    transform(MulticastLocatorList.begin(),
2387
0
            MulticastLocatorList.end(),
2388
0
            inserter(original_ones, original_ones.begin()),
2389
0
            transform_functor);
2390
2391
0
    transform(unicast_real_locators.begin(),
2392
0
            unicast_real_locators.end(),
2393
0
            inserter(original_ones, original_ones.begin()),
2394
0
            transform_functor);
2395
2396
    // if equal then no mutation took place on physical ports
2397
0
    return !(update_attributes == original_ones);
2398
0
}
2399
2400
DurabilityKind_t RTPSParticipantImpl::get_persistence_durability_red_line(
2401
        bool is_builtin_endpoint)
2402
0
{
2403
0
    DurabilityKind_t durability_red_line = TRANSIENT;
2404
0
    if (!is_builtin_endpoint)
2405
0
    {
2406
0
        std::string* persistence_support_transient_local_property = PropertyPolicyHelper::find_property(
2407
0
            m_att.properties, "dds.persistence.also-support-transient-local");
2408
0
        if (nullptr != persistence_support_transient_local_property &&
2409
0
                0 == persistence_support_transient_local_property->compare("true"))
2410
0
        {
2411
0
            durability_red_line = TRANSIENT_LOCAL;
2412
0
        }
2413
0
    }
2414
2415
0
    return durability_red_line;
2416
0
}
2417
2418
void RTPSParticipantImpl::environment_file_has_changed()
2419
0
{
2420
0
    RTPSParticipantAttributes patt = m_att;
2421
    // Only if it is a server/backup or a client override
2422
0
    if (DiscoveryProtocol_t::SERVER == m_att.builtin.discovery_config.discoveryProtocol ||
2423
0
            DiscoveryProtocol_t::BACKUP == m_att.builtin.discovery_config.discoveryProtocol ||
2424
0
            client_override_)
2425
0
    {
2426
0
        if (load_environment_server_info(patt.builtin.discovery_config.m_DiscoveryServers))
2427
0
        {
2428
0
            update_attributes(patt);
2429
0
        }
2430
0
    }
2431
0
    else
2432
0
    {
2433
0
        logWarning(RTPS_QOS_CHECK, "Trying to add Discovery Servers to a participant which is not a SERVER, BACKUP " <<
2434
0
                "or an overriden CLIENT (SIMPLE participant transformed into CLIENT with the environment variable)");
2435
0
    }
2436
0
}
2437
2438
void RTPSParticipantImpl::get_default_metatraffic_locators()
2439
0
{
2440
0
    uint32_t metatraffic_multicast_port = m_att.port.getMulticastPort(domain_id_);
2441
0
    uint32_t metatraffic_unicast_port = m_att.port.getUnicastPort(domain_id_,
2442
0
                    static_cast<uint32_t>(m_att.participantID));
2443
2444
0
    m_network_Factory.getDefaultMetatrafficMulticastLocators(m_att.builtin.metatrafficMulticastLocatorList,
2445
0
            metatraffic_multicast_port);
2446
0
    m_network_Factory.NormalizeLocators(m_att.builtin.metatrafficMulticastLocatorList);
2447
2448
0
    m_network_Factory.getDefaultMetatrafficUnicastLocators(m_att.builtin.metatrafficUnicastLocatorList,
2449
0
            metatraffic_unicast_port);
2450
0
    m_network_Factory.NormalizeLocators(m_att.builtin.metatrafficUnicastLocatorList);
2451
0
}
2452
2453
void RTPSParticipantImpl::get_default_unicast_locators()
2454
0
{
2455
0
    m_network_Factory.getDefaultUnicastLocators(domain_id_, m_att.defaultUnicastLocatorList, m_att);
2456
0
    m_network_Factory.NormalizeLocators(m_att.defaultUnicastLocatorList);
2457
0
}
2458
2459
#ifdef FASTDDS_STATISTICS
2460
2461
bool RTPSParticipantImpl::register_in_writer(
2462
        std::shared_ptr<fastdds::statistics::IListener> listener,
2463
        GUID_t writer_guid)
2464
{
2465
    bool res = false;
2466
2467
    if ( GUID_t::unknown() == writer_guid )
2468
    {
2469
        shared_lock<shared_mutex> _(endpoints_list_mutex);
2470
        res = true;
2471
        for ( auto writer : m_userWriterList)
2472
        {
2473
            if (!fastdds::statistics::is_statistics_builtin(writer->getGuid().entityId))
2474
            {
2475
                res &= writer->add_statistics_listener(listener);
2476
            }
2477
        }
2478
    }
2479
    else if (!fastdds::statistics::is_statistics_builtin(writer_guid.entityId))
2480
    {
2481
        RTPSWriter* writer = find_local_writer(writer_guid);
2482
        res = writer->add_statistics_listener(listener);
2483
    }
2484
2485
    return res;
2486
}
2487
2488
bool RTPSParticipantImpl::register_in_reader(
2489
        std::shared_ptr<fastdds::statistics::IListener> listener,
2490
        GUID_t reader_guid)
2491
{
2492
    bool res = false;
2493
2494
    if ( GUID_t::unknown() == reader_guid )
2495
    {
2496
        shared_lock<shared_mutex> _(endpoints_list_mutex);
2497
        res = true;
2498
        for ( auto reader : m_userReaderList)
2499
        {
2500
            if (!fastdds::statistics::is_statistics_builtin(reader->getGuid().entityId))
2501
            {
2502
                res &= reader->add_statistics_listener(listener);
2503
            }
2504
        }
2505
    }
2506
    else if (!fastdds::statistics::is_statistics_builtin(reader_guid.entityId))
2507
    {
2508
        RTPSReader* reader = find_local_reader(reader_guid);
2509
        res = reader->add_statistics_listener(listener);
2510
    }
2511
2512
    return res;
2513
}
2514
2515
bool RTPSParticipantImpl::unregister_in_writer(
2516
        std::shared_ptr<fastdds::statistics::IListener> listener)
2517
{
2518
    shared_lock<shared_mutex> _(endpoints_list_mutex);
2519
    bool res = true;
2520
2521
    for ( auto writer : m_userWriterList)
2522
    {
2523
        if (!fastdds::statistics::is_statistics_builtin(writer->getGuid().entityId))
2524
        {
2525
            res &= writer->remove_statistics_listener(listener);
2526
        }
2527
    }
2528
2529
    return res;
2530
}
2531
2532
bool RTPSParticipantImpl::unregister_in_reader(
2533
        std::shared_ptr<fastdds::statistics::IListener> listener)
2534
{
2535
    shared_lock<shared_mutex> _(endpoints_list_mutex);
2536
    bool res = true;
2537
2538
    for ( auto reader : m_userReaderList)
2539
    {
2540
        if (!fastdds::statistics::is_statistics_builtin(reader->getGuid().entityId))
2541
        {
2542
            res &= reader->remove_statistics_listener(listener);
2543
        }
2544
    }
2545
2546
    return res;
2547
}
2548
2549
#endif // FASTDDS_STATISTICS
2550
2551
} /* namespace rtps */
2552
} /* namespace fastrtps */
2553
} /* namespace eprosima */