Coverage Report

Created: 2022-08-24 06:19

/src/Fast-DDS/include/fastdds/rtps/writer/RTPSWriter.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file RTPSWriter.h
17
 */
18
19
#ifndef _FASTDDS_RTPS_RTPSWRITER_H_
20
#define _FASTDDS_RTPS_RTPSWRITER_H_
21
22
#include <chrono>
23
#include <functional>
24
#include <memory>
25
#include <mutex>
26
#include <vector>
27
28
#include <fastdds/rtps/Endpoint.h>
29
#include <fastdds/rtps/attributes/HistoryAttributes.h>
30
#include <fastdds/rtps/attributes/WriterAttributes.h>
31
#include <fastdds/rtps/builtin/data/ReaderProxyData.h>
32
#include <fastdds/rtps/interfaces/IReaderDataFilter.hpp>
33
#include <fastdds/rtps/messages/RTPSMessageGroup.h>
34
#include "DeliveryRetCode.hpp"
35
#include "LocatorSelectorSender.hpp"
36
#include <fastrtps/qos/LivelinessLostStatus.h>
37
38
#include <fastdds/statistics/rtps/StatisticsCommon.hpp>
39
40
namespace eprosima {
41
42
namespace fastdds {
43
namespace rtps {
44
45
class FlowController;
46
47
} // namespace rtps
48
49
namespace dds {
50
51
class DataWriterImpl;
52
53
} // namespace dds
54
} // namespace fastdds
55
56
namespace fastrtps {
57
namespace rtps {
58
59
class WriterListener;
60
class WriterHistory;
61
class DataSharingNotifier;
62
struct CacheChange_t;
63
64
/**
65
 * Class RTPSWriter manages the sending of data to the readers. It is always associated with a HistoryCache.
66
 * @ingroup WRITER_MODULE
67
 */
68
class RTPSWriter
69
    : public Endpoint
70
    , public fastdds::statistics::StatisticsWriterImpl
71
{
72
    friend class WriterHistory;
73
    friend class RTPSParticipantImpl;
74
    friend class RTPSMessageGroup;
75
    friend class fastdds::dds::DataWriterImpl;
76
77
protected:
78
79
    RTPSWriter(
80
            RTPSParticipantImpl* impl,
81
            const GUID_t& guid,
82
            const WriterAttributes& att,
83
            fastdds::rtps::FlowController* flow_controller,
84
            WriterHistory* hist,
85
            WriterListener* listen = nullptr);
86
87
    RTPSWriter(
88
            RTPSParticipantImpl* impl,
89
            const GUID_t& guid,
90
            const WriterAttributes& att,
91
            const std::shared_ptr<IPayloadPool>& payload_pool,
92
            fastdds::rtps::FlowController* flow_controller,
93
            WriterHistory* hist,
94
            WriterListener* listen = nullptr);
95
96
    RTPSWriter(
97
            RTPSParticipantImpl* impl,
98
            const GUID_t& guid,
99
            const WriterAttributes& att,
100
            const std::shared_ptr<IPayloadPool>& payload_pool,
101
            const std::shared_ptr<IChangePool>& change_pool,
102
            fastdds::rtps::FlowController* flow_controller,
103
            WriterHistory* hist,
104
            WriterListener* listen = nullptr);
105
106
    virtual ~RTPSWriter();
107
108
public:
109
110
    /**
111
     * Create a new change with the provided changeKind.
112
     * @param data Data of the change.
113
     * @param changeKind The type of change.
114
     * @param handle InstanceHandle to assign.
115
     * @return Pointer to the CacheChange or nullptr if incorrect.
116
     */
117
    template<typename T>
118
    CacheChange_t* new_change(
119
            T& data,
120
            ChangeKind_t changeKind,
121
            InstanceHandle_t handle = c_InstanceHandle_Unknown)
122
    {
123
        return new_change([data]() -> uint32_t
124
                       {
125
                           return (uint32_t)T::getCdrSerializedSize(data);
126
                       }, changeKind, handle);
127
    }
128
129
    RTPS_DllAPI CacheChange_t* new_change(
130
            const std::function<uint32_t()>& dataCdrSerializedSize,
131
            ChangeKind_t changeKind,
132
            InstanceHandle_t handle = c_InstanceHandle_Unknown);
133
134
    RTPS_DllAPI CacheChange_t* new_change(
135
            ChangeKind_t changeKind,
136
            InstanceHandle_t handle = c_InstanceHandle_Unknown);
137
138
    /**
139
     * Release a change when it is not being used anymore.
140
     *
141
     * @param change Pointer to the cache change to be released.
142
     *
143
     * @returns whether the operation succeeded or not
144
     *
145
     * @pre
146
     *     @li @c change is not @c nullptr
147
     *     @li @c change points to a cache change obtained from a call to @c this->new_change
148
     *
149
     * @post memory pointed to by @c change is not accessed
150
     */
151
    RTPS_DllAPI bool release_change(
152
            CacheChange_t* change);
153
154
    /**
155
     * Add a matched reader.
156
     * @param data Pointer to the ReaderProxyData object added.
157
     * @return True if added.
158
     */
159
    RTPS_DllAPI virtual bool matched_reader_add(
160
            const ReaderProxyData& data) = 0;
161
162
    /**
163
     * Remove a matched reader.
164
     * @param reader_guid GUID of the reader to remove.
165
     * @return True if removed.
166
     */
167
    RTPS_DllAPI virtual bool matched_reader_remove(
168
            const GUID_t& reader_guid) = 0;
169
170
    /**
171
     * Tells us if a specific Reader is matched against this writer.
172
     * @param reader_guid GUID of the reader to check.
173
     * @return True if it was matched.
174
     */
175
    RTPS_DllAPI virtual bool matched_reader_is_matched(
176
            const GUID_t& reader_guid) = 0;
177
178
    /**
179
     * @brief Set a content filter to perform content filtering on this writer.
180
     *
181
     * This method sets a content filter that will be used to check whether a cache change is relevant
182
     * for a reader or not.
183
     *
184
     * @param filter  The content filter to use on this writer. May be @c nullptr to remove the content filter
185
     *                (i.e. treat all samples as relevant).
186
     */
187
    RTPS_DllAPI virtual void reader_data_filter(
188
            fastdds::rtps::IReaderDataFilter* filter) = 0;
189
190
    /**
191
     * @brief Get the content filter used to perform content filtering on this writer.
192
     *
193
     * @return The content filter used on this writer.
194
     */
195
    RTPS_DllAPI virtual const fastdds::rtps::IReaderDataFilter* reader_data_filter() const = 0;
196
197
    /**
198
     * Check if a specific change has been acknowledged by all Readers.
199
     * Only useful in reliable writers. In best-effort (BE) writers, returns false while the change is pending to be sent.
200
     * @return True if acknowledged by all.
201
     */
202
    RTPS_DllAPI virtual bool is_acked_by_all(
203
            const CacheChange_t* /*a_change*/) const
204
0
    {
205
0
        return false;
206
0
    }
207
208
    /**
209
     * Waits until all changes have been acknowledged or max_wait has elapsed.
210
     * @return True if all were acknowledged.
211
     */
212
    RTPS_DllAPI virtual bool wait_for_all_acked(
213
            const Duration_t& /*max_wait*/)
214
0
    {
215
0
        return true;
216
0
    }
217
218
    /**
219
     * Update the Attributes of the Writer.
220
     * @param att New attributes
221
     */
222
    RTPS_DllAPI virtual void updateAttributes(
223
            const WriterAttributes& att) = 0;
224
225
    /**
226
     * Get Min Seq Num in History.
227
     * @return Minimum sequence number in history
228
     */
229
    RTPS_DllAPI SequenceNumber_t get_seq_num_min();
230
231
    /**
232
     * Get Max Seq Num in History.
233
     * @return Maximum sequence number in history
234
     */
235
    RTPS_DllAPI SequenceNumber_t get_seq_num_max();
236
237
    /**
238
     * Get maximum size of the serialized type
239
     * @return Maximum size of the serialized type
240
     */
241
    RTPS_DllAPI uint32_t getTypeMaxSerialized();
242
243
    //!Get maximum size of the data
244
    uint32_t getMaxDataSize();
245
246
    //! Calculates the maximum size of the data
247
    uint32_t calculateMaxDataSize(
248
            uint32_t length);
249
250
    /**
251
     * Get listener
252
     * @return Listener
253
     */
254
    RTPS_DllAPI inline WriterListener* getListener()
255
0
    {
256
0
        return mp_listener;
257
0
    }
258
259
    RTPS_DllAPI inline bool set_listener(
260
            WriterListener* listener)
261
0
    {
262
0
        mp_listener = listener;
263
0
        return true;
264
0
    }
265
266
    /**
267
     * Get the publication mode
268
     * @return true if asynchronous publication is activated, false otherwise
269
     */
270
    RTPS_DllAPI inline bool isAsync() const
271
0
    {
272
0
        return is_async_;
273
0
    }
274
275
    /**
276
     * Remove up to a specified maximum number of changes
277
     * @param max Maximum number of changes to remove.
278
     * @return at least one change has been removed
279
     */
280
    RTPS_DllAPI bool remove_older_changes(
281
            unsigned int max = 0);
282
283
    /**
284
     * Tries to remove a change, waiting at most until the provided time point.
285
     * @param max_blocking_time_point Maximum time to wait for.
286
     * @param lock Lock of the Change list.
287
     * @return at least one change has been removed
288
     */
289
    virtual bool try_remove_change(
290
            const std::chrono::steady_clock::time_point& max_blocking_time_point,
291
            std::unique_lock<RecursiveTimedMutex>& lock) = 0;
292
293
    /**
294
     * Waits till a change has been acknowledged.
295
     * @param seq Sequence number to wait for acknowledgement.
296
     * @param max_blocking_time_point Maximum time to wait for.
297
     * @param lock Lock of the Change list.
298
     * @return true when change was acknowledged, false when timeout is reached.
299
     */
300
    virtual bool wait_for_acknowledgement(
301
            const SequenceNumber_t& seq,
302
            const std::chrono::steady_clock::time_point& max_blocking_time_point,
303
            std::unique_lock<RecursiveTimedMutex>& lock) = 0;
304
305
#ifdef FASTDDS_STATISTICS
306
307
    /*
308
     * Add a listener to receive statistics backend callbacks
309
     * @param listener Listener to be added
310
     * @return true if successfully added
311
     */
312
    RTPS_DllAPI bool add_statistics_listener(
313
            std::shared_ptr<fastdds::statistics::IListener> listener);
314
315
    /*
316
     * Remove a listener from receiving statistics backend callbacks
317
     * @param listener Listener to be removed
318
     * @return true if successfully removed
319
     */
320
    RTPS_DllAPI bool remove_statistics_listener(
321
            std::shared_ptr<fastdds::statistics::IListener> listener);
322
323
#endif // FASTDDS_STATISTICS
324
325
    /**
326
     * Get RTPS participant
327
     * @return RTPS participant
328
     */
329
    inline RTPSParticipantImpl* getRTPSParticipant() const
330
0
    {
331
0
        return mp_RTPSParticipant;
332
0
    }
333
334
    /**
335
     * Enable or disable sending data to readers separately
336
     * NOTE: This will only work for synchronous writers
337
     * @param enable If separate sending should be enabled
338
     */
339
    void set_separate_sending (
340
            bool enable)
341
0
    {
342
0
        m_separateSendingEnabled = enable;
343
0
    }
344
345
    /**
346
     * Inform if data is sent to readers separately
347
     * @return true if separate sending is enabled
348
     */
349
    bool get_separate_sending () const
350
0
    {
351
0
        return m_separateSendingEnabled;
352
0
    }
353
354
    /**
355
     * Process an incoming ACKNACK submessage.
356
     * @param[in] writer_guid      GUID of the writer the submessage is directed to.
357
     * @param[in] reader_guid      GUID of the reader originating the submessage.
358
     * @param[in] ack_count        Count field of the submessage.
359
     * @param[in] sn_set           Sequence number bitmap field of the submessage.
360
     * @param[in] final_flag       Final flag field of the submessage.
361
     * @param[out] result          true if the writer could process the submessage.
362
     *                             Only valid when returned value is true.
363
     * @return true when the submessage was destined for this writer, false otherwise.
364
     */
365
    virtual bool process_acknack(
366
            const GUID_t& writer_guid,
367
            const GUID_t& reader_guid,
368
            uint32_t ack_count,
369
            const SequenceNumberSet_t& sn_set,
370
            bool final_flag,
371
            bool& result)
372
0
    {
373
0
        (void)reader_guid; (void)ack_count; (void)sn_set; (void)final_flag;
374
0
375
0
        result = false;
376
0
        return writer_guid == m_guid;
377
0
    }
378
379
    /**
380
     * Process an incoming NACKFRAG submessage.
381
     * @param[in] writer_guid      GUID of the writer the submessage is directed to.
382
     * @param[in] reader_guid      GUID of the reader originating the submessage.
383
     * @param[in] ack_count        Count field of the submessage.
384
     * @param[in] seq_num          Sequence number field of the submessage.
385
     * @param[in] fragments_state  Fragment number bitmap field of the submessage.
386
     * @param[out] result          true if the writer could process the submessage.
387
     *                             Only valid when returned value is true.
388
     * @return true when the submessage was destined for this writer, false otherwise.
389
     */
390
    virtual bool process_nack_frag(
391
            const GUID_t& writer_guid,
392
            const GUID_t& reader_guid,
393
            uint32_t ack_count,
394
            const SequenceNumber_t& seq_num,
395
            const FragmentNumberSet_t fragments_state,
396
            bool& result)
397
0
    {
398
0
        (void)reader_guid; (void)ack_count; (void)seq_num; (void)fragments_state;
399
0
400
0
        result = false;
401
0
        return writer_guid == m_guid;
402
0
    }
403
404
    /**
405
     * @brief A method to retrieve the liveliness kind
406
     *
407
     * @return Liveliness kind
408
     */
409
    const LivelinessQosPolicyKind& get_liveliness_kind() const;
410
411
    /**
412
     * @brief A method to retrieve the liveliness lease duration
413
     *
414
     * @return Lease duration
415
     */
416
    const Duration_t& get_liveliness_lease_duration() const;
417
418
    /**
419
     * @brief A method to return the liveliness announcement period
420
     *
421
     * @return The announcement period
422
     */
423
    const Duration_t& get_liveliness_announcement_period() const;
424
425
    //! Liveliness lost status of this writer
426
    LivelinessLostStatus liveliness_lost_status_;
427
428
    /**
429
     * @return Whether the writer is data sharing compatible or not
430
     */
431
    bool is_datasharing_compatible() const;
432
433
    /*!
434
     * Tells writer the sample can be sent to the network.
435
     * This function should be used by a fastdds::rtps::FlowController.
436
     *
437
     * @param cache_change Pointer to the CacheChange_t that represents the sample which can be sent.
438
     * @param group RTPSMessageGroup reference used for generating the RTPS message.
439
     * @param locator_selector LocatorSelectorSender reference used for selecting locators. The reference has to
440
     * be a member of this RTPSWriter object.
441
     * @param max_blocking_time Future timepoint where blocking send should end.
442
     * @return Return code.
443
     * @note Must be non-thread safe.
444
     */
445
    virtual DeliveryRetCode deliver_sample_nts(
446
            CacheChange_t* cache_change,
447
            RTPSMessageGroup& group,
448
            LocatorSelectorSender& locator_selector,
449
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) = 0;
450
451
    virtual LocatorSelectorSender& get_general_locator_selector() = 0;
452
453
    virtual LocatorSelectorSender& get_async_locator_selector() = 0;
454
455
    /**
456
     * Send a message through this interface.
457
     *
458
     * @param message Pointer to the buffer with the message already serialized.
459
     * @param locator_selector LocatorSelectorSender reference used for selecting locators. The reference has to
460
     * be a member of this RTPSWriter object.
461
     * @param max_blocking_time_point Future timepoint where blocking send should end.
462
     */
463
    virtual bool send_nts(
464
            CDRMessage_t* message,
465
            const LocatorSelectorSender& locator_selector,
466
            std::chrono::steady_clock::time_point& max_blocking_time_point) const;
467
468
protected:
469
470
    //! Is the data sent directly, or announced by HB and then sent to the ones who ask for it?
471
    bool m_pushMode = true;
472
473
    //! Flow controller.
474
    fastdds::rtps::FlowController* flow_controller_;
475
476
    //!WriterHistory
477
    WriterHistory* mp_history = nullptr;
478
    //!Listener
479
    WriterListener* mp_listener = nullptr;
480
    //!Asynchronous publication activated
481
    bool is_async_ = false;
482
    //!Separate sending activated
483
    bool m_separateSendingEnabled = false;
484
485
    //! The liveliness kind of this writer
486
    LivelinessQosPolicyKind liveliness_kind_;
487
    //! The liveliness lease duration of this writer
488
    Duration_t liveliness_lease_duration_;
489
    //! The liveliness announcement period
490
    Duration_t liveliness_announcement_period_;
491
492
    void add_guid(
493
            LocatorSelectorSender& locator_selector,
494
            const GUID_t& remote_guid);
495
496
    void compute_selected_guids(
497
            LocatorSelectorSender& locator_selector);
498
499
    void update_cached_info_nts(
500
            LocatorSelectorSender& locator_selector);
501
502
    /**
503
     * Add a change to the unsent list.
504
     * @param change Pointer to the change to add.
505
     * @param max_blocking_time Time point up to which the operation may block (presumably; confirm against implementations).
506
     */
507
    virtual void unsent_change_added_to_history(
508
            CacheChange_t* change,
509
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) = 0;
510
511
    /**
512
     * Notify the writer that a change has been removed by the history due to some HistoryQos requirement.
513
     * @param a_change Pointer to the change that is going to be removed.
514
     * @return True if removed correctly.
515
     */
516
    virtual bool change_removed_by_history(
517
            CacheChange_t* a_change) = 0;
518
519
    bool is_datasharing_compatible_with(
520
            const ReaderProxyData& rdata) const;
521
522
    bool is_pool_initialized() const;
523
524
    template<typename Functor>
525
    bool send_data_or_fragments(
526
            RTPSMessageGroup& group,
527
            CacheChange_t* change,
528
            bool inline_qos,
529
            Functor sent_fun)
530
    {
531
        bool sent_ok = true;
532
533
        uint32_t n_fragments = change->getFragmentCount();
534
        if (n_fragments > 0)
535
        {
536
            for (FragmentNumber_t frag = 1; frag <= n_fragments; frag++)
537
            {
538
                sent_ok &= group.add_data_frag(*change, frag, inline_qos);
539
                if (sent_ok)
540
                {
541
                    sent_fun(change, frag);
542
                }
543
                else
544
                {
545
                    logError(RTPS_WRITER, "Error sending fragment (" << change->sequenceNumber << ", " << frag << ")");
546
                    break;
547
                }
548
            }
549
        }
550
        else
551
        {
552
            sent_ok = group.add_data(*change, inline_qos);
553
            if (sent_ok)
554
            {
555
                sent_fun(change, 0);
556
            }
557
            else
558
            {
559
                logError(RTPS_WRITER, "Error sending change " << change->sequenceNumber);
560
            }
561
        }
562
563
        return sent_ok;
564
    }
565
566
    void add_statistics_sent_submessage(
567
            CacheChange_t* change,
568
            size_t num_locators);
569
570
    void deinit();
571
572
private:
573
574
    RecursiveTimedMutex& get_mutex()
575
0
    {
576
0
        return mp_mutex;
577
0
    }
578
579
    RTPSWriter& operator =(
580
            const RTPSWriter&) = delete;
581
582
    void init(
583
            const std::shared_ptr<IPayloadPool>& payload_pool,
584
            const std::shared_ptr<IChangePool>& change_pool,
585
            const WriterAttributes& att);
586
587
588
    RTPSWriter* next_[2] = { nullptr, nullptr };
589
};
590
591
} /* namespace rtps */
592
} /* namespace fastrtps */
593
} /* namespace eprosima */
594
595
#endif /* _FASTDDS_RTPS_RTPSWRITER_H_ */