/src/Fast-DDS/src/cpp/rtps/transport/TCPChannelResource.cpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2018 Proyectos y Sistemas de Mantenimiento SL (eProsima). |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | #include <rtps/transport/TCPChannelResource.h> |
16 | | |
17 | | #include <chrono> |
18 | | #include <thread> |
19 | | |
20 | | #include <fastdds/utils/IPLocator.hpp> |
21 | | |
22 | | #include <rtps/transport/asio_helpers.hpp> |
23 | | #include <rtps/transport/TCPTransportInterface.h> |
24 | | |
25 | | namespace eprosima { |
26 | | namespace fastdds { |
27 | | namespace rtps { |
28 | | |
29 | | using Log = fastdds::dds::Log; |
30 | | |
/**
 * Search for the base port in the current domain without taking into account the participant.
 *
 * Ports below 7411 are returned untouched. Any other port is rounded down to the first port of
 * the 250-port block it falls in, where blocks are counted starting at 7411.
 *
 * @param currentPort Port to normalize.
 * @return The first port of the block containing @c currentPort (participant 0), or
 *         @c currentPort itself when it is lower than 7411.
 */
static uint16_t GetBaseAutoPort(
        uint16_t currentPort)
{
    constexpr uint16_t first_port = 7411; // base + offset3
    constexpr uint16_t ports_per_domain = 250;

    if (currentPort < first_port)
    {
        return currentPort;
    }

    // Integer division discards the offset inside the block, which is all that is needed here.
    const uint16_t domain = static_cast<uint16_t>((currentPort - first_port) / ports_per_domain);

    return static_cast<uint16_t>(first_port + (domain * ports_per_domain)); // And participant 0
}
48 | | |
/**
 * Constructs a channel for a known remote locator.
 *
 * The channel starts in the eDisconnected state, is not waiting for a keep alive, and is tagged
 * as TCP_CONNECT_TYPE.
 *
 * @param parent Transport interface kept in parent_.
 * @param locator Locator copied into locator_.
 * @param maxMsgSize Value forwarded to the ChannelResource base constructor.
 */
TCPChannelResource::TCPChannelResource(
        TCPTransportInterface* parent,
        const Locator& locator,
        uint32_t maxMsgSize)
    : ChannelResource(maxMsgSize)
    , parent_ (parent)
    , locator_(locator)
    , waiting_for_keep_alive_(false)
    , connection_status_(eConnectionStatus::eDisconnected)
    , tcp_connection_type_(TCPConnectionType::TCP_CONNECT_TYPE)
{
}
61 | | |
/**
 * Constructs a channel without a remote locator.
 *
 * The locator is default-constructed, the channel starts in the eConnected state, is not waiting
 * for a keep alive, and is tagged as TCP_ACCEPT_TYPE.
 *
 * @param parent Transport interface kept in parent_.
 * @param maxMsgSize Value forwarded to the ChannelResource base constructor.
 */
TCPChannelResource::TCPChannelResource(
        TCPTransportInterface* parent,
        uint32_t maxMsgSize)
    : ChannelResource(maxMsgSize)
    , parent_(parent)
    , locator_()
    , waiting_for_keep_alive_(false)
    , connection_status_(eConnectionStatus::eConnected)
    , tcp_connection_type_(TCPConnectionType::TCP_ACCEPT_TYPE)
{
}
73 | | |
/**
 * Disables the channel and then disconnects it.
 *
 * The base class is disabled first so that the disconnection happens on an already disabled channel.
 */
void TCPChannelResource::disable()
{
    ChannelResource::disable(); // prevent asio callback workings on this channel.

    disconnect();
}
80 | | |
81 | | ResponseCode TCPChannelResource::process_bind_request( |
82 | | const Locator& locator) |
83 | 0 | { |
84 | 0 | eConnectionStatus expected = TCPChannelResource::eConnectionStatus::eWaitingForBind; |
85 | 0 | if (connection_status_.compare_exchange_strong(expected, eConnectionStatus::eEstablished)) |
86 | 0 | { |
87 | 0 | locator_ = IPLocator::toPhysicalLocator(locator); |
88 | 0 | EPROSIMA_LOG_INFO(RTCP_MSG, "Connection Established"); |
89 | 0 | return RETCODE_OK; |
90 | 0 | } |
91 | 0 | else if (expected == eConnectionStatus::eEstablished) |
92 | 0 | { |
93 | 0 | return RETCODE_EXISTING_CONNECTION; |
94 | 0 | } |
95 | | |
96 | 0 | return RETCODE_SERVER_ERROR; |
97 | 0 | } |
98 | | |
99 | | void TCPChannelResource::set_all_ports_pending() |
100 | 0 | { |
101 | 0 | std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_); |
102 | 0 | pending_logical_output_ports_.insert(pending_logical_output_ports_.end(), |
103 | 0 | logical_output_ports_.begin(), |
104 | 0 | logical_output_ports_.end()); |
105 | 0 | logical_output_ports_.clear(); |
106 | 0 | } |
107 | | |
108 | | bool TCPChannelResource::is_logical_port_opened( |
109 | | uint16_t port) |
110 | 0 | { |
111 | 0 | std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_); |
112 | 0 | return is_logical_port_opened_nts(port); |
113 | 0 | } |
114 | | |
115 | | bool TCPChannelResource::is_logical_port_opened_nts( |
116 | | uint16_t port) |
117 | 0 | { |
118 | 0 | return std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port) != logical_output_ports_.end(); |
119 | 0 | } |
120 | | |
121 | | bool TCPChannelResource::is_logical_port_added( |
122 | | uint16_t port) |
123 | 0 | { |
124 | 0 | std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_); |
125 | 0 | return std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port) != logical_output_ports_.end() |
126 | 0 | || std::find(pending_logical_output_ports_.begin(), pending_logical_output_ports_.end(), port) |
127 | 0 | != pending_logical_output_ports_.end(); |
128 | 0 | } |
129 | | |
130 | | bool TCPChannelResource::wait_logical_port_under_negotiation( |
131 | | uint16_t port, |
132 | | const std::chrono::milliseconds& timeout) |
133 | 0 | { |
134 | 0 | std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_); |
135 | | |
136 | | // Early return if the port is already opened. |
137 | 0 | if (is_logical_port_opened_nts(port)) |
138 | 0 | { |
139 | 0 | return true; |
140 | 0 | } |
141 | | |
142 | | // Early return if the timeout is 0. |
143 | 0 | if (timeout == std::chrono::milliseconds(0)) |
144 | 0 | { |
145 | 0 | return false; |
146 | 0 | } |
147 | | |
148 | | // The port is under negotiation if it's in the pending list and in the negotiation list. |
149 | 0 | bool found_in_negotiating_list = negotiating_logical_ports_.end() != std::find_if( |
150 | 0 | negotiating_logical_ports_.begin(), |
151 | 0 | negotiating_logical_ports_.end(), |
152 | 0 | [port](const decltype(negotiating_logical_ports_)::value_type& item) |
153 | 0 | { |
154 | 0 | return item.second == port; |
155 | 0 | }); |
156 | |
|
157 | 0 | if (found_in_negotiating_list && |
158 | 0 | pending_logical_output_ports_.end() != std::find( |
159 | 0 | pending_logical_output_ports_.begin(), |
160 | 0 | pending_logical_output_ports_.end(), |
161 | 0 | port)) |
162 | 0 | { |
163 | | // Wait for the negotiation to finish. The condition variable might get notified if other logical port is opened. In such case, |
164 | | // it should wait again with the respective remaining time. |
165 | 0 | auto wait_predicate = [this, port]() -> bool |
166 | 0 | { |
167 | 0 | return is_logical_port_opened_nts(port); |
168 | 0 | }; |
169 | 0 | logical_output_ports_updated_cv.wait_for(scopedLock, timeout, wait_predicate); |
170 | 0 | } |
171 | |
|
172 | 0 | return is_logical_port_opened_nts(port); |
173 | 0 | } |
174 | | |
175 | | void TCPChannelResource::add_logical_port( |
176 | | uint16_t port, |
177 | | RTCPMessageManager* rtcp_manager) |
178 | 0 | { |
179 | 0 | std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_); |
180 | | // Already opened? |
181 | 0 | if (std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port) == logical_output_ports_.end()) |
182 | 0 | { |
183 | 0 | if (port == 0) |
184 | 0 | { |
185 | 0 | EPROSIMA_LOG_ERROR(RTPS, "Trying to open logical port 0."); |
186 | 0 | } // But let's continue... |
187 | |
|
188 | 0 | if (std::find(pending_logical_output_ports_.begin(), pending_logical_output_ports_.end(), port) |
189 | 0 | == pending_logical_output_ports_.end()) // Check isn't enqueued already |
190 | 0 | { |
191 | 0 | pending_logical_output_ports_.emplace_back(port); |
192 | 0 | if (connection_established()) |
193 | 0 | { |
194 | 0 | TCPTransactionId id = rtcp_manager->sendOpenLogicalPortRequest(this, port); |
195 | 0 | negotiating_logical_ports_[id] = port; |
196 | 0 | } |
197 | 0 | } |
198 | 0 | } |
199 | |
|
200 | 0 | } |
201 | | |
202 | | void TCPChannelResource::send_pending_open_logical_ports( |
203 | | RTCPMessageManager* rtcp_manager) |
204 | 0 | { |
205 | 0 | std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_); |
206 | 0 | if (!pending_logical_output_ports_.empty()) |
207 | 0 | { |
208 | 0 | for (uint16_t port : pending_logical_output_ports_) |
209 | 0 | { |
210 | 0 | TCPTransactionId id = rtcp_manager->sendOpenLogicalPortRequest(this, port); |
211 | 0 | negotiating_logical_ports_[id] = port; |
212 | 0 | std::this_thread::sleep_for(std::chrono::milliseconds(100)); |
213 | 0 | } |
214 | 0 | } |
215 | 0 | } |
216 | | |
217 | | void TCPChannelResource::add_logical_port_response( |
218 | | const TCPTransactionId& id, |
219 | | bool success, |
220 | | RTCPMessageManager* rtcp_manager) |
221 | 0 | { |
222 | 0 | std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_); |
223 | 0 | auto it = negotiating_logical_ports_.find(id); |
224 | 0 | if (it != negotiating_logical_ports_.end()) |
225 | 0 | { |
226 | 0 | uint16_t port = it->second; |
227 | 0 | auto portIt = std::find(pending_logical_output_ports_.begin(), pending_logical_output_ports_.end(), port); |
228 | 0 | negotiating_logical_ports_.erase(it); |
229 | 0 | if (portIt != pending_logical_output_ports_.end()) |
230 | 0 | { |
231 | 0 | if (success) |
232 | 0 | { |
233 | 0 | pending_logical_output_ports_.erase(portIt); |
234 | 0 | logical_output_ports_.push_back(port); |
235 | 0 | logical_output_ports_updated_cv.notify_all(); |
236 | 0 | EPROSIMA_LOG_INFO(RTCP, "OpenedLogicalPort: " << port); |
237 | 0 | } |
238 | 0 | else |
239 | 0 | { |
240 | 0 | scopedLock.unlock(); |
241 | 0 | prepare_send_check_logical_ports_req(port, rtcp_manager); |
242 | 0 | } |
243 | 0 | } |
244 | 0 | else |
245 | 0 | { |
246 | 0 | EPROSIMA_LOG_WARNING(RTCP, "Received add_logical_port_response for port " |
247 | 0 | << port << ", but it wasn't found in pending list."); |
248 | 0 | } |
249 | 0 | } |
250 | 0 | else |
251 | 0 | { |
252 | 0 | EPROSIMA_LOG_WARNING(RTCP, "Received add_logical_port_response, but the transaction id wasn't registered " << |
253 | 0 | "(maybe removed" << " while negotiating?)."); |
254 | 0 | } |
255 | 0 | } |
256 | | |
257 | | void TCPChannelResource::prepare_send_check_logical_ports_req( |
258 | | uint16_t closedPort, |
259 | | RTCPMessageManager* rtcp_manager) |
260 | 0 | { |
261 | 0 | std::vector<uint16_t> candidatePorts; |
262 | 0 | uint16_t base_port = GetBaseAutoPort(closedPort); // The first failed port |
263 | 0 | uint16_t max_port = closedPort + parent_->GetMaxLogicalPort(); |
264 | |
|
265 | 0 | for (uint16_t p = base_port; |
266 | 0 | p <= closedPort + (parent_->GetLogicalPortRange() |
267 | 0 | * parent_->GetLogicalPortIncrement()); |
268 | 0 | p += parent_->GetLogicalPortIncrement()) |
269 | 0 | { |
270 | | // Don't add ports just tested and already pendings |
271 | 0 | if (p <= max_port && p != closedPort) |
272 | 0 | { |
273 | 0 | std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_); |
274 | 0 | auto pendingIt = std::find(pending_logical_output_ports_.begin(), pending_logical_output_ports_.end(), p); |
275 | 0 | if (pendingIt == pending_logical_output_ports_.end()) |
276 | 0 | { |
277 | 0 | candidatePorts.emplace_back(p); |
278 | 0 | } |
279 | 0 | } |
280 | 0 | } |
281 | |
|
282 | 0 | if (candidatePorts.empty()) // No more available ports! |
283 | 0 | { |
284 | 0 | EPROSIMA_LOG_ERROR(RTCP, "Cannot find an available logical port."); |
285 | 0 | } |
286 | 0 | else |
287 | 0 | { |
288 | 0 | TCPTransactionId id = rtcp_manager->sendCheckLogicalPortsRequest(this, candidatePorts); |
289 | 0 | std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_); |
290 | 0 | last_checked_logical_port_[id] = candidatePorts.back(); |
291 | 0 | } |
292 | 0 | } |
293 | | |
294 | | void TCPChannelResource::process_check_logical_ports_response( |
295 | | const TCPTransactionId& transactionId, |
296 | | const std::vector<uint16_t>& availablePorts, |
297 | | RTCPMessageManager* rtcp_manager) |
298 | 0 | { |
299 | 0 | std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_); |
300 | 0 | auto it = last_checked_logical_port_.find(transactionId); |
301 | 0 | if (it != last_checked_logical_port_.end()) |
302 | 0 | { |
303 | 0 | uint16_t lastPort = it->second; |
304 | 0 | last_checked_logical_port_.erase(it); |
305 | 0 | scopedLock.unlock(); |
306 | 0 | if (availablePorts.empty()) |
307 | 0 | { |
308 | 0 | prepare_send_check_logical_ports_req(lastPort, rtcp_manager); |
309 | 0 | } |
310 | 0 | else |
311 | 0 | { |
312 | 0 | add_logical_port(availablePorts.front(), rtcp_manager); |
313 | 0 | } |
314 | 0 | } |
315 | 0 | else |
316 | 0 | { |
317 | 0 | EPROSIMA_LOG_WARNING(RTCP, "Received process_check_logical_ports_response without sending a Request."); |
318 | 0 | } |
319 | 0 | } |
320 | | |
321 | | void TCPChannelResource::set_logical_port_pending( |
322 | | uint16_t port) |
323 | 0 | { |
324 | 0 | std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_); |
325 | 0 | auto it = std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port); |
326 | 0 | if (it != logical_output_ports_.end()) |
327 | 0 | { |
328 | 0 | pending_logical_output_ports_.push_back(port); |
329 | 0 | logical_output_ports_.erase(it); |
330 | 0 | } |
331 | 0 | } |
332 | | |
333 | | bool TCPChannelResource::remove_logical_port( |
334 | | uint16_t port) |
335 | 0 | { |
336 | 0 | std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_); |
337 | 0 | if (!is_logical_port_added(port)) |
338 | 0 | { |
339 | 0 | return false; |
340 | 0 | } |
341 | | |
342 | 0 | auto it = std::remove(logical_output_ports_.begin(), logical_output_ports_.end(), port); |
343 | 0 | logical_output_ports_.erase(it, logical_output_ports_.end()); |
344 | 0 | it = std::remove(pending_logical_output_ports_.begin(), pending_logical_output_ports_.end(), port); |
345 | 0 | pending_logical_output_ports_.erase(it, pending_logical_output_ports_.end()); |
346 | 0 | return true; |
347 | 0 | } |
348 | | |
349 | | bool TCPChannelResource::check_socket_send_buffer( |
350 | | const size_t& msg_size, |
351 | | const asio::ip::tcp::socket::native_handle_type& socket_native_handle) |
352 | 0 | { |
353 | 0 | int bytesInSendQueue = 0; |
354 | |
|
355 | 0 | #ifndef _WIN32 |
356 | 0 | if (ioctl(socket_native_handle, TIOCOUTQ, &bytesInSendQueue) == -1) |
357 | 0 | { |
358 | 0 | bytesInSendQueue = 0; |
359 | 0 | } |
360 | | #else // ifdef _WIN32 |
361 | | static_cast<void>(socket_native_handle); |
362 | | #endif // ifndef _WIN32 |
363 | | |
364 | |
|
365 | 0 | size_t future_queue_size = size_t(bytesInSendQueue) + msg_size; |
366 | | // TCP actually allocates twice the size of the buffer requested. |
367 | 0 | if (future_queue_size > size_t(2 * parent_->configuration()->sendBufferSize)) |
368 | 0 | { |
369 | 0 | return false; |
370 | 0 | } |
371 | 0 | return true; |
372 | 0 | } |
373 | | |
374 | | void TCPChannelResource::set_socket_options( |
375 | | asio::basic_socket<asio::ip::tcp>& socket, |
376 | | const TCPTransportDescriptor* options) |
377 | 0 | { |
378 | 0 | uint32_t minimum_value = options->maxMessageSize; |
379 | | |
380 | | // Set the send buffer size |
381 | 0 | { |
382 | 0 | uint32_t desired_value = options->sendBufferSize; |
383 | 0 | uint32_t configured_value = 0; |
384 | 0 | if (!asio_helpers::try_setting_buffer_size<asio::socket_base::send_buffer_size>( |
385 | 0 | socket, desired_value, minimum_value, configured_value)) |
386 | 0 | { |
387 | 0 | EPROSIMA_LOG_ERROR(TCP_TRANSPORT, |
388 | 0 | "Couldn't set send buffer size to minimum value: " << minimum_value); |
389 | 0 | } |
390 | 0 | else if (desired_value != configured_value) |
391 | 0 | { |
392 | 0 | EPROSIMA_LOG_WARNING(TCP_TRANSPORT, |
393 | 0 | "Couldn't set send buffer size to desired value. " |
394 | 0 | << "Using " << configured_value << " instead of " << desired_value); |
395 | 0 | } |
396 | 0 | } |
397 | | |
398 | | // Set the receive buffer size |
399 | 0 | { |
400 | 0 | uint32_t desired_value = options->receiveBufferSize; |
401 | 0 | uint32_t configured_value = 0; |
402 | 0 | if (!asio_helpers::try_setting_buffer_size<asio::socket_base::receive_buffer_size>( |
403 | 0 | socket, desired_value, minimum_value, configured_value)) |
404 | 0 | { |
405 | 0 | EPROSIMA_LOG_ERROR(TCP_TRANSPORT, |
406 | 0 | "Couldn't set receive buffer size to minimum value: " << minimum_value); |
407 | 0 | } |
408 | 0 | else if (desired_value != configured_value) |
409 | 0 | { |
410 | 0 | EPROSIMA_LOG_WARNING(TCP_TRANSPORT, |
411 | 0 | "Couldn't set receive buffer size to desired value. " |
412 | 0 | << "Using " << configured_value << " instead of " << desired_value); |
413 | 0 | } |
414 | 0 | } |
415 | | |
416 | | // Set the TCP_NODELAY option |
417 | 0 | socket.set_option(asio::ip::tcp::no_delay(options->enable_tcp_nodelay)); |
418 | 0 | } |
419 | | |
420 | | } // namespace rtps |
421 | | } // namespace fastdds |
422 | | } // namespace eprosima |