Coverage Report

Created: 2022-08-24 06:19

/src/Fast-DDS/src/cpp/rtps/writer/ReaderProxy.cpp
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file ReaderProxy.cpp
17
 *
18
 */
19
20
21
#include <fastdds/dds/log/Log.hpp>
22
#include <fastdds/rtps/history/WriterHistory.h>
23
#include <fastdds/rtps/writer/ReaderProxy.h>
24
#include <fastdds/rtps/writer/StatefulWriter.h>
25
#include <fastdds/rtps/resources/TimedEvent.h>
26
#include <fastrtps/utils/TimeConversion.h>
27
#include <fastdds/rtps/common/LocatorListComparisons.hpp>
28
29
#include <rtps/participant/RTPSParticipantImpl.h>
30
#include <rtps/history/HistoryAttributesExtension.hpp>
31
32
#include "rtps/messages/RTPSGapBuilder.hpp"
33
#include <rtps/DataSharing/DataSharingNotifier.hpp>
34
35
#include <mutex>
36
#include <cassert>
37
#include <algorithm>
38
39
namespace eprosima {
40
namespace fastrtps {
41
namespace rtps {
42
43
/**
 * Build an inactive proxy attached to @c writer.
 *
 * Two timed events are created on the event resource of the writer's participant (NACK suppression
 * and initial heartbeat), and then stop() is called so the proxy starts out in its idle state.
 *
 * @param times      Writer timing configuration; only nackSupressionDuration is read here.
 * @param loc_alloc  Limits for unicast / multicast locators, forwarded to locator_info_.
 * @param writer     Writer owning this proxy. It is dereferenced here, so it must not be null.
 */
ReaderProxy::ReaderProxy(
        const WriterTimes& times,
        const RemoteLocatorsAllocationAttributes& loc_alloc,
        StatefulWriter* writer)
    : is_active_(false)
    , locator_info_(
        writer, loc_alloc.max_unicast_locators,
        loc_alloc.max_multicast_locators)
    , durability_kind_(VOLATILE)
    , expects_inline_qos_(false)
    , is_reliable_(false)
    , disable_positive_acks_(false)
    , writer_(writer)
    , changes_for_reader_(resource_limits_from_history(writer->mp_history->m_att, 0))
    , nack_supression_event_(nullptr)
    , initial_heartbeat_event_(nullptr)
    , timers_enabled_(false)
    , last_acknack_count_(0)
    , last_nackfrag_count_(0)
{
    // Both callbacks only touch members of this proxy, so capturing `this` is all they need.
    auto on_nack_supression = [this]() -> bool
            {
                writer_->perform_nack_supression(guid());
                return false;
            };

    auto on_initial_heartbeat = [this]() -> bool
            {
                writer_->intraprocess_heartbeat(this);
                return false;
            };

    nack_supression_event_ = new TimedEvent(
        writer_->getRTPSParticipant()->getEventResource(),
        on_nack_supression,
        TimeConv::Time_t2MilliSecondsDouble(times.nackSupressionDuration));

    initial_heartbeat_event_ = new TimedEvent(
        writer_->getRTPSParticipant()->getEventResource(),
        on_initial_heartbeat,
        0);

    // Reset every piece of per-match state.
    stop();
}
80
81
bool ReaderProxy::rtps_is_relevant(
82
        CacheChange_t* change) const
83
0
{
84
0
    auto filter = writer_->reader_data_filter();
85
0
    if (nullptr != filter)
86
0
    {
87
0
        bool ret = filter->is_relevant(*change, guid());
88
0
        logInfo(RTPS_READER_PROXY,
89
0
                "Change " << change->instanceHandle << " is relevant for reader " << guid() << "? " << ret);
90
0
        return ret;
91
0
    }
92
0
    return true;
93
0
}
94
95
/**
 * Release the two timed events created by the constructor.
 */
ReaderProxy::~ReaderProxy()
{
    // Deleting a null pointer is a no-op, so no explicit guard is required.
    delete nack_supression_event_;
    nack_supression_event_ = nullptr;

    delete initial_heartbeat_event_;
    initial_heartbeat_event_ = nullptr;
}
109
110
void ReaderProxy::start(
111
        const ReaderProxyData& reader_attributes,
112
        bool is_datasharing)
113
0
{
114
0
    locator_info_.start(
115
0
        reader_attributes.guid(),
116
0
        reader_attributes.remote_locators().unicast,
117
0
        reader_attributes.remote_locators().multicast,
118
0
        reader_attributes.m_expectsInlineQos,
119
0
        is_datasharing);
120
121
0
    is_active_ = true;
122
0
    durability_kind_ = reader_attributes.m_qos.m_durability.durabilityKind();
123
0
    expects_inline_qos_ = reader_attributes.m_expectsInlineQos;
124
0
    is_reliable_ = reader_attributes.m_qos.m_reliability.kind != BEST_EFFORT_RELIABILITY_QOS;
125
0
    disable_positive_acks_ = reader_attributes.disable_positive_acks();
126
0
    if (durability_kind_ == DurabilityKind_t::VOLATILE)
127
0
    {
128
0
        SequenceNumber_t min_sequence = writer_->get_seq_num_min();
129
0
        changes_low_mark_ = (min_sequence == SequenceNumber_t::unknown()) ?
130
0
                writer_->next_sequence_number() - 1 : min_sequence - 1;
131
0
    }
132
0
    else
133
0
    {
134
0
        acked_changes_set(SequenceNumber_t());  // Simulate initial acknack to set low mark
135
0
    }
136
137
0
    timers_enabled_.store(is_remote_and_reliable());
138
0
    if (is_local_reader())
139
0
    {
140
0
        initial_heartbeat_event_->restart_timer();
141
0
    }
142
143
0
    logInfo(RTPS_READER_PROXY, "Reader Proxy started");
144
0
}
145
146
bool ReaderProxy::update(
147
        const ReaderProxyData& reader_attributes)
148
0
{
149
0
    durability_kind_ = reader_attributes.m_qos.m_durability.durabilityKind();
150
0
    expects_inline_qos_ = reader_attributes.m_expectsInlineQos;
151
0
    is_reliable_ = reader_attributes.m_qos.m_reliability.kind != BEST_EFFORT_RELIABILITY_QOS;
152
0
    disable_positive_acks_ = reader_attributes.disable_positive_acks();
153
154
0
    locator_info_.update(
155
0
        reader_attributes.remote_locators().unicast,
156
0
        reader_attributes.remote_locators().multicast,
157
0
        reader_attributes.m_expectsInlineQos);
158
159
0
    return true;
160
0
}
161
162
void ReaderProxy::stop()
163
0
{
164
0
    locator_info_.stop();
165
0
    is_active_ = false;
166
0
    disable_timers();
167
168
0
    changes_for_reader_.clear();
169
0
    last_acknack_count_ = 0;
170
0
    last_nackfrag_count_ = 0;
171
0
    changes_low_mark_ = SequenceNumber_t();
172
0
}
173
174
void ReaderProxy::disable_timers()
175
0
{
176
0
    if (timers_enabled_.exchange(false))
177
0
    {
178
0
        nack_supression_event_->cancel_timer();
179
0
    }
180
0
    initial_heartbeat_event_->cancel_timer();
181
0
}
182
183
void ReaderProxy::update_nack_supression_interval(
184
        const Duration_t& interval)
185
0
{
186
0
    nack_supression_event_->update_interval(interval);
187
0
}
188
189
void ReaderProxy::add_change(
190
        const ChangeForReader_t& change,
191
        bool is_relevant,
192
        bool restart_nack_supression)
193
0
{
194
0
    if (restart_nack_supression && timers_enabled_.load())
195
0
    {
196
0
        nack_supression_event_->restart_timer();
197
0
    }
198
199
0
    add_change(change, is_relevant);
200
0
}
201
202
void ReaderProxy::add_change(
203
        const ChangeForReader_t& change,
204
        bool is_relevant,
205
        bool restart_nack_supression,
206
        const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)
207
0
{
208
0
    if (restart_nack_supression && timers_enabled_)
209
0
    {
210
0
        nack_supression_event_->restart_timer(max_blocking_time);
211
0
    }
212
213
0
    add_change(change, is_relevant);
214
0
}
215
216
void ReaderProxy::add_change(
217
        const ChangeForReader_t& change,
218
        bool is_relevant)
219
0
{
220
0
    assert(change.getSequenceNumber() > changes_low_mark_);
221
0
    assert(changes_for_reader_.empty() ? true :
222
0
            change.getSequenceNumber() > changes_for_reader_.back().getSequenceNumber());
223
224
    // Irrelevant changes are not added to the collection
225
0
    if (!is_relevant)
226
0
    {
227
0
        if ( !is_reliable_ &&
228
0
                changes_low_mark_ + 1 == change.getSequenceNumber())
229
0
        {
230
0
            changes_low_mark_ = change.getSequenceNumber();
231
0
        }
232
0
        return;
233
0
    }
234
235
0
    if (changes_for_reader_.push_back(change) == nullptr)
236
0
    {
237
        // This should never happen
238
0
        logError(RTPS_READER_PROXY, "Error adding change " << change.getSequenceNumber()
239
0
                                                           << " to reader proxy " << guid());
240
0
        eprosima::fastdds::dds::Log::Flush();
241
0
        assert(false);
242
0
    }
243
0
}
244
245
bool ReaderProxy::has_changes() const
246
0
{
247
0
    return !changes_for_reader_.empty();
248
0
}
249
250
bool ReaderProxy::change_is_acked(
251
        const SequenceNumber_t& seq_num) const
252
0
{
253
0
    if (seq_num <= changes_low_mark_ || changes_for_reader_.empty())
254
0
    {
255
0
        return true;
256
0
    }
257
258
0
    ChangeConstIterator chit = find_change(seq_num);
259
0
    if (chit == changes_for_reader_.end())
260
0
    {
261
        // There is a hole in changes_for_reader_
262
        // This means a change was removed, or was not relevant.
263
0
        return true;
264
0
    }
265
266
0
    return chit->getStatus() == ACKNOWLEDGED;
267
0
}
268
269
bool ReaderProxy::change_is_unsent(
270
        const SequenceNumber_t& seq_num,
271
        FragmentNumber_t& next_unsent_frag,
272
        SequenceNumber_t& gap_seq,
273
        const SequenceNumber_t& min_seq,
274
        bool& need_reactivate_periodic_heartbeat) const
275
0
{
276
0
    if (seq_num <= changes_low_mark_ || changes_for_reader_.empty())
277
0
    {
278
0
        return false;
279
0
    }
280
281
0
    ChangeConstIterator chit = find_change(seq_num);
282
0
    if (chit == changes_for_reader_.end())
283
0
    {
284
        // There is a hole in changes_for_reader_
285
        // This means a change was removed.
286
0
        return false;
287
0
    }
288
289
0
    bool returned_value = chit->getStatus() == UNSENT;
290
291
0
    if (returned_value)
292
0
    {
293
0
        next_unsent_frag = chit->get_next_unsent_fragment();
294
0
        gap_seq = SequenceNumber_t::unknown();
295
296
0
        if (is_reliable_ && !chit->has_been_delivered())
297
0
        {
298
0
            need_reactivate_periodic_heartbeat |= true;
299
0
            SequenceNumber_t prev =
300
0
                    (changes_for_reader_.begin() != chit ?
301
0
                    std::prev(chit)->getSequenceNumber() :
302
0
                    changes_low_mark_
303
0
                    ) + 1;
304
305
0
            if (prev != chit->getSequenceNumber())
306
0
            {
307
0
                gap_seq = prev;
308
309
                // Verify the calculated gap_seq in ReaderProxy is a real hole in the history.
310
0
                if (gap_seq < min_seq) // Several samples of the hole are not really already available.
311
0
                {
312
0
                    if (min_seq < seq_num)
313
0
                    {
314
0
                        gap_seq = min_seq;
315
0
                    }
316
0
                    else
317
0
                    {
318
0
                        gap_seq = SequenceNumber_t::unknown();
319
0
                    }
320
0
                }
321
0
            }
322
0
        }
323
0
    }
324
325
0
    return returned_value;
326
0
}
327
328
void ReaderProxy::acked_changes_set(
329
        const SequenceNumber_t& seq_num)
330
0
{
331
0
    SequenceNumber_t future_low_mark = seq_num;
332
333
0
    if (seq_num > changes_low_mark_)
334
0
    {
335
0
        ChangeIterator chit = find_change(seq_num, false);
336
        // continue advancing until next change is not acknowledged
337
0
        while (chit != changes_for_reader_.end()
338
0
                && chit->getSequenceNumber() == future_low_mark
339
0
                && chit->getStatus() == ACKNOWLEDGED)
340
0
        {
341
0
            ++chit;
342
0
            ++future_low_mark;
343
0
        }
344
0
        changes_for_reader_.erase(changes_for_reader_.begin(), chit);
345
0
    }
346
0
    else
347
0
    {
348
0
        future_low_mark = changes_low_mark_ + 1;
349
350
0
        if (seq_num == SequenceNumber_t() && durability_kind_ != DurabilityKind_t::VOLATILE)
351
0
        {
352
            // Special case. Currently only used on Builtin StatefulWriters
353
            // after losing lease duration, and on late joiners to set
354
            // changes_low_mark_ to match that of the writer.
355
0
            SequenceNumber_t min_sequence = writer_->get_seq_num_min();
356
0
            if (min_sequence != SequenceNumber_t::unknown())
357
0
            {
358
0
                SequenceNumber_t current_sequence = seq_num;
359
0
                if (seq_num < min_sequence)
360
0
                {
361
0
                    current_sequence = min_sequence;
362
0
                }
363
0
                future_low_mark = current_sequence;
364
365
0
                bool should_sort = false;
366
0
                for (; current_sequence <= changes_low_mark_; ++current_sequence)
367
0
                {
368
                    // Skip all consecutive changes already in the collection
369
0
                    ChangeConstIterator it = find_change(current_sequence);
370
0
                    while ( it != changes_for_reader_.end() &&
371
0
                            current_sequence <= changes_low_mark_ &&
372
0
                            it->getSequenceNumber() == current_sequence)
373
0
                    {
374
0
                        ++current_sequence;
375
0
                        ++it;
376
0
                    }
377
378
0
                    if (current_sequence <= changes_low_mark_)
379
0
                    {
380
0
                        CacheChange_t* change = nullptr;
381
0
                        if (writer_->mp_history->get_change(current_sequence, writer_->getGuid(), &change))
382
0
                        {
383
0
                            should_sort = true;
384
0
                            ChangeForReader_t cr(change);
385
0
                            cr.setStatus(UNACKNOWLEDGED);
386
0
                            changes_for_reader_.push_back(cr);
387
0
                        }
388
0
                    }
389
0
                }
390
                // Keep changes sorted by sequence number
391
0
                if (should_sort)
392
0
                {
393
0
                    std::sort(changes_for_reader_.begin(), changes_for_reader_.end(), ChangeForReaderCmp());
394
0
                }
395
0
            }
396
0
            else if (!is_local_reader())
397
0
            {
398
0
                future_low_mark = writer_->next_sequence_number();
399
0
            }
400
0
        }
401
0
    }
402
0
    changes_low_mark_ = future_low_mark - 1;
403
0
}
404
405
bool ReaderProxy::requested_changes_set(
406
        const SequenceNumberSet_t& seq_num_set,
407
        RTPSGapBuilder& gap_builder,
408
        const SequenceNumber_t& min_seq_in_history)
409
0
{
410
0
    bool isSomeoneWasSetRequested = false;
411
412
0
    if (SequenceNumber_t::unknown() != min_seq_in_history)
413
0
    {
414
0
        seq_num_set.for_each([&](SequenceNumber_t sit)
415
0
                {
416
0
                    ChangeIterator chit = find_change(sit, true);
417
0
                    if (chit != changes_for_reader_.end())
418
0
                    {
419
0
                        if (UNACKNOWLEDGED == chit->getStatus())
420
0
                        {
421
0
                            chit->setStatus(REQUESTED);
422
0
                            chit->markAllFragmentsAsUnsent();
423
0
                            isSomeoneWasSetRequested = true;
424
0
                        }
425
0
                    }
426
0
                    else if ((sit >= min_seq_in_history) && (sit > changes_low_mark_))
427
0
                    {
428
0
                        gap_builder.add(sit);
429
0
                    }
430
0
                });
431
0
    }
432
433
0
    if (isSomeoneWasSetRequested)
434
0
    {
435
0
        logInfo(RTPS_READER_PROXY, "Requested Changes: " << seq_num_set);
436
0
    }
437
438
0
    return isSomeoneWasSetRequested;
439
0
}
440
441
bool ReaderProxy::process_initial_acknack(
442
        const std::function<void(ChangeForReader_t& change)>& func)
443
0
{
444
0
    if (is_local_reader())
445
0
    {
446
0
        return 0 != convert_status_on_all_changes(UNACKNOWLEDGED, UNSENT, func);
447
0
    }
448
449
0
    return true;
450
0
}
451
452
void ReaderProxy::from_unsent_to_status(
453
        const SequenceNumber_t& seq_num,
454
        ChangeForReaderStatus_t status,
455
        bool restart_nack_supression,
456
        bool delivered)
457
0
{
458
    // This function must not be called by a best-effort reader.
459
    // It will use acked_changes_set().
460
0
    assert(is_reliable_);
461
462
0
    if (restart_nack_supression && is_remote_and_reliable())
463
0
    {
464
0
        assert(timers_enabled_.load());
465
0
        nack_supression_event_->restart_timer();
466
0
    }
467
468
    // Called when delivering an UNSENT sample, the seq_number must exists in the ReaderProxy.
469
0
    assert(seq_num > changes_low_mark_);
470
0
    ChangeIterator it = find_change(seq_num, true);
471
0
    assert(changes_for_reader_.end() != it);
472
0
    assert(UNSENT == it->getStatus());
473
0
    assert(UNSENT != status);
474
475
0
    if (ACKNOWLEDGED == status && seq_num == changes_low_mark_ + 1)
476
0
    {
477
0
        assert(changes_for_reader_.begin() == it);
478
0
        changes_for_reader_.erase(it);
479
0
        acked_changes_set(seq_num + 1);
480
0
        return;
481
0
    }
482
483
0
    it->setStatus(status);
484
485
0
    if (delivered)
486
0
    {
487
0
        it->set_delivered();
488
0
    }
489
0
}
490
491
bool ReaderProxy::mark_fragment_as_sent_for_change(
492
        const SequenceNumber_t& seq_num,
493
        FragmentNumber_t frag_num,
494
        bool& was_last_fragment)
495
0
{
496
0
    was_last_fragment = false;
497
498
0
    if (seq_num <= changes_low_mark_)
499
0
    {
500
0
        return false;
501
0
    }
502
503
0
    bool change_found = false;
504
0
    ChangeIterator it = find_change(seq_num, true);
505
506
0
    if (it != changes_for_reader_.end())
507
0
    {
508
0
        change_found = true;
509
0
        it->markFragmentsAsSent(frag_num);
510
0
        was_last_fragment = it->getUnsentFragments().empty();
511
0
    }
512
513
0
    return change_found;
514
0
}
515
516
bool ReaderProxy::perform_nack_supression()
517
0
{
518
0
    return 0 != convert_status_on_all_changes(UNDERWAY, UNACKNOWLEDGED);
519
0
}
520
521
uint32_t ReaderProxy::perform_acknack_response(
522
        const std::function<void(ChangeForReader_t& change)>& func)
523
0
{
524
0
    return convert_status_on_all_changes(REQUESTED, UNSENT, func);
525
0
}
526
527
uint32_t ReaderProxy::convert_status_on_all_changes(
528
        ChangeForReaderStatus_t previous,
529
        ChangeForReaderStatus_t next,
530
        const std::function<void(ChangeForReader_t& change)>& func)
531
0
{
532
0
    assert(previous > next);
533
534
    // NOTE: This is only called for REQUESTED=>UNSENT (acknack response) or
535
    //       UNDERWAY=>UNACKNOWLEDGED (nack supression)
536
537
0
    uint32_t changed = 0;
538
0
    for (ChangeForReader_t& change : changes_for_reader_)
539
0
    {
540
0
        if (change.getStatus() == previous)
541
0
        {
542
0
            ++changed;
543
0
            change.setStatus(next);
544
545
0
            if (func)
546
0
            {
547
0
                func(change);
548
0
            }
549
0
        }
550
0
    }
551
552
0
    return changed;
553
0
}
554
555
void ReaderProxy::change_has_been_removed(
556
        const SequenceNumber_t& seq_num)
557
0
{
558
    // Check sequence number is in the container, because it was not clean up.
559
0
    if (changes_for_reader_.empty() || seq_num < changes_for_reader_.begin()->getSequenceNumber())
560
0
    {
561
0
        return;
562
0
    }
563
564
0
    auto chit = find_change(seq_num);
565
566
0
    if (chit == this->changes_for_reader_.end())
567
0
    {
568
        // No change for this sequence number
569
0
        return;
570
0
    }
571
572
    // In intraprocess, if there is an UNACKNOWLEDGED, a GAP has to be send because there is no reliable mechanism.
573
0
    if (is_local_reader() && ACKNOWLEDGED > chit->getStatus())
574
0
    {
575
0
        writer_->intraprocess_gap(this, seq_num);
576
0
    }
577
578
    // Element may not be in the container when marked as irrelevant.
579
0
    changes_for_reader_.erase(chit);
580
581
    // When removing the next-to-be-acknowledged, we should auto-acknowledge it.
582
0
    if ((changes_low_mark_ + 1) == seq_num)
583
0
    {
584
0
        acked_changes_set(seq_num + 1);
585
0
    }
586
0
}
587
588
bool ReaderProxy::has_unacknowledged(
589
        const SequenceNumber_t& first_seq_in_history) const
590
0
{
591
0
    if (first_seq_in_history > changes_low_mark_)
592
0
    {
593
0
        return true;
594
0
    }
595
596
0
    for (const ChangeForReader_t& it : changes_for_reader_)
597
0
    {
598
0
        if (it.getStatus() == UNACKNOWLEDGED)
599
0
        {
600
0
            return true;
601
0
        }
602
0
    }
603
604
0
    return false;
605
0
}
606
607
bool ReaderProxy::requested_fragment_set(
608
        const SequenceNumber_t& seq_num,
609
        const FragmentNumberSet_t& frag_set)
610
0
{
611
    // Locate the outbound change referenced by the NACK_FRAG
612
0
    ChangeIterator changeIter = find_change(seq_num, true);
613
0
    if (changeIter == changes_for_reader_.end())
614
0
    {
615
0
        return false;
616
0
    }
617
618
0
    changeIter->markFragmentsAsUnsent(frag_set);
619
620
    // If it was UNSENT, we shouldn't switch back to REQUESTED to prevent stalling.
621
0
    if (changeIter->getStatus() != UNSENT)
622
0
    {
623
0
        changeIter->setStatus(REQUESTED);
624
0
    }
625
626
0
    return true;
627
0
}
628
629
bool ReaderProxy::process_nack_frag(
630
        const GUID_t& reader_guid,
631
        uint32_t nack_count,
632
        const SequenceNumber_t& seq_num,
633
        const FragmentNumberSet_t& fragments_state)
634
0
{
635
0
    if (guid() == reader_guid)
636
0
    {
637
0
        if (last_nackfrag_count_ < nack_count)
638
0
        {
639
0
            last_nackfrag_count_ = nack_count;
640
0
            if (requested_fragment_set(seq_num, fragments_state))
641
0
            {
642
0
                return true;
643
0
            }
644
0
        }
645
0
    }
646
647
0
    return false;
648
0
}
649
650
static bool change_less_than_sequence(
651
        const ChangeForReader_t& change,
652
        const SequenceNumber_t& seq_num)
653
0
{
654
0
    return change.getSequenceNumber() < seq_num;
655
0
}
656
657
/**
 * Locate a change in the (sequence-ordered) collection of tracked changes.
 *
 * @param seq_num  Sequence number to look for.
 * @param exact    When true only an element with exactly @c seq_num is accepted; when false the
 *                 first element whose sequence number is not lower than @c seq_num is returned.
 * @return Iterator to the element found, or end() when there is none.
 */
ReaderProxy::ChangeIterator ReaderProxy::find_change(
        const SequenceNumber_t& seq_num,
        bool exact)
{
    const ReaderProxy::ChangeIterator last = changes_for_reader_.end();
    const ReaderProxy::ChangeIterator candidate =
            std::lower_bound(changes_for_reader_.begin(), last, seq_num, change_less_than_sequence);

    if (!exact || candidate == last)
    {
        return candidate;
    }

    if (candidate->getSequenceNumber() == seq_num)
    {
        return candidate;
    }

    return last;
}
671
672
/**
 * Locate the change with exactly @c seq_num in the (sequence-ordered) collection of tracked changes.
 *
 * @param seq_num  Sequence number to look for.
 * @return Iterator to the element found, or end() when the sequence number is not tracked.
 */
ReaderProxy::ChangeConstIterator ReaderProxy::find_change(
        const SequenceNumber_t& seq_num) const
{
    const ReaderProxy::ChangeConstIterator last = changes_for_reader_.end();
    const ReaderProxy::ChangeConstIterator candidate =
            std::lower_bound(changes_for_reader_.begin(), last, seq_num, change_less_than_sequence);

    if (candidate == last)
    {
        return last;
    }

    if (candidate->getSequenceNumber() == seq_num)
    {
        return candidate;
    }

    return last;
}
683
684
}   // namespace rtps
685
}   // namespace fastrtps
686
}   // namespace eprosima