Coverage Report

Created: 2026-05-04 06:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/Fast-DDS/src/cpp/rtps/writer/StatefulWriter.cpp
Line
Count
Source
1
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file StatefulWriter.cpp
17
 *
18
 */
19
20
#include "StatefulWriter.hpp"
21
22
#include <mutex>
23
#include <stdexcept>
24
#include <vector>
25
26
#include <fastdds/dds/log/Log.hpp>
27
#include <fastdds/rtps/builtin/data/SubscriptionBuiltinTopicData.hpp>
28
#include <fastdds/rtps/common/VendorId_t.hpp>
29
#include <fastdds/rtps/history/WriterHistory.hpp>
30
#include <fastdds/rtps/history/WriterHistory.hpp>
31
#include <fastdds/rtps/interfaces/IReaderDataFilter.hpp>
32
#include <rtps/messages/RTPSMessageCreator.hpp>
33
#include <fastdds/rtps/participant/RTPSParticipant.hpp>
34
#include <fastdds/rtps/reader/ReaderDiscoveryStatus.hpp>
35
#include <fastdds/rtps/reader/RTPSReader.hpp>
36
#include <fastdds/rtps/writer/WriterListener.hpp>
37
38
#include <rtps/builtin/BuiltinProtocols.h>
39
#include <rtps/builtin/liveliness/WLP.hpp>
40
#include <rtps/DataSharing/DataSharingNotifier.hpp>
41
#include <rtps/DataSharing/DataSharingPayloadPool.hpp>
42
#include <rtps/DataSharing/WriterPool.hpp>
43
#include <rtps/history/BasicPayloadPool.hpp>
44
#include <rtps/history/CacheChangePool.h>
45
#include <rtps/messages/RTPSGapBuilder.hpp>
46
#include <rtps/messages/RTPSMessageGroup.hpp>
47
#include <rtps/network/utils/external_locators.hpp>
48
#include <rtps/participant/RTPSParticipantImpl.hpp>
49
#include <rtps/reader/BaseReader.hpp>
50
#include <rtps/reader/LocalReaderPointer.hpp>
51
#include <rtps/resources/ResourceEvent.h>
52
#include <rtps/resources/TimedEvent.h>
53
#include <rtps/domain/RTPSDomainImpl.hpp>
54
#include <rtps/writer/BaseWriter.hpp>
55
#include <rtps/writer/ReaderProxy.hpp>
56
#include <rtps/writer/StatefulWriterListener.hpp>
57
#include <utils/TimeConversion.hpp>
58
59
#ifdef FASTDDS_STATISTICS
60
#include <statistics/types/monitorservice_types.hpp>
61
#endif // ifdef FASTDDS_STATISTICS
62
63
#include "../builtin/discovery/database/DiscoveryDataBase.hpp"
64
65
#include "../flowcontrol/FlowController.hpp"
66
67
namespace eprosima {
68
namespace fastdds {
69
namespace rtps {
70
71
/**
72
 * Loops over all the readers in the vector, applying the given routine.
73
 * The loop continues until the result of the routine is true for any reader
74
 * or all readers have been processed.
75
 * The returned value is true if the routine returned true at any point,
76
 * or false otherwise.
77
 */
78
template<typename T>
79
static bool for_matched_readers(
80
        ResourceLimitedVector<ReaderProxy*>& reader_vector_1,
81
        T fun)
82
0
{
83
0
    for (ReaderProxy* remote_reader : reader_vector_1)
84
0
    {
85
0
        if (fun(remote_reader))
86
0
        {
87
0
            return true;
88
0
        }
89
0
    }
90
91
0
    return false;
92
0
}
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::unsent_change_added_to_history(eprosima::fastdds::rtps::CacheChange_t*, std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::unsent_change_added_to_history(eprosima::fastdds::rtps::CacheChange_t*, std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > const&)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::change_removed_by_history(eprosima::fastdds::rtps::CacheChange_t*, std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::change_removed_by_history(eprosima::fastdds::rtps::CacheChange_t*, std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > const&)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::matched_reader_add_edp(eprosima::fastdds::rtps::ReaderProxyData const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::matched_reader_add_edp(eprosima::fastdds::rtps::ReaderProxyData const&)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::matched_reader_is_matched(eprosima::fastdds::rtps::GUID_t const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::matched_reader_is_matched(eprosima::fastdds::rtps::GUID_t const&)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::matched_reader_lookup(eprosima::fastdds::rtps::GUID_t&, eprosima::fastdds::rtps::ReaderProxy**)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::matched_reader_lookup(eprosima::fastdds::rtps::GUID_t&, eprosima::fastdds::rtps::ReaderProxy**)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::all_readers_updated()::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::all_readers_updated()::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::wait_for_all_acked(eprosima::fastdds::dds::Time_t const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::wait_for_all_acked(eprosima::fastdds::dds::Time_t const&)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::check_acked_status()::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::check_acked_status()::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::update_times(eprosima::fastdds::rtps::WriterTimes const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::update_times(eprosima::fastdds::rtps::WriterTimes const&)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::send_periodic_heartbeat(bool, bool)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::send_periodic_heartbeat(bool, bool)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::send_periodic_heartbeat(bool, bool)::$_1>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::send_periodic_heartbeat(bool, bool)::$_1)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::perform_nack_supression(eprosima::fastdds::rtps::GUID_t const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::perform_nack_supression(eprosima::fastdds::rtps::GUID_t const&)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::process_acknack(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&, unsigned int, eprosima::fastdds::BitmapRange<eprosima::fastdds::rtps::SequenceNumber_t, eprosima::fastdds::rtps::SequenceNumberDiff, 256u> const&, bool, bool&, std::__1::array<unsigned char, 2ul>)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::process_acknack(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&, unsigned int, eprosima::fastdds::BitmapRange<eprosima::fastdds::rtps::SequenceNumber_t, eprosima::fastdds::rtps::SequenceNumberDiff, 256u> const&, bool, bool&, std::__1::array<unsigned char, 2ul>)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::process_nack_frag(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&, unsigned int, eprosima::fastdds::rtps::SequenceNumber_t const&, eprosima::fastdds::BitmapRange<unsigned int, eprosima::fastdds::DiffFunction<unsigned int>, 256u> const&, bool&, std::__1::array<unsigned char, 2ul>)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::process_nack_frag(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&, unsigned int, eprosima::fastdds::rtps::SequenceNumber_t const&, eprosima::fastdds::BitmapRange<unsigned int, eprosima::fastdds::DiffFunction<unsigned int>, 256u> const&, bool&, std::__1::array<unsigned char, 2ul>)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::ack_timer_expired()::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::ack_timer_expired()::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::get_connections(std::__1::vector<eprosima::fastdds::statistics::Connection, std::__1::allocator<eprosima::fastdds::statistics::Connection> >&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::get_connections(std::__1::vector<eprosima::fastdds::statistics::Connection, std::__1::allocator<eprosima::fastdds::statistics::Connection> >&)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::get_connections(std::__1::vector<eprosima::fastdds::statistics::Connection, std::__1::allocator<eprosima::fastdds::statistics::Connection> >&)::$_1>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::get_connections(std::__1::vector<eprosima::fastdds::statistics::Connection, std::__1::allocator<eprosima::fastdds::statistics::Connection> >&)::$_1)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::get_connections(std::__1::vector<eprosima::fastdds::statistics::Connection, std::__1::allocator<eprosima::fastdds::statistics::Connection> >&)::$_2>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::get_connections(std::__1::vector<eprosima::fastdds::statistics::Connection, std::__1::allocator<eprosima::fastdds::statistics::Connection> >&)::$_2)
93
94
template<typename T>
95
static bool for_matched_readers(
96
        ResourceLimitedVector<ReaderProxy*>& reader_vector_1,
97
        ResourceLimitedVector<ReaderProxy*>& reader_vector_2,
98
        T fun)
99
0
{
100
0
    if (for_matched_readers(reader_vector_1, fun))
101
0
    {
102
0
        return true;
103
0
    }
104
0
    return for_matched_readers(reader_vector_2, fun);
105
0
}
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::unsent_change_added_to_history(eprosima::fastdds::rtps::CacheChange_t*, std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::unsent_change_added_to_history(eprosima::fastdds::rtps::CacheChange_t*, std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > const&)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::change_removed_by_history(eprosima::fastdds::rtps::CacheChange_t*, std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::change_removed_by_history(eprosima::fastdds::rtps::CacheChange_t*, std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > const&)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::matched_reader_add_edp(eprosima::fastdds::rtps::ReaderProxyData const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::matched_reader_add_edp(eprosima::fastdds::rtps::ReaderProxyData const&)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::matched_reader_is_matched(eprosima::fastdds::rtps::GUID_t const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::matched_reader_is_matched(eprosima::fastdds::rtps::GUID_t const&)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::matched_reader_lookup(eprosima::fastdds::rtps::GUID_t&, eprosima::fastdds::rtps::ReaderProxy**)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::matched_reader_lookup(eprosima::fastdds::rtps::GUID_t&, eprosima::fastdds::rtps::ReaderProxy**)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::all_readers_updated()::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::all_readers_updated()::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::wait_for_all_acked(eprosima::fastdds::dds::Time_t const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::wait_for_all_acked(eprosima::fastdds::dds::Time_t const&)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::check_acked_status()::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::check_acked_status()::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::update_times(eprosima::fastdds::rtps::WriterTimes const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::update_times(eprosima::fastdds::rtps::WriterTimes const&)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::send_periodic_heartbeat(bool, bool)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::send_periodic_heartbeat(bool, bool)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::send_periodic_heartbeat(bool, bool)::$_1>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::send_periodic_heartbeat(bool, bool)::$_1)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::perform_nack_supression(eprosima::fastdds::rtps::GUID_t const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::perform_nack_supression(eprosima::fastdds::rtps::GUID_t const&)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::process_acknack(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&, unsigned int, eprosima::fastdds::BitmapRange<eprosima::fastdds::rtps::SequenceNumber_t, eprosima::fastdds::rtps::SequenceNumberDiff, 256u> const&, bool, bool&, std::__1::array<unsigned char, 2ul>)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::process_acknack(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&, unsigned int, eprosima::fastdds::BitmapRange<eprosima::fastdds::rtps::SequenceNumber_t, eprosima::fastdds::rtps::SequenceNumberDiff, 256u> const&, bool, bool&, std::__1::array<unsigned char, 2ul>)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::process_nack_frag(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&, unsigned int, eprosima::fastdds::rtps::SequenceNumber_t const&, eprosima::fastdds::BitmapRange<unsigned int, eprosima::fastdds::DiffFunction<unsigned int>, 256u> const&, bool&, std::__1::array<unsigned char, 2ul>)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::process_nack_frag(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&, unsigned int, eprosima::fastdds::rtps::SequenceNumber_t const&, eprosima::fastdds::BitmapRange<unsigned int, eprosima::fastdds::DiffFunction<unsigned int>, 256u> const&, bool&, std::__1::array<unsigned char, 2ul>)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::ack_timer_expired()::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::ack_timer_expired()::$_0)
106
107
template<typename T>
108
static bool for_matched_readers(
109
        ResourceLimitedVector<ReaderProxy*>& reader_vector_1,
110
        ResourceLimitedVector<ReaderProxy*>& reader_vector_2,
111
        ResourceLimitedVector<ReaderProxy*>& reader_vector_3,
112
        T fun)
113
0
{
114
0
    if (for_matched_readers(reader_vector_1, reader_vector_2, fun))
115
0
    {
116
0
        return true;
117
0
    }
118
0
    return for_matched_readers(reader_vector_3, fun);
119
0
}
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::unsent_change_added_to_history(eprosima::fastdds::rtps::CacheChange_t*, std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::unsent_change_added_to_history(eprosima::fastdds::rtps::CacheChange_t*, std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > const&)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::change_removed_by_history(eprosima::fastdds::rtps::CacheChange_t*, std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::change_removed_by_history(eprosima::fastdds::rtps::CacheChange_t*, std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > const&)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::matched_reader_add_edp(eprosima::fastdds::rtps::ReaderProxyData const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::matched_reader_add_edp(eprosima::fastdds::rtps::ReaderProxyData const&)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::matched_reader_is_matched(eprosima::fastdds::rtps::GUID_t const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::matched_reader_is_matched(eprosima::fastdds::rtps::GUID_t const&)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::matched_reader_lookup(eprosima::fastdds::rtps::GUID_t&, eprosima::fastdds::rtps::ReaderProxy**)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::matched_reader_lookup(eprosima::fastdds::rtps::GUID_t&, eprosima::fastdds::rtps::ReaderProxy**)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::all_readers_updated()::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::all_readers_updated()::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::wait_for_all_acked(eprosima::fastdds::dds::Time_t const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::wait_for_all_acked(eprosima::fastdds::dds::Time_t const&)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::check_acked_status()::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::check_acked_status()::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::update_times(eprosima::fastdds::rtps::WriterTimes const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::update_times(eprosima::fastdds::rtps::WriterTimes const&)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::send_periodic_heartbeat(bool, bool)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::send_periodic_heartbeat(bool, bool)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::send_periodic_heartbeat(bool, bool)::$_1>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::send_periodic_heartbeat(bool, bool)::$_1)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::perform_nack_supression(eprosima::fastdds::rtps::GUID_t const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::perform_nack_supression(eprosima::fastdds::rtps::GUID_t const&)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::process_acknack(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&, unsigned int, eprosima::fastdds::BitmapRange<eprosima::fastdds::rtps::SequenceNumber_t, eprosima::fastdds::rtps::SequenceNumberDiff, 256u> const&, bool, bool&, std::__1::array<unsigned char, 2ul>)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::process_acknack(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&, unsigned int, eprosima::fastdds::BitmapRange<eprosima::fastdds::rtps::SequenceNumber_t, eprosima::fastdds::rtps::SequenceNumberDiff, 256u> const&, bool, bool&, std::__1::array<unsigned char, 2ul>)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::process_nack_frag(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&, unsigned int, eprosima::fastdds::rtps::SequenceNumber_t const&, eprosima::fastdds::BitmapRange<unsigned int, eprosima::fastdds::DiffFunction<unsigned int>, 256u> const&, bool&, std::__1::array<unsigned char, 2ul>)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::process_nack_frag(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&, unsigned int, eprosima::fastdds::rtps::SequenceNumber_t const&, eprosima::fastdds::BitmapRange<unsigned int, eprosima::fastdds::DiffFunction<unsigned int>, 256u> const&, bool&, std::__1::array<unsigned char, 2ul>)::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::ack_timer_expired()::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::ack_timer_expired()::$_0)
120
121
/**
122
 * Loops over all the readers in the vector, applying the given routine.
123
 * The loop continues until the result of the routine is true for any reader
124
 * or all readers have been processes.
125
 * The returned value is true if the routine returned true at any point,
126
 * or false otherwise.
127
 *
128
 * const version
129
 */
130
template<typename T>
131
static bool for_matched_readers(
132
        const ResourceLimitedVector<ReaderProxy*>& reader_vector_1,
133
        T fun)
134
0
{
135
0
    for (const ReaderProxy* remote_reader : reader_vector_1)
136
0
    {
137
0
        if (fun(remote_reader))
138
0
        {
139
0
            return true;
140
0
        }
141
0
    }
142
143
0
    return false;
144
0
}
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::is_acked_by_all_nts(eprosima::fastdds::rtps::SequenceNumber_t) const::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > > const&, eprosima::fastdds::rtps::StatefulWriter::is_acked_by_all_nts(eprosima::fastdds::rtps::SequenceNumber_t) const::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::matched_readers_guids(std::__1::vector<eprosima::fastdds::rtps::GUID_t, std::__1::allocator<eprosima::fastdds::rtps::GUID_t> >&) const::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > > const&, eprosima::fastdds::rtps::StatefulWriter::matched_readers_guids(std::__1::vector<eprosima::fastdds::rtps::GUID_t, std::__1::allocator<eprosima::fastdds::rtps::GUID_t> >&) const::$_0)
145
146
template<typename T>
147
static bool for_matched_readers(
148
        const ResourceLimitedVector<ReaderProxy*>& reader_vector_1,
149
        const ResourceLimitedVector<ReaderProxy*>& reader_vector_2,
150
        T fun)
151
0
{
152
0
    if (for_matched_readers(reader_vector_1, fun))
153
0
    {
154
0
        return true;
155
0
    }
156
0
    return for_matched_readers(reader_vector_2, fun);
157
0
}
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::is_acked_by_all_nts(eprosima::fastdds::rtps::SequenceNumber_t) const::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > > const&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > > const&, eprosima::fastdds::rtps::StatefulWriter::is_acked_by_all_nts(eprosima::fastdds::rtps::SequenceNumber_t) const::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::matched_readers_guids(std::__1::vector<eprosima::fastdds::rtps::GUID_t, std::__1::allocator<eprosima::fastdds::rtps::GUID_t> >&) const::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > > const&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > > const&, eprosima::fastdds::rtps::StatefulWriter::matched_readers_guids(std::__1::vector<eprosima::fastdds::rtps::GUID_t, std::__1::allocator<eprosima::fastdds::rtps::GUID_t> >&) const::$_0)
158
159
template<typename T>
160
static bool for_matched_readers(
161
        const ResourceLimitedVector<ReaderProxy*>& reader_vector_1,
162
        const ResourceLimitedVector<ReaderProxy*>& reader_vector_2,
163
        const ResourceLimitedVector<ReaderProxy*>& reader_vector_3,
164
        T fun)
165
0
{
166
0
    if (for_matched_readers(reader_vector_1, reader_vector_2, fun))
167
0
    {
168
0
        return true;
169
0
    }
170
0
    return for_matched_readers(reader_vector_3, fun);
171
0
}
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::is_acked_by_all_nts(eprosima::fastdds::rtps::SequenceNumber_t) const::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > > const&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > > const&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > > const&, eprosima::fastdds::rtps::StatefulWriter::is_acked_by_all_nts(eprosima::fastdds::rtps::SequenceNumber_t) const::$_0)
Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::matched_readers_guids(std::__1::vector<eprosima::fastdds::rtps::GUID_t, std::__1::allocator<eprosima::fastdds::rtps::GUID_t> >&) const::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > > const&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > > const&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > > const&, eprosima::fastdds::rtps::StatefulWriter::matched_readers_guids(std::__1::vector<eprosima::fastdds::rtps::GUID_t, std::__1::allocator<eprosima::fastdds::rtps::GUID_t> >&) const::$_0)
172
173
using namespace std::chrono;
174
175
StatefulWriter::StatefulWriter(
176
        RTPSParticipantImpl* pimpl,
177
        const GUID_t& guid,
178
        const WriterAttributes& att,
179
        FlowController* flow_controller,
180
        WriterHistory* history,
181
        WriterListener* listener,
182
        StatefulWriterListener* stateful_listener)
183
0
    : BaseWriter(pimpl, guid, att, flow_controller, history, listener)
184
0
    , periodic_hb_event_(nullptr)
185
0
    , nack_response_event_(nullptr)
186
0
    , ack_event_(nullptr)
187
0
    , heartbeat_count_(0)
188
0
    , times_(att.times)
189
0
    , matched_remote_readers_(att.matched_readers_allocation)
190
0
    , matched_readers_pool_(att.matched_readers_allocation)
191
0
    , next_all_acked_notify_sequence_(0, 1)
192
0
    , all_acked_(false)
193
0
    , may_remove_change_cond_()
194
0
    , may_remove_change_(0)
195
0
    , disable_heartbeat_piggyback_(att.disable_heartbeat_piggyback)
196
0
    , disable_positive_acks_(att.disable_positive_acks)
197
0
    , keep_duration_(att.keep_duration)
198
0
    , last_sequence_number_()
199
0
    , biggest_removed_sequence_number_()
200
0
    , matched_local_readers_(att.matched_readers_allocation)
201
0
    , matched_datasharing_readers_(att.matched_readers_allocation)
202
0
    , locator_selector_general_(*this, att.matched_readers_allocation)
203
0
    , locator_selector_async_(*this, att.matched_readers_allocation)
204
0
    , stateful_writer_listener_(stateful_listener)
205
0
{
206
0
    init(pimpl, att);
207
0
}
208
209
void StatefulWriter::init(
210
        RTPSParticipantImpl* pimpl,
211
        const WriterAttributes& att)
212
0
{
213
0
    const RTPSParticipantAttributes& part_att = pimpl->get_attributes();
214
215
0
    auto push_mode = PropertyPolicyHelper::find_property(att.endpoint.properties, "fastdds.push_mode");
216
0
    push_mode_ = !((nullptr != push_mode) && ("false" == *push_mode));
217
218
0
    periodic_hb_event_ = new TimedEvent(
219
0
        pimpl->getEventResource(),
220
0
        [&]() -> bool
221
0
        {
222
0
            return send_periodic_heartbeat();
223
0
        },
224
0
        fastdds::rtps::TimeConv::Time_t2MilliSecondsDouble(times_.heartbeat_period));
225
226
0
    nack_response_event_ = new TimedEvent(
227
0
        pimpl->getEventResource(),
228
0
        [&]() -> bool
229
0
        {
230
0
            perform_nack_response();
231
0
            return false;
232
0
        },
233
0
        fastdds::rtps::TimeConv::Time_t2MilliSecondsDouble(times_.nack_response_delay));
234
235
0
    if (disable_positive_acks_)
236
0
    {
237
0
        ack_event_ = new TimedEvent(
238
0
            pimpl->getEventResource(),
239
0
            [&]() -> bool
240
0
            {
241
0
                return ack_timer_expired();
242
0
            },
243
0
            att.keep_duration.to_ns() * 1e-6);             // in milliseconds
244
0
    }
245
246
0
    for (size_t n = 0; n < att.matched_readers_allocation.initial; ++n)
247
0
    {
248
0
        ReaderProxy* new_proxy = new ReaderProxy(
249
0
            times_, part_att.allocation.locators, this, stateful_writer_listener_);
250
0
        matched_readers_pool_.push_back(new_proxy);
251
0
    }
252
0
}
253
254
/**
 * Destructor. Its body only writes an informational log entry; the timed events and reader
 * proxies are deleted by local_actions_on_writer_removed().
 */
StatefulWriter::~StatefulWriter()
{
    EPROSIMA_LOG_INFO(RTPS_WRITER, "StatefulWriter destructor");
}
258
259
void StatefulWriter::local_actions_on_writer_removed()
260
0
{
261
0
    EPROSIMA_LOG_INFO(RTPS_WRITER, "StatefulWriter local_actions_on_writer_removed");
262
263
    // Disable timed events, because their callbacks use cache changes
264
0
    if (disable_positive_acks_)
265
0
    {
266
0
        delete(ack_event_);
267
0
        ack_event_ = nullptr;
268
0
    }
269
270
0
    if (nack_response_event_ != nullptr)
271
0
    {
272
0
        delete(nack_response_event_);
273
0
        nack_response_event_ = nullptr;
274
0
    }
275
276
    // This must be the next action, as it frees CacheChange_t from the async thread.
277
0
    BaseWriter::local_actions_on_writer_removed();
278
279
    // Stop all active proxies and pass them to the pool
280
0
    {
281
0
        std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
282
0
        while (!matched_remote_readers_.empty())
283
0
        {
284
0
            ReaderProxy* remote_reader = matched_remote_readers_.back();
285
0
            matched_remote_readers_.pop_back();
286
0
            remote_reader->stop();
287
0
            matched_readers_pool_.push_back(remote_reader);
288
0
        }
289
0
        while (!matched_local_readers_.empty())
290
0
        {
291
0
            ReaderProxy* remote_reader = matched_local_readers_.back();
292
0
            matched_local_readers_.pop_back();
293
0
            remote_reader->stop();
294
0
            matched_readers_pool_.push_back(remote_reader);
295
0
        }
296
0
        while (!matched_datasharing_readers_.empty())
297
0
        {
298
0
            ReaderProxy* remote_reader = matched_datasharing_readers_.back();
299
0
            matched_datasharing_readers_.pop_back();
300
0
            remote_reader->stop();
301
0
            matched_readers_pool_.push_back(remote_reader);
302
0
        }
303
0
    }
304
305
    // PeriodicHeartbeatEvent must be released after releasing all proxies
306
    // because proxy's NackSuppressionEvent could restart this event.
307
0
    if (periodic_hb_event_ != nullptr)
308
0
    {
309
0
        delete(periodic_hb_event_);
310
0
        periodic_hb_event_ = nullptr;
311
0
    }
312
313
    // Delete all proxies in the pool
314
0
    for (ReaderProxy* remote_reader : matched_readers_pool_)
315
0
    {
316
0
        delete(remote_reader);
317
0
    }
318
0
}
319
320
/*
321
 * CHANGE-RELATED METHODS
322
 */
323
void StatefulWriter::prepare_datasharing_delivery(
324
        CacheChange_t* change)
325
0
{
326
0
    auto pool = std::dynamic_pointer_cast<WriterPool>(history_->get_payload_pool());
327
0
    assert (pool != nullptr);
328
329
0
    pool->add_to_shared_history(change);
330
0
    EPROSIMA_LOG_INFO(RTPS_WRITER, "Notifying readers of cache change with SN " << change->sequenceNumber);
331
0
}
332
333
void StatefulWriter::unsent_change_added_to_history(
334
        CacheChange_t* change,
335
        const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)
336
0
{
337
0
    std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
338
0
    auto payload_length = change->serializedPayload.length;
339
340
0
    if (liveliness_lease_duration_ < dds::c_TimeInfinite)
341
0
    {
342
0
        mp_RTPSParticipant->wlp()->assert_liveliness(
343
0
            getGuid(),
344
0
            liveliness_kind_,
345
0
            liveliness_lease_duration_);
346
0
    }
347
348
    // Prepare the metadata for datasharing
349
0
    if (is_datasharing_compatible())
350
0
    {
351
0
        prepare_datasharing_delivery(change);
352
0
    }
353
354
    // Now for the rest of readers
355
0
    if (!matched_remote_readers_.empty() || !matched_datasharing_readers_.empty() || !matched_local_readers_.empty())
356
0
    {
357
0
        bool should_be_sent = false;
358
0
        for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
359
0
                [this, &should_be_sent, &change, &max_blocking_time](ReaderProxy* reader)
360
0
                {
361
0
                    ChangeForReader_t changeForReader(change);
362
0
                    bool is_relevant = reader->rtps_is_relevant(change);
363
364
0
                    if (push_mode_ || !reader->is_reliable() || reader->is_local_reader())
365
0
                    {
366
                        //ChangeForReader_t construct sets status to UNSENT.
367
0
                        should_be_sent |= is_relevant;
368
0
                    }
369
0
                    else
370
0
                    {
371
0
                        changeForReader.setStatus(UNACKNOWLEDGED);
372
0
                    }
373
0
                    reader->add_change(changeForReader, is_relevant, false, max_blocking_time);
374
375
0
                    return false;
376
0
                }
377
0
                );
378
379
0
        if (disable_positive_acks_)
380
0
        {
381
0
            Time_t expiration_ts = change->sourceTimestamp + keep_duration_;
382
0
            Time_t current_ts;
383
0
            Time_t::now(current_ts);
384
0
            assert(expiration_ts >= current_ts);
385
0
            auto interval = (expiration_ts - current_ts).to_duration_t();
386
387
0
            ack_event_->update_interval(interval);
388
0
            ack_event_->restart_timer(max_blocking_time);
389
0
        }
390
391
        // After adding the CacheChange_t to flowcontroller, its pointer cannot be used because it may be removed
392
        // internally before exiting the call. For example if the writer matched with a best-effort reader.
393
0
        if (should_be_sent)
394
0
        {
395
0
            flow_controller_->add_new_sample(this, change, max_blocking_time);
396
0
        }
397
0
        else
398
0
        {
399
0
            periodic_hb_event_->restart_timer(max_blocking_time);
400
0
        }
401
0
    }
402
0
    else
403
0
    {
404
0
        EPROSIMA_LOG_INFO(RTPS_WRITER, "No reader proxy to add change.");
405
0
        check_acked_status();
406
0
    }
407
408
    // Throughput should be notified even if no matches are available
409
0
    on_publish_throughput(payload_length);
410
0
}
411
412
bool StatefulWriter::intraprocess_delivery(
413
        CacheChange_t* change,
414
        ReaderProxy* reader_proxy)
415
0
{
416
0
    LocalReaderPointer::Instance local_reader = reader_proxy->local_reader();
417
0
    if (local_reader)
418
0
    {
419
0
        if (change->write_params.related_sample_identity() != SampleIdentity::unknown())
420
0
        {
421
0
            change->write_params.sample_identity(change->write_params.related_sample_identity());
422
0
        }
423
0
        return local_reader->process_data_msg(change);
424
0
    }
425
0
    return false;
426
0
}
427
428
bool StatefulWriter::intraprocess_gap(
429
        ReaderProxy* reader_proxy,
430
        const SequenceNumber_t& first_seq,
431
        const SequenceNumber_t& last_seq)
432
0
{
433
0
    LocalReaderPointer::Instance local_reader = reader_proxy->local_reader();
434
0
    if (local_reader)
435
0
    {
436
0
        return local_reader->process_gap_msg(
437
0
            m_guid, first_seq, SequenceNumberSet_t(last_seq), c_VendorId_eProsima);
438
0
    }
439
440
0
    return false;
441
0
}
442
443
bool StatefulWriter::intraprocess_heartbeat(
444
        ReaderProxy* reader_proxy,
445
        bool liveliness)
446
0
{
447
0
    bool returned_value = false;
448
0
    LocalReaderPointer::Instance local_reader = reader_proxy->local_reader();
449
450
0
    if (local_reader)
451
0
    {
452
0
        std::unique_lock<RecursiveTimedMutex> lockW(mp_mutex);
453
0
        SequenceNumber_t first_seq = get_seq_num_min();
454
0
        SequenceNumber_t last_seq = get_seq_num_max();
455
456
0
        if (first_seq == c_SequenceNumber_Unknown || last_seq == c_SequenceNumber_Unknown)
457
0
        {
458
0
            if (liveliness)
459
0
            {
460
0
                first_seq = next_sequence_number();
461
0
                last_seq = first_seq - 1;
462
0
            }
463
0
        }
464
465
0
        if ((first_seq != c_SequenceNumber_Unknown && last_seq != c_SequenceNumber_Unknown) &&
466
0
                (liveliness || reader_proxy->has_changes()))
467
0
        {
468
0
            increment_hb_count();
469
0
            Count_t hb_count = heartbeat_count_;
470
0
            lockW.unlock();
471
0
            returned_value = local_reader->process_heartbeat_msg(
472
0
                m_guid, hb_count, first_seq, last_seq, true, liveliness, c_VendorId_eProsima);
473
0
        }
474
0
    }
475
476
0
    return returned_value;
477
0
}
478
479
bool StatefulWriter::change_removed_by_history(
480
        CacheChange_t* a_change,
481
        const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)
482
0
{
483
0
    bool ret_value = false;
484
0
    SequenceNumber_t sequence_number = a_change->sequenceNumber;
485
486
0
    std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
487
0
    EPROSIMA_LOG_INFO(RTPS_WRITER, "Change " << sequence_number << " to be removed.");
488
489
0
    if (flow_controller_->remove_change(a_change, max_blocking_time))
490
0
    {
491
492
        // Take note of biggest removed sequence number to improve sending of gaps
493
0
        if (sequence_number > biggest_removed_sequence_number_)
494
0
        {
495
0
            biggest_removed_sequence_number_ = sequence_number;
496
0
        }
497
498
        // Invalidate CacheChange pointer in ReaderProxies.
499
0
        for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
500
0
                [sequence_number](ReaderProxy* reader)
501
0
                {
502
0
                    reader->change_has_been_removed(sequence_number);
503
0
                    return false;
504
0
                }
505
0
                );
506
507
        // remove from datasharing pool history
508
0
        if (is_datasharing_compatible())
509
0
        {
510
0
            auto pool = std::dynamic_pointer_cast<WriterPool>(history_->get_payload_pool());
511
0
            assert (pool != nullptr);
512
513
0
            pool->remove_from_shared_history(a_change);
514
0
            EPROSIMA_LOG_INFO(RTPS_WRITER, "Removing shared cache change with SN " << a_change->sequenceNumber);
515
0
        }
516
517
0
        may_remove_change_ = 2;
518
0
        may_remove_change_cond_.notify_one();
519
520
0
        ret_value = true;
521
0
    }
522
523
0
    return ret_value;
524
0
}
525
526
void StatefulWriter::send_heartbeat_to_all_readers(
527
        bool force_separating)
528
0
{
529
    // This method is only called from send_periodic_heartbeat
530
531
0
    if (separate_sending_enabled_ || force_separating)
532
0
    {
533
0
        for (ReaderProxy* reader : matched_remote_readers_)
534
0
        {
535
0
            send_heartbeat_to_nts(*reader);
536
0
        }
537
0
    }
538
0
    else
539
0
    {
540
0
        for (ReaderProxy* reader : matched_local_readers_)
541
0
        {
542
0
            intraprocess_heartbeat(reader);
543
0
        }
544
545
0
        for (ReaderProxy* reader : matched_datasharing_readers_)
546
0
        {
547
0
            reader->datasharing_notify();
548
0
        }
549
550
0
        if (there_are_remote_readers_)
551
0
        {
552
0
            RTPSMessageGroup group(mp_RTPSParticipant, this, &locator_selector_general_);
553
0
            select_all_readers_nts(group, locator_selector_general_);
554
555
0
            assert(
556
0
                (SequenceNumber_t::unknown() == get_seq_num_min() &&
557
0
                SequenceNumber_t::unknown() == get_seq_num_max()) ||
558
0
                (SequenceNumber_t::unknown() != get_seq_num_min() &&
559
0
                SequenceNumber_t::unknown() != get_seq_num_max()));
560
561
0
            add_gaps_for_holes_in_history(group);
562
563
0
            send_heartbeat_nts_(locator_selector_general_.all_remote_readers.size(), group, disable_positive_acks_);
564
0
        }
565
0
    }
566
0
}
567
568
void StatefulWriter::deliver_sample_to_intraprocesses(
569
        CacheChange_t* change)
570
0
{
571
0
    for (ReaderProxy* remoteReader : matched_local_readers_)
572
0
    {
573
0
        SequenceNumber_t gap_seq;
574
0
        FragmentNumber_t dummy = 0;
575
0
        bool dumb = false;
576
0
        if (remoteReader->change_is_unsent(change->sequenceNumber, dummy, gap_seq, get_seq_num_min(), dumb))
577
0
        {
578
            // If there is a hole (removed from history or not relevants) between previous sample and this one,
579
            // send it a personal GAP.
580
0
            if (SequenceNumber_t::unknown() != gap_seq)
581
0
            {
582
0
                intraprocess_gap(remoteReader, gap_seq, change->sequenceNumber);
583
0
                remoteReader->acked_changes_set(change->sequenceNumber);
584
0
            }
585
0
            bool delivered = intraprocess_delivery(change, remoteReader);
586
0
            if (!remoteReader->is_reliable())
587
0
            {
588
0
                remoteReader->acked_changes_set(change->sequenceNumber + 1);
589
0
            }
590
0
            else
591
0
            {
592
0
                intraprocess_heartbeat(remoteReader, false);
593
0
                remoteReader->from_unsent_to_status(
594
0
                    change->sequenceNumber,
595
0
                    delivered ? ACKNOWLEDGED : UNACKNOWLEDGED,
596
0
                    false,
597
0
                    delivered);
598
0
            }
599
0
        }
600
0
    }
601
0
}
602
603
void StatefulWriter::deliver_sample_to_datasharing(
604
        CacheChange_t* change)
605
0
{
606
0
    for (ReaderProxy* remoteReader : matched_datasharing_readers_)
607
0
    {
608
0
        SequenceNumber_t gap_seq;
609
0
        FragmentNumber_t dummy = 0;
610
0
        bool dumb = false;
611
0
        if (remoteReader->change_is_unsent(change->sequenceNumber, dummy, gap_seq, get_seq_num_min(), dumb))
612
0
        {
613
0
            if (!remoteReader->is_reliable())
614
0
            {
615
0
                remoteReader->acked_changes_set(change->sequenceNumber + 1);
616
0
            }
617
0
            else
618
0
            {
619
0
                remoteReader->from_unsent_to_status(
620
0
                    change->sequenceNumber,
621
0
                    UNACKNOWLEDGED,
622
0
                    false);
623
0
            }
624
0
            remoteReader->datasharing_notify();
625
0
        }
626
0
    }
627
0
}
628
629
/**
 * Sends a change (or its pending fragments) to the matched remote readers.
 *
 * Each pass of the outer loop selects the readers for which the change is unsent, sends the
 * GAPs they need, and then adds one DATA or DATA_FRAG submessage: once for all selected
 * readers, or once per reader when separate sending is enabled. The loop ends when no reader
 * has anything pending or when a send fails.
 *
 * @param change             Change to send.
 * @param group              Message group used to build and send the submessages.
 * @param locator_selector   Locator selector to use (object locked by FlowControllerImpl).
 * @param max_blocking_time  Deadline forwarded to the timer restarts.
 * @return DELIVERED on success, NOT_DELIVERED when a submessage could not be added or the
 *         group timed out, EXCEEDED_LIMIT when the group reported limit_exceeded.
 */
DeliveryRetCode StatefulWriter::deliver_sample_to_network(
        CacheChange_t* change,
        RTPSMessageGroup& group,
        LocatorSelectorSender& locator_selector, // Object locked by FlowControllerImpl
        const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)
{
    NetworkFactory& network = mp_RTPSParticipant->network_factory();
    const uint32_t n_fragments = change->getFragmentCount();
    const bool fragmented = 0 < n_fragments;
    // Value of min_unsent_fragment meaning that no reader has a pending fragment.
    const FragmentNumber_t nothing_pending = n_fragments + 1;

    DeliveryRetCode ret_code = DeliveryRetCode::DELIVERED;
    FragmentNumber_t min_unsent_fragment = 0;
    bool need_reactivate_periodic_heartbeat = false;

    // Updates a reader proxy once the whole change has been handed to the message group.
    auto mark_change_sent = [change](ReaderProxy* reader)
            {
                if (reader->is_reliable())
                {
                    reader->from_unsent_to_status(
                        change->sequenceNumber,
                        UNDERWAY,
                        true);
                }
                else
                {
                    reader->acked_changes_set(change->sequenceNumber + 1);
                }
            };

    // Updates a reader proxy after one fragment has been handed to the message group.
    auto mark_fragment_sent = [change, &mark_change_sent](ReaderProxy* reader, FragmentNumber_t fragment)
            {
                bool all_fragments_sent = false;
                reader->mark_fragment_as_sent_for_change(
                    change->sequenceNumber,
                    fragment,
                    all_fragments_sent);

                if (all_fragments_sent)
                {
                    mark_change_sent(reader);
                }
            };

    while (DeliveryRetCode::DELIVERED == ret_code &&
            nothing_pending != min_unsent_fragment)
    {
        SequenceNumber_t gap_seq_for_all = SequenceNumber_t::unknown();
        locator_selector.locator_selector.reset(false);
        auto first_relevant_reader = matched_remote_readers_.begin();
        bool inline_qos = false;
        bool should_be_sent = false;
        min_unsent_fragment = nothing_pending;

        for (auto reader_it = first_relevant_reader; reader_it != matched_remote_readers_.end();
                ++reader_it)
        {
            ReaderProxy* const reader = *reader_it;
            SequenceNumber_t gap_seq;
            FragmentNumber_t next_unsent_frag = 0;

            if (SequenceNumber_t::unknown() != reader->first_irrelevant_removed())
            {
                // Send GAP with irrelevant changes that are not in history.
                group.sender(this, reader->message_sender());
                add_gaps_for_removed_irrelevants(*reader, group);
                group.sender(this, &locator_selector);             // This makes the flush_and_reset().
            }

            const bool pending_for_reader =
                    reader->change_is_unsent(change->sequenceNumber, next_unsent_frag, gap_seq, get_seq_num_min(),
                            need_reactivate_periodic_heartbeat) &&
                    (!fragmented || min_unsent_fragment >= next_unsent_frag);

            if (!pending_for_reader)
            {
                reader->active(false);
                continue;
            }

            if (min_unsent_fragment > next_unsent_frag)
            {
                // An earlier fragment is pending: restart the selection from this reader.
                locator_selector.locator_selector.reset(false);
                first_relevant_reader = reader_it;
                min_unsent_fragment = next_unsent_frag;
            }

            reader->active(true);
            locator_selector.locator_selector.enable(reader->guid());
            should_be_sent = true;
            inline_qos |= reader->expects_inline_qos();

            // If there is a hole (removed from history or not relevant) between the previous sample and this one,
            // send the reader a personal GAP.
            if (SequenceNumber_t::unknown() == gap_seq)
            {
                continue;
            }

            if (SequenceNumber_t::unknown() == gap_seq_for_all)     // Calculate if the hole is for all readers
            {
                History::const_iterator chit = history_->find_change_nts(change);

                // The previous change is only inspected when this is not the first change in the history.
                if (chit == history_->changesBegin() ||
                        ((*std::prev(chit))->sequenceNumber + 1) == gap_seq)
                {
                    gap_seq_for_all = gap_seq;
                }
            }

            if (gap_seq_for_all != gap_seq)     // If it is an individual GAP, send it to the respective reader.
            {
                group.sender(this, reader->message_sender());
                group.add_gap(gap_seq, SequenceNumberSet_t(change->sequenceNumber),
                        reader->guid());
                send_heartbeat_nts_(1u, group, disable_positive_acks_);
                group.sender(this, &locator_selector);     // This makes the flush_and_reset().
            }
        }

        const bool should_send_global_gap = SequenceNumber_t::unknown() != gap_seq_for_all;

        if (locator_selector.locator_selector.state_has_changed() &&
                ((should_be_sent && !separate_sending_enabled_) || should_send_global_gap))
        {
            group.flush_and_reset();
            network.select_locators(locator_selector.locator_selector);
            compute_selected_guids(locator_selector);
        }

        if (should_send_global_gap) // Send GAP for all readers
        {
            group.add_gap(gap_seq_for_all, SequenceNumberSet_t(change->sequenceNumber));
        }

        try
        {
            if (should_be_sent)
            {
                if (separate_sending_enabled_)
                {
                    // One submessage per active reader, each through the reader's own sender.
                    for (auto reader_it = first_relevant_reader;
                            reader_it != matched_remote_readers_.end();
                            ++reader_it)
                    {
                        ReaderProxy* const reader = *reader_it;
                        if (!reader->active())
                        {
                            continue;
                        }

                        group.sender(this, reader->message_sender());

                        if (!fragmented)
                        {
                            if (group.add_data(*change, reader->expects_inline_qos()))
                            {
                                mark_change_sent(reader);
                                add_statistics_sent_submessage(change, reader->locators_size());
                            }
                            else
                            {
                                EPROSIMA_LOG_ERROR(RTPS_WRITER, "Error sending change " << change->sequenceNumber);
                                ret_code = DeliveryRetCode::NOT_DELIVERED;
                            }
                        }
                        else if (nothing_pending != min_unsent_fragment)
                        {
                            if (group.add_data_frag(*change, min_unsent_fragment, inline_qos))
                            {
                                mark_fragment_sent(reader, min_unsent_fragment);
                                add_statistics_sent_submessage(change, reader->locators_size());
                            }
                            else
                            {
                                ret_code = DeliveryRetCode::NOT_DELIVERED;
                            }
                        }

                        send_heartbeat_nts_(1u, group, false);
                    }
                }
                else
                {
                    // One submessage shared by every active reader.
                    const size_t num_locators = locator_selector.locator_selector.selected_size();
                    if (num_locators > 0)
                    {
                        if (!fragmented)
                        {
                            if (group.add_data(*change, inline_qos))
                            {
                                for (auto reader_it = first_relevant_reader;
                                        reader_it != matched_remote_readers_.end();
                                        ++reader_it)
                                {
                                    if ((*reader_it)->active())
                                    {
                                        mark_change_sent(*reader_it);
                                    }
                                }
                                add_statistics_sent_submessage(change, num_locators);
                            }
                            else
                            {
                                ret_code = DeliveryRetCode::NOT_DELIVERED;
                            }
                        }
                        else if (nothing_pending != min_unsent_fragment)
                        {
                            if (group.add_data_frag(*change, min_unsent_fragment, inline_qos))
                            {
                                for (auto reader_it = first_relevant_reader;
                                        reader_it != matched_remote_readers_.end();
                                        ++reader_it)
                                {
                                    if ((*reader_it)->active())
                                    {
                                        mark_fragment_sent(*reader_it, min_unsent_fragment);
                                    }
                                }
                                add_statistics_sent_submessage(change, num_locators);
                            }
                            else
                            {
                                ret_code = DeliveryRetCode::NOT_DELIVERED;
                            }
                        }

                        send_heartbeat_piggyback_nts_(group, locator_selector);
                    }
                }

                on_sample_datas(change->write_params.sample_identity(), change->writer_info.num_sent_submessages);
                on_data_sent();
            }
        }
        catch (const RTPSMessageGroup::timeout&)
        {
            EPROSIMA_LOG_ERROR(RTPS_WRITER, "Max blocking time reached");
            ret_code = DeliveryRetCode::NOT_DELIVERED;
        }
        catch (const RTPSMessageGroup::limit_exceeded&)
        {
            ret_code = DeliveryRetCode::EXCEEDED_LIMIT;
        }

        if (disable_positive_acks_ && last_sequence_number_ == SequenceNumber_t())
        {
            last_sequence_number_ = change->sequenceNumber;
            if (!(ack_event_->getRemainingTimeMilliSec() > 0))
            {
                // Restart ack_timer
                Time_t expiration_ts = change->sourceTimestamp + keep_duration_;
                Time_t now_ts;
                Time_t::now(now_ts);
                assert(expiration_ts >= now_ts);
                auto remaining = (expiration_ts - now_ts).to_duration_t();

                ack_event_->update_interval(remaining);
                ack_event_->restart_timer(max_blocking_time);
            }
        }

        // Restore in case an exception was thrown by RTPSMessageGroup.
        group.sender(this, &locator_selector);
    }

    if (need_reactivate_periodic_heartbeat)
    {
        periodic_hb_event_->restart_timer(max_blocking_time);
    }

    return ret_code;
}
936
937
/*
938
 * MATCHED_READER-RELATED METHODS
939
 */
940
void StatefulWriter::update_reader_info(
941
        LocatorSelectorSender& locator_selector,
942
        bool create_sender_resources)
943
0
{
944
0
    update_cached_info_nts(locator_selector);
945
0
    compute_selected_guids(locator_selector);
946
947
0
    if (create_sender_resources)
948
0
    {
949
0
        RTPSParticipantImpl* part = get_participant_impl();
950
0
        locator_selector.locator_selector.for_each([part](const Locator_t& loc)
951
0
                {
952
0
                    part->createSenderResources(loc);
953
0
                });
954
0
    }
955
956
    // Check if we have local or remote readers
957
0
    there_are_remote_readers_ = !matched_remote_readers_.empty();
958
0
    there_are_local_readers_ = !matched_local_readers_.empty();
959
0
    there_are_datasharing_readers_ = !matched_datasharing_readers_.empty();
960
0
}
961
962
void StatefulWriter::select_all_readers_nts(
963
        RTPSMessageGroup& group,
964
        LocatorSelectorSender& locator_selector)
965
0
{
966
0
    locator_selector.locator_selector.reset(true);
967
0
    if (locator_selector.locator_selector.state_has_changed())
968
0
    {
969
0
        group.flush_and_reset();
970
0
        mp_RTPSParticipant->network_factory().select_locators(locator_selector.locator_selector);
971
0
        compute_selected_guids(locator_selector);
972
0
    }
973
0
}
974
975
bool StatefulWriter::matched_reader_add_edp(
976
        const ReaderProxyData& rdata)
977
0
{
978
0
    using network::external_locators::filter_remote_locators;
979
980
0
    if (rdata.guid == c_Guid_Unknown)
981
0
    {
982
0
        EPROSIMA_LOG_ERROR(RTPS_WRITER, "Reliable Writer need GUID_t of matched readers");
983
0
        return false;
984
0
    }
985
986
0
    std::unique_lock<RecursiveTimedMutex> guard(mp_mutex);
987
0
    std::unique_lock<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_);
988
0
    std::unique_lock<LocatorSelectorSender> guard_locator_selector_async(locator_selector_async_);
989
990
    // Check if it is already matched.
991
0
    if (for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
992
0
            [this, &rdata](ReaderProxy* reader)
993
0
            {
994
0
                if (reader->guid() == rdata.guid)
995
0
                {
996
0
                    EPROSIMA_LOG_INFO(RTPS_WRITER, "Attempting to add existing reader, updating information.");
997
0
                    if (reader->update(rdata))
998
0
                    {
999
0
                        filter_remote_locators(*reader->general_locator_selector_entry(),
1000
0
                        m_att.external_unicast_locators, m_att.ignore_non_matching_locators);
1001
0
                        filter_remote_locators(*reader->async_locator_selector_entry(),
1002
0
                        m_att.external_unicast_locators, m_att.ignore_non_matching_locators);
1003
0
                        mp_RTPSParticipant->createSenderResources(rdata.remote_locators, m_att);
1004
0
                        update_reader_info(locator_selector_general_, true);
1005
0
                        update_reader_info(locator_selector_async_, true);
1006
0
                    }
1007
0
                    return true;
1008
0
                }
1009
0
                return false;
1010
0
            }))
1011
0
    {
1012
0
        if (nullptr != listener_)
1013
0
        {
1014
            // call the listener without locks taken
1015
0
            guard_locator_selector_async.unlock();
1016
0
            guard_locator_selector_general.unlock();
1017
0
            guard.unlock();
1018
1019
0
            listener_->on_reader_discovery(this, ReaderDiscoveryStatus::CHANGED_QOS_READER, rdata.guid, &rdata);
1020
0
        }
1021
1022
0
#ifdef FASTDDS_STATISTICS
1023
        // notify monitor service so that the connectionlist for this entity
1024
        // could be updated
1025
0
        if (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin())
1026
0
        {
1027
0
            mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid);
1028
0
        }
1029
0
#endif //FASTDDS_STATISTICS
1030
1031
0
        return false;
1032
0
    }
1033
1034
    // Get a reader proxy from the inactive pool (or create a new one if necessary and allowed)
1035
0
    ReaderProxy* rp = nullptr;
1036
0
    if (matched_readers_pool_.empty())
1037
0
    {
1038
0
        size_t max_readers = matched_readers_pool_.max_size();
1039
0
        if (get_matched_readers_size() + matched_readers_pool_.size() < max_readers)
1040
0
        {
1041
0
            const RTPSParticipantAttributes& part_att = mp_RTPSParticipant->get_attributes();
1042
0
            rp = new ReaderProxy(times_, part_att.allocation.locators, this, stateful_writer_listener_);
1043
0
        }
1044
0
        else
1045
0
        {
1046
0
            EPROSIMA_LOG_WARNING(RTPS_WRITER, "Maximum number of reader proxies (" << max_readers
1047
0
                                                                                   << ") reached for writer "
1048
0
                                                                                   << m_guid);
1049
0
            return false;
1050
0
        }
1051
0
    }
1052
0
    else
1053
0
    {
1054
0
        rp = matched_readers_pool_.back();
1055
0
        matched_readers_pool_.pop_back();
1056
0
    }
1057
1058
    // Add info of new datareader.
1059
0
    rp->start(rdata, is_datasharing_compatible_with(rdata.data_sharing));
1060
0
    filter_remote_locators(*rp->general_locator_selector_entry(),
1061
0
            m_att.external_unicast_locators, m_att.ignore_non_matching_locators);
1062
0
    filter_remote_locators(*rp->async_locator_selector_entry(),
1063
0
            m_att.external_unicast_locators, m_att.ignore_non_matching_locators);
1064
0
    locator_selector_general_.locator_selector.add_entry(rp->general_locator_selector_entry());
1065
0
    locator_selector_async_.locator_selector.add_entry(rp->async_locator_selector_entry());
1066
1067
0
    if (rp->is_local_reader())
1068
0
    {
1069
0
        matched_local_readers_.push_back(rp);
1070
0
        EPROSIMA_LOG_INFO(RTPS_WRITER, "Adding reader " << rdata.guid << " to " << this->m_guid.entityId
1071
0
                                                        << " as local reader");
1072
0
    }
1073
0
    else
1074
0
    {
1075
0
        if (rp->is_datasharing_reader())
1076
0
        {
1077
0
            matched_datasharing_readers_.push_back(rp);
1078
0
            EPROSIMA_LOG_INFO(RTPS_WRITER, "Adding reader " << rdata.guid << " to " << this->m_guid.entityId
1079
0
                                                            << " as data sharing");
1080
0
        }
1081
0
        else
1082
0
        {
1083
0
            matched_remote_readers_.push_back(rp);
1084
0
            EPROSIMA_LOG_INFO(RTPS_WRITER, "Adding reader " << rdata.guid << " to " << this->m_guid.entityId
1085
0
                                                            << " as remote reader");
1086
0
        }
1087
0
    }
1088
1089
0
    mp_RTPSParticipant->createSenderResources(rdata.remote_locators, m_att);
1090
0
    update_reader_info(locator_selector_general_, true);
1091
0
    update_reader_info(locator_selector_async_, true);
1092
1093
0
    if (rp->is_datasharing_reader())
1094
0
    {
1095
0
        if (nullptr != listener_)
1096
0
        {
1097
            // call the listener without locks taken
1098
0
            guard_locator_selector_async.unlock();
1099
0
            guard_locator_selector_general.unlock();
1100
0
            guard.unlock();
1101
1102
0
            listener_->on_reader_discovery(this, ReaderDiscoveryStatus::DISCOVERED_READER, rdata.guid, &rdata);
1103
0
        }
1104
1105
0
#ifdef FASTDDS_STATISTICS
1106
        // notify monitor service so that the connectionlist for this entity
1107
        // could be updated
1108
0
        if (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin())
1109
0
        {
1110
0
            mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid);
1111
0
        }
1112
0
#endif //FASTDDS_STATISTICS
1113
1114
0
        return true;
1115
0
    }
1116
1117
0
    bool is_reliable = rp->is_reliable();
1118
0
    if (is_reliable)
1119
0
    {
1120
0
        SequenceNumber_t min_seq = get_seq_num_min();
1121
0
        SequenceNumber_t last_seq = get_seq_num_max();
1122
0
        RTPSMessageGroup group(mp_RTPSParticipant, this, rp->message_sender());
1123
1124
        // History not empty
1125
0
        if (min_seq != SequenceNumber_t::unknown())
1126
0
        {
1127
0
            (void)last_seq;
1128
0
            assert(last_seq != SequenceNumber_t::unknown());
1129
0
            assert(min_seq <= last_seq);
1130
1131
0
            try
1132
0
            {
1133
                // Late-joiner
1134
0
                if (TRANSIENT_LOCAL <= rp->durability_kind() &&
1135
0
                        TRANSIENT_LOCAL <= m_att.durabilityKind)
1136
0
                {
1137
0
                    for (History::iterator cit = history_->changesBegin(); cit != history_->changesEnd(); ++cit)
1138
0
                    {
1139
                        // Holes are managed when deliver_sample(), sending GAP messages.
1140
0
                        if (rp->rtps_is_relevant(*cit))
1141
0
                        {
1142
0
                            ChangeForReader_t changeForReader(*cit);
1143
1144
                            // If it is local, maintain in UNSENT status and add to flow controller.
1145
0
                            if (rp->is_local_reader())
1146
0
                            {
1147
0
                                flow_controller_->add_old_sample(this, *cit);
1148
0
                            }
1149
                            // In other case, set as UNACKNOWLEDGED and expects the reader request them.
1150
0
                            else
1151
0
                            {
1152
0
                                changeForReader.setStatus(UNACKNOWLEDGED);
1153
0
                            }
1154
1155
0
                            rp->add_change(changeForReader, true, false);
1156
0
                        }
1157
0
                    }
1158
0
                }
1159
0
                else
1160
0
                {
1161
0
                    if (rp->is_local_reader())
1162
0
                    {
1163
0
                        intraprocess_gap(rp, min_seq, history_->next_sequence_number());
1164
0
                    }
1165
0
                    else
1166
0
                    {
1167
                        // Send a GAP of the whole history.
1168
0
                        group.add_gap(min_seq, SequenceNumberSet_t(history_->next_sequence_number()), rp->guid());
1169
0
                    }
1170
0
                }
1171
1172
                // Always activate heartbeat period. We need a confirmation of the reader.
1173
                // The state has to be updated.
1174
0
                periodic_hb_event_->restart_timer(std::chrono::steady_clock::now() + std::chrono::hours(24));
1175
0
            }
1176
0
            catch (const RTPSMessageGroup::timeout&)
1177
0
            {
1178
0
                EPROSIMA_LOG_ERROR(RTPS_WRITER, "Max blocking time reached");
1179
0
            }
1180
0
        }
1181
1182
0
        if (rp->is_local_reader())
1183
0
        {
1184
0
            intraprocess_heartbeat(rp);
1185
0
        }
1186
0
        else
1187
0
        {
1188
0
            send_heartbeat_nts_(1u, group, disable_positive_acks_);
1189
0
            group.flush_and_reset();
1190
0
        }
1191
0
    }
1192
0
    else
1193
0
    {
1194
        // Acknowledged all for best-effort reader.
1195
0
        rp->acked_changes_set(history_->next_sequence_number());
1196
0
    }
1197
1198
0
    EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy " << rp->guid() << " added to " << this->m_guid.entityId << " with "
1199
0
                                                   << rdata.remote_locators.unicast.size() << "(u)-"
1200
0
                                                   << rdata.remote_locators.multicast.size()
1201
0
                                                   << "(m) locators");
1202
1203
0
    if (nullptr != listener_)
1204
0
    {
1205
        // call the listener without locks taken
1206
0
        guard_locator_selector_async.unlock();
1207
0
        guard_locator_selector_general.unlock();
1208
0
        guard.unlock();
1209
1210
0
        listener_->on_reader_discovery(this, ReaderDiscoveryStatus::DISCOVERED_READER, rdata.guid, &rdata);
1211
0
    }
1212
1213
0
#ifdef FASTDDS_STATISTICS
1214
    // notify monitor service so that the connectionlist for this entity
1215
    // could be updated
1216
0
    if (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin())
1217
0
    {
1218
0
        mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid);
1219
0
    }
1220
0
#endif //FASTDDS_STATISTICS
1221
1222
0
    return true;
1223
0
}
1224
1225
bool StatefulWriter::matched_reader_remove(
1226
        const GUID_t& reader_guid)
1227
0
{
1228
0
    ReaderProxy* rproxy = nullptr;
1229
0
    std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
1230
1231
0
    {
1232
0
        std::lock_guard<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_);
1233
0
        std::lock_guard<LocatorSelectorSender> guard_locator_selector_async(locator_selector_async_);
1234
1235
0
        for (ReaderProxyIterator it = matched_local_readers_.begin();
1236
0
                it != matched_local_readers_.end(); ++it)
1237
0
        {
1238
0
            if ((*it)->guid() == reader_guid)
1239
0
            {
1240
0
                EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
1241
0
                rproxy = std::move(*it);
1242
0
                it = matched_local_readers_.erase(it);
1243
0
                break;
1244
0
            }
1245
0
        }
1246
1247
0
        if (rproxy == nullptr)
1248
0
        {
1249
0
            for (ReaderProxyIterator it = matched_datasharing_readers_.begin();
1250
0
                    it != matched_datasharing_readers_.end(); ++it)
1251
0
            {
1252
0
                if ((*it)->guid() == reader_guid)
1253
0
                {
1254
0
                    EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
1255
0
                    rproxy = std::move(*it);
1256
0
                    it = matched_datasharing_readers_.erase(it);
1257
0
                    break;
1258
0
                }
1259
0
            }
1260
0
        }
1261
1262
0
        if (rproxy == nullptr)
1263
0
        {
1264
0
            for (ReaderProxyIterator it = matched_remote_readers_.begin();
1265
0
                    it != matched_remote_readers_.end(); ++it)
1266
0
            {
1267
0
                if ((*it)->guid() == reader_guid)
1268
0
                {
1269
0
                    EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid);
1270
0
                    rproxy = std::move(*it);
1271
0
                    it = matched_remote_readers_.erase(it);
1272
0
                    break;
1273
0
                }
1274
0
            }
1275
0
        }
1276
1277
0
        locator_selector_general_.locator_selector.remove_entry(reader_guid);
1278
0
        locator_selector_async_.locator_selector.remove_entry(reader_guid);
1279
0
        update_reader_info(locator_selector_general_, false);
1280
0
        update_reader_info(locator_selector_async_, false);
1281
0
    }
1282
1283
0
    if (get_matched_readers_size() == 0)
1284
0
    {
1285
0
        periodic_hb_event_->cancel_timer();
1286
0
    }
1287
1288
0
    if (rproxy != nullptr)
1289
0
    {
1290
0
        rproxy->stop();
1291
0
        matched_readers_pool_.push_back(rproxy);
1292
1293
0
        check_acked_status();
1294
1295
0
        if (nullptr != listener_)
1296
0
        {
1297
            // listener is called without locks taken
1298
0
            lock.unlock();
1299
0
            listener_->on_reader_discovery(this, ReaderDiscoveryStatus::REMOVED_READER, reader_guid, nullptr);
1300
0
        }
1301
1302
0
#ifdef FASTDDS_STATISTICS
1303
        // notify monitor service so that the connectionlist for this entity
1304
        // could be updated
1305
0
        if (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin())
1306
0
        {
1307
0
            mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid);
1308
0
        }
1309
0
#endif //FASTDDS_STATISTICS
1310
1311
0
        return true;
1312
0
    }
1313
1314
0
    EPROSIMA_LOG_INFO(RTPS_HISTORY, "Reader Proxy doesn't exist in this writer");
1315
0
    return false;
1316
0
}
1317
1318
bool StatefulWriter::matched_reader_is_matched(
1319
        const GUID_t& reader_guid)
1320
0
{
1321
0
    std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
1322
0
    return for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
1323
0
                   [&reader_guid](ReaderProxy* reader)
1324
0
                   {
1325
0
                       return (reader->guid() == reader_guid);
1326
0
                   }
1327
0
                   );
1328
0
}
1329
1330
bool StatefulWriter::matched_reader_lookup(
1331
        GUID_t& readerGuid,
1332
        ReaderProxy** RP)
1333
0
{
1334
0
    std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
1335
0
    return for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
1336
0
                   [&readerGuid, RP](ReaderProxy* reader)
1337
0
                   {
1338
0
                       if (reader->guid() == readerGuid)
1339
0
                       {
1340
0
                           *RP = reader;
1341
0
                           return true;
1342
0
                       }
1343
0
                       return false;
1344
0
                   }
1345
0
                   );
1346
0
}
1347
1348
bool StatefulWriter::has_been_fully_delivered(
1349
        const SequenceNumber_t& seq_num) const
1350
0
{
1351
0
    std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
1352
0
    bool found = false;
1353
    // Sequence number has not been generated by this WriterHistory.
1354
0
    if (seq_num >= history_->next_sequence_number())
1355
0
    {
1356
0
        return false;
1357
0
    }
1358
0
    for (auto reader : matched_remote_readers_)
1359
0
    {
1360
0
        bool ret_code = reader->has_been_delivered(seq_num, found);
1361
0
        if (found && !ret_code)
1362
0
        {
1363
            // The change has not been fully delivered if it is pending delivery on at least one ReaderProxy.
1364
0
            return false;
1365
0
        }
1366
0
    }
1367
0
    return true;
1368
0
}
1369
1370
bool StatefulWriter::is_acked_by_all(
1371
        const SequenceNumber_t& seq_num) const
1372
0
{
1373
0
    std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
1374
0
    return is_acked_by_all_nts(seq_num);
1375
0
}
1376
1377
bool StatefulWriter::is_acked_by_all_nts(
1378
        const SequenceNumber_t seq) const
1379
0
{
1380
0
    assert(history_->next_sequence_number() > seq);
1381
0
    return (seq < next_all_acked_notify_sequence_) ||
1382
0
           !for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
1383
0
                   [seq](const ReaderProxy* reader)
1384
0
                   {
1385
0
                       return !(reader->change_is_acked(seq));
1386
0
                   });
1387
0
}
1388
1389
bool StatefulWriter::all_readers_updated()
1390
0
{
1391
0
    std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
1392
1393
0
    return !for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
1394
0
                   [](const ReaderProxy* reader)
1395
0
                   {
1396
0
                       return (reader->has_changes());
1397
0
                   }
1398
0
                   );
1399
0
}
1400
1401
bool StatefulWriter::wait_for_all_acked(
1402
        const dds::Duration_t& max_wait)
1403
0
{
1404
0
    send_periodic_heartbeat();
1405
1406
0
    std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
1407
0
    std::unique_lock<std::mutex> all_acked_lock(all_acked_mutex_);
1408
1409
0
    all_acked_ = !for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
1410
0
                    [](const ReaderProxy* reader)
1411
0
                    {
1412
0
                        return reader->has_changes();
1413
0
                    }
1414
0
                    );
1415
0
    lock.unlock();
1416
1417
0
    if (!all_acked_)
1418
0
    {
1419
0
        std::chrono::microseconds max_w(fastdds::rtps::TimeConv::Duration_t2MicroSecondsInt64(max_wait));
1420
0
        all_acked_cond_.wait_for(all_acked_lock, max_w, [&]()
1421
0
                {
1422
0
                    return all_acked_;
1423
0
                });
1424
0
    }
1425
1426
0
    return all_acked_;
1427
0
}
1428
1429
void StatefulWriter::rebuild_status_after_load()
1430
0
{
1431
0
    SequenceNumber_t min_seq = get_seq_num_min();
1432
0
    if (min_seq != SequenceNumber_t::unknown())
1433
0
    {
1434
0
        biggest_removed_sequence_number_ = min_seq - 1;
1435
0
        may_remove_change_ = 1;
1436
0
    }
1437
1438
0
    SequenceNumber_t next_seq = history_->next_sequence_number();
1439
0
    next_all_acked_notify_sequence_ = next_seq;
1440
0
    min_readers_low_mark_ = next_seq - 1;
1441
0
    all_acked_ = true;
1442
0
}
1443
1444
void StatefulWriter::check_acked_status()
1445
0
{
1446
0
    std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
1447
1448
0
    bool all_acked = true;
1449
0
    bool has_min_low_mark = false;
1450
    // #8945 If no readers matched, notify all old changes.
1451
0
    SequenceNumber_t min_low_mark = history_->next_sequence_number() - 1;
1452
1453
0
    for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
1454
0
            [&all_acked, &has_min_low_mark, &min_low_mark](ReaderProxy* reader)
1455
0
            {
1456
0
                SequenceNumber_t reader_low_mark = reader->changes_low_mark();
1457
0
                if (reader_low_mark < min_low_mark || !has_min_low_mark)
1458
0
                {
1459
0
                    has_min_low_mark = true;
1460
0
                    min_low_mark = reader_low_mark;
1461
0
                }
1462
1463
0
                if (reader->has_changes())
1464
0
                {
1465
0
                    all_acked = false;
1466
0
                }
1467
1468
0
                return false;
1469
0
            }
1470
0
            );
1471
1472
0
    if (all_acked)
1473
0
    {
1474
0
        min_low_mark = history_->next_sequence_number() - 1;
1475
0
    }
1476
1477
0
    bool something_changed = all_acked;
1478
0
    SequenceNumber_t min_seq = get_seq_num_min();
1479
0
    if (min_seq != SequenceNumber_t::unknown())
1480
0
    {
1481
        // In the case where we haven't received an acknack from a recently matched reader,
1482
        // min_low_mark will be zero, and no change will be notified as received by all
1483
0
        if (next_all_acked_notify_sequence_ <= min_low_mark)
1484
0
        {
1485
0
            if ((listener_ != nullptr) && (min_low_mark >= get_seq_num_min()))
1486
0
            {
1487
                // We will inform backwards about the changes received by all readers, starting
1488
                // on min_low_mark down until next_all_acked_notify_sequence_. This way we can
1489
                // safely proceed with the traversal, in case a change is removed from the history
1490
                // inside the callback
1491
0
                History::iterator history_end = history_->changesEnd();
1492
0
                History::iterator cit =
1493
0
                        std::lower_bound(history_->changesBegin(), history_end, min_low_mark,
1494
0
                                [](
1495
0
                                    const CacheChange_t* change,
1496
0
                                    const SequenceNumber_t& seq)
1497
0
                                {
1498
0
                                    return change->sequenceNumber < seq;
1499
0
                                });
1500
0
                if (cit != history_end && (*cit)->sequenceNumber == min_low_mark)
1501
0
                {
1502
0
                    ++cit;
1503
0
                }
1504
1505
0
                SequenceNumber_t seq{};
1506
0
                SequenceNumber_t end_seq = min_seq > next_all_acked_notify_sequence_ ?
1507
0
                        min_seq : next_all_acked_notify_sequence_;
1508
1509
                // The iterator starts pointing to the change inmediately after min_low_mark
1510
0
                --cit;
1511
1512
0
                do
1513
0
                {
1514
                    // Avoid notifying changes before next_all_acked_notify_sequence_
1515
0
                    CacheChange_t* change = *cit;
1516
0
                    seq = change->sequenceNumber;
1517
0
                    if (seq < next_all_acked_notify_sequence_)
1518
0
                    {
1519
0
                        break;
1520
0
                    }
1521
1522
                    // Change iterator before it possibly becomes invalidated
1523
0
                    if (cit != history_->changesBegin())
1524
0
                    {
1525
0
                        --cit;
1526
0
                    }
1527
1528
                    // Notify reception of change (may remove that change on VOLATILE writers)
1529
0
                    listener_->on_writer_change_received_by_all(this, change);
1530
1531
                    // Stop if we got to either next_all_acked_notify_sequence_ or the first change
1532
0
                }
1533
0
                while (seq > end_seq);
1534
0
            }
1535
1536
0
            next_all_acked_notify_sequence_ = min_low_mark + 1;
1537
0
        }
1538
1539
0
        if (min_low_mark >= get_seq_num_min())
1540
0
        {
1541
            // get_seq_num_min() returns SequenceNumber_t::unknown() when the history is empty.
1542
            // Thus, it is set to 2 to indicate that all samples have been removed.
1543
0
            may_remove_change_ = (get_seq_num_min() == SequenceNumber_t::unknown()) ? 2 : 1;
1544
0
        }
1545
1546
0
        min_readers_low_mark_ = min_low_mark;
1547
0
        something_changed = true;
1548
0
    }
1549
1550
0
    if (all_acked)
1551
0
    {
1552
0
        std::unique_lock<std::mutex> all_acked_lock(all_acked_mutex_);
1553
0
        SequenceNumber_t next_seq = history_->next_sequence_number();
1554
0
        next_all_acked_notify_sequence_ = next_seq;
1555
0
        min_readers_low_mark_ = next_seq - 1;
1556
0
        all_acked_ = true;
1557
0
        all_acked_cond_.notify_all();
1558
0
    }
1559
1560
0
    if (something_changed)
1561
0
    {
1562
0
        may_remove_change_cond_.notify_one();
1563
0
    }
1564
0
}
1565
1566
bool StatefulWriter::try_remove_change(
1567
        const std::chrono::steady_clock::time_point& max_blocking_time_point,
1568
        std::unique_lock<RecursiveTimedMutex>& lock)
1569
0
{
1570
0
    EPROSIMA_LOG_INFO(RTPS_WRITER, "Starting process try remove change for writer " << getGuid());
1571
1572
0
    SequenceNumber_t min_low_mark;
1573
1574
0
    {
1575
0
        std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
1576
0
        min_low_mark = next_all_acked_notify_sequence_ - 1;
1577
0
    }
1578
1579
0
    SequenceNumber_t calc = min_low_mark < get_seq_num_min() ? SequenceNumber_t() :
1580
0
            (min_low_mark - get_seq_num_min()) + 1;
1581
0
    unsigned int may_remove_change = 1;
1582
1583
0
    if (calc <= SequenceNumber_t())
1584
0
    {
1585
0
        may_remove_change_ = 0;
1586
0
        may_remove_change_cond_.wait_until(lock, max_blocking_time_point,
1587
0
                [&]()
1588
0
                {
1589
0
                    return may_remove_change_ > 0;
1590
0
                });
1591
0
        may_remove_change = may_remove_change_;
1592
0
    }
1593
1594
    // Some changes acked
1595
0
    if (may_remove_change == 1)
1596
0
    {
1597
0
        return history_->remove_min_change();
1598
0
    }
1599
    // Waiting a change was removed.
1600
0
    else if (may_remove_change == 2)
1601
0
    {
1602
0
        return true;
1603
0
    }
1604
1605
0
    return false;
1606
0
}
1607
1608
bool StatefulWriter::wait_for_acknowledgement(
1609
        const SequenceNumber_t& seq,
1610
        const std::chrono::steady_clock::time_point& max_blocking_time_point,
1611
        std::unique_lock<RecursiveTimedMutex>& lock)
1612
0
{
1613
0
    return may_remove_change_cond_.wait_until(lock, max_blocking_time_point,
1614
0
                   [this, &seq]()
1615
0
                   {
1616
0
                       return is_acked_by_all_nts(seq);
1617
0
                   });
1618
0
}
1619
1620
/*
1621
 * PARAMETER_RELATED METHODS
1622
 */
1623
void StatefulWriter::update_attributes(
1624
        const WriterAttributes& att)
1625
0
{
1626
0
    BaseWriter::update_attributes(att);
1627
1628
0
    this->update_times(att.times);
1629
0
    if (this->get_disable_positive_acks())
1630
0
    {
1631
0
        this->update_positive_acks_times(att);
1632
0
    }
1633
0
}
1634
1635
bool StatefulWriter::matched_readers_guids(
1636
        std::vector<GUID_t>& guids) const
1637
0
{
1638
0
    std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
1639
0
    guids.clear();
1640
0
    guids.reserve(matched_local_readers_.size() + matched_datasharing_readers_.size() +
1641
0
            matched_remote_readers_.size());
1642
0
    for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
1643
0
            [&guids](const ReaderProxy* reader)
1644
0
            {
1645
0
                guids.emplace_back(reader->guid());
1646
0
                return false;
1647
0
            }
1648
0
            );
1649
1650
0
    return true;
1651
0
}
1652
1653
void StatefulWriter::update_positive_acks_times(
1654
        const WriterAttributes& att)
1655
0
{
1656
0
    std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
1657
0
    keep_duration_ = att.keep_duration;
1658
    // Restart ack timer with new duration
1659
0
    ack_event_->update_interval(keep_duration_);
1660
0
    ack_event_->restart_timer();
1661
0
}
1662
1663
void StatefulWriter::update_times(
1664
        const WriterTimes& times)
1665
0
{
1666
0
    std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
1667
0
    if (times_.heartbeat_period != times.heartbeat_period)
1668
0
    {
1669
0
        periodic_hb_event_->update_interval(times.heartbeat_period);
1670
0
    }
1671
0
    if (times_.nack_response_delay != times.nack_response_delay)
1672
0
    {
1673
0
        if (nack_response_event_ != nullptr)
1674
0
        {
1675
0
            nack_response_event_->update_interval(times.nack_response_delay);
1676
0
        }
1677
0
    }
1678
0
    if (times_.nack_supression_duration != times.nack_supression_duration)
1679
0
    {
1680
0
        for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
1681
0
                [&times](ReaderProxy* reader)
1682
0
                {
1683
0
                    reader->update_nack_supression_interval(times.nack_supression_duration);
1684
0
                    return false;
1685
0
                }
1686
0
                );
1687
1688
0
        for (ReaderProxy* it : matched_readers_pool_)
1689
0
        {
1690
0
            it->update_nack_supression_interval(times.nack_supression_duration);
1691
0
        }
1692
0
    }
1693
0
    times_ = times;
1694
0
}
1695
1696
/**
 * Get the next sequence number of the writer history.
 *
 * @return The value returned by WriterHistory::next_sequence_number().
 */
SequenceNumber_t StatefulWriter::next_sequence_number() const
{
    const SequenceNumber_t upcoming_seq = history_->next_sequence_number();
    return upcoming_seq;
}
1700
1701
bool StatefulWriter::send_periodic_heartbeat(
1702
        bool final,
1703
        bool liveliness)
1704
0
{
1705
0
    std::lock_guard<RecursiveTimedMutex> guardW(mp_mutex);
1706
0
    std::lock_guard<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_);
1707
1708
0
    bool unacked_changes {false};
1709
0
    bool irrelevants_removed {false};
1710
0
    if (!liveliness)
1711
0
    {
1712
0
        SequenceNumber_t first_seq_to_check_acknowledge = get_seq_num_min();
1713
0
        if (SequenceNumber_t::unknown() == first_seq_to_check_acknowledge)
1714
0
        {
1715
0
            first_seq_to_check_acknowledge = history_->next_sequence_number() - 1;
1716
0
        }
1717
1718
0
        for_matched_readers(matched_local_readers_, matched_datasharing_readers_,
1719
0
                matched_remote_readers_,
1720
0
                [first_seq_to_check_acknowledge, &unacked_changes, &irrelevants_removed](ReaderProxy* reader)
1721
0
                {
1722
0
                    if (!unacked_changes)
1723
0
                    {
1724
0
                        unacked_changes = reader->has_unacknowledged(first_seq_to_check_acknowledge);
1725
0
                    }
1726
1727
0
                    if (!irrelevants_removed)
1728
0
                    {
1729
0
                        irrelevants_removed = SequenceNumber_t::unknown() != reader->first_irrelevant_removed();
1730
0
                    }
1731
1732
0
                    return unacked_changes && irrelevants_removed;
1733
0
                }
1734
0
                );
1735
1736
0
        if (unacked_changes)
1737
0
        {
1738
0
            try
1739
0
            {
1740
                //TODO if separating, here sends periodic for all readers, instead of ones needed it.
1741
0
                send_heartbeat_to_all_readers(irrelevants_removed);
1742
0
            }
1743
0
            catch (const RTPSMessageGroup::timeout&)
1744
0
            {
1745
0
                EPROSIMA_LOG_ERROR(RTPS_WRITER, "Max blocking time reached");
1746
0
            }
1747
0
        }
1748
0
    }
1749
0
    else if (separate_sending_enabled_)
1750
0
    {
1751
        // Send individual liveliness heartbeat to each reader
1752
0
        for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
1753
0
                [this, &liveliness, &unacked_changes](ReaderProxy* reader)
1754
0
                {
1755
0
                    send_heartbeat_to_nts(*reader, liveliness);
1756
0
                    unacked_changes = true;
1757
0
                    return false;
1758
0
                }
1759
0
                );
1760
0
    }
1761
0
    else
1762
0
    {
1763
        // This is a liveliness heartbeat, we don't care about checking sequence numbers
1764
0
        try
1765
0
        {
1766
0
            for (ReaderProxy* reader : matched_local_readers_)
1767
0
            {
1768
0
                intraprocess_heartbeat(reader, true);
1769
0
                unacked_changes = true;
1770
0
            }
1771
1772
0
            for (ReaderProxy* reader : matched_datasharing_readers_)
1773
0
            {
1774
0
                auto pool = std::dynamic_pointer_cast<WriterPool>(history_->get_payload_pool());
1775
0
                assert(pool);
1776
0
                pool->assert_liveliness();
1777
0
                reader->datasharing_notify();
1778
0
                unacked_changes = true;
1779
0
            }
1780
1781
0
            if (there_are_remote_readers_)
1782
0
            {
1783
0
                unacked_changes = true;
1784
0
                RTPSMessageGroup group(mp_RTPSParticipant, this, &locator_selector_general_);
1785
0
                send_heartbeat_nts_(locator_selector_general_.all_remote_readers.size(), group, final, liveliness);
1786
0
            }
1787
0
        }
1788
0
        catch (const RTPSMessageGroup::timeout&)
1789
0
        {
1790
0
            EPROSIMA_LOG_ERROR(RTPS_WRITER, "Max blocking time reached");
1791
0
        }
1792
0
    }
1793
1794
0
    return unacked_changes;
1795
0
}
1796
1797
void StatefulWriter::send_heartbeat_to_nts(
1798
        ReaderProxy& remoteReaderProxy,
1799
        bool liveliness,
1800
        bool force /* = false */)
1801
0
{
1802
0
    SequenceNumber_t first_seq_to_check_acknowledge = get_seq_num_min();
1803
0
    if (SequenceNumber_t::unknown() == first_seq_to_check_acknowledge)
1804
0
    {
1805
0
        first_seq_to_check_acknowledge = history_->next_sequence_number() - 1;
1806
0
    }
1807
0
    if (remoteReaderProxy.is_reliable() &&
1808
0
            (force || liveliness || remoteReaderProxy.has_unacknowledged(first_seq_to_check_acknowledge)))
1809
0
    {
1810
0
        if (remoteReaderProxy.is_local_reader())
1811
0
        {
1812
0
            intraprocess_heartbeat(&remoteReaderProxy, liveliness);
1813
0
        }
1814
0
        else if (remoteReaderProxy.is_datasharing_reader())
1815
0
        {
1816
0
            remoteReaderProxy.datasharing_notify();
1817
0
        }
1818
0
        else
1819
0
        {
1820
0
            try
1821
0
            {
1822
0
                RTPSMessageGroup group(mp_RTPSParticipant, this, remoteReaderProxy.message_sender());
1823
0
                SequenceNumber_t firstSeq = get_seq_num_min();
1824
0
                SequenceNumber_t lastSeq = get_seq_num_max();
1825
1826
0
                if (firstSeq != c_SequenceNumber_Unknown && lastSeq != c_SequenceNumber_Unknown)
1827
0
                {
1828
0
                    assert(firstSeq <= lastSeq);
1829
0
                    if (!liveliness)
1830
0
                    {
1831
0
                        add_gaps_for_removed_irrelevants(remoteReaderProxy, group);
1832
0
                        add_gaps_for_holes_in_history(group);
1833
0
                    }
1834
0
                }
1835
1836
0
                send_heartbeat_nts_(1u, group, disable_positive_acks_, liveliness);
1837
0
            }
1838
0
            catch (const RTPSMessageGroup::timeout&)
1839
0
            {
1840
0
                EPROSIMA_LOG_ERROR(RTPS_WRITER, "Max blocking time reached");
1841
0
            }
1842
0
        }
1843
0
    }
1844
0
}
1845
1846
void StatefulWriter::send_heartbeat_nts_(
1847
        size_t number_of_readers,
1848
        RTPSMessageGroup& message_group,
1849
        bool final,
1850
        bool liveliness)
1851
0
{
1852
0
    if (!number_of_readers)
1853
0
    {
1854
0
        return;
1855
0
    }
1856
1857
0
    SequenceNumber_t firstSeq = get_seq_num_min();
1858
0
    SequenceNumber_t lastSeq = get_seq_num_max();
1859
1860
0
    if (firstSeq == c_SequenceNumber_Unknown || lastSeq == c_SequenceNumber_Unknown)
1861
0
    {
1862
0
        assert(firstSeq == c_SequenceNumber_Unknown && lastSeq == c_SequenceNumber_Unknown);
1863
1864
0
        if (number_of_readers == 1 || liveliness)
1865
0
        {
1866
0
            firstSeq = next_sequence_number();
1867
0
            lastSeq = firstSeq - 1;
1868
0
        }
1869
0
        else
1870
0
        {
1871
0
            return;
1872
0
        }
1873
0
    }
1874
0
    else
1875
0
    {
1876
0
        assert(firstSeq <= lastSeq);
1877
0
    }
1878
1879
0
    increment_hb_count();
1880
0
    message_group.add_heartbeat(firstSeq, lastSeq, heartbeat_count_, final, liveliness);
1881
1882
0
    EPROSIMA_LOG_INFO(RTPS_WRITER,
1883
0
            getGuid().entityId << " Sending Heartbeat (" << firstSeq << " - " << lastSeq << ")" );
1884
0
}
1885
1886
void StatefulWriter::send_heartbeat_piggyback_nts_(
1887
        RTPSMessageGroup& message_group,
1888
        LocatorSelectorSender& locator_selector)
1889
0
{
1890
0
    if (!disable_heartbeat_piggyback_)
1891
0
    {
1892
0
        if (history_->isFull() || next_all_acked_notify_sequence_ < get_seq_num_min())
1893
0
        {
1894
0
            select_all_readers_nts(message_group, locator_selector);
1895
0
            size_t number_of_readers = locator_selector.all_remote_readers.size();
1896
0
            send_heartbeat_nts_(number_of_readers, message_group, disable_positive_acks_);
1897
0
        }
1898
0
        else
1899
0
        {
1900
0
            if (last_num_exceeded_send_buffer_size_ != message_group.num_exceeded_send_buffer_size())
1901
0
            {
1902
0
                last_num_exceeded_send_buffer_size_ = message_group.num_exceeded_send_buffer_size();
1903
0
                select_all_readers_nts(message_group, locator_selector);
1904
0
                size_t number_of_readers = locator_selector.all_remote_readers.size();
1905
0
                send_heartbeat_nts_(number_of_readers, message_group, disable_positive_acks_);
1906
0
            }
1907
0
        }
1908
0
    }
1909
0
}
1910
1911
void StatefulWriter::perform_nack_response()
1912
0
{
1913
0
    std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
1914
1915
0
    uint32_t changes_to_resend = 0;
1916
0
    for (ReaderProxy* reader : matched_remote_readers_)
1917
0
    {
1918
0
        changes_to_resend += reader->perform_acknack_response([&](ChangeForReader_t& change)
1919
0
                        {
1920
                            // This labmda is called if the ChangeForReader_t pass from REQUESTED to UNSENT.
1921
0
                            assert(nullptr != change.getChange());
1922
0
                            flow_controller_->add_old_sample(this, change.getChange());
1923
0
                        }
1924
0
                        );
1925
0
    }
1926
1927
0
    lock.unlock();
1928
1929
    // Notify the statistics module
1930
0
    on_resent_data(changes_to_resend);
1931
0
}
1932
1933
void StatefulWriter::perform_nack_supression(
1934
        const GUID_t& reader_guid)
1935
0
{
1936
0
    std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
1937
1938
0
    for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
1939
0
            [this, &reader_guid](ReaderProxy* reader)
1940
0
            {
1941
0
                if (reader->guid() == reader_guid)
1942
0
                {
1943
0
                    reader->perform_nack_supression();
1944
0
                    periodic_hb_event_->restart_timer();
1945
0
                    return true;
1946
0
                }
1947
0
                return false;
1948
0
            }
1949
0
            );
1950
0
}
1951
1952
bool StatefulWriter::process_acknack(
1953
        const GUID_t& writer_guid,
1954
        const GUID_t& reader_guid,
1955
        uint32_t ack_count,
1956
        const SequenceNumberSet_t& sn_set,
1957
        bool final_flag,
1958
        bool& result,
1959
        fastdds::rtps::VendorId_t /*origin_vendor_id*/)
1960
0
{
1961
0
    std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
1962
0
    result = (m_guid == writer_guid);
1963
1964
0
    if (result)
1965
0
    {
1966
0
        SequenceNumber_t received_sequence_number = sn_set.empty() ? sn_set.base() : sn_set.max();
1967
0
        if (received_sequence_number <= next_sequence_number())
1968
0
        {
1969
0
            for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
1970
0
                    [&](ReaderProxy* remote_reader)
1971
0
                    {
1972
0
                        if (remote_reader->guid() == reader_guid)
1973
0
                        {
1974
0
                            if (remote_reader->check_and_set_acknack_count(ack_count))
1975
0
                            {
1976
                                // Sequence numbers before Base are set as Acknowledged.
1977
0
                                remote_reader->acked_changes_set(sn_set.base());
1978
0
                                if (sn_set.base() > SequenceNumber_t(0, 0))
1979
0
                                {
1980
                                    // Prepare GAP for requested  samples that are not in history or are irrelevants.
1981
0
                                    RTPSMessageGroup group(mp_RTPSParticipant, this, remote_reader->message_sender());
1982
0
                                    RTPSGapBuilder gap_builder(group);
1983
1984
0
                                    if (remote_reader->requested_changes_set(sn_set, gap_builder, get_seq_num_min()))
1985
0
                                    {
1986
0
                                        nack_response_event_->restart_timer();
1987
0
                                    }
1988
0
                                    else if (!final_flag)
1989
0
                                    {
1990
0
                                        periodic_hb_event_->restart_timer();
1991
0
                                    }
1992
1993
0
                                    gap_builder.flush();
1994
0
                                }
1995
0
                                else if (sn_set.empty() && !final_flag)
1996
0
                                {
1997
                                    // This is the preemptive acknack.
1998
0
                                    if (remote_reader->process_initial_acknack([&](ChangeForReader_t& change_reader)
1999
0
                                    {
2000
0
                                        assert(nullptr != change_reader.getChange());
2001
0
                                        flow_controller_->add_old_sample(this, change_reader.getChange());
2002
0
                                    }))
2003
0
                                    {
2004
0
                                        if (remote_reader->is_remote_and_reliable())
2005
0
                                        {
2006
                                            // Send heartbeat if requested
2007
0
                                            send_heartbeat_to_nts(*remote_reader, false, true);
2008
0
                                            periodic_hb_event_->restart_timer();
2009
0
                                        }
2010
0
                                    }
2011
2012
0
                                    if (remote_reader->is_local_reader() && !remote_reader->is_datasharing_reader())
2013
0
                                    {
2014
0
                                        intraprocess_heartbeat(remote_reader);
2015
0
                                    }
2016
0
                                }
2017
2018
                                // Check if all CacheChange are acknowledge, because a user could be waiting
2019
                                // for this, or some CacheChanges could be removed if we are VOLATILE
2020
0
                                check_acked_status();
2021
0
                            }
2022
0
                            return true;
2023
0
                        }
2024
2025
0
                        return false;
2026
0
                    }
2027
0
                    );
2028
0
        }
2029
0
        else
2030
0
        {
2031
0
            print_inconsistent_acknack(writer_guid, reader_guid, sn_set.base(), received_sequence_number,
2032
0
                    next_sequence_number());
2033
0
        }
2034
0
    }
2035
2036
0
    return result;
2037
0
}
2038
2039
bool StatefulWriter::process_nack_frag(
2040
        const GUID_t& writer_guid,
2041
        const GUID_t& reader_guid,
2042
        uint32_t ack_count,
2043
        const SequenceNumber_t& seq_num,
2044
        const FragmentNumberSet_t& fragments_state,
2045
        bool& result,
2046
        fastdds::rtps::VendorId_t /*origin_vendor_id*/)
2047
0
{
2048
0
    std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
2049
0
    result = false;
2050
0
    if (m_guid == writer_guid)
2051
0
    {
2052
0
        result = true;
2053
0
        for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
2054
0
                [this, &reader_guid, &ack_count, &seq_num, &fragments_state](ReaderProxy* reader)
2055
0
                {
2056
0
                    if (reader->guid() == reader_guid)
2057
0
                    {
2058
0
                        if (reader->process_nack_frag(reader_guid, ack_count, seq_num, fragments_state))
2059
0
                        {
2060
0
                            nack_response_event_->restart_timer();
2061
0
                        }
2062
0
                        return true;
2063
0
                    }
2064
0
                    return false;
2065
0
                }
2066
0
                );
2067
0
    }
2068
2069
0
    return result;
2070
0
}
2071
2072
bool StatefulWriter::ack_timer_expired()
2073
0
{
2074
0
    std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
2075
2076
    // The timer has expired so the earliest non-acked change must be marked as acknowledged
2077
    // This will be done in the first while iteration, as we start with a negative interval
2078
2079
0
    Time_t expiration_ts;
2080
0
    Time_t current_ts;
2081
0
    Time_t::now(current_ts);
2082
2083
    // On the other hand, we've seen in the tests that if samples are sent very quickly with little
2084
    // time between consecutive samples, the timer interval could end up being negative
2085
    // In this case, we keep marking changes as acknowledged until the timer is able to keep up, hence the while
2086
    // loop
2087
2088
0
    do
2089
0
    {
2090
0
        bool acks_flag = false;
2091
0
        for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_,
2092
0
                [this, &acks_flag](ReaderProxy* reader)
2093
0
                {
2094
0
                    if (reader->disable_positive_acks())
2095
0
                    {
2096
0
                        reader->acked_changes_set(last_sequence_number_ + 1);
2097
0
                        acks_flag = true;
2098
0
                    }
2099
0
                    return false;
2100
0
                }
2101
0
                );
2102
0
        if (acks_flag)
2103
0
        {
2104
0
            check_acked_status();
2105
0
        }
2106
2107
0
        CacheChange_t* change;
2108
2109
        // Skip removed changes until reaching the last change
2110
0
        do
2111
0
        {
2112
0
            last_sequence_number_++;
2113
0
        }
2114
0
        while (!history_->get_change(last_sequence_number_, getGuid(), &change) &&
2115
0
                last_sequence_number_ < next_sequence_number());
2116
2117
0
        if (!history_->get_change(
2118
0
                    last_sequence_number_,
2119
0
                    getGuid(),
2120
0
                    &change))
2121
0
        {
2122
            // Stop ack_timer
2123
0
            return false;
2124
0
        }
2125
2126
0
        Time_t::now(current_ts);
2127
0
        expiration_ts = change->sourceTimestamp + keep_duration_;
2128
0
    }
2129
0
    while (expiration_ts < current_ts);
2130
2131
0
    auto interval = (expiration_ts - current_ts).to_duration_t();
2132
0
    ack_event_->update_interval(interval);
2133
0
    return true;
2134
0
}
2135
2136
void StatefulWriter::print_inconsistent_acknack(
2137
        const GUID_t& writer_guid,
2138
        const GUID_t& reader_guid,
2139
        const SequenceNumber_t& min_requested_sequence_number,
2140
        const SequenceNumber_t& max_requested_sequence_number,
2141
        const SequenceNumber_t& next_sequence_number)
2142
0
{
2143
0
    EPROSIMA_LOG_WARNING(RTPS_WRITER, "Inconsistent acknack received. Local Writer "
2144
0
            << writer_guid << " next SequenceNumber " << next_sequence_number << ". Remote Reader "
2145
0
            << reader_guid << " requested range is  [" << min_requested_sequence_number
2146
0
            << ", " << max_requested_sequence_number << "].");
2147
    // This is necessary to avoid Warning of unused variable in case warning log level is disable
2148
0
    static_cast<void>(writer_guid);
2149
0
    static_cast<void>(reader_guid);
2150
0
    static_cast<void>(min_requested_sequence_number);
2151
0
    static_cast<void>(max_requested_sequence_number);
2152
0
    static_cast<void>(next_sequence_number);
2153
0
}
2154
2155
void StatefulWriter::reader_data_filter(
2156
        fastdds::rtps::IReaderDataFilter* reader_data_filter)
2157
0
{
2158
0
    reader_data_filter_ = reader_data_filter;
2159
0
}
2160
2161
/**
 * @return The currently stored reader data filter pointer.
 */
const fastdds::rtps::IReaderDataFilter* StatefulWriter::reader_data_filter() const
{
    return this->reader_data_filter_;
}
2165
2166
/**
 * Delivers a change to every kind of matched reader, in this order: intraprocess,
 * datasharing, network. The acknowledgement status is re-evaluated afterwards.
 *
 * @param cache_change      Change to deliver.
 * @param group             Message group used for network delivery.
 * @param locator_selector  Locator selector used for network delivery
 *                          (object locked by FlowControllerImpl).
 * @param max_blocking_time Deadline forwarded to the network delivery.
 *
 * @return The result of the network delivery when there are remote readers,
 *         DeliveryRetCode::DELIVERED otherwise.
 */
DeliveryRetCode StatefulWriter::deliver_sample_nts(
        CacheChange_t* cache_change,
        RTPSMessageGroup& group,
        LocatorSelectorSender& locator_selector, // Object locked by FlowControllerImpl
        const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)
{
    if (there_are_local_readers_)
    {
        deliver_sample_to_intraprocesses(cache_change);
    }

    // Datasharing comes second
    if (there_are_datasharing_readers_)
    {
        deliver_sample_to_datasharing(cache_change);
    }

    // Only the network path can report something other than DELIVERED
    DeliveryRetCode outcome = DeliveryRetCode::DELIVERED;
    if (there_are_remote_readers_)
    {
        outcome = deliver_sample_to_network(cache_change, group, locator_selector, max_blocking_time);
    }

    check_acked_status();

    return outcome;
}
2194
2195
#ifdef FASTDDS_STATISTICS
2196
2197
bool StatefulWriter::get_connections(
2198
        fastdds::statistics::rtps::ConnectionList& connection_list)
2199
0
{
2200
0
    connection_list.reserve(matched_local_readers_.size() +
2201
0
            matched_datasharing_readers_.size() +
2202
0
            matched_remote_readers_.size());
2203
2204
0
    fastdds::statistics::Connection connection;
2205
2206
0
    {
2207
0
        std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
2208
2209
        //! intraprocess
2210
0
        for_matched_readers(matched_local_readers_, [&connection, &connection_list](ReaderProxy*& reader)
2211
0
                {
2212
0
                    connection.guid(fastdds::statistics::to_statistics_type(reader->guid()));
2213
0
                    connection.mode(fastdds::statistics::ConnectionMode::INTRAPROCESS);
2214
0
                    connection_list.push_back(connection);
2215
2216
0
                    return false;
2217
0
                });
2218
0
    }
2219
2220
0
    {
2221
0
        std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
2222
2223
        //! datasharing
2224
0
        for_matched_readers(matched_datasharing_readers_, [&connection, &connection_list](ReaderProxy*& reader)
2225
0
                {
2226
0
                    connection.guid(fastdds::statistics::to_statistics_type(reader->guid()));
2227
0
                    connection.mode(fastdds::statistics::ConnectionMode::DATA_SHARING);
2228
0
                    connection_list.push_back(connection);
2229
2230
0
                    return false;
2231
0
                });
2232
0
    }
2233
2234
0
    {
2235
0
        std::unique_lock<RecursiveTimedMutex> lock(mp_mutex);
2236
2237
        //! remote
2238
0
        for_matched_readers(matched_remote_readers_, [&connection, &connection_list](ReaderProxy*& reader)
2239
0
                {
2240
                    //! Announced locators is, for the moment,
2241
                    //! equal to the used_locators
2242
0
                    LocatorSelectorEntry* loc_selector_entry = reader->general_locator_selector_entry();
2243
2244
0
                    connection.announced_locators().reserve(reader->locators_size());
2245
0
                    connection.used_locators().reserve(reader->locators_size());
2246
2247
0
                    std::vector<fastdds::statistics::detail::Locator_s> statistics_locators;
2248
0
                    std::for_each(loc_selector_entry->multicast.begin(), loc_selector_entry->multicast.end(),
2249
0
                    [&statistics_locators](const Locator_t& locator)
2250
0
                    {
2251
0
                        statistics_locators.push_back(fastdds::statistics::to_statistics_type(locator));
2252
0
                    });
2253
2254
0
                    std::for_each(loc_selector_entry->unicast.begin(), loc_selector_entry->unicast.end(),
2255
0
                    [&statistics_locators](const Locator_t& locator)
2256
0
                    {
2257
0
                        statistics_locators.push_back(fastdds::statistics::to_statistics_type(locator));
2258
0
                    });
2259
2260
0
                    connection.guid(fastdds::statistics::to_statistics_type(reader->guid()));
2261
0
                    connection.mode(fastdds::statistics::ConnectionMode::TRANSPORT);
2262
0
                    connection.announced_locators(statistics_locators);
2263
0
                    connection.used_locators(statistics_locators);
2264
0
                    connection_list.push_back(connection);
2265
2266
0
                    return false;
2267
0
                });
2268
0
    }
2269
2270
0
    return true;
2271
2272
0
}
2273
2274
#endif // ifdef FASTDDS_STATISTICS
2275
2276
void StatefulWriter::add_gaps_for_removed_irrelevants(
2277
        ReaderProxy& remoteReaderProxy,
2278
        RTPSMessageGroup& group)
2279
0
{
2280
0
    if (SequenceNumber_t::unknown() != remoteReaderProxy.first_irrelevant_removed())
2281
0
    {
2282
0
        group.add_gap(remoteReaderProxy.first_irrelevant_removed(),
2283
0
                SequenceNumberSet_t(remoteReaderProxy.last_irrelevant_removed() + 1),
2284
0
                remoteReaderProxy.guid());
2285
0
        remoteReaderProxy.reset_irrelevant_removed();
2286
0
    }
2287
0
}
2288
2289
void StatefulWriter::add_gaps_for_holes_in_history(
2290
        RTPSMessageGroup& group)
2291
0
{
2292
0
    SequenceNumber_t firstSeq = get_seq_num_min();
2293
0
    SequenceNumber_t lastSeq = get_seq_num_max();
2294
2295
0
    if (SequenceNumber_t::unknown() != firstSeq &&
2296
0
            lastSeq.to64long() - firstSeq.to64long() + 1 != history_->getHistorySize())
2297
0
    {
2298
0
        RTPSGapBuilder gaps(group);
2299
        // There are holes in the history.
2300
0
        History::const_iterator cit = history_->changesBegin();
2301
0
        SequenceNumber_t prev = (*cit)->sequenceNumber + 1;
2302
0
        ++cit;
2303
0
        while (cit != history_->changesEnd())
2304
0
        {
2305
0
            while (prev != (*cit)->sequenceNumber)
2306
0
            {
2307
0
                gaps.add(prev);
2308
0
                ++prev;
2309
0
            }
2310
0
            ++prev;
2311
0
            ++cit;
2312
0
        }
2313
0
        gaps.flush();
2314
0
    }
2315
0
}
2316
2317
}  // namespace rtps
2318
}  // namespace fastdds
2319
}  // namespace eprosima