/src/Fast-DDS/src/cpp/rtps/writer/StatefulWriter.cpp
Line | Count | Source |
1 | | // Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | /** |
16 | | * @file StatefulWriter.cpp |
17 | | * |
18 | | */ |
19 | | |
20 | | #include "StatefulWriter.hpp" |
21 | | |
22 | | #include <mutex> |
23 | | #include <stdexcept> |
24 | | #include <vector> |
25 | | |
26 | | #include <fastdds/dds/log/Log.hpp> |
27 | | #include <fastdds/rtps/builtin/data/SubscriptionBuiltinTopicData.hpp> |
28 | | #include <fastdds/rtps/common/VendorId_t.hpp> |
29 | | #include <fastdds/rtps/history/WriterHistory.hpp> |
30 | | #include <fastdds/rtps/history/WriterHistory.hpp> |
31 | | #include <fastdds/rtps/interfaces/IReaderDataFilter.hpp> |
32 | | #include <rtps/messages/RTPSMessageCreator.hpp> |
33 | | #include <fastdds/rtps/participant/RTPSParticipant.hpp> |
34 | | #include <fastdds/rtps/reader/ReaderDiscoveryStatus.hpp> |
35 | | #include <fastdds/rtps/reader/RTPSReader.hpp> |
36 | | #include <fastdds/rtps/writer/WriterListener.hpp> |
37 | | |
38 | | #include <rtps/builtin/BuiltinProtocols.h> |
39 | | #include <rtps/builtin/liveliness/WLP.hpp> |
40 | | #include <rtps/DataSharing/DataSharingNotifier.hpp> |
41 | | #include <rtps/DataSharing/DataSharingPayloadPool.hpp> |
42 | | #include <rtps/DataSharing/WriterPool.hpp> |
43 | | #include <rtps/history/BasicPayloadPool.hpp> |
44 | | #include <rtps/history/CacheChangePool.h> |
45 | | #include <rtps/messages/RTPSGapBuilder.hpp> |
46 | | #include <rtps/messages/RTPSMessageGroup.hpp> |
47 | | #include <rtps/network/utils/external_locators.hpp> |
48 | | #include <rtps/participant/RTPSParticipantImpl.hpp> |
49 | | #include <rtps/reader/BaseReader.hpp> |
50 | | #include <rtps/reader/LocalReaderPointer.hpp> |
51 | | #include <rtps/resources/ResourceEvent.h> |
52 | | #include <rtps/resources/TimedEvent.h> |
53 | | #include <rtps/domain/RTPSDomainImpl.hpp> |
54 | | #include <rtps/writer/BaseWriter.hpp> |
55 | | #include <rtps/writer/ReaderProxy.hpp> |
56 | | #include <rtps/writer/StatefulWriterListener.hpp> |
57 | | #include <utils/TimeConversion.hpp> |
58 | | |
59 | | #ifdef FASTDDS_STATISTICS |
60 | | #include <statistics/types/monitorservice_types.hpp> |
61 | | #endif // ifdef FASTDDS_STATISTICS |
62 | | |
63 | | #include "../builtin/discovery/database/DiscoveryDataBase.hpp" |
64 | | |
65 | | #include "../flowcontrol/FlowController.hpp" |
66 | | |
67 | | namespace eprosima { |
68 | | namespace fastdds { |
69 | | namespace rtps { |
70 | | |
71 | | /** |
72 | | * Loops over all the readers in the vector, applying the given routine. |
73 | | * The loop continues until the result of the routine is true for any reader |
74 | | * or all readers have been processed. |
75 | | * The returned value is true if the routine returned true at any point, |
76 | | * or false otherwise. |
77 | | */ |
78 | | template<typename T> |
79 | | static bool for_matched_readers( |
80 | | ResourceLimitedVector<ReaderProxy*>& reader_vector_1, |
81 | | T fun) |
82 | 0 | { |
83 | 0 | for (ReaderProxy* remote_reader : reader_vector_1) |
84 | 0 | { |
85 | 0 | if (fun(remote_reader)) |
86 | 0 | { |
87 | 0 | return true; |
88 | 0 | } |
89 | 0 | } |
90 | | |
91 | 0 | return false; |
92 | 0 | } Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::unsent_change_added_to_history(eprosima::fastdds::rtps::CacheChange_t*, std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::unsent_change_added_to_history(eprosima::fastdds::rtps::CacheChange_t*, std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > const&)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::change_removed_by_history(eprosima::fastdds::rtps::CacheChange_t*, std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::change_removed_by_history(eprosima::fastdds::rtps::CacheChange_t*, std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > const&)::$_0) Unexecuted instantiation: 
StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::matched_reader_add_edp(eprosima::fastdds::rtps::ReaderProxyData const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::matched_reader_add_edp(eprosima::fastdds::rtps::ReaderProxyData const&)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::matched_reader_is_matched(eprosima::fastdds::rtps::GUID_t const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::matched_reader_is_matched(eprosima::fastdds::rtps::GUID_t const&)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::matched_reader_lookup(eprosima::fastdds::rtps::GUID_t&, eprosima::fastdds::rtps::ReaderProxy**)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, 
eprosima::fastdds::rtps::StatefulWriter::matched_reader_lookup(eprosima::fastdds::rtps::GUID_t&, eprosima::fastdds::rtps::ReaderProxy**)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::all_readers_updated()::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::all_readers_updated()::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::wait_for_all_acked(eprosima::fastdds::dds::Time_t const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::wait_for_all_acked(eprosima::fastdds::dds::Time_t const&)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::check_acked_status()::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::check_acked_status()::$_0) Unexecuted instantiation: 
StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::update_times(eprosima::fastdds::rtps::WriterTimes const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::update_times(eprosima::fastdds::rtps::WriterTimes const&)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::send_periodic_heartbeat(bool, bool)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::send_periodic_heartbeat(bool, bool)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::send_periodic_heartbeat(bool, bool)::$_1>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::send_periodic_heartbeat(bool, bool)::$_1) Unexecuted instantiation: StatefulWriter.cpp:bool 
eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::perform_nack_supression(eprosima::fastdds::rtps::GUID_t const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::perform_nack_supression(eprosima::fastdds::rtps::GUID_t const&)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::process_acknack(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&, unsigned int, eprosima::fastdds::BitmapRange<eprosima::fastdds::rtps::SequenceNumber_t, eprosima::fastdds::rtps::SequenceNumberDiff, 256u> const&, bool, bool&, std::__1::array<unsigned char, 2ul>)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::process_acknack(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&, unsigned int, eprosima::fastdds::BitmapRange<eprosima::fastdds::rtps::SequenceNumber_t, eprosima::fastdds::rtps::SequenceNumberDiff, 256u> const&, bool, bool&, std::__1::array<unsigned char, 2ul>)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::process_nack_frag(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&, unsigned int, 
eprosima::fastdds::rtps::SequenceNumber_t const&, eprosima::fastdds::BitmapRange<unsigned int, eprosima::fastdds::DiffFunction<unsigned int>, 256u> const&, bool&, std::__1::array<unsigned char, 2ul>)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::process_nack_frag(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&, unsigned int, eprosima::fastdds::rtps::SequenceNumber_t const&, eprosima::fastdds::BitmapRange<unsigned int, eprosima::fastdds::DiffFunction<unsigned int>, 256u> const&, bool&, std::__1::array<unsigned char, 2ul>)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::ack_timer_expired()::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::ack_timer_expired()::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::get_connections(std::__1::vector<eprosima::fastdds::statistics::Connection, std::__1::allocator<eprosima::fastdds::statistics::Connection> >&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, 
std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::get_connections(std::__1::vector<eprosima::fastdds::statistics::Connection, std::__1::allocator<eprosima::fastdds::statistics::Connection> >&)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::get_connections(std::__1::vector<eprosima::fastdds::statistics::Connection, std::__1::allocator<eprosima::fastdds::statistics::Connection> >&)::$_1>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::get_connections(std::__1::vector<eprosima::fastdds::statistics::Connection, std::__1::allocator<eprosima::fastdds::statistics::Connection> >&)::$_1) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::get_connections(std::__1::vector<eprosima::fastdds::statistics::Connection, std::__1::allocator<eprosima::fastdds::statistics::Connection> >&)::$_2>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::get_connections(std::__1::vector<eprosima::fastdds::statistics::Connection, 
std::__1::allocator<eprosima::fastdds::statistics::Connection> >&)::$_2) |
93 | | |
94 | | template<typename T> |
95 | | static bool for_matched_readers( |
96 | | ResourceLimitedVector<ReaderProxy*>& reader_vector_1, |
97 | | ResourceLimitedVector<ReaderProxy*>& reader_vector_2, |
98 | | T fun) |
99 | 0 | { |
100 | 0 | if (for_matched_readers(reader_vector_1, fun)) |
101 | 0 | { |
102 | 0 | return true; |
103 | 0 | } |
104 | 0 | return for_matched_readers(reader_vector_2, fun); |
105 | 0 | } Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::unsent_change_added_to_history(eprosima::fastdds::rtps::CacheChange_t*, std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::unsent_change_added_to_history(eprosima::fastdds::rtps::CacheChange_t*, std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > const&)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::change_removed_by_history(eprosima::fastdds::rtps::CacheChange_t*, std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, 
std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::change_removed_by_history(eprosima::fastdds::rtps::CacheChange_t*, std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > const&)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::matched_reader_add_edp(eprosima::fastdds::rtps::ReaderProxyData const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::matched_reader_add_edp(eprosima::fastdds::rtps::ReaderProxyData const&)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::matched_reader_is_matched(eprosima::fastdds::rtps::GUID_t 
const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::matched_reader_is_matched(eprosima::fastdds::rtps::GUID_t const&)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::matched_reader_lookup(eprosima::fastdds::rtps::GUID_t&, eprosima::fastdds::rtps::ReaderProxy**)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::matched_reader_lookup(eprosima::fastdds::rtps::GUID_t&, eprosima::fastdds::rtps::ReaderProxy**)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool 
eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::all_readers_updated()::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::all_readers_updated()::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::wait_for_all_acked(eprosima::fastdds::dds::Time_t const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::wait_for_all_acked(eprosima::fastdds::dds::Time_t const&)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool 
eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::check_acked_status()::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::check_acked_status()::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::update_times(eprosima::fastdds::rtps::WriterTimes const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::update_times(eprosima::fastdds::rtps::WriterTimes const&)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool 
eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::send_periodic_heartbeat(bool, bool)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::send_periodic_heartbeat(bool, bool)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::send_periodic_heartbeat(bool, bool)::$_1>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::send_periodic_heartbeat(bool, bool)::$_1) Unexecuted instantiation: StatefulWriter.cpp:bool 
eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::perform_nack_supression(eprosima::fastdds::rtps::GUID_t const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::perform_nack_supression(eprosima::fastdds::rtps::GUID_t const&)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::process_acknack(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&, unsigned int, eprosima::fastdds::BitmapRange<eprosima::fastdds::rtps::SequenceNumber_t, eprosima::fastdds::rtps::SequenceNumberDiff, 256u> const&, bool, bool&, std::__1::array<unsigned char, 2ul>)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, 
std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::process_acknack(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&, unsigned int, eprosima::fastdds::BitmapRange<eprosima::fastdds::rtps::SequenceNumber_t, eprosima::fastdds::rtps::SequenceNumberDiff, 256u> const&, bool, bool&, std::__1::array<unsigned char, 2ul>)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::process_nack_frag(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&, unsigned int, eprosima::fastdds::rtps::SequenceNumber_t const&, eprosima::fastdds::BitmapRange<unsigned int, eprosima::fastdds::DiffFunction<unsigned int>, 256u> const&, bool&, std::__1::array<unsigned char, 2ul>)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::process_nack_frag(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&, unsigned int, eprosima::fastdds::rtps::SequenceNumber_t const&, eprosima::fastdds::BitmapRange<unsigned int, eprosima::fastdds::DiffFunction<unsigned int>, 256u> const&, bool&, std::__1::array<unsigned char, 2ul>)::$_0) Unexecuted 
instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::ack_timer_expired()::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::ack_timer_expired()::$_0) |
106 | | |
107 | | template<typename T> |
108 | | static bool for_matched_readers( |
109 | | ResourceLimitedVector<ReaderProxy*>& reader_vector_1, |
110 | | ResourceLimitedVector<ReaderProxy*>& reader_vector_2, |
111 | | ResourceLimitedVector<ReaderProxy*>& reader_vector_3, |
112 | | T fun) |
113 | 0 | { |
114 | 0 | if (for_matched_readers(reader_vector_1, reader_vector_2, fun)) |
115 | 0 | { |
116 | 0 | return true; |
117 | 0 | } |
118 | 0 | return for_matched_readers(reader_vector_3, fun); |
119 | 0 | } Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::unsent_change_added_to_history(eprosima::fastdds::rtps::CacheChange_t*, std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::unsent_change_added_to_history(eprosima::fastdds::rtps::CacheChange_t*, std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > const&)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::change_removed_by_history(eprosima::fastdds::rtps::CacheChange_t*, std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, 
std::__1::ratio<1l, 1000000000l> > > const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::change_removed_by_history(eprosima::fastdds::rtps::CacheChange_t*, std::__1::chrono::time_point<std::__1::chrono::steady_clock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > > const&)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::matched_reader_add_edp(eprosima::fastdds::rtps::ReaderProxyData const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, 
eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::matched_reader_add_edp(eprosima::fastdds::rtps::ReaderProxyData const&)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::matched_reader_is_matched(eprosima::fastdds::rtps::GUID_t const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, 
std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::matched_reader_is_matched(eprosima::fastdds::rtps::GUID_t const&)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::matched_reader_lookup(eprosima::fastdds::rtps::GUID_t&, eprosima::fastdds::rtps::ReaderProxy**)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::matched_reader_lookup(eprosima::fastdds::rtps::GUID_t&, eprosima::fastdds::rtps::ReaderProxy**)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::all_readers_updated()::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, 
eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::all_readers_updated()::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::wait_for_all_acked(eprosima::fastdds::dds::Time_t const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, 
std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::wait_for_all_acked(eprosima::fastdds::dds::Time_t const&)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::check_acked_status()::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::check_acked_status()::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::update_times(eprosima::fastdds::rtps::WriterTimes const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, 
eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::update_times(eprosima::fastdds::rtps::WriterTimes const&)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::send_periodic_heartbeat(bool, bool)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, 
eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::send_periodic_heartbeat(bool, bool)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::send_periodic_heartbeat(bool, bool)::$_1>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::send_periodic_heartbeat(bool, bool)::$_1) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::perform_nack_supression(eprosima::fastdds::rtps::GUID_t 
const&)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::perform_nack_supression(eprosima::fastdds::rtps::GUID_t const&)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::process_acknack(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&, unsigned int, eprosima::fastdds::BitmapRange<eprosima::fastdds::rtps::SequenceNumber_t, eprosima::fastdds::rtps::SequenceNumberDiff, 256u> const&, bool, bool&, std::__1::array<unsigned char, 2ul>)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, 
eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::process_acknack(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&, unsigned int, eprosima::fastdds::BitmapRange<eprosima::fastdds::rtps::SequenceNumber_t, eprosima::fastdds::rtps::SequenceNumberDiff, 256u> const&, bool, bool&, std::__1::array<unsigned char, 2ul>)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::process_nack_frag(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&, unsigned int, eprosima::fastdds::rtps::SequenceNumber_t const&, eprosima::fastdds::BitmapRange<unsigned int, eprosima::fastdds::DiffFunction<unsigned int>, 256u> const&, bool&, std::__1::array<unsigned char, 2ul>)::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, 
eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::process_nack_frag(eprosima::fastdds::rtps::GUID_t const&, eprosima::fastdds::rtps::GUID_t const&, unsigned int, eprosima::fastdds::rtps::SequenceNumber_t const&, eprosima::fastdds::BitmapRange<unsigned int, eprosima::fastdds::DiffFunction<unsigned int>, 256u> const&, bool&, std::__1::array<unsigned char, 2ul>)::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::ack_timer_expired()::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, 
std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > >&, eprosima::fastdds::rtps::StatefulWriter::ack_timer_expired()::$_0) |
120 | | |
121 | | /** |
122 | | * Loops over all the readers in the vector, applying the given routine. |
123 | | * The loop continues until the result of the routine is true for any reader |
124 | | * or all readers have been processes. |
125 | | * The returned value is true if the routine returned true at any point, |
126 | | * or false otherwise. |
127 | | * |
128 | | * const version |
129 | | */ |
130 | | template<typename T> |
131 | | static bool for_matched_readers( |
132 | | const ResourceLimitedVector<ReaderProxy*>& reader_vector_1, |
133 | | T fun) |
134 | 0 | { |
135 | 0 | for (const ReaderProxy* remote_reader : reader_vector_1) |
136 | 0 | { |
137 | 0 | if (fun(remote_reader)) |
138 | 0 | { |
139 | 0 | return true; |
140 | 0 | } |
141 | 0 | } |
142 | | |
143 | 0 | return false; |
144 | 0 | } Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::is_acked_by_all_nts(eprosima::fastdds::rtps::SequenceNumber_t) const::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > > const&, eprosima::fastdds::rtps::StatefulWriter::is_acked_by_all_nts(eprosima::fastdds::rtps::SequenceNumber_t) const::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::matched_readers_guids(std::__1::vector<eprosima::fastdds::rtps::GUID_t, std::__1::allocator<eprosima::fastdds::rtps::GUID_t> >&) const::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > > const&, eprosima::fastdds::rtps::StatefulWriter::matched_readers_guids(std::__1::vector<eprosima::fastdds::rtps::GUID_t, std::__1::allocator<eprosima::fastdds::rtps::GUID_t> >&) const::$_0) |
145 | | |
146 | | template<typename T> |
147 | | static bool for_matched_readers( |
148 | | const ResourceLimitedVector<ReaderProxy*>& reader_vector_1, |
149 | | const ResourceLimitedVector<ReaderProxy*>& reader_vector_2, |
150 | | T fun) |
151 | 0 | { |
152 | 0 | if (for_matched_readers(reader_vector_1, fun)) |
153 | 0 | { |
154 | 0 | return true; |
155 | 0 | } |
156 | 0 | return for_matched_readers(reader_vector_2, fun); |
157 | 0 | } Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::is_acked_by_all_nts(eprosima::fastdds::rtps::SequenceNumber_t) const::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > > const&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > > const&, eprosima::fastdds::rtps::StatefulWriter::is_acked_by_all_nts(eprosima::fastdds::rtps::SequenceNumber_t) const::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::matched_readers_guids(std::__1::vector<eprosima::fastdds::rtps::GUID_t, std::__1::allocator<eprosima::fastdds::rtps::GUID_t> >&) const::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > > const&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, 
std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > > const&, eprosima::fastdds::rtps::StatefulWriter::matched_readers_guids(std::__1::vector<eprosima::fastdds::rtps::GUID_t, std::__1::allocator<eprosima::fastdds::rtps::GUID_t> >&) const::$_0) |
158 | | |
159 | | template<typename T> |
160 | | static bool for_matched_readers( |
161 | | const ResourceLimitedVector<ReaderProxy*>& reader_vector_1, |
162 | | const ResourceLimitedVector<ReaderProxy*>& reader_vector_2, |
163 | | const ResourceLimitedVector<ReaderProxy*>& reader_vector_3, |
164 | | T fun) |
165 | 0 | { |
166 | 0 | if (for_matched_readers(reader_vector_1, reader_vector_2, fun)) |
167 | 0 | { |
168 | 0 | return true; |
169 | 0 | } |
170 | 0 | return for_matched_readers(reader_vector_3, fun); |
171 | 0 | } Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::is_acked_by_all_nts(eprosima::fastdds::rtps::SequenceNumber_t) const::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > > const&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > > const&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > > const&, eprosima::fastdds::rtps::StatefulWriter::is_acked_by_all_nts(eprosima::fastdds::rtps::SequenceNumber_t) const::$_0) Unexecuted instantiation: StatefulWriter.cpp:bool eprosima::fastdds::rtps::for_matched_readers<eprosima::fastdds::rtps::StatefulWriter::matched_readers_guids(std::__1::vector<eprosima::fastdds::rtps::GUID_t, std::__1::allocator<eprosima::fastdds::rtps::GUID_t> >&) const::$_0>(eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, 
std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > > const&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > > const&, eprosima::fastdds::ResourceLimitedVector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::integral_constant<bool, false>, eprosima::fastdds::ResourceLimitedContainerConfig, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*>, std::__1::vector<eprosima::fastdds::rtps::ReaderProxy*, std::__1::allocator<eprosima::fastdds::rtps::ReaderProxy*> > > const&, eprosima::fastdds::rtps::StatefulWriter::matched_readers_guids(std::__1::vector<eprosima::fastdds::rtps::GUID_t, std::__1::allocator<eprosima::fastdds::rtps::GUID_t> >&) const::$_0) |
172 | | |
173 | | using namespace std::chrono; |
174 | | |
175 | | StatefulWriter::StatefulWriter( |
176 | | RTPSParticipantImpl* pimpl, |
177 | | const GUID_t& guid, |
178 | | const WriterAttributes& att, |
179 | | FlowController* flow_controller, |
180 | | WriterHistory* history, |
181 | | WriterListener* listener, |
182 | | StatefulWriterListener* stateful_listener) |
183 | 0 | : BaseWriter(pimpl, guid, att, flow_controller, history, listener) |
184 | 0 | , periodic_hb_event_(nullptr) |
185 | 0 | , nack_response_event_(nullptr) |
186 | 0 | , ack_event_(nullptr) |
187 | 0 | , heartbeat_count_(0) |
188 | 0 | , times_(att.times) |
189 | 0 | , matched_remote_readers_(att.matched_readers_allocation) |
190 | 0 | , matched_readers_pool_(att.matched_readers_allocation) |
191 | 0 | , next_all_acked_notify_sequence_(0, 1) |
192 | 0 | , all_acked_(false) |
193 | 0 | , may_remove_change_cond_() |
194 | 0 | , may_remove_change_(0) |
195 | 0 | , disable_heartbeat_piggyback_(att.disable_heartbeat_piggyback) |
196 | 0 | , disable_positive_acks_(att.disable_positive_acks) |
197 | 0 | , keep_duration_(att.keep_duration) |
198 | 0 | , last_sequence_number_() |
199 | 0 | , biggest_removed_sequence_number_() |
200 | 0 | , matched_local_readers_(att.matched_readers_allocation) |
201 | 0 | , matched_datasharing_readers_(att.matched_readers_allocation) |
202 | 0 | , locator_selector_general_(*this, att.matched_readers_allocation) |
203 | 0 | , locator_selector_async_(*this, att.matched_readers_allocation) |
204 | 0 | , stateful_writer_listener_(stateful_listener) |
205 | 0 | { |
206 | 0 | init(pimpl, att); |
207 | 0 | } |
208 | | |
209 | | void StatefulWriter::init( |
210 | | RTPSParticipantImpl* pimpl, |
211 | | const WriterAttributes& att) |
212 | 0 | { |
213 | 0 | const RTPSParticipantAttributes& part_att = pimpl->get_attributes(); |
214 | |
|
215 | 0 | auto push_mode = PropertyPolicyHelper::find_property(att.endpoint.properties, "fastdds.push_mode"); |
216 | 0 | push_mode_ = !((nullptr != push_mode) && ("false" == *push_mode)); |
217 | |
|
218 | 0 | periodic_hb_event_ = new TimedEvent( |
219 | 0 | pimpl->getEventResource(), |
220 | 0 | [&]() -> bool |
221 | 0 | { |
222 | 0 | return send_periodic_heartbeat(); |
223 | 0 | }, |
224 | 0 | fastdds::rtps::TimeConv::Time_t2MilliSecondsDouble(times_.heartbeat_period)); |
225 | |
|
226 | 0 | nack_response_event_ = new TimedEvent( |
227 | 0 | pimpl->getEventResource(), |
228 | 0 | [&]() -> bool |
229 | 0 | { |
230 | 0 | perform_nack_response(); |
231 | 0 | return false; |
232 | 0 | }, |
233 | 0 | fastdds::rtps::TimeConv::Time_t2MilliSecondsDouble(times_.nack_response_delay)); |
234 | |
|
235 | 0 | if (disable_positive_acks_) |
236 | 0 | { |
237 | 0 | ack_event_ = new TimedEvent( |
238 | 0 | pimpl->getEventResource(), |
239 | 0 | [&]() -> bool |
240 | 0 | { |
241 | 0 | return ack_timer_expired(); |
242 | 0 | }, |
243 | 0 | att.keep_duration.to_ns() * 1e-6); // in milliseconds |
244 | 0 | } |
245 | |
|
246 | 0 | for (size_t n = 0; n < att.matched_readers_allocation.initial; ++n) |
247 | 0 | { |
248 | 0 | ReaderProxy* new_proxy = new ReaderProxy( |
249 | 0 | times_, part_att.allocation.locators, this, stateful_writer_listener_); |
250 | 0 | matched_readers_pool_.push_back(new_proxy); |
251 | 0 | } |
252 | 0 | } |
253 | | |
254 | | StatefulWriter::~StatefulWriter() |
255 | 0 | { |
256 | 0 | EPROSIMA_LOG_INFO(RTPS_WRITER, "StatefulWriter destructor"); |
257 | 0 | } |
258 | | |
259 | | void StatefulWriter::local_actions_on_writer_removed() |
260 | 0 | { |
261 | 0 | EPROSIMA_LOG_INFO(RTPS_WRITER, "StatefulWriter local_actions_on_writer_removed"); |
262 | | |
263 | | // Disable timed events, because their callbacks use cache changes |
264 | 0 | if (disable_positive_acks_) |
265 | 0 | { |
266 | 0 | delete(ack_event_); |
267 | 0 | ack_event_ = nullptr; |
268 | 0 | } |
269 | |
|
270 | 0 | if (nack_response_event_ != nullptr) |
271 | 0 | { |
272 | 0 | delete(nack_response_event_); |
273 | 0 | nack_response_event_ = nullptr; |
274 | 0 | } |
275 | | |
276 | | // This must be the next action, as it frees CacheChange_t from the async thread. |
277 | 0 | BaseWriter::local_actions_on_writer_removed(); |
278 | | |
279 | | // Stop all active proxies and pass them to the pool |
280 | 0 | { |
281 | 0 | std::lock_guard<RecursiveTimedMutex> guard(mp_mutex); |
282 | 0 | while (!matched_remote_readers_.empty()) |
283 | 0 | { |
284 | 0 | ReaderProxy* remote_reader = matched_remote_readers_.back(); |
285 | 0 | matched_remote_readers_.pop_back(); |
286 | 0 | remote_reader->stop(); |
287 | 0 | matched_readers_pool_.push_back(remote_reader); |
288 | 0 | } |
289 | 0 | while (!matched_local_readers_.empty()) |
290 | 0 | { |
291 | 0 | ReaderProxy* remote_reader = matched_local_readers_.back(); |
292 | 0 | matched_local_readers_.pop_back(); |
293 | 0 | remote_reader->stop(); |
294 | 0 | matched_readers_pool_.push_back(remote_reader); |
295 | 0 | } |
296 | 0 | while (!matched_datasharing_readers_.empty()) |
297 | 0 | { |
298 | 0 | ReaderProxy* remote_reader = matched_datasharing_readers_.back(); |
299 | 0 | matched_datasharing_readers_.pop_back(); |
300 | 0 | remote_reader->stop(); |
301 | 0 | matched_readers_pool_.push_back(remote_reader); |
302 | 0 | } |
303 | 0 | } |
304 | | |
305 | | // PeriodicHeartbeatEvent must be released after releasing all proxies |
306 | | // because proxy's NackSuppressionEvent could restart this event. |
307 | 0 | if (periodic_hb_event_ != nullptr) |
308 | 0 | { |
309 | 0 | delete(periodic_hb_event_); |
310 | 0 | periodic_hb_event_ = nullptr; |
311 | 0 | } |
312 | | |
313 | | // Delete all proxies in the pool |
314 | 0 | for (ReaderProxy* remote_reader : matched_readers_pool_) |
315 | 0 | { |
316 | 0 | delete(remote_reader); |
317 | 0 | } |
318 | 0 | } |
319 | | |
320 | | /* |
321 | | * CHANGE-RELATED METHODS |
322 | | */ |
323 | | void StatefulWriter::prepare_datasharing_delivery( |
324 | | CacheChange_t* change) |
325 | 0 | { |
326 | 0 | auto pool = std::dynamic_pointer_cast<WriterPool>(history_->get_payload_pool()); |
327 | 0 | assert (pool != nullptr); |
328 | |
|
329 | 0 | pool->add_to_shared_history(change); |
330 | 0 | EPROSIMA_LOG_INFO(RTPS_WRITER, "Notifying readers of cache change with SN " << change->sequenceNumber); |
331 | 0 | } |
332 | | |
333 | | void StatefulWriter::unsent_change_added_to_history( |
334 | | CacheChange_t* change, |
335 | | const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) |
336 | 0 | { |
337 | 0 | std::lock_guard<RecursiveTimedMutex> guard(mp_mutex); |
338 | 0 | auto payload_length = change->serializedPayload.length; |
339 | |
|
340 | 0 | if (liveliness_lease_duration_ < dds::c_TimeInfinite) |
341 | 0 | { |
342 | 0 | mp_RTPSParticipant->wlp()->assert_liveliness( |
343 | 0 | getGuid(), |
344 | 0 | liveliness_kind_, |
345 | 0 | liveliness_lease_duration_); |
346 | 0 | } |
347 | | |
348 | | // Prepare the metadata for datasharing |
349 | 0 | if (is_datasharing_compatible()) |
350 | 0 | { |
351 | 0 | prepare_datasharing_delivery(change); |
352 | 0 | } |
353 | | |
354 | | // Now for the rest of readers |
355 | 0 | if (!matched_remote_readers_.empty() || !matched_datasharing_readers_.empty() || !matched_local_readers_.empty()) |
356 | 0 | { |
357 | 0 | bool should_be_sent = false; |
358 | 0 | for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_, |
359 | 0 | [this, &should_be_sent, &change, &max_blocking_time](ReaderProxy* reader) |
360 | 0 | { |
361 | 0 | ChangeForReader_t changeForReader(change); |
362 | 0 | bool is_relevant = reader->rtps_is_relevant(change); |
363 | |
|
364 | 0 | if (push_mode_ || !reader->is_reliable() || reader->is_local_reader()) |
365 | 0 | { |
366 | | //ChangeForReader_t construct sets status to UNSENT. |
367 | 0 | should_be_sent |= is_relevant; |
368 | 0 | } |
369 | 0 | else |
370 | 0 | { |
371 | 0 | changeForReader.setStatus(UNACKNOWLEDGED); |
372 | 0 | } |
373 | 0 | reader->add_change(changeForReader, is_relevant, false, max_blocking_time); |
374 | |
|
375 | 0 | return false; |
376 | 0 | } |
377 | 0 | ); |
378 | |
|
379 | 0 | if (disable_positive_acks_) |
380 | 0 | { |
381 | 0 | Time_t expiration_ts = change->sourceTimestamp + keep_duration_; |
382 | 0 | Time_t current_ts; |
383 | 0 | Time_t::now(current_ts); |
384 | 0 | assert(expiration_ts >= current_ts); |
385 | 0 | auto interval = (expiration_ts - current_ts).to_duration_t(); |
386 | |
|
387 | 0 | ack_event_->update_interval(interval); |
388 | 0 | ack_event_->restart_timer(max_blocking_time); |
389 | 0 | } |
390 | | |
391 | | // After adding the CacheChange_t to flowcontroller, its pointer cannot be used because it may be removed |
392 | | // internally before exiting the call. For example if the writer matched with a best-effort reader. |
393 | 0 | if (should_be_sent) |
394 | 0 | { |
395 | 0 | flow_controller_->add_new_sample(this, change, max_blocking_time); |
396 | 0 | } |
397 | 0 | else |
398 | 0 | { |
399 | 0 | periodic_hb_event_->restart_timer(max_blocking_time); |
400 | 0 | } |
401 | 0 | } |
402 | 0 | else |
403 | 0 | { |
404 | 0 | EPROSIMA_LOG_INFO(RTPS_WRITER, "No reader proxy to add change."); |
405 | 0 | check_acked_status(); |
406 | 0 | } |
407 | | |
408 | | // Throughput should be notified even if no matches are available |
409 | 0 | on_publish_throughput(payload_length); |
410 | 0 | } |
411 | | |
412 | | bool StatefulWriter::intraprocess_delivery( |
413 | | CacheChange_t* change, |
414 | | ReaderProxy* reader_proxy) |
415 | 0 | { |
416 | 0 | LocalReaderPointer::Instance local_reader = reader_proxy->local_reader(); |
417 | 0 | if (local_reader) |
418 | 0 | { |
419 | 0 | if (change->write_params.related_sample_identity() != SampleIdentity::unknown()) |
420 | 0 | { |
421 | 0 | change->write_params.sample_identity(change->write_params.related_sample_identity()); |
422 | 0 | } |
423 | 0 | return local_reader->process_data_msg(change); |
424 | 0 | } |
425 | 0 | return false; |
426 | 0 | } |
427 | | |
428 | | bool StatefulWriter::intraprocess_gap( |
429 | | ReaderProxy* reader_proxy, |
430 | | const SequenceNumber_t& first_seq, |
431 | | const SequenceNumber_t& last_seq) |
432 | 0 | { |
433 | 0 | LocalReaderPointer::Instance local_reader = reader_proxy->local_reader(); |
434 | 0 | if (local_reader) |
435 | 0 | { |
436 | 0 | return local_reader->process_gap_msg( |
437 | 0 | m_guid, first_seq, SequenceNumberSet_t(last_seq), c_VendorId_eProsima); |
438 | 0 | } |
439 | | |
440 | 0 | return false; |
441 | 0 | } |
442 | | |
443 | | bool StatefulWriter::intraprocess_heartbeat( |
444 | | ReaderProxy* reader_proxy, |
445 | | bool liveliness) |
446 | 0 | { |
447 | 0 | bool returned_value = false; |
448 | 0 | LocalReaderPointer::Instance local_reader = reader_proxy->local_reader(); |
449 | |
|
450 | 0 | if (local_reader) |
451 | 0 | { |
452 | 0 | std::unique_lock<RecursiveTimedMutex> lockW(mp_mutex); |
453 | 0 | SequenceNumber_t first_seq = get_seq_num_min(); |
454 | 0 | SequenceNumber_t last_seq = get_seq_num_max(); |
455 | |
|
456 | 0 | if (first_seq == c_SequenceNumber_Unknown || last_seq == c_SequenceNumber_Unknown) |
457 | 0 | { |
458 | 0 | if (liveliness) |
459 | 0 | { |
460 | 0 | first_seq = next_sequence_number(); |
461 | 0 | last_seq = first_seq - 1; |
462 | 0 | } |
463 | 0 | } |
464 | |
|
465 | 0 | if ((first_seq != c_SequenceNumber_Unknown && last_seq != c_SequenceNumber_Unknown) && |
466 | 0 | (liveliness || reader_proxy->has_changes())) |
467 | 0 | { |
468 | 0 | increment_hb_count(); |
469 | 0 | Count_t hb_count = heartbeat_count_; |
470 | 0 | lockW.unlock(); |
471 | 0 | returned_value = local_reader->process_heartbeat_msg( |
472 | 0 | m_guid, hb_count, first_seq, last_seq, true, liveliness, c_VendorId_eProsima); |
473 | 0 | } |
474 | 0 | } |
475 | |
|
476 | 0 | return returned_value; |
477 | 0 | } |
478 | | |
479 | | bool StatefulWriter::change_removed_by_history( |
480 | | CacheChange_t* a_change, |
481 | | const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) |
482 | 0 | { |
483 | 0 | bool ret_value = false; |
484 | 0 | SequenceNumber_t sequence_number = a_change->sequenceNumber; |
485 | |
|
486 | 0 | std::lock_guard<RecursiveTimedMutex> guard(mp_mutex); |
487 | 0 | EPROSIMA_LOG_INFO(RTPS_WRITER, "Change " << sequence_number << " to be removed."); |
488 | |
|
489 | 0 | if (flow_controller_->remove_change(a_change, max_blocking_time)) |
490 | 0 | { |
491 | | |
492 | | // Take note of biggest removed sequence number to improve sending of gaps |
493 | 0 | if (sequence_number > biggest_removed_sequence_number_) |
494 | 0 | { |
495 | 0 | biggest_removed_sequence_number_ = sequence_number; |
496 | 0 | } |
497 | | |
498 | | // Invalidate CacheChange pointer in ReaderProxies. |
499 | 0 | for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_, |
500 | 0 | [sequence_number](ReaderProxy* reader) |
501 | 0 | { |
502 | 0 | reader->change_has_been_removed(sequence_number); |
503 | 0 | return false; |
504 | 0 | } |
505 | 0 | ); |
506 | | |
507 | | // remove from datasharing pool history |
508 | 0 | if (is_datasharing_compatible()) |
509 | 0 | { |
510 | 0 | auto pool = std::dynamic_pointer_cast<WriterPool>(history_->get_payload_pool()); |
511 | 0 | assert (pool != nullptr); |
512 | |
|
513 | 0 | pool->remove_from_shared_history(a_change); |
514 | 0 | EPROSIMA_LOG_INFO(RTPS_WRITER, "Removing shared cache change with SN " << a_change->sequenceNumber); |
515 | 0 | } |
516 | |
|
517 | 0 | may_remove_change_ = 2; |
518 | 0 | may_remove_change_cond_.notify_one(); |
519 | |
|
520 | 0 | ret_value = true; |
521 | 0 | } |
522 | |
|
523 | 0 | return ret_value; |
524 | 0 | } |
525 | | |
526 | | void StatefulWriter::send_heartbeat_to_all_readers( |
527 | | bool force_separating) |
528 | 0 | { |
529 | | // This method is only called from send_periodic_heartbeat |
530 | |
|
531 | 0 | if (separate_sending_enabled_ || force_separating) |
532 | 0 | { |
533 | 0 | for (ReaderProxy* reader : matched_remote_readers_) |
534 | 0 | { |
535 | 0 | send_heartbeat_to_nts(*reader); |
536 | 0 | } |
537 | 0 | } |
538 | 0 | else |
539 | 0 | { |
540 | 0 | for (ReaderProxy* reader : matched_local_readers_) |
541 | 0 | { |
542 | 0 | intraprocess_heartbeat(reader); |
543 | 0 | } |
544 | |
|
545 | 0 | for (ReaderProxy* reader : matched_datasharing_readers_) |
546 | 0 | { |
547 | 0 | reader->datasharing_notify(); |
548 | 0 | } |
549 | |
|
550 | 0 | if (there_are_remote_readers_) |
551 | 0 | { |
552 | 0 | RTPSMessageGroup group(mp_RTPSParticipant, this, &locator_selector_general_); |
553 | 0 | select_all_readers_nts(group, locator_selector_general_); |
554 | |
|
555 | 0 | assert( |
556 | 0 | (SequenceNumber_t::unknown() == get_seq_num_min() && |
557 | 0 | SequenceNumber_t::unknown() == get_seq_num_max()) || |
558 | 0 | (SequenceNumber_t::unknown() != get_seq_num_min() && |
559 | 0 | SequenceNumber_t::unknown() != get_seq_num_max())); |
560 | |
|
561 | 0 | add_gaps_for_holes_in_history(group); |
562 | |
|
563 | 0 | send_heartbeat_nts_(locator_selector_general_.all_remote_readers.size(), group, disable_positive_acks_); |
564 | 0 | } |
565 | 0 | } |
566 | 0 | } |
567 | | |
568 | | void StatefulWriter::deliver_sample_to_intraprocesses( |
569 | | CacheChange_t* change) |
570 | 0 | { |
571 | 0 | for (ReaderProxy* remoteReader : matched_local_readers_) |
572 | 0 | { |
573 | 0 | SequenceNumber_t gap_seq; |
574 | 0 | FragmentNumber_t dummy = 0; |
575 | 0 | bool dumb = false; |
576 | 0 | if (remoteReader->change_is_unsent(change->sequenceNumber, dummy, gap_seq, get_seq_num_min(), dumb)) |
577 | 0 | { |
578 | | // If there is a hole (removed from history or not relevants) between previous sample and this one, |
579 | | // send it a personal GAP. |
580 | 0 | if (SequenceNumber_t::unknown() != gap_seq) |
581 | 0 | { |
582 | 0 | intraprocess_gap(remoteReader, gap_seq, change->sequenceNumber); |
583 | 0 | remoteReader->acked_changes_set(change->sequenceNumber); |
584 | 0 | } |
585 | 0 | bool delivered = intraprocess_delivery(change, remoteReader); |
586 | 0 | if (!remoteReader->is_reliable()) |
587 | 0 | { |
588 | 0 | remoteReader->acked_changes_set(change->sequenceNumber + 1); |
589 | 0 | } |
590 | 0 | else |
591 | 0 | { |
592 | 0 | intraprocess_heartbeat(remoteReader, false); |
593 | 0 | remoteReader->from_unsent_to_status( |
594 | 0 | change->sequenceNumber, |
595 | 0 | delivered ? ACKNOWLEDGED : UNACKNOWLEDGED, |
596 | 0 | false, |
597 | 0 | delivered); |
598 | 0 | } |
599 | 0 | } |
600 | 0 | } |
601 | 0 | } |
602 | | |
603 | | void StatefulWriter::deliver_sample_to_datasharing( |
604 | | CacheChange_t* change) |
605 | 0 | { |
606 | 0 | for (ReaderProxy* remoteReader : matched_datasharing_readers_) |
607 | 0 | { |
608 | 0 | SequenceNumber_t gap_seq; |
609 | 0 | FragmentNumber_t dummy = 0; |
610 | 0 | bool dumb = false; |
611 | 0 | if (remoteReader->change_is_unsent(change->sequenceNumber, dummy, gap_seq, get_seq_num_min(), dumb)) |
612 | 0 | { |
613 | 0 | if (!remoteReader->is_reliable()) |
614 | 0 | { |
615 | 0 | remoteReader->acked_changes_set(change->sequenceNumber + 1); |
616 | 0 | } |
617 | 0 | else |
618 | 0 | { |
619 | 0 | remoteReader->from_unsent_to_status( |
620 | 0 | change->sequenceNumber, |
621 | 0 | UNACKNOWLEDGED, |
622 | 0 | false); |
623 | 0 | } |
624 | 0 | remoteReader->datasharing_notify(); |
625 | 0 | } |
626 | 0 | } |
627 | 0 | } |
628 | | |
/**
 * Sends a change (or its pending fragments) to the matched remote readers through `group`.
 *
 * The outer loop repeats while the return code is DELIVERED and the last scan of the remote
 * readers found something left to send (min_unsent_fragment != n_fragments + 1). Each pass:
 *  - resets the locator selector and scans every remote reader, sending GAPs for removed
 *    irrelevant changes, enabling the readers that still need the lowest unsent fragment and
 *    sending an individual GAP plus heartbeat to readers whose hole is not shared by all;
 *  - recomputes the selected locators when the selection changed, and adds a GAP for all
 *    readers when the hole is common;
 *  - adds the DATA / DATA_FRAG submessage either once for all selected locators (followed by a
 *    piggyback heartbeat) or, with separate sending enabled, once per active reader (each
 *    followed by a heartbeat);
 *  - on success marks best-effort readers as acknowledged up to the change and moves reliable
 *    readers to UNDERWAY.
 *
 * RTPSMessageGroup::timeout is mapped to NOT_DELIVERED and RTPSMessageGroup::limit_exceeded to
 * EXCEEDED_LIMIT. When positive acks are disabled and no last sequence number is recorded yet,
 * the change's sequence number is stored and the ack timer is restarted if it is not running.
 *
 * @param change             Change to send.
 * @param group              Message group the submessages are added to.
 * @param locator_selector   Locator selector used for sending (locked by the caller, see below).
 * @param max_blocking_time  Deadline forwarded to the timer restarts performed here.
 * @return DELIVERED, NOT_DELIVERED or EXCEEDED_LIMIT as described above.
 */
629 | | DeliveryRetCode StatefulWriter::deliver_sample_to_network( |
630 | | CacheChange_t* change, |
631 | | RTPSMessageGroup& group, |
632 | | LocatorSelectorSender& locator_selector, // Object locked by FlowControllerImpl |
633 | | const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) |
634 | 0 | { |
635 | 0 | NetworkFactory& network = mp_RTPSParticipant->network_factory(); |
636 | 0 | DeliveryRetCode ret_code = DeliveryRetCode::DELIVERED; |
637 | 0 | uint32_t n_fragments = change->getFragmentCount(); |
638 | 0 | FragmentNumber_t min_unsent_fragment = 0; |
639 | 0 | bool need_reactivate_periodic_heartbeat = false; |
640 | 

641 | 0 | while (DeliveryRetCode::DELIVERED == ret_code && |
642 | 0 | min_unsent_fragment != n_fragments + 1) |
643 | 0 | { |
644 | 0 | SequenceNumber_t gap_seq_for_all = SequenceNumber_t::unknown(); |
645 | 0 | locator_selector.locator_selector.reset(false); |
646 | 0 | auto first_relevant_reader = matched_remote_readers_.begin(); |
647 | 0 | bool inline_qos = false; |
648 | 0 | bool should_be_sent = false; |
        // n_fragments + 1 acts as the "nothing left to send" marker that ends the outer loop.
649 | 0 | min_unsent_fragment = n_fragments + 1; |
650 | 

651 | 0 | for (auto remote_reader = first_relevant_reader; remote_reader != matched_remote_readers_.end(); |
652 | 0 | ++remote_reader) |
653 | 0 | { |
654 | 0 | SequenceNumber_t gap_seq; |
655 | 0 | FragmentNumber_t next_unsent_frag = 0; |
656 | 

657 | 0 | if (SequenceNumber_t::unknown() != (*remote_reader)->first_irrelevant_removed()) |
658 | 0 | { |
659 | | // Send GAP with irrelevant changes that are not in history. |
660 | 0 | group.sender(this, (*remote_reader)->message_sender()); |
661 | 0 | add_gaps_for_removed_irrelevants(**remote_reader, group); |
662 | 0 | group.sender(this, &locator_selector); // This makes the flush_and_reset(). |
663 | 0 | } |
664 | 

665 | 0 | if ((*remote_reader)->change_is_unsent(change->sequenceNumber, next_unsent_frag, gap_seq, get_seq_num_min(), |
666 | 0 | need_reactivate_periodic_heartbeat) && |
667 | 0 | (0 == n_fragments || min_unsent_fragment >= next_unsent_frag)) |
668 | 0 | { |
                // A lower unsent fragment was found: restart the selection from this reader.
669 | 0 | if (min_unsent_fragment > next_unsent_frag) |
670 | 0 | { |
671 | 0 | locator_selector.locator_selector.reset(false); |
672 | 0 | first_relevant_reader = remote_reader; |
673 | 0 | min_unsent_fragment = next_unsent_frag; |
674 | 0 | } |
675 | 

676 | 0 | (*remote_reader)->active(true); |
677 | 0 | locator_selector.locator_selector.enable((*remote_reader)->guid()); |
678 | 0 | should_be_sent = true; |
679 | 0 | inline_qos |= (*remote_reader)->expects_inline_qos(); |
680 | | |
681 | | // If there is a hole (removed from history or not relevant) between the previous sample and this one, |
682 | | // send it a personal GAP. |
683 | 0 | if (SequenceNumber_t::unknown() != gap_seq) |
684 | 0 | { |
685 | 0 | if (SequenceNumber_t::unknown() == gap_seq_for_all) // Calculate if the hole is for all readers |
686 | 0 | { |
687 | 0 | History::const_iterator chit = history_->find_change_nts(change); |
688 | 

689 | 0 | if (chit == history_->changesBegin()) |
690 | 0 | { |
691 | 0 | gap_seq_for_all = gap_seq; |
692 | 0 | } |
693 | 0 | else |
694 | 0 | { |
695 | 0 | SequenceNumber_t prev = (*std::prev(chit))->sequenceNumber + 1; |
696 | 

697 | 0 | if (prev == gap_seq) |
698 | 0 | { |
699 | 0 | gap_seq_for_all = gap_seq; |
700 | 0 | } |
701 | 0 | } |
702 | 0 | } |
703 | 

704 | 0 | if (gap_seq_for_all != gap_seq) // If it is an individual GAP, send it to the respective reader. |
705 | 0 | { |
706 | 0 | group.sender(this, (*remote_reader)->message_sender()); |
707 | 0 | group.add_gap(gap_seq, SequenceNumberSet_t(change->sequenceNumber), |
708 | 0 | (*remote_reader)->guid()); |
709 | 0 | send_heartbeat_nts_(1u, group, disable_positive_acks_); |
710 | 0 | group.sender(this, &locator_selector); // This makes the flush_and_reset(). |
711 | 0 | } |
712 | 0 | } |
713 | 0 | } |
714 | 0 | else |
715 | 0 | { |
716 | 0 | (*remote_reader)->active(false); |
717 | 0 | } |
718 | 0 | } |
719 | 

720 | 0 | bool should_send_global_gap = SequenceNumber_t::unknown() != gap_seq_for_all; |
721 | 

722 | 0 | if (locator_selector.locator_selector.state_has_changed() && |
723 | 0 | ((should_be_sent && !separate_sending_enabled_) || should_send_global_gap)) |
724 | 0 | { |
725 | 0 | group.flush_and_reset(); |
726 | 0 | network.select_locators(locator_selector.locator_selector); |
727 | 0 | compute_selected_guids(locator_selector); |
728 | 0 | } |
729 | 

730 | 0 | if (should_send_global_gap) // Send GAP for all readers |
731 | 0 | { |
732 | 0 | group.add_gap(gap_seq_for_all, SequenceNumberSet_t(change->sequenceNumber)); |
733 | 0 | } |
734 | 

735 | 0 | try |
736 | 0 | { |
737 | 0 | if (should_be_sent) |
738 | 0 | { |
                // Joint sending: one submessage for all selected locators.
739 | 0 | if (!separate_sending_enabled_) |
740 | 0 | { |
741 | 0 | size_t num_locators = locator_selector.locator_selector.selected_size(); |
742 | 0 | if (num_locators > 0) |
743 | 0 | { |
744 | 0 | if (0 < n_fragments) |
745 | 0 | { |
746 | 0 | if (min_unsent_fragment != n_fragments + 1) |
747 | 0 | { |
748 | 0 | if (group.add_data_frag(*change, min_unsent_fragment, inline_qos)) |
749 | 0 | { |
750 | 0 | for (auto remote_reader = first_relevant_reader; |
751 | 0 | remote_reader != matched_remote_readers_.end(); |
752 | 0 | ++remote_reader) |
753 | 0 | { |
754 | 0 | if ((*remote_reader)->active()) |
755 | 0 | { |
756 | 0 | bool allFragmentsSent = false; |
757 | 0 | (*remote_reader)->mark_fragment_as_sent_for_change( |
758 | 0 | change->sequenceNumber, |
759 | 0 | min_unsent_fragment, |
760 | 0 | allFragmentsSent); |
761 | 

762 | 0 | if (allFragmentsSent) |
763 | 0 | { |
764 | 0 | if (!(*remote_reader)->is_reliable()) |
765 | 0 | { |
766 | 0 | (*remote_reader)->acked_changes_set(change->sequenceNumber + 1); |
767 | 0 | } |
768 | 0 | else |
769 | 0 | { |
770 | 0 | (*remote_reader)->from_unsent_to_status( |
771 | 0 | change->sequenceNumber, |
772 | 0 | UNDERWAY, |
773 | 0 | true); |
774 | 0 | } |
775 | 0 | } |
776 | 0 | } |
777 | 0 | } |
778 | 0 | add_statistics_sent_submessage(change, num_locators); |
779 | 0 | } |
780 | 0 | else |
781 | 0 | { |
782 | 0 | ret_code = DeliveryRetCode::NOT_DELIVERED; |
783 | 0 | } |
784 | 0 | } |
785 | 0 | } |
786 | 0 | else |
787 | 0 | { |
788 | 0 | if (group.add_data(*change, inline_qos)) |
789 | 0 | { |
790 | 0 | for (auto remote_reader = first_relevant_reader; |
791 | 0 | remote_reader != matched_remote_readers_.end(); |
792 | 0 | ++remote_reader) |
793 | 0 | { |
794 | 0 | if ((*remote_reader)->active()) |
795 | 0 | { |
796 | 0 | if (!(*remote_reader)->is_reliable()) |
797 | 0 | { |
798 | 0 | (*remote_reader)->acked_changes_set(change->sequenceNumber + 1); |
799 | 0 | } |
800 | 0 | else |
801 | 0 | { |
802 | 0 | (*remote_reader)->from_unsent_to_status( |
803 | 0 | change->sequenceNumber, |
804 | 0 | UNDERWAY, |
805 | 0 | true); |
806 | 0 | } |
807 | 0 | } |
808 | 0 | } |
809 | 0 | add_statistics_sent_submessage(change, num_locators); |
810 | 0 | } |
811 | 0 | else |
812 | 0 | { |
813 | 0 | ret_code = DeliveryRetCode::NOT_DELIVERED; |
814 | 0 | } |
815 | 0 | } |
816 | 

817 | 0 | send_heartbeat_piggyback_nts_(group, locator_selector); |
818 | 0 | } |
819 | 0 | } |
                // Separate sending: one submessage (and heartbeat) per active reader.
820 | 0 | else |
821 | 0 | { |
822 | 0 | for (auto remote_reader = first_relevant_reader; |
823 | 0 | remote_reader != matched_remote_readers_.end(); |
824 | 0 | ++remote_reader) |
825 | 0 | { |
826 | 0 | if ((*remote_reader)->active()) |
827 | 0 | { |
828 | 0 | group.sender(this, (*remote_reader)->message_sender()); |
829 | 

830 | 0 | if (0 < n_fragments) |
831 | 0 | { |
832 | 0 | if (min_unsent_fragment != n_fragments + 1) |
833 | 0 | { |
834 | 0 | if (group.add_data_frag(*change, min_unsent_fragment, inline_qos)) |
835 | 0 | { |
836 | 0 | bool allFragmentsSent = false; |
837 | 0 | (*remote_reader)->mark_fragment_as_sent_for_change( |
838 | 0 | change->sequenceNumber, |
839 | 0 | min_unsent_fragment, |
840 | 0 | allFragmentsSent); |
841 | 

842 | 0 | if (allFragmentsSent) |
843 | 0 | { |
844 | 0 | if (!(*remote_reader)->is_reliable()) |
845 | 0 | { |
846 | 0 | (*remote_reader)->acked_changes_set(change->sequenceNumber + 1); |
847 | 0 | } |
848 | 0 | else |
849 | 0 | { |
850 | 0 | (*remote_reader)->from_unsent_to_status( |
851 | 0 | change->sequenceNumber, |
852 | 0 | UNDERWAY, |
853 | 0 | true); |
854 | 0 | } |
855 | 0 | } |
856 | 0 | add_statistics_sent_submessage(change, (*remote_reader)->locators_size()); |
857 | 0 | } |
858 | 0 | else |
859 | 0 | { |
860 | 0 | ret_code = DeliveryRetCode::NOT_DELIVERED; |
861 | 0 | } |
862 | 0 | } |
863 | 0 | } |
864 | 0 | else |
865 | 0 | { |
866 | 0 | if (group.add_data(*change, (*remote_reader)->expects_inline_qos())) |
867 | 0 | { |
868 | 0 | if (!(*remote_reader)->is_reliable()) |
869 | 0 | { |
870 | 0 | (*remote_reader)->acked_changes_set(change->sequenceNumber + 1); |
871 | 0 | } |
872 | 0 | else |
873 | 0 | { |
874 | 0 | (*remote_reader)->from_unsent_to_status( |
875 | 0 | change->sequenceNumber, |
876 | 0 | UNDERWAY, |
877 | 0 | true); |
878 | 0 | } |
879 | 0 | add_statistics_sent_submessage(change, (*remote_reader)->locators_size()); |
880 | 0 | } |
881 | 0 | else |
882 | 0 | { |
883 | 0 | EPROSIMA_LOG_ERROR(RTPS_WRITER, "Error sending change " << change->sequenceNumber); |
884 | 0 | ret_code = DeliveryRetCode::NOT_DELIVERED; |
885 | 0 | } |
886 | 0 | } |
887 | 

888 | 0 | send_heartbeat_nts_(1u, group, false); |
889 | 0 | } |
890 | 0 | } |
891 | 0 | } |
892 | 

893 | 0 | on_sample_datas(change->write_params.sample_identity(), change->writer_info.num_sent_submessages); |
894 | 0 | on_data_sent(); |
895 | 0 | } |
896 | 0 | } |
897 | 0 | catch (const RTPSMessageGroup::timeout&) |
898 | 0 | { |
899 | 0 | EPROSIMA_LOG_ERROR(RTPS_WRITER, "Max blocking time reached"); |
900 | 0 | ret_code = DeliveryRetCode::NOT_DELIVERED; |
901 | 0 | } |
902 | 0 | catch (const RTPSMessageGroup::limit_exceeded&) |
903 | 0 | { |
904 | 0 | ret_code = DeliveryRetCode::EXCEEDED_LIMIT; |
905 | 0 | } |
906 | 

907 | 0 | if (disable_positive_acks_ && last_sequence_number_ == SequenceNumber_t()) |
908 | 0 | { |
909 | 0 | last_sequence_number_ = change->sequenceNumber; |
910 | 0 | if ( !(ack_event_->getRemainingTimeMilliSec() > 0)) |
911 | 0 | { |
912 | | // Restart ack_timer |
913 | 0 | Time_t expiration_ts = change->sourceTimestamp + keep_duration_; |
914 | 0 | Time_t current_ts; |
915 | 0 | Time_t::now(current_ts); |
916 | 0 | assert(expiration_ts >= current_ts); |
917 | 0 | auto interval = (expiration_ts - current_ts).to_duration_t(); |
918 | 

919 | 0 | ack_event_->update_interval(interval); |
920 | 0 | ack_event_->restart_timer(max_blocking_time); |
921 | 0 | } |
922 | 0 | } |
923 | | |
924 | | // Restore the sender in case an exception was thrown by RTPSMessageGroup. |
925 | 0 | group.sender(this, &locator_selector); |
926 | 

927 | 0 | } |
928 | 

929 | 0 | if (need_reactivate_periodic_heartbeat) |
930 | 0 | { |
931 | 0 | periodic_hb_event_->restart_timer(max_blocking_time); |
932 | 0 | } |
933 | 

934 | 0 | return ret_code; |
935 | 0 | } |
936 | | |
937 | | /* |
938 | | * MATCHED_READER-RELATED METHODS |
939 | | */ |
940 | | void StatefulWriter::update_reader_info( |
941 | | LocatorSelectorSender& locator_selector, |
942 | | bool create_sender_resources) |
943 | 0 | { |
944 | 0 | update_cached_info_nts(locator_selector); |
945 | 0 | compute_selected_guids(locator_selector); |
946 | |
|
947 | 0 | if (create_sender_resources) |
948 | 0 | { |
949 | 0 | RTPSParticipantImpl* part = get_participant_impl(); |
950 | 0 | locator_selector.locator_selector.for_each([part](const Locator_t& loc) |
951 | 0 | { |
952 | 0 | part->createSenderResources(loc); |
953 | 0 | }); |
954 | 0 | } |
955 | | |
956 | | // Check if we have local or remote readers |
957 | 0 | there_are_remote_readers_ = !matched_remote_readers_.empty(); |
958 | 0 | there_are_local_readers_ = !matched_local_readers_.empty(); |
959 | 0 | there_are_datasharing_readers_ = !matched_datasharing_readers_.empty(); |
960 | 0 | } |
961 | | |
962 | | void StatefulWriter::select_all_readers_nts( |
963 | | RTPSMessageGroup& group, |
964 | | LocatorSelectorSender& locator_selector) |
965 | 0 | { |
966 | 0 | locator_selector.locator_selector.reset(true); |
967 | 0 | if (locator_selector.locator_selector.state_has_changed()) |
968 | 0 | { |
969 | 0 | group.flush_and_reset(); |
970 | 0 | mp_RTPSParticipant->network_factory().select_locators(locator_selector.locator_selector); |
971 | 0 | compute_selected_guids(locator_selector); |
972 | 0 | } |
973 | 0 | } |
974 | | |
975 | | bool StatefulWriter::matched_reader_add_edp( |
976 | | const ReaderProxyData& rdata) |
977 | 0 | { |
978 | 0 | using network::external_locators::filter_remote_locators; |
979 | |
|
980 | 0 | if (rdata.guid == c_Guid_Unknown) |
981 | 0 | { |
982 | 0 | EPROSIMA_LOG_ERROR(RTPS_WRITER, "Reliable Writer need GUID_t of matched readers"); |
983 | 0 | return false; |
984 | 0 | } |
985 | | |
986 | 0 | std::unique_lock<RecursiveTimedMutex> guard(mp_mutex); |
987 | 0 | std::unique_lock<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_); |
988 | 0 | std::unique_lock<LocatorSelectorSender> guard_locator_selector_async(locator_selector_async_); |
989 | | |
990 | | // Check if it is already matched. |
991 | 0 | if (for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_, |
992 | 0 | [this, &rdata](ReaderProxy* reader) |
993 | 0 | { |
994 | 0 | if (reader->guid() == rdata.guid) |
995 | 0 | { |
996 | 0 | EPROSIMA_LOG_INFO(RTPS_WRITER, "Attempting to add existing reader, updating information."); |
997 | 0 | if (reader->update(rdata)) |
998 | 0 | { |
999 | 0 | filter_remote_locators(*reader->general_locator_selector_entry(), |
1000 | 0 | m_att.external_unicast_locators, m_att.ignore_non_matching_locators); |
1001 | 0 | filter_remote_locators(*reader->async_locator_selector_entry(), |
1002 | 0 | m_att.external_unicast_locators, m_att.ignore_non_matching_locators); |
1003 | 0 | mp_RTPSParticipant->createSenderResources(rdata.remote_locators, m_att); |
1004 | 0 | update_reader_info(locator_selector_general_, true); |
1005 | 0 | update_reader_info(locator_selector_async_, true); |
1006 | 0 | } |
1007 | 0 | return true; |
1008 | 0 | } |
1009 | 0 | return false; |
1010 | 0 | })) |
1011 | 0 | { |
1012 | 0 | if (nullptr != listener_) |
1013 | 0 | { |
1014 | | // call the listener without locks taken |
1015 | 0 | guard_locator_selector_async.unlock(); |
1016 | 0 | guard_locator_selector_general.unlock(); |
1017 | 0 | guard.unlock(); |
1018 | |
|
1019 | 0 | listener_->on_reader_discovery(this, ReaderDiscoveryStatus::CHANGED_QOS_READER, rdata.guid, &rdata); |
1020 | 0 | } |
1021 | |
|
1022 | 0 | #ifdef FASTDDS_STATISTICS |
1023 | | // notify monitor service so that the connectionlist for this entity |
1024 | | // could be updated |
1025 | 0 | if (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin()) |
1026 | 0 | { |
1027 | 0 | mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid); |
1028 | 0 | } |
1029 | 0 | #endif //FASTDDS_STATISTICS |
1030 | |
|
1031 | 0 | return false; |
1032 | 0 | } |
1033 | | |
1034 | | // Get a reader proxy from the inactive pool (or create a new one if necessary and allowed) |
1035 | 0 | ReaderProxy* rp = nullptr; |
1036 | 0 | if (matched_readers_pool_.empty()) |
1037 | 0 | { |
1038 | 0 | size_t max_readers = matched_readers_pool_.max_size(); |
1039 | 0 | if (get_matched_readers_size() + matched_readers_pool_.size() < max_readers) |
1040 | 0 | { |
1041 | 0 | const RTPSParticipantAttributes& part_att = mp_RTPSParticipant->get_attributes(); |
1042 | 0 | rp = new ReaderProxy(times_, part_att.allocation.locators, this, stateful_writer_listener_); |
1043 | 0 | } |
1044 | 0 | else |
1045 | 0 | { |
1046 | 0 | EPROSIMA_LOG_WARNING(RTPS_WRITER, "Maximum number of reader proxies (" << max_readers |
1047 | 0 | << ") reached for writer " |
1048 | 0 | << m_guid); |
1049 | 0 | return false; |
1050 | 0 | } |
1051 | 0 | } |
1052 | 0 | else |
1053 | 0 | { |
1054 | 0 | rp = matched_readers_pool_.back(); |
1055 | 0 | matched_readers_pool_.pop_back(); |
1056 | 0 | } |
1057 | | |
1058 | | // Add info of new datareader. |
1059 | 0 | rp->start(rdata, is_datasharing_compatible_with(rdata.data_sharing)); |
1060 | 0 | filter_remote_locators(*rp->general_locator_selector_entry(), |
1061 | 0 | m_att.external_unicast_locators, m_att.ignore_non_matching_locators); |
1062 | 0 | filter_remote_locators(*rp->async_locator_selector_entry(), |
1063 | 0 | m_att.external_unicast_locators, m_att.ignore_non_matching_locators); |
1064 | 0 | locator_selector_general_.locator_selector.add_entry(rp->general_locator_selector_entry()); |
1065 | 0 | locator_selector_async_.locator_selector.add_entry(rp->async_locator_selector_entry()); |
1066 | |
|
1067 | 0 | if (rp->is_local_reader()) |
1068 | 0 | { |
1069 | 0 | matched_local_readers_.push_back(rp); |
1070 | 0 | EPROSIMA_LOG_INFO(RTPS_WRITER, "Adding reader " << rdata.guid << " to " << this->m_guid.entityId |
1071 | 0 | << " as local reader"); |
1072 | 0 | } |
1073 | 0 | else |
1074 | 0 | { |
1075 | 0 | if (rp->is_datasharing_reader()) |
1076 | 0 | { |
1077 | 0 | matched_datasharing_readers_.push_back(rp); |
1078 | 0 | EPROSIMA_LOG_INFO(RTPS_WRITER, "Adding reader " << rdata.guid << " to " << this->m_guid.entityId |
1079 | 0 | << " as data sharing"); |
1080 | 0 | } |
1081 | 0 | else |
1082 | 0 | { |
1083 | 0 | matched_remote_readers_.push_back(rp); |
1084 | 0 | EPROSIMA_LOG_INFO(RTPS_WRITER, "Adding reader " << rdata.guid << " to " << this->m_guid.entityId |
1085 | 0 | << " as remote reader"); |
1086 | 0 | } |
1087 | 0 | } |
1088 | |
|
1089 | 0 | mp_RTPSParticipant->createSenderResources(rdata.remote_locators, m_att); |
1090 | 0 | update_reader_info(locator_selector_general_, true); |
1091 | 0 | update_reader_info(locator_selector_async_, true); |
1092 | |
|
1093 | 0 | if (rp->is_datasharing_reader()) |
1094 | 0 | { |
1095 | 0 | if (nullptr != listener_) |
1096 | 0 | { |
1097 | | // call the listener without locks taken |
1098 | 0 | guard_locator_selector_async.unlock(); |
1099 | 0 | guard_locator_selector_general.unlock(); |
1100 | 0 | guard.unlock(); |
1101 | |
|
1102 | 0 | listener_->on_reader_discovery(this, ReaderDiscoveryStatus::DISCOVERED_READER, rdata.guid, &rdata); |
1103 | 0 | } |
1104 | |
|
1105 | 0 | #ifdef FASTDDS_STATISTICS |
1106 | | // notify monitor service so that the connectionlist for this entity |
1107 | | // could be updated |
1108 | 0 | if (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin()) |
1109 | 0 | { |
1110 | 0 | mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid); |
1111 | 0 | } |
1112 | 0 | #endif //FASTDDS_STATISTICS |
1113 | |
|
1114 | 0 | return true; |
1115 | 0 | } |
1116 | | |
1117 | 0 | bool is_reliable = rp->is_reliable(); |
1118 | 0 | if (is_reliable) |
1119 | 0 | { |
1120 | 0 | SequenceNumber_t min_seq = get_seq_num_min(); |
1121 | 0 | SequenceNumber_t last_seq = get_seq_num_max(); |
1122 | 0 | RTPSMessageGroup group(mp_RTPSParticipant, this, rp->message_sender()); |
1123 | | |
1124 | | // History not empty |
1125 | 0 | if (min_seq != SequenceNumber_t::unknown()) |
1126 | 0 | { |
1127 | 0 | (void)last_seq; |
1128 | 0 | assert(last_seq != SequenceNumber_t::unknown()); |
1129 | 0 | assert(min_seq <= last_seq); |
1130 | |
|
1131 | 0 | try |
1132 | 0 | { |
1133 | | // Late-joiner |
1134 | 0 | if (TRANSIENT_LOCAL <= rp->durability_kind() && |
1135 | 0 | TRANSIENT_LOCAL <= m_att.durabilityKind) |
1136 | 0 | { |
1137 | 0 | for (History::iterator cit = history_->changesBegin(); cit != history_->changesEnd(); ++cit) |
1138 | 0 | { |
1139 | | // Holes are managed when deliver_sample(), sending GAP messages. |
1140 | 0 | if (rp->rtps_is_relevant(*cit)) |
1141 | 0 | { |
1142 | 0 | ChangeForReader_t changeForReader(*cit); |
1143 | | |
1144 | | // If it is local, maintain in UNSENT status and add to flow controller. |
1145 | 0 | if (rp->is_local_reader()) |
1146 | 0 | { |
1147 | 0 | flow_controller_->add_old_sample(this, *cit); |
1148 | 0 | } |
1149 | | // In other case, set as UNACKNOWLEDGED and expects the reader request them. |
1150 | 0 | else |
1151 | 0 | { |
1152 | 0 | changeForReader.setStatus(UNACKNOWLEDGED); |
1153 | 0 | } |
1154 | |
|
1155 | 0 | rp->add_change(changeForReader, true, false); |
1156 | 0 | } |
1157 | 0 | } |
1158 | 0 | } |
1159 | 0 | else |
1160 | 0 | { |
1161 | 0 | if (rp->is_local_reader()) |
1162 | 0 | { |
1163 | 0 | intraprocess_gap(rp, min_seq, history_->next_sequence_number()); |
1164 | 0 | } |
1165 | 0 | else |
1166 | 0 | { |
1167 | | // Send a GAP of the whole history. |
1168 | 0 | group.add_gap(min_seq, SequenceNumberSet_t(history_->next_sequence_number()), rp->guid()); |
1169 | 0 | } |
1170 | 0 | } |
1171 | | |
1172 | | // Always activate heartbeat period. We need a confirmation of the reader. |
1173 | | // The state has to be updated. |
1174 | 0 | periodic_hb_event_->restart_timer(std::chrono::steady_clock::now() + std::chrono::hours(24)); |
1175 | 0 | } |
1176 | 0 | catch (const RTPSMessageGroup::timeout&) |
1177 | 0 | { |
1178 | 0 | EPROSIMA_LOG_ERROR(RTPS_WRITER, "Max blocking time reached"); |
1179 | 0 | } |
1180 | 0 | } |
1181 | |
|
1182 | 0 | if (rp->is_local_reader()) |
1183 | 0 | { |
1184 | 0 | intraprocess_heartbeat(rp); |
1185 | 0 | } |
1186 | 0 | else |
1187 | 0 | { |
1188 | 0 | send_heartbeat_nts_(1u, group, disable_positive_acks_); |
1189 | 0 | group.flush_and_reset(); |
1190 | 0 | } |
1191 | 0 | } |
1192 | 0 | else |
1193 | 0 | { |
1194 | | // Acknowledged all for best-effort reader. |
1195 | 0 | rp->acked_changes_set(history_->next_sequence_number()); |
1196 | 0 | } |
1197 | |
|
1198 | 0 | EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy " << rp->guid() << " added to " << this->m_guid.entityId << " with " |
1199 | 0 | << rdata.remote_locators.unicast.size() << "(u)-" |
1200 | 0 | << rdata.remote_locators.multicast.size() |
1201 | 0 | << "(m) locators"); |
1202 | |
|
1203 | 0 | if (nullptr != listener_) |
1204 | 0 | { |
1205 | | // call the listener without locks taken |
1206 | 0 | guard_locator_selector_async.unlock(); |
1207 | 0 | guard_locator_selector_general.unlock(); |
1208 | 0 | guard.unlock(); |
1209 | |
|
1210 | 0 | listener_->on_reader_discovery(this, ReaderDiscoveryStatus::DISCOVERED_READER, rdata.guid, &rdata); |
1211 | 0 | } |
1212 | |
|
1213 | 0 | #ifdef FASTDDS_STATISTICS |
1214 | | // notify monitor service so that the connectionlist for this entity |
1215 | | // could be updated |
1216 | 0 | if (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin()) |
1217 | 0 | { |
1218 | 0 | mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid); |
1219 | 0 | } |
1220 | 0 | #endif //FASTDDS_STATISTICS |
1221 | |
|
1222 | 0 | return true; |
1223 | 0 | } |
1224 | | |
1225 | | bool StatefulWriter::matched_reader_remove( |
1226 | | const GUID_t& reader_guid) |
1227 | 0 | { |
1228 | 0 | ReaderProxy* rproxy = nullptr; |
1229 | 0 | std::unique_lock<RecursiveTimedMutex> lock(mp_mutex); |
1230 | |
|
1231 | 0 | { |
1232 | 0 | std::lock_guard<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_); |
1233 | 0 | std::lock_guard<LocatorSelectorSender> guard_locator_selector_async(locator_selector_async_); |
1234 | |
|
1235 | 0 | for (ReaderProxyIterator it = matched_local_readers_.begin(); |
1236 | 0 | it != matched_local_readers_.end(); ++it) |
1237 | 0 | { |
1238 | 0 | if ((*it)->guid() == reader_guid) |
1239 | 0 | { |
1240 | 0 | EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid); |
1241 | 0 | rproxy = std::move(*it); |
1242 | 0 | it = matched_local_readers_.erase(it); |
1243 | 0 | break; |
1244 | 0 | } |
1245 | 0 | } |
1246 | |
|
1247 | 0 | if (rproxy == nullptr) |
1248 | 0 | { |
1249 | 0 | for (ReaderProxyIterator it = matched_datasharing_readers_.begin(); |
1250 | 0 | it != matched_datasharing_readers_.end(); ++it) |
1251 | 0 | { |
1252 | 0 | if ((*it)->guid() == reader_guid) |
1253 | 0 | { |
1254 | 0 | EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid); |
1255 | 0 | rproxy = std::move(*it); |
1256 | 0 | it = matched_datasharing_readers_.erase(it); |
1257 | 0 | break; |
1258 | 0 | } |
1259 | 0 | } |
1260 | 0 | } |
1261 | |
|
1262 | 0 | if (rproxy == nullptr) |
1263 | 0 | { |
1264 | 0 | for (ReaderProxyIterator it = matched_remote_readers_.begin(); |
1265 | 0 | it != matched_remote_readers_.end(); ++it) |
1266 | 0 | { |
1267 | 0 | if ((*it)->guid() == reader_guid) |
1268 | 0 | { |
1269 | 0 | EPROSIMA_LOG_INFO(RTPS_WRITER, "Reader Proxy removed: " << reader_guid); |
1270 | 0 | rproxy = std::move(*it); |
1271 | 0 | it = matched_remote_readers_.erase(it); |
1272 | 0 | break; |
1273 | 0 | } |
1274 | 0 | } |
1275 | 0 | } |
1276 | |
|
1277 | 0 | locator_selector_general_.locator_selector.remove_entry(reader_guid); |
1278 | 0 | locator_selector_async_.locator_selector.remove_entry(reader_guid); |
1279 | 0 | update_reader_info(locator_selector_general_, false); |
1280 | 0 | update_reader_info(locator_selector_async_, false); |
1281 | 0 | } |
1282 | |
|
1283 | 0 | if (get_matched_readers_size() == 0) |
1284 | 0 | { |
1285 | 0 | periodic_hb_event_->cancel_timer(); |
1286 | 0 | } |
1287 | |
|
1288 | 0 | if (rproxy != nullptr) |
1289 | 0 | { |
1290 | 0 | rproxy->stop(); |
1291 | 0 | matched_readers_pool_.push_back(rproxy); |
1292 | |
|
1293 | 0 | check_acked_status(); |
1294 | |
|
1295 | 0 | if (nullptr != listener_) |
1296 | 0 | { |
1297 | | // listener is called without locks taken |
1298 | 0 | lock.unlock(); |
1299 | 0 | listener_->on_reader_discovery(this, ReaderDiscoveryStatus::REMOVED_READER, reader_guid, nullptr); |
1300 | 0 | } |
1301 | |
|
1302 | 0 | #ifdef FASTDDS_STATISTICS |
1303 | | // notify monitor service so that the connectionlist for this entity |
1304 | | // could be updated |
1305 | 0 | if (nullptr != mp_RTPSParticipant->get_connections_observer() && !m_guid.is_builtin()) |
1306 | 0 | { |
1307 | 0 | mp_RTPSParticipant->get_connections_observer()->on_local_entity_connections_change(m_guid); |
1308 | 0 | } |
1309 | 0 | #endif //FASTDDS_STATISTICS |
1310 | |
|
1311 | 0 | return true; |
1312 | 0 | } |
1313 | | |
1314 | 0 | EPROSIMA_LOG_INFO(RTPS_HISTORY, "Reader Proxy doesn't exist in this writer"); |
1315 | 0 | return false; |
1316 | 0 | } |
1317 | | |
1318 | | bool StatefulWriter::matched_reader_is_matched( |
1319 | | const GUID_t& reader_guid) |
1320 | 0 | { |
1321 | 0 | std::lock_guard<RecursiveTimedMutex> guard(mp_mutex); |
1322 | 0 | return for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_, |
1323 | 0 | [&reader_guid](ReaderProxy* reader) |
1324 | 0 | { |
1325 | 0 | return (reader->guid() == reader_guid); |
1326 | 0 | } |
1327 | 0 | ); |
1328 | 0 | } |
1329 | | |
1330 | | bool StatefulWriter::matched_reader_lookup( |
1331 | | GUID_t& readerGuid, |
1332 | | ReaderProxy** RP) |
1333 | 0 | { |
1334 | 0 | std::lock_guard<RecursiveTimedMutex> guard(mp_mutex); |
1335 | 0 | return for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_, |
1336 | 0 | [&readerGuid, RP](ReaderProxy* reader) |
1337 | 0 | { |
1338 | 0 | if (reader->guid() == readerGuid) |
1339 | 0 | { |
1340 | 0 | *RP = reader; |
1341 | 0 | return true; |
1342 | 0 | } |
1343 | 0 | return false; |
1344 | 0 | } |
1345 | 0 | ); |
1346 | 0 | } |
1347 | | |
1348 | | bool StatefulWriter::has_been_fully_delivered( |
1349 | | const SequenceNumber_t& seq_num) const |
1350 | 0 | { |
1351 | 0 | std::lock_guard<RecursiveTimedMutex> guard(mp_mutex); |
1352 | 0 | bool found = false; |
1353 | | // Sequence number has not been generated by this WriterHistory. |
1354 | 0 | if (seq_num >= history_->next_sequence_number()) |
1355 | 0 | { |
1356 | 0 | return false; |
1357 | 0 | } |
1358 | 0 | for (auto reader : matched_remote_readers_) |
1359 | 0 | { |
1360 | 0 | bool ret_code = reader->has_been_delivered(seq_num, found); |
1361 | 0 | if (found && !ret_code) |
1362 | 0 | { |
1363 | | // The change has not been fully delivered if it is pending delivery on at least one ReaderProxy. |
1364 | 0 | return false; |
1365 | 0 | } |
1366 | 0 | } |
1367 | 0 | return true; |
1368 | 0 | } |
1369 | | |
1370 | | bool StatefulWriter::is_acked_by_all( |
1371 | | const SequenceNumber_t& seq_num) const |
1372 | 0 | { |
1373 | 0 | std::lock_guard<RecursiveTimedMutex> guard(mp_mutex); |
1374 | 0 | return is_acked_by_all_nts(seq_num); |
1375 | 0 | } |
1376 | | |
1377 | | bool StatefulWriter::is_acked_by_all_nts( |
1378 | | const SequenceNumber_t seq) const |
1379 | 0 | { |
1380 | 0 | assert(history_->next_sequence_number() > seq); |
1381 | 0 | return (seq < next_all_acked_notify_sequence_) || |
1382 | 0 | !for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_, |
1383 | 0 | [seq](const ReaderProxy* reader) |
1384 | 0 | { |
1385 | 0 | return !(reader->change_is_acked(seq)); |
1386 | 0 | }); |
1387 | 0 | } |
1388 | | |
1389 | | bool StatefulWriter::all_readers_updated() |
1390 | 0 | { |
1391 | 0 | std::lock_guard<RecursiveTimedMutex> guard(mp_mutex); |
1392 | |
|
1393 | 0 | return !for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_, |
1394 | 0 | [](const ReaderProxy* reader) |
1395 | 0 | { |
1396 | 0 | return (reader->has_changes()); |
1397 | 0 | } |
1398 | 0 | ); |
1399 | 0 | } |
1400 | | |
1401 | | bool StatefulWriter::wait_for_all_acked( |
1402 | | const dds::Duration_t& max_wait) |
1403 | 0 | { |
1404 | 0 | send_periodic_heartbeat(); |
1405 | |
|
1406 | 0 | std::unique_lock<RecursiveTimedMutex> lock(mp_mutex); |
1407 | 0 | std::unique_lock<std::mutex> all_acked_lock(all_acked_mutex_); |
1408 | |
|
1409 | 0 | all_acked_ = !for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_, |
1410 | 0 | [](const ReaderProxy* reader) |
1411 | 0 | { |
1412 | 0 | return reader->has_changes(); |
1413 | 0 | } |
1414 | 0 | ); |
1415 | 0 | lock.unlock(); |
1416 | |
|
1417 | 0 | if (!all_acked_) |
1418 | 0 | { |
1419 | 0 | std::chrono::microseconds max_w(fastdds::rtps::TimeConv::Duration_t2MicroSecondsInt64(max_wait)); |
1420 | 0 | all_acked_cond_.wait_for(all_acked_lock, max_w, [&]() |
1421 | 0 | { |
1422 | 0 | return all_acked_; |
1423 | 0 | }); |
1424 | 0 | } |
1425 | |
|
1426 | 0 | return all_acked_; |
1427 | 0 | } |
1428 | | |
1429 | | void StatefulWriter::rebuild_status_after_load() |
1430 | 0 | { |
1431 | 0 | SequenceNumber_t min_seq = get_seq_num_min(); |
1432 | 0 | if (min_seq != SequenceNumber_t::unknown()) |
1433 | 0 | { |
1434 | 0 | biggest_removed_sequence_number_ = min_seq - 1; |
1435 | 0 | may_remove_change_ = 1; |
1436 | 0 | } |
1437 | |
|
1438 | 0 | SequenceNumber_t next_seq = history_->next_sequence_number(); |
1439 | 0 | next_all_acked_notify_sequence_ = next_seq; |
1440 | 0 | min_readers_low_mark_ = next_seq - 1; |
1441 | 0 | all_acked_ = true; |
1442 | 0 | } |
1443 | | |
1444 | | void StatefulWriter::check_acked_status() |
1445 | 0 | { |
1446 | 0 | std::unique_lock<RecursiveTimedMutex> lock(mp_mutex); |
1447 | |
|
1448 | 0 | bool all_acked = true; |
1449 | 0 | bool has_min_low_mark = false; |
1450 | | // #8945 If no readers matched, notify all old changes. |
1451 | 0 | SequenceNumber_t min_low_mark = history_->next_sequence_number() - 1; |
1452 | |
|
1453 | 0 | for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_, |
1454 | 0 | [&all_acked, &has_min_low_mark, &min_low_mark](ReaderProxy* reader) |
1455 | 0 | { |
1456 | 0 | SequenceNumber_t reader_low_mark = reader->changes_low_mark(); |
1457 | 0 | if (reader_low_mark < min_low_mark || !has_min_low_mark) |
1458 | 0 | { |
1459 | 0 | has_min_low_mark = true; |
1460 | 0 | min_low_mark = reader_low_mark; |
1461 | 0 | } |
1462 | |
|
1463 | 0 | if (reader->has_changes()) |
1464 | 0 | { |
1465 | 0 | all_acked = false; |
1466 | 0 | } |
1467 | |
|
1468 | 0 | return false; |
1469 | 0 | } |
1470 | 0 | ); |
1471 | |
|
1472 | 0 | if (all_acked) |
1473 | 0 | { |
1474 | 0 | min_low_mark = history_->next_sequence_number() - 1; |
1475 | 0 | } |
1476 | |
|
1477 | 0 | bool something_changed = all_acked; |
1478 | 0 | SequenceNumber_t min_seq = get_seq_num_min(); |
1479 | 0 | if (min_seq != SequenceNumber_t::unknown()) |
1480 | 0 | { |
1481 | | // In the case where we haven't received an acknack from a recently matched reader, |
1482 | | // min_low_mark will be zero, and no change will be notified as received by all |
1483 | 0 | if (next_all_acked_notify_sequence_ <= min_low_mark) |
1484 | 0 | { |
1485 | 0 | if ((listener_ != nullptr) && (min_low_mark >= get_seq_num_min())) |
1486 | 0 | { |
1487 | | // We will inform backwards about the changes received by all readers, starting |
1488 | | // on min_low_mark down until next_all_acked_notify_sequence_. This way we can |
1489 | | // safely proceed with the traversal, in case a change is removed from the history |
1490 | | // inside the callback |
1491 | 0 | History::iterator history_end = history_->changesEnd(); |
1492 | 0 | History::iterator cit = |
1493 | 0 | std::lower_bound(history_->changesBegin(), history_end, min_low_mark, |
1494 | 0 | []( |
1495 | 0 | const CacheChange_t* change, |
1496 | 0 | const SequenceNumber_t& seq) |
1497 | 0 | { |
1498 | 0 | return change->sequenceNumber < seq; |
1499 | 0 | }); |
1500 | 0 | if (cit != history_end && (*cit)->sequenceNumber == min_low_mark) |
1501 | 0 | { |
1502 | 0 | ++cit; |
1503 | 0 | } |
1504 | |
|
1505 | 0 | SequenceNumber_t seq{}; |
1506 | 0 | SequenceNumber_t end_seq = min_seq > next_all_acked_notify_sequence_ ? |
1507 | 0 | min_seq : next_all_acked_notify_sequence_; |
1508 | | |
1509 | | // The iterator starts pointing to the change inmediately after min_low_mark |
1510 | 0 | --cit; |
1511 | |
|
1512 | 0 | do |
1513 | 0 | { |
1514 | | // Avoid notifying changes before next_all_acked_notify_sequence_ |
1515 | 0 | CacheChange_t* change = *cit; |
1516 | 0 | seq = change->sequenceNumber; |
1517 | 0 | if (seq < next_all_acked_notify_sequence_) |
1518 | 0 | { |
1519 | 0 | break; |
1520 | 0 | } |
1521 | | |
1522 | | // Change iterator before it possibly becomes invalidated |
1523 | 0 | if (cit != history_->changesBegin()) |
1524 | 0 | { |
1525 | 0 | --cit; |
1526 | 0 | } |
1527 | | |
1528 | | // Notify reception of change (may remove that change on VOLATILE writers) |
1529 | 0 | listener_->on_writer_change_received_by_all(this, change); |
1530 | | |
1531 | | // Stop if we got to either next_all_acked_notify_sequence_ or the first change |
1532 | 0 | } |
1533 | 0 | while (seq > end_seq); |
1534 | 0 | } |
1535 | |
|
1536 | 0 | next_all_acked_notify_sequence_ = min_low_mark + 1; |
1537 | 0 | } |
1538 | |
|
1539 | 0 | if (min_low_mark >= get_seq_num_min()) |
1540 | 0 | { |
1541 | | // get_seq_num_min() returns SequenceNumber_t::unknown() when the history is empty. |
1542 | | // Thus, it is set to 2 to indicate that all samples have been removed. |
1543 | 0 | may_remove_change_ = (get_seq_num_min() == SequenceNumber_t::unknown()) ? 2 : 1; |
1544 | 0 | } |
1545 | |
|
1546 | 0 | min_readers_low_mark_ = min_low_mark; |
1547 | 0 | something_changed = true; |
1548 | 0 | } |
1549 | |
|
1550 | 0 | if (all_acked) |
1551 | 0 | { |
1552 | 0 | std::unique_lock<std::mutex> all_acked_lock(all_acked_mutex_); |
1553 | 0 | SequenceNumber_t next_seq = history_->next_sequence_number(); |
1554 | 0 | next_all_acked_notify_sequence_ = next_seq; |
1555 | 0 | min_readers_low_mark_ = next_seq - 1; |
1556 | 0 | all_acked_ = true; |
1557 | 0 | all_acked_cond_.notify_all(); |
1558 | 0 | } |
1559 | |
|
1560 | 0 | if (something_changed) |
1561 | 0 | { |
1562 | 0 | may_remove_change_cond_.notify_one(); |
1563 | 0 | } |
1564 | 0 | } |
1565 | | |
1566 | | bool StatefulWriter::try_remove_change( |
1567 | | const std::chrono::steady_clock::time_point& max_blocking_time_point, |
1568 | | std::unique_lock<RecursiveTimedMutex>& lock) |
1569 | 0 | { |
1570 | 0 | EPROSIMA_LOG_INFO(RTPS_WRITER, "Starting process try remove change for writer " << getGuid()); |
1571 | |
|
1572 | 0 | SequenceNumber_t min_low_mark; |
1573 | |
|
1574 | 0 | { |
1575 | 0 | std::lock_guard<RecursiveTimedMutex> guard(mp_mutex); |
1576 | 0 | min_low_mark = next_all_acked_notify_sequence_ - 1; |
1577 | 0 | } |
1578 | |
|
1579 | 0 | SequenceNumber_t calc = min_low_mark < get_seq_num_min() ? SequenceNumber_t() : |
1580 | 0 | (min_low_mark - get_seq_num_min()) + 1; |
1581 | 0 | unsigned int may_remove_change = 1; |
1582 | |
|
1583 | 0 | if (calc <= SequenceNumber_t()) |
1584 | 0 | { |
1585 | 0 | may_remove_change_ = 0; |
1586 | 0 | may_remove_change_cond_.wait_until(lock, max_blocking_time_point, |
1587 | 0 | [&]() |
1588 | 0 | { |
1589 | 0 | return may_remove_change_ > 0; |
1590 | 0 | }); |
1591 | 0 | may_remove_change = may_remove_change_; |
1592 | 0 | } |
1593 | | |
1594 | | // Some changes acked |
1595 | 0 | if (may_remove_change == 1) |
1596 | 0 | { |
1597 | 0 | return history_->remove_min_change(); |
1598 | 0 | } |
1599 | | // Waiting a change was removed. |
1600 | 0 | else if (may_remove_change == 2) |
1601 | 0 | { |
1602 | 0 | return true; |
1603 | 0 | } |
1604 | | |
1605 | 0 | return false; |
1606 | 0 | } |
1607 | | |
1608 | | bool StatefulWriter::wait_for_acknowledgement( |
1609 | | const SequenceNumber_t& seq, |
1610 | | const std::chrono::steady_clock::time_point& max_blocking_time_point, |
1611 | | std::unique_lock<RecursiveTimedMutex>& lock) |
1612 | 0 | { |
1613 | 0 | return may_remove_change_cond_.wait_until(lock, max_blocking_time_point, |
1614 | 0 | [this, &seq]() |
1615 | 0 | { |
1616 | 0 | return is_acked_by_all_nts(seq); |
1617 | 0 | }); |
1618 | 0 | } |
1619 | | |
1620 | | /* |
1621 | | * PARAMETER_RELATED METHODS |
1622 | | */ |
1623 | | void StatefulWriter::update_attributes( |
1624 | | const WriterAttributes& att) |
1625 | 0 | { |
1626 | 0 | BaseWriter::update_attributes(att); |
1627 | |
|
1628 | 0 | this->update_times(att.times); |
1629 | 0 | if (this->get_disable_positive_acks()) |
1630 | 0 | { |
1631 | 0 | this->update_positive_acks_times(att); |
1632 | 0 | } |
1633 | 0 | } |
1634 | | |
1635 | | bool StatefulWriter::matched_readers_guids( |
1636 | | std::vector<GUID_t>& guids) const |
1637 | 0 | { |
1638 | 0 | std::lock_guard<RecursiveTimedMutex> guard(mp_mutex); |
1639 | 0 | guids.clear(); |
1640 | 0 | guids.reserve(matched_local_readers_.size() + matched_datasharing_readers_.size() + |
1641 | 0 | matched_remote_readers_.size()); |
1642 | 0 | for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_, |
1643 | 0 | [&guids](const ReaderProxy* reader) |
1644 | 0 | { |
1645 | 0 | guids.emplace_back(reader->guid()); |
1646 | 0 | return false; |
1647 | 0 | } |
1648 | 0 | ); |
1649 | |
|
1650 | 0 | return true; |
1651 | 0 | } |
1652 | | |
1653 | | void StatefulWriter::update_positive_acks_times( |
1654 | | const WriterAttributes& att) |
1655 | 0 | { |
1656 | 0 | std::lock_guard<RecursiveTimedMutex> guard(mp_mutex); |
1657 | 0 | keep_duration_ = att.keep_duration; |
1658 | | // Restart ack timer with new duration |
1659 | 0 | ack_event_->update_interval(keep_duration_); |
1660 | 0 | ack_event_->restart_timer(); |
1661 | 0 | } |
1662 | | |
1663 | | void StatefulWriter::update_times( |
1664 | | const WriterTimes& times) |
1665 | 0 | { |
1666 | 0 | std::lock_guard<RecursiveTimedMutex> guard(mp_mutex); |
1667 | 0 | if (times_.heartbeat_period != times.heartbeat_period) |
1668 | 0 | { |
1669 | 0 | periodic_hb_event_->update_interval(times.heartbeat_period); |
1670 | 0 | } |
1671 | 0 | if (times_.nack_response_delay != times.nack_response_delay) |
1672 | 0 | { |
1673 | 0 | if (nack_response_event_ != nullptr) |
1674 | 0 | { |
1675 | 0 | nack_response_event_->update_interval(times.nack_response_delay); |
1676 | 0 | } |
1677 | 0 | } |
1678 | 0 | if (times_.nack_supression_duration != times.nack_supression_duration) |
1679 | 0 | { |
1680 | 0 | for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_, |
1681 | 0 | [×](ReaderProxy* reader) |
1682 | 0 | { |
1683 | 0 | reader->update_nack_supression_interval(times.nack_supression_duration); |
1684 | 0 | return false; |
1685 | 0 | } |
1686 | 0 | ); |
1687 | |
|
1688 | 0 | for (ReaderProxy* it : matched_readers_pool_) |
1689 | 0 | { |
1690 | 0 | it->update_nack_supression_interval(times.nack_supression_duration); |
1691 | 0 | } |
1692 | 0 | } |
1693 | 0 | times_ = times; |
1694 | 0 | } |
1695 | | |
1696 | | SequenceNumber_t StatefulWriter::next_sequence_number() const |
1697 | 0 | { |
1698 | 0 | return history_->next_sequence_number(); |
1699 | 0 | } |
1700 | | |
1701 | | bool StatefulWriter::send_periodic_heartbeat( |
1702 | | bool final, |
1703 | | bool liveliness) |
1704 | 0 | { |
1705 | 0 | std::lock_guard<RecursiveTimedMutex> guardW(mp_mutex); |
1706 | 0 | std::lock_guard<LocatorSelectorSender> guard_locator_selector_general(locator_selector_general_); |
1707 | |
|
1708 | 0 | bool unacked_changes {false}; |
1709 | 0 | bool irrelevants_removed {false}; |
1710 | 0 | if (!liveliness) |
1711 | 0 | { |
1712 | 0 | SequenceNumber_t first_seq_to_check_acknowledge = get_seq_num_min(); |
1713 | 0 | if (SequenceNumber_t::unknown() == first_seq_to_check_acknowledge) |
1714 | 0 | { |
1715 | 0 | first_seq_to_check_acknowledge = history_->next_sequence_number() - 1; |
1716 | 0 | } |
1717 | |
|
1718 | 0 | for_matched_readers(matched_local_readers_, matched_datasharing_readers_, |
1719 | 0 | matched_remote_readers_, |
1720 | 0 | [first_seq_to_check_acknowledge, &unacked_changes, &irrelevants_removed](ReaderProxy* reader) |
1721 | 0 | { |
1722 | 0 | if (!unacked_changes) |
1723 | 0 | { |
1724 | 0 | unacked_changes = reader->has_unacknowledged(first_seq_to_check_acknowledge); |
1725 | 0 | } |
1726 | |
|
1727 | 0 | if (!irrelevants_removed) |
1728 | 0 | { |
1729 | 0 | irrelevants_removed = SequenceNumber_t::unknown() != reader->first_irrelevant_removed(); |
1730 | 0 | } |
1731 | |
|
1732 | 0 | return unacked_changes && irrelevants_removed; |
1733 | 0 | } |
1734 | 0 | ); |
1735 | |
|
1736 | 0 | if (unacked_changes) |
1737 | 0 | { |
1738 | 0 | try |
1739 | 0 | { |
1740 | | //TODO if separating, here sends periodic for all readers, instead of ones needed it. |
1741 | 0 | send_heartbeat_to_all_readers(irrelevants_removed); |
1742 | 0 | } |
1743 | 0 | catch (const RTPSMessageGroup::timeout&) |
1744 | 0 | { |
1745 | 0 | EPROSIMA_LOG_ERROR(RTPS_WRITER, "Max blocking time reached"); |
1746 | 0 | } |
1747 | 0 | } |
1748 | 0 | } |
1749 | 0 | else if (separate_sending_enabled_) |
1750 | 0 | { |
1751 | | // Send individual liveliness heartbeat to each reader |
1752 | 0 | for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_, |
1753 | 0 | [this, &liveliness, &unacked_changes](ReaderProxy* reader) |
1754 | 0 | { |
1755 | 0 | send_heartbeat_to_nts(*reader, liveliness); |
1756 | 0 | unacked_changes = true; |
1757 | 0 | return false; |
1758 | 0 | } |
1759 | 0 | ); |
1760 | 0 | } |
1761 | 0 | else |
1762 | 0 | { |
1763 | | // This is a liveliness heartbeat, we don't care about checking sequence numbers |
1764 | 0 | try |
1765 | 0 | { |
1766 | 0 | for (ReaderProxy* reader : matched_local_readers_) |
1767 | 0 | { |
1768 | 0 | intraprocess_heartbeat(reader, true); |
1769 | 0 | unacked_changes = true; |
1770 | 0 | } |
1771 | |
|
1772 | 0 | for (ReaderProxy* reader : matched_datasharing_readers_) |
1773 | 0 | { |
1774 | 0 | auto pool = std::dynamic_pointer_cast<WriterPool>(history_->get_payload_pool()); |
1775 | 0 | assert(pool); |
1776 | 0 | pool->assert_liveliness(); |
1777 | 0 | reader->datasharing_notify(); |
1778 | 0 | unacked_changes = true; |
1779 | 0 | } |
1780 | |
|
1781 | 0 | if (there_are_remote_readers_) |
1782 | 0 | { |
1783 | 0 | unacked_changes = true; |
1784 | 0 | RTPSMessageGroup group(mp_RTPSParticipant, this, &locator_selector_general_); |
1785 | 0 | send_heartbeat_nts_(locator_selector_general_.all_remote_readers.size(), group, final, liveliness); |
1786 | 0 | } |
1787 | 0 | } |
1788 | 0 | catch (const RTPSMessageGroup::timeout&) |
1789 | 0 | { |
1790 | 0 | EPROSIMA_LOG_ERROR(RTPS_WRITER, "Max blocking time reached"); |
1791 | 0 | } |
1792 | 0 | } |
1793 | |
|
1794 | 0 | return unacked_changes; |
1795 | 0 | } |
1796 | | |
1797 | | void StatefulWriter::send_heartbeat_to_nts( |
1798 | | ReaderProxy& remoteReaderProxy, |
1799 | | bool liveliness, |
1800 | | bool force /* = false */) |
1801 | 0 | { |
1802 | 0 | SequenceNumber_t first_seq_to_check_acknowledge = get_seq_num_min(); |
1803 | 0 | if (SequenceNumber_t::unknown() == first_seq_to_check_acknowledge) |
1804 | 0 | { |
1805 | 0 | first_seq_to_check_acknowledge = history_->next_sequence_number() - 1; |
1806 | 0 | } |
1807 | 0 | if (remoteReaderProxy.is_reliable() && |
1808 | 0 | (force || liveliness || remoteReaderProxy.has_unacknowledged(first_seq_to_check_acknowledge))) |
1809 | 0 | { |
1810 | 0 | if (remoteReaderProxy.is_local_reader()) |
1811 | 0 | { |
1812 | 0 | intraprocess_heartbeat(&remoteReaderProxy, liveliness); |
1813 | 0 | } |
1814 | 0 | else if (remoteReaderProxy.is_datasharing_reader()) |
1815 | 0 | { |
1816 | 0 | remoteReaderProxy.datasharing_notify(); |
1817 | 0 | } |
1818 | 0 | else |
1819 | 0 | { |
1820 | 0 | try |
1821 | 0 | { |
1822 | 0 | RTPSMessageGroup group(mp_RTPSParticipant, this, remoteReaderProxy.message_sender()); |
1823 | 0 | SequenceNumber_t firstSeq = get_seq_num_min(); |
1824 | 0 | SequenceNumber_t lastSeq = get_seq_num_max(); |
1825 | |
|
1826 | 0 | if (firstSeq != c_SequenceNumber_Unknown && lastSeq != c_SequenceNumber_Unknown) |
1827 | 0 | { |
1828 | 0 | assert(firstSeq <= lastSeq); |
1829 | 0 | if (!liveliness) |
1830 | 0 | { |
1831 | 0 | add_gaps_for_removed_irrelevants(remoteReaderProxy, group); |
1832 | 0 | add_gaps_for_holes_in_history(group); |
1833 | 0 | } |
1834 | 0 | } |
1835 | |
|
1836 | 0 | send_heartbeat_nts_(1u, group, disable_positive_acks_, liveliness); |
1837 | 0 | } |
1838 | 0 | catch (const RTPSMessageGroup::timeout&) |
1839 | 0 | { |
1840 | 0 | EPROSIMA_LOG_ERROR(RTPS_WRITER, "Max blocking time reached"); |
1841 | 0 | } |
1842 | 0 | } |
1843 | 0 | } |
1844 | 0 | } |
1845 | | |
/**
 * Adds a heartbeat to @c message_group announcing the sequence-number range currently
 * reported by get_seq_num_min() / get_seq_num_max().
 *
 * Nothing is added when @c number_of_readers is zero, or when no sequence range is known
 * and the heartbeat is neither addressed to a single reader nor a liveliness one.
 * No lock is taken in this function; the "_nts_" suffix presumably stands for
 * "not thread-safe" - TODO confirm against the callers.
 *
 * @param number_of_readers Number of destination readers, as counted by the caller.
 * @param message_group     Message group on which add_heartbeat() is invoked.
 * @param final             Forwarded to add_heartbeat() as the final flag.
 * @param liveliness        Forwarded to add_heartbeat() as the liveliness flag.
 */
1846 | | void StatefulWriter::send_heartbeat_nts_( |
1847 | | size_t number_of_readers, |
1848 | | RTPSMessageGroup& message_group, |
1849 | | bool final, |
1850 | | bool liveliness) |
1851 | 0 | { |
           // No destination readers: nothing to announce.
1852 | 0 | if (!number_of_readers) |
1853 | 0 | { |
1854 | 0 | return; |
1855 | 0 | } |
1856 | | 
1857 | 0 | SequenceNumber_t firstSeq = get_seq_num_min(); |
1858 | 0 | SequenceNumber_t lastSeq = get_seq_num_max(); |
1859 | 

1860 | 0 | if (firstSeq == c_SequenceNumber_Unknown || lastSeq == c_SequenceNumber_Unknown) |
1861 | 0 | { |
           // Both bounds are expected to be unknown together.
1862 | 0 | assert(firstSeq == c_SequenceNumber_Unknown && lastSeq == c_SequenceNumber_Unknown); |
1863 | 

           // Single-reader and liveliness heartbeats are still sent, using the range
           // [next, next - 1] (first greater than last) built from next_sequence_number().
1864 | 0 | if (number_of_readers == 1 || liveliness) |
1865 | 0 | { |
1866 | 0 | firstSeq = next_sequence_number(); |
1867 | 0 | lastSeq = firstSeq - 1; |
1868 | 0 | } |
1869 | 0 | else |
1870 | 0 | { |
1871 | 0 | return; |
1872 | 0 | } |
1873 | 0 | } |
1874 | 0 | else |
1875 | 0 | { |
1876 | 0 | assert(firstSeq <= lastSeq); |
1877 | 0 | } |
1878 | | 
           // The heartbeat counter is incremented before it is placed in the submessage.
1879 | 0 | increment_hb_count(); |
1880 | 0 | message_group.add_heartbeat(firstSeq, lastSeq, heartbeat_count_, final, liveliness); |
1881 | 

1882 | 0 | EPROSIMA_LOG_INFO(RTPS_WRITER, |
1883 | 0 | getGuid().entityId << " Sending Heartbeat (" << firstSeq << " - " << lastSeq << ")" ); |
1884 | 0 | } |
1885 | | |
/**
 * Adds a heartbeat addressed to all readers to @c message_group, unless piggyback
 * heartbeats are disabled (@c disable_heartbeat_piggyback_).
 *
 * A heartbeat is added when any of these holds:
 *  - history_->isFull() returns true;
 *  - next_all_acked_notify_sequence_ is lower than get_seq_num_min();
 *  - the value of message_group.num_exceeded_send_buffer_size() differs from the one
 *    cached in last_num_exceeded_send_buffer_size_ (the cache is then refreshed).
 *
 * @param message_group    Message group that receives the heartbeat.
 * @param locator_selector Selector handed to select_all_readers_nts(); the size of its
 *                         all_remote_readers collection is used as the reader count.
 */
1886 | | void StatefulWriter::send_heartbeat_piggyback_nts_( |
1887 | | RTPSMessageGroup& message_group, |
1888 | | LocatorSelectorSender& locator_selector) |
1889 | 0 | { |
1890 | 0 | if (!disable_heartbeat_piggyback_) |
1891 | 0 | { |
1892 | 0 | if (history_->isFull() || next_all_acked_notify_sequence_ < get_seq_num_min()) |
1893 | 0 | { |
1894 | 0 | select_all_readers_nts(message_group, locator_selector); |
1895 | 0 | size_t number_of_readers = locator_selector.all_remote_readers.size(); |
1896 | 0 | send_heartbeat_nts_(number_of_readers, message_group, disable_positive_acks_); |
1897 | 0 | } |
1898 | 0 | else |
1899 | 0 | { |
           // Otherwise only send when the group's exceeded-send-buffer counter has changed
           // since the last time it was observed here.
1900 | 0 | if (last_num_exceeded_send_buffer_size_ != message_group.num_exceeded_send_buffer_size()) |
1901 | 0 | { |
1902 | 0 | last_num_exceeded_send_buffer_size_ = message_group.num_exceeded_send_buffer_size(); |
1903 | 0 | select_all_readers_nts(message_group, locator_selector); |
1904 | 0 | size_t number_of_readers = locator_selector.all_remote_readers.size(); |
1905 | 0 | send_heartbeat_nts_(number_of_readers, message_group, disable_positive_acks_); |
1906 | 0 | } |
1907 | 0 | } |
1908 | 0 | } |
1909 | 0 | } |
1910 | | |
/**
 * Runs perform_acknack_response() on every proxy in matched_remote_readers_ while holding
 * mp_mutex, handing each change reported through the callback to the flow controller
 * (add_old_sample).
 *
 * The values returned by the proxies are accumulated and, once the mutex has been
 * released, reported through on_resent_data().
 * Only matched_remote_readers_ is iterated here; local and data-sharing readers are not.
 */
1911 | | void StatefulWriter::perform_nack_response() |
1912 | 0 | { |
1913 | 0 | std::unique_lock<RecursiveTimedMutex> lock(mp_mutex); |
1914 | 

1915 | 0 | uint32_t changes_to_resend = 0; |
1916 | 0 | for (ReaderProxy* reader : matched_remote_readers_) |
1917 | 0 | { |
1918 | 0 | changes_to_resend += reader->perform_acknack_response([&](ChangeForReader_t& change) |
1919 | 0 | { |
1920 | | // This lambda is called when the ChangeForReader_t passes from REQUESTED to UNSENT. |
1921 | 0 | assert(nullptr != change.getChange()); |
1922 | 0 | flow_controller_->add_old_sample(this, change.getChange()); |
1923 | 0 | } |
1924 | 0 | ); |
1925 | 0 | } |
1926 | 

           // Release the writer mutex before notifying.
1927 | 0 | lock.unlock(); |
1928 | | 
1929 | | // Notify the statistics module |
1930 | 0 | on_resent_data(changes_to_resend); |
1931 | 0 | } |
1932 | | |
/**
 * Looks up the matched reader proxy whose GUID equals @c reader_guid (among local,
 * data-sharing and remote readers) and, with mp_mutex held, calls its
 * perform_nack_supression() and restarts the periodic heartbeat timer.
 *
 * @param reader_guid GUID of the reader proxy to act on.
 */
1933 | | void StatefulWriter::perform_nack_supression( |
1934 | | const GUID_t& reader_guid) |
1935 | 0 | { |
1936 | 0 | std::unique_lock<RecursiveTimedMutex> lock(mp_mutex); |
1937 | 

1938 | 0 | for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_, |
1939 | 0 | [this, &reader_guid](ReaderProxy* reader) |
1940 | 0 | { |
1941 | 0 | if (reader->guid() == reader_guid) |
1942 | 0 | { |
1943 | 0 | reader->perform_nack_supression(); |
1944 | 0 | periodic_hb_event_->restart_timer(); |
           // NOTE(review): returning true presumably stops the iteration - confirm
           // against for_matched_readers().
1945 | 0 | return true; |
1946 | 0 | } |
1947 | 0 | return false; |
1948 | 0 | } |
1949 | 0 | ); |
1950 | 0 | } |
1951 | | |
/**
 * Processes an ACKNACK received from a reader.
 *
 * With mp_mutex held, the message is handled only when @c writer_guid equals this writer's
 * GUID and the highest sequence number referenced by @c sn_set (its base when the set is
 * empty, its max otherwise) does not exceed next_sequence_number(). Otherwise a warning is
 * logged through print_inconsistent_acknack().
 *
 * For the proxy matching @c reader_guid whose check_and_set_acknack_count() accepts
 * @c ack_count:
 *  - everything before sn_set.base() is marked as acknowledged;
 *  - if the base is greater than zero, the requested changes are registered, GAPs are
 *    built for those that cannot be served, and the NACK-response timer (or, when nothing
 *    was requested and @c final_flag is false, the periodic heartbeat timer) is restarted;
 *  - otherwise, an empty set without the final flag is handled as the preemptive ACKNACK;
 *  - finally check_acked_status() is called.
 *
 * @param writer_guid GUID of the writer the ACKNACK is addressed to.
 * @param reader_guid GUID of the reader that sent the ACKNACK.
 * @param ack_count   Count carried by the ACKNACK, checked by the reader proxy.
 * @param sn_set      Sequence-number set carried by the ACKNACK.
 * @param final_flag  Final flag carried by the ACKNACK.
 * @param[out] result Set to true when @c writer_guid is this writer's GUID, false otherwise.
 * @return The same value stored in @c result. The vendor id argument is unused.
 */
1952 | | bool StatefulWriter::process_acknack( |
1953 | | const GUID_t& writer_guid, |
1954 | | const GUID_t& reader_guid, |
1955 | | uint32_t ack_count, |
1956 | | const SequenceNumberSet_t& sn_set, |
1957 | | bool final_flag, |
1958 | | bool& result, |
1959 | | fastdds::rtps::VendorId_t /*origin_vendor_id*/) |
1960 | 0 | { |
1961 | 0 | std::unique_lock<RecursiveTimedMutex> lock(mp_mutex); |
1962 | 0 | result = (m_guid == writer_guid); |
1963 | 

1964 | 0 | if (result) |
1965 | 0 | { |
           // Highest sequence number referenced by the message.
1966 | 0 | SequenceNumber_t received_sequence_number = sn_set.empty() ? sn_set.base() : sn_set.max(); |
1967 | 0 | if (received_sequence_number <= next_sequence_number()) |
1968 | 0 | { |
1969 | 0 | for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_, |
1970 | 0 | [&](ReaderProxy* remote_reader) |
1971 | 0 | { |
1972 | 0 | if (remote_reader->guid() == reader_guid) |
1973 | 0 | { |
1974 | 0 | if (remote_reader->check_and_set_acknack_count(ack_count)) |
1975 | 0 | { |
1976 | | // Sequence numbers before Base are set as Acknowledged. |
1977 | 0 | remote_reader->acked_changes_set(sn_set.base()); |
1978 | 0 | if (sn_set.base() > SequenceNumber_t(0, 0)) |
1979 | 0 | { |
1980 | | // Prepare GAP for requested samples that are not in history or are irrelevant. |
1981 | 0 | RTPSMessageGroup group(mp_RTPSParticipant, this, remote_reader->message_sender()); |
1982 | 0 | RTPSGapBuilder gap_builder(group); |
1983 | 

1984 | 0 | if (remote_reader->requested_changes_set(sn_set, gap_builder, get_seq_num_min())) |
1985 | 0 | { |
1986 | 0 | nack_response_event_->restart_timer(); |
1987 | 0 | } |
1988 | 0 | else if (!final_flag) |
1989 | 0 | { |
1990 | 0 | periodic_hb_event_->restart_timer(); |
1991 | 0 | } |
1992 | 

1993 | 0 | gap_builder.flush(); |
1994 | 0 | } |
1995 | 0 | else if (sn_set.empty() && !final_flag) |
1996 | 0 | { |
1997 | | // This is the preemptive acknack. |
1998 | 0 | if (remote_reader->process_initial_acknack([&](ChangeForReader_t& change_reader) |
1999 | 0 | { |
2000 | 0 | assert(nullptr != change_reader.getChange()); |
2001 | 0 | flow_controller_->add_old_sample(this, change_reader.getChange()); |
2002 | 0 | })) |
2003 | 0 | { |
2004 | 0 | if (remote_reader->is_remote_and_reliable()) |
2005 | 0 | { |
2006 | | // Send heartbeat if requested |
2007 | 0 | send_heartbeat_to_nts(*remote_reader, false, true); |
2008 | 0 | periodic_hb_event_->restart_timer(); |
2009 | 0 | } |
2010 | 0 | } |
2011 | 

           // Local (non data-sharing) readers get the heartbeat through the intraprocess path.
2012 | 0 | if (remote_reader->is_local_reader() && !remote_reader->is_datasharing_reader()) |
2013 | 0 | { |
2014 | 0 | intraprocess_heartbeat(remote_reader); |
2015 | 0 | } |
2016 | 0 | } |
2017 | | 
2018 | | // Check whether all CacheChanges are acknowledged, because a user could be waiting |
2019 | | // for this, or some CacheChanges could be removed if we are VOLATILE |
2020 | 0 | check_acked_status(); |
2021 | 0 | } |
2022 | 0 | return true; |
2023 | 0 | } |
2024 | | 
2025 | 0 | return false; |
2026 | 0 | } |
2027 | 0 | ); |
2028 | 0 | } |
2029 | 0 | else |
2030 | 0 | { |
2031 | 0 | print_inconsistent_acknack(writer_guid, reader_guid, sn_set.base(), received_sequence_number, |
2032 | 0 | next_sequence_number()); |
2033 | 0 | } |
2034 | 0 | } |
2035 | 

2036 | 0 | return result; |
2037 | 0 | } |
2038 | | |
/**
 * Processes a NACK_FRAG received from a reader.
 *
 * With mp_mutex held, and only when @c writer_guid equals this writer's GUID, the proxy
 * matching @c reader_guid is asked to process the fragment request; if it returns true the
 * NACK-response timer is restarted.
 *
 * @param writer_guid     GUID of the writer the message is addressed to.
 * @param reader_guid     GUID of the reader that sent the message.
 * @param ack_count       Count carried by the message, forwarded to the reader proxy.
 * @param seq_num         Sequence number of the change whose fragments are requested.
 * @param fragments_state Fragment-number set forwarded to the reader proxy.
 * @param[out] result     Set to true when @c writer_guid is this writer's GUID, false otherwise.
 * @return The same value stored in @c result. The vendor id argument is unused.
 */
2039 | | bool StatefulWriter::process_nack_frag( |
2040 | | const GUID_t& writer_guid, |
2041 | | const GUID_t& reader_guid, |
2042 | | uint32_t ack_count, |
2043 | | const SequenceNumber_t& seq_num, |
2044 | | const FragmentNumberSet_t& fragments_state, |
2045 | | bool& result, |
2046 | | fastdds::rtps::VendorId_t /*origin_vendor_id*/) |
2047 | 0 | { |
2048 | 0 | std::unique_lock<RecursiveTimedMutex> lock(mp_mutex); |
2049 | 0 | result = false; |
2050 | 0 | if (m_guid == writer_guid) |
2051 | 0 | { |
2052 | 0 | result = true; |
2053 | 0 | for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_, |
2054 | 0 | [this, &reader_guid, &ack_count, &seq_num, &fragments_state](ReaderProxy* reader) |
2055 | 0 | { |
2056 | 0 | if (reader->guid() == reader_guid) |
2057 | 0 | { |
2058 | 0 | if (reader->process_nack_frag(reader_guid, ack_count, seq_num, fragments_state)) |
2059 | 0 | { |
2060 | 0 | nack_response_event_->restart_timer(); |
2061 | 0 | } |
2062 | 0 | return true; |
2063 | 0 | } |
2064 | 0 | return false; |
2065 | 0 | } |
2066 | 0 | ); |
2067 | 0 | } |
2068 | 

2069 | 0 | return result; |
2070 | 0 | } |
2071 | | |
/**
 * Handler for the expiration of the acknowledgement timer (ack_event_).
 *
 * With mp_mutex held, each pass of the outer loop:
 *  1. marks everything up to last_sequence_number_ as acknowledged on every matched reader
 *     for which disable_positive_acks() is true, and calls check_acked_status() if there
 *     was at least one such reader;
 *  2. advances last_sequence_number_ to the next change still present in the history;
 *  3. computes that change's expiration time (sourceTimestamp + keep_duration_).
 * The loop repeats while the computed expiration is already in the past.
 *
 * @return false when no further change is found in the history (the existing comment in
 *         the body states this stops the timer); true after ack_event_ has been updated
 *         with the time remaining until the next expiration.
 */
2072 | | bool StatefulWriter::ack_timer_expired() |
2073 | 0 | { |
2074 | 0 | std::unique_lock<RecursiveTimedMutex> lock(mp_mutex); |
2075 | | 
2076 | | // The timer has expired so the earliest non-acked change must be marked as acknowledged |
2077 | | // This will be done in the first while iteration, as we start with a negative interval |
2078 | 

2079 | 0 | Time_t expiration_ts; |
2080 | 0 | Time_t current_ts; |
2081 | 0 | Time_t::now(current_ts); |
2082 | | 
2083 | | // On the other hand, we've seen in the tests that if samples are sent very quickly with little |
2084 | | // time between consecutive samples, the timer interval could end up being negative |
2085 | | // In this case, we keep marking changes as acknowledged until the timer is able to keep up, hence the while |
2086 | | // loop |
2087 | 

2088 | 0 | do |
2089 | 0 | { |
2090 | 0 | bool acks_flag = false; |
2091 | 0 | for_matched_readers(matched_local_readers_, matched_datasharing_readers_, matched_remote_readers_, |
2092 | 0 | [this, &acks_flag](ReaderProxy* reader) |
2093 | 0 | { |
2094 | 0 | if (reader->disable_positive_acks()) |
2095 | 0 | { |
2096 | 0 | reader->acked_changes_set(last_sequence_number_ + 1); |
2097 | 0 | acks_flag = true; |
2098 | 0 | } |
           // The lambda returns false for every reader, matched or not.
2099 | 0 | return false; |
2100 | 0 | } |
2101 | 0 | ); |
2102 | 0 | if (acks_flag) |
2103 | 0 | { |
2104 | 0 | check_acked_status(); |
2105 | 0 | } |
2106 | 

2107 | 0 | CacheChange_t* change; |
2108 | | 
2109 | | // Skip removed changes until reaching the last change |
2110 | 0 | do |
2111 | 0 | { |
2112 | 0 | last_sequence_number_++; |
2113 | 0 | } |
2114 | 0 | while (!history_->get_change(last_sequence_number_, getGuid(), &change) && |
2115 | 0 | last_sequence_number_ < next_sequence_number()); |
2116 | 

           // The lookup is repeated because the loop above may also end by reaching
           // next_sequence_number() without having found a change.
2117 | 0 | if (!history_->get_change( |
2118 | 0 | last_sequence_number_, |
2119 | 0 | getGuid(), |
2120 | 0 | &change)) |
2121 | 0 | { |
2122 | | // Stop ack_timer |
2123 | 0 | return false; |
2124 | 0 | } |
2125 | | 
2126 | 0 | Time_t::now(current_ts); |
2127 | 0 | expiration_ts = change->sourceTimestamp + keep_duration_; |
2128 | 0 | } |
2129 | 0 | while (expiration_ts < current_ts); |
2130 | | 
           // Program the timer with the time left until the found change expires.
2131 | 0 | auto interval = (expiration_ts - current_ts).to_duration_t(); |
2132 | 0 | ack_event_->update_interval(interval); |
2133 | 0 | return true; |
2134 | 0 | } |
2135 | | |
/**
 * Logs a warning describing an ACKNACK whose requested range is inconsistent with the
 * writer's next sequence number.
 *
 * @param writer_guid                   GUID of the local writer.
 * @param reader_guid                   GUID of the remote reader that sent the ACKNACK.
 * @param min_requested_sequence_number Lower bound of the requested range, as logged.
 * @param max_requested_sequence_number Upper bound of the requested range, as logged.
 * @param next_sequence_number          Next sequence number of the writer, as logged.
 */
2136 | | void StatefulWriter::print_inconsistent_acknack( |
2137 | | const GUID_t& writer_guid, |
2138 | | const GUID_t& reader_guid, |
2139 | | const SequenceNumber_t& min_requested_sequence_number, |
2140 | | const SequenceNumber_t& max_requested_sequence_number, |
2141 | | const SequenceNumber_t& next_sequence_number) |
2142 | 0 | { |
2143 | 0 | EPROSIMA_LOG_WARNING(RTPS_WRITER, "Inconsistent acknack received. Local Writer " |
2144 | 0 | << writer_guid << " next SequenceNumber " << next_sequence_number << ". Remote Reader " |
2145 | 0 | << reader_guid << " requested range is [" << min_requested_sequence_number |
2146 | 0 | << ", " << max_requested_sequence_number << "]."); |
2147 | | // This is necessary to avoid unused-variable warnings when the warning log level is disabled |
2148 | 0 | static_cast<void>(writer_guid); |
2149 | 0 | static_cast<void>(reader_guid); |
2150 | 0 | static_cast<void>(min_requested_sequence_number); |
2151 | 0 | static_cast<void>(max_requested_sequence_number); |
2152 | 0 | static_cast<void>(next_sequence_number); |
2153 | 0 | } |
2154 | | |
/**
 * Stores the reader data filter used by this writer.
 * The pointer is stored as given; no lock is taken and no ownership transfer is visible here.
 *
 * @param reader_data_filter Pointer to the filter to store in reader_data_filter_.
 */
2155 | | void StatefulWriter::reader_data_filter( |
2156 | | fastdds::rtps::IReaderDataFilter* reader_data_filter) |
2157 | 0 | { |
2158 | 0 | reader_data_filter_ = reader_data_filter; |
2159 | 0 | } |
2160 | | |
/**
 * Retrieves the reader data filter currently stored in this writer.
 *
 * @return The value of reader_data_filter_.
 */
2161 | | const fastdds::rtps::IReaderDataFilter* StatefulWriter::reader_data_filter() const |
2162 | 0 | { |
2163 | 0 | return reader_data_filter_; |
2164 | 0 | } |
2165 | | |
/**
 * Delivers a change to every kind of matched reader that is currently present:
 * intraprocess readers first, then data-sharing readers, then network readers.
 * check_acked_status() is called once all deliveries have been attempted.
 *
 * @param cache_change      Change to deliver.
 * @param group             Message group used for the network delivery.
 * @param locator_selector  Locator selector used for the network delivery.
 * @param max_blocking_time Time point forwarded to deliver_sample_to_network().
 * @return The value returned by deliver_sample_to_network() when there are remote readers;
 *         DeliveryRetCode::DELIVERED otherwise.
 */
2166 | | DeliveryRetCode StatefulWriter::deliver_sample_nts( |
2167 | | CacheChange_t* cache_change, |
2168 | | RTPSMessageGroup& group, |
2169 | | LocatorSelectorSender& locator_selector, // Object locked by FlowControllerImpl |
2170 | | const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) |
2171 | 0 | { |
2172 | 0 | DeliveryRetCode ret_code = DeliveryRetCode::DELIVERED; |
2173 | 

           // Intraprocess readers are served first.
2174 | 0 | if (there_are_local_readers_) |
2175 | 0 | { |
2176 | 0 | deliver_sample_to_intraprocesses(cache_change); |
2177 | 0 | } |
2178 | | 
2179 | | // Process datasharing then |
2180 | 0 | if (there_are_datasharing_readers_) |
2181 | 0 | { |
2182 | 0 | deliver_sample_to_datasharing(cache_change); |
2183 | 0 | } |
2184 | 

           // Only the network delivery contributes to the returned code.
2185 | 0 | if (there_are_remote_readers_) |
2186 | 0 | { |
2187 | 0 | ret_code = deliver_sample_to_network(cache_change, group, locator_selector, max_blocking_time); |
2188 | 0 | } |
2189 | 

2190 | 0 | check_acked_status(); |
2191 | 

2192 | 0 | return ret_code; |
2193 | 0 | } |
2194 | | |
2195 | | #ifdef FASTDDS_STATISTICS |
2196 | | |
/**
 * Appends to @c connection_list one statistics Connection per matched reader, in this
 * order: intraprocess readers, data-sharing readers, remote (transport) readers.
 *
 * For remote readers the announced and used locators are both filled with the multicast
 * locators followed by the unicast locators of the reader's general locator selector entry.
 * A single Connection object is reused for every entry pushed into the list.
 *
 * @param[out] connection_list List to which the connections are appended.
 * @return Always true.
 */
2197 | | bool StatefulWriter::get_connections( |
2198 | | fastdds::statistics::rtps::ConnectionList& connection_list) |
2199 | 0 | { |
           // NOTE(review): these size() reads happen before mp_mutex is taken in the scoped
           // blocks below - confirm this is safe against concurrent (un)matching of readers.
2200 | 0 | connection_list.reserve(matched_local_readers_.size() + |
2201 | 0 | matched_datasharing_readers_.size() + |
2202 | 0 | matched_remote_readers_.size()); |
2203 | 

2204 | 0 | fastdds::statistics::Connection connection; |
2205 | 

           // mp_mutex is acquired and released separately for each of the three reader kinds.
2206 | 0 | { |
2207 | 0 | std::unique_lock<RecursiveTimedMutex> lock(mp_mutex); |
2208 | | 
2209 | | //! intraprocess |
2210 | 0 | for_matched_readers(matched_local_readers_, [&connection, &connection_list](ReaderProxy*& reader) |
2211 | 0 | { |
2212 | 0 | connection.guid(fastdds::statistics::to_statistics_type(reader->guid())); |
2213 | 0 | connection.mode(fastdds::statistics::ConnectionMode::INTRAPROCESS); |
2214 | 0 | connection_list.push_back(connection); |
2215 | 

2216 | 0 | return false; |
2217 | 0 | }); |
2218 | 0 | } |
2219 | 

2220 | 0 | { |
2221 | 0 | std::unique_lock<RecursiveTimedMutex> lock(mp_mutex); |
2222 | | 
2223 | | //! datasharing |
2224 | 0 | for_matched_readers(matched_datasharing_readers_, [&connection, &connection_list](ReaderProxy*& reader) |
2225 | 0 | { |
2226 | 0 | connection.guid(fastdds::statistics::to_statistics_type(reader->guid())); |
2227 | 0 | connection.mode(fastdds::statistics::ConnectionMode::DATA_SHARING); |
2228 | 0 | connection_list.push_back(connection); |
2229 | 

2230 | 0 | return false; |
2231 | 0 | }); |
2232 | 0 | } |
2233 | 

2234 | 0 | { |
2235 | 0 | std::unique_lock<RecursiveTimedMutex> lock(mp_mutex); |
2236 | | 
2237 | | //! remote |
2238 | 0 | for_matched_readers(matched_remote_readers_, [&connection, &connection_list](ReaderProxy*& reader) |
2239 | 0 | { |
2240 | | //! Announced locators is, for the moment, |
2241 | | //! equal to the used_locators |
2242 | 0 | LocatorSelectorEntry* loc_selector_entry = reader->general_locator_selector_entry(); |
2243 | 

2244 | 0 | connection.announced_locators().reserve(reader->locators_size()); |
2245 | 0 | connection.used_locators().reserve(reader->locators_size()); |
2246 | 

           // Collect multicast locators first, then unicast ones, converted to the statistics type.
2247 | 0 | std::vector<fastdds::statistics::detail::Locator_s> statistics_locators; |
2248 | 0 | std::for_each(loc_selector_entry->multicast.begin(), loc_selector_entry->multicast.end(), |
2249 | 0 | [&statistics_locators](const Locator_t& locator) |
2250 | 0 | { |
2251 | 0 | statistics_locators.push_back(fastdds::statistics::to_statistics_type(locator)); |
2252 | 0 | }); |
2253 | 

2254 | 0 | std::for_each(loc_selector_entry->unicast.begin(), loc_selector_entry->unicast.end(), |
2255 | 0 | [&statistics_locators](const Locator_t& locator) |
2256 | 0 | { |
2257 | 0 | statistics_locators.push_back(fastdds::statistics::to_statistics_type(locator)); |
2258 | 0 | }); |
2259 | 

2260 | 0 | connection.guid(fastdds::statistics::to_statistics_type(reader->guid())); |
2261 | 0 | connection.mode(fastdds::statistics::ConnectionMode::TRANSPORT); |
2262 | 0 | connection.announced_locators(statistics_locators); |
2263 | 0 | connection.used_locators(statistics_locators); |
2264 | 0 | connection_list.push_back(connection); |
2265 | 

2266 | 0 | return false; |
2267 | 0 | }); |
2268 | 0 | } |
2269 | 

2270 | 0 | return true; |
2271 | 

2272 | 0 | } |
2273 | | |
2274 | | #endif // ifdef FASTDDS_STATISTICS |
2275 | | |
/**
 * Adds to @c group a GAP covering the range of removed irrelevant changes recorded in the
 * reader proxy, and then resets that record.
 * Nothing is done when the proxy reports an unknown first irrelevant removed sequence number.
 *
 * @param remoteReaderProxy Reader proxy whose removed-irrelevant range is read and reset.
 * @param group             Message group on which add_gap() is invoked.
 */
2276 | | void StatefulWriter::add_gaps_for_removed_irrelevants( |
2277 | | ReaderProxy& remoteReaderProxy, |
2278 | | RTPSMessageGroup& group) |
2279 | 0 | { |
2280 | 0 | if (SequenceNumber_t::unknown() != remoteReaderProxy.first_irrelevant_removed()) |
2281 | 0 | { |
           // The gap starts at the first removed sequence number; the set passed to add_gap()
           // is constructed from the last removed one plus one.
2282 | 0 | group.add_gap(remoteReaderProxy.first_irrelevant_removed(), |
2283 | 0 | SequenceNumberSet_t(remoteReaderProxy.last_irrelevant_removed() + 1), |
2284 | 0 | remoteReaderProxy.guid()); |
2285 | 0 | remoteReaderProxy.reset_irrelevant_removed(); |
2286 | 0 | } |
2287 | 0 | } |
2288 | | |
/**
 * Adds GAPs to @c group for every sequence number that lies between two consecutive
 * changes of the history but has no change of its own.
 *
 * Holes are detected by comparing the width of the range
 * [get_seq_num_min(), get_seq_num_max()] with the number of changes in the history;
 * nothing is done when the minimum is unknown or both values match.
 *
 * @param group Message group used to build and flush the GAPs.
 */
2289 | | void StatefulWriter::add_gaps_for_holes_in_history( |
2290 | | RTPSMessageGroup& group) |
2291 | 0 | { |
2292 | 0 | SequenceNumber_t firstSeq = get_seq_num_min(); |
2293 | 0 | SequenceNumber_t lastSeq = get_seq_num_max(); |
2294 | 

2295 | 0 | if (SequenceNumber_t::unknown() != firstSeq && |
2296 | 0 | lastSeq.to64long() - firstSeq.to64long() + 1 != history_->getHistorySize()) |
2297 | 0 | { |
2298 | 0 | RTPSGapBuilder gaps(group); |
2299 | | // There are holes in the history. |
2300 | 0 | History::const_iterator cit = history_->changesBegin(); |
           // 'prev' is the sequence number expected for the next change in the history.
2301 | 0 | SequenceNumber_t prev = (*cit)->sequenceNumber + 1; |
2302 | 0 | ++cit; |
2303 | 0 | while (cit != history_->changesEnd()) |
2304 | 0 | { |
           // Every expected number skipped before reaching the current change is a hole.
2305 | 0 | while (prev != (*cit)->sequenceNumber) |
2306 | 0 | { |
2307 | 0 | gaps.add(prev); |
2308 | 0 | ++prev; |
2309 | 0 | } |
2310 | 0 | ++prev; |
2311 | 0 | ++cit; |
2312 | 0 | } |
2313 | 0 | gaps.flush(); |
2314 | 0 | } |
2315 | 0 | } |
2316 | | |
2317 | | } // namespace rtps |
2318 | | } // namespace fastdds |
2319 | | } // namespace eprosima |