Coverage Report

Created: 2026-04-01 06:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/Fast-DDS/src/cpp/rtps/writer/LivelinessManager.cpp
Line
Count
Source
1
// Copyright 2019 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
#include <rtps/writer/LivelinessManager.hpp>
16
17
#include <algorithm>
18
19
#include <fastdds/dds/log/Log.hpp>
20
21
using namespace std::chrono;
22
23
namespace eprosima {
24
namespace fastdds {
25
namespace rtps {
26
27
using LivelinessDataIterator = ResourceLimitedVector<LivelinessData>::iterator;
28
29
/**
 * Constructs the manager with a default-constructed writer collection, no timer owner,
 * and a timed event created with a zero interval.
 *
 * @param callback          Copied into callback_; it is invoked from timer_expired() and
 *                          assert_writer_liveliness().
 * @param service           Event service handed to the internal timed event.
 * @param manage_automatic  When false, add_writer() and the prefix-based assert_liveliness()
 *                          reject the AUTOMATIC liveliness kind.
 */
LivelinessManager::LivelinessManager(
        const LivelinessCallback& callback,
        ResourceEvent& service,
        bool manage_automatic)
    : callback_(callback)
    , manage_automatic_(manage_automatic)
    , writers_()
    , mutex_()
    , col_mutex_()
    , timer_owner_(nullptr)
    , timer_(service,
            [this]
            {
                // Every expiration of the timed event is forwarded to the manager
                return timer_expired();
            },
            0)
{
}
48
49
/**
 * Forgets the current timer owner and cancels the timed event, both while holding mutex_.
 */
LivelinessManager::~LivelinessManager()
{
    std::lock_guard<std::mutex> elements_guard(mutex_);

    timer_owner_ = nullptr;
    timer_.cancel_timer();
}
55
56
bool LivelinessManager::add_writer(
57
        GUID_t guid,
58
        fastdds::dds::LivelinessQosPolicyKind kind,
59
        dds::Duration_t lease_duration)
60
0
{
61
0
    if (!manage_automatic_ && kind == fastdds::dds::LivelinessQosPolicyKind::AUTOMATIC_LIVELINESS_QOS)
62
0
    {
63
0
        EPROSIMA_LOG_WARNING(RTPS_WRITER, "Liveliness manager not managing automatic writers, writer not added");
64
0
        return false;
65
0
    }
66
67
0
    {
68
        // collection guard
69
0
        std::lock_guard<shared_mutex> _(col_mutex_);
70
        // writers_ elements guard
71
0
        std::lock_guard<std::mutex> __(mutex_);
72
73
0
        for (LivelinessData& writer : writers_)
74
0
        {
75
0
            if (writer.guid == guid &&
76
0
                    writer.kind == kind &&
77
0
                    writer.lease_duration == lease_duration)
78
0
            {
79
0
                writer.count++;
80
0
                return true;
81
0
            }
82
0
        }
83
0
        writers_.emplace_back(guid, kind, lease_duration);
84
0
    }
85
86
0
    if (!calculate_next())
87
0
    {
88
        // TimedEvent is thread safe
89
0
        timer_.cancel_timer();
90
0
        return true;
91
0
    }
92
93
0
    std::lock_guard<std::mutex> _(mutex_);
94
95
0
    if (timer_owner_ != nullptr)
96
0
    {
97
        // Some times the interval could be negative if a writer expired during the call to this function
98
        // Once in this situation there is not much we can do but let asio timers expire immediately
99
0
        auto interval = timer_owner_->time - steady_clock::now();
100
0
        timer_.update_interval_millisec((double)duration_cast<milliseconds>(interval).count());
101
0
        timer_.restart_timer();
102
0
    }
103
104
0
    return true;
105
0
}
106
107
bool LivelinessManager::remove_writer(
108
        GUID_t guid,
109
        fastdds::dds::LivelinessQosPolicyKind kind,
110
        dds::Duration_t lease_duration,
111
        LivelinessData::WriterStatus& writer_status)
112
0
{
113
0
    bool removed = false;
114
115
0
    {
116
        // collection guard
117
0
        std::lock_guard<shared_mutex> _(col_mutex_);
118
        // writers_ elements guard
119
0
        std::lock_guard<std::mutex> __(mutex_);
120
121
0
        removed = writers_.remove_if([guid, kind, lease_duration, &writer_status](LivelinessData& writer)
122
0
                        {
123
0
                            writer_status = writer.status;
124
0
                            return writer.guid == guid &&
125
0
                                   writer.kind == kind &&
126
0
                                   writer.lease_duration == lease_duration &&
127
0
                                   --writer.count == 0;
128
0
                        });
129
0
    }
130
131
0
    if (!removed)
132
0
    {
133
0
        return false;
134
0
    }
135
136
0
    std::unique_lock<std::mutex> lock(mutex_);
137
138
0
    if (timer_owner_ != nullptr)
139
0
    {
140
0
        lock.unlock();
141
142
0
        if (!calculate_next())
143
0
        {
144
0
            timer_.cancel_timer();
145
0
            return true;
146
0
        }
147
148
0
        lock.lock();
149
150
0
        if (timer_owner_ != nullptr)
151
0
        {
152
            // Some times the interval could be negative if a writer expired during the call to this function
153
            // Once in this situation there is not much we can do but let asio timers expire inmediately
154
0
            auto interval = timer_owner_->time - steady_clock::now();
155
0
            timer_.update_interval_millisec((double)duration_cast<milliseconds>(interval).count());
156
0
            timer_.restart_timer();
157
0
        }
158
0
    }
159
160
0
    return true;
161
0
}
162
163
bool LivelinessManager::assert_liveliness(
164
        GUID_t guid,
165
        fastdds::dds::LivelinessQosPolicyKind kind,
166
        dds::Duration_t lease_duration)
167
0
{
168
0
    bool found = false;
169
170
0
    {
171
        // collection guard
172
0
        shared_lock<shared_mutex> _(col_mutex_);
173
174
0
        for (LivelinessData& writer : writers_)
175
0
        {
176
            // writers_ elements guard
177
0
            std::unique_lock<std::mutex> lock(mutex_);
178
179
0
            if (writer.guid == guid &&
180
0
                    writer.kind == kind &&
181
0
                    writer.lease_duration == lease_duration)
182
0
            {
183
0
                lock.unlock();
184
185
0
                found = true;
186
187
                // Execute the callbacks
188
0
                if (writer.kind == fastdds::dds::LivelinessQosPolicyKind::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS ||
189
0
                        writer.kind == fastdds::dds::LivelinessQosPolicyKind::AUTOMATIC_LIVELINESS_QOS)
190
0
                {
191
0
                    for (LivelinessData& w: writers_)
192
0
                    {
193
0
                        if (w.kind == writer.kind &&
194
0
                                w.guid.guidPrefix == guid.guidPrefix)
195
0
                        {
196
0
                            assert_writer_liveliness(w);
197
0
                        }
198
0
                    }
199
0
                }
200
0
                else if (writer.kind == fastdds::dds::LivelinessQosPolicyKind::MANUAL_BY_TOPIC_LIVELINESS_QOS)
201
0
                {
202
0
                    assert_writer_liveliness(writer);
203
0
                }
204
205
0
                break;
206
0
            }
207
0
        }
208
0
    }
209
210
0
    if (!found)
211
0
    {
212
0
        return false;
213
0
    }
214
215
0
    timer_.cancel_timer();
216
217
    // Updates the timer owner
218
0
    if (!calculate_next())
219
0
    {
220
0
        EPROSIMA_LOG_ERROR(RTPS_WRITER, "Error when restarting liveliness timer");
221
0
        return false;
222
0
    }
223
224
0
    std::lock_guard<std::mutex> lock(mutex_);
225
226
0
    if (timer_owner_ != nullptr)
227
0
    {
228
        // Some times the interval could be negative if a writer expired during the call to this function
229
        // Once in this situation there is not much we can do but let asio timers expire inmediately
230
0
        auto interval = timer_owner_->time - steady_clock::now();
231
0
        timer_.update_interval_millisec((double)duration_cast<milliseconds>(interval).count());
232
0
        timer_.restart_timer();
233
0
    }
234
235
0
    return true;
236
0
}
237
238
bool LivelinessManager::assert_liveliness(
239
        fastdds::dds::LivelinessQosPolicyKind kind,
240
        GuidPrefix_t guid_prefix)
241
0
{
242
243
0
    if (!manage_automatic_ && kind == fastdds::dds::LivelinessQosPolicyKind::AUTOMATIC_LIVELINESS_QOS)
244
0
    {
245
0
        EPROSIMA_LOG_WARNING(RTPS_WRITER, "Liveliness manager not managing automatic writers, writer not added");
246
0
        return false;
247
0
    }
248
249
0
    {
250
        // collection guard
251
0
        shared_lock<shared_mutex> _(col_mutex_);
252
253
0
        if (writers_.empty())
254
0
        {
255
0
            return true;
256
0
        }
257
258
259
0
        for (LivelinessData& writer: writers_)
260
0
        {
261
0
            if (writer.kind == kind &&
262
0
                    guid_prefix == writer.guid.guidPrefix)
263
0
            {
264
0
                assert_writer_liveliness(writer);
265
0
            }
266
0
        }
267
0
    }
268
269
0
    timer_.cancel_timer();
270
271
    // Updates the timer owner
272
0
    if (!calculate_next())
273
0
    {
274
0
        EPROSIMA_LOG_INFO(RTPS_WRITER,
275
0
                "Error when restarting liveliness timer: " << writers_.size() << " writers, liveliness "
276
0
                                                           << kind);
277
0
        return false;
278
0
    }
279
280
0
    std::lock_guard<std::mutex> lock(mutex_);
281
282
0
    if (timer_owner_ != nullptr)
283
0
    {
284
        // Some times the interval could be negative if a writer expired during the call to this function
285
        // Once in this situation there is not much we can do but let asio timers expire inmediately
286
0
        auto interval = timer_owner_->time - steady_clock::now();
287
0
        timer_.update_interval_millisec((double)duration_cast<milliseconds>(interval).count());
288
0
        timer_.restart_timer();
289
0
    }
290
291
0
    return true;
292
0
}
293
294
bool LivelinessManager::calculate_next()
295
0
{
296
    // Keep this lock order to prevent ABBA deadlocks
297
0
    shared_lock<shared_mutex> _(col_mutex_);
298
0
    std::lock_guard<std::mutex> __(mutex_);
299
300
0
    bool any_alive = false;
301
0
    steady_clock::time_point min_time = steady_clock::now() + nanoseconds(dds::c_TimeInfinite.to_ns());
302
303
0
    timer_owner_ = nullptr;
304
305
    // collection guard
306
0
    for (LivelinessData& writer : writers_)
307
0
    {
308
0
        if (writer.status == LivelinessData::WriterStatus::ALIVE)
309
0
        {
310
0
            if (writer.time < min_time)
311
0
            {
312
0
                min_time = writer.time;
313
0
                timer_owner_ = &writer;
314
0
            }
315
0
            any_alive = true;
316
0
        }
317
0
    }
318
319
0
    return any_alive;
320
0
}
321
322
bool LivelinessManager::timer_expired()
323
0
{
324
0
    std::unique_lock<std::mutex> lock(mutex_);
325
326
0
    if (timer_owner_ == nullptr)
327
0
    {
328
0
        EPROSIMA_LOG_ERROR(RTPS_WRITER, "Liveliness timer expired but there is no writer");
329
0
        return false;
330
0
    }
331
0
    else
332
0
    {
333
0
        timer_owner_->status = LivelinessData::WriterStatus::NOT_ALIVE;
334
0
    }
335
336
0
    auto guid = timer_owner_->guid;
337
0
    auto kind = timer_owner_->kind;
338
0
    auto lease_duration = timer_owner_->lease_duration;
339
340
0
    lock.unlock();
341
342
0
    if (callback_ != nullptr)
343
0
    {
344
0
        callback_(guid, kind, lease_duration, -1, 1);
345
0
    }
346
347
0
    if (calculate_next())
348
0
    {
349
0
        lock.lock();
350
351
0
        if ( timer_owner_ != nullptr)
352
0
        {
353
            // Some times the interval could be negative if a writer expired during the call to this function
354
            // Once in this situation there is not much we can do but let asio timers expire inmediately
355
0
            auto interval = timer_owner_->time - steady_clock::now();
356
0
            timer_.update_interval_millisec((double)duration_cast<milliseconds>(interval).count());
357
358
0
            return true;
359
0
        }
360
0
    }
361
362
0
    return false;
363
0
}
364
365
bool LivelinessManager::is_any_alive(
366
        fastdds::dds::LivelinessQosPolicyKind kind)
367
0
{
368
    // Keep this lock order to prevent ABBA deadlocks
369
0
    shared_lock<shared_mutex> _(col_mutex_);
370
0
    std::lock_guard<std::mutex> __(mutex_);
371
372
0
    for (const auto& writer : writers_)
373
0
    {
374
0
        if (writer.kind == kind && writer.status == LivelinessData::WriterStatus::ALIVE)
375
0
        {
376
0
            return true;
377
0
        }
378
0
    }
379
380
0
    return false;
381
0
}
382
383
void LivelinessManager::assert_writer_liveliness(
384
        LivelinessData& writer)
385
0
{
386
    // The shared_mutex is taken, that is, the writer referenced will not be destroyed during this call
387
0
    std::unique_lock<std::mutex> lock(mutex_);
388
389
0
    auto status = writer.status;
390
0
    auto guid = writer.guid;
391
0
    auto kind = writer.kind;
392
0
    auto lease_duration = writer.lease_duration;
393
394
0
    writer.status = LivelinessData::WriterStatus::ALIVE;
395
0
    writer.time = steady_clock::now() + nanoseconds(writer.lease_duration.to_ns());
396
397
0
    lock.unlock();
398
399
0
    if (callback_ != nullptr)
400
0
    {
401
0
        if (status == LivelinessData::WriterStatus::NOT_ASSERTED)
402
0
        {
403
0
            callback_(guid, kind, lease_duration, 1, 0);
404
0
        }
405
0
        else if (status == LivelinessData::WriterStatus::NOT_ALIVE)
406
0
        {
407
0
            callback_(guid, kind, lease_duration, 1, -1);
408
0
        }
409
0
    }
410
0
}
411
412
/**
 * Read-only access to the collection of registered writers.
 *
 * No lock is taken here; the returned reference aliases the internal writers_ member.
 *
 * @return const reference to writers_.
 */
const ResourceLimitedVector<LivelinessData>& LivelinessManager::get_liveliness_data() const
{
    return writers_;
}
416
417
} // namespace rtps
418
} // namespace fastdds
419
} // namespace eprosima