Coverage Report

Created: 2026-05-04 06:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/Fast-DDS/src/cpp/rtps/writer/ReaderProxy.cpp
Line
Count
Source
1
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file ReaderProxy.cpp
17
 *
18
 */
19
20
#include <rtps/writer/ReaderProxy.hpp>
21
22
#include <algorithm>
23
#include <cassert>
24
#include <cstdint>
25
#include <mutex>
26
27
#include <fastdds/dds/log/Log.hpp>
28
#include <fastdds/rtps/history/WriterHistory.hpp>
29
#include <fastdds/rtps/common/LocatorListComparisons.hpp>
30
31
#include <rtps/DataSharing/DataSharingNotifier.hpp>
32
#include <rtps/history/HistoryAttributesExtension.hpp>
33
#include <rtps/messages/RTPSGapBuilder.hpp>
34
#include <rtps/participant/RTPSParticipantImpl.hpp>
35
#include <rtps/resources/TimedEvent.h>
36
#include <rtps/writer/StatefulWriter.hpp>
37
#include <rtps/writer/StatefulWriterListener.hpp>
38
#include <utils/TimeConversion.hpp>
39
40
41
namespace eprosima {
42
namespace fastdds {
43
namespace rtps {
44
45
ReaderProxy::ReaderProxy(
46
        const WriterTimes& times,
47
        const RemoteLocatorsAllocationAttributes& loc_alloc,
48
        StatefulWriter* writer,
49
        StatefulWriterListener* stateful_listener)
50
0
    : is_active_(false)
51
0
    , locator_info_(
52
0
        writer, loc_alloc.max_unicast_locators,
53
0
        loc_alloc.max_multicast_locators)
54
0
    , durability_kind_(VOLATILE)
55
0
    , expects_inline_qos_(false)
56
0
    , is_reliable_(false)
57
0
    , disable_positive_acks_(false)
58
0
    , writer_(writer)
59
0
    , changes_for_reader_(resource_limits_from_history(writer->get_history()->m_att, 0))
60
0
    , nack_supression_event_(nullptr)
61
0
    , initial_heartbeat_event_(nullptr)
62
0
    , timers_enabled_(false)
63
0
    , next_expected_acknack_count_(0)
64
0
    , last_nackfrag_count_(0)
65
0
    , stateful_writer_listener_(stateful_listener)
66
0
{
67
0
    auto participant = writer_->get_participant_impl();
68
0
    if (nullptr != participant)
69
0
    {
70
0
        nack_supression_event_ = new TimedEvent(participant->getEventResource(),
71
0
                        [&]() -> bool
72
0
                        {
73
0
                            writer_->perform_nack_supression(guid());
74
0
                            return false;
75
0
                        },
76
0
                        fastdds::rtps::TimeConv::Time_t2MilliSecondsDouble(times.nack_supression_duration));
77
78
0
        initial_heartbeat_event_ = new TimedEvent(participant->getEventResource(),
79
0
                        [&]() -> bool
80
0
                        {
81
0
                            writer_->intraprocess_heartbeat(this);
82
0
                            return false;
83
0
                        }, 0);
84
0
    }
85
86
0
    stop();
87
0
}
88
89
bool ReaderProxy::rtps_is_relevant(
90
        CacheChange_t* change) const
91
0
{
92
0
    auto filter = writer_->reader_data_filter();
93
0
    if (nullptr != filter)
94
0
    {
95
0
        bool ret = filter->is_relevant(*change, guid());
96
0
        EPROSIMA_LOG_INFO(RTPS_READER_PROXY,
97
0
                "Change " << change->instanceHandle << " is relevant for reader " << guid() << "? " << ret);
98
0
        return ret;
99
0
    }
100
0
    return true;
101
0
}
102
103
ReaderProxy::~ReaderProxy()
104
0
{
105
0
    if (nack_supression_event_)
106
0
    {
107
0
        delete(nack_supression_event_);
108
0
        nack_supression_event_ = nullptr;
109
0
    }
110
111
0
    if (initial_heartbeat_event_)
112
0
    {
113
0
        delete(initial_heartbeat_event_);
114
0
        initial_heartbeat_event_ = nullptr;
115
0
    }
116
0
}
117
118
void ReaderProxy::start(
119
        const ReaderProxyData& reader_attributes,
120
        bool is_datasharing)
121
0
{
122
0
    locator_info_.start(
123
0
        reader_attributes.guid,
124
0
        reader_attributes.remote_locators.unicast,
125
0
        reader_attributes.remote_locators.multicast,
126
0
        reader_attributes.expects_inline_qos,
127
0
        is_datasharing);
128
129
0
    is_active_ = true;
130
0
    durability_kind_ = reader_attributes.durability.durabilityKind();
131
0
    expects_inline_qos_ = reader_attributes.expects_inline_qos;
132
0
    is_reliable_ = reader_attributes.reliability.kind != dds::BEST_EFFORT_RELIABILITY_QOS;
133
0
    disable_positive_acks_ = reader_attributes.disable_positive_acks_enabled();
134
0
    if (durability_kind_ == DurabilityKind_t::VOLATILE)
135
0
    {
136
0
        SequenceNumber_t min_sequence = writer_->get_seq_num_min();
137
0
        changes_low_mark_ = (min_sequence == SequenceNumber_t::unknown()) ?
138
0
                writer_->next_sequence_number() - 1 : min_sequence - 1;
139
0
    }
140
0
    else
141
0
    {
142
0
        acked_changes_set(SequenceNumber_t());  // Simulate initial acknack to set low mark
143
0
    }
144
145
0
    timers_enabled_.store(is_remote_and_reliable());
146
0
    if (is_local_reader() && initial_heartbeat_event_)
147
0
    {
148
0
        initial_heartbeat_event_->restart_timer();
149
0
    }
150
151
0
    EPROSIMA_LOG_INFO(RTPS_READER_PROXY, "Reader Proxy started");
152
0
}
153
154
bool ReaderProxy::update(
155
        const ReaderProxyData& reader_attributes)
156
0
{
157
0
    durability_kind_ = reader_attributes.durability.durabilityKind();
158
0
    expects_inline_qos_ = reader_attributes.expects_inline_qos;
159
0
    is_reliable_ = reader_attributes.reliability.kind != dds::BEST_EFFORT_RELIABILITY_QOS;
160
0
    disable_positive_acks_ = reader_attributes.disable_positive_acks_enabled();
161
162
0
    locator_info_.update(
163
0
        reader_attributes.remote_locators.unicast,
164
0
        reader_attributes.remote_locators.multicast,
165
0
        reader_attributes.expects_inline_qos);
166
167
0
    return true;
168
0
}
169
170
void ReaderProxy::stop()
171
0
{
172
0
    locator_info_.stop();
173
0
    is_active_ = false;
174
0
    disable_timers();
175
176
0
    changes_for_reader_.clear();
177
0
    next_expected_acknack_count_ = 0;
178
0
    last_nackfrag_count_ = 0;
179
0
    changes_low_mark_ = SequenceNumber_t();
180
181
0
    first_irrelevant_removed_ = SequenceNumber_t::unknown();
182
0
    last_irrelevant_removed_ = SequenceNumber_t::unknown();
183
0
}
184
185
void ReaderProxy::disable_timers()
186
0
{
187
0
    if (timers_enabled_.exchange(false) && nack_supression_event_)
188
0
    {
189
0
        nack_supression_event_->cancel_timer();
190
0
    }
191
0
    if (initial_heartbeat_event_)
192
0
    {
193
0
        initial_heartbeat_event_->cancel_timer();
194
0
    }
195
0
}
196
197
void ReaderProxy::update_nack_supression_interval(
198
        const dds::Duration_t& interval)
199
0
{
200
0
    if (nack_supression_event_)
201
0
    {
202
0
        nack_supression_event_->update_interval(interval);
203
0
    }
204
0
}
205
206
void ReaderProxy::add_change(
207
        const ChangeForReader_t& change,
208
        bool is_relevant,
209
        bool restart_nack_supression)
210
0
{
211
0
    if (restart_nack_supression && timers_enabled_.load() && nack_supression_event_)
212
0
    {
213
0
        nack_supression_event_->restart_timer();
214
0
    }
215
216
0
    add_change(change, is_relevant);
217
0
}
218
219
void ReaderProxy::add_change(
220
        const ChangeForReader_t& change,
221
        bool is_relevant,
222
        bool restart_nack_supression,
223
        const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)
224
0
{
225
0
    if (restart_nack_supression && timers_enabled_ && nack_supression_event_)
226
0
    {
227
0
        nack_supression_event_->restart_timer(max_blocking_time);
228
0
    }
229
230
0
    add_change(change, is_relevant);
231
0
}
232
233
void ReaderProxy::add_change(
234
        const ChangeForReader_t& change,
235
        bool is_relevant)
236
0
{
237
0
    SequenceNumber_t seq_num {change.getSequenceNumber()};
238
0
    assert(seq_num > changes_low_mark_);
239
0
    assert(changes_for_reader_.empty() ? true :
240
0
            seq_num > changes_for_reader_.back().getSequenceNumber());
241
242
    // Irrelevant changes are not added to the collection
243
0
    if (!is_relevant)
244
0
    {
245
0
        if (is_reliable_)
246
0
        {
247
0
            if (!is_local_reader())
248
0
            {
249
0
                if (SequenceNumber_t::unknown() == first_irrelevant_removed_)
250
0
                {
251
0
                    first_irrelevant_removed_ = seq_num;
252
0
                    last_irrelevant_removed_ = seq_num;
253
0
                }
254
0
                else if  (seq_num == last_irrelevant_removed_ + 1)
255
0
                {
256
0
                    last_irrelevant_removed_ = seq_num;
257
0
                }
258
0
            }
259
0
        }
260
0
        else if (changes_low_mark_ + 1 == seq_num)
261
0
        {
262
0
            changes_low_mark_ = seq_num;
263
0
        }
264
0
        return;
265
0
    }
266
267
0
    if (changes_for_reader_.push_back(change) == nullptr)
268
0
    {
269
        // This should never happen
270
0
        EPROSIMA_LOG_ERROR(RTPS_READER_PROXY, "Error adding change " << seq_num
271
0
                                                                     << " to reader proxy " << guid());
272
0
        eprosima::fastdds::dds::Log::Flush();
273
0
        assert(false);
274
0
    }
275
0
}
276
277
bool ReaderProxy::has_changes() const
278
0
{
279
0
    return !changes_for_reader_.empty();
280
0
}
281
282
bool ReaderProxy::change_is_acked(
283
        const SequenceNumber_t& seq_num) const
284
0
{
285
0
    if (seq_num <= changes_low_mark_ || changes_for_reader_.empty())
286
0
    {
287
0
        return true;
288
0
    }
289
290
0
    ChangeConstIterator chit = find_change(seq_num);
291
0
    if (chit == changes_for_reader_.end())
292
0
    {
293
        // There is a hole in changes_for_reader_
294
        // This means a change was removed, or was not relevant.
295
0
        return true;
296
0
    }
297
298
0
    return chit->getStatus() == ACKNOWLEDGED;
299
0
}
300
301
bool ReaderProxy::change_is_unsent(
302
        const SequenceNumber_t& seq_num,
303
        FragmentNumber_t& next_unsent_frag,
304
        SequenceNumber_t& gap_seq,
305
        const SequenceNumber_t& min_seq,
306
        bool& need_reactivate_periodic_heartbeat)
307
0
{
308
0
    if (seq_num <= changes_low_mark_ || changes_for_reader_.empty())
309
0
    {
310
0
        return false;
311
0
    }
312
313
0
    ChangeConstIterator chit = find_change(seq_num);
314
0
    if (chit == changes_for_reader_.end())
315
0
    {
316
        // There is a hole in changes_for_reader_
317
        // This means a change was removed.
318
0
        return false;
319
0
    }
320
321
0
    bool returned_value = chit->getStatus() == UNSENT;
322
323
0
    if (returned_value)
324
0
    {
325
0
        next_unsent_frag = chit->get_next_unsent_fragment();
326
0
        gap_seq = SequenceNumber_t::unknown();
327
328
0
        if (is_reliable_ && !chit->has_been_delivered())
329
0
        {
330
0
            need_reactivate_periodic_heartbeat |= true;
331
0
            SequenceNumber_t prev =
332
0
                    (changes_for_reader_.begin() != chit ?
333
0
                    std::prev(chit)->getSequenceNumber() :
334
0
                    changes_low_mark_
335
0
                    ) + 1;
336
337
0
            if (prev != chit->getSequenceNumber())
338
0
            {
339
0
                gap_seq = prev;
340
341
                // Verify the calculated gap_seq in ReaderProxy is a real hole in the history.
342
0
                if (gap_seq < min_seq) // Several samples of the hole are not really already available.
343
0
                {
344
0
                    if (min_seq < seq_num)
345
0
                    {
346
0
                        gap_seq = min_seq;
347
0
                    }
348
0
                    else
349
0
                    {
350
0
                        gap_seq = SequenceNumber_t::unknown();
351
0
                    }
352
0
                }
353
354
0
                if (SequenceNumber_t::unknown() != first_irrelevant_removed_ &&
355
0
                        SequenceNumber_t::unknown() != gap_seq)
356
0
                {
357
                    // Check if the hole is due to irrelevant changes removed without informing the reader
358
0
                    if (first_irrelevant_removed_ <= gap_seq )
359
0
                    {
360
0
                        if (gap_seq == first_irrelevant_removed_)
361
0
                        {
362
0
                            first_irrelevant_removed_ = SequenceNumber_t::unknown();
363
0
                            last_irrelevant_removed_ = SequenceNumber_t::unknown();
364
0
                        }
365
0
                        else if (gap_seq < last_irrelevant_removed_)
366
0
                        {
367
0
                            last_irrelevant_removed_ = gap_seq - 1;
368
0
                        }
369
0
                    }
370
0
                }
371
0
            }
372
0
        }
373
0
    }
374
375
0
    return returned_value;
376
0
}
377
378
void ReaderProxy::acked_changes_set(
379
        const SequenceNumber_t& seq_num)
380
0
{
381
0
    SequenceNumber_t future_low_mark = seq_num;
382
383
0
    if (seq_num > changes_low_mark_)
384
0
    {
385
0
        ChangeIterator chit = find_change(seq_num, false);
386
        // continue advancing until next change is not acknowledged
387
0
        while (chit != changes_for_reader_.end()
388
0
                && chit->getSequenceNumber() == future_low_mark
389
0
                && chit->getStatus() == ACKNOWLEDGED)
390
0
        {
391
0
            ++chit;
392
0
            ++future_low_mark;
393
0
        }
394
0
        if (stateful_writer_listener_ != nullptr)
395
0
        {
396
0
            for (auto it = changes_for_reader_.begin(); it != chit; ++it)
397
0
            {
398
0
                notify_acknowledged(*it);
399
0
            }
400
0
        }
401
0
        changes_for_reader_.erase(changes_for_reader_.begin(), chit);
402
0
    }
403
0
    else
404
0
    {
405
0
        future_low_mark = changes_low_mark_ + 1;
406
407
0
        if (seq_num == SequenceNumber_t() && durability_kind_ != DurabilityKind_t::VOLATILE)
408
0
        {
409
            // Special case. Currently only used on Builtin StatefulWriters
410
            // after losing lease duration, and on late joiners to set
411
            // changes_low_mark_ to match that of the writer.
412
0
            SequenceNumber_t min_sequence = writer_->get_seq_num_min();
413
0
            if (min_sequence != SequenceNumber_t::unknown())
414
0
            {
415
0
                SequenceNumber_t current_sequence = seq_num;
416
0
                if (seq_num < min_sequence)
417
0
                {
418
0
                    current_sequence = min_sequence;
419
0
                }
420
0
                future_low_mark = current_sequence;
421
422
0
                bool should_sort = false;
423
0
                for (; current_sequence <= changes_low_mark_; ++current_sequence)
424
0
                {
425
                    // Skip all consecutive changes already in the collection
426
0
                    ChangeConstIterator it = find_change(current_sequence);
427
0
                    while ( it != changes_for_reader_.end() &&
428
0
                            current_sequence <= changes_low_mark_ &&
429
0
                            it->getSequenceNumber() == current_sequence)
430
0
                    {
431
0
                        ++current_sequence;
432
0
                        ++it;
433
0
                    }
434
435
0
                    if (current_sequence <= changes_low_mark_)
436
0
                    {
437
0
                        CacheChange_t* change = nullptr;
438
0
                        if (writer_->get_history()->get_change(current_sequence, writer_->getGuid(), &change))
439
0
                        {
440
0
                            should_sort = true;
441
0
                            ChangeForReader_t cr(change);
442
0
                            cr.setStatus(UNACKNOWLEDGED);
443
0
                            changes_for_reader_.push_back(cr);
444
0
                        }
445
0
                    }
446
0
                }
447
                // Keep changes sorted by sequence number
448
0
                if (should_sort)
449
0
                {
450
0
                    std::sort(changes_for_reader_.begin(), changes_for_reader_.end(), ChangeForReaderCmp());
451
0
                }
452
0
            }
453
0
            else if (!is_local_reader())
454
0
            {
455
0
                future_low_mark = writer_->next_sequence_number();
456
0
            }
457
0
        }
458
0
    }
459
0
    changes_low_mark_ = future_low_mark - 1;
460
0
}
461
462
bool ReaderProxy::requested_changes_set(
463
        const SequenceNumberSet_t& seq_num_set,
464
        RTPSGapBuilder& gap_builder,
465
        const SequenceNumber_t& min_seq_in_history)
466
0
{
467
0
    bool isSomeoneWasSetRequested = false;
468
469
0
    if (SequenceNumber_t::unknown() != min_seq_in_history)
470
0
    {
471
0
        seq_num_set.for_each([&](SequenceNumber_t sit)
472
0
                {
473
0
                    ChangeIterator chit = find_change(sit, true);
474
0
                    if (chit != changes_for_reader_.end())
475
0
                    {
476
0
                        if (UNACKNOWLEDGED == chit->getStatus())
477
0
                        {
478
0
                            chit->setStatus(REQUESTED);
479
0
                            chit->markAllFragmentsAsUnsent();
480
0
                            isSomeoneWasSetRequested = true;
481
0
                        }
482
0
                    }
483
0
                    else if ((sit >= min_seq_in_history) && (sit > changes_low_mark_))
484
0
                    {
485
0
                        gap_builder.add(sit);
486
487
0
                        if (SequenceNumber_t::unknown() != first_irrelevant_removed_)
488
0
                        {
489
                            // Check if the hole is due to irrelevant changes removed without informing the reader
490
0
                            if (first_irrelevant_removed_ <= sit )
491
0
                            {
492
0
                                if (sit == first_irrelevant_removed_)
493
0
                                {
494
0
                                    first_irrelevant_removed_ = SequenceNumber_t::unknown();
495
0
                                    last_irrelevant_removed_ = SequenceNumber_t::unknown();
496
0
                                }
497
0
                                else if (sit < last_irrelevant_removed_)
498
0
                                {
499
0
                                    last_irrelevant_removed_ = sit - 1;
500
0
                                }
501
0
                            }
502
0
                        }
503
0
                    }
504
0
                });
505
0
    }
506
507
0
    if (isSomeoneWasSetRequested)
508
0
    {
509
0
        EPROSIMA_LOG_INFO(RTPS_READER_PROXY, "Requested Changes: " << seq_num_set);
510
0
    }
511
512
0
    return isSomeoneWasSetRequested;
513
0
}
514
515
bool ReaderProxy::process_initial_acknack(
516
        const std::function<void(ChangeForReader_t& change)>& func)
517
0
{
518
0
    if (is_local_reader())
519
0
    {
520
0
        return 0 != convert_status_on_all_changes(UNACKNOWLEDGED, UNSENT, false, func);
521
0
    }
522
523
0
    return true;
524
0
}
525
526
void ReaderProxy::from_unsent_to_status(
527
        const SequenceNumber_t& seq_num,
528
        ChangeForReaderStatus_t status,
529
        bool restart_nack_supression,
530
        bool delivered)
531
0
{
532
    // This function must not be called by a best-effort reader.
533
    // It will use acked_changes_set().
534
0
    assert(is_reliable_);
535
536
0
    if (restart_nack_supression && is_remote_and_reliable() && nack_supression_event_)
537
0
    {
538
0
        assert(timers_enabled_.load());
539
0
        nack_supression_event_->restart_timer();
540
0
    }
541
542
    // Called when delivering an UNSENT sample, the seq_number must exists in the ReaderProxy.
543
0
    assert(seq_num > changes_low_mark_);
544
0
    ChangeIterator it = find_change(seq_num, true);
545
0
    assert(changes_for_reader_.end() != it);
546
0
    assert(UNSENT == it->getStatus());
547
0
    assert(UNSENT != status);
548
549
0
    if (ACKNOWLEDGED == status && seq_num == changes_low_mark_ + 1)
550
0
    {
551
0
        assert(changes_for_reader_.begin() == it);
552
0
        notify_acknowledged(*it);
553
0
        changes_for_reader_.erase(it);
554
0
        acked_changes_set(seq_num + 1);
555
0
        return;
556
0
    }
557
558
0
    it->setStatus(status);
559
560
0
    if (delivered)
561
0
    {
562
0
        it->set_delivered();
563
0
    }
564
0
}
565
566
bool ReaderProxy::mark_fragment_as_sent_for_change(
567
        const SequenceNumber_t& seq_num,
568
        FragmentNumber_t frag_num,
569
        bool& was_last_fragment)
570
0
{
571
0
    was_last_fragment = false;
572
573
0
    if (seq_num <= changes_low_mark_)
574
0
    {
575
0
        return false;
576
0
    }
577
578
0
    bool change_found = false;
579
0
    ChangeIterator it = find_change(seq_num, true);
580
581
0
    if (it != changes_for_reader_.end())
582
0
    {
583
0
        change_found = true;
584
0
        it->markFragmentsAsSent(frag_num);
585
0
        was_last_fragment = it->getUnsentFragments().empty();
586
0
    }
587
588
0
    return change_found;
589
0
}
590
591
bool ReaderProxy::perform_nack_supression()
592
0
{
593
0
    return 0 != convert_status_on_all_changes(UNDERWAY, UNACKNOWLEDGED, false);
594
0
}
595
596
uint32_t ReaderProxy::perform_acknack_response(
597
        const std::function<void(ChangeForReader_t& change)>& func)
598
0
{
599
0
    return convert_status_on_all_changes(REQUESTED, UNSENT, nullptr != stateful_writer_listener_, func);
600
0
}
601
602
uint32_t ReaderProxy::convert_status_on_all_changes(
603
        ChangeForReaderStatus_t previous,
604
        ChangeForReaderStatus_t next,
605
        bool notify_resend,
606
        const std::function<void(ChangeForReader_t& change)>& func)
607
0
{
608
0
    assert(previous > next);
609
610
    // NOTE: This is only called for REQUESTED=>UNSENT (acknack response) or
611
    //       UNDERWAY=>UNACKNOWLEDGED (nack supression)
612
613
0
    uint32_t changed = 0;
614
0
    for (ChangeForReader_t& change : changes_for_reader_)
615
0
    {
616
0
        if (change.getStatus() == previous)
617
0
        {
618
0
            ++changed;
619
0
            change.setStatus(next);
620
621
0
            if (notify_resend)
622
0
            {
623
0
                notify_resent(change);
624
0
            }
625
626
0
            if (func)
627
0
            {
628
0
                func(change);
629
0
            }
630
0
        }
631
0
    }
632
633
0
    return changed;
634
0
}
635
636
void ReaderProxy::change_has_been_removed(
637
        const SequenceNumber_t& seq_num)
638
0
{
639
    // Check sequence number is in the container, because it was not clean up.
640
0
    if (changes_for_reader_.empty() || seq_num < changes_for_reader_.begin()->getSequenceNumber())
641
0
    {
642
0
        return;
643
0
    }
644
645
0
    auto chit = find_change(seq_num);
646
647
0
    if (chit == this->changes_for_reader_.end())
648
0
    {
649
        // No change for this sequence number
650
0
        return;
651
0
    }
652
653
    // In intraprocess, if there is an UNACKNOWLEDGED, a GAP has to be send because there is no reliable mechanism.
654
0
    if (is_local_reader() && ACKNOWLEDGED > chit->getStatus())
655
0
    {
656
0
        writer_->intraprocess_gap(this, seq_num);
657
0
    }
658
659
    // Element may not be in the container when marked as irrelevant.
660
0
    changes_for_reader_.erase(chit);
661
662
    // When removing the next-to-be-acknowledged, we should auto-acknowledge it.
663
0
    if ((changes_low_mark_ + 1) == seq_num)
664
0
    {
665
0
        acked_changes_set(seq_num + 1);
666
0
    }
667
0
}
668
669
bool ReaderProxy::has_unacknowledged(
670
        const SequenceNumber_t& first_seq_in_history) const
671
0
{
672
0
    if (first_seq_in_history > changes_low_mark_)
673
0
    {
674
0
        return true;
675
0
    }
676
677
0
    for (const ChangeForReader_t& it : changes_for_reader_)
678
0
    {
679
0
        if (it.getStatus() == UNACKNOWLEDGED)
680
0
        {
681
0
            return true;
682
0
        }
683
0
    }
684
685
0
    return false;
686
0
}
687
688
bool ReaderProxy::requested_fragment_set(
689
        const SequenceNumber_t& seq_num,
690
        const FragmentNumberSet_t& frag_set)
691
0
{
692
    // Locate the outbound change referenced by the NACK_FRAG
693
0
    ChangeIterator changeIter = find_change(seq_num, true);
694
0
    if (changeIter == changes_for_reader_.end())
695
0
    {
696
0
        return false;
697
0
    }
698
699
0
    changeIter->markFragmentsAsUnsent(frag_set);
700
701
    // If it was UNSENT, we shouldn't switch back to REQUESTED to prevent stalling.
702
0
    if (changeIter->getStatus() != UNSENT)
703
0
    {
704
0
        changeIter->setStatus(REQUESTED);
705
0
    }
706
707
0
    return true;
708
0
}
709
710
bool ReaderProxy::process_nack_frag(
711
        const GUID_t& reader_guid,
712
        uint32_t nack_count,
713
        const SequenceNumber_t& seq_num,
714
        const FragmentNumberSet_t& fragments_state)
715
0
{
716
0
    if (guid() == reader_guid)
717
0
    {
718
0
        if (last_nackfrag_count_ < nack_count)
719
0
        {
720
0
            last_nackfrag_count_ = nack_count;
721
0
            if (requested_fragment_set(seq_num, fragments_state))
722
0
            {
723
0
                return true;
724
0
            }
725
0
        }
726
0
    }
727
728
0
    return false;
729
0
}
730
731
static bool change_less_than_sequence(
732
        const ChangeForReader_t& change,
733
        const SequenceNumber_t& seq_num)
734
0
{
735
0
    return change.getSequenceNumber() < seq_num;
736
0
}
737
738
ReaderProxy::ChangeIterator ReaderProxy::find_change(
739
        const SequenceNumber_t& seq_num,
740
        bool exact)
741
0
{
742
0
    ReaderProxy::ChangeIterator it;
743
0
    ReaderProxy::ChangeIterator end = changes_for_reader_.end();
744
0
    it = std::lower_bound(changes_for_reader_.begin(), end, seq_num, change_less_than_sequence);
745
746
0
    return (!exact)
747
0
           ? it
748
0
           : it == end
749
0
           ? it
750
0
           : it->getSequenceNumber() == seq_num ? it : end;
751
0
}
752
753
ReaderProxy::ChangeConstIterator ReaderProxy::find_change(
754
        const SequenceNumber_t& seq_num) const
755
0
{
756
0
    ReaderProxy::ChangeConstIterator it;
757
0
    ReaderProxy::ChangeConstIterator end = changes_for_reader_.end();
758
0
    it = std::lower_bound(changes_for_reader_.begin(), end, seq_num, change_less_than_sequence);
759
760
0
    return it == end
761
0
           ? it
762
0
           : it->getSequenceNumber() == seq_num ? it : end;
763
0
}
764
765
bool ReaderProxy::has_been_delivered(
766
        const SequenceNumber_t& seq_number,
767
        bool& found) const
768
0
{
769
0
    if (seq_number <= changes_low_mark_)
770
0
    {
771
        // Change has already been acknowledged, so it has been delivered
772
0
        return true;
773
0
    }
774
775
0
    ChangeConstIterator it = find_change(seq_number);
776
0
    if (it != changes_for_reader_.end())
777
0
    {
778
0
        found = true;
779
0
        return it->has_been_delivered();
780
0
    }
781
782
0
    return false;
783
0
}
784
785
void ReaderProxy::notify_acknowledged(
786
        const ChangeForReader_t& change) const
787
0
{
788
0
    if (stateful_writer_listener_ != nullptr)
789
0
    {
790
0
        CacheChange_t* sample = change.getChange();
791
0
        uint32_t payload_length = sample ? sample->serializedPayload.length : 0;
792
0
        stateful_writer_listener_->on_writer_data_acknowledged(
793
0
            writer_->getGuid(),
794
0
            guid(),
795
0
            change.getSequenceNumber(),
796
0
            payload_length,
797
0
            change.time_since_first_send(),
798
0
            locator_info_.general_locator_selector_entry());
799
0
    }
800
0
}
801
802
void ReaderProxy::notify_resent(
803
        const ChangeForReader_t& change) const
804
0
{
805
0
    assert (stateful_writer_listener_ != nullptr);
806
807
0
    const CacheChange_t* sample = change.getChange();
808
0
    assert(sample != nullptr);
809
810
    // Calc number of bytes to resend
811
0
    uint64_t resent_bytes = 0;
812
0
    const FragmentNumberSet_t& fragments = change.getUnsentFragments();
813
0
    if (fragments.empty())
814
0
    {
815
0
        resent_bytes = sample->serializedPayload.length;
816
0
    }
817
0
    else
818
0
    {
819
        // Calculate number of bytes to resend
820
0
        uint64_t num_fragments = fragments.count();
821
0
        uint32_t fragment_size = sample->getFragmentSize();
822
0
        if (fragments.max() < sample->getFragmentCount())
823
0
        {
824
0
            resent_bytes = num_fragments * fragment_size;
825
0
        }
826
0
        else
827
0
        {
828
            // Last fragment may be smaller
829
0
            resent_bytes = (num_fragments - 1) * fragment_size;
830
0
            uint32_t last_fragment_size = sample->serializedPayload.length % fragment_size;
831
0
            if (last_fragment_size > 0)
832
0
            {
833
0
                resent_bytes += last_fragment_size;
834
0
            }
835
0
            else
836
0
            {
837
0
                resent_bytes += fragment_size;
838
0
            }
839
0
        }
840
0
    }
841
842
    // Notify to participant
843
    assert(resent_bytes <= sample->serializedPayload.length);
844
0
    stateful_writer_listener_->on_writer_resend_data(
845
0
        writer_->getGuid(),
846
0
        guid(),
847
0
        sample->sequenceNumber,
848
0
        static_cast<uint32_t>(resent_bytes),
849
0
        locator_info_.general_locator_selector_entry());
850
0
}
851
852
}   // namespace rtps
853
}   // namespace fastdds
854
}   // namespace eprosima