Coverage Report

Created: 2022-08-24 06:19

/src/Fast-DDS/include/fastdds/rtps/writer/ReaderProxy.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2016-2019 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file ReaderProxy.h
17
 */
18
#ifndef _FASTDDS_RTPS_WRITER_READERPROXY_H_
19
#define _FASTDDS_RTPS_WRITER_READERPROXY_H_
20
21
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
22
23
#include <fastdds/rtps/attributes/WriterAttributes.h>
24
#include <fastdds/rtps/attributes/RTPSParticipantAllocationAttributes.hpp>
25
26
#include <fastdds/rtps/builtin/data/ReaderProxyData.h>
27
28
#include <fastdds/rtps/common/Types.h>
29
#include <fastdds/rtps/common/Locator.h>
30
#include <fastdds/rtps/common/SequenceNumber.h>
31
#include <fastdds/rtps/common/CacheChange.h>
32
#include <fastdds/rtps/common/FragmentNumber.h>
33
34
#include <fastdds/rtps/writer/ChangeForReader.h>
35
#include <fastdds/rtps/writer/ReaderLocator.h>
36
37
#include <fastrtps/utils/collections/ResourceLimitedVector.hpp>
38
39
#include <algorithm>
40
#include <mutex>
41
#include <set>
42
#include <atomic>
43
44
namespace eprosima {
45
namespace fastrtps {
46
namespace rtps {
47
48
class StatefulWriter;
49
class TimedEvent;
50
class RTPSReader;
51
class IDataSharingNotifier;
52
class RTPSGapBuilder;
53
54
/**
55
 * ReaderProxy class that helps to keep the state of a specific Reader with respect to the RTPSWriter.
56
 * @ingroup WRITER_MODULE
57
 */
58
class ReaderProxy
59
{
60
public:
61
62
    ~ReaderProxy();
63
64
    /**
65
     * Constructor.
66
     * @param times WriterTimes to use in the ReaderProxy.
67
     * @param loc_alloc Maximum number of remote locators to keep in the ReaderProxy.
68
     * @param writer Pointer to the StatefulWriter creating the reader proxy.
69
     */
70
    ReaderProxy(
71
            const WriterTimes& times,
72
            const RemoteLocatorsAllocationAttributes& loc_alloc,
73
            StatefulWriter* writer);
74
75
    /**
76
     * Activate this proxy associating it to a remote reader.
77
     * @param reader_attributes ReaderProxyData of the reader for which to keep state.
78
     * @param is_datasharing whether the reader is datasharing compatible with the writer or not.
79
     */
80
    void start(
81
            const ReaderProxyData& reader_attributes,
82
            bool is_datasharing = false);
83
84
    /**
85
     * Update information about the remote reader.
86
     * @param reader_attributes ReaderProxyData with updated information of the reader.
87
     * @return true if data was modified, false otherwise.
88
     */
89
    bool update(
90
            const ReaderProxyData& reader_attributes);
91
92
    /**
93
     * Disable this proxy.
94
     */
95
    void stop();
96
97
    /**
98
     * Called when a change is added to the writer's history.
99
     * @param change Information regarding the change added.
100
     * @param is_relevant Specify if change is relevant for this remote reader.
101
     * @param restart_nack_supression Whether nack-supression event should be restarted.
102
     */
103
    void add_change(
104
            const ChangeForReader_t& change,
105
            bool is_relevant,
106
            bool restart_nack_supression);
107
108
    void add_change(
109
            const ChangeForReader_t& change,
110
            bool is_relevant,
111
            bool restart_nack_supression,
112
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time);
113
114
    /**
115
     * Check if there are changes pending for this reader.
116
     * @return true when there are pending changes, false otherwise.
117
     */
118
    bool has_changes() const;
119
120
    /**
121
     * Check if a specific change has been already acknowledged for this reader.
122
     * @param seq_num Sequence number of the change to be checked.
123
     * @return true when the change is irrelevant or has been already acknowledged, false otherwise.
124
     */
125
    bool change_is_acked(
126
            const SequenceNumber_t& seq_num) const;
127
128
    /**
129
     * Check if a specific change is marked to be sent to this reader.
130
     *
131
     * @param[in]  seq_num Sequence number of the change to be checked.
132
     * @param[out] next_unsent_frag Return next fragment to be sent.
133
     * @param[out] gap_seq Return, when it is its first delivery (should be relevant seq_num), the sequence number of
134
     * the first sequence of the gap [first, seq_num). Otherwise return SequenceNumber_t::unknown().
135
     * @param[in]  min_seq Minimum sequence number managed by the History. It could be SequenceNumber_t::unknown() if
136
     * history is empty.
137
     * @param[out] need_reactivate_periodic_heartbeat Indicates if the heartbeat period event has to be restarted.
138
     *
139
     * @return true if the change is marked to be sent. False otherwise.
140
     */
141
    bool change_is_unsent(
142
            const SequenceNumber_t& seq_num,
143
            FragmentNumber_t& next_unsent_frag,
144
            SequenceNumber_t& gap_seq,
145
            const SequenceNumber_t& min_seq,
146
            bool& need_reactivate_periodic_heartbeat) const;
147
148
    /**
149
     * Mark all changes up to the one indicated by seq_num as Acknowledged.
150
     * For instance, when seq_num is 30, changes 1-29 are marked as acknowledged.
151
     * @param seq_num Sequence number of the first change not to be marked as acknowledged.
152
     */
153
    void acked_changes_set(
154
            const SequenceNumber_t& seq_num);
155
156
    /**
157
     * Mark all changes in the vector as requested.
158
     * @param seq_num_set Bitmap of sequence numbers.
159
     * @param gap_builder RTPSGapBuilder reference used for adding each requested change that is irrelevant for the
160
     * requester.
161
     * @param[in] min_seq_in_history Minimum SequenceNumber_t in the writer's history. If writer's history is empty,
162
     * SequenceNumber_t::unknown() is expected.
163
     * @return true if at least one change has been marked as REQUESTED, false otherwise.
164
     */
165
    bool requested_changes_set(
166
            const SequenceNumberSet_t& seq_num_set,
167
            RTPSGapBuilder& gap_builder,
168
            const SequenceNumber_t& min_seq_in_history);
169
170
    /**
171
     * Performs processing of preemptive acknack
172
     * @param func functor called, if the requester is a local reader, for each change moved to UNSENT status.
173
     * @return true if a heartbeat should be sent, false otherwise.
174
     */
175
    bool process_initial_acknack(
176
            const std::function<void(ChangeForReader_t& change)>& func);
177
178
    /*!
179
     * @brief Sets a change to a particular status (if present in the ReaderProxy)
180
     * @param seq_num Sequence number of the change to update.
181
     * @param status Status to apply.
182
     * @param restart_nack_supression Whether nack supression event should be restarted or not.
183
     * @param delivered true if change was able to be delivered to its addressees. false otherwise.
184
     */
185
    void from_unsent_to_status(
186
            const SequenceNumber_t& seq_num,
187
            ChangeForReaderStatus_t status,
188
            bool restart_nack_supression,
189
            bool delivered = true);
190
191
    /**
192
     * @brief Mark a particular fragment as sent.
193
     * @param[in]  seq_num Sequence number of the change to update.
194
     * @param[in]  frag_num Fragment number to mark as sent.
195
     * @param[out] was_last_fragment Indicates if the fragment was the last one pending.
196
     * @return true when the change was found, false otherwise.
197
     */
198
    bool mark_fragment_as_sent_for_change(
199
            const SequenceNumber_t& seq_num,
200
            FragmentNumber_t frag_num,
201
            bool& was_last_fragment);
202
203
    /**
204
     * Turns all UNDERWAY changes into UNACKNOWLEDGED.
205
     *
206
     * @return true if at least one change changed its status, false otherwise.
207
     */
208
    bool perform_nack_supression();
209
210
    /**
211
     * Turns all REQUESTED changes into UNSENT.
212
     *
213
     * @param func Function executed for each change which changes its status.
214
     * @return the number of changes whose status was changed.
215
     */
216
    uint32_t perform_acknack_response(
217
            const std::function<void(ChangeForReader_t& change)>& func);
218
219
    /**
220
     * Call this to inform a change was removed from history.
221
     * @param seq_num Sequence number of the removed change.
222
     */
223
    void change_has_been_removed(
224
            const SequenceNumber_t& seq_num);
225
226
    /*!
227
     * @brief Checks whether there is some UNACKNOWLEDGED change.
228
     * @param first_seq_in_history Minimum sequence number in the writer history.
229
     * @return true if there is some UNACKNOWLEDGED change, false otherwise.
230
     */
231
    bool has_unacknowledged(
232
            const SequenceNumber_t& first_seq_in_history) const;
233
234
    /**
235
     * Get the GUID of the reader represented by this proxy.
236
     * @return const reference to the GUID obtained from locator_info_.remote_guid().
237
     */
238
    inline const GUID_t& guid() const
239
0
    {
240
0
        return locator_info_.remote_guid();
241
0
    }
242
243
    /**
244
     * Get the durability of the reader represented by this proxy.
245
     * @return a copy of the stored durability_kind_ member (documented on the member as taken from QoS).
246
     */
247
    inline DurabilityKind_t durability_kind() const
248
0
    {
249
0
        return durability_kind_;
250
0
    }
251
252
    /**
253
     * Check if the reader represented by this proxy expects inline QoS to be received.
254
     * @return true if the reader represented by this proxy expects inline QoS to be received.
255
     */
256
    inline bool expects_inline_qos() const
257
0
    {
258
0
        return expects_inline_qos_;
259
0
    }
260
261
    /**
262
     * Check if the reader represented by this proxy is reliable.
263
     * @return the stored is_reliable_ flag: true if the reader represented by this proxy is reliable.
264
     */
265
    inline bool is_reliable() const
266
0
    {
267
0
        return is_reliable_;
268
0
    }
269
270
    /**
     * Get the disable-positive-acks setting stored for this proxy.
     * @return the value of the disable_positive_acks_ member (documented on the member as taken from QoS).
     */
    inline bool disable_positive_acks() const
271
0
    {
272
0
        return disable_positive_acks_;
273
0
    }
274
275
    /**
276
     * Check if the reader represented by this proxy is remote and reliable.
277
     * @return true if the reader represented by this proxy is remote and reliable.
278
     */
279
    inline bool is_remote_and_reliable() const
280
0
    {
281
0
        // "Remote" here means neither a local reader nor a datasharing reader, as reported by locator_info_.
        return !locator_info_.is_local_reader() && !locator_info_.is_datasharing_reader() && is_reliable_;
282
0
    }
283
284
    /**
285
     * Check if the reader is on the same process.
286
     * @return true if the reader is on the same process.
287
     */
288
    // NOTE(review): not declared const, although is_remote_and_reliable() const makes the same
    // locator_info_.is_local_reader() call -- consider adding const.
    inline bool is_local_reader()
289
0
    {
290
0
        return locator_info_.is_local_reader();
291
0
    }
292
293
    /**
294
     * Get the local reader on the same process (if any).
295
     * @return The pointer returned by locator_info_.local_reader(); presumably nullptr when there is no
     * local reader -- TODO confirm against ReaderLocator.
296
     */
297
    inline RTPSReader* local_reader()
298
0
    {
299
0
        return locator_info_.local_reader();
300
0
    }
301
302
    /**
303
     * Called when an ACKNACK is received to set a new value for the count of the last received ACKNACK.
     * The stored count is only replaced when the received count is strictly greater than it.
304
     * @param acknack_count The count of the received ACKNACK.
305
     * @return true if internal count changed (i.e. new ACKNACK is accepted), false when the received
     * count is lower than or equal to the stored one.
306
     *
     * NOTE(review): the comparison is a plain unsigned less-than, so a count that wrapped around the
     * uint32_t range would be rejected -- confirm whether wraparound needs handling.
     */
307
    bool check_and_set_acknack_count(
308
            uint32_t acknack_count)
309
0
    {
310
0
        if (last_acknack_count_ < acknack_count)
311
0
        {
312
0
            last_acknack_count_ = acknack_count;
313
0
            return true;
314
0
        }
315
316
0
        return false;
317
0
    }
318
319
    /**
320
     * Process an incoming NACKFRAG submessage.
321
     * @param reader_guid Destination guid of the submessage.
322
     * @param nack_count Counter field of the submessage.
323
     * @param seq_num Sequence number field of the submessage.
324
     * @param fragments_state Bitmap indicating the requested fragments.
325
     * @return true if a change was modified, false otherwise.
326
     */
327
    bool process_nack_frag(
328
            const GUID_t& reader_guid,
329
            uint32_t nack_count,
330
            const SequenceNumber_t& seq_num,
331
            const FragmentNumberSet_t& fragments_state);
332
333
    /**
334
     * Filter a CacheChange_t using the StatefulWriter's IReaderDataFilter.
335
     * @param change CacheChange_t to be filtered.
336
     * @return true if the change is relevant, false otherwise.
337
     */
338
    bool rtps_is_relevant(
339
            CacheChange_t* change) const;
340
341
    /**
342
     * Get the highest fully acknowledged sequence number.
343
     * @return a copy of changes_low_mark_, the highest fully acknowledged sequence number.
344
     */
345
    SequenceNumber_t changes_low_mark() const
346
0
    {
347
0
        return changes_low_mark_;
348
0
    }
349
350
    /**
351
     * Change the interval of nack-supression event.
352
     * @param interval Time from data sending to acknack processing.
353
     */
354
    void update_nack_supression_interval(
355
            const Duration_t& interval);
356
357
    /**
     * Get the general locator selector entry of this proxy.
     * @return the pointer returned by locator_info_.general_locator_selector_entry().
     */
    LocatorSelectorEntry* general_locator_selector_entry()
358
0
    {
359
0
        return locator_info_.general_locator_selector_entry();
360
0
    }
361
362
    /**
     * Get the async locator selector entry of this proxy.
     * @return the pointer returned by locator_info_.async_locator_selector_entry().
     */
    LocatorSelectorEntry* async_locator_selector_entry()
363
0
    {
364
0
        return locator_info_.async_locator_selector_entry();
365
0
    }
366
367
    /**
     * Get the message sender interface of this proxy.
     * @return the address of the locator_info_ member, converted to RTPSMessageSenderInterface*.
     */
    RTPSMessageSenderInterface* message_sender()
368
0
    {
369
0
        return &locator_info_;
370
0
    }
371
372
    /**
     * Check if the reader represented by this proxy is a datasharing reader.
     * @return the value returned by locator_info_.is_datasharing_reader().
     */
    bool is_datasharing_reader() const
373
0
    {
374
0
        return locator_info_.is_datasharing_reader();
375
0
    }
376
377
    /**
     * Get the datasharing notifier of this proxy (non-const overload).
     * @return the pointer returned by locator_info_.datasharing_notifier().
     */
    IDataSharingNotifier* datasharing_notifier()
378
0
    {
379
0
        return locator_info_.datasharing_notifier();
380
0
    }
381
382
    /**
     * Get the datasharing notifier of this proxy (const overload).
     * @return the pointer-to-const returned by locator_info_.datasharing_notifier().
     */
    const IDataSharingNotifier* datasharing_notifier() const
383
0
    {
384
0
        return locator_info_.datasharing_notifier();
385
0
    }
386
387
    /**
     * Trigger a datasharing notification by forwarding the call to locator_info_.datasharing_notify().
     */
    void datasharing_notify()
388
0
    {
389
0
        locator_info_.datasharing_notify();
390
0
    }
391
392
    /**
     * Get the locators size reported by the reader locator information.
     * @return the value returned by locator_info_.locators_size().
     */
    size_t locators_size() const
393
0
    {
394
0
        return locator_info_.locators_size();
395
0
    }
396
397
    /**
     * Get the activity flag of this proxy.
     * @return the current value of the active_ member.
     *
     * NOTE(review): the class also declares an is_active_ member, which this accessor does not read.
     */
    bool active() const
398
0
    {
399
0
        return active_;
400
0
    }
401
402
    /**
     * Set the activity flag of this proxy.
     * @param active New value stored into the active_ member.
     */
    void active(
403
            bool active)
404
0
    {
405
0
        active_ = active;
406
0
    }
407
408
private:
409
410
    //!Is this proxy active? I.e. does it have a remote reader associated?
411
    bool is_active_;
412
    //!Reader locator information
413
    ReaderLocator locator_info_;
414
    //!Taken from QoS
415
    DurabilityKind_t durability_kind_;
416
    //!Taken from QoS
417
    bool expects_inline_qos_;
418
    //!Taken from QoS
419
    bool is_reliable_;
420
    //!Taken from QoS
421
    bool disable_positive_acks_;
422
    //!Pointer to the associated StatefulWriter.
423
    StatefulWriter* writer_;
424
    //!Set of the changes and their state.
425
    ResourceLimitedVector<ChangeForReader_t, std::true_type> changes_for_reader_;
426
    //! Timed Event to manage the delay to mark a change as UNACKED after sending it.
427
    TimedEvent* nack_supression_event_;
428
    TimedEvent* initial_heartbeat_event_;
429
    //! Are timed events enabled?
430
    std::atomic_bool timers_enabled_;
431
    //! Last ack/nack count
432
    uint32_t last_acknack_count_;
433
    //! Last NACKFRAG count.
434
    uint32_t last_nackfrag_count_;
435
436
    SequenceNumber_t changes_low_mark_;
437
438
    bool active_ = false;
439
440
    using ChangeIterator = ResourceLimitedVector<ChangeForReader_t, std::true_type>::iterator;
441
    using ChangeConstIterator = ResourceLimitedVector<ChangeForReader_t, std::true_type>::const_iterator;
442
443
    void disable_timers();
444
445
    /*
446
     * Converts all changes with a given status to a different status.
447
     * @param previous Status to change.
448
     * @param next Status to adopt.
449
     * @param func Function executed for each change which changes its status.
450
     * @return the number of changes that have been modified.
451
     */
452
    uint32_t convert_status_on_all_changes(
453
            ChangeForReaderStatus_t previous,
454
            ChangeForReaderStatus_t next,
455
            const std::function<void(ChangeForReader_t& change)>& func = {});
456
457
    /*!
458
     * @brief Adds requested fragments. These fragments will be sent in next NackResponseDelay.
459
     * @param[in] seq_num Sequence number to be paired with the requested fragments.
460
     * @param[in] frag_set set containing the requested fragments to be sent.
461
     * @return True if there is at least one requested fragment. False in other case.
462
     */
463
    bool requested_fragment_set(
464
            const SequenceNumber_t& seq_num,
465
            const FragmentNumberSet_t& frag_set);
466
467
    void add_change(
468
            const ChangeForReader_t& change,
469
            bool is_relevant);
470
471
    /**
472
     * @brief Find a change with the specified sequence number.
473
     * @param seq_num Sequence number to find.
474
     * @param exact When false, the first change with a sequence number not less than seq_num will be returned.
475
     * When true, the change with a sequence number value of seq_num will be returned.
476
     * @return Iterator pointing to the change, changes_for_reader_.end() if not found.
477
     */
478
    ChangeIterator find_change(
479
            const SequenceNumber_t& seq_num,
480
            bool exact);
481
482
    /**
483
     * @brief Find a change with the specified sequence number.
484
     * @param seq_num Sequence number to find.
485
     * @return Iterator pointing to the change, changes_for_reader_.end() if not found.
486
     */
487
    ChangeConstIterator find_change(
488
            const SequenceNumber_t& seq_num) const;
489
};
490
491
} /* namespace rtps */
492
} /* namespace fastrtps */
493
} /* namespace eprosima */
494
495
#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
496
#endif /* _FASTDDS_RTPS_WRITER_READERPROXY_H_ */