/src/Fast-DDS/src/cpp/rtps/participant/RTPSParticipantImpl.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | /** |
16 | | * @file RTPSParticipantImpl.cpp |
17 | | * |
18 | | */ |
19 | | |
20 | | #include <rtps/participant/RTPSParticipantImpl.h> |
21 | | |
22 | | #include <algorithm> |
23 | | #include <functional> |
24 | | #include <memory> |
25 | | #include <mutex> |
26 | | #include <sstream> |
27 | | |
28 | | #include <fastdds/dds/log/Log.hpp> |
29 | | #include <fastdds/rtps/attributes/ServerAttributes.h> |
30 | | #include <fastdds/rtps/builtin/BuiltinProtocols.h> |
31 | | #include <fastdds/rtps/builtin/discovery/endpoint/EDP.h> |
32 | | #include <fastdds/rtps/builtin/discovery/participant/PDPSimple.h> |
33 | | #include <fastdds/rtps/builtin/data/ParticipantProxyData.h> |
34 | | #include <fastdds/rtps/builtin/liveliness/WLP.h> |
35 | | #include <fastdds/rtps/history/WriterHistory.h> |
36 | | #include <fastdds/rtps/messages/MessageReceiver.h> |
37 | | #include <fastdds/rtps/participant/RTPSParticipant.h> |
38 | | #include <fastdds/rtps/reader/StatelessReader.h> |
39 | | #include <fastdds/rtps/reader/StatefulReader.h> |
40 | | #include <fastdds/rtps/reader/StatelessPersistentReader.h> |
41 | | #include <fastdds/rtps/reader/StatefulPersistentReader.h> |
42 | | #include <fastdds/rtps/RTPSDomain.h> |
43 | | #include <fastdds/rtps/transport/UDPv4TransportDescriptor.h> |
44 | | #include <fastdds/rtps/transport/TCPv4TransportDescriptor.h> |
45 | | #include <fastdds/rtps/transport/TCPv6TransportDescriptor.h> |
46 | | #include <fastdds/rtps/transport/shared_mem/SharedMemTransportDescriptor.h> |
47 | | #include <fastdds/rtps/writer/StatelessWriter.h> |
48 | | #include <fastdds/rtps/writer/StatefulWriter.h> |
49 | | #include <fastdds/rtps/writer/StatelessPersistentWriter.h> |
50 | | #include <fastdds/rtps/writer/StatefulPersistentWriter.h> |
51 | | |
52 | | #include <fastrtps/utils/IPFinder.h> |
53 | | #include <fastrtps/utils/Semaphore.h> |
54 | | #include <fastrtps/xmlparser/XMLProfileManager.h> |
55 | | |
56 | | #include <rtps/builtin/discovery/participant/PDPServer.hpp> |
57 | | #include <rtps/builtin/discovery/participant/PDPClient.h> |
58 | | #include <rtps/history/BasicPayloadPool.hpp> |
59 | | #include <rtps/persistence/PersistenceService.h> |
60 | | #include <statistics/rtps/GuidUtils.hpp> |
61 | | |
62 | | namespace eprosima { |
63 | | namespace fastrtps { |
64 | | namespace rtps { |
65 | | |
// Local shorthands for transport descriptor types declared in the fastdds::rtps namespace.
using UDPv4TransportDescriptor = fastdds::rtps::UDPv4TransportDescriptor;
using TCPTransportDescriptor = fastdds::rtps::TCPTransportDescriptor;
using SharedMemTransportDescriptor = fastdds::rtps::SharedMemTransportDescriptor;

// Definition of the thread-local static member; every thread starts with a null value.
thread_local RTPSParticipantImpl* RTPSParticipantImpl::collections_mutex_owner_ = nullptr;
71 | | |
72 | | static EntityId_t TrustedWriter( |
73 | | const EntityId_t& reader) |
74 | 0 | { |
75 | 0 | return |
76 | 0 | (reader == c_EntityId_SPDPReader) ? c_EntityId_SPDPWriter : |
77 | 0 | (reader == c_EntityId_SEDPPubReader) ? c_EntityId_SEDPPubWriter : |
78 | 0 | (reader == c_EntityId_SEDPSubReader) ? c_EntityId_SEDPSubWriter : |
79 | 0 | (reader == c_EntityId_ReaderLiveliness) ? c_EntityId_WriterLiveliness : |
80 | 0 | c_EntityId_Unknown; |
81 | 0 | } |
82 | | |
83 | | static bool should_be_intraprocess_only( |
84 | | const RTPSParticipantAttributes& att) |
85 | 0 | { |
86 | 0 | return |
87 | 0 | xmlparser::XMLProfileManager::library_settings().intraprocess_delivery == INTRAPROCESS_FULL && |
88 | 0 | att.builtin.discovery_config.ignoreParticipantFlags == |
89 | 0 | (ParticipantFilteringFlags::FILTER_DIFFERENT_HOST | ParticipantFilteringFlags::FILTER_DIFFERENT_PROCESS); |
90 | 0 | } |
91 | | |
92 | | static bool get_unique_flows_parameters( |
93 | | const RTPSParticipantAttributes& part_att, |
94 | | const EndpointAttributes& att, |
95 | | bool& unique_flows, |
96 | | uint16_t& initial_port, |
97 | | uint16_t& final_port) |
98 | 0 | { |
99 | 0 | const std::string* value = PropertyPolicyHelper::find_property(att.properties, "fastdds.unique_network_flows"); |
100 | |
|
101 | 0 | unique_flows = (nullptr != value); |
102 | 0 | if (unique_flows) |
103 | 0 | { |
104 | | // TODO (Miguel C): parse value to get port range |
105 | 0 | final_port = part_att.port.portBase; |
106 | 0 | initial_port = part_att.port.portBase - 400; |
107 | 0 | } |
108 | |
|
109 | 0 | return true; |
110 | 0 | } |
111 | | |
112 | | Locator_t& RTPSParticipantImpl::applyLocatorAdaptRule( |
113 | | Locator_t& loc) |
114 | 0 | { |
115 | | // This is a completely made up rule |
116 | | // It is transport responsibility to interpret this new port. |
117 | 0 | loc.port += m_att.port.participantIDGain; |
118 | 0 | return loc; |
119 | 0 | } |
120 | | |
121 | | RTPSParticipantImpl::RTPSParticipantImpl( |
122 | | uint32_t domain_id, |
123 | | const RTPSParticipantAttributes& PParam, |
124 | | const GuidPrefix_t& guidP, |
125 | | const GuidPrefix_t& persistence_guid, |
126 | | RTPSParticipant* par, |
127 | | RTPSParticipantListener* plisten) |
128 | | : domain_id_(domain_id) |
129 | | , m_att(PParam) |
130 | | , m_guid(guidP, c_EntityId_RTPSParticipant) |
131 | | , mp_builtinProtocols(nullptr) |
132 | | , mp_ResourceSemaphore(new Semaphore(0)) |
133 | | , IdCounter(0) |
134 | | , type_check_fn_(nullptr) |
135 | | , client_override_(false) |
136 | | , internal_metatraffic_locators_(false) |
137 | | , internal_default_locators_(false) |
138 | | #if HAVE_SECURITY |
139 | | , m_security_manager(this) |
140 | | #endif // if HAVE_SECURITY |
141 | | , mp_participantListener(plisten) |
142 | | , mp_userParticipant(par) |
143 | | , mp_mutex(new std::recursive_mutex()) |
144 | | , is_intraprocess_only_(should_be_intraprocess_only(PParam)) |
145 | | , has_shm_transport_(false) |
146 | 0 | { |
147 | 0 | if (c_GuidPrefix_Unknown != persistence_guid) |
148 | 0 | { |
149 | 0 | m_persistence_guid = GUID_t(persistence_guid, c_EntityId_RTPSParticipant); |
150 | 0 | } |
151 | | // Builtin transports by default |
152 | 0 | if (PParam.useBuiltinTransports) |
153 | 0 | { |
154 | 0 | UDPv4TransportDescriptor descriptor; |
155 | 0 | descriptor.sendBufferSize = m_att.sendSocketBufferSize; |
156 | 0 | descriptor.receiveBufferSize = m_att.listenSocketBufferSize; |
157 | 0 | m_network_Factory.RegisterTransport(&descriptor, &m_att.properties); |
158 | |
|
159 | 0 | #ifdef SHM_TRANSPORT_BUILTIN |
160 | 0 | SharedMemTransportDescriptor shm_transport; |
161 | | // We assume (Linux) UDP doubles the user socket buffer size in kernel, so |
162 | | // the equivalent segment size in SHM would be socket buffer size x 2 |
163 | 0 | auto segment_size_udp_equivalent = |
164 | 0 | std::max(m_att.sendSocketBufferSize, m_att.listenSocketBufferSize) * 2; |
165 | 0 | shm_transport.segment_size(segment_size_udp_equivalent); |
166 | | // Use same default max_message_size on both UDP and SHM |
167 | 0 | shm_transport.max_message_size(descriptor.max_message_size()); |
168 | 0 | has_shm_transport_ |= m_network_Factory.RegisterTransport(&shm_transport); |
169 | 0 | #endif // ifdef SHM_TRANSPORT_BUILTIN |
170 | 0 | } |
171 | | |
172 | | // BACKUP servers guid is its persistence one |
173 | 0 | if (PParam.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::BACKUP) |
174 | 0 | { |
175 | 0 | m_persistence_guid = m_guid; |
176 | 0 | } |
177 | | |
178 | | // Store the Guid in string format. |
179 | 0 | std::stringstream guid_sstr; |
180 | 0 | guid_sstr << m_guid; |
181 | 0 | guid_str_ = guid_sstr.str(); |
182 | | |
183 | | // Client-server discovery protocol requires that every TCP transport has a listening port |
184 | 0 | switch (PParam.builtin.discovery_config.discoveryProtocol) |
185 | 0 | { |
186 | 0 | case DiscoveryProtocol::BACKUP: |
187 | 0 | case DiscoveryProtocol::CLIENT: |
188 | 0 | case DiscoveryProtocol::SERVER: |
189 | 0 | case DiscoveryProtocol::SUPER_CLIENT: |
190 | | // Verify if listening ports are provided |
191 | 0 | for (auto& transportDescriptor : PParam.userTransports) |
192 | 0 | { |
193 | 0 | TCPTransportDescriptor* pT = dynamic_cast<TCPTransportDescriptor*>(transportDescriptor.get()); |
194 | 0 | if (pT && pT->listening_ports.empty()) |
195 | 0 | { |
196 | 0 | logInfo(RTPS_PARTICIPANT, |
197 | 0 | "Participant " << m_att.getName() << " with GUID " << m_guid << |
198 | 0 | " tries to use discovery server over TCP without providing a proper listening port."); |
199 | 0 | } |
200 | 0 | } |
201 | 0 | default: |
202 | 0 | break; |
203 | 0 | } |
204 | | |
205 | | |
206 | | // User defined transports |
207 | 0 | for (const auto& transportDescriptor : PParam.userTransports) |
208 | 0 | { |
209 | 0 | if (m_network_Factory.RegisterTransport(transportDescriptor.get(), &m_att.properties)) |
210 | 0 | { |
211 | 0 | has_shm_transport_ |= |
212 | 0 | (dynamic_cast<fastdds::rtps::SharedMemTransportDescriptor*>(transportDescriptor.get()) != nullptr); |
213 | 0 | } |
214 | 0 | else |
215 | 0 | { |
216 | | // SHM transport could be disabled |
217 | 0 | if ((dynamic_cast<fastdds::rtps::SharedMemTransportDescriptor*>(transportDescriptor.get()) != nullptr)) |
218 | 0 | { |
219 | 0 | logError(RTPS_PARTICIPANT, |
220 | 0 | "Unable to Register SHM Transport. SHM Transport is not supported in" |
221 | 0 | " the current platform."); |
222 | 0 | } |
223 | 0 | else |
224 | 0 | { |
225 | 0 | logError(RTPS_PARTICIPANT, |
226 | 0 | "User transport failed to register."); |
227 | 0 | } |
228 | |
|
229 | 0 | } |
230 | 0 | } |
231 | |
|
232 | 0 | mp_userParticipant->mp_impl = this; |
233 | 0 | mp_event_thr.init_thread(); |
234 | |
|
235 | 0 | if (!networkFactoryHasRegisteredTransports()) |
236 | 0 | { |
237 | 0 | return; |
238 | 0 | } |
239 | | |
240 | | /* If metatrafficMulticastLocatorList is empty, add mandatory default Locators |
241 | | Else -> Take them */ |
242 | | |
243 | | // Creation of metatraffic locator and receiver resources |
244 | 0 | uint32_t metatraffic_multicast_port = m_att.port.getMulticastPort(domain_id_); |
245 | 0 | uint32_t metatraffic_unicast_port = m_att.port.getUnicastPort(domain_id_, |
246 | 0 | static_cast<uint32_t>(m_att.participantID)); |
247 | 0 | uint32_t meta_multicast_port_for_check = metatraffic_multicast_port; |
248 | | |
249 | | /* INSERT DEFAULT MANDATORY MULTICAST LOCATORS HERE */ |
250 | 0 | if (m_att.builtin.metatrafficMulticastLocatorList.empty() && m_att.builtin.metatrafficUnicastLocatorList.empty()) |
251 | 0 | { |
252 | 0 | get_default_metatraffic_locators(); |
253 | 0 | internal_metatraffic_locators_ = true; |
254 | 0 | } |
255 | 0 | else |
256 | 0 | { |
257 | 0 | if (0 < m_att.builtin.metatrafficMulticastLocatorList.size() && |
258 | 0 | 0 != m_att.builtin.metatrafficMulticastLocatorList.begin()->port) |
259 | 0 | { |
260 | 0 | meta_multicast_port_for_check = m_att.builtin.metatrafficMulticastLocatorList.begin()->port; |
261 | 0 | } |
262 | 0 | std::for_each(m_att.builtin.metatrafficMulticastLocatorList.begin(), |
263 | 0 | m_att.builtin.metatrafficMulticastLocatorList.end(), [&](Locator_t& locator) |
264 | 0 | { |
265 | 0 | m_network_Factory.fillMetatrafficMulticastLocator(locator, metatraffic_multicast_port); |
266 | 0 | }); |
267 | 0 | m_network_Factory.NormalizeLocators(m_att.builtin.metatrafficMulticastLocatorList); |
268 | |
|
269 | 0 | std::for_each(m_att.builtin.metatrafficUnicastLocatorList.begin(), |
270 | 0 | m_att.builtin.metatrafficUnicastLocatorList.end(), [&](Locator_t& locator) |
271 | 0 | { |
272 | 0 | m_network_Factory.fillMetatrafficUnicastLocator(locator, metatraffic_unicast_port); |
273 | 0 | }); |
274 | 0 | m_network_Factory.NormalizeLocators(m_att.builtin.metatrafficUnicastLocatorList); |
275 | 0 | } |
276 | | |
277 | | // Initial peers |
278 | 0 | if (m_att.builtin.initialPeersList.empty()) |
279 | 0 | { |
280 | 0 | m_att.builtin.initialPeersList = m_att.builtin.metatrafficMulticastLocatorList; |
281 | 0 | } |
282 | 0 | else |
283 | 0 | { |
284 | 0 | LocatorList_t initial_peers; |
285 | 0 | initial_peers.swap(m_att.builtin.initialPeersList); |
286 | |
|
287 | 0 | std::for_each(initial_peers.begin(), initial_peers.end(), |
288 | 0 | [&](Locator_t& locator) |
289 | 0 | { |
290 | 0 | m_network_Factory.configureInitialPeerLocator(domain_id_, locator, m_att); |
291 | 0 | }); |
292 | 0 | } |
293 | | |
294 | | // Creation of user locator and receiver resources |
295 | | //If no default locators are defined we define some. |
296 | | /* The reasoning here is the following. |
297 | | If the parameters of the RTPS Participant don't hold default listening locators for the creation |
298 | | of Endpoints, we make some for Unicast only. |
299 | | If there is at least one listen locator of any kind, we do not create any default ones. |
300 | | If there are no sending locators defined, we create default ones for the transports we implement. |
301 | | */ |
302 | 0 | if (m_att.defaultUnicastLocatorList.empty() && m_att.defaultMulticastLocatorList.empty()) |
303 | 0 | { |
304 | | //Default Unicast Locators in case they have not been provided |
305 | | /* INSERT DEFAULT UNICAST LOCATORS FOR THE PARTICIPANT */ |
306 | 0 | get_default_unicast_locators(); |
307 | 0 | internal_default_locators_ = true; |
308 | 0 | logInfo(RTPS_PARTICIPANT, m_att.getName() << " Created with NO default Unicast Locator List, adding Locators:" |
309 | 0 | << m_att.defaultUnicastLocatorList); |
310 | 0 | } |
311 | 0 | else |
312 | 0 | { |
313 | | // Locator with port 0, calculate port. |
314 | 0 | std::for_each(m_att.defaultUnicastLocatorList.begin(), m_att.defaultUnicastLocatorList.end(), |
315 | 0 | [&](Locator_t& loc) |
316 | 0 | { |
317 | 0 | m_network_Factory.fill_default_locator_port(domain_id_, loc, m_att, false); |
318 | 0 | }); |
319 | 0 | m_network_Factory.NormalizeLocators(m_att.defaultUnicastLocatorList); |
320 | |
|
321 | 0 | std::for_each(m_att.defaultMulticastLocatorList.begin(), m_att.defaultMulticastLocatorList.end(), |
322 | 0 | [&](Locator_t& loc) |
323 | 0 | { |
324 | 0 | m_network_Factory.fill_default_locator_port(domain_id_, loc, m_att, true); |
325 | 0 | }); |
326 | 0 | } |
327 | |
|
328 | | #if HAVE_SECURITY |
329 | | // Start security |
330 | | if (!m_security_manager.init( |
331 | | security_attributes_, |
332 | | PParam.properties)) |
333 | | { |
334 | | // Participant will be deleted, no need to allocate buffers or create builtin endpoints |
335 | | return; |
336 | | } |
337 | | #endif // if HAVE_SECURITY |
338 | |
|
339 | 0 | if (is_intraprocess_only()) |
340 | 0 | { |
341 | 0 | m_att.builtin.metatrafficUnicastLocatorList.clear(); |
342 | 0 | m_att.defaultUnicastLocatorList.clear(); |
343 | 0 | m_att.defaultMulticastLocatorList.clear(); |
344 | 0 | } |
345 | |
|
346 | 0 | createReceiverResources(m_att.builtin.metatrafficMulticastLocatorList, true, false); |
347 | 0 | createReceiverResources(m_att.builtin.metatrafficUnicastLocatorList, true, false); |
348 | 0 | createReceiverResources(m_att.defaultUnicastLocatorList, true, false); |
349 | 0 | createReceiverResources(m_att.defaultMulticastLocatorList, true, false); |
350 | | |
351 | | // Check metatraffic multicast port |
352 | 0 | if (0 < m_att.builtin.metatrafficMulticastLocatorList.size() && |
353 | 0 | m_att.builtin.metatrafficMulticastLocatorList.begin()->port != meta_multicast_port_for_check) |
354 | 0 | { |
355 | 0 | logWarning(RTPS_PARTICIPANT, |
356 | 0 | "Metatraffic multicast port " << meta_multicast_port_for_check << " cannot be opened." |
357 | 0 | " It may is opened by another application. Discovery may fail."); |
358 | 0 | } |
359 | |
|
360 | 0 | bool allow_growing_buffers = m_att.allocation.send_buffers.dynamic; |
361 | 0 | size_t num_send_buffers = m_att.allocation.send_buffers.preallocated_number; |
362 | 0 | if (num_send_buffers == 0) |
363 | 0 | { |
364 | | // Two buffers (user, events) |
365 | 0 | num_send_buffers = 2; |
366 | | // Add one buffer per reception thread |
367 | 0 | num_send_buffers += m_receiverResourcelist.size(); |
368 | 0 | } |
369 | | |
370 | | // Create buffer pool |
371 | 0 | send_buffers_.reset(new SendBuffersManager(num_send_buffers, allow_growing_buffers)); |
372 | 0 | send_buffers_->init(this); |
373 | | |
374 | | // Initialize flow controller factory. |
375 | | // This must be done after initiate network layer. |
376 | 0 | flow_controller_factory_.init(this); |
377 | | |
378 | | // Support old API |
379 | 0 | if (PParam.throughputController.bytesPerPeriod != UINT32_MAX && PParam.throughputController.periodMillisecs != 0) |
380 | 0 | { |
381 | 0 | fastdds::rtps::FlowControllerDescriptor old_descriptor; |
382 | 0 | old_descriptor.name = guid_str_.c_str(); |
383 | 0 | old_descriptor.max_bytes_per_period = PParam.throughputController.bytesPerPeriod; |
384 | 0 | old_descriptor.period_ms = PParam.throughputController.periodMillisecs; |
385 | 0 | flow_controller_factory_.register_flow_controller(old_descriptor); |
386 | 0 | } |
387 | | |
388 | | // Register user's flow controllers. |
389 | 0 | for (auto flow_controller_desc : m_att.flow_controllers) |
390 | 0 | { |
391 | 0 | flow_controller_factory_.register_flow_controller(*flow_controller_desc.get()); |
392 | 0 | } |
393 | | |
394 | |
|
395 | | #if HAVE_SECURITY |
396 | | if (m_security_manager.is_security_active()) |
397 | | { |
398 | | if (!m_security_manager.create_entities()) |
399 | | { |
400 | | return; |
401 | | } |
402 | | } |
403 | | #endif // if HAVE_SECURITY |
404 | |
|
405 | 0 | mp_builtinProtocols = new BuiltinProtocols(); |
406 | | |
407 | | // Initialize builtin protocols |
408 | 0 | if (!mp_builtinProtocols->initBuiltinProtocols(this, m_att.builtin)) |
409 | 0 | { |
410 | 0 | logError(RTPS_PARTICIPANT, "The builtin protocols were not correctly initialized"); |
411 | 0 | return; |
412 | 0 | } |
413 | | |
414 | 0 | if (c_GuidPrefix_Unknown != persistence_guid) |
415 | 0 | { |
416 | 0 | logInfo(RTPS_PARTICIPANT, "RTPSParticipant \"" << m_att.getName() << "\" with guidPrefix: " << m_guid.guidPrefix |
417 | 0 | << " and persistence guid: " << persistence_guid); |
418 | 0 | } |
419 | 0 | else |
420 | 0 | { |
421 | 0 | logInfo(RTPS_PARTICIPANT, |
422 | 0 | "RTPSParticipant \"" << m_att.getName() << "\" with guidPrefix: " << m_guid.guidPrefix); |
423 | 0 | } |
424 | |
|
425 | 0 | initialized_ = true; |
426 | 0 | } |
427 | | |
/**
 * Convenience constructor for participants without a persistence GUID.
 *
 * Delegates to the full constructor passing c_GuidPrefix_Unknown as persistence GUID prefix.
 *
 * @param domain_id Domain the participant belongs to.
 * @param PParam    Participant attributes.
 * @param guidP     GUID prefix of the participant.
 * @param par       User-facing participant.
 * @param plisten   Participant listener.
 */
RTPSParticipantImpl::RTPSParticipantImpl(
        uint32_t domain_id,
        const RTPSParticipantAttributes& PParam,
        const GuidPrefix_t& guidP,
        RTPSParticipant* par,
        RTPSParticipantListener* plisten)
    : RTPSParticipantImpl(domain_id, PParam, guidP, c_GuidPrefix_Unknown, par, plisten)
{
}
437 | | |
438 | | void RTPSParticipantImpl::enable() |
439 | 0 | { |
440 | 0 | mp_builtinProtocols->enable(); |
441 | | |
442 | | //Start reception |
443 | 0 | for (auto& receiver : m_receiverResourcelist) |
444 | 0 | { |
445 | 0 | receiver.Receiver->RegisterReceiver(receiver.mp_receiver); |
446 | 0 | } |
447 | 0 | } |
448 | | |
449 | | void RTPSParticipantImpl::disable() |
450 | 0 | { |
451 | | // Disabling event thread also disables participant announcement, so there is no need to call |
452 | | // stopRTPSParticipantAnnouncement() |
453 | 0 | mp_event_thr.stop_thread(); |
454 | | |
455 | | // Disable Retries on Transports |
456 | 0 | m_network_Factory.Shutdown(); |
457 | | |
458 | | // Safely abort threads. |
459 | 0 | for (auto& block : m_receiverResourcelist) |
460 | 0 | { |
461 | 0 | block.Receiver->UnregisterReceiver(block.mp_receiver); |
462 | 0 | block.disable(); |
463 | 0 | } |
464 | |
|
465 | 0 | deleteAllUserEndpoints(); |
466 | |
|
467 | 0 | if (nullptr != mp_builtinProtocols) |
468 | 0 | { |
469 | 0 | delete(mp_builtinProtocols); |
470 | 0 | mp_builtinProtocols = nullptr; |
471 | 0 | } |
472 | 0 | } |
473 | | |
/**
 * @return Read-only reference to m_allWriterList, the list create_writer() appends every
 *         successfully created writer to.
 */
const std::vector<RTPSWriter*>& RTPSParticipantImpl::getAllWriters() const
{
    return m_allWriterList;
}
478 | | |
/**
 * @return Read-only reference to m_allReaderList.
 */
const std::vector<RTPSReader*>& RTPSParticipantImpl::getAllReaders() const
{
    return m_allReaderList;
}
483 | | |
484 | | RTPSParticipantImpl::~RTPSParticipantImpl() |
485 | 0 | { |
486 | 0 | disable(); |
487 | |
|
488 | | #if HAVE_SECURITY |
489 | | m_security_manager.destroy(); |
490 | | #endif // if HAVE_SECURITY |
491 | | |
492 | | // Destruct message receivers |
493 | 0 | for (auto& block : m_receiverResourcelist) |
494 | 0 | { |
495 | 0 | delete block.mp_receiver; |
496 | 0 | } |
497 | 0 | m_receiverResourcelist.clear(); |
498 | |
|
499 | 0 | delete mp_ResourceSemaphore; |
500 | 0 | delete mp_userParticipant; |
501 | 0 | mp_userParticipant = nullptr; |
502 | 0 | send_resource_list_.clear(); |
503 | |
|
504 | 0 | delete mp_mutex; |
505 | 0 | } |
506 | | |
507 | | template <EndpointKind_t kind, octet no_key, octet with_key> |
508 | | bool RTPSParticipantImpl::preprocess_endpoint_attributes( |
509 | | const EntityId_t& entity_id, |
510 | | uint32_t& id_counter, |
511 | | EndpointAttributes& att, |
512 | | EntityId_t& entId) |
513 | 0 | { |
514 | 0 | const char* debug_label = (att.endpointKind == WRITER ? "writer" : "reader"); |
515 | |
|
516 | 0 | if (!att.unicastLocatorList.isValid()) |
517 | 0 | { |
518 | 0 | logError(RTPS_PARTICIPANT, "Unicast Locator List for " << debug_label << " contains invalid Locator"); |
519 | 0 | return false; |
520 | 0 | } |
521 | 0 | if (!att.multicastLocatorList.isValid()) |
522 | 0 | { |
523 | 0 | logError(RTPS_PARTICIPANT, "Multicast Locator List for " << debug_label << " contains invalid Locator"); |
524 | 0 | return false; |
525 | 0 | } |
526 | 0 | if (!att.remoteLocatorList.isValid()) |
527 | 0 | { |
528 | 0 | logError(RTPS_PARTICIPANT, "Remote Locator List for " << debug_label << " contains invalid Locator"); |
529 | 0 | return false; |
530 | 0 | } |
531 | | |
532 | 0 | if (entity_id == c_EntityId_Unknown) |
533 | 0 | { |
534 | 0 | if (att.topicKind == NO_KEY) |
535 | 0 | { |
536 | 0 | entId.value[3] = (-2 == att.getUserDefinedID() && 0 < att.getEntityID()) ? (0x60) | no_key : no_key; |
537 | 0 | } |
538 | 0 | else if (att.topicKind == WITH_KEY) |
539 | 0 | { |
540 | 0 | entId.value[3] = (-2 == att.getUserDefinedID() && 0 < att.getEntityID()) ? (0x60) | with_key : with_key; |
541 | 0 | } |
542 | 0 | uint32_t idnum; |
543 | 0 | if (att.getEntityID() > 0) |
544 | 0 | { |
545 | 0 | idnum = static_cast<uint32_t>(att.getEntityID()); |
546 | 0 | } |
547 | 0 | else |
548 | 0 | { |
549 | 0 | idnum = ++id_counter; |
550 | 0 | } |
551 | |
|
552 | 0 | entId.value[2] = octet(idnum); |
553 | 0 | entId.value[1] = octet(idnum >> 8); |
554 | 0 | entId.value[0] = octet(idnum >> 16); |
555 | 0 | } |
556 | 0 | else |
557 | 0 | { |
558 | 0 | entId = entity_id; |
559 | 0 | } |
560 | |
|
561 | 0 | if (att.persistence_guid == c_Guid_Unknown) |
562 | 0 | { |
563 | | // Try to load persistence_guid from property |
564 | 0 | const std::string* persistence_guid_property = PropertyPolicyHelper::find_property( |
565 | 0 | att.properties, "dds.persistence.guid"); |
566 | 0 | if (persistence_guid_property != nullptr) |
567 | 0 | { |
568 | | // Load persistence_guid from property |
569 | 0 | std::istringstream(persistence_guid_property->c_str()) >> att.persistence_guid; |
570 | 0 | if (att.persistence_guid == c_Guid_Unknown) |
571 | 0 | { |
572 | | // Wrongly configured property |
573 | 0 | logError(RTPS_PARTICIPANT, "Cannot configure " << debug_label << "'s persistence GUID from '" |
574 | 0 | << persistence_guid_property->c_str() |
575 | 0 | << "'. Wrong input"); |
576 | 0 | return false; |
577 | 0 | } |
578 | 0 | } |
579 | 0 | } |
580 | | |
581 | | // Error log level can be disable. Avoid unused warning |
582 | 0 | static_cast<void>(debug_label); |
583 | |
|
584 | 0 | return true; |
585 | 0 | } Unexecuted instantiation: bool eprosima::fastrtps::rtps::RTPSParticipantImpl::preprocess_endpoint_attributes<(eprosima::fastrtps::rtps::EndpointKind_t)1, (unsigned char)3, (unsigned char)2>(eprosima::fastrtps::rtps::EntityId_t const&, unsigned int&, eprosima::fastrtps::rtps::EndpointAttributes&, eprosima::fastrtps::rtps::EntityId_t&) Unexecuted instantiation: bool eprosima::fastrtps::rtps::RTPSParticipantImpl::preprocess_endpoint_attributes<(eprosima::fastrtps::rtps::EndpointKind_t)0, (unsigned char)4, (unsigned char)7>(eprosima::fastrtps::rtps::EntityId_t const&, unsigned int&, eprosima::fastrtps::rtps::EndpointAttributes&, eprosima::fastrtps::rtps::EntityId_t&) |
586 | | |
587 | | template<typename Functor> |
588 | | bool RTPSParticipantImpl::create_writer( |
589 | | RTPSWriter** writer_out, |
590 | | WriterAttributes& param, |
591 | | const EntityId_t& entity_id, |
592 | | bool is_builtin, |
593 | | const Functor& callback) |
594 | 0 | { |
595 | 0 | std::string type = (param.endpoint.reliabilityKind == RELIABLE) ? "RELIABLE" : "BEST_EFFORT"; |
596 | 0 | logInfo(RTPS_PARTICIPANT, "Creating writer of type " << type); |
597 | 0 | EntityId_t entId; |
598 | 0 | if (!preprocess_endpoint_attributes<WRITER, 0x03, 0x02>(entity_id, IdCounter, param.endpoint, entId)) |
599 | 0 | { |
600 | 0 | return false; |
601 | 0 | } |
602 | | |
603 | 0 | if (existsEntityId(entId, WRITER)) |
604 | 0 | { |
605 | 0 | logError(RTPS_PARTICIPANT, |
606 | 0 | "A writer with the same entityId already exists in this RTPSParticipant"); |
607 | 0 | return false; |
608 | 0 | } |
609 | | |
610 | 0 | GUID_t guid(m_guid.guidPrefix, entId); |
611 | 0 | fastdds::rtps::FlowController* flow_controller = nullptr; |
612 | 0 | const char* flow_controller_name = param.flow_controller_name; |
613 | | |
614 | | // Support of old flow controller style. |
615 | 0 | if (param.throughputController.bytesPerPeriod != UINT32_MAX && param.throughputController.periodMillisecs != 0) |
616 | 0 | { |
617 | 0 | flow_controller_name = guid_str_.c_str(); |
618 | 0 | if (ASYNCHRONOUS_WRITER == param.mode) |
619 | 0 | { |
620 | 0 | fastdds::rtps::FlowControllerDescriptor old_descriptor; |
621 | 0 | old_descriptor.name = guid_str_.c_str(); |
622 | 0 | old_descriptor.max_bytes_per_period = param.throughputController.bytesPerPeriod; |
623 | 0 | old_descriptor.period_ms = param.throughputController.periodMillisecs; |
624 | 0 | flow_controller_factory_.register_flow_controller(old_descriptor); |
625 | 0 | flow_controller = flow_controller_factory_.retrieve_flow_controller(guid_str_.c_str(), param); |
626 | 0 | } |
627 | 0 | else |
628 | 0 | { |
629 | 0 | logWarning(RTPS_PARTICIPANT, |
630 | 0 | "Throughput flow controller was configured while writer's publish mode is configured as synchronous." \ |
631 | 0 | "Throughput flow controller configuration is not taken into account.") |
632 | |
|
633 | 0 | } |
634 | 0 | } |
635 | 0 | if (m_att.throughputController.bytesPerPeriod != UINT32_MAX && m_att.throughputController.periodMillisecs != 0) |
636 | 0 | { |
637 | 0 | if (ASYNCHRONOUS_WRITER == param.mode && nullptr == flow_controller) |
638 | 0 | { |
639 | 0 | flow_controller_name = guid_str_.c_str(); |
640 | 0 | flow_controller = flow_controller_factory_.retrieve_flow_controller(guid_str_, param); |
641 | 0 | } |
642 | 0 | else |
643 | 0 | { |
644 | 0 | logWarning(RTPS_PARTICIPANT, |
645 | 0 | "Throughput flow controller was configured while writer's publish mode is configured as synchronous." \ |
646 | 0 | "Throughput flow controller configuration is not taken into account.") |
647 | 0 | } |
648 | 0 | } |
649 | | |
650 | | // Retrieve flow controller. |
651 | | // If not default flow controller, publish_mode must be asynchronously. |
652 | 0 | if (nullptr == flow_controller && |
653 | 0 | (fastdds::rtps::FASTDDS_FLOW_CONTROLLER_DEFAULT == flow_controller_name || |
654 | 0 | ASYNCHRONOUS_WRITER == param.mode)) |
655 | 0 | { |
656 | 0 | flow_controller = flow_controller_factory_.retrieve_flow_controller(flow_controller_name, param); |
657 | 0 | } |
658 | |
|
659 | 0 | if (nullptr == flow_controller) |
660 | 0 | { |
661 | 0 | if (fastdds::rtps::FASTDDS_FLOW_CONTROLLER_DEFAULT != flow_controller_name && |
662 | 0 | SYNCHRONOUS_WRITER == param.mode) |
663 | 0 | { |
664 | 0 | logError(RTPS_PARTICIPANT, "Cannot use a flow controller in synchronously publication mode."); |
665 | 0 | } |
666 | 0 | else |
667 | 0 | { |
668 | 0 | logError(RTPS_PARTICIPANT, "Cannot create the writer. Couldn't find flow controller " |
669 | 0 | << flow_controller_name << " for writer."); |
670 | 0 | } |
671 | 0 | return false; |
672 | 0 | } |
673 | | |
674 | | // Check for unique_network_flows feature |
675 | 0 | if (nullptr != PropertyPolicyHelper::find_property(param.endpoint.properties, "fastdds.unique_network_flows")) |
676 | 0 | { |
677 | 0 | logError(RTPS_PARTICIPANT, "Unique network flows not supported on writers"); |
678 | 0 | return false; |
679 | 0 | } |
680 | | |
681 | | // Special case for DiscoveryProtocol::BACKUP, which abuses persistence guid |
682 | 0 | GUID_t former_persistence_guid = param.endpoint.persistence_guid; |
683 | 0 | if (param.endpoint.persistence_guid == c_Guid_Unknown) |
684 | 0 | { |
685 | 0 | if (m_persistence_guid != c_Guid_Unknown) |
686 | 0 | { |
687 | | // Generate persistence guid from participant persistence guid |
688 | 0 | param.endpoint.persistence_guid = GUID_t( |
689 | 0 | m_persistence_guid.guidPrefix, |
690 | 0 | entity_id); |
691 | 0 | } |
692 | 0 | } |
693 | | |
694 | | // Get persistence service |
695 | 0 | IPersistenceService* persistence = nullptr; |
696 | 0 | if (!get_persistence_service(is_builtin, param.endpoint, persistence)) |
697 | 0 | { |
698 | 0 | return false; |
699 | 0 | } |
700 | | |
701 | 0 | normalize_endpoint_locators(param.endpoint); |
702 | |
|
703 | 0 | RTPSWriter* SWriter = nullptr; |
704 | 0 | SWriter = callback(guid, param, flow_controller, persistence, param.endpoint.reliabilityKind == RELIABLE); |
705 | | |
706 | | // restore attributes |
707 | 0 | param.endpoint.persistence_guid = former_persistence_guid; |
708 | |
|
709 | 0 | if (SWriter == nullptr) |
710 | 0 | { |
711 | 0 | return false; |
712 | 0 | } |
713 | | |
714 | 0 | if (!SWriter->is_pool_initialized()) |
715 | 0 | { |
716 | 0 | delete(SWriter); |
717 | 0 | return false; |
718 | 0 | } |
719 | | |
720 | | #if HAVE_SECURITY |
721 | | if (!is_builtin) |
722 | | { |
723 | | if (!m_security_manager.register_local_writer(SWriter->getGuid(), |
724 | | param.endpoint.properties, SWriter->getAttributes().security_attributes())) |
725 | | { |
726 | | delete(SWriter); |
727 | | return false; |
728 | | } |
729 | | } |
730 | | else |
731 | | { |
732 | | if (!m_security_manager.register_local_builtin_writer(SWriter->getGuid(), |
733 | | SWriter->getAttributes().security_attributes())) |
734 | | { |
735 | | delete(SWriter); |
736 | | return false; |
737 | | } |
738 | | } |
739 | | #endif // if HAVE_SECURITY |
740 | | |
741 | 0 | createSendResources(SWriter); |
742 | 0 | if (param.endpoint.reliabilityKind == RELIABLE) |
743 | 0 | { |
744 | 0 | if (!createAndAssociateReceiverswithEndpoint(SWriter)) |
745 | 0 | { |
746 | 0 | delete(SWriter); |
747 | 0 | return false; |
748 | 0 | } |
749 | 0 | } |
750 | | |
751 | 0 | { |
752 | 0 | std::lock_guard<shared_mutex> _(endpoints_list_mutex); |
753 | 0 | m_allWriterList.push_back(SWriter); |
754 | |
|
755 | 0 | if (!is_builtin) |
756 | 0 | { |
757 | 0 | m_userWriterList.push_back(SWriter); |
758 | 0 | } |
759 | 0 | } |
760 | 0 | *writer_out = SWriter; |
761 | |
|
762 | | #ifdef FASTDDS_STATISTICS |
763 | | |
764 | | if (!is_builtin) |
765 | | { |
766 | | // Register all compatible statistical listeners |
767 | | for_each_listener([this, &guid](Key listener) |
768 | | { |
769 | | if (are_writers_involved(listener->mask())) |
770 | | { |
771 | | register_in_writer(listener->get_shared_ptr(), guid); |
772 | | } |
773 | | }); |
774 | | } |
775 | | |
776 | | #endif // FASTDDS_STATISTICS |
777 | |
|
778 | 0 | return true; |
779 | 0 | } Unexecuted instantiation: RTPSParticipantImpl.cpp:bool eprosima::fastrtps::rtps::RTPSParticipantImpl::create_writer<eprosima::fastrtps::rtps::RTPSParticipantImpl::createWriter(eprosima::fastrtps::rtps::RTPSWriter**, eprosima::fastrtps::rtps::WriterAttributes&, eprosima::fastrtps::rtps::WriterHistory*, eprosima::fastrtps::rtps::WriterListener*, eprosima::fastrtps::rtps::EntityId_t const&, bool)::$_5>(eprosima::fastrtps::rtps::RTPSWriter**, eprosima::fastrtps::rtps::WriterAttributes&, eprosima::fastrtps::rtps::EntityId_t const&, bool, eprosima::fastrtps::rtps::RTPSParticipantImpl::createWriter(eprosima::fastrtps::rtps::RTPSWriter**, eprosima::fastrtps::rtps::WriterAttributes&, eprosima::fastrtps::rtps::WriterHistory*, eprosima::fastrtps::rtps::WriterListener*, eprosima::fastrtps::rtps::EntityId_t const&, bool)::$_5 const&) Unexecuted instantiation: RTPSParticipantImpl.cpp:bool eprosima::fastrtps::rtps::RTPSParticipantImpl::create_writer<eprosima::fastrtps::rtps::RTPSParticipantImpl::createWriter(eprosima::fastrtps::rtps::RTPSWriter**, eprosima::fastrtps::rtps::WriterAttributes&, std::__1::shared_ptr<eprosima::fastrtps::rtps::IPayloadPool> const&, eprosima::fastrtps::rtps::WriterHistory*, eprosima::fastrtps::rtps::WriterListener*, eprosima::fastrtps::rtps::EntityId_t const&, bool)::$_6>(eprosima::fastrtps::rtps::RTPSWriter**, eprosima::fastrtps::rtps::WriterAttributes&, eprosima::fastrtps::rtps::EntityId_t const&, bool, eprosima::fastrtps::rtps::RTPSParticipantImpl::createWriter(eprosima::fastrtps::rtps::RTPSWriter**, eprosima::fastrtps::rtps::WriterAttributes&, std::__1::shared_ptr<eprosima::fastrtps::rtps::IPayloadPool> const&, eprosima::fastrtps::rtps::WriterHistory*, eprosima::fastrtps::rtps::WriterListener*, eprosima::fastrtps::rtps::EntityId_t const&, bool)::$_6 const&) Unexecuted instantiation: RTPSParticipantImpl.cpp:bool 
eprosima::fastrtps::rtps::RTPSParticipantImpl::create_writer<eprosima::fastrtps::rtps::RTPSParticipantImpl::create_writer(eprosima::fastrtps::rtps::RTPSWriter**, eprosima::fastrtps::rtps::WriterAttributes&, std::__1::shared_ptr<eprosima::fastrtps::rtps::IPayloadPool> const&, std::__1::shared_ptr<eprosima::fastrtps::rtps::IChangePool> const&, eprosima::fastrtps::rtps::WriterHistory*, eprosima::fastrtps::rtps::WriterListener*, eprosima::fastrtps::rtps::EntityId_t const&, bool)::$_7>(eprosima::fastrtps::rtps::RTPSWriter**, eprosima::fastrtps::rtps::WriterAttributes&, eprosima::fastrtps::rtps::EntityId_t const&, bool, eprosima::fastrtps::rtps::RTPSParticipantImpl::create_writer(eprosima::fastrtps::rtps::RTPSWriter**, eprosima::fastrtps::rtps::WriterAttributes&, std::__1::shared_ptr<eprosima::fastrtps::rtps::IPayloadPool> const&, std::__1::shared_ptr<eprosima::fastrtps::rtps::IChangePool> const&, eprosima::fastrtps::rtps::WriterHistory*, eprosima::fastrtps::rtps::WriterListener*, eprosima::fastrtps::rtps::EntityId_t const&, bool)::$_7 const&) |
780 | | |
781 | | template <typename Functor> |
782 | | bool RTPSParticipantImpl::create_reader( |
783 | | RTPSReader** reader_out, |
784 | | ReaderAttributes& param, |
785 | | const EntityId_t& entity_id, |
786 | | bool is_builtin, |
787 | | bool enable, |
788 | | const Functor& callback) |
789 | 0 | { |
790 | 0 | std::string type = (param.endpoint.reliabilityKind == RELIABLE) ? "RELIABLE" : "BEST_EFFORT"; |
791 | 0 | logInfo(RTPS_PARTICIPANT, "Creating reader of type " << type); |
792 | 0 | EntityId_t entId; |
793 | 0 | if (!preprocess_endpoint_attributes<READER, 0x04, 0x07>(entity_id, IdCounter, param.endpoint, entId)) |
794 | 0 | { |
795 | 0 | return false; |
796 | 0 | } |
797 | | |
798 | 0 | if (existsEntityId(entId, READER)) |
799 | 0 | { |
800 | 0 | logError(RTPS_PARTICIPANT, |
801 | 0 | "A reader with the same entityId already exists in this RTPSParticipant"); |
802 | 0 | return false; |
803 | 0 | } |
804 | | |
805 | | // Special case for DiscoveryProtocol::BACKUP, which abuses persistence guid |
806 | 0 | GUID_t former_persistence_guid = param.endpoint.persistence_guid; |
807 | 0 | if (param.endpoint.persistence_guid == c_Guid_Unknown) |
808 | 0 | { |
809 | 0 | if (m_persistence_guid != c_Guid_Unknown) |
810 | 0 | { |
811 | | // Generate persistence guid from participant persistence guid |
812 | 0 | param.endpoint.persistence_guid = GUID_t( |
813 | 0 | m_persistence_guid.guidPrefix, |
814 | 0 | entity_id); |
815 | 0 | } |
816 | 0 | } |
817 | | |
818 | | // Get persistence service |
819 | 0 | IPersistenceService* persistence = nullptr; |
820 | 0 | if (!get_persistence_service(is_builtin, param.endpoint, persistence)) |
821 | 0 | { |
822 | 0 | return false; |
823 | 0 | } |
824 | | |
825 | | // Check for unique_network_flows feature |
826 | 0 | bool request_unique_flows = false; |
827 | 0 | uint16_t initial_port = 0; |
828 | 0 | uint16_t final_port = 0; |
829 | 0 | if (!get_unique_flows_parameters(m_att, param.endpoint, request_unique_flows, initial_port, final_port)) |
830 | 0 | { |
831 | 0 | return false; |
832 | 0 | } |
833 | | |
834 | 0 | normalize_endpoint_locators(param.endpoint); |
835 | |
|
836 | 0 | RTPSReader* SReader = nullptr; |
837 | 0 | GUID_t guid(m_guid.guidPrefix, entId); |
838 | 0 | SReader = callback(guid, param, persistence, param.endpoint.reliabilityKind == RELIABLE); |
839 | | |
840 | | // restore attributes |
841 | 0 | param.endpoint.persistence_guid = former_persistence_guid; |
842 | |
|
843 | 0 | if (SReader == nullptr) |
844 | 0 | { |
845 | 0 | return false; |
846 | 0 | } |
847 | | |
848 | | #if HAVE_SECURITY |
849 | | |
850 | | if (!is_builtin) |
851 | | { |
852 | | if (!m_security_manager.register_local_reader(SReader->getGuid(), |
853 | | param.endpoint.properties, SReader->getAttributes().security_attributes())) |
854 | | { |
855 | | delete(SReader); |
856 | | return false; |
857 | | } |
858 | | } |
859 | | else |
860 | | { |
861 | | if (!m_security_manager.register_local_builtin_reader(SReader->getGuid(), |
862 | | SReader->getAttributes().security_attributes())) |
863 | | { |
864 | | delete(SReader); |
865 | | return false; |
866 | | } |
867 | | } |
868 | | #endif // if HAVE_SECURITY |
869 | | |
870 | 0 | if (param.endpoint.reliabilityKind == RELIABLE) |
871 | 0 | { |
872 | 0 | createSendResources(SReader); |
873 | 0 | } |
874 | |
|
875 | 0 | if (is_builtin) |
876 | 0 | { |
877 | 0 | SReader->setTrustedWriter(TrustedWriter(SReader->getGuid().entityId)); |
878 | 0 | } |
879 | |
|
880 | 0 | if (enable) |
881 | 0 | { |
882 | 0 | if (!createAndAssociateReceiverswithEndpoint(SReader, request_unique_flows, initial_port, final_port)) |
883 | 0 | { |
884 | 0 | delete(SReader); |
885 | 0 | return false; |
886 | 0 | } |
887 | 0 | } |
888 | | |
889 | 0 | { |
890 | 0 | std::lock_guard<shared_mutex> _(endpoints_list_mutex); |
891 | |
|
892 | 0 | m_allReaderList.push_back(SReader); |
893 | |
|
894 | 0 | if (!is_builtin) |
895 | 0 | { |
896 | 0 | m_userReaderList.push_back(SReader); |
897 | 0 | } |
898 | 0 | } |
899 | 0 | *reader_out = SReader; |
900 | |
|
901 | | #ifdef FASTDDS_STATISTICS |
902 | | |
903 | | if (!is_builtin) |
904 | | { |
905 | | // Register all compatible statistical listeners |
906 | | for_each_listener([this, &guid](Key listener) |
907 | | { |
908 | | if (are_readers_involved(listener->mask())) |
909 | | { |
910 | | register_in_reader(listener->get_shared_ptr(), guid); |
911 | | } |
912 | | }); |
913 | | } |
914 | | |
915 | | #endif // FASTDDS_STATISTICS |
916 | |
|
917 | 0 | return true; |
918 | 0 | } Unexecuted instantiation: RTPSParticipantImpl.cpp:bool eprosima::fastrtps::rtps::RTPSParticipantImpl::create_reader<eprosima::fastrtps::rtps::RTPSParticipantImpl::createReader(eprosima::fastrtps::rtps::RTPSReader**, eprosima::fastrtps::rtps::ReaderAttributes&, eprosima::fastrtps::rtps::ReaderHistory*, eprosima::fastrtps::rtps::ReaderListener*, eprosima::fastrtps::rtps::EntityId_t const&, bool, bool)::$_8>(eprosima::fastrtps::rtps::RTPSReader**, eprosima::fastrtps::rtps::ReaderAttributes&, eprosima::fastrtps::rtps::EntityId_t const&, bool, bool, eprosima::fastrtps::rtps::RTPSParticipantImpl::createReader(eprosima::fastrtps::rtps::RTPSReader**, eprosima::fastrtps::rtps::ReaderAttributes&, eprosima::fastrtps::rtps::ReaderHistory*, eprosima::fastrtps::rtps::ReaderListener*, eprosima::fastrtps::rtps::EntityId_t const&, bool, bool)::$_8 const&) Unexecuted instantiation: RTPSParticipantImpl.cpp:bool eprosima::fastrtps::rtps::RTPSParticipantImpl::create_reader<eprosima::fastrtps::rtps::RTPSParticipantImpl::createReader(eprosima::fastrtps::rtps::RTPSReader**, eprosima::fastrtps::rtps::ReaderAttributes&, std::__1::shared_ptr<eprosima::fastrtps::rtps::IPayloadPool> const&, eprosima::fastrtps::rtps::ReaderHistory*, eprosima::fastrtps::rtps::ReaderListener*, eprosima::fastrtps::rtps::EntityId_t const&, bool, bool)::$_9>(eprosima::fastrtps::rtps::RTPSReader**, eprosima::fastrtps::rtps::ReaderAttributes&, eprosima::fastrtps::rtps::EntityId_t const&, bool, bool, eprosima::fastrtps::rtps::RTPSParticipantImpl::createReader(eprosima::fastrtps::rtps::RTPSReader**, eprosima::fastrtps::rtps::ReaderAttributes&, std::__1::shared_ptr<eprosima::fastrtps::rtps::IPayloadPool> const&, eprosima::fastrtps::rtps::ReaderHistory*, eprosima::fastrtps::rtps::ReaderListener*, eprosima::fastrtps::rtps::EntityId_t const&, bool, bool)::$_9 const&) |
919 | | |
920 | | /* |
921 | | * |
922 | | * MAIN RTPSParticipant IMPL API |
923 | | * |
924 | | */ |
925 | | bool RTPSParticipantImpl::createWriter( |
926 | | RTPSWriter** WriterOut, |
927 | | WriterAttributes& param, |
928 | | WriterHistory* hist, |
929 | | WriterListener* listen, |
930 | | const EntityId_t& entityId, |
931 | | bool isBuiltin) |
932 | 0 | { |
933 | 0 | auto callback = [hist, listen, this] |
934 | 0 | (const GUID_t& guid, WriterAttributes& param, fastdds::rtps::FlowController* flow_controller, |
935 | 0 | IPersistenceService* persistence, bool is_reliable) -> RTPSWriter* |
936 | 0 | { |
937 | 0 | if (is_reliable) |
938 | 0 | { |
939 | 0 | if (persistence != nullptr) |
940 | 0 | { |
941 | 0 | return new StatefulPersistentWriter(this, guid, param, flow_controller, |
942 | 0 | hist, listen, persistence); |
943 | 0 | } |
944 | 0 | else |
945 | 0 | { |
946 | 0 | return new StatefulWriter(this, guid, param, flow_controller, |
947 | 0 | hist, listen); |
948 | 0 | } |
949 | 0 | } |
950 | 0 | else |
951 | 0 | { |
952 | 0 | if (persistence != nullptr) |
953 | 0 | { |
954 | 0 | return new StatelessPersistentWriter(this, guid, param, flow_controller, |
955 | 0 | hist, listen, persistence); |
956 | 0 | } |
957 | 0 | else |
958 | 0 | { |
959 | 0 | return new StatelessWriter(this, guid, param, flow_controller, |
960 | 0 | hist, listen); |
961 | 0 | } |
962 | 0 | } |
963 | 0 | }; |
964 | 0 | return create_writer(WriterOut, param, entityId, isBuiltin, callback); |
965 | 0 | } |
966 | | |
967 | | bool RTPSParticipantImpl::createWriter( |
968 | | RTPSWriter** WriterOut, |
969 | | WriterAttributes& param, |
970 | | const std::shared_ptr<IPayloadPool>& payload_pool, |
971 | | WriterHistory* hist, |
972 | | WriterListener* listen, |
973 | | const EntityId_t& entityId, |
974 | | bool isBuiltin) |
975 | 0 | { |
976 | 0 | if (!payload_pool) |
977 | 0 | { |
978 | 0 | logError(RTPS_PARTICIPANT, "Trying to create writer with null payload pool"); |
979 | 0 | return false; |
980 | 0 | } |
981 | | |
982 | 0 | auto callback = [hist, listen, &payload_pool, this] |
983 | 0 | (const GUID_t& guid, WriterAttributes& param, fastdds::rtps::FlowController* flow_controller, |
984 | 0 | IPersistenceService* persistence, bool is_reliable) -> RTPSWriter* |
985 | 0 | { |
986 | 0 | if (is_reliable) |
987 | 0 | { |
988 | 0 | if (persistence != nullptr) |
989 | 0 | { |
990 | 0 | return new StatefulPersistentWriter(this, guid, param, payload_pool, flow_controller, |
991 | 0 | hist, listen, persistence); |
992 | 0 | } |
993 | 0 | else |
994 | 0 | { |
995 | 0 | return new StatefulWriter(this, guid, param, payload_pool, flow_controller, |
996 | 0 | hist, listen); |
997 | 0 | } |
998 | 0 | } |
999 | 0 | else |
1000 | 0 | { |
1001 | 0 | if (persistence != nullptr) |
1002 | 0 | { |
1003 | 0 | return new StatelessPersistentWriter(this, guid, param, payload_pool, flow_controller, |
1004 | 0 | hist, listen, persistence); |
1005 | 0 | } |
1006 | 0 | else |
1007 | 0 | { |
1008 | 0 | return new StatelessWriter(this, guid, param, payload_pool, flow_controller, |
1009 | 0 | hist, listen); |
1010 | 0 | } |
1011 | 0 | } |
1012 | 0 | }; |
1013 | 0 | return create_writer(WriterOut, param, entityId, isBuiltin, callback); |
1014 | 0 | } |
1015 | | |
1016 | | bool RTPSParticipantImpl::create_writer( |
1017 | | RTPSWriter** WriterOut, |
1018 | | WriterAttributes& watt, |
1019 | | const std::shared_ptr<IPayloadPool>& payload_pool, |
1020 | | const std::shared_ptr<IChangePool>& change_pool, |
1021 | | WriterHistory* hist, |
1022 | | WriterListener* listen, |
1023 | | const EntityId_t& entityId, |
1024 | | bool isBuiltin) |
1025 | 0 | { |
1026 | 0 | if (!payload_pool) |
1027 | 0 | { |
1028 | 0 | logError(RTPS_PARTICIPANT, "Trying to create writer with null payload pool"); |
1029 | 0 | return false; |
1030 | 0 | } |
1031 | | |
1032 | 0 | auto callback = [hist, listen, &payload_pool, &change_pool, this] |
1033 | 0 | (const GUID_t& guid, WriterAttributes& watt, fastdds::rtps::FlowController* flow_controller, |
1034 | 0 | IPersistenceService* persistence, bool is_reliable) -> RTPSWriter* |
1035 | 0 | { |
1036 | 0 | if (is_reliable) |
1037 | 0 | { |
1038 | 0 | if (persistence != nullptr) |
1039 | 0 | { |
1040 | 0 | return new StatefulPersistentWriter(this, guid, watt, payload_pool, change_pool, |
1041 | 0 | flow_controller, hist, listen, persistence); |
1042 | 0 | } |
1043 | 0 | else |
1044 | 0 | { |
1045 | 0 | return new StatefulWriter(this, guid, watt, payload_pool, change_pool, |
1046 | 0 | flow_controller, hist, listen); |
1047 | 0 | } |
1048 | 0 | } |
1049 | 0 | else |
1050 | 0 | { |
1051 | 0 | if (persistence != nullptr) |
1052 | 0 | { |
1053 | 0 | return new StatelessPersistentWriter(this, guid, watt, payload_pool, change_pool, |
1054 | 0 | flow_controller, hist, listen, persistence); |
1055 | 0 | } |
1056 | 0 | else |
1057 | 0 | { |
1058 | 0 | return new StatelessWriter(this, guid, watt, payload_pool, change_pool, |
1059 | 0 | flow_controller, hist, listen); |
1060 | 0 | } |
1061 | 0 | } |
1062 | 0 | }; |
1063 | 0 | return create_writer(WriterOut, watt, entityId, isBuiltin, callback); |
1064 | 0 | } |
1065 | | |
1066 | | bool RTPSParticipantImpl::createReader( |
1067 | | RTPSReader** ReaderOut, |
1068 | | ReaderAttributes& param, |
1069 | | ReaderHistory* hist, |
1070 | | ReaderListener* listen, |
1071 | | const EntityId_t& entityId, |
1072 | | bool isBuiltin, |
1073 | | bool enable) |
1074 | 0 | { |
1075 | 0 | auto callback = [hist, listen, this] |
1076 | 0 | (const GUID_t& guid, ReaderAttributes& param, IPersistenceService* persistence, |
1077 | 0 | bool is_reliable) -> RTPSReader* |
1078 | 0 | { |
1079 | 0 | if (is_reliable) |
1080 | 0 | { |
1081 | 0 | if (persistence != nullptr) |
1082 | 0 | { |
1083 | 0 | return new StatefulPersistentReader(this, guid, param, hist, listen, persistence); |
1084 | 0 | } |
1085 | 0 | else |
1086 | 0 | { |
1087 | 0 | return new StatefulReader(this, guid, param, hist, listen); |
1088 | 0 | } |
1089 | 0 | } |
1090 | 0 | else |
1091 | 0 | { |
1092 | 0 | if (persistence != nullptr) |
1093 | 0 | { |
1094 | 0 | return new StatelessPersistentReader(this, guid, param, hist, listen, persistence); |
1095 | 0 | } |
1096 | 0 | else |
1097 | 0 | { |
1098 | 0 | return new StatelessReader(this, guid, param, hist, listen); |
1099 | 0 | } |
1100 | 0 | } |
1101 | 0 | }; |
1102 | 0 | return create_reader(ReaderOut, param, entityId, isBuiltin, enable, callback); |
1103 | 0 | } |
1104 | | |
1105 | | bool RTPSParticipantImpl::createReader( |
1106 | | RTPSReader** ReaderOut, |
1107 | | ReaderAttributes& param, |
1108 | | const std::shared_ptr<IPayloadPool>& payload_pool, |
1109 | | ReaderHistory* hist, |
1110 | | ReaderListener* listen, |
1111 | | const EntityId_t& entityId, |
1112 | | bool isBuiltin, |
1113 | | bool enable) |
1114 | 0 | { |
1115 | 0 | if (!payload_pool) |
1116 | 0 | { |
1117 | 0 | logError(RTPS_PARTICIPANT, "Trying to create reader with null payload pool"); |
1118 | 0 | return false; |
1119 | 0 | } |
1120 | | |
1121 | 0 | auto callback = [hist, listen, &payload_pool, this] |
1122 | 0 | (const GUID_t& guid, ReaderAttributes& param, IPersistenceService* persistence, |
1123 | 0 | bool is_reliable) -> RTPSReader* |
1124 | 0 | { |
1125 | 0 | if (is_reliable) |
1126 | 0 | { |
1127 | 0 | if (persistence != nullptr) |
1128 | 0 | { |
1129 | 0 | return new StatefulPersistentReader(this, guid, param, payload_pool, hist, listen, persistence); |
1130 | 0 | } |
1131 | 0 | else |
1132 | 0 | { |
1133 | 0 | return new StatefulReader(this, guid, param, payload_pool, hist, listen); |
1134 | 0 | } |
1135 | 0 | } |
1136 | 0 | else |
1137 | 0 | { |
1138 | 0 | if (persistence != nullptr) |
1139 | 0 | { |
1140 | 0 | return new StatelessPersistentReader(this, guid, param, payload_pool, hist, listen, |
1141 | 0 | persistence); |
1142 | 0 | } |
1143 | 0 | else |
1144 | 0 | { |
1145 | 0 | return new StatelessReader(this, guid, param, payload_pool, hist, listen); |
1146 | 0 | } |
1147 | 0 | } |
1148 | 0 | }; |
1149 | 0 | return create_reader(ReaderOut, param, entityId, isBuiltin, enable, callback); |
1150 | 0 | } |
1151 | | |
1152 | | RTPSReader* RTPSParticipantImpl::find_local_reader( |
1153 | | const GUID_t& reader_guid) |
1154 | 0 | { |
1155 | 0 | shared_lock<shared_mutex> _(endpoints_list_mutex); |
1156 | |
|
1157 | 0 | for (auto reader : m_allReaderList) |
1158 | 0 | { |
1159 | 0 | if (reader->getGuid() == reader_guid) |
1160 | 0 | { |
1161 | 0 | return reader; |
1162 | 0 | } |
1163 | 0 | } |
1164 | | |
1165 | 0 | return nullptr; |
1166 | 0 | } |
1167 | | |
1168 | | RTPSWriter* RTPSParticipantImpl::find_local_writer( |
1169 | | const GUID_t& writer_guid) |
1170 | 0 | { |
1171 | 0 | shared_lock<shared_mutex> _(endpoints_list_mutex); |
1172 | |
|
1173 | 0 | for (auto writer : m_allWriterList) |
1174 | 0 | { |
1175 | 0 | if (writer->getGuid() == writer_guid) |
1176 | 0 | { |
1177 | 0 | return writer; |
1178 | 0 | } |
1179 | 0 | } |
1180 | | |
1181 | 0 | return nullptr; |
1182 | 0 | } |
1183 | | |
1184 | | bool RTPSParticipantImpl::enableReader( |
1185 | | RTPSReader* reader) |
1186 | 0 | { |
1187 | 0 | if (!assignEndpointListenResources(reader)) |
1188 | 0 | { |
1189 | 0 | return false; |
1190 | 0 | } |
1191 | 0 | return true; |
1192 | 0 | } |
1193 | | |
1194 | | // Avoid to receive PDPSimple reader a DATA while calling ~PDPSimple and EDP was destroy already. |
1195 | | void RTPSParticipantImpl::disableReader( |
1196 | | RTPSReader* reader) |
1197 | 0 | { |
1198 | 0 | m_receiverResourcelistMutex.lock(); |
1199 | 0 | for (auto it = m_receiverResourcelist.begin(); it != m_receiverResourcelist.end(); ++it) |
1200 | 0 | { |
1201 | 0 | it->mp_receiver->removeEndpoint(reader); |
1202 | 0 | } |
1203 | 0 | m_receiverResourcelistMutex.unlock(); |
1204 | 0 | } |
1205 | | |
1206 | | bool RTPSParticipantImpl::registerWriter( |
1207 | | RTPSWriter* Writer, |
1208 | | const TopicAttributes& topicAtt, |
1209 | | const WriterQos& wqos) |
1210 | 0 | { |
1211 | 0 | return this->mp_builtinProtocols->addLocalWriter(Writer, topicAtt, wqos); |
1212 | 0 | } |
1213 | | |
1214 | | bool RTPSParticipantImpl::registerReader( |
1215 | | RTPSReader* reader, |
1216 | | const TopicAttributes& topicAtt, |
1217 | | const ReaderQos& rqos, |
1218 | | const fastdds::rtps::ContentFilterProperty* content_filter) |
1219 | 0 | { |
1220 | 0 | return this->mp_builtinProtocols->addLocalReader(reader, topicAtt, rqos, content_filter); |
1221 | 0 | } |
1222 | | |
1223 | | void RTPSParticipantImpl::update_attributes( |
1224 | | const RTPSParticipantAttributes& patt) |
1225 | 0 | { |
1226 | 0 | bool update_pdp = false; |
1227 | | // Check if new interfaces have been added |
1228 | 0 | if (internal_metatraffic_locators_) |
1229 | 0 | { |
1230 | 0 | LocatorList_t metatraffic_unicast_locator_list = m_att.builtin.metatrafficUnicastLocatorList; |
1231 | 0 | get_default_metatraffic_locators(); |
1232 | 0 | if (!(metatraffic_unicast_locator_list == m_att.builtin.metatrafficUnicastLocatorList)) |
1233 | 0 | { |
1234 | 0 | update_pdp = true; |
1235 | 0 | m_network_Factory.update_network_interfaces(); |
1236 | 0 | logInfo(RTPS_PARTICIPANT, m_att.getName() << " updated its metatraffic locators"); |
1237 | 0 | } |
1238 | 0 | } |
1239 | 0 | if (internal_default_locators_) |
1240 | 0 | { |
1241 | 0 | LocatorList_t default_unicast_locator_list = m_att.defaultUnicastLocatorList; |
1242 | 0 | get_default_unicast_locators(); |
1243 | 0 | if (!(default_unicast_locator_list == m_att.defaultUnicastLocatorList)) |
1244 | 0 | { |
1245 | 0 | update_pdp = true; |
1246 | 0 | logInfo(RTPS_PARTICIPANT, m_att.getName() << " updated default unicast locator list, current locators: " |
1247 | 0 | << m_att.defaultUnicastLocatorList); |
1248 | 0 | } |
1249 | 0 | } |
1250 | |
|
1251 | 0 | auto pdp = mp_builtinProtocols->mp_PDP; |
1252 | | |
1253 | | // Check if there are changes |
1254 | 0 | if (patt.builtin.discovery_config.m_DiscoveryServers != m_att.builtin.discovery_config.m_DiscoveryServers |
1255 | 0 | || patt.userData != m_att.userData |
1256 | 0 | || update_pdp) |
1257 | 0 | { |
1258 | 0 | update_pdp = true; |
1259 | 0 | std::vector<GUID_t> modified_servers; |
1260 | 0 | LocatorList_t modified_locators; |
1261 | | |
1262 | | // Update RTPSParticipantAttributes member |
1263 | 0 | m_att.userData = patt.userData; |
1264 | | |
1265 | | // If there's no PDP don't process Discovery-related attributes. |
1266 | 0 | if (!pdp) |
1267 | 0 | { |
1268 | 0 | return; |
1269 | 0 | } |
1270 | | |
1271 | | // Check that the remote servers list is consistent: all the already known remote servers must be included in |
1272 | | // the list and either new remote servers are added or remote server listening locator is modified. |
1273 | 0 | for (auto existing_server : m_att.builtin.discovery_config.m_DiscoveryServers) |
1274 | 0 | { |
1275 | 0 | bool contained = false; |
1276 | 0 | for (auto incoming_server : patt.builtin.discovery_config.m_DiscoveryServers) |
1277 | 0 | { |
1278 | 0 | if (existing_server.guidPrefix == incoming_server.guidPrefix) |
1279 | 0 | { |
1280 | 0 | for (auto incoming_locator : incoming_server.metatrafficUnicastLocatorList) |
1281 | 0 | { |
1282 | 0 | bool locator_contained = false; |
1283 | 0 | for (auto existing_locator : existing_server.metatrafficUnicastLocatorList) |
1284 | 0 | { |
1285 | 0 | if (incoming_locator == existing_locator) |
1286 | 0 | { |
1287 | 0 | locator_contained = true; |
1288 | 0 | break; |
1289 | 0 | } |
1290 | 0 | } |
1291 | 0 | if (!locator_contained) |
1292 | 0 | { |
1293 | 0 | modified_servers.emplace_back(incoming_server.GetParticipant()); |
1294 | 0 | modified_locators.push_back(incoming_locator); |
1295 | 0 | logInfo(RTPS_QOS_CHECK, |
1296 | 0 | "DS Server: " << incoming_server.guidPrefix << " has modified its locators: " |
1297 | 0 | << incoming_locator << " being added") |
1298 | 0 | } |
1299 | 0 | } |
1300 | 0 | contained = true; |
1301 | 0 | break; |
1302 | 0 | } |
1303 | 0 | } |
1304 | 0 | if (!contained) |
1305 | 0 | { |
1306 | 0 | logWarning(RTPS_QOS_CHECK, |
1307 | 0 | "Discovery Servers cannot be removed from the list; they can only be added"); |
1308 | 0 | return; |
1309 | 0 | } |
1310 | 0 | } |
1311 | | |
1312 | 0 | { |
1313 | 0 | std::lock_guard<std::recursive_mutex> lock(*pdp->getMutex()); |
1314 | | |
1315 | | // Update user data |
1316 | 0 | auto local_participant_proxy_data = pdp->getLocalParticipantProxyData(); |
1317 | 0 | local_participant_proxy_data->m_userData.data_vec(m_att.userData); |
1318 | | |
1319 | | // Update metatraffic locators |
1320 | 0 | for (auto locator : m_att.builtin.metatrafficMulticastLocatorList) |
1321 | 0 | { |
1322 | 0 | local_participant_proxy_data->metatraffic_locators.add_multicast_locator(locator); |
1323 | 0 | } |
1324 | 0 | for (auto locator : m_att.builtin.metatrafficUnicastLocatorList) |
1325 | 0 | { |
1326 | 0 | local_participant_proxy_data->metatraffic_locators.add_unicast_locator(locator); |
1327 | 0 | } |
1328 | | |
1329 | | // Update default locators |
1330 | 0 | for (auto locator : m_att.defaultUnicastLocatorList) |
1331 | 0 | { |
1332 | 0 | local_participant_proxy_data->default_locators.add_unicast_locator(locator); |
1333 | 0 | } |
1334 | |
|
1335 | 0 | createSenderResources(m_att.builtin.metatrafficMulticastLocatorList); |
1336 | 0 | createSenderResources(m_att.builtin.metatrafficUnicastLocatorList); |
1337 | 0 | createSenderResources(m_att.defaultUnicastLocatorList); |
1338 | 0 | if (!modified_locators.empty()) |
1339 | 0 | { |
1340 | 0 | createSenderResources(modified_locators); |
1341 | 0 | } |
1342 | | |
1343 | | // Update remote servers list |
1344 | 0 | if (m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::CLIENT || |
1345 | 0 | m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::SUPER_CLIENT || |
1346 | 0 | m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::SERVER || |
1347 | 0 | m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::BACKUP) |
1348 | 0 | { |
1349 | | // Add incoming servers iff we don't know about them already or the listening locator has been modified |
1350 | 0 | for (auto incoming_server : patt.builtin.discovery_config.m_DiscoveryServers) |
1351 | 0 | { |
1352 | 0 | eprosima::fastdds::rtps::RemoteServerList_t::iterator server_it; |
1353 | 0 | for (server_it = m_att.builtin.discovery_config.m_DiscoveryServers.begin(); |
1354 | 0 | server_it != m_att.builtin.discovery_config.m_DiscoveryServers.end(); server_it++) |
1355 | 0 | { |
1356 | 0 | if (server_it->guidPrefix == incoming_server.guidPrefix) |
1357 | 0 | { |
1358 | | // Check if the listening locators have been modified |
1359 | 0 | for (auto guid : modified_servers) |
1360 | 0 | { |
1361 | 0 | if (guid == incoming_server.GetParticipant()) |
1362 | 0 | { |
1363 | 0 | server_it->metatrafficUnicastLocatorList = |
1364 | 0 | incoming_server.metatrafficUnicastLocatorList; |
1365 | 0 | break; |
1366 | 0 | } |
1367 | 0 | } |
1368 | 0 | break; |
1369 | 0 | } |
1370 | 0 | } |
1371 | 0 | if (server_it == m_att.builtin.discovery_config.m_DiscoveryServers.end()) |
1372 | 0 | { |
1373 | 0 | m_att.builtin.discovery_config.m_DiscoveryServers.push_back(incoming_server); |
1374 | 0 | } |
1375 | 0 | } |
1376 | | |
1377 | | // Update the servers list in builtin protocols |
1378 | 0 | { |
1379 | 0 | std::unique_lock<eprosima::shared_mutex> disc_lock(mp_builtinProtocols->getDiscoveryMutex()); |
1380 | 0 | mp_builtinProtocols->m_DiscoveryServers = m_att.builtin.discovery_config.m_DiscoveryServers; |
1381 | 0 | } |
1382 | | |
1383 | | // Notify PDPServer |
1384 | 0 | if (m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::SERVER || |
1385 | 0 | m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::BACKUP) |
1386 | 0 | { |
1387 | 0 | fastdds::rtps::PDPServer* pdp_server = static_cast<fastdds::rtps::PDPServer*>(pdp); |
1388 | 0 | pdp_server->update_remote_servers_list(); |
1389 | 0 | for (auto remote_server : modified_servers) |
1390 | 0 | { |
1391 | 0 | pdp_server->remove_remote_participant(remote_server, |
1392 | 0 | ParticipantDiscoveryInfo::DISCOVERY_STATUS::DROPPED_PARTICIPANT); |
1393 | 0 | } |
1394 | 0 | } |
1395 | | // Notify PDPClient |
1396 | 0 | else if (m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::CLIENT || |
1397 | 0 | m_att.builtin.discovery_config.discoveryProtocol == DiscoveryProtocol::SUPER_CLIENT) |
1398 | 0 | { |
1399 | 0 | fastdds::rtps::PDPClient* pdp_client = static_cast<fastdds::rtps::PDPClient*>(pdp); |
1400 | 0 | pdp_client->update_remote_servers_list(); |
1401 | 0 | for (auto remote_server : modified_servers) |
1402 | 0 | { |
1403 | 0 | pdp_client->remove_remote_participant(remote_server, |
1404 | 0 | ParticipantDiscoveryInfo::DISCOVERY_STATUS::DROPPED_PARTICIPANT); |
1405 | 0 | } |
1406 | 0 | } |
1407 | 0 | } |
1408 | 0 | } |
1409 | 0 | } |
1410 | | |
1411 | 0 | if (update_pdp) |
1412 | 0 | { |
1413 | | // Send DATA(P) |
1414 | 0 | pdp->announceParticipantState(true); |
1415 | 0 | } |
1416 | 0 | } |
1417 | | |
1418 | | bool RTPSParticipantImpl::updateLocalWriter( |
1419 | | RTPSWriter* Writer, |
1420 | | const TopicAttributes& topicAtt, |
1421 | | const WriterQos& wqos) |
1422 | 0 | { |
1423 | 0 | return this->mp_builtinProtocols->updateLocalWriter(Writer, topicAtt, wqos); |
1424 | 0 | } |
1425 | | |
1426 | | bool RTPSParticipantImpl::updateLocalReader( |
1427 | | RTPSReader* reader, |
1428 | | const TopicAttributes& topicAtt, |
1429 | | const ReaderQos& rqos, |
1430 | | const fastdds::rtps::ContentFilterProperty* content_filter) |
1431 | 0 | { |
1432 | 0 | return this->mp_builtinProtocols->updateLocalReader(reader, topicAtt, rqos, content_filter); |
1433 | 0 | } |
1434 | | |
1435 | | /* |
1436 | | * |
1437 | | * AUXILIARY METHODS |
1438 | | * |
1439 | | * |
1440 | | */ |
1441 | | |
1442 | | |
1443 | | bool RTPSParticipantImpl::existsEntityId( |
1444 | | const EntityId_t& ent, |
1445 | | EndpointKind_t kind) const |
1446 | 0 | { |
1447 | |
|
1448 | 0 | auto check = [&ent](Endpoint* e) |
1449 | 0 | { |
1450 | 0 | return ent == e->getGuid().entityId; |
1451 | 0 | }; |
1452 | |
|
1453 | 0 | shared_lock<shared_mutex> _(endpoints_list_mutex); |
1454 | |
|
1455 | 0 | if (kind == WRITER) |
1456 | 0 | { |
1457 | 0 | return std::any_of(m_userWriterList.begin(), m_userWriterList.end(), check); |
1458 | 0 | } |
1459 | 0 | else |
1460 | 0 | { |
1461 | 0 | return std::any_of(m_userReaderList.begin(), m_userReaderList.end(), check); |
1462 | 0 | } |
1463 | | |
1464 | 0 | return false; |
1465 | 0 | } |
1466 | | |
1467 | | /* |
1468 | | * |
1469 | | * RECEIVER RESOURCE METHODS |
1470 | | * |
1471 | | */ |
1472 | | bool RTPSParticipantImpl::assignEndpointListenResources( |
1473 | | Endpoint* endp) |
1474 | 0 | { |
1475 | | //Tag the endpoint with the ReceiverResources |
1476 | 0 | bool valid = true; |
1477 | | |
1478 | | /* No need to check for emptiness on the lists, as it was already done on part function |
1479 | | In case are using the default list of Locators they have already been embedded to the parameters |
1480 | | */ |
1481 | | |
1482 | | //UNICAST |
1483 | 0 | assignEndpoint2LocatorList(endp, endp->getAttributes().unicastLocatorList); |
1484 | | //MULTICAST |
1485 | 0 | assignEndpoint2LocatorList(endp, endp->getAttributes().multicastLocatorList); |
1486 | 0 | return valid; |
1487 | 0 | } |
1488 | | |
1489 | | bool RTPSParticipantImpl::createAndAssociateReceiverswithEndpoint( |
1490 | | Endpoint* pend, |
1491 | | bool unique_flows, |
1492 | | uint16_t initial_unique_port, |
1493 | | uint16_t final_unique_port) |
1494 | 0 | { |
1495 | | /* This function... |
1496 | | - Asks the network factory for new resources |
1497 | | - Encapsulates the new resources within the ReceiverControlBlock list |
1498 | | - Associated the endpoint to the new elements in the list |
1499 | | - Launches the listener thread |
1500 | | */ |
1501 | |
|
1502 | 0 | if (unique_flows) |
1503 | 0 | { |
1504 | 0 | pend->getAttributes().multicastLocatorList.clear(); |
1505 | 0 | pend->getAttributes().unicastLocatorList = m_att.defaultUnicastLocatorList; |
1506 | |
|
1507 | 0 | uint16_t port = initial_unique_port; |
1508 | 0 | while (port < final_unique_port) |
1509 | 0 | { |
1510 | | // Set port on unicast locators |
1511 | 0 | for (Locator_t& loc : pend->getAttributes().unicastLocatorList) |
1512 | 0 | { |
1513 | 0 | loc.port = port; |
1514 | 0 | } |
1515 | | |
1516 | | // Try creating receiver resources |
1517 | 0 | if (createReceiverResources(pend->getAttributes().unicastLocatorList, false, true)) |
1518 | 0 | { |
1519 | 0 | break; |
1520 | 0 | } |
1521 | | |
1522 | | // Try with next port |
1523 | 0 | ++port; |
1524 | 0 | } |
1525 | | |
1526 | | // Fail when unique ports are exhausted |
1527 | 0 | if (port >= final_unique_port) |
1528 | 0 | { |
1529 | 0 | logError(RTPS_PARTICIPANT, "Unique flows requested but exhausted. Port range: " |
1530 | 0 | << initial_unique_port << "-" << final_unique_port); |
1531 | 0 | return false; |
1532 | 0 | } |
1533 | 0 | } |
1534 | 0 | else |
1535 | 0 | { |
1536 | | // 1 - Ask the network factory to generate the elements that do still not exist |
1537 | | //Iterate through the list of unicast and multicast locators the endpoint has... unless its empty |
1538 | | //In that case, just use the standard |
1539 | 0 | if (pend->getAttributes().unicastLocatorList.empty() && pend->getAttributes().multicastLocatorList.empty()) |
1540 | 0 | { |
1541 | | // Take default locators from the participant. |
1542 | 0 | pend->getAttributes().unicastLocatorList = m_att.defaultUnicastLocatorList; |
1543 | 0 | pend->getAttributes().multicastLocatorList = m_att.defaultMulticastLocatorList; |
1544 | 0 | } |
1545 | 0 | createReceiverResources(pend->getAttributes().unicastLocatorList, false, true); |
1546 | 0 | createReceiverResources(pend->getAttributes().multicastLocatorList, false, true); |
1547 | 0 | } |
1548 | | |
1549 | | // Associate the Endpoint with ReceiverControlBlock |
1550 | 0 | assignEndpointListenResources(pend); |
1551 | 0 | return true; |
1552 | 0 | } |
1553 | | |
1554 | | bool RTPSParticipantImpl::assignEndpoint2LocatorList( |
1555 | | Endpoint* endp, |
1556 | | LocatorList_t& list) |
1557 | 0 | { |
1558 | | /* Note: |
1559 | | The previous version of this function associated (or created) ListenResources and added the endpoint to them. |
1560 | | It then requested the list of Locators the Listener is listening to and appended to the LocatorList_t from the parameters. |
1561 | | |
1562 | | This has been removed because it is considered redundant. For ReceiveResources that listen on multiple interfaces, only |
1563 | | one of the supported Locators is needed to make the match, and the case of new ListenResources being created has been removed |
1564 | | since its the NetworkFactory the one that takes care of Resource creation. |
1565 | | */ |
1566 | 0 | for (auto lit = list.begin(); lit != list.end(); ++lit) |
1567 | 0 | { |
1568 | | //Iteration of all Locators within the Locator list passed down as argument |
1569 | 0 | std::lock_guard<std::mutex> guard(m_receiverResourcelistMutex); |
1570 | | //Check among ReceiverResources whether the locator is supported or not |
1571 | 0 | for (auto it = m_receiverResourcelist.begin(); it != m_receiverResourcelist.end(); ++it) |
1572 | 0 | { |
1573 | | //Take mutex for the resource since we are going to interact with shared resources |
1574 | | //std::lock_guard<std::mutex> guard((*it).mtx); |
1575 | 0 | if (it->Receiver->SupportsLocator(*lit)) |
1576 | 0 | { |
1577 | | //Supported! Take mutex and update lists - We maintain reader/writer discrimination just in case |
1578 | 0 | it->mp_receiver->associateEndpoint(endp); |
1579 | | // end association between reader/writer and the receive resources |
1580 | 0 | } |
1581 | |
|
1582 | 0 | } |
1583 | | //Finished iteratig through all ListenResources for a single Locator (from the parameter list). |
1584 | | //Since this function is called after checking with NetFactory we do not have to create any more resource. |
1585 | 0 | } |
1586 | 0 | return true; |
1587 | 0 | } |
1588 | | |
1589 | | bool RTPSParticipantImpl::createSendResources( |
1590 | | Endpoint* pend) |
1591 | 0 | { |
1592 | 0 | if (pend->m_att.remoteLocatorList.empty()) |
1593 | 0 | { |
1594 | | // Adds the default locators of every registered transport. |
1595 | 0 | m_network_Factory.GetDefaultOutputLocators(pend->m_att.remoteLocatorList); |
1596 | 0 | } |
1597 | |
|
1598 | 0 | std::lock_guard<std::timed_mutex> guard(m_send_resources_mutex_); |
1599 | | |
1600 | | //Output locators have been specified, create them |
1601 | 0 | for (auto it = pend->m_att.remoteLocatorList.begin(); it != pend->m_att.remoteLocatorList.end(); ++it) |
1602 | 0 | { |
1603 | 0 | if (!m_network_Factory.build_send_resources(send_resource_list_, (*it))) |
1604 | 0 | { |
1605 | 0 | logWarning(RTPS_PARTICIPANT, "Cannot create send resource for endpoint remote locator (" << |
1606 | 0 | pend->getGuid() << ", " << (*it) << ")"); |
1607 | 0 | } |
1608 | 0 | } |
1609 | |
|
1610 | 0 | return true; |
1611 | 0 | } |
1612 | | |
1613 | | bool RTPSParticipantImpl::createReceiverResources( |
1614 | | LocatorList_t& Locator_list, |
1615 | | bool ApplyMutation, |
1616 | | bool RegisterReceiver) |
1617 | 0 | { |
1618 | 0 | std::vector<std::shared_ptr<ReceiverResource>> newItemsBuffer; |
1619 | 0 | bool ret_val = Locator_list.empty(); |
1620 | |
|
1621 | | #if HAVE_SECURITY |
1622 | | // An auxilary buffer is needed in the ReceiverResource to to decrypt the message, |
1623 | | // that imposes a limit in the received messages size even if the transport allows (uint32_t) messages size. |
1624 | | uint32_t max_receiver_buffer_size = |
1625 | | is_secure() ? std::numeric_limits<uint16_t>::max() : std::numeric_limits<uint32_t>::max(); |
1626 | | #else |
1627 | 0 | uint32_t max_receiver_buffer_size = std::numeric_limits<uint32_t>::max(); |
1628 | 0 | #endif // if HAVE_SECURITY |
1629 | |
|
1630 | 0 | for (auto it_loc = Locator_list.begin(); it_loc != Locator_list.end(); ++it_loc) |
1631 | 0 | { |
1632 | 0 | bool ret = m_network_Factory.BuildReceiverResources(*it_loc, newItemsBuffer, max_receiver_buffer_size); |
1633 | 0 | if (!ret && ApplyMutation) |
1634 | 0 | { |
1635 | 0 | uint32_t tries = 0; |
1636 | 0 | while (!ret && (tries < m_att.builtin.mutation_tries)) |
1637 | 0 | { |
1638 | 0 | tries++; |
1639 | 0 | *it_loc = applyLocatorAdaptRule(*it_loc); |
1640 | 0 | ret = m_network_Factory.BuildReceiverResources(*it_loc, newItemsBuffer, max_receiver_buffer_size); |
1641 | 0 | } |
1642 | 0 | } |
1643 | |
|
1644 | 0 | ret_val |= !newItemsBuffer.empty(); |
1645 | |
|
1646 | 0 | for (auto it_buffer = newItemsBuffer.begin(); it_buffer != newItemsBuffer.end(); ++it_buffer) |
1647 | 0 | { |
1648 | 0 | std::lock_guard<std::mutex> lock(m_receiverResourcelistMutex); |
1649 | | //Push the new items into the ReceiverResource buffer |
1650 | 0 | m_receiverResourcelist.emplace_back(*it_buffer); |
1651 | | //Create and init the MessageReceiver |
1652 | 0 | auto mr = new MessageReceiver(this, (*it_buffer)->max_message_size()); |
1653 | 0 | m_receiverResourcelist.back().mp_receiver = mr; |
1654 | | //Start reception |
1655 | 0 | if (RegisterReceiver) |
1656 | 0 | { |
1657 | 0 | m_receiverResourcelist.back().Receiver->RegisterReceiver(mr); |
1658 | 0 | } |
1659 | 0 | } |
1660 | 0 | newItemsBuffer.clear(); |
1661 | 0 | } |
1662 | |
|
1663 | 0 | return ret_val; |
1664 | 0 | } |
1665 | | |
1666 | | void RTPSParticipantImpl::createSenderResources( |
1667 | | const LocatorList_t& locator_list) |
1668 | 0 | { |
1669 | 0 | std::lock_guard<std::timed_mutex> lock(m_send_resources_mutex_); |
1670 | |
|
1671 | 0 | for (auto it_loc = locator_list.begin(); it_loc != locator_list.end(); ++it_loc) |
1672 | 0 | { |
1673 | 0 | m_network_Factory.build_send_resources(send_resource_list_, *it_loc); |
1674 | 0 | } |
1675 | 0 | } |
1676 | | |
1677 | | void RTPSParticipantImpl::createSenderResources( |
1678 | | const Locator_t& locator) |
1679 | 0 | { |
1680 | 0 | std::lock_guard<std::timed_mutex> lock(m_send_resources_mutex_); |
1681 | |
|
1682 | 0 | m_network_Factory.build_send_resources(send_resource_list_, locator); |
1683 | 0 | } |
1684 | | |
1685 | | bool RTPSParticipantImpl::deleteUserEndpoint( |
1686 | | const GUID_t& endpoint) |
1687 | 0 | { |
1688 | 0 | if ( getGuid().guidPrefix != endpoint.guidPrefix) |
1689 | 0 | { |
1690 | 0 | return false; |
1691 | 0 | } |
1692 | | |
1693 | 0 | bool found = false, found_in_users = false; |
1694 | 0 | Endpoint* p_endpoint = nullptr; |
1695 | |
|
1696 | 0 | if (endpoint.entityId.is_writer()) |
1697 | 0 | { |
1698 | 0 | std::lock_guard<shared_mutex> _(endpoints_list_mutex); |
1699 | |
|
1700 | 0 | for (auto wit = m_userWriterList.begin(); wit != m_userWriterList.end(); ++wit) |
1701 | 0 | { |
1702 | 0 | if ((*wit)->getGuid().entityId == endpoint.entityId) //Found it |
1703 | 0 | { |
1704 | 0 | m_userWriterList.erase(wit); |
1705 | 0 | found_in_users = true; |
1706 | 0 | break; |
1707 | 0 | } |
1708 | 0 | } |
1709 | |
|
1710 | 0 | for (auto wit = m_allWriterList.begin(); wit != m_allWriterList.end(); ++wit) |
1711 | 0 | { |
1712 | 0 | if ((*wit)->getGuid().entityId == endpoint.entityId) //Found it |
1713 | 0 | { |
1714 | 0 | p_endpoint = *wit; |
1715 | 0 | m_allWriterList.erase(wit); |
1716 | 0 | found = true; |
1717 | 0 | break; |
1718 | 0 | } |
1719 | 0 | } |
1720 | 0 | } |
1721 | 0 | else |
1722 | 0 | { |
1723 | 0 | std::lock_guard<shared_mutex> _(endpoints_list_mutex); |
1724 | |
|
1725 | 0 | for (auto rit = m_userReaderList.begin(); rit != m_userReaderList.end(); ++rit) |
1726 | 0 | { |
1727 | 0 | if ((*rit)->getGuid().entityId == endpoint.entityId) //Found it |
1728 | 0 | { |
1729 | 0 | m_userReaderList.erase(rit); |
1730 | 0 | found_in_users = true; |
1731 | 0 | break; |
1732 | 0 | } |
1733 | 0 | } |
1734 | |
|
1735 | 0 | for (auto rit = m_allReaderList.begin(); rit != m_allReaderList.end(); ++rit) |
1736 | 0 | { |
1737 | 0 | if ((*rit)->getGuid().entityId == endpoint.entityId) //Found it |
1738 | 0 | { |
1739 | 0 | p_endpoint = *rit; |
1740 | 0 | m_allReaderList.erase(rit); |
1741 | 0 | found = true; |
1742 | 0 | break; |
1743 | 0 | } |
1744 | 0 | } |
1745 | 0 | } |
1746 | |
|
1747 | 0 | if (!found) |
1748 | 0 | { |
1749 | 0 | return false; |
1750 | 0 | } |
1751 | | |
1752 | 0 | { |
1753 | 0 | std::lock_guard<std::mutex> _(m_receiverResourcelistMutex); |
1754 | |
|
1755 | 0 | for (auto& rb : m_receiverResourcelist) |
1756 | 0 | { |
1757 | 0 | auto receiver = rb.mp_receiver; |
1758 | 0 | if (receiver) |
1759 | 0 | { |
1760 | 0 | receiver->removeEndpoint(p_endpoint); |
1761 | 0 | } |
1762 | 0 | } |
1763 | 0 | } |
1764 | | |
1765 | | //REMOVE FROM BUILTIN PROTOCOLS |
1766 | 0 | if (p_endpoint->getAttributes().endpointKind == WRITER) |
1767 | 0 | { |
1768 | 0 | if (found_in_users) |
1769 | 0 | { |
1770 | 0 | mp_builtinProtocols->removeLocalWriter(static_cast<RTPSWriter*>(p_endpoint)); |
1771 | 0 | } |
1772 | |
|
1773 | | #if HAVE_SECURITY |
1774 | | if (p_endpoint->getAttributes().security_attributes().is_submessage_protected || |
1775 | | p_endpoint->getAttributes().security_attributes().is_payload_protected) |
1776 | | { |
1777 | | m_security_manager.unregister_local_writer(p_endpoint->getGuid()); |
1778 | | } |
1779 | | #endif // if HAVE_SECURITY |
1780 | 0 | } |
1781 | 0 | else |
1782 | 0 | { |
1783 | 0 | if (found_in_users) |
1784 | 0 | { |
1785 | 0 | mp_builtinProtocols->removeLocalReader(static_cast<RTPSReader*>(p_endpoint)); |
1786 | 0 | } |
1787 | |
|
1788 | | #if HAVE_SECURITY |
1789 | | if (p_endpoint->getAttributes().security_attributes().is_submessage_protected || |
1790 | | p_endpoint->getAttributes().security_attributes().is_payload_protected) |
1791 | | { |
1792 | | m_security_manager.unregister_local_reader(p_endpoint->getGuid()); |
1793 | | } |
1794 | | #endif // if HAVE_SECURITY |
1795 | 0 | } |
1796 | |
|
1797 | 0 | delete(p_endpoint); |
1798 | 0 | return true; |
1799 | 0 | } |
1800 | | |
1801 | | void RTPSParticipantImpl::deleteAllUserEndpoints() |
1802 | 0 | { |
1803 | 0 | std::vector<Endpoint*> tmp(0); |
1804 | |
|
1805 | 0 | { |
1806 | 0 | using namespace std; |
1807 | |
|
1808 | 0 | lock_guard<shared_mutex> _(endpoints_list_mutex); |
1809 | | |
1810 | | // move the collections to a local |
1811 | 0 | tmp.resize(m_userWriterList.size() + m_userReaderList.size()); |
1812 | 0 | auto it = move(m_userWriterList.begin(), m_userWriterList.end(), tmp.begin()); |
1813 | 0 | it = move(m_userReaderList.begin(), m_userReaderList.end(), it); |
1814 | | |
1815 | | // check we have copied all elements |
1816 | 0 | assert(tmp.end() == it); |
1817 | | |
1818 | | // now update the all collections by removing the user elements |
1819 | 0 | sort(m_userWriterList.begin(), m_userWriterList.end()); |
1820 | 0 | sort(m_userReaderList.begin(), m_userReaderList.end()); |
1821 | 0 | sort(m_allWriterList.begin(), m_allWriterList.end()); |
1822 | 0 | sort(m_allReaderList.begin(), m_allReaderList.end()); |
1823 | |
|
1824 | 0 | vector<RTPSWriter*> writers; |
1825 | 0 | set_difference(m_allWriterList.begin(), m_allWriterList.end(), |
1826 | 0 | m_userWriterList.begin(), m_userWriterList.end(), |
1827 | 0 | back_inserter(writers)); |
1828 | 0 | swap(writers, m_allWriterList); |
1829 | |
|
1830 | 0 | vector<RTPSReader*> readers; |
1831 | 0 | set_difference(m_allReaderList.begin(), m_allReaderList.end(), |
1832 | 0 | m_userReaderList.begin(), m_userReaderList.end(), |
1833 | 0 | back_inserter(readers)); |
1834 | 0 | swap(readers, m_allReaderList); |
1835 | | |
1836 | | // remove dangling references |
1837 | 0 | m_userWriterList.clear(); |
1838 | 0 | m_userReaderList.clear(); |
1839 | 0 | } |
1840 | | |
1841 | | // unlink the transport receiver blocks from the endpoints |
1842 | 0 | for ( auto endpoint : tmp) |
1843 | 0 | { |
1844 | 0 | std::lock_guard<std::mutex> _(m_receiverResourcelistMutex); |
1845 | |
|
1846 | 0 | for (auto& rb : m_receiverResourcelist) |
1847 | 0 | { |
1848 | 0 | auto receiver = rb.mp_receiver; |
1849 | 0 | if (receiver) |
1850 | 0 | { |
1851 | 0 | receiver->removeEndpoint(endpoint); |
1852 | 0 | } |
1853 | 0 | } |
1854 | 0 | } |
1855 | | |
1856 | | // Remove from builtin protocols |
1857 | 0 | auto removeEndpoint = [this](EndpointKind_t kind, Endpoint* p) |
1858 | 0 | { |
1859 | 0 | return kind == WRITER |
1860 | 0 | ? mp_builtinProtocols->removeLocalWriter((RTPSWriter*)p) |
1861 | 0 | : mp_builtinProtocols->removeLocalReader((RTPSReader*)p); |
1862 | 0 | }; |
1863 | |
|
1864 | | #if HAVE_SECURITY |
1865 | | bool (eprosima::fastrtps::rtps::security::SecurityManager::* unregister_endpoint[2])( |
1866 | | const GUID_t& writer_guid); |
1867 | | unregister_endpoint[WRITER] = &security::SecurityManager::unregister_local_writer; |
1868 | | unregister_endpoint[READER] = &security::SecurityManager::unregister_local_reader; |
1869 | | #endif // if HAVE_SECURITY |
1870 | |
|
1871 | 0 | for ( auto endpoint : tmp) |
1872 | 0 | { |
1873 | 0 | auto kind = endpoint->getAttributes().endpointKind; |
1874 | 0 | removeEndpoint(kind, endpoint); |
1875 | |
|
1876 | | #if HAVE_SECURITY |
1877 | | if (endpoint->getAttributes().security_attributes().is_submessage_protected || |
1878 | | endpoint->getAttributes().security_attributes().is_payload_protected) |
1879 | | { |
1880 | | (m_security_manager.*unregister_endpoint[kind])(endpoint->getGuid()); |
1881 | | } |
1882 | | #endif // if HAVE_SECURITY |
1883 | | |
1884 | | // remove the endpoints |
1885 | 0 | delete(endpoint); |
1886 | 0 | } |
1887 | 0 | } |
1888 | | |
1889 | | void RTPSParticipantImpl::normalize_endpoint_locators( |
1890 | | EndpointAttributes& endpoint_att) |
1891 | 0 | { |
1892 | | // Locators with port 0, calculate port. |
1893 | 0 | for (Locator_t& loc : endpoint_att.unicastLocatorList) |
1894 | 0 | { |
1895 | 0 | m_network_Factory.fill_default_locator_port(domain_id_, loc, m_att, false); |
1896 | 0 | } |
1897 | 0 | for (Locator_t& loc : endpoint_att.multicastLocatorList) |
1898 | 0 | { |
1899 | 0 | m_network_Factory.fill_default_locator_port(domain_id_, loc, m_att, true); |
1900 | 0 | } |
1901 | | |
1902 | | // Normalize unicast locators |
1903 | 0 | if (!endpoint_att.unicastLocatorList.empty()) |
1904 | 0 | { |
1905 | 0 | m_network_Factory.NormalizeLocators(endpoint_att.unicastLocatorList); |
1906 | 0 | } |
1907 | 0 | } |
1908 | | |
1909 | | std::vector<std::string> RTPSParticipantImpl::getParticipantNames() const |
1910 | 0 | { |
1911 | 0 | std::vector<std::string> participant_names; |
1912 | 0 | auto pdp = mp_builtinProtocols->mp_PDP; |
1913 | 0 | for (auto it = pdp->ParticipantProxiesBegin(); it != pdp->ParticipantProxiesEnd(); ++it) |
1914 | 0 | { |
1915 | 0 | participant_names.emplace_back((*it)->m_participantName.to_string()); |
1916 | 0 | } |
1917 | 0 | return participant_names; |
1918 | 0 | } |
1919 | | |
1920 | | void RTPSParticipantImpl::setGuid( |
1921 | | GUID_t& guid) |
1922 | 0 | { |
1923 | 0 | m_guid = guid; |
1924 | 0 | } |
1925 | | |
1926 | | void RTPSParticipantImpl::announceRTPSParticipantState() |
1927 | 0 | { |
1928 | 0 | return mp_builtinProtocols->announceRTPSParticipantState(); |
1929 | 0 | } |
1930 | | |
1931 | | void RTPSParticipantImpl::stopRTPSParticipantAnnouncement() |
1932 | 0 | { |
1933 | 0 | return mp_builtinProtocols->stopRTPSParticipantAnnouncement(); |
1934 | 0 | } |
1935 | | |
1936 | | void RTPSParticipantImpl::resetRTPSParticipantAnnouncement() |
1937 | 0 | { |
1938 | 0 | return mp_builtinProtocols->resetRTPSParticipantAnnouncement(); |
1939 | 0 | } |
1940 | | |
1941 | | void RTPSParticipantImpl::loose_next_change() |
1942 | 0 | { |
1943 | | //NOTE: This is replaced by the test transport |
1944 | | //this->mp_send_thr->loose_next_change(); |
1945 | 0 | } |
1946 | | |
1947 | | bool RTPSParticipantImpl::newRemoteEndpointDiscovered( |
1948 | | const GUID_t& pguid, |
1949 | | int16_t userDefinedId, |
1950 | | EndpointKind_t kind) |
1951 | 0 | { |
1952 | 0 | if (m_att.builtin.discovery_config.discoveryProtocol != DiscoveryProtocol::SIMPLE || |
1953 | 0 | m_att.builtin.discovery_config.use_STATIC_EndpointDiscoveryProtocol == false) |
1954 | 0 | { |
1955 | 0 | logWarning(RTPS_PARTICIPANT, |
1956 | 0 | "Remote Endpoints can only be activated with static discovery protocol over PDP simple protocol"); |
1957 | 0 | return false; |
1958 | 0 | } |
1959 | | |
1960 | 0 | if (PDPSimple* pS = dynamic_cast<PDPSimple*>(mp_builtinProtocols->mp_PDP)) |
1961 | 0 | { |
1962 | 0 | return pS->newRemoteEndpointStaticallyDiscovered(pguid, userDefinedId, kind); |
1963 | 0 | } |
1964 | | |
1965 | 0 | return false; |
1966 | 0 | } |
1967 | | |
1968 | | void RTPSParticipantImpl::ResourceSemaphorePost() |
1969 | 0 | { |
1970 | 0 | if (mp_ResourceSemaphore != nullptr) |
1971 | 0 | { |
1972 | 0 | mp_ResourceSemaphore->post(); |
1973 | 0 | } |
1974 | 0 | } |
1975 | | |
1976 | | void RTPSParticipantImpl::ResourceSemaphoreWait() |
1977 | 0 | { |
1978 | 0 | if (mp_ResourceSemaphore != nullptr) |
1979 | 0 | { |
1980 | 0 | mp_ResourceSemaphore->wait(); |
1981 | 0 | } |
1982 | 0 | } |
1983 | | |
1984 | | void RTPSParticipantImpl::assert_remote_participant_liveliness( |
1985 | | const GuidPrefix_t& remote_guid) |
1986 | 0 | { |
1987 | 0 | if (mp_builtinProtocols && mp_builtinProtocols->mp_PDP) |
1988 | 0 | { |
1989 | 0 | mp_builtinProtocols->mp_PDP->assert_remote_participant_liveliness(remote_guid); |
1990 | 0 | } |
1991 | 0 | } |
1992 | | |
1993 | | /** |
1994 | | * Get the list of locators from which this publisher may send data. |
1995 | | * |
1996 | | * @param [out] locators LocatorList_t where the list of locators will be stored. |
1997 | | */ |
1998 | | void RTPSParticipantImpl::get_sending_locators( |
1999 | | rtps::LocatorList_t& locators) const |
2000 | 0 | { |
2001 | 0 | locators.clear(); |
2002 | | |
2003 | | // Traverse the sender list and query |
2004 | 0 | for (const auto& send_resource : send_resource_list_) |
2005 | 0 | { |
2006 | 0 | send_resource->add_locators_to_list(locators); |
2007 | 0 | } |
2008 | 0 | } |
2009 | | |
2010 | | uint32_t RTPSParticipantImpl::getMaxMessageSize() const |
2011 | 0 | { |
2012 | | #if HAVE_SECURITY |
2013 | | // An auxilary buffer is needed in the ReceiverResource to to decrypt the message, |
2014 | | // that imposes a limit in the received messages size even if the transport allows (uint32_t) messages size. |
2015 | | // So the sender limits also its size. |
2016 | | uint32_t max_receiver_buffer_size = |
2017 | | is_secure() ? std::numeric_limits<uint16_t>::max() : std::numeric_limits<uint32_t>::max(); |
2018 | | #else |
2019 | 0 | uint32_t max_receiver_buffer_size = std::numeric_limits<uint32_t>::max(); |
2020 | 0 | #endif // if HAVE_SECURITY |
2021 | |
|
2022 | 0 | return (std::min)( |
2023 | 0 | m_network_Factory.get_max_message_size_between_transports(), |
2024 | 0 | max_receiver_buffer_size); |
2025 | 0 | } |
2026 | | |
2027 | | uint32_t RTPSParticipantImpl::getMaxDataSize() |
2028 | 0 | { |
2029 | 0 | return calculateMaxDataSize(getMaxMessageSize()); |
2030 | 0 | } |
2031 | | |
2032 | | uint32_t RTPSParticipantImpl::calculateMaxDataSize( |
2033 | | uint32_t length) |
2034 | 0 | { |
2035 | 0 | uint32_t maxDataSize = length; |
2036 | |
|
2037 | | #if HAVE_SECURITY |
2038 | | // If there is rtps messsage protection, reduce max size for messages, |
2039 | | // because extra data is added on encryption. |
2040 | | if (security_attributes_.is_rtps_protected) |
2041 | | { |
2042 | | maxDataSize -= m_security_manager.calculate_extra_size_for_rtps_message(); |
2043 | | } |
2044 | | #endif // if HAVE_SECURITY |
2045 | | |
2046 | | // RTPS header |
2047 | 0 | maxDataSize -= RTPSMESSAGE_HEADER_SIZE; |
2048 | 0 | return maxDataSize; |
2049 | 0 | } |
2050 | | |
2051 | | bool RTPSParticipantImpl::networkFactoryHasRegisteredTransports() const |
2052 | 0 | { |
2053 | 0 | return m_network_Factory.numberOfRegisteredTransports() > 0; |
2054 | 0 | } |
2055 | | |
2056 | | #if HAVE_SECURITY |
2057 | | bool RTPSParticipantImpl::pairing_remote_reader_with_local_writer_after_security( |
2058 | | const GUID_t& local_writer, |
2059 | | const ReaderProxyData& remote_reader_data) |
2060 | | { |
2061 | | bool return_value; |
2062 | | |
2063 | | return_value = mp_builtinProtocols->mp_PDP->getEDP()->pairing_remote_reader_with_local_writer_after_security( |
2064 | | local_writer, remote_reader_data); |
2065 | | if (!return_value && mp_builtinProtocols->mp_WLP != nullptr) |
2066 | | { |
2067 | | return_value = mp_builtinProtocols->mp_WLP->pairing_remote_reader_with_local_writer_after_security( |
2068 | | local_writer, remote_reader_data); |
2069 | | } |
2070 | | |
2071 | | return return_value; |
2072 | | } |
2073 | | |
2074 | | bool RTPSParticipantImpl::pairing_remote_writer_with_local_reader_after_security( |
2075 | | const GUID_t& local_reader, |
2076 | | const WriterProxyData& remote_writer_data) |
2077 | | { |
2078 | | bool return_value; |
2079 | | |
2080 | | return_value = mp_builtinProtocols->mp_PDP->getEDP()->pairing_remote_writer_with_local_reader_after_security( |
2081 | | local_reader, remote_writer_data); |
2082 | | if (!return_value && mp_builtinProtocols->mp_WLP != nullptr) |
2083 | | { |
2084 | | return_value = mp_builtinProtocols->mp_WLP->pairing_remote_writer_with_local_reader_after_security( |
2085 | | local_reader, remote_writer_data); |
2086 | | } |
2087 | | |
2088 | | return return_value; |
2089 | | } |
2090 | | |
2091 | | bool RTPSParticipantImpl::is_security_enabled_for_writer( |
2092 | | const WriterAttributes& writer_attributes) |
2093 | | { |
2094 | | if (!is_initialized() || !is_secure()) |
2095 | | { |
2096 | | return false; |
2097 | | } |
2098 | | |
2099 | | if (security_attributes().is_rtps_protected) |
2100 | | { |
2101 | | return true; |
2102 | | } |
2103 | | |
2104 | | security::EndpointSecurityAttributes security_attributes; |
2105 | | if (security_manager().get_datawriter_sec_attributes(writer_attributes.endpoint.properties, security_attributes)) |
2106 | | { |
2107 | | return (security_attributes.is_payload_protected == true || |
2108 | | security_attributes.is_submessage_protected == true); |
2109 | | } |
2110 | | |
2111 | | return false; |
2112 | | } |
2113 | | |
2114 | | bool RTPSParticipantImpl::is_security_enabled_for_reader( |
2115 | | const ReaderAttributes& reader_attributes) |
2116 | | { |
2117 | | if (!is_initialized() || !is_secure()) |
2118 | | { |
2119 | | return false; |
2120 | | } |
2121 | | |
2122 | | if (security_attributes().is_rtps_protected) |
2123 | | { |
2124 | | return true; |
2125 | | } |
2126 | | |
2127 | | security::EndpointSecurityAttributes security_attributes; |
2128 | | if (security_manager().get_datareader_sec_attributes(reader_attributes.endpoint.properties, security_attributes)) |
2129 | | { |
2130 | | return (security_attributes.is_payload_protected == true || |
2131 | | security_attributes.is_submessage_protected == true); |
2132 | | } |
2133 | | |
2134 | | return false; |
2135 | | } |
2136 | | |
2137 | | #endif // if HAVE_SECURITY |
2138 | | |
2139 | | PDPSimple* RTPSParticipantImpl::pdpsimple() |
2140 | 0 | { |
2141 | 0 | return dynamic_cast<PDPSimple*>(mp_builtinProtocols->mp_PDP); |
2142 | 0 | } |
2143 | | |
2144 | | WLP* RTPSParticipantImpl::wlp() |
2145 | 0 | { |
2146 | 0 | return mp_builtinProtocols->mp_WLP; |
2147 | 0 | } |
2148 | | |
2149 | | fastdds::dds::builtin::TypeLookupManager* RTPSParticipantImpl::typelookup_manager() const |
2150 | 0 | { |
2151 | 0 | return mp_builtinProtocols->tlm_; |
2152 | 0 | } |
2153 | | |
2154 | | IPersistenceService* RTPSParticipantImpl::get_persistence_service( |
2155 | | const EndpointAttributes& param) |
2156 | 0 | { |
2157 | 0 | IPersistenceService* ret_val; |
2158 | |
|
2159 | 0 | ret_val = PersistenceFactory::create_persistence_service(param.properties); |
2160 | 0 | return ret_val != nullptr ? |
2161 | 0 | ret_val : |
2162 | 0 | PersistenceFactory::create_persistence_service(m_att.properties); |
2163 | 0 | } |
2164 | | |
2165 | | bool RTPSParticipantImpl::get_persistence_service( |
2166 | | bool is_builtin, |
2167 | | const EndpointAttributes& param, |
2168 | | IPersistenceService*& service) |
2169 | 0 | { |
2170 | 0 | service = nullptr; |
2171 | |
|
2172 | 0 | const char* debug_label = (param.endpointKind == WRITER ? "writer" : "reader"); |
2173 | | |
2174 | | // Check if also support persistence with TRANSIENT_LOCAL. |
2175 | 0 | DurabilityKind_t durability_red_line = get_persistence_durability_red_line(is_builtin); |
2176 | 0 | if (param.durabilityKind >= durability_red_line) |
2177 | 0 | { |
2178 | 0 | if (param.persistence_guid == c_Guid_Unknown) |
2179 | 0 | { |
2180 | 0 | logError(RTPS_PARTICIPANT, "Cannot create persistence service. Persistence GUID not specified"); |
2181 | 0 | return false; |
2182 | 0 | } |
2183 | 0 | service = get_persistence_service(param); |
2184 | 0 | if (service == nullptr) |
2185 | 0 | { |
2186 | 0 | logError(RTPS_PARTICIPANT, |
2187 | 0 | "Couldn't create writer persistence service for transient/persistent " << debug_label); |
2188 | 0 | return false; |
2189 | 0 | } |
2190 | 0 | } |
2191 | | |
2192 | | // Error log level can be disable. Avoid unused warning |
2193 | 0 | static_cast<void>(debug_label); |
2194 | |
|
2195 | 0 | return true; |
2196 | 0 | } |
2197 | | |
2198 | | bool RTPSParticipantImpl::get_new_entity_id( |
2199 | | EntityId_t& entityId) |
2200 | 0 | { |
2201 | 0 | if (entityId == c_EntityId_Unknown) |
2202 | 0 | { |
2203 | 0 | uint32_t idnum = ++IdCounter; |
2204 | 0 | octet* c = reinterpret_cast<octet*>(&idnum); |
2205 | 0 | entityId.value[2] = c[0]; |
2206 | 0 | entityId.value[1] = c[1]; |
2207 | 0 | entityId.value[0] = c[2]; |
2208 | 0 | entityId.value[3] = 0x01; // Vendor specific |
2209 | 0 | } |
2210 | 0 | else |
2211 | 0 | { |
2212 | 0 | return !existsEntityId(entityId, READER) && !existsEntityId(entityId, WRITER); |
2213 | 0 | } |
2214 | | |
2215 | 0 | return true; |
2216 | 0 | } |
2217 | | |
2218 | | void RTPSParticipantImpl::set_check_type_function( |
2219 | | std::function<bool(const std::string&)>&& check_type) |
2220 | 0 | { |
2221 | 0 | type_check_fn_ = std::move(check_type); |
2222 | 0 | } |
2223 | | |
2224 | | std::unique_ptr<RTPSMessageGroup_t> RTPSParticipantImpl::get_send_buffer() |
2225 | 0 | { |
2226 | 0 | return send_buffers_->get_buffer(this); |
2227 | 0 | } |
2228 | | |
2229 | | void RTPSParticipantImpl::return_send_buffer( |
2230 | | std::unique_ptr <RTPSMessageGroup_t>&& buffer) |
2231 | 0 | { |
2232 | 0 | send_buffers_->return_buffer(std::move(buffer)); |
2233 | 0 | } |
2234 | | |
2235 | | uint32_t RTPSParticipantImpl::get_domain_id() const |
2236 | 0 | { |
2237 | 0 | return domain_id_; |
2238 | 0 | } |
2239 | | |
2240 | | //!Compare metatraffic locators list searching for mutations |
2241 | | bool RTPSParticipantImpl::did_mutation_took_place_on_meta( |
2242 | | const LocatorList_t& MulticastLocatorList, |
2243 | | const LocatorList_t& UnicastLocatorList) const |
2244 | 0 | { |
2245 | 0 | using namespace std; |
2246 | 0 | using namespace eprosima::fastdds::rtps; |
2247 | |
|
2248 | 0 | if (m_att.builtin.metatrafficMulticastLocatorList == MulticastLocatorList |
2249 | 0 | && m_att.builtin.metatrafficUnicastLocatorList == UnicastLocatorList) |
2250 | 0 | { |
2251 | | // no mutation |
2252 | 0 | return false; |
2253 | 0 | } |
2254 | | |
2255 | | // If one of the locators is 0.0.0.0 we must replace it by all local interfaces like the framework does |
2256 | 0 | list<Locator_t> unicast_real_locators; |
2257 | 0 | LocatorListConstIterator it = UnicastLocatorList.begin(), old_it; |
2258 | 0 | LocatorList_t locals; |
2259 | |
|
2260 | 0 | do |
2261 | 0 | { |
2262 | | // copy ordinary locators till the first ANY |
2263 | 0 | old_it = it; |
2264 | 0 | it = find_if(it, UnicastLocatorList.end(), IPLocator::isAny); |
2265 | | |
2266 | | // copy ordinary locators |
2267 | 0 | copy(old_it, it, back_inserter(unicast_real_locators)); |
2268 | | |
2269 | | // transform new ones if needed |
2270 | 0 | if (it != UnicastLocatorList.end()) |
2271 | 0 | { |
2272 | 0 | const Locator_t& an_any = *it; |
2273 | | |
2274 | | // load interfaces if needed |
2275 | 0 | if (locals.empty()) |
2276 | 0 | { |
2277 | 0 | IPFinder::getIP4Address(&locals); |
2278 | 0 | } |
2279 | | |
2280 | | // add a locator for each local |
2281 | 0 | transform(locals.begin(), |
2282 | 0 | locals.end(), |
2283 | 0 | back_inserter(unicast_real_locators), |
2284 | 0 | [&an_any](const Locator_t& loc) -> Locator_t |
2285 | 0 | { |
2286 | 0 | Locator_t specific(loc); |
2287 | 0 | specific.port = an_any.port; |
2288 | 0 | specific.kind = an_any.kind; |
2289 | 0 | return specific; |
2290 | 0 | }); |
2291 | | |
2292 | | // search for the next if any |
2293 | 0 | ++it; |
2294 | 0 | } |
2295 | 0 | } while (it != UnicastLocatorList.end()); |
2296 | | |
2297 | | // TCP is a special case because physical ports are taken from the TransportDescriptors |
2298 | | // besides WAN address may be added by the transport |
2299 | 0 | struct ResetLogical |
2300 | 0 | { |
2301 | | // use of unary_function to introduce the following aliases is deprecated |
2302 | | // using argument_type = Locator_t; |
2303 | | // using result_type = Locator_t&; |
2304 | |
|
2305 | 0 | using Transports = vector<shared_ptr<TransportDescriptorInterface>>; |
2306 | |
|
2307 | 0 | ResetLogical( |
2308 | 0 | const Transports& tp) |
2309 | 0 | : Transports_(tp) |
2310 | 0 | { |
2311 | 0 | for (auto desc : Transports_) |
2312 | 0 | { |
2313 | 0 | if (nullptr == tcp4) |
2314 | 0 | { |
2315 | 0 | tcp4 = dynamic_pointer_cast<TCPv4TransportDescriptor>(desc); |
2316 | 0 | } |
2317 | |
|
2318 | 0 | if (nullptr == tcp6) |
2319 | 0 | { |
2320 | 0 | tcp6 = dynamic_pointer_cast<TCPv6TransportDescriptor>(desc); |
2321 | 0 | } |
2322 | 0 | } |
2323 | 0 | } |
2324 | |
|
2325 | 0 | uint16_t Tcp4ListeningPort() const |
2326 | 0 | { |
2327 | 0 | return tcp4 ? ( tcp4->listening_ports.empty() ? 0 : tcp4->listening_ports[0]) : 0; |
2328 | 0 | } |
2329 | |
|
2330 | 0 | uint16_t Tcp6ListeningPort() const |
2331 | 0 | { |
2332 | 0 | return tcp6 ? ( tcp6->listening_ports.empty() ? 0 : tcp6->listening_ports[0]) : 0; |
2333 | 0 | } |
2334 | |
|
2335 | 0 | void set_wan_address( |
2336 | 0 | Locator_t& loc) const |
2337 | 0 | { |
2338 | 0 | if (tcp4) |
2339 | 0 | { |
2340 | 0 | assert(LOCATOR_KIND_TCPv4 == loc.kind); |
2341 | 0 | auto& ip = tcp4->wan_addr; |
2342 | 0 | IPLocator::setWan(loc, ip[0], ip[1], ip[2], ip[3]); |
2343 | 0 | } |
2344 | 0 | } |
2345 | |
|
2346 | 0 | Locator_t operator ()( |
2347 | 0 | const Locator_t& loc) const |
2348 | 0 | { |
2349 | 0 | Locator_t ret(loc); |
2350 | 0 | switch (loc.kind) |
2351 | 0 | { |
2352 | 0 | case LOCATOR_KIND_TCPv4: |
2353 | 0 | set_wan_address(ret); |
2354 | 0 | IPLocator::setPhysicalPort(ret, Tcp4ListeningPort()); |
2355 | 0 | break; |
2356 | 0 | case LOCATOR_KIND_TCPv6: |
2357 | 0 | IPLocator::setPhysicalPort(ret, Tcp6ListeningPort()); |
2358 | 0 | break; |
2359 | 0 | } |
2360 | 0 | return ret; |
2361 | 0 | } |
2362 | | |
2363 | | // reference to the transports |
2364 | 0 | const Transports& Transports_; |
2365 | 0 | shared_ptr<TCPv4TransportDescriptor> tcp4; |
2366 | 0 | shared_ptr<TCPv6TransportDescriptor> tcp6; |
2367 | |
|
2368 | 0 | } |
2369 | 0 | transform_functor(m_att.userTransports); |
2370 | | |
2371 | | // transform-copy |
2372 | 0 | set<Locator_t> update_attributes; |
2373 | |
|
2374 | 0 | transform(m_att.builtin.metatrafficMulticastLocatorList.begin(), |
2375 | 0 | m_att.builtin.metatrafficMulticastLocatorList.end(), |
2376 | 0 | inserter(update_attributes, update_attributes.begin()), |
2377 | 0 | transform_functor); |
2378 | |
|
2379 | 0 | transform(m_att.builtin.metatrafficUnicastLocatorList.begin(), |
2380 | 0 | m_att.builtin.metatrafficUnicastLocatorList.end(), |
2381 | 0 | inserter(update_attributes, update_attributes.begin()), |
2382 | 0 | transform_functor); |
2383 | |
|
2384 | 0 | set<Locator_t> original_ones; |
2385 | |
|
2386 | 0 | transform(MulticastLocatorList.begin(), |
2387 | 0 | MulticastLocatorList.end(), |
2388 | 0 | inserter(original_ones, original_ones.begin()), |
2389 | 0 | transform_functor); |
2390 | |
|
2391 | 0 | transform(unicast_real_locators.begin(), |
2392 | 0 | unicast_real_locators.end(), |
2393 | 0 | inserter(original_ones, original_ones.begin()), |
2394 | 0 | transform_functor); |
2395 | | |
2396 | | // if equal then no mutation took place on physical ports |
2397 | 0 | return !(update_attributes == original_ones); |
2398 | 0 | } |
2399 | | |
2400 | | DurabilityKind_t RTPSParticipantImpl::get_persistence_durability_red_line( |
2401 | | bool is_builtin_endpoint) |
2402 | 0 | { |
2403 | 0 | DurabilityKind_t durability_red_line = TRANSIENT; |
2404 | 0 | if (!is_builtin_endpoint) |
2405 | 0 | { |
2406 | 0 | std::string* persistence_support_transient_local_property = PropertyPolicyHelper::find_property( |
2407 | 0 | m_att.properties, "dds.persistence.also-support-transient-local"); |
2408 | 0 | if (nullptr != persistence_support_transient_local_property && |
2409 | 0 | 0 == persistence_support_transient_local_property->compare("true")) |
2410 | 0 | { |
2411 | 0 | durability_red_line = TRANSIENT_LOCAL; |
2412 | 0 | } |
2413 | 0 | } |
2414 | |
|
2415 | 0 | return durability_red_line; |
2416 | 0 | } |
2417 | | |
2418 | | void RTPSParticipantImpl::environment_file_has_changed() |
2419 | 0 | { |
2420 | 0 | RTPSParticipantAttributes patt = m_att; |
2421 | | // Only if it is a server/backup or a client override |
2422 | 0 | if (DiscoveryProtocol_t::SERVER == m_att.builtin.discovery_config.discoveryProtocol || |
2423 | 0 | DiscoveryProtocol_t::BACKUP == m_att.builtin.discovery_config.discoveryProtocol || |
2424 | 0 | client_override_) |
2425 | 0 | { |
2426 | 0 | if (load_environment_server_info(patt.builtin.discovery_config.m_DiscoveryServers)) |
2427 | 0 | { |
2428 | 0 | update_attributes(patt); |
2429 | 0 | } |
2430 | 0 | } |
2431 | 0 | else |
2432 | 0 | { |
2433 | 0 | logWarning(RTPS_QOS_CHECK, "Trying to add Discovery Servers to a participant which is not a SERVER, BACKUP " << |
2434 | 0 | "or an overriden CLIENT (SIMPLE participant transformed into CLIENT with the environment variable)"); |
2435 | 0 | } |
2436 | 0 | } |
2437 | | |
2438 | | void RTPSParticipantImpl::get_default_metatraffic_locators() |
2439 | 0 | { |
2440 | 0 | uint32_t metatraffic_multicast_port = m_att.port.getMulticastPort(domain_id_); |
2441 | 0 | uint32_t metatraffic_unicast_port = m_att.port.getUnicastPort(domain_id_, |
2442 | 0 | static_cast<uint32_t>(m_att.participantID)); |
2443 | |
|
2444 | 0 | m_network_Factory.getDefaultMetatrafficMulticastLocators(m_att.builtin.metatrafficMulticastLocatorList, |
2445 | 0 | metatraffic_multicast_port); |
2446 | 0 | m_network_Factory.NormalizeLocators(m_att.builtin.metatrafficMulticastLocatorList); |
2447 | |
|
2448 | 0 | m_network_Factory.getDefaultMetatrafficUnicastLocators(m_att.builtin.metatrafficUnicastLocatorList, |
2449 | 0 | metatraffic_unicast_port); |
2450 | 0 | m_network_Factory.NormalizeLocators(m_att.builtin.metatrafficUnicastLocatorList); |
2451 | 0 | } |
2452 | | |
2453 | | void RTPSParticipantImpl::get_default_unicast_locators() |
2454 | 0 | { |
2455 | 0 | m_network_Factory.getDefaultUnicastLocators(domain_id_, m_att.defaultUnicastLocatorList, m_att); |
2456 | 0 | m_network_Factory.NormalizeLocators(m_att.defaultUnicastLocatorList); |
2457 | 0 | } |
2458 | | |
2459 | | #ifdef FASTDDS_STATISTICS |
2460 | | |
2461 | | bool RTPSParticipantImpl::register_in_writer( |
2462 | | std::shared_ptr<fastdds::statistics::IListener> listener, |
2463 | | GUID_t writer_guid) |
2464 | | { |
2465 | | bool res = false; |
2466 | | |
2467 | | if ( GUID_t::unknown() == writer_guid ) |
2468 | | { |
2469 | | shared_lock<shared_mutex> _(endpoints_list_mutex); |
2470 | | res = true; |
2471 | | for ( auto writer : m_userWriterList) |
2472 | | { |
2473 | | if (!fastdds::statistics::is_statistics_builtin(writer->getGuid().entityId)) |
2474 | | { |
2475 | | res &= writer->add_statistics_listener(listener); |
2476 | | } |
2477 | | } |
2478 | | } |
2479 | | else if (!fastdds::statistics::is_statistics_builtin(writer_guid.entityId)) |
2480 | | { |
2481 | | RTPSWriter* writer = find_local_writer(writer_guid); |
2482 | | res = writer->add_statistics_listener(listener); |
2483 | | } |
2484 | | |
2485 | | return res; |
2486 | | } |
2487 | | |
2488 | | bool RTPSParticipantImpl::register_in_reader( |
2489 | | std::shared_ptr<fastdds::statistics::IListener> listener, |
2490 | | GUID_t reader_guid) |
2491 | | { |
2492 | | bool res = false; |
2493 | | |
2494 | | if ( GUID_t::unknown() == reader_guid ) |
2495 | | { |
2496 | | shared_lock<shared_mutex> _(endpoints_list_mutex); |
2497 | | res = true; |
2498 | | for ( auto reader : m_userReaderList) |
2499 | | { |
2500 | | if (!fastdds::statistics::is_statistics_builtin(reader->getGuid().entityId)) |
2501 | | { |
2502 | | res &= reader->add_statistics_listener(listener); |
2503 | | } |
2504 | | } |
2505 | | } |
2506 | | else if (!fastdds::statistics::is_statistics_builtin(reader_guid.entityId)) |
2507 | | { |
2508 | | RTPSReader* reader = find_local_reader(reader_guid); |
2509 | | res = reader->add_statistics_listener(listener); |
2510 | | } |
2511 | | |
2512 | | return res; |
2513 | | } |
2514 | | |
2515 | | bool RTPSParticipantImpl::unregister_in_writer( |
2516 | | std::shared_ptr<fastdds::statistics::IListener> listener) |
2517 | | { |
2518 | | shared_lock<shared_mutex> _(endpoints_list_mutex); |
2519 | | bool res = true; |
2520 | | |
2521 | | for ( auto writer : m_userWriterList) |
2522 | | { |
2523 | | if (!fastdds::statistics::is_statistics_builtin(writer->getGuid().entityId)) |
2524 | | { |
2525 | | res &= writer->remove_statistics_listener(listener); |
2526 | | } |
2527 | | } |
2528 | | |
2529 | | return res; |
2530 | | } |
2531 | | |
2532 | | bool RTPSParticipantImpl::unregister_in_reader( |
2533 | | std::shared_ptr<fastdds::statistics::IListener> listener) |
2534 | | { |
2535 | | shared_lock<shared_mutex> _(endpoints_list_mutex); |
2536 | | bool res = true; |
2537 | | |
2538 | | for ( auto reader : m_userReaderList) |
2539 | | { |
2540 | | if (!fastdds::statistics::is_statistics_builtin(reader->getGuid().entityId)) |
2541 | | { |
2542 | | res &= reader->remove_statistics_listener(listener); |
2543 | | } |
2544 | | } |
2545 | | |
2546 | | return res; |
2547 | | } |
2548 | | |
2549 | | #endif // FASTDDS_STATISTICS |
2550 | | |
2551 | | } /* namespace rtps */ |
2552 | | } /* namespace fastrtps */ |
2553 | | } /* namespace eprosima */ |