Coverage Report

Created: 2022-08-24 06:19

/src/Fast-DDS/include/fastdds/rtps/writer/StatefulWriter.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file StatefulWriter.h
17
 */
18
19
#ifndef _FASTDDS_RTPS_STATEFULWRITER_H_
20
#define _FASTDDS_RTPS_STATEFULWRITER_H_
21
22
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
23
24
#include <fastdds/rtps/writer/RTPSWriter.h>
25
#include <fastdds/rtps/interfaces/IReaderDataFilter.hpp>
26
#include <fastdds/rtps/history/IChangePool.h>
27
#include <fastdds/rtps/history/IPayloadPool.h>
28
#include <fastrtps/utils/collections/ResourceLimitedVector.hpp>
29
#include <condition_variable>
30
#include <mutex>
31
32
namespace eprosima {
33
namespace fastrtps {
34
namespace rtps {
35
36
class ReaderProxy;
37
class TimedEvent;
38
39
/**
40
 * Class StatefulWriter, specialization of RTPSWriter that maintains information of each matched Reader.
41
 * @ingroup WRITER_MODULE
42
 */
43
class StatefulWriter : public RTPSWriter
44
{
45
    friend class RTPSParticipantImpl;
46
    friend class ReaderProxy;
47
48
public:
49
50
    //!Destructor
51
    virtual ~StatefulWriter();
52
53
protected:
54
55
    //!Constructor
56
    StatefulWriter(
57
            RTPSParticipantImpl* impl,
58
            const GUID_t& guid,
59
            const WriterAttributes& att,
60
            fastdds::rtps::FlowController* flow_controller,
61
            WriterHistory* hist,
62
            WriterListener* listen = nullptr);
63
64
    StatefulWriter(
65
            RTPSParticipantImpl* impl,
66
            const GUID_t& guid,
67
            const WriterAttributes& att,
68
            const std::shared_ptr<IPayloadPool>& payload_pool,
69
            fastdds::rtps::FlowController* flow_controller,
70
            WriterHistory* hist,
71
            WriterListener* listen = nullptr);
72
73
    StatefulWriter(
74
            RTPSParticipantImpl* impl,
75
            const GUID_t& guid,
76
            const WriterAttributes& att,
77
            const std::shared_ptr<IPayloadPool>& payload_pool,
78
            const std::shared_ptr<IChangePool>& change_pool,
79
            fastdds::rtps::FlowController* flow_controller,
80
            WriterHistory* hist,
81
            WriterListener* listen = nullptr);
82
83
    void rebuild_status_after_load();
84
85
    virtual void print_inconsistent_acknack(
86
            const GUID_t& writer_guid,
87
            const GUID_t& reader_guid,
88
            const SequenceNumber_t& min_requested_sequence_number,
89
            const SequenceNumber_t& max_requested_sequence_number,
90
            const SequenceNumber_t& next_sequence_number);
91
92
private:
93
94
    void init(
95
            RTPSParticipantImpl* pimpl,
96
            const WriterAttributes& att);
97
98
    //!Timed Event to manage the periodic HB to the Reader.
99
    TimedEvent* periodic_hb_event_;
100
101
    //! Timed Event to manage the Acknack response delay.
102
    TimedEvent* nack_response_event_;
103
104
    //! A timed event to mark samples as acknowledged (used only if disable positive ACKs QoS is enabled)
105
    TimedEvent* ack_event_;
106
107
    //!Count of the sent heartbeats.
108
    Count_t m_heartbeatCount;
109
    //!WriterTimes
110
    WriterTimes m_times;
111
112
    //! Vector containing all the remote ReaderProxies.
113
    ResourceLimitedVector<ReaderProxy*> matched_remote_readers_;
114
    //! Vector containing all the inactive, ready for reuse, ReaderProxies.
115
    ResourceLimitedVector<ReaderProxy*> matched_readers_pool_;
116
117
    using ReaderProxyIterator = ResourceLimitedVector<ReaderProxy*>::iterator;
118
    using ReaderProxyConstIterator = ResourceLimitedVector<ReaderProxy*>::const_iterator;
119
120
    //!To avoid notifying twice of the same sequence number
121
    SequenceNumber_t next_all_acked_notify_sequence_;
122
    SequenceNumber_t min_readers_low_mark_;
123
124
    // TODO Join this mutex when main mutex would not be recursive.
125
    std::mutex all_acked_mutex_;
126
    std::condition_variable all_acked_cond_;
127
    // TODO Also remove when main mutex not recursive.
128
    bool all_acked_;
129
    std::condition_variable_any may_remove_change_cond_;
130
    unsigned int may_remove_change_;
131
132
public:
133
134
    /**
135
     * Add a specific change to all matched ReaderProxies.
136
     * @param p Pointer to the change.
137
     * @param max_blocking_time Time point until which this operation is allowed to block.
138
     */
139
    void unsent_change_added_to_history(
140
            CacheChange_t* p,
141
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;
142
143
    /**
144
     * Indicate to the writer that a change has been removed by the history due to some HistoryQos requirement.
145
     * @param a_change Pointer to the change that is going to be removed.
146
     * @return True if removed correctly.
147
     */
148
    bool change_removed_by_history(
149
            CacheChange_t* a_change) override;
150
151
    /**
152
     * Sends a change directly to an intraprocess reader.
153
     */
154
    bool intraprocess_delivery(
155
            CacheChange_t* change,
156
            ReaderProxy* reader_proxy);
157
158
    bool intraprocess_gap(
159
            ReaderProxy* reader_proxy,
160
            const SequenceNumber_t& seq_num)
161
0
    {
162
0
        SequenceNumber_t last_seq = seq_num + 1;
163
0
        return intraprocess_gap(reader_proxy, seq_num, last_seq);
164
0
    }
165
166
    bool intraprocess_gap(
167
            ReaderProxy* reader_proxy,
168
            const SequenceNumber_t& first_seq,
169
            const SequenceNumber_t& last_seq);
170
171
    bool intraprocess_heartbeat(
172
            ReaderProxy* reader_proxy,
173
            bool liveliness = false);
174
175
    //!Increment the HB count.
176
    inline void incrementHBCount()
177
0
    {
178
0
        on_heartbeat(++m_heartbeatCount);
179
0
    }
180
181
    /**
182
     * Add a matched reader.
183
     * @param data Reference to the ReaderProxyData object of the reader being added.
184
     * @return True if added.
185
     */
186
    bool matched_reader_add(
187
            const ReaderProxyData& data) override;
188
189
    /**
190
     * Remove a matched reader.
191
     * @param reader_guid GUID of the reader to remove.
192
     * @return True if removed.
193
     */
194
    bool matched_reader_remove(
195
            const GUID_t& reader_guid) override;
196
197
    /**
198
     * Tells us if a specific Reader is matched against this writer
199
     * @param reader_guid GUID of the reader to check.
200
     * @return True if it was matched.
201
     */
202
    bool matched_reader_is_matched(
203
            const GUID_t& reader_guid) override;
204
205
    bool is_acked_by_all(
206
            const CacheChange_t* a_change) const override;
207
208
    template <typename Function>
209
    Function for_each_reader_proxy(
210
            Function f) const
211
0
    {
212
        // Iterators (even const_iterators) over the matched reader vectors cannot be handed to the functor directly,
213
        // because they yield ReaderProxy* and would let the functor modify the ReaderProxy elements.
214
0
        for ( const ReaderProxy* rp : matched_local_readers_ )
215
0
        {
216
0
            f(rp);
217
0
        }
218
0
        for ( const ReaderProxy* rp : matched_datasharing_readers_ )
219
0
        {
220
0
            f(rp);
221
0
        }
222
0
        for ( const ReaderProxy* rp : matched_remote_readers_ )
223
0
        {
224
0
            f(rp);
225
0
        }
226
227
0
        return f;
228
0
    }
229
230
    bool wait_for_all_acked(
231
            const Duration_t& max_wait) override;
232
233
    bool all_readers_updated();
234
235
    /**
236
     * Remove the change with the minimum SequenceNumber
237
     * @return True if removed.
238
     */
239
    bool try_remove_change(
240
            const std::chrono::steady_clock::time_point& max_blocking_time_point,
241
            std::unique_lock<RecursiveTimedMutex>& lock) override;
242
243
    bool wait_for_acknowledgement(
244
            const SequenceNumber_t& seq,
245
            const std::chrono::steady_clock::time_point& max_blocking_time_point,
246
            std::unique_lock<RecursiveTimedMutex>& lock) override;
247
248
    /**
249
     * Update the Attributes of the Writer.
250
     * @param att New attributes
251
     */
252
    void updateAttributes(
253
            const WriterAttributes& att) override;
254
255
    /**
256
     * Find a Reader Proxy in this writer.
257
     * @param[in] readerGuid The GUID_t of the reader.
258
     * @param[out] RP Pointer to pointer to return the ReaderProxy.
259
     * @return True if correct.
260
     */
261
    bool matched_reader_lookup(
262
            GUID_t& readerGuid,
263
            ReaderProxy** RP);
264
265
    /** Get count of heartbeats
266
     * @return count of heartbeats
267
     */
268
    inline Count_t getHeartbeatCount() const
269
0
    {
270
0
        return this->m_heartbeatCount;
271
0
    }
272
273
    /**
274
     * Get the RTPS participant
275
     * @return RTPS participant
276
     */
277
    inline RTPSParticipantImpl* getRTPSParticipant() const
278
0
    {
279
0
        return mp_RTPSParticipant;
280
0
    }
281
282
    /**
283
     * Get the number of matched readers
284
     * @return Number of the matched readers
285
     */
286
    inline size_t getMatchedReadersSize() const
287
0
    {
288
0
        std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
289
0
        return matched_remote_readers_.size()
290
0
               + matched_local_readers_.size()
291
0
               + matched_datasharing_readers_.size();
292
0
    }
293
294
    /**
295
     * @brief Returns true if disable positive ACKs QoS is enabled
296
     *
297
     * @return True if positive acks are disabled, false otherwise
298
     */
299
    inline bool get_disable_positive_acks() const
300
0
    {
301
0
        return disable_positive_acks_;
302
0
    }
303
304
    /**
305
     * Update the WriterTimes attributes of all associated ReaderProxy.
306
     * @param times WriterTimes parameter.
307
     */
308
    void updateTimes(
309
            const WriterTimes& times);
310
311
    SequenceNumber_t next_sequence_number() const;
312
313
    /**
314
     * @brief Sends a periodic heartbeat
315
     *
316
     * @param final Final flag
317
     * @param liveliness Liveliness flag
318
     *
319
     * @return True on success
320
     */
321
    bool send_periodic_heartbeat(
322
            bool final = false,
323
            bool liveliness = false);
324
325
    /*!
326
     * @brief Sends a heartbeat to a remote reader.
327
     *
328
     * @remarks This function is non thread-safe.
329
     */
330
    void send_heartbeat_to_nts(
331
            ReaderProxy& remoteReaderProxy,
332
            bool liveliness = false,
333
            bool force = false);
334
335
    void perform_nack_response();
336
337
    void perform_nack_supression(
338
            const GUID_t& reader_guid);
339
340
    /**
341
     * Process an incoming ACKNACK submessage.
342
     * @param[in] writer_guid      GUID of the writer the submessage is directed to.
343
     * @param[in] reader_guid      GUID of the reader originating the submessage.
344
     * @param[in] ack_count        Count field of the submessage.
345
     * @param[in] sn_set           Sequence number bitmap field of the submessage.
346
     * @param[in] final_flag       Final flag field of the submessage.
347
     * @param[out] result          true if the writer could process the submessage.
348
     *                             Only valid when returned value is true.
349
     * @return true when the submessage was destined for this writer, false otherwise.
350
     */
351
    bool process_acknack(
352
            const GUID_t& writer_guid,
353
            const GUID_t& reader_guid,
354
            uint32_t ack_count,
355
            const SequenceNumberSet_t& sn_set,
356
            bool final_flag,
357
            bool& result) override;
358
359
    /**
360
     * Process an incoming NACKFRAG submessage.
361
     * @param[in] writer_guid      GUID of the writer the submessage is directed to.
362
     * @param[in] reader_guid      GUID of the reader originating the submessage.
363
     * @param[in] ack_count        Count field of the submessage.
364
     * @param[in] seq_num          Sequence number field of the submessage.
365
     * @param[in] fragments_state  Fragment number bitmap field of the submessage.
366
     * @param[out] result          true if the writer could process the submessage.
367
     *                             Only valid when returned value is true.
368
     * @return true when the submessage was destined for this writer, false otherwise.
369
     */
370
    virtual bool process_nack_frag(
371
            const GUID_t& writer_guid,
372
            const GUID_t& reader_guid,
373
            uint32_t ack_count,
374
            const SequenceNumber_t& seq_num,
375
            const FragmentNumberSet_t fragments_state,
376
            bool& result) override;
377
378
    /**
379
     * @brief Set a content filter to perform content filtering on this writer.
380
     *
381
     * This method sets a content filter that will be used to check whether a cache change is relevant
382
     * for a reader or not.
383
     *
384
     * @param filter  The content filter to use on this writer. May be @c nullptr to remove the content filter
385
     *                (i.e. treat all samples as relevant).
386
     */
387
    void reader_data_filter(
388
            fastdds::rtps::IReaderDataFilter* filter) final;
389
390
    /**
391
     * @brief Get the content filter used to perform content filtering on this writer.
392
     *
393
     * @return The content filter used on this writer.
394
     */
395
    const fastdds::rtps::IReaderDataFilter* reader_data_filter() const final;
396
397
    /*!
398
     * Tells writer the sample can be sent to the network.
399
     * This function should be used by a fastdds::rtps::FlowController.
400
     *
401
     * @param cache_change Pointer to the CacheChange_t that represents the sample which can be sent.
402
     * @param group RTPSMessageGroup reference used for generating the RTPS message.
403
     * @param locator_selector LocatorSelectorSender reference used for selecting locators. The reference has to
404
     * be a member of this RTPSWriter object.
405
     * @param max_blocking_time Future timepoint where blocking send should end.
406
     * @return Return code.
407
     * @note Must be non-thread safe.
408
     */
409
    DeliveryRetCode deliver_sample_nts(
410
            CacheChange_t* cache_change,
411
            RTPSMessageGroup& group,
412
            LocatorSelectorSender& locator_selector,
413
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;
414
415
    LocatorSelectorSender& get_general_locator_selector() override
416
0
    {
417
0
        return locator_selector_general_;
418
0
    }
419
420
    LocatorSelectorSender& get_async_locator_selector() override
421
0
    {
422
0
        return locator_selector_async_;
423
0
    }
424
425
private:
426
427
    bool is_acked_by_all(
428
            const SequenceNumber_t seq) const;
429
430
    void update_reader_info(
431
            LocatorSelectorSender& locator_selector,
432
            bool create_sender_resources);
433
434
    void send_heartbeat_piggyback_nts_(
435
            ReaderProxy* reader,
436
            RTPSMessageGroup& message_group,
437
            LocatorSelectorSender& locator_selector,
438
            uint32_t& last_bytes_processed);
439
440
    void send_heartbeat_nts_(
441
            size_t number_of_readers,
442
            RTPSMessageGroup& message_group,
443
            bool final,
444
            bool liveliness = false);
445
446
    void check_acked_status();
447
448
    /**
449
     * @brief A method called when the ack timer expires
450
     *
451
     * @details Only used if disable positive ACKs QoS is enabled
452
     */
453
    bool ack_timer_expired();
454
455
    void send_heartbeat_to_all_readers();
456
457
    void deliver_sample_to_intraprocesses(
458
            CacheChange_t* change);
459
460
    void deliver_sample_to_datasharing(
461
            CacheChange_t* change);
462
463
    DeliveryRetCode deliver_sample_to_network(
464
            CacheChange_t* change,
465
            RTPSMessageGroup& group,
466
            LocatorSelectorSender& locator_selector,
467
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time);
468
469
    void prepare_datasharing_delivery(
470
            CacheChange_t* change);
471
472
    //! True to disable piggyback heartbeats
473
    bool disable_heartbeat_piggyback_;
474
    //! True to disable positive ACKs
475
    bool disable_positive_acks_;
476
    //! Keep duration for disable positive ACKs QoS, in microseconds
477
    std::chrono::duration<double, std::ratio<1, 1000000>> keep_duration_us_;
478
    //! Last acknowledged cache change (only used if using disable positive ACKs QoS)
479
    SequenceNumber_t last_sequence_number_;
480
    //! Biggest sequence number removed from history
481
    SequenceNumber_t biggest_removed_sequence_number_;
482
483
    const uint32_t sendBufferSize_;
484
485
    int32_t currentUsageSendBufferSize_;
486
487
    bool there_are_remote_readers_ = false;
488
    bool there_are_local_readers_ = false;
489
490
    StatefulWriter& operator =(
491
            const StatefulWriter&) = delete;
492
493
    //! The filter for the reader
494
    fastdds::rtps::IReaderDataFilter* reader_data_filter_ = nullptr;
495
    //! Vector containing all the active ReaderProxies for intraprocess delivery.
496
    ResourceLimitedVector<ReaderProxy*> matched_local_readers_;
497
    //! Vector containing all the active ReaderProxies for datasharing delivery.
498
    ResourceLimitedVector<ReaderProxy*> matched_datasharing_readers_;
499
    bool there_are_datasharing_readers_ = false;
500
501
    LocatorSelectorSender locator_selector_general_;
502
503
    LocatorSelectorSender locator_selector_async_;
504
};
505
506
} /* namespace rtps */
507
} /* namespace fastrtps */
508
} /* namespace eprosima */
509
510
#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
511
#endif /* _FASTDDS_RTPS_STATEFULWRITER_H_ */