Coverage Report

Created: 2025-07-03 06:58

/src/Fast-DDS/src/cpp/fastdds/publisher/DataWriterHistory.cpp
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file DataWriterHistory.cpp
17
 */
18
#include <fastdds/publisher/DataWriterHistory.hpp>
19
20
#include <chrono>
21
#include <limits>
22
#include <mutex>
23
24
#include <fastdds/dds/common/InstanceHandle.hpp>
25
#include <fastdds/dds/log/Log.hpp>
26
#include <fastdds/dds/topic/qos/TopicQos.hpp>
27
#include <fastdds/rtps/common/Time_t.hpp>
28
#include <fastdds/rtps/writer/RTPSWriter.hpp>
29
30
#include <rtps/writer/BaseWriter.hpp>
31
32
namespace eprosima {
33
namespace fastdds {
34
namespace dds {
35
36
using namespace eprosima::fastdds::rtps;
37
38
/**
 * Build the HistoryAttributes used to construct the underlying WriterHistory from the DDS QoS.
 *
 * The initial, maximum and extra sample counts are taken from the resource limits. For any history
 * kind other than KEEP_ALL, the maximum becomes the history depth (multiplied by max_instances when
 * the topic kind is not NO_KEY) and the initial allocation is capped to that maximum.
 *
 * @param history_qos          History policy; its kind and depth are read.
 * @param resource_limits_qos  Resource limits; allocated_samples, max_samples, extra_samples and
 *                             max_instances are read.
 * @param topic_kind           Topic kind, used to decide whether the depth applies per instance.
 * @param payloadMaxSize       Forwarded unchanged to HistoryAttributes.
 * @param mempolicy            Forwarded unchanged to HistoryAttributes.
 *
 * @return The resulting HistoryAttributes.
 */
HistoryAttributes DataWriterHistory::to_history_attributes(
        const HistoryQosPolicy& history_qos,
        const ResourceLimitsQosPolicy& resource_limits_qos,
        const rtps::TopicKind_t& topic_kind,
        uint32_t payloadMaxSize,
        MemoryManagementPolicy_t mempolicy)
{
    auto initial_samples = resource_limits_qos.allocated_samples;
    auto max_samples = resource_limits_qos.max_samples;
    auto extra_samples = resource_limits_qos.extra_samples;

    if (history_qos.kind != KEEP_ALL_HISTORY_QOS)
    {
        max_samples = history_qos.depth;
        if (topic_kind != NO_KEY)
        {
            // depth * max_instances may not fit in 32 bits, and signed overflow is undefined behavior.
            // Multiply in 64 bits and saturate; every in-range product is the same value as before.
            const int64_t widened = static_cast<int64_t>(max_samples) *
                    static_cast<int64_t>(resource_limits_qos.max_instances);
            const int64_t upper = std::numeric_limits<int32_t>::max();
            const int64_t lower = std::numeric_limits<int32_t>::min();
            max_samples = static_cast<int32_t>(std::max(lower, std::min(upper, widened)));
        }

        initial_samples = std::min(initial_samples, max_samples);
    }

    return HistoryAttributes(mempolicy, payloadMaxSize, initial_samples, max_samples, extra_samples);
}
62
63
/**
 * Construct the history on top of a WriterHistory configured from the given QoS.
 *
 * Copies of the history QoS, resource limits, topic kind and the removal callback are stored.
 * In the stored resource limits, any of max_samples, max_instances and max_samples_per_instance
 * that is not positive is replaced by the largest int32_t value.
 *
 * @param payload_pool                 Payload pool handed to WriterHistory.
 * @param change_pool                  Change pool handed to WriterHistory.
 * @param history_qos                  History policy.
 * @param resource_limits_qos          Resource limits policy.
 * @param topic_kind                   Topic kind (NO_KEY / WITH_KEY).
 * @param payloadMaxSize               Forwarded to to_history_attributes().
 * @param mempolicy                    Forwarded to to_history_attributes().
 * @param unack_sample_remove_functor  Callback invoked by prepare_change() with the instance handle of
 *                                     a sample removed before being acknowledged.
 */
DataWriterHistory::DataWriterHistory(
        const std::shared_ptr<IPayloadPool>& payload_pool,
        const std::shared_ptr<IChangePool>& change_pool,
        const HistoryQosPolicy& history_qos,
        const ResourceLimitsQosPolicy& resource_limits_qos,
        const rtps::TopicKind_t& topic_kind,
        uint32_t payloadMaxSize,
        MemoryManagementPolicy_t mempolicy,
        std::function<void (const fastdds::rtps::InstanceHandle_t&)> unack_sample_remove_functor)
    : WriterHistory(
        to_history_attributes(history_qos, resource_limits_qos, topic_kind, payloadMaxSize, mempolicy),
        payload_pool,
        change_pool)
    , history_qos_(history_qos)
    , resource_limited_qos_(resource_limits_qos)
    , topic_kind_(topic_kind)
    , unacknowledged_sample_removed_functor_(unack_sample_remove_functor)
{
    // A non-positive limit is stored as the largest representable value
    const auto lift_non_positive = [](int32_t& limit)
            {
                if (limit <= 0)
                {
                    limit = std::numeric_limits<int32_t>::max();
                }
            };

    lift_non_positive(resource_limited_qos_.max_samples);
    lift_non_positive(resource_limited_qos_.max_instances);
    lift_non_positive(resource_limited_qos_.max_samples_per_instance);
}
94
95
// Nothing to release explicitly: the destructor body is empty.
DataWriterHistory::~DataWriterHistory() = default;
98
99
void DataWriterHistory::rebuild_instances()
100
0
{
101
0
    if (topic_kind_ == WITH_KEY)
102
0
    {
103
0
        for (CacheChange_t* change : m_changes)
104
0
        {
105
0
            t_m_Inst_Caches::iterator vit;
106
0
            if (find_or_add_key(change->instanceHandle, change->serializedPayload, &vit))
107
0
            {
108
0
                vit->second.cache_changes.push_back(change);
109
0
            }
110
0
        }
111
0
    }
112
0
}
113
114
bool DataWriterHistory::register_instance(
115
        const InstanceHandle_t& instance_handle,
116
        std::unique_lock<RecursiveTimedMutex>&,
117
        const std::chrono::time_point<std::chrono::steady_clock>&,
118
        SerializedPayload_t*& payload)
119
0
{
120
0
    payload = nullptr;
121
122
    /// Preconditions
123
0
    if (topic_kind_ == NO_KEY)
124
0
    {
125
0
        return false;
126
0
    }
127
128
0
    t_m_Inst_Caches::iterator vit;
129
0
    bool result = find_or_add_key(instance_handle, {}, &vit);
130
0
    if (result)
131
0
    {
132
0
        payload = &vit->second.key_payload;
133
0
    }
134
0
    return result;
135
0
}
136
137
/**
 * Look up the key payload stored for an instance.
 *
 * @param handle  Handle of the instance.
 *
 * @return Pointer to the instance's key payload, or nullptr when the instance is unknown or its
 *         is_registered() check fails.
 */
fastdds::rtps::SerializedPayload_t* DataWriterHistory::get_key_value(
        const fastdds::rtps::InstanceHandle_t& handle)
{
    const t_m_Inst_Caches::iterator instance_it = keyed_changes_.find(handle);
    if (instance_it == keyed_changes_.end() || !instance_it->second.is_registered())
    {
        return nullptr;
    }

    return &instance_it->second.key_payload;
}
147
148
bool DataWriterHistory::prepare_change(
149
        CacheChange_t* change,
150
        std::unique_lock<RecursiveTimedMutex>& lock,
151
        const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)
152
0
{
153
0
    if (m_isHistoryFull)
154
0
    {
155
0
        bool ret = false;
156
0
        bool is_acked = change_is_acked_or_fully_delivered(m_changes.front());
157
0
        InstanceHandle_t instance = topic_kind_ == NO_KEY ?
158
0
                HANDLE_NIL : m_changes.front()->instanceHandle;
159
160
0
        if (history_qos_.kind == KEEP_ALL_HISTORY_QOS)
161
0
        {
162
0
            ret = this->mp_writer->try_remove_change(max_blocking_time, lock);
163
            // If change was removed (ret == 1) in KeepAllHistory, it must have been acked
164
0
            is_acked = ret;
165
0
        }
166
0
        else if (history_qos_.kind == KEEP_LAST_HISTORY_QOS)
167
0
        {
168
0
            ret = this->remove_min_change(max_blocking_time);
169
0
        }
170
171
        // Notify if change has been removed unacknowledged
172
0
        if (ret && !is_acked)
173
0
        {
174
0
            unacknowledged_sample_removed_functor_(instance);
175
0
        }
176
0
        else if (!ret)
177
0
        {
178
0
            EPROSIMA_LOG_WARNING(RTPS_HISTORY,
179
0
                    "Attempting to add Data to Full WriterCache.");
180
0
            return false;
181
0
        }
182
0
    }
183
184
0
    assert(!m_isHistoryFull);
185
186
    // For NO_KEY we can directly add the change
187
0
    bool add = (topic_kind_ == NO_KEY);
188
0
    if (topic_kind_ == WITH_KEY)
189
0
    {
190
0
        t_m_Inst_Caches::iterator vit;
191
192
        // For WITH_KEY, we take into account the limits on the instance
193
        // In case we wait for a sequence to be acknowledged, we try several times
194
        // until we reach the max blocking timepoint
195
0
        while (!add)
196
0
        {
197
            // We should have the instance
198
0
            if (!find_or_add_key(change->instanceHandle, change->serializedPayload, &vit))
199
0
            {
200
0
                break;
201
0
            }
202
203
0
            if (history_qos_.kind == KEEP_LAST_HISTORY_QOS)
204
0
            {
205
0
                if (vit->second.cache_changes.size() < static_cast<size_t>(history_qos_.depth))
206
0
                {
207
0
                    add = true;
208
0
                }
209
0
                else
210
0
                {
211
0
                    bool is_acked = change_is_acked_or_fully_delivered(vit->second.cache_changes.front());
212
0
                    InstanceHandle_t instance = change->instanceHandle;
213
0
                    add = remove_change_pub(vit->second.cache_changes.front());
214
                    // Notify if removed unacknowledged
215
0
                    if (add && !is_acked)
216
0
                    {
217
0
                        unacknowledged_sample_removed_functor_(instance);
218
0
                    }
219
0
                }
220
0
            }
221
0
            else if (history_qos_.kind == KEEP_ALL_HISTORY_QOS)
222
0
            {
223
0
                if (vit->second.cache_changes.size() <
224
0
                        static_cast<size_t>(resource_limited_qos_.max_samples_per_instance))
225
0
                {
226
0
                    add = true;
227
0
                }
228
0
                else
229
0
                {
230
0
                    SequenceNumber_t seq_to_remove = vit->second.cache_changes.front()->sequenceNumber;
231
0
                    if (!mp_writer->wait_for_acknowledgement(seq_to_remove, max_blocking_time, lock))
232
0
                    {
233
                        // Timeout waiting. Will not add change to history.
234
0
                        break;
235
0
                    }
236
237
                    // vit may have been invalidated
238
0
                    if (!find_or_add_key(change->instanceHandle, change->serializedPayload, &vit))
239
0
                    {
240
0
                        break;
241
0
                    }
242
243
                    // If the change we were trying to remove was already removed, try again
244
0
                    if (vit->second.cache_changes.empty() ||
245
0
                            vit->second.cache_changes.front()->sequenceNumber != seq_to_remove)
246
0
                    {
247
0
                        continue;
248
0
                    }
249
250
                    // Remove change if still present
251
0
                    add = remove_change_pub(vit->second.cache_changes.front());
252
0
                }
253
0
            }
254
0
        }
255
256
0
        if (add)
257
0
        {
258
0
            vit->second.cache_changes.push_back(change);
259
0
        }
260
0
    }
261
262
0
    return add;
263
0
}
264
265
bool DataWriterHistory::add_pub_change(
266
        CacheChange_t* change,
267
        WriteParams& wparams,
268
        std::unique_lock<RecursiveTimedMutex>& lock,
269
        const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)
270
0
{
271
0
    bool returnedValue = false;
272
0
    bool add = prepare_change(change, lock, max_blocking_time);
273
274
0
    if (add)
275
0
    {
276
#if HAVE_STRICT_REALTIME
277
        if (this->add_change_(change, wparams, max_blocking_time))
278
#else
279
0
        if (this->add_change_(change, wparams))
280
0
#endif // if HAVE_STRICT_REALTIME
281
0
        {
282
0
            EPROSIMA_LOG_INFO(RTPS_HISTORY,
283
0
                    " Change " << change->sequenceNumber << " added with key: " << change->instanceHandle
284
0
                               << " and " << change->serializedPayload.length << " bytes");
285
0
            returnedValue = true;
286
0
        }
287
0
    }
288
289
0
    return returnedValue;
290
0
}
291
292
bool DataWriterHistory::find_or_add_key(
293
        const InstanceHandle_t& instance_handle,
294
        const SerializedPayload_t& payload,
295
        t_m_Inst_Caches::iterator* vit_out)
296
0
{
297
0
    static_cast<void>(payload);
298
299
0
    t_m_Inst_Caches::iterator vit;
300
0
    vit = keyed_changes_.find(instance_handle);
301
0
    if (vit != keyed_changes_.end())
302
0
    {
303
0
        *vit_out = vit;
304
0
        return true;
305
0
    }
306
307
0
    if (static_cast<int>(keyed_changes_.size()) < resource_limited_qos_.max_instances)
308
0
    {
309
0
        vit = keyed_changes_.insert(std::make_pair(instance_handle, detail::DataWriterInstance())).first;
310
0
        vit->second.key_payload.copy(&payload, false);
311
0
        *vit_out = vit;
312
0
        return true;
313
0
    }
314
315
0
    return false;
316
0
}
317
318
bool DataWriterHistory::removeAllChange(
319
        size_t* removed)
320
0
{
321
322
0
    size_t rem = 0;
323
0
    std::lock_guard<RecursiveTimedMutex> guard(*this->mp_mutex);
324
325
0
    while (m_changes.size() > 0)
326
0
    {
327
0
        if (remove_change_pub(m_changes.front()))
328
0
        {
329
0
            ++rem;
330
0
        }
331
0
        else
332
0
        {
333
0
            break;
334
0
        }
335
0
    }
336
0
    if (removed != nullptr)
337
0
    {
338
0
        *removed = rem;
339
0
    }
340
0
    if (rem > 0)
341
0
    {
342
0
        return true;
343
0
    }
344
0
    return false;
345
0
}
346
347
bool DataWriterHistory::removeMinChange()
348
0
{
349
0
    if (mp_writer == nullptr || mp_mutex == nullptr)
350
0
    {
351
0
        EPROSIMA_LOG_ERROR(RTPS_HISTORY, "You need to create a Writer with this History before using it");
352
0
        return false;
353
0
    }
354
355
0
    std::lock_guard<RecursiveTimedMutex> guard(*this->mp_mutex);
356
0
    if (m_changes.size() > 0)
357
0
    {
358
0
        return remove_change_pub(m_changes.front());
359
0
    }
360
0
    return false;
361
0
}
362
363
bool DataWriterHistory::remove_change_pub(
364
        CacheChange_t* change)
365
0
{
366
0
    return DataWriterHistory::remove_change_pub(change, std::chrono::steady_clock::now() + std::chrono::hours(24));
367
0
}
368
369
bool DataWriterHistory::remove_change_pub(
370
        CacheChange_t* change,
371
        const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)
372
0
{
373
0
    if (mp_writer == nullptr || mp_mutex == nullptr)
374
0
    {
375
0
        EPROSIMA_LOG_ERROR(RTPS_HISTORY, "You need to create a Writer with this History before using it");
376
0
        return false;
377
0
    }
378
379
#if HAVE_STRICT_REALTIME
380
    std::unique_lock<RecursiveTimedMutex> lock(*this->mp_mutex, std::defer_lock);
381
    if (!lock.try_lock_until(max_blocking_time))
382
    {
383
        EPROSIMA_LOG_ERROR(PUBLISHER, "Cannot lock the DataWriterHistory mutex");
384
        return false;
385
    }
386
#else
387
0
    std::lock_guard<RecursiveTimedMutex> guard(*this->mp_mutex);
388
0
#endif // if HAVE_STRICT_REALTIME
389
390
0
    if (topic_kind_ == NO_KEY)
391
0
    {
392
0
        if (remove_change(change, max_blocking_time))
393
0
        {
394
0
            m_isHistoryFull = false;
395
0
            return true;
396
0
        }
397
398
0
        return false;
399
0
    }
400
0
    else
401
0
    {
402
0
        t_m_Inst_Caches::iterator vit;
403
0
        vit = keyed_changes_.find(change->instanceHandle);
404
0
        if (vit == keyed_changes_.end())
405
0
        {
406
0
            return false;
407
0
        }
408
409
0
        for (auto chit = vit->second.cache_changes.begin(); chit != vit->second.cache_changes.end(); ++chit)
410
0
        {
411
0
            if (((*chit)->sequenceNumber == change->sequenceNumber) && ((*chit)->writerGUID == change->writerGUID))
412
0
            {
413
0
                if (remove_change(change, max_blocking_time))
414
0
                {
415
0
                    vit->second.cache_changes.erase(chit);
416
0
                    m_isHistoryFull = false;
417
0
                    return true;
418
0
                }
419
0
            }
420
0
        }
421
0
        EPROSIMA_LOG_ERROR(PUBLISHER, "Change not found, something is wrong");
422
0
    }
423
0
    return false;
424
0
}
425
426
bool DataWriterHistory::remove_change_g(
427
        CacheChange_t* a_change)
428
0
{
429
0
    return remove_change_pub(a_change, std::chrono::steady_clock::now() + std::chrono::hours(24));
430
0
}
431
432
bool DataWriterHistory::remove_change_g(
433
        CacheChange_t* a_change,
434
        const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)
435
0
{
436
0
    return remove_change_pub(a_change, max_blocking_time);
437
0
}
438
439
bool DataWriterHistory::remove_instance_changes(
440
        const InstanceHandle_t& handle,
441
        const SequenceNumber_t& seq_up_to)
442
0
{
443
0
    if (mp_writer == nullptr || mp_mutex == nullptr)
444
0
    {
445
0
        EPROSIMA_LOG_ERROR(RTPS_HISTORY, "You need to create a Writer with this History before using it");
446
0
        return false;
447
0
    }
448
449
0
    if (topic_kind_ == NO_KEY)
450
0
    {
451
0
        EPROSIMA_LOG_ERROR(RTPS_HISTORY, "Cannot be removed instance changes of a NO_KEY DataType");
452
0
        return false;
453
0
    }
454
455
0
    std::lock_guard<RecursiveTimedMutex> guard(*this->mp_mutex);
456
0
    t_m_Inst_Caches::iterator vit;
457
0
    vit = keyed_changes_.find(handle);
458
0
    if (vit == keyed_changes_.end())
459
0
    {
460
0
        return false;
461
0
    }
462
463
0
    auto chit = vit->second.cache_changes.begin();
464
465
0
    for (; chit != vit->second.cache_changes.end() && (*chit)->sequenceNumber <= seq_up_to; ++chit)
466
0
    {
467
0
        if (remove_change(*chit))
468
0
        {
469
0
            m_isHistoryFull = false;
470
0
        }
471
0
    }
472
473
0
    vit->second.cache_changes.erase(vit->second.cache_changes.begin(), chit);
474
475
0
    if (vit->second.cache_changes.empty())
476
0
    {
477
0
        keyed_changes_.erase(vit);
478
0
    }
479
480
0
    return true;
481
0
}
482
483
bool DataWriterHistory::set_next_deadline(
484
        const InstanceHandle_t& handle,
485
        const std::chrono::steady_clock::time_point& next_deadline_us)
486
0
{
487
0
    if (mp_writer == nullptr || mp_mutex == nullptr)
488
0
    {
489
0
        EPROSIMA_LOG_ERROR(RTPS_HISTORY, "You need to create a Writer with this History before using it");
490
0
        return false;
491
0
    }
492
0
    std::lock_guard<RecursiveTimedMutex> guard(*this->mp_mutex);
493
494
0
    if (topic_kind_ == NO_KEY)
495
0
    {
496
0
        next_deadline_us_ = next_deadline_us;
497
0
        return true;
498
0
    }
499
0
    else if (topic_kind_ == WITH_KEY)
500
0
    {
501
0
        if (keyed_changes_.find(handle) == keyed_changes_.end())
502
0
        {
503
0
            return false;
504
0
        }
505
506
0
        keyed_changes_[handle].next_deadline_us = next_deadline_us;
507
0
        return true;
508
0
    }
509
510
0
    return false;
511
0
}
512
513
bool DataWriterHistory::get_next_deadline(
514
        InstanceHandle_t& handle,
515
        std::chrono::steady_clock::time_point& next_deadline_us)
516
0
{
517
0
    if (mp_writer == nullptr || mp_mutex == nullptr)
518
0
    {
519
0
        EPROSIMA_LOG_ERROR(RTPS_HISTORY, "You need to create a Writer with this History before using it");
520
0
        return false;
521
0
    }
522
0
    std::lock_guard<RecursiveTimedMutex> guard(*this->mp_mutex);
523
524
0
    if (topic_kind_ == WITH_KEY)
525
0
    {
526
0
        auto min = std::min_element(
527
0
            keyed_changes_.begin(),
528
0
            keyed_changes_.end(),
529
0
            [](
530
0
                const t_m_Inst_Caches::value_type& lhs,
531
0
                const t_m_Inst_Caches::value_type& rhs)
532
0
            {
533
0
                return lhs.second.next_deadline_us < rhs.second.next_deadline_us;
534
0
            });
535
536
0
        handle = min->first;
537
0
        next_deadline_us = min->second.next_deadline_us;
538
0
        return true;
539
0
    }
540
0
    else if (topic_kind_ == NO_KEY)
541
0
    {
542
0
        next_deadline_us = next_deadline_us_;
543
0
        return true;
544
0
    }
545
546
0
    return false;
547
0
}
548
549
bool DataWriterHistory::is_key_registered(
550
        const InstanceHandle_t& handle)
551
0
{
552
0
    if (mp_writer == nullptr || mp_mutex == nullptr)
553
0
    {
554
0
        EPROSIMA_LOG_ERROR(RTPS_HISTORY, "You need to create a Writer with this History before using it");
555
0
        return false;
556
0
    }
557
0
    std::lock_guard<RecursiveTimedMutex> guard(*this->mp_mutex);
558
0
    t_m_Inst_Caches::iterator vit;
559
0
    vit = keyed_changes_.find(handle);
560
0
    return vit != keyed_changes_.end() && vit->second.is_registered();
561
0
}
562
563
bool DataWriterHistory::wait_for_acknowledgement_last_change(
564
        const InstanceHandle_t& handle,
565
        std::unique_lock<RecursiveTimedMutex>& lock,
566
        const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)
567
0
{
568
0
    if (WITH_KEY == topic_kind_)
569
0
    {
570
        // Find the instance
571
0
        t_m_Inst_Caches::iterator vit = keyed_changes_.find(handle);
572
0
        if (vit != keyed_changes_.end())
573
0
        {
574
0
            SequenceNumber_t seq = vit->second.cache_changes.back()->sequenceNumber;
575
0
            return mp_writer->wait_for_acknowledgement(seq, max_blocking_time, lock);
576
0
        }
577
0
    }
578
0
    return false;
579
0
}
580
581
bool DataWriterHistory::change_is_acked_or_fully_delivered(
582
        const CacheChange_t* change)
583
0
{
584
0
    bool is_acked = false;
585
0
    if (mp_writer->get_disable_positive_acks())
586
0
    {
587
0
        is_acked = mp_writer->has_been_fully_delivered(change->sequenceNumber);
588
0
    }
589
0
    else
590
0
    {
591
0
        is_acked = mp_writer->is_acked_by_all(change->sequenceNumber);
592
0
    }
593
0
    return is_acked;
594
0
}
595
596
}  // namespace dds
597
}  // namespace fastdds
598
}  // namespace eprosima