Coverage Report

Created: 2026-02-14 07:11

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/Fast-DDS/src/cpp/rtps/participant/RTPSParticipantImpl.hpp
Line
Count
Source
1
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file RTPSParticipantImpl.hpp
17
 */
18
19
#ifndef FASTDDS_RTPS_PARTICIPANT__RTPSPARTICIPANTIMPL_H
20
#define FASTDDS_RTPS_PARTICIPANT__RTPSPARTICIPANTIMPL_H
21
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
22
23
#include <atomic>
24
#include <chrono>
25
#include <cstdio>
26
#include <cstdlib>
27
#include <limits>
28
#include <list>
29
#include <mutex>
30
#include <set>
31
#include <sys/types.h>
32
#include <vector>
33
34
#if defined(_WIN32)
35
#include <process.h>
36
#else
37
#include <unistd.h>
38
#endif // if defined(_WIN32)
39
40
#include <fastdds/dds/core/ReturnCode.hpp>
41
#include <fastdds/rtps/attributes/RTPSParticipantAttributes.hpp>
42
#include <fastdds/rtps/builtin/data/ContentFilterProperty.hpp>
43
#include <fastdds/rtps/builtin/data/SubscriptionBuiltinTopicData.hpp>
44
#include <fastdds/rtps/builtin/data/ParticipantBuiltinTopicData.hpp>
45
#include <fastdds/rtps/common/Guid.hpp>
46
#include <fastdds/rtps/common/LocatorList.hpp>
47
#include <fastdds/rtps/history/IChangePool.hpp>
48
#include <fastdds/rtps/history/IPayloadPool.hpp>
49
#include <fastdds/rtps/reader/ReaderDiscoveryStatus.hpp>
50
#include <fastdds/rtps/transport/SenderResource.hpp>
51
#include <fastdds/rtps/writer/WriterDiscoveryStatus.hpp>
52
53
#include <fastdds/types.hpp>
54
#include <fastdds/utils/TypePropagation.hpp>
55
#include <rtps/builtin/data/ReaderProxyData.hpp>
56
#include <rtps/builtin/data/WriterProxyData.hpp>
57
#include <rtps/messages/MessageReceiver.h>
58
#include <rtps/messages/RTPSMessageGroup_t.hpp>
59
#include <rtps/messages/SendBuffersManager.hpp>
60
#include <rtps/network/NetworkFactory.hpp>
61
#include <rtps/network/ReceiverResource.h>
62
#include <rtps/reader/LocalReaderPointer.hpp>
63
#include <rtps/resources/ResourceEvent.h>
64
#include <statistics/rtps/monitor-service/interfaces/IConnectionsObserver.hpp>
65
#include <statistics/rtps/monitor-service/interfaces/IConnectionsQueryable.hpp>
66
#include <statistics/rtps/StatisticsBase.hpp>
67
#include <statistics/types/monitorservice_types.hpp>
68
#include <utils/shared_mutex.hpp>
69
70
#if HAVE_SECURITY
71
#include <fastdds/rtps/Endpoint.hpp>
72
#include <rtps/security/accesscontrol/ParticipantSecurityAttributes.h>
73
#include <rtps/security/SecurityManager.h>
74
#include <rtps/security/SecurityPluginFactory.h>
75
#endif // if HAVE_SECURITY
76
77
namespace eprosima {
78
79
namespace fastdds {
80
81
#ifdef FASTDDS_STATISTICS
82
83
namespace statistics {
84
namespace rtps {
85
86
struct IStatusQueryable;
87
struct IStatusObserver;
88
struct IConnectionsObserver;
89
class SimpleQueryable;
90
class MonitorService;
91
92
} // namespace rtps
93
} // namespace statistics
94
95
#endif //FASTDDS_STATISTICS
96
97
namespace rtps {
98
99
class BaseReader;
100
class BaseWriter;
101
class StatefulWriterListener;
102
103
} // namespace rtps
104
105
namespace dds {
106
namespace builtin {
107
108
class TypeLookupManager;
109
110
} // namespace builtin
111
} // namespace dds
112
} // namespace fastdds
113
114
namespace fastdds {
115
116
class MessageReceiver;
117
118
namespace rtps {
119
120
struct PublicationBuiltinTopicData;
121
struct TopicDescription;
122
struct RemoteLocatorList;
123
class RTPSParticipant;
124
class RTPSParticipantListener;
125
class BuiltinProtocols;
126
struct CDRMessage_t;
127
class Endpoint;
128
class RTPSWriter;
129
class WriterAttributes;
130
class WriterHistory;
131
class WriterListener;
132
class RTPSReader;
133
class ReaderAttributes;
134
class ReaderHistory;
135
class ReaderListener;
136
class StatefulReader;
137
class PDP;
138
class PDPSimple;
139
class IPersistenceService;
140
class WLP;
141
142
/**
143
 * @brief Class RTPSParticipantImpl, it contains the private implementation of the RTPSParticipant functions and
144
 * allows the creation and removal of writers and readers. It manages the send and receive threads.
145
 * @ingroup RTPS_MODULE
146
 */
147
class RTPSParticipantImpl
148
    : public fastdds::statistics::StatisticsParticipantImpl
149
    , public fastdds::statistics::rtps::IConnectionsQueryable
150
#if HAVE_SECURITY
151
    , private security::SecurityPluginFactory
152
#endif // if HAVE_SECURITY
153
{
154
155
    /*
156
       Receiver Control block is a struct we use to encapsulate the resources that take part in message reception.
157
       It contains:
158
       -A ReceiverResource (as produced by the NetworkFactory Element)
159
       -Its associated MessageReceiver
160
     */
161
    struct ReceiverControlBlock
162
    {
163
        std::shared_ptr<ReceiverResource> Receiver;
164
        MessageReceiver* mp_receiver;                  //Associated Readers/Writers inside of MessageReceiver
165
166
        ReceiverControlBlock(
167
                std::shared_ptr<ReceiverResource>& rec)
168
0
            : Receiver(rec)
169
0
            , mp_receiver(nullptr)
170
0
        {
171
0
        }
172
173
        ReceiverControlBlock(
174
                ReceiverControlBlock&& origen)
175
            : Receiver(origen.Receiver)
176
            , mp_receiver(origen.mp_receiver)
177
0
        {
178
0
            origen.mp_receiver = nullptr;
179
0
            origen.Receiver.reset();
180
0
        }
181
182
        void disable()
183
0
        {
184
0
            if (Receiver != nullptr)
185
0
            {
186
0
                Receiver->disable();
187
0
            }
188
0
        }
189
190
    private:
191
192
        ReceiverControlBlock(
193
                const ReceiverControlBlock&) = delete;
194
        const ReceiverControlBlock& operator =(
195
                const ReceiverControlBlock&) = delete;
196
197
    };
198
199
public:
200
201
    /**
202
     * @param param
203
     * @param guidP
204
     * @param part
205
     * @param plisten
206
     */
207
    RTPSParticipantImpl(
208
            uint32_t domain_id,
209
            const RTPSParticipantAttributes& param,
210
            const GuidPrefix_t& guidP,
211
            RTPSParticipant* part,
212
            RTPSParticipantListener* plisten = nullptr);
213
214
    /**
215
     * @param param
216
     * @param guidP
217
     * @param persistence_guid
218
     * @param part
219
     * @param plisten
220
     */
221
    RTPSParticipantImpl(
222
            uint32_t domain_id,
223
            const RTPSParticipantAttributes& param,
224
            const GuidPrefix_t& guidP,
225
            const GuidPrefix_t& persistence_guid,
226
            RTPSParticipant* part,
227
            RTPSParticipantListener* plisten = nullptr);
228
229
    virtual ~RTPSParticipantImpl();
230
231
    // Create receiver resources and start builtin protocols
232
    void enable();
233
234
    // Stop builtin protocols and delete receiver resources
235
    void disable();
236
237
    /**
238
     * Get associated GUID
239
     * @return Associated GUID
240
     */
241
    inline const GUID_t& getGuid() const
242
0
    {
243
0
        return m_guid;
244
0
    }
245
246
    void setGuid(
247
            GUID_t& guid);
248
249
    //! Announce RTPSParticipantState (force the sending of a DPD message.)
250
    void announceRTPSParticipantState();
251
252
    //!Stop the RTPSParticipant Announcement (used in tests to avoid multiple packets being send)
253
    void stopRTPSParticipantAnnouncement();
254
255
    //!Reset to timer to make periodic RTPSParticipant Announcements.
256
    void resetRTPSParticipantAnnouncement();
257
258
    void loose_next_change();
259
260
    /**
261
     * Activate a Remote Endpoint defined in the Static Discovery.
262
     * @param pguid GUID_t of the endpoint.
263
     * @param userDefinedId userDeinfed Id of the endpoint.
264
     * @param kind kind of endpoint
265
     * @return True if correct.
266
     */
267
    bool newRemoteEndpointDiscovered(
268
            const GUID_t& pguid,
269
            int16_t userDefinedId,
270
            EndpointKind_t kind);
271
272
    /**
273
     * Assert the liveliness of a remote participant
274
     * @param remote_guid GuidPrefix_t of the participant.
275
     */
276
    void assert_remote_participant_liveliness(
277
            const GuidPrefix_t& remote_guid);
278
279
    /**
280
     * Get the RTPSParticipant ID
281
     * @return RTPSParticipant ID
282
     */
283
    inline uint32_t getRTPSParticipantID() const
284
0
    {
285
0
        return (uint32_t)m_att.participantID;
286
0
    }
287
288
    //!Get Pointer to the Event Resource.
289
    ResourceEvent& getEventResource()
290
0
    {
291
0
        return mp_event_thr;
292
0
    }
293
294
    /**
295
     * Send a message to several locations
296
     * @param buffers Vector of buffers to send.
297
     * @param total_bytes Total number of bytes to send.
298
     * @param sender_guid GUID of the producer of the message.
299
     * @param destination_locators_begin Iterator at the first destination locator.
300
     * @param destination_locators_end Iterator at the end destination locator.
301
     * @param max_blocking_time_point execution time limit timepoint.
302
     * @param transport_priority Transport priority of the message.
303
     * @return true if at least one locator has been sent.
304
     */
305
    template<class LocatorIteratorT>
306
    bool sendSync(
307
            const std::vector<NetworkBuffer>& buffers,
308
            const uint32_t& total_bytes,
309
            const GUID_t& sender_guid,
310
            const LocatorIteratorT& destination_locators_begin,
311
            const LocatorIteratorT& destination_locators_end,
312
            std::chrono::steady_clock::time_point& max_blocking_time_point,
313
            int32_t transport_priority)
314
0
    {
315
0
        bool ret_code = false;
316
#if HAVE_STRICT_REALTIME
317
        std::unique_lock<std::timed_mutex> lock(m_send_resources_mutex_, std::defer_lock);
318
        if (lock.try_lock_until(max_blocking_time_point))
319
#else
320
0
        std::unique_lock<std::timed_mutex> lock(m_send_resources_mutex_);
321
0
#endif // if HAVE_STRICT_REALTIME
322
0
        {
323
0
            ret_code = true;
324
325
0
            for (auto& send_resource : send_resource_list_)
326
0
            {
327
0
                LocatorIteratorT locators_begin = destination_locators_begin;
328
0
                LocatorIteratorT locators_end = destination_locators_end;
329
0
                send_resource->send(buffers, total_bytes, &locators_begin, &locators_end,
330
0
                        max_blocking_time_point, transport_priority);
331
0
            }
332
333
0
            lock.unlock();
334
335
            // notify statistics module
336
0
            on_rtps_send(
337
0
                sender_guid,
338
0
                destination_locators_begin,
339
0
                destination_locators_end,
340
0
                total_bytes);
341
342
            // checkout if sender is a discovery endpoint
343
0
            on_discovery_packet(
344
0
                sender_guid,
345
0
                destination_locators_begin,
346
0
                destination_locators_end);
347
0
        }
348
349
0
        return ret_code;
350
0
    }
Unexecuted instantiation: bool eprosima::fastdds::rtps::RTPSParticipantImpl::sendSync<eprosima::fastdds::rtps::Locators>(std::__1::vector<eprosima::fastdds::rtps::NetworkBuffer, std::__1::allocator<eprosima::fastdds::rtps::NetworkBuffer> > const&, unsigned int const&, eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::Locators const&, eprosima::fastdds::rtps::Locators const&, std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >&, int)
Unexecuted instantiation: bool eprosima::fastdds::rtps::RTPSParticipantImpl::sendSync<eprosima::fastdds::rtps::LocatorSelector::iterator>(std::__1::vector<eprosima::fastdds::rtps::NetworkBuffer, std::__1::allocator<eprosima::fastdds::rtps::NetworkBuffer> > const&, unsigned int const&, eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::LocatorSelector::iterator const&, eprosima::fastdds::rtps::LocatorSelector::iterator const&, std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >&, int)
351
352
    /**
353
     * Get the participant listener
354
     * @return participant listener
355
     */
356
    inline RTPSParticipantListener* getListener()
357
0
    {
358
0
        std::lock_guard<std::mutex> _(mutex_);
359
0
        return mp_participantListener;
360
0
    }
361
362
    /**
363
     * @brief Modifies the participant listener
364
     * @param listener
365
     */
366
    void set_listener(
367
            RTPSParticipantListener* listener)
368
0
    {
369
0
        std::lock_guard<std::mutex> _(mutex_);
370
0
        mp_participantListener = listener;
371
0
    }
372
373
    std::vector<std::string> getParticipantNames() const;
374
375
    /**
376
     * Get the participant
377
     * @return participant
378
     */
379
    inline RTPSParticipant* getUserRTPSParticipant()
380
0
    {
381
0
        return mp_userParticipant;
382
0
    }
383
384
    uint32_t getMaxMessageSize() const;
385
386
    uint32_t getMaxDataSize();
387
388
    uint32_t calculateMaxDataSize(
389
            uint32_t length);
390
391
#if HAVE_SECURITY
392
    uint32_t calculate_extra_size_for_rtps_message()
393
    {
394
        uint32_t ret_val = 0u;
395
        if (security_attributes_.is_rtps_protected)
396
        {
397
            ret_val = m_security_manager.calculate_extra_size_for_rtps_message();
398
        }
399
400
        return ret_val;
401
    }
402
403
    security::SecurityManager& security_manager()
404
    {
405
        return m_security_manager;
406
    }
407
408
    const security::ParticipantSecurityAttributes& security_attributes()
409
    {
410
        return security_attributes_;
411
    }
412
413
    inline bool is_security_initialized() const
414
    {
415
        return m_security_manager.is_security_initialized();
416
    }
417
418
    inline bool is_secure() const
419
    {
420
        return m_security_manager.is_security_active();
421
    }
422
423
    bool pairing_remote_reader_with_local_writer_after_security(
424
            const GUID_t& local_writer,
425
            const ReaderProxyData& remote_reader_data);
426
427
    bool pairing_remote_writer_with_local_reader_after_security(
428
            const GUID_t& local_reader,
429
            const WriterProxyData& remote_writer_data);
430
431
    /**
432
     * @brief Checks whether the writer has security attributes enabled
433
     * @param writer_attributes Attibutes of the writer as given to the create_writer
434
     */
435
    bool is_security_enabled_for_writer(
436
            const WriterAttributes& writer_attributes);
437
438
    /**
439
     * @brief Checks whether the reader has security attributes enabled
440
     * @param reader_attributes Attibutes of the reader as given to the create_reader
441
     */
442
    bool is_security_enabled_for_reader(
443
            const ReaderAttributes& reader_attributes);
444
445
    security::Logging* create_builtin_logging_plugin() override;
446
447
#endif // if HAVE_SECURITY
448
449
    PDPSimple* pdpsimple();
450
451
    PDP* pdp();
452
453
    WLP* wlp();
454
455
    fastdds::dds::builtin::TypeLookupManager* typelookup_manager() const;
456
457
    bool is_intraprocess_only() const
458
0
    {
459
0
        return is_intraprocess_only_;
460
0
    }
461
462
    NetworkFactory& network_factory()
463
0
    {
464
0
        return m_network_Factory;
465
0
    }
466
467
    inline bool has_shm_transport()
468
0
    {
469
0
        return has_shm_transport_;
470
0
    }
471
472
    uint32_t get_min_network_send_buffer_size()
473
0
    {
474
0
        return m_network_Factory.get_min_send_buffer_size();
475
0
    }
476
477
    /**
478
     * Get the list of locators from which this participant may send data.
479
     *
480
     * @param [out] locators  LocatorList_t where the list of locators will be stored.
481
     */
482
    void get_sending_locators(
483
            rtps::LocatorList_t& locators) const;
484
485
    /***
486
     * @returns A pointer to a local reader given its endpoint guid, or nullptr if not found.
487
     */
488
    std::shared_ptr<LocalReaderPointer> find_local_reader(
489
            const GUID_t& reader_guid);
490
491
    /***
492
     * @returns A pointer to a local writer given its endpoint guid, or nullptr if not found.
493
     */
494
    BaseWriter* find_local_writer(
495
            const GUID_t& writer_guid);
496
497
    /**
498
     * @brief Fills a new entityId if set to unknown, or checks if a entity already exists with that
499
     * entityId in other case.
500
     * @param entityId to check of fill. If filled, EntityKind will be "vendor-specific" (0x01)
501
     * @return True if filled or the entityId is available.
502
     */
503
    bool get_new_entity_id(
504
            EntityId_t& entityId);
505
506
    void set_check_type_function(
507
            std::function<bool(const std::string&)>&& check_type);
508
509
    bool check_type(
510
            const std::string& type_name)
511
0
    {
512
0
        if (type_check_fn_ != nullptr)
513
0
        {
514
0
            return type_check_fn_(type_name);
515
0
        }
516
0
        return false;
517
0
    }
518
519
    std::unique_ptr<RTPSMessageGroup_t> get_send_buffer(
520
            const std::chrono::steady_clock::time_point& max_blocking_time);
521
    void return_send_buffer(
522
            std::unique_ptr <RTPSMessageGroup_t>&& buffer);
523
524
    uint32_t get_domain_id() const;
525
526
    //!Compare metatraffic locators list searching for mutations
527
    bool did_mutation_took_place_on_meta(
528
            const LocatorList_t& MulticastLocatorList,
529
            const LocatorList_t& UnicastLocatorList) const;
530
531
    //! Getter client_override flag
532
    bool client_override()
533
0
    {
534
0
        return client_override_;
535
0
    }
536
537
    //! Setter client_override flag
538
    void client_override(
539
            bool value)
540
0
    {
541
0
        client_override_ = value;
542
0
    }
543
544
    //! Retrieve persistence guid prefix
545
    GuidPrefix_t get_persistence_guid_prefix() const
546
0
    {
547
0
        return m_persistence_guid.guidPrefix;
548
0
    }
549
550
    bool is_initialized() const
551
0
    {
552
0
        return initialized_;
553
0
    }
554
555
    /**
556
     * @brief Notify reader discovery event.
557
     *
558
     * Will be called when a remote reader is discovered, updated or removed.
559
     * Will call the user listener if set.
560
     *
561
     * @param reason Discovery reason.
562
     * @param info Information about the discovered reader.
563
     */
564
    virtual void notify_reader_discovery(
565
            ReaderDiscoveryStatus reason,
566
            const SubscriptionBuiltinTopicData& info);
567
568
    /**
569
     * @brief Notify reader discovery event.
570
     *
571
     * Will be called when a remote reader is discovered, updated or removed.
572
     * Will call the specified listener.
573
     *
574
     * @param reason Discovery reason.
575
     * @param info Information about the discovered reader.
576
     * @param listener Listener to be notified.
577
     */
578
    virtual void notify_reader_discovery(
579
            ReaderDiscoveryStatus reason,
580
            const SubscriptionBuiltinTopicData& info,
581
            RTPSParticipantListener* listener);
582
583
    /**
584
     * @brief Notify writer discovery event.
585
     *
586
     * Will be called when a remote writer is discovered, updated or removed.
587
     * Will call the user listener if set.
588
     *
589
     * @param reason Discovery reason.
590
     * @param info Information about the discovered writer.
591
     */
592
    virtual void notify_writer_discovery(
593
            WriterDiscoveryStatus reason,
594
            const PublicationBuiltinTopicData& info);
595
596
    /**
597
     * @brief Notify writer discovery event.
598
     *
599
     * Will be called when a remote writer is discovered, updated or removed.
600
     * Will call the specified listener.
601
     *
602
     * @param reason Discovery reason.
603
     * @param info Information about the discovered writer.
604
     * @param listener Listener to be notified.
605
     */
606
    virtual void notify_writer_discovery(
607
            WriterDiscoveryStatus reason,
608
            const PublicationBuiltinTopicData& info,
609
            RTPSParticipantListener* listener);
610
611
protected:
612
613
    //! DomainId
614
    uint32_t domain_id_;
615
    //!Attributes of the RTPSParticipant
616
    RTPSParticipantAttributes m_att;
617
    //! Metatraffic unicast port used by default on this participant
618
    uint32_t metatraffic_unicast_port_ = 0;
619
    //! Default unicast port used by default on this participant
620
    uint32_t default_unicast_port_ = 0;
621
    //!Guid of the RTPSParticipant.
622
    GUID_t m_guid;
623
    //! String containing the RTPSParticipant Guid.
624
    std::string guid_str_;
625
    //!Persistence guid of the RTPSParticipant
626
    GUID_t m_persistence_guid;
627
    //! Event Resource
628
    ResourceEvent mp_event_thr;
629
    //! BuiltinProtocols of this RTPSParticipant
630
    BuiltinProtocols* mp_builtinProtocols;
631
    //!Id counter to correctly assign the ids to writers and readers.
632
    std::atomic<uint32_t> IdCounter;
633
    //! Mutex to safely access endpoints collections
634
    mutable shared_mutex endpoints_list_mutex;
635
    //!Writer List.
636
    std::vector<BaseWriter*> m_allWriterList;
637
    //!Reader List
638
    std::vector<BaseReader*> m_allReaderList;
639
    //!Writer List.
640
    std::vector<BaseWriter*> m_userWriterList;
641
    //!Reader List
642
    std::vector<BaseReader*> m_userReaderList;
643
    //!Network Factory
644
    NetworkFactory m_network_Factory;
645
    //! Type cheking function
646
    std::function<bool(const std::string&)> type_check_fn_;
647
    //!Pool of send buffers
648
    std::unique_ptr<SendBuffersManager> send_buffers_;
649
    //! Maximum number of bytes allowed for an RTPS datagram generated by this writer.
650
    uint32_t max_output_message_size_ = std::numeric_limits<uint32_t>::max();
651
    //! The stateful writer listener for congestion control
652
    StatefulWriterListener* stateful_writer_listener_ = nullptr;
653
654
    /**
655
     * Client override flag: SIMPLE participant that has been overriden with the environment variable and transformed
656
     * into a client.
657
     */
658
    bool client_override_;
659
660
    //! Autogenerated metatraffic locators flag
661
    bool internal_metatraffic_locators_;
662
    //! Autogenerated default locators flag
663
    bool internal_default_locators_;
664
665
#if HAVE_SECURITY
666
    // Security manager
667
    security::SecurityManager m_security_manager;
668
#endif // if HAVE_SECURITY
669
670
    //! Encapsulates all associated resources on a Receiving element.
671
    std::list<ReceiverControlBlock> m_receiverResourcelist;
672
    //! Receiver resource list needs its own mutext to avoid a race condition.
673
    std::mutex m_receiverResourcelistMutex;
674
675
    //!SenderResource List
676
    std::timed_mutex m_send_resources_mutex_;
677
    SendResourceList send_resource_list_;
678
679
    //!Participant Listener
680
    RTPSParticipantListener* mp_participantListener;
681
    //!Pointer to the user participant
682
    RTPSParticipant* mp_userParticipant;
683
684
    //! Determine if the RTPSParticipantImpl was initialized successfully.
685
    bool initialized_ = false;
686
687
    //! Whether the participant should send optional QoS in the discovery
688
    //! This is regulated by the `fastdds.send_optional_qos` property
689
    mutable signed char should_send_optional_qos_ = -1;
690
691
    //! Ignored entities collections
692
    std::set<GuidPrefix_t> ignored_participants_;
693
    std::set<GUID_t> ignored_writers_;
694
    std::set<GUID_t> ignored_readers_;
695
    //! Protect ignored entities collection concurrent access
696
    mutable shared_mutex ignored_mtx_;
697
    //! Participant Mutex
698
    mutable std::mutex mutex_;
699
700
    //! Will this participant use intraprocess only?
701
    bool is_intraprocess_only_;
702
703
#ifdef FASTDDS_STATISTICS
704
    std::unique_ptr<fastdds::statistics::rtps::MonitorService> monitor_server_;
705
    std::unique_ptr<fastdds::statistics::rtps::SimpleQueryable> simple_queryable_;
706
    std::atomic<const fastdds::statistics::rtps::IConnectionsObserver*> conns_observer_;
707
#endif // ifdef FASTDDS_STATISTICS
708
709
    /*
710
     * Flow controller factory.
711
     */
712
    FlowControllerFactoryType flow_controller_factory_;
713
714
#if HAVE_SECURITY
715
    security::ParticipantSecurityAttributes security_attributes_;
716
#endif // if HAVE_SECURITY
717
718
    //! Indicates whether the participant has shared-memory transport
719
    bool has_shm_transport_;
720
721
    bool match_local_endpoints_ = true;
722
723
private:
724
725
    void setup_guids(
726
            const GuidPrefix_t& persistence_guid);
727
    bool setup_transports();
728
    void setup_timed_events();
729
    void setup_meta_traffic();
730
    void setup_user_traffic();
731
    void setup_initial_peers();
732
    void setup_output_traffic();
733
    bool setup_builtin_protocols();
734
735
    RTPSParticipantImpl& operator =(
736
            const RTPSParticipantImpl&) = delete;
737
738
    /**
739
     * Method to check if a specific entityId already exists in this RTPSParticipant
740
     * @param ent EnityId to check
741
     * @param kind Endpoint Kind.
742
     * @return True if exists.
743
     */
744
    bool entity_id_exists(
745
            const EntityId_t& ent,
746
            EndpointKind_t kind) const;
747
748
    /**
749
     * Method to check if the EntityId conditions are coherent with the endpoint:
750
     * - Checks if it already exists in this RTPSParticipant.
751
     * - It is consistent with the topic kind of the endpoint.
752
     *
753
     * @param entity_id EntityId to check
754
     * @param endpoint_kind Endpoint Kind.
755
     * @param topic_kind Topic kind.
756
     * @return True if the EntityId conditions are correct.
757
     */
758
    bool check_entity_id_conditions(
759
            const EntityId_t& entity_id,
760
            EndpointKind_t endpoint_kind,
761
            TopicKind_t topic_kind) const;
762
763
    /**
764
     * Assign an endpoint to the ReceiverResources, based on its LocatorLists.
765
     * @param endp Pointer to the endpoint.
766
     * @return True if correct.
767
     */
768
    bool assignEndpointListenResources(
769
            Endpoint* endp);
770
771
    /** Assign an endpoint to the ReceiverResources as specified specifically on parameter list
772
     * @param pend Pointer to the endpoint.
773
     * @param lit Locator list iterator.
774
     * @param isMulticast Boolean indicating that is multicast.
775
     * @param isFixed Boolean indicating that is a fixed listenresource.
776
     * @return True if assigned.
777
     */
778
    bool assignEndpoint2LocatorList(
779
            Endpoint* pend,
780
            LocatorList_t& list);
781
782
    /** Create the new ReceiverResources needed for a new Locator, contains the calls to assignEndpointListenResources
783
        and consequently assignEndpoint2LocatorList
784
        @param pend - Pointer to the endpoint which triggered the creation of the Receivers.
785
        @param unique_flows - Whether unique listening ports should be created for this endpoint.
786
        @param initial_unique_port - First unique listening port to try.
787
        @param final_unique_port - Unique listening port that will not be tried.
788
     */
789
    bool createAndAssociateReceiverswithEndpoint(
790
            Endpoint* pend,
791
            bool unique_flows = false,
792
            uint16_t initial_unique_port = 0,
793
            uint16_t final_unique_port = 0);
794
795
    /** Create non-existent SendResources based on the Locator list of the entity
796
        @param pend - Pointer to the endpoint whose SenderResources are to be created
797
     */
798
    bool createSendResources(
799
            Endpoint* pend);
800
801
    /** Add participant's external locators to endpoint's when none available
802
        @param endpoint - Pointer to the endpoint whose external locators are to be set
803
     */
804
    void setup_external_locators(
805
            Endpoint* endpoint);
806
807
    /** When we want to create a new Resource but the physical channel specified by the Locator
808
        can not be opened, we want to mutate the Locator to open a more or less equivalent channel.
809
        @param loc -  Locator we want to change
810
     */
811
    Locator_t& applyLocatorAdaptRule(
812
            Locator_t& loc);
813
814
    /**
815
     * Update port for all endpoint locators when it has a value of 0 and then
816
     * apply locator normalization.
817
     *
818
     * @param [in, out] endpoint_att  EndpointAttributes to be updated
819
     */
820
    void normalize_endpoint_locators(
821
            EndpointAttributes& endpoint_att);
822
823
    /**
824
     * Get persistence service from factory, using endpoint attributes (or participant
825
     * attributes if endpoint does not define a persistence service config)
826
     */
827
    IPersistenceService* get_persistence_service(
828
            const EndpointAttributes& param);
829
830
    /**
831
     * Returns the Durability kind from which a endpoint is able to use the persistence service.
832
     */
833
    DurabilityKind_t get_persistence_durability_red_line(
834
            bool is_builtin_endpoint);
835
836
    /**
837
     * Check if persistence is required and return persistence service from factory,
838
     * using endpoint attributes (or participant
839
     * attributes if endpoint does not define a persistence service config)
840
     *
841
     * @param [in]  is_builtin  Whether the enpoint being created is a builtin one.
842
     * @param [in]  param       Attributes of the endpoint being created.
843
     * @param [out] service     Pointer to the persistence service.
844
     *
845
     * @return false if parameters are not consistent or the service should be created and couldn't
846
     * @return true if persistence service is not required
847
     * @return true if persistence service is created
848
     */
849
    bool get_persistence_service(
850
            bool is_builtin,
851
            const EndpointAttributes& param,
852
            IPersistenceService*& service);
853
854
    template<typename Functor>
855
    bool create_writer(
856
            RTPSWriter** writer_out,
857
            WriterAttributes& param,
858
            const EntityId_t& entity_id,
859
            bool is_builtin,
860
            const Functor& callback);
861
862
    template<typename Functor>
863
    bool create_reader(
864
            RTPSReader** reader_out,
865
            ReaderAttributes& param,
866
            const EntityId_t& entity_id,
867
            bool is_builtin,
868
            bool enable,
869
            const Functor& callback);
870
871
    /**
872
     * @brief Fill the default metatraffic locators.
873
     *
874
     * @param [in] att @ref RTPSParticipantAttributes in which the locators are filled.
875
     *
876
     * @note This function in meant to be used iff the locators are not provided by the user.
877
     */
878
    void get_default_metatraffic_locators(
879
            RTPSParticipantAttributes& att);
880
881
    /**
882
     * @brief Fill the default unicast locators.
883
     *
884
     * @param [in] att @ref RTPSParticipantAttributes in which the locators are filled.
885
     *
886
     * @note This function in meant to be used iff the locators are not provided by the user.
887
     */
888
    void get_default_unicast_locators(
889
            RTPSParticipantAttributes& att);
890
891
    bool should_match_local_endpoints(
892
            const RTPSParticipantAttributes& att);
893
894
public:
895
896
    const RTPSParticipantAttributes& get_attributes() const;
897
898
    /**
899
     * Create a Writer in this RTPSParticipant.
900
     * @param Writer Pointer to pointer of the Writer, used as output. Only valid if return==true.
901
     * @param param WriterAttributes to define the Writer.
902
     * @param hist Pointer to the WriterHistory.
903
     * @param listen Pointer to the WriterListener.
904
     * @param entityId EntityId assigned to the Writer.
905
     * @param isBuiltin Bool value indicating if the Writer is builtin (Discovery or Liveliness protocol) or is created for the end user.
906
     * @return True if the Writer was correctly created.
907
     */
908
    virtual bool createWriter(
909
            RTPSWriter** Writer,
910
            WriterAttributes& param,
911
            WriterHistory* hist,
912
            WriterListener* listen,
913
            const EntityId_t& entityId = c_EntityId_Unknown,
914
            bool isBuiltin = false);
915
916
    /**
917
     * Create a Writer in this RTPSParticipant with a custom payload pool.
918
     *
919
     * @param Writer     Pointer to pointer of the Writer, used as output. Only valid if return==true.
920
     * @param watt       WriterAttributes to define the Writer.
921
     * @param hist       Pointer to the WriterHistory.
922
     * @param listen     Pointer to the WriterListener.
923
     * @param entityId   EntityId assigned to the Writer.
924
     * @param isBuiltin  Bool value indicating if the Writer is builtin (Discovery or Liveliness protocol) or is created for the end user.
925
     *
926
     * @return True if the Writer was correctly created.
927
     */
928
    virtual bool create_writer(
929
            RTPSWriter** Writer,
930
            WriterAttributes& watt,
931
            WriterHistory* hist,
932
            WriterListener* listen,
933
            const EntityId_t& entityId,
934
            bool isBuiltin);
935
936
    /**
937
     * Create a Reader in this RTPSParticipant.
938
     * @param Reader Pointer to pointer of the Reader, used as output. Only valid if return==true.
939
     * @param param ReaderAttributes to define the Reader.
940
     * @param hist Pointer to the ReaderHistory.
941
     * @param listen Pointer to the ReaderListener.
942
     * @param entityId EntityId assigned to the Reader.
943
     * @param isBuiltin Bool value indicating if the Reader is builtin (Discovery or Liveliness protocol) or is created for the end user.
944
     * @param enable Whether the reader should be automatically enabled.
945
     * @return True if the Reader was correctly created.
946
     */
947
    virtual bool createReader(
948
            RTPSReader** Reader,
949
            ReaderAttributes& param,
950
            ReaderHistory* hist,
951
            ReaderListener* listen,
952
            const EntityId_t& entityId = c_EntityId_Unknown,
953
            bool isBuiltin = false,
954
            bool enable = true);
955
956
    /**
957
     * Create a Reader in this RTPSParticipant with a custom payload pool.
958
     * @param Reader Pointer to pointer of the Reader, used as output. Only valid if return==true.
959
     * @param param ReaderAttributes to define the Reader.
960
     * @param payload_pool Shared pointer to the IPayloadPool
961
     * @param hist Pointer to the ReaderHistory.
962
     * @param listen Pointer to the ReaderListener.
963
     * @param entityId EntityId assigned to the Reader.
964
     * @param isBuiltin Bool value indicating if the Reader is builtin (Discovery or Liveliness protocol) or is created for the end user.
965
     * @param enable Whether the reader should be automatically enabled.
966
     * @return True if the Reader was correctly created.
967
     */
968
    virtual bool createReader(
969
            RTPSReader** Reader,
970
            ReaderAttributes& param,
971
            const std::shared_ptr<IPayloadPool>& payload_pool,
972
            ReaderHistory* hist,
973
            ReaderListener* listen,
974
            const EntityId_t& entityId = c_EntityId_Unknown,
975
            bool isBuiltin = false,
976
            bool enable = true);
977
978
    bool enableReader(
979
            RTPSReader* reader);
980
981
    void disableReader(
982
            RTPSReader* reader);
983
984
    /**
985
     * Register a Writer in the BuiltinProtocols.
986
     *
987
     * @param Writer  Pointer to the RTPSWriter.
988
     * @param topic   Information regarding the topic where the writer is registering.
989
     * @param qos     Qos policies of the writer.
990
     *
991
     * @return True if correctly registered.
992
     */
993
    bool register_writer(
994
            RTPSWriter* Writer,
995
            const TopicDescription& topic,
996
            const fastdds::dds::WriterQos& qos);
997
998
    /**
999
     * Register a Writer in the BuiltinProtocols.
1000
     *
1001
     * @param Writer                  Pointer to the RTPSWriter.
1002
     * @param topic                   Information regarding the topic where the writer is registering.
1003
     * @param pub_builtin_topic_data  Information on the publication endpoint.
1004
     *
1005
     * @return OK if correctly registered, ERROR otherwise.
1006
     */
1007
    dds::ReturnCode_t register_writer(
1008
            RTPSWriter* Writer,
1009
            const TopicDescription& topic,
1010
            const PublicationBuiltinTopicData& pub_builtin_topic_data);
1011
1012
    /**
1013
     * Register a Reader in the BuiltinProtocols.
1014
     *
1015
     * @param Reader          Pointer to the RTPSReader.
1016
     * @param topic           Information regarding the topic where the reader is registering.
1017
     * @param qos             Qos policies of the reader.
1018
     * @param content_filter  Optional content filtering information.
1019
     *
1020
     * @return True if correctly registered.
1021
     */
1022
    bool register_reader(
1023
            RTPSReader* Reader,
1024
            const TopicDescription& topic,
1025
            const fastdds::dds::ReaderQos& qos,
1026
            const ContentFilterProperty* content_filter = nullptr);
1027
1028
    /**
1029
     * Register a Reader in the BuiltinProtocols.
1030
     *
1031
     * @param Reader                  Pointer to the RTPSReader.
1032
     * @param topic                   Information regarding the topic where the reader is registering.
1033
     * @param sub_builtin_topic_data  Information on the subscription endpoint.
1034
     * @param content_filter          Optional content filtering information.
1035
     *
1036
     * @return OK if correctly registered, ERROR otherwise.
1037
     */
1038
    dds::ReturnCode_t register_reader(
1039
            RTPSReader* Reader,
1040
            const TopicDescription& topic,
1041
            const SubscriptionBuiltinTopicData& sub_builtin_topic_data,
1042
            const ContentFilterProperty* content_filter = nullptr);
1043
1044
    /**
1045
     * Check if the property @parameter_serialize_optional_qos is set to enable the sent of optional QoS.
1046
     */
1047
    bool should_send_optional_qos() const;
1048
1049
    /**
1050
     * Update participant attributes.
1051
     * @param patt New participant attributes.
1052
     */
1053
    void update_attributes(
1054
            const RTPSParticipantAttributes& patt);
1055
1056
    /**
1057
     * Update local writer QoS
1058
     * @param rtps_writer Writer to update.
1059
     * @param wqos        New QoS for the writer.
1060
     * @return True on success
1061
     */
1062
    bool update_writer(
1063
            RTPSWriter* rtps_writer,
1064
            const fastdds::dds::WriterQos& wqos);
1065
1066
    /**
1067
     * Update local reader QoS
1068
     * @param rtps_reader      Reader to update.
1069
     * @param rqos             New QoS for the reader.
1070
     * @param content_filter   Optional content filtering information.
1071
     * @return True on success
1072
     */
1073
    bool update_reader(
1074
            RTPSReader* rtps_reader,
1075
            const fastdds::dds::ReaderQos& rqos,
1076
            const ContentFilterProperty* content_filter = nullptr);
1077
1078
    /**
1079
     * Delete a user endpoint
1080
     * @param Endpoint to delete
1081
     * @return True on success
1082
     */
1083
    bool deleteUserEndpoint(
1084
            const GUID_t&);
1085
1086
    //! Delete all user endpoints, builtin are disposed in its related classes
1087
    void deleteAllUserEndpoints();
1088
1089
    /** Traverses the user writers collection transforming its elements with a provided functor
1090
     * @param f - Functor applied to each element. Must accept a reference as parameter. Should return true to keep iterating.
1091
     * @return Functor provided in order to allow aggregates retrieval
1092
     */
1093
    template<class Functor>
1094
    Functor forEachUserWriter(
1095
            Functor f)
1096
0
    {
1097
        // check if we are reentrying
1098
0
        shared_lock<shared_mutex> _(endpoints_list_mutex);
1099
1100
        // traverse the list
1101
0
        for (BaseWriter* pw : m_userWriterList)
1102
0
        {
1103
0
            if (!f(*pw))
1104
0
            {
1105
0
                break;
1106
0
            }
1107
0
        }
1108
1109
0
        return f;
1110
0
    }
Unexecuted instantiation: EDP.cpp:eprosima::fastdds::rtps::EDP::unpairReaderProxy(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&)::$_0 eprosima::fastdds::rtps::RTPSParticipantImpl::forEachUserWriter<eprosima::fastdds::rtps::EDP::unpairReaderProxy(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&)::$_0>(eprosima::fastdds::rtps::EDP::unpairReaderProxy(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&)::$_0)
Unexecuted instantiation: EDP.cpp:eprosima::fastdds::rtps::EDP::pairing_reader_proxy_with_any_local_writer(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::ReaderProxyData*)::$_0 eprosima::fastdds::rtps::RTPSParticipantImpl::forEachUserWriter<eprosima::fastdds::rtps::EDP::pairing_reader_proxy_with_any_local_writer(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::ReaderProxyData*)::$_0>(eprosima::fastdds::rtps::EDP::pairing_reader_proxy_with_any_local_writer(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::ReaderProxyData*)::$_0)
1111
1112
    /** Traverses the user readers collection transforming its elements with a provided functor
1113
     * @param f - Functor applied to each element. Must accept a reference as parameter. Should return true to keep iterating.
1114
     * @return Functor provided in order to allow aggregates retrieval
1115
     */
1116
    template<class Functor>
1117
    Functor forEachUserReader(
1118
            Functor f)
1119
0
    {
1120
        // check if we are reentrying
1121
0
        shared_lock<shared_mutex> _(endpoints_list_mutex);
1122
1123
0
        for (BaseReader* pr : m_userReaderList)
1124
0
        {
1125
0
            if (!f(*pr))
1126
0
            {
1127
0
                break;
1128
0
            }
1129
0
        }
1130
1131
0
        return f;
1132
0
    }
Unexecuted instantiation: EDP.cpp:eprosima::fastdds::rtps::EDP::unpairWriterProxy(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&, bool)::$_0 eprosima::fastdds::rtps::RTPSParticipantImpl::forEachUserReader<eprosima::fastdds::rtps::EDP::unpairWriterProxy(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&, bool)::$_0>(eprosima::fastdds::rtps::EDP::unpairWriterProxy(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&, bool)::$_0)
Unexecuted instantiation: EDP.cpp:eprosima::fastdds::rtps::EDP::pairing_writer_proxy_with_any_local_reader(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::WriterProxyData*)::$_0 eprosima::fastdds::rtps::RTPSParticipantImpl::forEachUserReader<eprosima::fastdds::rtps::EDP::pairing_writer_proxy_with_any_local_reader(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::WriterProxyData*)::$_0>(eprosima::fastdds::rtps::EDP::pairing_writer_proxy_with_any_local_reader(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::WriterProxyData*)::$_0)
1133
1134
    /** Helper function that creates ReceiverResources based on a Locator_t List, possibly mutating
1135
     * some and updating the list. DOES NOT associate endpoints with it.
1136
     * @param Locator_list - Locator list to be used to create the ReceiverResources
1137
     * @param ApplyMutation - True if we want to create a Resource with a "similar" locator if the one we provide is unavailable
1138
     * @param RegisterReceiver - True if we want the receiver to be registered. Useful for receivers created after participant is enabled.
1139
     * @param log_when_creation_fails - True if a log warning shall be issued for each locator when a receiver resource cannot be created.
1140
     * @return True if a receiver resource was created for at least a locator in the list, false otherwise.
1141
     */
1142
    bool createReceiverResources(
1143
            LocatorList_t& Locator_list,
1144
            bool ApplyMutation,
1145
            bool RegisterReceiver,
1146
            bool log_when_creation_fails);
1147
1148
    void createSenderResources(
1149
            const LocatorList_t& locator_list);
1150
1151
    void createSenderResources(
1152
            const Locator_t& locator);
1153
1154
    void createSenderResources(
1155
            const RemoteLocatorList& locator_list,
1156
            const EndpointAttributes& param);
1157
1158
    /**
1159
     * Creates sender resources for the given locator selector entry by calling the NetworkFactory's
1160
     * build_send_resources method.
1161
     *
1162
     * @param locator_selector The locator selector entry for which sender resources need to be created.
1163
     */
1164
    void createSenderResources(
1165
            const LocatorSelectorEntry& locator_selector);
1166
1167
    bool networkFactoryHasRegisteredTransports() const;
1168
1169
    /**
1170
     * Function run when the RTPSDomain is notified that the environment file has changed.
1171
     */
1172
    void environment_file_has_changed();
1173
1174
    /**
1175
     * @brief Query if the participant is found in the ignored collection
1176
     *
1177
     * @param [in] participant_guid Participant to be queried
1178
     * @return True if found in the ignored collection. False otherwise.
1179
     */
1180
    bool is_participant_ignored(
1181
            const GuidPrefix_t& participant_guid);
1182
1183
    /**
1184
     * @brief Query if the writer is found in the ignored collection
1185
     *
1186
     * @param [in] writer_guid Writer to be queried
1187
     * @return True if found in the ignored collection. False otherwise.
1188
     */
1189
    bool is_writer_ignored(
1190
            const GUID_t& writer_guid);
1191
1192
    /**
1193
     * @brief Query if the reader is found in the ignored collection
1194
     *
1195
     * @param [in] reader_guid Reader to be queried
1196
     * @return True if found in the ignored collection. False otherwise.
1197
     */
1198
    bool is_reader_ignored(
1199
            const GUID_t& reader_guid);
1200
1201
    /**
1202
     * @brief Add a Participant into the corresponding ignore collection.
1203
     *
1204
     * @param [in] participant_guid Participant that is to be ignored.
1205
     * @return True if correctly included into the ignore collection. False otherwise.
1206
     */
1207
    bool ignore_participant(
1208
            const GuidPrefix_t& participant_guid);
1209
1210
    /**
1211
     * @brief Add a Writer into the corresponding ignore collection.
1212
     *
1213
     * @param [in] writer_guid Writer that is to be ignored.
1214
     * @return True if correctly included into the ignore collection. False otherwise.
1215
     */
1216
    bool ignore_writer(
1217
            const GUID_t& writer_guid);
1218
1219
    /**
1220
     * @brief Add a Reader into the corresponding ignore collection.
1221
     *
1222
     * @param [in] reader_guid Reader that is to be ignored.
1223
     * @return True if correctly included into the ignore collection. False otherwise.
1224
     */
1225
    bool ignore_reader(
1226
            const GUID_t& reader_guid);
1227
1228
    /**
1229
     * @brief Returns registered transports' netmask filter information (transport's netmask filter kind and allowlist).
1230
     *
1231
     * @return A vector with all registered transports' netmask filter information.
1232
     */
1233
    std::vector<TransportNetmaskFilterInfo> get_netmask_filter_info() const;
1234
1235
    /**
1236
     * @brief Fills the provided @ref PublicationBuiltinTopicData with the information of the
1237
     * writer identified by writer_guid.
1238
     *
1239
     * @param[out] data @ref PublicationBuiltinTopicData to fill.
1240
     * @param[in] writer_guid GUID of the writer to get the information from.
1241
     * @return True if the writer was found and the data was filled.
1242
     */
1243
    bool get_publication_info(
1244
            PublicationBuiltinTopicData& data,
1245
            const GUID_t& writer_guid) const;
1246
1247
    /**
1248
     * @brief Fills the provided @ref SubscriptionBuiltinTopicData with the information of the
1249
     * reader identified by reader_guid.
1250
     *
1251
     * @param[out] data @ref SubscriptionBuiltinTopicData to fill.
1252
     * @param[in] reader_guid GUID of the reader to get the information from.
1253
     * @return True if the reader was found and the data was filled.
1254
     */
1255
    bool get_subscription_info(
1256
            SubscriptionBuiltinTopicData& data,
1257
            const GUID_t& reader_guid) const;
1258
1259
    template <EndpointKind_t kind, octet no_key, octet with_key>
1260
    static bool preprocess_endpoint_attributes(
1261
            const EntityId_t& entity_id,
1262
            std::atomic<uint32_t>& id_count,
1263
            EndpointAttributes& att,
1264
            EntityId_t& entId);
1265
1266
#if HAVE_SECURITY
1267
    void set_endpoint_rtps_protection_supports(
1268
            Endpoint* endpoint,
1269
            bool support)
1270
    {
1271
        endpoint->supports_rtps_protection_ = support;
1272
    }
1273
1274
#endif // if HAVE_SECURITY
1275
1276
#ifdef FASTDDS_STATISTICS
1277
1278
    /** Register a listener in participant RTPSWriter entities.
1279
     * @param listener, smart pointer to the listener interface to register
1280
     * @param guid, RTPSWriter identifier. If unknown the listener is registered in all enable ones
1281
     * @return true on success
1282
     */
1283
    bool register_in_writer(
1284
            std::shared_ptr<fastdds::statistics::IListener> listener,
1285
            GUID_t guid = GUID_t::unknown()) override;
1286
1287
    /** Register a listener in participant RTPSReader entities.
1288
     * @param listener, smart pointer to the listener interface to register
1289
     * @param guid, RTPSReader identifier. If unknown the listener is registered in all enable ones
1290
     * @return true on success
1291
     */
1292
    bool register_in_reader(
1293
            std::shared_ptr<fastdds::statistics::IListener> listener,
1294
            GUID_t guid = GUID_t::unknown()) override;
1295
1296
    /** Unregister a listener in participant RTPSWriter entities.
1297
     * @param listener, smart pointer to the listener interface to unregister
1298
     * @return true on success
1299
     */
1300
    bool unregister_in_writer(
1301
            std::shared_ptr<fastdds::statistics::IListener> listener) override;
1302
1303
    /** Unregister a listener in participant RTPSReader entities.
1304
     * @param listener, smart pointer to the listener interface to unregister
1305
     * @return true on success
1306
     */
1307
    bool unregister_in_reader(
1308
            std::shared_ptr<fastdds::statistics::IListener> listener) override;
1309
1310
    /**
1311
     * @brief Set the enabled statistics writers mask
1312
     *
1313
     * @param enabled_writers The new mask to set
1314
     */
1315
    void set_enabled_statistics_writers_mask(
1316
            uint32_t enabled_writers) override;
1317
1318
    /**
1319
     * Creates the monitor service in this RTPSParticipant with the provided interfaces.
1320
     *
1321
     * @param sq reference to the object implementing the StatusQueryable interface.
1322
     * It will usually be the DDS DomainParticipant
1323
     *
1324
     * @return A const pointer to the listener (implemented within the RTPSParticipant)
1325
     *
1326
     */
1327
    const fastdds::statistics::rtps::IStatusObserver* create_monitor_service(
1328
            fastdds::statistics::rtps::IStatusQueryable& status_queryable);
1329
1330
    /**
1331
     * Creates the monitor service in this RTPSParticipant with a simple default
1332
     * implementation of the IStatusQueryable.
1333
     *
1334
     * @return true if the monitor service could be correctly created.
1335
     *
1336
     */
1337
    bool create_monitor_service();
1338
1339
    /**
1340
     * Returns whether the monitor service in created in this RTPSParticipant.
1341
     *
1342
     * @return true if the monitor service is created.
1343
     * @return false otherwise.
1344
     *
1345
     */
1346
    bool is_monitor_service_created() const;
1347
1348
    /**
1349
     * Enables the monitor service in this RTPSParticipant.
1350
     *
1351
     * @return true if the monitor service could be correctly enabled.
1352
     *
1353
     */
1354
    bool enable_monitor_service() const;
1355
1356
    /**
1357
     * Disables the monitor service in this RTPSParticipant. Does nothing if the service was not enabled before.
1358
     *
1359
     * @return true if the monitor service could be correctly disabled.
1360
     * @return false if the service could not be properly disabled or if the monitor service was not previously enabled.
1361
     *
1362
     */
1363
    bool disable_monitor_service() const;
1364
1365
    /**
1366
     * fills in the ParticipantBuiltinTopicData from a MonitorService Message
1367
     *
1368
     * @param [out] data Proxy to fill
1369
     * @param [in] msg MonitorService Message to get the proxy information from.
1370
     *
1371
     * @return true if the operation succeeds.
1372
     */
1373
    bool fill_discovery_data_from_cdr_message(
1374
            ParticipantBuiltinTopicData& data,
1375
            const fastdds::statistics::MonitorServiceStatusData& msg);
1376
1377
    /**
1378
     * fills in the PublicationBuiltinTopicData from a MonitorService Message
1379
     *
1380
     * @param [out] data Proxy to fill.
1381
     * @param [in] msg MonitorService Message to get the proxy information from.
1382
     *
1383
     * @return true if the operation succeeds.
1384
     */
1385
    bool fill_discovery_data_from_cdr_message(
1386
            PublicationBuiltinTopicData& data,
1387
            const fastdds::statistics::MonitorServiceStatusData& msg);
1388
1389
    /**
1390
     * fills in the SubscriptionBuiltinTopicData from a MonitorService Message
1391
     *
1392
     * @param [out] data Proxy to fill.
1393
     * @param [in] msg MonitorService Message to get the proxy information from.
1394
     *
1395
     * @return true if the operation succeeds.
1396
     */
1397
    bool fill_discovery_data_from_cdr_message(
1398
            SubscriptionBuiltinTopicData& data,
1399
            const fastdds::statistics::MonitorServiceStatusData& msg);
1400
1401
    bool get_entity_connections(
1402
            const GUID_t&,
1403
            fastdds::statistics::rtps::ConnectionList& conn_list) override;
1404
1405
    const fastdds::statistics::rtps::IConnectionsObserver* get_connections_observer()
1406
0
    {
1407
0
        return conns_observer_.load();
1408
0
    }
1409
1410
#else
1411
    bool get_entity_connections(
1412
            const GUID_t&,
1413
            fastdds::statistics::rtps::ConnectionList&) override
1414
    {
1415
        return false;
1416
    }
1417
1418
#endif // FASTDDS_STATISTICS
1419
1420
    bool should_match_local_endpoints()
1421
0
    {
1422
0
        return match_local_endpoints_;
1423
0
    }
1424
1425
    /**
1426
     * Method called on participant removal with the set of locators associated to the participant.
1427
     *
1428
     * @param remote_participant_locators Set of locators associated to the participant removed.
1429
     */
1430
    void update_removed_participant(
1431
            const LocatorList_t& remote_participant_locators);
1432
1433
    /**
1434
     * @brief Get participant's @ref dds::utils::TypePropagation
1435
     *
1436
     * @return This participant's @ref dds::utils::TypePropagation
1437
     */
1438
    dds::utils::TypePropagation type_propagation() const;
1439
1440
};
1441
} // namespace rtps
1442
} /* namespace rtps */
1443
} /* namespace eprosima */
1444
#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
1445
#endif //FASTDDS_RTPS_PARTICIPANT__RTPSPARTICIPANTIMPL_H