Coverage Report

Created: 2025-06-13 06:46

/src/Fast-DDS/src/cpp/rtps/transport/TCPChannelResource.cpp
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2018 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
#include <rtps/transport/TCPChannelResource.h>
16
17
#include <chrono>
18
#include <thread>
19
20
#include <fastdds/utils/IPLocator.hpp>
21
22
#include <rtps/transport/asio_helpers.hpp>
23
#include <rtps/transport/TCPTransportInterface.h>
24
25
namespace eprosima {
26
namespace fastdds {
27
namespace rtps {
28
29
using Log = fastdds::dds::Log;
30
31
/**
 * Search for the base port of the domain that @c currentPort belongs to, without taking
 * the participant into account.
 *
 * Ports below 7411 are returned untouched. Any other port is rounded down to 7411 plus a
 * whole multiple of 250.
 *
 * @param currentPort Port to be normalized.
 * @return Base port (participant 0) for the domain of @c currentPort.
 */
static uint16_t GetBaseAutoPort(
        uint16_t currentPort)
{
    constexpr uint16_t first_auto_port = 7411; // base + offset3
    constexpr uint16_t domain_stride = 250;

    if (currentPort < first_auto_port)
    {
        return currentPort;
    }

    // Only the whole number of strides above the first port matters, so a plain integer
    // division is enough (no floating point round trip, no participant bookkeeping).
    const uint16_t domain = static_cast<uint16_t>((currentPort - first_auto_port) / domain_stride);

    // The result never exceeds currentPort, hence it always fits in 16 bits.
    return static_cast<uint16_t>(first_auto_port + (domain * domain_stride)); // And participant 0
}
48
49
/**
 * Construct a channel bound to a known locator.
 *
 * The channel is tagged as TCP_CONNECT_TYPE, starts in the eDisconnected state and is not
 * waiting for any keep alive.
 *
 * @param parent Transport that owns this channel.
 * @param locator Locator stored in the channel.
 * @param maxMsgSize Maximum message size, forwarded to the ChannelResource base.
 */
TCPChannelResource::TCPChannelResource(
        TCPTransportInterface* parent,
        const Locator& locator,
        uint32_t maxMsgSize)
    : ChannelResource(maxMsgSize)
    , parent_(parent)
    , locator_(locator)
    , waiting_for_keep_alive_(false)
    , connection_status_(eConnectionStatus::eDisconnected)
    , tcp_connection_type_(TCPConnectionType::TCP_CONNECT_TYPE)
{
    // All the initialization is done in the member initializer list.
}
61
62
/**
 * Construct a channel without a known locator.
 *
 * The channel is tagged as TCP_ACCEPT_TYPE, holds a default-constructed locator, starts in
 * the eConnected state and is not waiting for any keep alive.
 *
 * @param parent Transport that owns this channel.
 * @param maxMsgSize Maximum message size, forwarded to the ChannelResource base.
 */
TCPChannelResource::TCPChannelResource(
        TCPTransportInterface* parent,
        uint32_t maxMsgSize)
    : ChannelResource(maxMsgSize)
    , parent_(parent)
    , locator_()
    , waiting_for_keep_alive_(false)
    , connection_status_(eConnectionStatus::eConnected)
    , tcp_connection_type_(TCPConnectionType::TCP_ACCEPT_TYPE)
{
    // All the initialization is done in the member initializer list.
}
73
74
void TCPChannelResource::disable()
75
0
{
76
0
    ChannelResource::disable(); // prevent asio callback workings on this channel.
77
78
0
    disconnect();
79
0
}
80
81
/**
 * Handle a bind request received on this channel.
 *
 * The connection status is atomically moved from eWaitingForBind to eEstablished. When that
 * transition succeeds, the physical part of @c locator is stored as the channel locator.
 *
 * @param locator Locator announced in the bind request.
 * @return RETCODE_OK when the connection becomes established,
 *         RETCODE_EXISTING_CONNECTION when it was already established,
 *         RETCODE_SERVER_ERROR for any other current status.
 */
ResponseCode TCPChannelResource::process_bind_request(
        const Locator& locator)
{
    eConnectionStatus observed_status = TCPChannelResource::eConnectionStatus::eWaitingForBind;
    const bool bound = connection_status_.compare_exchange_strong(observed_status, eConnectionStatus::eEstablished);

    if (!bound)
    {
        // On failure, observed_status holds the status found in the channel.
        if (eConnectionStatus::eEstablished == observed_status)
        {
            return RETCODE_EXISTING_CONNECTION;
        }
        return RETCODE_SERVER_ERROR;
    }

    locator_ = IPLocator::toPhysicalLocator(locator);
    EPROSIMA_LOG_INFO(RTCP_MSG, "Connection Established");
    return RETCODE_OK;
}
98
99
void TCPChannelResource::set_all_ports_pending()
100
0
{
101
0
    std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
102
0
    pending_logical_output_ports_.insert(pending_logical_output_ports_.end(),
103
0
            logical_output_ports_.begin(),
104
0
            logical_output_ports_.end());
105
0
    logical_output_ports_.clear();
106
0
}
107
108
bool TCPChannelResource::is_logical_port_opened(
109
        uint16_t port)
110
0
{
111
0
    std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
112
0
    return is_logical_port_opened_nts(port);
113
0
}
114
115
bool TCPChannelResource::is_logical_port_opened_nts(
116
        uint16_t port)
117
0
{
118
0
    return std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port) != logical_output_ports_.end();
119
0
}
120
121
bool TCPChannelResource::is_logical_port_added(
122
        uint16_t port)
123
0
{
124
0
    std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
125
0
    return std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port) != logical_output_ports_.end()
126
0
           || std::find(pending_logical_output_ports_.begin(), pending_logical_output_ports_.end(), port)
127
0
           != pending_logical_output_ports_.end();
128
0
}
129
130
bool TCPChannelResource::wait_logical_port_under_negotiation(
131
        uint16_t port,
132
        const std::chrono::milliseconds& timeout)
133
0
{
134
0
    std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
135
136
    // Early return if the port is already opened.
137
0
    if (is_logical_port_opened_nts(port))
138
0
    {
139
0
        return true;
140
0
    }
141
142
    // Early return if the timeout is 0.
143
0
    if (timeout == std::chrono::milliseconds(0))
144
0
    {
145
0
        return false;
146
0
    }
147
148
    // The port is under negotiation if it's in the pending list and in the negotiation list.
149
0
    bool found_in_negotiating_list = negotiating_logical_ports_.end() != std::find_if(
150
0
        negotiating_logical_ports_.begin(),
151
0
        negotiating_logical_ports_.end(),
152
0
        [port](const decltype(negotiating_logical_ports_)::value_type& item)
153
0
        {
154
0
            return item.second == port;
155
0
        });
156
157
0
    if (found_in_negotiating_list &&
158
0
            pending_logical_output_ports_.end() != std::find(
159
0
                pending_logical_output_ports_.begin(),
160
0
                pending_logical_output_ports_.end(),
161
0
                port))
162
0
    {
163
        // Wait for the negotiation to finish. The condition variable might get notified if other logical port is opened. In such case,
164
        // it should wait again with the respective remaining time.
165
0
        auto wait_predicate = [this, port]() -> bool
166
0
                {
167
0
                    return is_logical_port_opened_nts(port);
168
0
                };
169
0
        logical_output_ports_updated_cv.wait_for(scopedLock, timeout, wait_predicate);
170
0
    }
171
172
0
    return is_logical_port_opened_nts(port);
173
0
}
174
175
void TCPChannelResource::add_logical_port(
176
        uint16_t port,
177
        RTCPMessageManager* rtcp_manager)
178
0
{
179
0
    std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
180
    // Already opened?
181
0
    if (std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port) == logical_output_ports_.end())
182
0
    {
183
0
        if (port == 0)
184
0
        {
185
0
            EPROSIMA_LOG_ERROR(RTPS, "Trying to open logical port 0.");
186
0
        } // But let's continue...
187
188
0
        if (std::find(pending_logical_output_ports_.begin(), pending_logical_output_ports_.end(), port)
189
0
                == pending_logical_output_ports_.end()) // Check isn't enqueued already
190
0
        {
191
0
            pending_logical_output_ports_.emplace_back(port);
192
0
            if (connection_established())
193
0
            {
194
0
                TCPTransactionId id = rtcp_manager->sendOpenLogicalPortRequest(this, port);
195
0
                negotiating_logical_ports_[id] = port;
196
0
            }
197
0
        }
198
0
    }
199
200
0
}
201
202
void TCPChannelResource::send_pending_open_logical_ports(
203
        RTCPMessageManager* rtcp_manager)
204
0
{
205
0
    std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
206
0
    if (!pending_logical_output_ports_.empty())
207
0
    {
208
0
        for (uint16_t port : pending_logical_output_ports_)
209
0
        {
210
0
            TCPTransactionId id = rtcp_manager->sendOpenLogicalPortRequest(this, port);
211
0
            negotiating_logical_ports_[id] = port;
212
0
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
213
0
        }
214
0
    }
215
0
}
216
217
void TCPChannelResource::add_logical_port_response(
218
        const TCPTransactionId& id,
219
        bool success,
220
        RTCPMessageManager* rtcp_manager)
221
0
{
222
0
    std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
223
0
    auto it = negotiating_logical_ports_.find(id);
224
0
    if (it != negotiating_logical_ports_.end())
225
0
    {
226
0
        uint16_t port = it->second;
227
0
        auto portIt = std::find(pending_logical_output_ports_.begin(), pending_logical_output_ports_.end(), port);
228
0
        negotiating_logical_ports_.erase(it);
229
0
        if (portIt != pending_logical_output_ports_.end())
230
0
        {
231
0
            if (success)
232
0
            {
233
0
                pending_logical_output_ports_.erase(portIt);
234
0
                logical_output_ports_.push_back(port);
235
0
                logical_output_ports_updated_cv.notify_all();
236
0
                EPROSIMA_LOG_INFO(RTCP, "OpenedLogicalPort: " << port);
237
0
            }
238
0
            else
239
0
            {
240
0
                scopedLock.unlock();
241
0
                prepare_send_check_logical_ports_req(port, rtcp_manager);
242
0
            }
243
0
        }
244
0
        else
245
0
        {
246
0
            EPROSIMA_LOG_WARNING(RTCP, "Received add_logical_port_response for port "
247
0
                    << port << ", but it wasn't found in pending list.");
248
0
        }
249
0
    }
250
0
    else
251
0
    {
252
0
        EPROSIMA_LOG_WARNING(RTCP, "Received add_logical_port_response, but the transaction id wasn't registered " <<
253
0
                "(maybe removed" << " while negotiating?).");
254
0
    }
255
0
}
256
257
void TCPChannelResource::prepare_send_check_logical_ports_req(
258
        uint16_t closedPort,
259
        RTCPMessageManager* rtcp_manager)
260
0
{
261
0
    std::vector<uint16_t> candidatePorts;
262
0
    uint16_t base_port = GetBaseAutoPort(closedPort); // The first failed port
263
0
    uint16_t max_port = closedPort + parent_->GetMaxLogicalPort();
264
265
0
    for (uint16_t p = base_port;
266
0
            p <= closedPort + (parent_->GetLogicalPortRange()
267
0
            * parent_->GetLogicalPortIncrement());
268
0
            p += parent_->GetLogicalPortIncrement())
269
0
    {
270
        // Don't add ports just tested and already pendings
271
0
        if (p <= max_port && p != closedPort)
272
0
        {
273
0
            std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
274
0
            auto pendingIt = std::find(pending_logical_output_ports_.begin(), pending_logical_output_ports_.end(), p);
275
0
            if (pendingIt == pending_logical_output_ports_.end())
276
0
            {
277
0
                candidatePorts.emplace_back(p);
278
0
            }
279
0
        }
280
0
    }
281
282
0
    if (candidatePorts.empty()) // No more available ports!
283
0
    {
284
0
        EPROSIMA_LOG_ERROR(RTCP, "Cannot find an available logical port.");
285
0
    }
286
0
    else
287
0
    {
288
0
        TCPTransactionId id = rtcp_manager->sendCheckLogicalPortsRequest(this, candidatePorts);
289
0
        std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
290
0
        last_checked_logical_port_[id] = candidatePorts.back();
291
0
    }
292
0
}
293
294
void TCPChannelResource::process_check_logical_ports_response(
295
        const TCPTransactionId& transactionId,
296
        const std::vector<uint16_t>& availablePorts,
297
        RTCPMessageManager* rtcp_manager)
298
0
{
299
0
    std::unique_lock<std::recursive_mutex> scopedLock(pending_logical_mutex_);
300
0
    auto it = last_checked_logical_port_.find(transactionId);
301
0
    if (it != last_checked_logical_port_.end())
302
0
    {
303
0
        uint16_t lastPort = it->second;
304
0
        last_checked_logical_port_.erase(it);
305
0
        scopedLock.unlock();
306
0
        if (availablePorts.empty())
307
0
        {
308
0
            prepare_send_check_logical_ports_req(lastPort, rtcp_manager);
309
0
        }
310
0
        else
311
0
        {
312
0
            add_logical_port(availablePorts.front(), rtcp_manager);
313
0
        }
314
0
    }
315
0
    else
316
0
    {
317
0
        EPROSIMA_LOG_WARNING(RTCP, "Received process_check_logical_ports_response without sending a Request.");
318
0
    }
319
0
}
320
321
void TCPChannelResource::set_logical_port_pending(
322
        uint16_t port)
323
0
{
324
0
    std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
325
0
    auto it = std::find(logical_output_ports_.begin(), logical_output_ports_.end(), port);
326
0
    if (it != logical_output_ports_.end())
327
0
    {
328
0
        pending_logical_output_ports_.push_back(port);
329
0
        logical_output_ports_.erase(it);
330
0
    }
331
0
}
332
333
bool TCPChannelResource::remove_logical_port(
334
        uint16_t port)
335
0
{
336
0
    std::lock_guard<std::recursive_mutex> scopedLock(pending_logical_mutex_);
337
0
    if (!is_logical_port_added(port))
338
0
    {
339
0
        return false;
340
0
    }
341
342
0
    auto it = std::remove(logical_output_ports_.begin(), logical_output_ports_.end(), port);
343
0
    logical_output_ports_.erase(it, logical_output_ports_.end());
344
0
    it = std::remove(pending_logical_output_ports_.begin(), pending_logical_output_ports_.end(), port);
345
0
    pending_logical_output_ports_.erase(it, pending_logical_output_ports_.end());
346
0
    return true;
347
0
}
348
349
bool TCPChannelResource::check_socket_send_buffer(
350
        const size_t& msg_size,
351
        const asio::ip::tcp::socket::native_handle_type& socket_native_handle)
352
0
{
353
0
    int bytesInSendQueue = 0;
354
355
0
#ifndef _WIN32
356
0
    if (ioctl(socket_native_handle, TIOCOUTQ, &bytesInSendQueue) == -1)
357
0
    {
358
0
        bytesInSendQueue = 0;
359
0
    }
360
#else // ifdef _WIN32
361
    static_cast<void>(socket_native_handle);
362
#endif // ifndef _WIN32
363
364
365
0
    size_t future_queue_size = size_t(bytesInSendQueue) + msg_size;
366
    // TCP actually allocates twice the size of the buffer requested.
367
0
    if (future_queue_size > size_t(2 * parent_->configuration()->sendBufferSize))
368
0
    {
369
0
        return false;
370
0
    }
371
0
    return true;
372
0
}
373
374
void TCPChannelResource::set_socket_options(
375
        asio::basic_socket<asio::ip::tcp>& socket,
376
        const TCPTransportDescriptor* options)
377
0
{
378
0
    uint32_t minimum_value = options->maxMessageSize;
379
380
    // Set the send buffer size
381
0
    {
382
0
        uint32_t desired_value = options->sendBufferSize;
383
0
        uint32_t configured_value = 0;
384
0
        if (!asio_helpers::try_setting_buffer_size<asio::socket_base::send_buffer_size>(
385
0
                    socket, desired_value, minimum_value, configured_value))
386
0
        {
387
0
            EPROSIMA_LOG_ERROR(TCP_TRANSPORT,
388
0
                    "Couldn't set send buffer size to minimum value: " << minimum_value);
389
0
        }
390
0
        else if (desired_value != configured_value)
391
0
        {
392
0
            EPROSIMA_LOG_WARNING(TCP_TRANSPORT,
393
0
                    "Couldn't set send buffer size to desired value. "
394
0
                    << "Using " << configured_value << " instead of " << desired_value);
395
0
        }
396
0
    }
397
398
    // Set the receive buffer size
399
0
    {
400
0
        uint32_t desired_value = options->receiveBufferSize;
401
0
        uint32_t configured_value = 0;
402
0
        if (!asio_helpers::try_setting_buffer_size<asio::socket_base::receive_buffer_size>(
403
0
                    socket, desired_value, minimum_value, configured_value))
404
0
        {
405
0
            EPROSIMA_LOG_ERROR(TCP_TRANSPORT,
406
0
                    "Couldn't set receive buffer size to minimum value: " << minimum_value);
407
0
        }
408
0
        else if (desired_value != configured_value)
409
0
        {
410
0
            EPROSIMA_LOG_WARNING(TCP_TRANSPORT,
411
0
                    "Couldn't set receive buffer size to desired value. "
412
0
                    << "Using " << configured_value << " instead of " << desired_value);
413
0
        }
414
0
    }
415
416
    // Set the TCP_NODELAY option
417
0
    socket.set_option(asio::ip::tcp::no_delay(options->enable_tcp_nodelay));
418
0
}
419
420
} // namespace rtps
421
} // namespace fastdds
422
} // namespace eprosima