Coverage Report

Created: 2026-05-04 06:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/Fast-DDS/src/cpp/rtps/writer/ReaderProxy.hpp
Line
Count
Source
1
// Copyright 2016-2019 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file ReaderProxy.hpp
17
 */
18
#ifndef RTPS_WRITER__READERPROXY_HPP
19
#define RTPS_WRITER__READERPROXY_HPP
20
21
#include <algorithm>
22
#include <atomic>
23
#include <mutex>
24
#include <set>
25
26
#include <fastdds/rtps/attributes/RTPSParticipantAllocationAttributes.hpp>
27
#include <fastdds/rtps/attributes/WriterAttributes.hpp>
28
#include <fastdds/rtps/common/CacheChange.hpp>
29
#include <fastdds/rtps/common/FragmentNumber.hpp>
30
#include <fastdds/rtps/common/Locator.hpp>
31
#include <fastdds/rtps/common/SequenceNumber.hpp>
32
#include <fastdds/rtps/common/Types.hpp>
33
#include <fastdds/utils/collections/ResourceLimitedVector.hpp>
34
35
#include <rtps/builtin/data/ReaderProxyData.hpp>
36
#include <rtps/writer/ChangeForReader.hpp>
37
#include <rtps/writer/ReaderLocator.hpp>
38
39
namespace eprosima {
40
namespace fastdds {
41
namespace rtps {
42
43
class BaseReader;
44
class StatefulWriter;
45
class StatefulWriterListener;
46
class TimedEvent;
47
class RTPSReader;
48
class IDataSharingNotifier;
49
class RTPSGapBuilder;
50
51
/**
52
 * ReaderProxy class that helps to keep the state of a specific Reader with respect to the RTPSWriter.
53
 * @ingroup WRITER_MODULE
54
 */
55
class ReaderProxy
56
{
57
public:
58
59
    ~ReaderProxy();
60
61
    /**
62
     * Constructor.
63
     * @param times WriterTimes to use in the ReaderProxy.
64
     * @param loc_alloc Maximum number of remote locators to keep in the ReaderProxy.
65
     * @param writer Pointer to the StatefulWriter creating the reader proxy.
66
     * @param stateful_listener Pointer to the StatefulWriterListener associated to the writer.
67
     */
68
    ReaderProxy(
69
            const WriterTimes& times,
70
            const RemoteLocatorsAllocationAttributes& loc_alloc,
71
            StatefulWriter* writer,
72
            StatefulWriterListener* stateful_listener);
73
74
    /**
75
     * Activate this proxy associating it to a remote reader.
76
     * @param reader_attributes ReaderProxyData of the reader for which to keep state.
77
     * @param is_datasharing whether the reader is datasharing compatible with the writer or not.
78
     */
79
    void start(
80
            const ReaderProxyData& reader_attributes,
81
            bool is_datasharing = false);
82
83
    /**
84
     * Update information about the remote reader.
85
     * @param reader_attributes ReaderProxyData with updated information of the reader.
86
     * @return true if data was modified, false otherwise.
87
     */
88
    bool update(
89
            const ReaderProxyData& reader_attributes);
90
91
    /**
92
     * Disable this proxy.
93
     */
94
    void stop();
95
96
    /**
97
     * Called when a change is added to the writer's history.
98
     * @param change Information regarding the change added.
99
     * @param is_relevant Specify if change is relevant for this remote reader.
100
     * @param restart_nack_supression Whether nack-supression event should be restarted.
101
     */
102
    void add_change(
103
            const ChangeForReader_t& change,
104
            bool is_relevant,
105
            bool restart_nack_supression);
106
107
    void add_change(
108
            const ChangeForReader_t& change,
109
            bool is_relevant,
110
            bool restart_nack_supression,
111
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time);
112
113
    /**
114
     * Check if there are changes pending for this reader.
115
     * @return true when there are pending changes, false otherwise.
116
     */
117
    bool has_changes() const;
118
119
    /**
120
     * Check if a specific change has been already acknowledged for this reader.
121
     * @param seq_num Sequence number of the change to be checked.
122
     * @return true when the change is irrelevant or has been already acknowledged, false otherwise.
123
     */
124
    bool change_is_acked(
125
            const SequenceNumber_t& seq_num) const;
126
127
    /**
128
     * Check if a specific change is marked to be sent to this reader.
129
     *
130
     * @param [in]  seq_num Sequence number of the change to be checked.
131
     * @param [out] next_unsent_frag Return next fragment to be sent.
132
     * @param [out] gap_seq Return, when it is its first delivery (should be relevant seq_num), the sequence number of
133
     * the first sequence of the gap [first, seq_num). Otherwise return SequenceNumber_t::unknown().
134
     * @param [in]  min_seq Minimum sequence number managed by the History. It could be SequenceNumber_t::unknown() if
135
     * history is empty.
136
     * @param [out] need_reactivate_periodic_heartbeat Indicates if the heartbeat period event has to be restarted.
137
     *
138
     * @return true if the change is marked to be sent. False otherwise.
139
     */
140
    bool change_is_unsent(
141
            const SequenceNumber_t& seq_num,
142
            FragmentNumber_t& next_unsent_frag,
143
            SequenceNumber_t& gap_seq,
144
            const SequenceNumber_t& min_seq,
145
            bool& need_reactivate_periodic_heartbeat);
146
147
    /**
148
     * Mark all changes up to the one indicated by seq_num as Acknowledged.
149
     * For instance, when seq_num is 30, changes 1-29 are marked as acknowledged.
150
     * @param seq_num Sequence number of the first change not to be marked as acknowledged.
151
     */
152
    void acked_changes_set(
153
            const SequenceNumber_t& seq_num);
154
155
    /**
156
     * Mark all changes in the vector as requested.
157
     * @param seq_num_set Bitmap of sequence numbers.
158
     * @param gap_builder RTPSGapBuilder reference uses for adding  each requested change that is irrelevant for the
159
     * requester.
160
     * @param [in] min_seq_in_history Minimum SequenceNumber_t in the writer's history. If writer's history is empty,
161
     * SequenceNumber_t::unknown() is expected.
162
     * @return true if at least one change has been marked as REQUESTED, false otherwise.
163
     */
164
    bool requested_changes_set(
165
            const SequenceNumberSet_t& seq_num_set,
166
            RTPSGapBuilder& gap_builder,
167
            const SequenceNumber_t& min_seq_in_history);
168
169
    /**
170
     * Performs processing of preemptive acknack
171
     * @param func functor called, if the requester is a local reader, for each changes moved to UNSENT status.
172
     * @return true if a heartbeat should be sent, false otherwise.
173
     */
174
    bool process_initial_acknack(
175
            const std::function<void(ChangeForReader_t& change)>& func);
176
177
    /*!
178
     * @brief Sets a change to a particular status (if present in the ReaderProxy)
179
     * @param seq_num Sequence number of the change to update.
180
     * @param status Status to apply.
181
     * @param restart_nack_supression Whether nack supression event should be restarted or not.
182
     * @param delivered true if change was able to be delivered to its addressees. false otherwise.
183
     */
184
    void from_unsent_to_status(
185
            const SequenceNumber_t& seq_num,
186
            ChangeForReaderStatus_t status,
187
            bool restart_nack_supression,
188
            bool delivered = true);
189
190
    /**
191
     * @brief Mark a particular fragment as sent.
192
     * @param [in]  seq_num Sequence number of the change to update.
193
     * @param [in]  frag_num Fragment number to mark as sent.
194
     * @param [out] was_last_fragment Indicates if the fragment was the last one pending.
195
     * @return true when the change was found, false otherwise.
196
     */
197
    bool mark_fragment_as_sent_for_change(
198
            const SequenceNumber_t& seq_num,
199
            FragmentNumber_t frag_num,
200
            bool& was_last_fragment);
201
202
    /**
203
     * Turns all UNDERWAY changes into UNACKNOWLEDGED.
204
     *
205
     * @return true if at least one change changed its status, false otherwise.
206
     */
207
    bool perform_nack_supression();
208
209
    /**
210
     * Turns all REQUESTED changes into UNSENT.
211
     *
212
     * @param func Function executed for each change which changes its status.
213
     * @return the number of changes that changed its status.
214
     */
215
    uint32_t perform_acknack_response(
216
            const std::function<void(ChangeForReader_t& change)>& func);
217
218
    /**
219
     * Call this to inform a change was removed from history.
220
     * @param seq_num Sequence number of the removed change.
221
     */
222
    void change_has_been_removed(
223
            const SequenceNumber_t& seq_num);
224
225
    /*!
226
     * @brief Returns there is some UNACKNOWLEDGED change.
227
     * @param first_seq_in_history Minimum sequence number in the writer history.
228
     * @return There is some UNACKNOWLEDGED change.
229
     */
230
    bool has_unacknowledged(
231
            const SequenceNumber_t& first_seq_in_history) const;
232
233
    /**
234
     * Get the GUID of the reader represented by this proxy.
235
     * @return the GUID of the reader represented by this proxy.
236
     */
237
    inline const GUID_t& guid() const
238
0
    {
239
0
        return locator_info_.remote_guid();
240
0
    }
241
242
    /**
243
     * Get the durability of the reader represented by this proxy.
244
     * @return the durability of the reader represented by this proxy.
245
     */
246
    inline DurabilityKind_t durability_kind() const
247
0
    {
248
0
        return durability_kind_;
249
0
    }
250
251
    /**
252
     * Check if the reader represented by this proxy expexts inline QOS to be received.
253
     * @return true if the reader represented by this proxy expexts inline QOS to be received.
254
     */
255
    inline bool expects_inline_qos() const
256
0
    {
257
0
        return expects_inline_qos_;
258
0
    }
259
260
    /**
261
     * Check if the reader represented by this proxy is reliable.
262
     * @return true if the reader represented by this proxy is reliable.
263
     */
264
    inline bool is_reliable() const
265
0
    {
266
0
        return is_reliable_;
267
0
    }
268
269
    inline bool disable_positive_acks() const
270
0
    {
271
0
        return disable_positive_acks_;
272
0
    }
273
274
    /**
275
     * Check if the reader represented by this proxy is remote and reliable.
276
     * @return true if the reader represented by this proxy is remote and reliable.
277
     */
278
    inline bool is_remote_and_reliable() const
279
0
    {
280
0
        return !locator_info_.is_local_reader() && !locator_info_.is_datasharing_reader() && is_reliable_;
281
0
    }
282
283
    /**
284
     * Check if the reader is on the same process.
285
     * @return true if the reader is no the same process.
286
     */
287
    inline bool is_local_reader()
288
0
    {
289
0
        return locator_info_.is_local_reader();
290
0
    }
291
292
    /**
293
     * Get the local reader on the same process (if any).
294
     * @return The local reader on the same process.
295
     */
296
    inline LocalReaderPointer::Instance local_reader()
297
0
    {
298
0
        return locator_info_.local_reader();
299
0
    }
300
301
    /**
302
     * Called when an ACKNACK is received to set a new value for the minimum count accepted for following received
303
     * ACKNACKs.
304
     *
305
     * @param acknack_count The count of the received ACKNACK.
306
     * @return true if internal count changed (i.e. received ACKNACK is accepted)
307
     */
308
    bool check_and_set_acknack_count(
309
            uint32_t acknack_count)
310
0
    {
311
0
        if (acknack_count >= next_expected_acknack_count_)
312
0
        {
313
0
            next_expected_acknack_count_ = acknack_count;
314
0
            ++next_expected_acknack_count_;
315
0
            return true;
316
0
        }
317
318
0
        return false;
319
0
    }
320
321
    /**
322
     * Process an incoming NACKFRAG submessage.
323
     * @param reader_guid Destination guid of the submessage.
324
     * @param nack_count Counter field of the submessage.
325
     * @param seq_num Sequence number field of the submessage.
326
     * @param fragments_state Bitmap indicating the requested fragments.
327
     * @return true if a change was modified, false otherwise.
328
     */
329
    bool process_nack_frag(
330
            const GUID_t& reader_guid,
331
            uint32_t nack_count,
332
            const SequenceNumber_t& seq_num,
333
            const FragmentNumberSet_t& fragments_state);
334
335
    /**
336
     * Filter a CacheChange_t using the StatefulWriter's IReaderDataFilter.
337
     * @param change
338
     * @return true if the change is relevant, false otherwise.
339
     */
340
    bool rtps_is_relevant(
341
            CacheChange_t* change) const;
342
343
    /**
344
     * Get the highest fully acknowledged sequence number.
345
     * @return the highest fully acknowledged sequence number.
346
     */
347
    inline SequenceNumber_t changes_low_mark() const
348
0
    {
349
0
        return changes_low_mark_;
350
0
    }
351
352
    /*!
353
     * Get the first sequence number not relevant that was removed without reader being informed.
354
     * @return First sequence number.
355
     */
356
    inline SequenceNumber_t first_irrelevant_removed() const
357
0
    {
358
0
        return first_irrelevant_removed_;
359
0
    }
360
361
    /*!
362
     * Get the last sequence number not relevant that was removed without reader being informed.
363
     * @return last sequence number.
364
     */
365
    inline SequenceNumber_t last_irrelevant_removed() const
366
0
    {
367
0
        return last_irrelevant_removed_;
368
0
    }
369
370
    /*!
371
     * Reset the interval of sequence numbers not relevant that were removed without reader being informed.
372
     */
373
    inline void reset_irrelevant_removed()
374
0
    {
375
0
        first_irrelevant_removed_ = SequenceNumber_t::unknown();
376
0
        last_irrelevant_removed_ = SequenceNumber_t::unknown();
377
0
    }
378
379
    /**
380
     * Change the interval of nack-supression event.
381
     * @param interval Time from data sending to acknack processing.
382
     */
383
    void update_nack_supression_interval(
384
            const dds::Duration_t& interval);
385
386
    LocatorSelectorEntry* general_locator_selector_entry()
387
0
    {
388
0
        return locator_info_.general_locator_selector_entry();
389
0
    }
390
391
    LocatorSelectorEntry* async_locator_selector_entry()
392
0
    {
393
0
        return locator_info_.async_locator_selector_entry();
394
0
    }
395
396
    RTPSMessageSenderInterface* message_sender()
397
0
    {
398
0
        return &locator_info_;
399
0
    }
400
401
    bool is_datasharing_reader() const
402
0
    {
403
0
        return locator_info_.is_datasharing_reader();
404
0
    }
405
406
    IDataSharingNotifier* datasharing_notifier()
407
0
    {
408
0
        return locator_info_.datasharing_notifier();
409
0
    }
410
411
    const IDataSharingNotifier* datasharing_notifier() const
412
0
    {
413
0
        return locator_info_.datasharing_notifier();
414
0
    }
415
416
    void datasharing_notify()
417
0
    {
418
0
        locator_info_.datasharing_notify();
419
0
    }
420
421
    size_t locators_size() const
422
0
    {
423
0
        return locator_info_.locators_size();
424
0
    }
425
426
    bool active() const
427
0
    {
428
0
        return active_;
429
0
    }
430
431
    void active(
432
            bool active)
433
0
    {
434
0
        active_ = active;
435
0
    }
436
437
    /**
438
     * @brief Check if the sequence number given has been delivered at least once to the transport layer.
439
     *
440
     * @param seq_number Sequence number of the change to check.
441
     * @param found The sequence number has been found in the list of changes pending to be sent/ack.
442
     *              This flag allows to differentiate the case when the change is not found from the one that is found
443
     *              but it has not been delivered yet.
444
     * @return true if the change has been delivered.
445
     * @return false otherwise.
446
     */
447
    bool has_been_delivered(
448
            const SequenceNumber_t& seq_number,
449
            bool& found) const;
450
451
private:
452
453
    //!Is this proxy active? I.e. does it have a remote reader associated?
454
    bool is_active_;
455
    //!Reader locator information
456
    ReaderLocator locator_info_;
457
    //!Taken from QoS
458
    DurabilityKind_t durability_kind_;
459
    //!Taken from QoS
460
    bool expects_inline_qos_;
461
    //!Taken from QoS
462
    bool is_reliable_;
463
    //!Taken from QoS
464
    bool disable_positive_acks_;
465
    //!Pointer to the associated StatefulWriter.
466
    StatefulWriter* writer_;
467
    //!Set of the changes and its state.
468
    ResourceLimitedVector<ChangeForReader_t, std::true_type> changes_for_reader_;
469
    //! Timed Event to manage the delay to mark a change as UNACKED after sending it.
470
    TimedEvent* nack_supression_event_;
471
    TimedEvent* initial_heartbeat_event_;
472
    //! Are timed events enabled?
473
    std::atomic_bool timers_enabled_;
474
    //! Next expected ack/nack count
475
    uint32_t next_expected_acknack_count_;
476
    //! Last  NACKFRAG count.
477
    uint32_t last_nackfrag_count_;
478
479
    //! Sequence number of the lowest change not fully acknowledged.
480
    SequenceNumber_t changes_low_mark_;
481
482
    //! First sequence number not relevant that was removed without reader being informed.
483
    SequenceNumber_t first_irrelevant_removed_ {SequenceNumber_t::unknown()};
484
    //! Last sequence number not relevant that was removed without reader being informed.
485
    SequenceNumber_t last_irrelevant_removed_ {SequenceNumber_t::unknown()};
486
487
    bool active_ = false;
488
489
    //! Listener to notify about data acknowledgements and resends.
490
    StatefulWriterListener* const stateful_writer_listener_ = nullptr;
491
492
    using ChangeIterator = ResourceLimitedVector<ChangeForReader_t, std::true_type>::iterator;
493
    using ChangeConstIterator = ResourceLimitedVector<ChangeForReader_t, std::true_type>::const_iterator;
494
495
    void disable_timers();
496
497
    /*
498
     * Converts all changes with a given status to a different status.
499
     * @param previous Status to change.
500
     * @param next Status to adopt.
501
     * @param func Function executed for each change which changes its status.
502
     * @return the number of changes that have been modified.
503
     */
504
    uint32_t convert_status_on_all_changes(
505
            ChangeForReaderStatus_t previous,
506
            ChangeForReaderStatus_t next,
507
            bool notify_resend,
508
            const std::function<void(ChangeForReader_t& change)>& func = {});
509
510
    /*!
511
     * @brief Adds requested fragments. These fragments will be sent in next NackResponseDelay.
512
     * @param [in] seq_num Sequence number to be paired with the requested fragments.
513
     * @param [in] frag_set set containing the requested fragments to be sent.
514
     * @return True if there is at least one requested fragment. False in other case.
515
     */
516
    bool requested_fragment_set(
517
            const SequenceNumber_t& seq_num,
518
            const FragmentNumberSet_t& frag_set);
519
520
    void add_change(
521
            const ChangeForReader_t& change,
522
            bool is_relevant);
523
524
    /**
525
     * @brief Find a change with the specified sequence number.
526
     * @param seq_num Sequence number to find.
527
     * @param exact When false, the first change with a sequence number not less than seq_num will be returned.
528
     * When true, the change with a sequence number value of seq_num will be returned.
529
     * @return Iterator pointing to the change, changes_for_reader_.end() if not found.
530
     */
531
    ChangeIterator find_change(
532
            const SequenceNumber_t& seq_num,
533
            bool exact);
534
535
    /**
536
     * @brief Find a change with the specified sequence number.
537
     * @param seq_num Sequence number to find.
538
     * @return Iterator pointing to the change, changes_for_reader_.end() if not found.
539
     */
540
    ChangeConstIterator find_change(
541
            const SequenceNumber_t& seq_num) const;
542
543
    /**
544
     * @brief Notifies that a change has been acknowledged by this ReaderProxy.
545
     *
546
     * @param chiange  Reference to the ChangeForReader_t that has been acknowledged.
547
     */
548
    void notify_acknowledged(
549
            const ChangeForReader_t& change) const;
550
551
    /**
552
     * @brief Notifies that a change has been resent to this ReaderProxy.
553
     *
554
     * @param change  Reference to the ChangeForReader_t that has been resent.
555
     */
556
    void notify_resent(
557
            const ChangeForReader_t& change) const;
558
559
};
560
561
} /* namespace rtps */
562
} /* namespace fastdds */
563
} /* namespace eprosima */
564
565
#endif // RTPS_WRITER__READERPROXY_HPP