Coverage Report

Created: 2026-05-23 06:42

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/Fast-DDS/src/cpp/fastdds/publisher/DataWriterImpl.hpp
Line
Count
Source
1
// Copyright 2019, 2020 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file DataWriterImpl.hpp
17
 */
18
19
#ifndef _FASTDDS_DATAWRITERIMPL_HPP_
20
#define _FASTDDS_DATAWRITERIMPL_HPP_
21
22
#include <memory>
23
#include <mutex>
24
25
#include <fastdds/dds/builtin/topic/PublicationBuiltinTopicData.hpp>
26
#include <fastdds/dds/core/ReturnCode.hpp>
27
#include <fastdds/dds/core/status/BaseStatus.hpp>
28
#include <fastdds/dds/core/status/DeadlineMissedStatus.hpp>
29
#include <fastdds/dds/core/status/IncompatibleQosStatus.hpp>
30
#include <fastdds/dds/publisher/DataWriter.hpp>
31
#include <fastdds/dds/publisher/DataWriterListener.hpp>
32
#include <fastdds/dds/publisher/qos/DataWriterQos.hpp>
33
#include <fastdds/dds/topic/Topic.hpp>
34
#include <fastdds/dds/topic/TypeSupport.hpp>
35
#include <fastdds/rtps/attributes/WriterAttributes.hpp>
36
#include <fastdds/rtps/common/Guid.hpp>
37
#include <fastdds/rtps/common/LocatorList.hpp>
38
#include <fastdds/rtps/common/SerializedPayload.hpp>
39
#include <fastdds/rtps/common/WriteParams.hpp>
40
#include <fastdds/rtps/history/IChangePool.hpp>
41
#include <fastdds/rtps/history/IPayloadPool.hpp>
42
#include <fastdds/rtps/interfaces/IReaderDataFilter.hpp>
43
#include <fastdds/rtps/writer/WriterListener.hpp>
44
45
#include <fastdds/publisher/DataWriterHistory.hpp>
46
#include <fastdds/publisher/filtering/ReaderFilterCollection.hpp>
47
#include <rtps/DataSharing/DataSharingPayloadPool.hpp>
48
#include <rtps/history/ITopicPayloadPool.h>
49
#include <rtps/history/PoolConfig.h>
50
51
namespace eprosima {
52
namespace fastdds {
53
namespace rtps {
54
55
class BaseWriter;
56
class RTPSWriter;
57
class RTPSParticipant;
58
class TimedEvent;
59
60
} // namespace rtps
61
62
#ifdef FASTDDS_STATISTICS
63
namespace statistics {
64
namespace dds {
65
class DomainParticipantImpl;
66
} // namespace dds
67
} // namespace statistics
68
#endif // FASTDDS_STATISTICS
69
70
namespace dds {
71
72
class PublisherListener;
73
class PublisherImpl;
74
class Publisher;
75
76
/**
77
 * Class DataWriterImpl, contains the actual implementation of the behaviour of the DataWriter.
78
 * @ingroup FASTDDS_MODULE
79
 */
80
class DataWriterImpl : protected rtps::IReaderDataFilter
81
{
82
    using LoanInitializationKind = DataWriter::LoanInitializationKind;
83
    using SerializedPayload_t = eprosima::fastdds::rtps::SerializedPayload_t;
84
    using CacheChange_t = eprosima::fastdds::rtps::CacheChange_t;
85
    class LoanCollection;
86
87
protected:
88
89
    friend class PublisherImpl;
90
91
#ifdef FASTDDS_STATISTICS
92
    friend class eprosima::fastdds::statistics::dds::DomainParticipantImpl;
93
#endif // FASTDDS_STATISTICS
94
95
    /**
96
     * Create a data writer, assigning its pointer to the associated writer.
97
     * Don't use directly: the constructors are protected and DataWriters are created through the Publisher.
98
     */
99
    DataWriterImpl(
100
            PublisherImpl* p,
101
            TypeSupport type,
102
            Topic* topic,
103
            const DataWriterQos& qos,
104
            DataWriterListener* listener = nullptr,
105
            std::shared_ptr<fastdds::rtps::IPayloadPool> payload_pool = nullptr);
106
107
    DataWriterImpl(
108
            PublisherImpl* p,
109
            TypeSupport type,
110
            Topic* topic,
111
            const DataWriterQos& qos,
112
            const fastdds::rtps::EntityId_t& entity_id,
113
            DataWriterListener* listener = nullptr);
114
115
public:
116
117
    virtual ~DataWriterImpl();
118
119
    /**
120
     * Enable this object.
121
     * The required lower layer entities will be created.
122
     *
123
     * @pre This method has not previously returned RETCODE_OK
124
     *
125
     * @return RETCODE_OK if all the lower layer entities have been correctly created.
126
     * @return Other standard return codes on error.
127
     */
128
    virtual ReturnCode_t enable();
129
130
    /**
131
     * Check if the preconditions to delete this object are met.
132
     *
133
     * @return RETCODE_PRECONDITION_NOT_MET if the preconditions to delete this object are not met.
134
     * @return RETCODE_OK if it is safe to delete this object.
135
     */
136
    ReturnCode_t check_delete_preconditions();
137
138
    /**
139
     * Get a pointer to the internal pool where the user could directly write.
140
     *
141
     * @param [out] sample          Pointer to the sample on the internal pool.
142
     * @param [in]  initialization  How to initialize the loaned sample.
143
     *
144
     * @return RETCODE_ILLEGAL_OPERATION when the type does not support loans.
145
     * @return RETCODE_OUT_OF_RESOURCES if the pool has been exhausted.
146
     * @return RETCODE_OK if a pointer to a sample is successfully obtained.
147
     */
148
    ReturnCode_t loan_sample(
149
            void*& sample,
150
            LoanInitializationKind initialization);
151
152
    /**
153
     * Discards a loaned sample pointer.
154
     *
155
     * @param [in,out] sample  Pointer to the previously loaned sample.
156
     *
157
     * @return RETCODE_ILLEGAL_OPERATION when the type does not support loans.
158
     * @return RETCODE_BAD_PARAMETER if the pointer does not correspond to a loaned sample.
159
     * @return RETCODE_OK if the loan is successfully discarded.
160
     */
161
    ReturnCode_t discard_loan(
162
            void*& sample);
163
164
    /**
165
     * Write data to the topic.
166
     *
167
     * @param data Pointer to the data.
168
     *
169
     * @return any of the standard return codes.
170
     */
171
    ReturnCode_t write(
172
            const void* const data);
173
174
    /**
175
     * Write data with params to the topic.
176
     *
177
     * @param data Pointer to the data.
178
     * @param params Extra write parameters.
179
     *
180
     * @return any of the standard return codes.
181
     */
182
    ReturnCode_t write(
183
            const void* const data,
184
            fastdds::rtps::WriteParams& params);
185
186
    /**
187
     * @brief Implementation of the DDS `write` operation.
188
     *
189
     * @param [in] data    Pointer to the data to publish.
190
     * @param [in] handle  Handle of the instance to update. The special value @c HANDLE_NIL can be used to indicate
191
     *                    that the instance should be automatically calculated.
192
     *
193
     * @return any of the standard return codes.
194
     */
195
    ReturnCode_t write(
196
            const void* const data,
197
            const InstanceHandle_t& handle);
198
199
    /**
200
     * @brief Implementation of the DDS `write_w_timestamp` operation.
201
     *
202
     * @param [in] data        Pointer to the data to publish.
203
     * @param [in] handle      Handle of the instance to update. The special value @c HANDLE_NIL can be used to indicate
204
     *                        that the instance should be automatically calculated.
205
     * @param [in] timestamp   Timestamp to associate to the sample info of the published data.
206
     *
207
     * @return any of the standard return codes.
208
     */
209
    ReturnCode_t write_w_timestamp(
210
            const void* const data,
211
            const InstanceHandle_t& handle,
212
            const fastdds::dds::Time_t& timestamp);
213
214
    /**
215
     * @brief Implementation of the DDS `register_instance` operation.
216
     * It deduces the instance's key and tries to get resources in the DataWriterHistory.
217
     *
218
     * @param [in] instance Sample used to get the instance's key.
219
     *
220
     * @return Handle containing the instance's key.
221
     * This handle could be used in successive `write` or `dispose` operations.
222
     * In case of error, HANDLE_NIL will be returned.
223
     */
224
    InstanceHandle_t register_instance(
225
            const void* const instance);
226
227
    /**
228
     * @brief Implementation of the DDS `register_instance_w_timestamp` operation.
229
     * It deduces the instance's key and tries to get resources in the DataWriterHistory.
230
     *
231
     * @param [in] instance Sample used to get the instance's key.
232
     * @param [in] timestamp Timestamp to set on the instance registration operation.
233
     *
234
     * @return Handle containing the instance's key.
235
     * This handle could be used in successive `write` or `dispose` operations.
236
     * In case of error, HANDLE_NIL will be returned.
237
     */
238
    InstanceHandle_t register_instance_w_timestamp(
239
            const void* const instance,
240
            const fastdds::dds::Time_t& timestamp);
241
242
    /**
243
     * @brief Implementation of the DDS `unregister_instance` and `dispose` operations.
244
     * It sends a CacheChange_t with a kind that depends on the `dispose` parameter and
245
     * `writer_data_lifecycle` QoS.
246
     *
247
     * @param [in] instance  Sample used to deduce the instance's key when the `handle` parameter is HANDLE_NIL.
248
     * @param [in] handle    Instance's key to be unregistered or disposed.
249
     * @param [in] dispose   If `dispose` is `false`, a CacheChange_t with kind set to NOT_ALIVE_UNREGISTERED is sent,
250
     *                      or if `writer_data_lifecycle.autodispose_unregistered_instances` is `true` then it is sent
251
     *                      with kind set to NOT_ALIVE_DISPOSED_UNREGISTERED.
252
     *                      If `dispose` is `true`, a CacheChange_t with kind set to NOT_ALIVE_DISPOSED is sent.
253
     *
254
     * @return Returns the operation's result.
255
     * If the operation finishes successfully, RETCODE_OK is returned.
256
     */
257
    ReturnCode_t unregister_instance(
258
            const void* const instance,
259
            const InstanceHandle_t& handle,
260
            bool dispose = false);
261
262
    /**
263
     * @brief Implementation of the DDS `unregister_instance_w_timestamp` and `dispose_w_timestamp` operations.
264
     * It sends a CacheChange_t with a kind that depends on the `dispose` parameter and
265
     * `writer_data_lifecycle` QoS.
266
     *
267
     * @param [in] instance  Sample used to deduce the instance's key when the `handle` parameter is HANDLE_NIL.
268
     * @param [in] handle    Instance's key to be unregistered or disposed.
269
     * @param [in] timestamp Source timestamp to set on the CacheChange_t being sent.
270
     * @param [in] dispose   If `dispose` is `false`, a CacheChange_t with kind set to NOT_ALIVE_UNREGISTERED is sent,
271
     *                      or if `writer_data_lifecycle.autodispose_unregistered_instances` is `true` then it is sent
272
     *                      with kind set to NOT_ALIVE_DISPOSED_UNREGISTERED.
273
     *                      If `dispose` is `true`, a CacheChange_t with kind set to NOT_ALIVE_DISPOSED is sent.
274
     *
275
     * @return Returns the operation's result.
276
     * If the operation finishes successfully, RETCODE_OK is returned.
277
     */
278
    ReturnCode_t unregister_instance_w_timestamp(
279
            const void* const instance,
280
            const InstanceHandle_t& handle,
281
            const fastdds::dds::Time_t& timestamp,
282
            bool dispose = false);
283
284
    /**
285
     * Get the GUID of this DataWriter.
286
     * @return Reference to the GUID of this DataWriter.
287
     */
288
    const fastdds::rtps::GUID_t& guid() const;
289
290
    InstanceHandle_t get_instance_handle() const;
291
292
    /**
293
     * Get topic data type
294
     * @return Topic data type
295
     */
296
    TypeSupport get_type() const
297
0
    {
298
0
        return type_;
299
0
    }
300
301
    ReturnCode_t wait_for_acknowledgments(
302
            const fastdds::dds::Duration_t& max_wait);
303
304
    ReturnCode_t wait_for_acknowledgments(
305
            const void* const instance,
306
            const InstanceHandle_t& handle,
307
            const fastdds::dds::Duration_t& max_wait);
308
309
    ReturnCode_t get_publication_matched_status(
310
            PublicationMatchedStatus& status);
311
312
    ReturnCode_t get_offered_deadline_missed_status(
313
            OfferedDeadlineMissedStatus& status);
314
315
    ReturnCode_t get_offered_incompatible_qos_status(
316
            OfferedIncompatibleQosStatus& status);
317
318
    ReturnCode_t set_qos(
319
            const DataWriterQos& qos);
320
321
    const DataWriterQos& get_qos() const;
322
323
    Topic* get_topic() const;
324
325
    const DataWriterListener* get_listener() const;
326
327
    ReturnCode_t set_listener(
328
            DataWriterListener* listener);
329
330
    /**
331
     * This operation can be used to retrieve the instance key that corresponds to an
332
     * @ref eprosima::fastdds::dds::Entity::instance_handle_ "instance_handle".
333
     * The operation will only fill the fields that form the key inside the key_holder instance.
334
     *
335
     * This operation may return BAD_PARAMETER if the InstanceHandle_t handle does not correspond to an existing
336
     * data-object known to the DataWriter. If the implementation is not able to check invalid handles then the result
337
     * in this situation is unspecified.
338
     *
339
     * @param [in,out] key_holder  Sample where the key fields will be returned.
340
     * @param [in] handle          Handle to the instance to retrieve the key values from.
341
     *
342
     * @return Any of the standard return codes.
343
     */
344
    ReturnCode_t get_key_value(
345
            void* key_holder,
346
            const InstanceHandle_t& handle);
347
348
    ReturnCode_t get_liveliness_lost_status(
349
            LivelinessLostStatus& status);
350
351
    const Publisher* get_publisher() const;
352
353
    ReturnCode_t assert_liveliness();
354
355
    //! Remove all listeners in the hierarchy to allow a quiet destruction
356
    virtual void disable();
357
358
    /**
359
     * Removes all changes from the History.
360
     * @param [out] removed Number of removed elements
361
     * @return RETCODE_OK if correct, RETCODE_ERROR if not.
362
     */
363
    ReturnCode_t clear_history(
364
            size_t* removed);
365
366
    /**
367
     * @brief Get the list of locators from which this DataWriter may send data.
368
     *
369
     * @param [out] locators  LocatorList where the list of locators will be stored.
370
     *
371
     * @return NOT_ENABLED if the writer has not been enabled.
372
     * @return OK if a list of locators is returned.
373
     */
374
    ReturnCode_t get_sending_locators(
375
            rtps::LocatorList& locators) const;
376
377
    /**
378
     * Called from the DomainParticipant when a filter factory is being unregistered.
379
     *
380
     * @param filter_class_name  The class name under which the factory was registered.
381
     */
382
    void filter_is_being_removed(
383
            const char* filter_class_name);
384
385
    /**
386
     * @brief Retrieves the data of a subscription matched with the @ref DataWriter
387
     *
388
     * @param[out] subscription_data subscription data struct
389
     * @param subscription_handle @ref InstanceHandle_t of the subscription
390
     * @return @ref RETCODE_BAD_PARAMETER if the DataWriter is not matched with
391
     * the given subscription handle, @ref RETCODE_OK otherwise.
392
     *
393
     */
394
    ReturnCode_t get_matched_subscription_data(
395
            SubscriptionBuiltinTopicData& subscription_data,
396
            const InstanceHandle_t& subscription_handle) const;
397
398
    /**
399
     * @brief Fills the given vector with the @ref InstanceHandle_t of matched DataReaders
400
     *
401
     * @param[out] subscription_handles Vector where the @ref InstanceHandle_t are returned
402
     * @return @ref RETCODE_OK if the operation succeeds.
403
     *
404
     * @note Returning an empty list is not an error, it returns @ref RETCODE_OK.
405
     *
406
     */
407
    ReturnCode_t get_matched_subscriptions(
408
            std::vector<InstanceHandle_t>& subscription_handles) const;
409
410
    /**
411
     * Retrieve the publication data discovery information.
412
     *
413
     * @param [out] publication_data The publication data discovery information.
414
     *
415
     * @return NOT_ENABLED if the writer has not been enabled.
416
     * @return OK if the publication data is returned.
417
     */
418
    ReturnCode_t get_publication_builtin_topic_data(
419
            PublicationBuiltinTopicData& publication_data) const;
420
421
    /**
422
     *  @brief Set a sample prefilter to be used. This filter is always
423
     *  evaluated before sending the sample to any DataReader and prior to
424
     *  any content filtering.
425
     *  Passing a nullptr disables prefiltering.
426
     *
427
     * @param prefilter The prefilter to be set.
428
     *
429
     * @return RETCODE_OK if the prefilter is set correctly.
430
     *
431
     * @note Prefiltering is currently incompatible with DataSharing.
432
     */
433
    ReturnCode_t set_sample_prefilter(
434
            std::shared_ptr<IContentFilter> prefilter);
435
436
    /**
437
     * This operation sets the key of the DataReader that is related to this DataWriter.
438
     * This is used to establish a relationship between a DataReader and a DataWriter
439
     * in the context of RPC over DDS.
440
     *
441
     * @warning This operation is only valid if the entity is not enabled.
442
     *
443
     * @param [in] related_reader Pointer to the DataReader to set as related.
444
     *
445
     * @return RETCODE_OK if the key is set successfully.
446
     * @return RETCODE_ILLEGAL_OPERATION if this entity is enabled.
447
     * @return RETCODE_PRECONDITION_NOT_MET if the entity does not belong to the same participant.
448
     * @return RETCODE_BAD_PARAMETER if the provided GUID is unknown
449
     * @return RETCODE_UNSUPPORTED if the implementation does not support RPC over DDS
450
     * or the pointer is not valid.
451
     */
452
    ReturnCode_t set_related_datareader(
453
            const DataReader* related_reader);
454
455
    /**
456
     * @brief Set the type support context to be used when serializing data for this DataWriter.
457
     *
458
     * @param context Shared pointer to the type support context to be used for serialization.
459
     *
460
     * @pre The DataWriter must not be enabled.
461
     */
462
    void set_type_support_context(
463
            const std::shared_ptr<TopicDataType::Context>& context);
464
465
protected:
466
467
    using IChangePool = eprosima::fastdds::rtps::IChangePool;
468
    using IPayloadPool = eprosima::fastdds::rtps::IPayloadPool;
469
    using ITopicPayloadPool = eprosima::fastdds::rtps::ITopicPayloadPool;
470
471
    PublisherImpl* publisher_ = nullptr;
472
473
    //! Pointer to the associated RTPS Writer.
474
    fastdds::rtps::BaseWriter* writer_ = nullptr;
475
476
    //! Pointer to the TopicDataType object.
477
    TypeSupport type_;
478
479
    Topic* topic_ = nullptr;
480
481
    DataWriterQos qos_;
482
483
    //! DataWriterListener
484
    DataWriterListener* listener_ = nullptr;
485
486
    //! Mutex to protect listener_
487
    std::mutex listener_mutex_;
488
489
    //! History
490
    std::unique_ptr<DataWriterHistory> history_;
491
492
    //! Listener to capture the events of the Writer
493
    class InnerDataWriterListener : public fastdds::rtps::WriterListener
494
    {
495
    public:
496
497
        InnerDataWriterListener(
498
                DataWriterImpl* w)
499
0
            : data_writer_(w)
500
0
        {
501
0
        }
502
503
        virtual ~InnerDataWriterListener() override
504
0
        {
505
0
        }
506
507
        void on_writer_matched(
508
                fastdds::rtps::RTPSWriter* writer,
509
                const fastdds::rtps::MatchingInfo& info) override;
510
511
        void on_offered_incompatible_qos(
512
                fastdds::rtps::RTPSWriter* writer,
513
                fastdds::dds::PolicyMask qos) override;
514
515
        void on_writer_change_received_by_all(
516
                fastdds::rtps::RTPSWriter* writer,
517
                fastdds::rtps::CacheChange_t* change) override;
518
519
        void on_liveliness_lost(
520
                fastdds::rtps::RTPSWriter* writer,
521
                const LivelinessLostStatus& status) override;
522
523
        void on_reader_discovery(
524
                fastdds::rtps::RTPSWriter* writer,
525
                fastdds::rtps::ReaderDiscoveryStatus reason,
526
                const fastdds::rtps::GUID_t& reader_guid,
527
                const fastdds::rtps::SubscriptionBuiltinTopicData* reader_info) override;
528
529
#ifdef FASTDDS_STATISTICS
530
        void notify_status_observer(
531
                const uint32_t& status_id);
532
#endif //FASTDDS_STATISTICS
533
534
    private:
535
536
        DataWriterImpl* data_writer_;
537
        std::mutex matching_info_mutex_;
538
    }
539
    writer_listener_;
540
541
    //! A timer used to check for deadlines
542
    fastdds::rtps::TimedEvent* deadline_timer_ = nullptr;
543
544
    //! Deadline duration in microseconds
545
    std::chrono::duration<double, std::ratio<1, 1000000>> deadline_duration_us_;
546
547
    //! The current timer owner, i.e. the instance which started the deadline timer
548
    InstanceHandle_t timer_owner_;
549
550
    //! The publication matched status
551
    PublicationMatchedStatus publication_matched_status_;
552
553
    //! The offered deadline missed status
554
    OfferedDeadlineMissedStatus deadline_missed_status_;
555
556
    //! The liveliness lost status
557
    LivelinessLostStatus liveliness_lost_status_;
558
559
    //! The offered incompatible qos status
560
    OfferedIncompatibleQosStatus offered_incompatible_qos_status_;
561
562
    //! A timed callback to remove expired samples for lifespan QoS
563
    fastdds::rtps::TimedEvent* lifespan_timer_ = nullptr;
564
565
    //! The lifespan duration, in microseconds
566
    std::chrono::duration<double, std::ratio<1, 1000000>> lifespan_duration_us_;
567
568
    DataWriter* user_datawriter_ = nullptr;
569
570
    bool is_data_sharing_compatible_ = false;
571
572
    uint32_t fixed_payload_size_ = 0u;
573
574
    rtps::PoolConfig pool_config_ {};
575
576
    std::shared_ptr<IPayloadPool> payload_pool_;
577
578
    bool is_custom_payload_pool_ = false;
579
580
    std::unique_ptr<LoanCollection> loans_;
581
582
    fastdds::rtps::GUID_t guid_;
583
584
    std::unique_ptr<ReaderFilterCollection> reader_filters_;
585
586
    DataRepresentationId_t data_representation_ {DEFAULT_DATA_REPRESENTATION};
587
588
    mutable std::mutex filters_mtx_;
589
    std::shared_ptr<IContentFilter> sample_prefilter_;
590
591
    std::shared_ptr<TopicDataType::Context> type_support_context_ {};
592
593
    ReturnCode_t check_write_preconditions(
594
            const void* const data,
595
            const InstanceHandle_t& handle,
596
            InstanceHandle_t& instance_handle);
597
598
    ReturnCode_t check_instance_preconditions(
599
            const void* const data,
600
            const InstanceHandle_t& handle,
601
            InstanceHandle_t& instance_handle);
602
603
    InstanceHandle_t do_register_instance(
604
            const void* const key,
605
            const InstanceHandle_t instance_handle,
606
            fastdds::rtps::WriteParams& wparams);
607
608
    /**
609
     *
610
     * @param kind
611
     * @param  data
612
     * @return
613
     */
614
    ReturnCode_t create_new_change(
615
            fastdds::rtps::ChangeKind_t kind,
616
            const void* const data);
617
618
    /**
619
     *
620
     * @param kind
621
     * @param  data
622
     * @param wparams
623
     * @return
624
     */
625
    ReturnCode_t create_new_change_with_params(
626
            fastdds::rtps::ChangeKind_t kind,
627
            const void* const data,
628
            fastdds::rtps::WriteParams& wparams);
629
630
    /**
631
     *
632
     * @param kind
633
     * @param  data
634
     * @param wparams
635
     * @param handle
636
     * @return
637
     */
638
    ReturnCode_t create_new_change_with_params(
639
            fastdds::rtps::ChangeKind_t kind,
640
            const void* const data,
641
            fastdds::rtps::WriteParams& wparams,
642
            const InstanceHandle_t& handle);
643
644
    /**
645
     * Removes the cache change with the minimum sequence number
646
     * @return True if correct.
647
     */
648
    bool remove_min_seq_change();
649
650
    void update_publication_matched_status(
651
            const fastdds::rtps::MatchingInfo& status);
652
653
    /**
654
     * @brief A method called when an instance misses the deadline
655
     */
656
    bool deadline_missed();
657
658
    /**
659
     * @brief A method to reschedule the deadline timer
660
     * @return true if deadline rescheduling succeeded, false otherwise
661
     */
662
    bool deadline_timer_reschedule();
663
664
    /**
665
     * @brief A method to remove expired samples, invoked when the lifespan timer expires
666
     */
667
    bool lifespan_expired();
668
669
    ReturnCode_t check_new_change_preconditions(
670
            fastdds::rtps::ChangeKind_t change_kind,
671
            const void* const data);
672
673
    ReturnCode_t perform_create_new_change(
674
            fastdds::rtps::ChangeKind_t change_kind,
675
            const void* const data,
676
            fastdds::rtps::WriteParams& wparams,
677
            const InstanceHandle_t& handle);
678
679
    static void set_qos(
680
            DataWriterQos& to,
681
            const DataWriterQos& from,
682
            bool update_immutable);
683
684
    /**
685
     * Extends the check_qos() call, including the check for
686
     * resource limits policy.
687
     * @param qos Reference to the qos to be checked.
688
     * @param type Reference to the associated TypeSupport object.
689
     * @return RETCODE_OK if correct, another return code otherwise.
690
     */
691
    static ReturnCode_t check_qos_including_resource_limits(
692
            const DataWriterQos& qos,
693
            const TypeSupport& type);
694
695
    /**
696
     * Checks the consistency of the qos configuration.
697
     * @param qos Reference to the qos to be checked.
698
     * @return RETCODE_OK if correct, another return code otherwise.
699
     */
700
    static ReturnCode_t check_qos(
701
            const DataWriterQos& qos);
702
703
    /**
704
     * Checks resource limits policy: Instance allocation consistency
705
     * @param qos Reference to the qos to be checked.
706
     * @return RETCODE_OK if correct, another return code otherwise.
707
     */
708
    static ReturnCode_t check_allocation_consistency(
709
            const DataWriterQos& qos);
710
711
    static bool can_qos_be_updated(
712
            const DataWriterQos& to,
713
            const DataWriterQos& from);
714
715
    void publisher_qos_updated();
716
717
    OfferedIncompatibleQosStatus& update_offered_incompatible_qos(
718
            PolicyMask incompatible_policies);
719
720
    /*!
721
     * @brief Updates liveliness lost status.
722
     *
723
     * @param [in] liveliness_lost_status Liveliness lost status coming from RTPS layer.
724
     * @return Current liveliness lost status.
725
     */
726
    LivelinessLostStatus& update_liveliness_lost_status(
727
            const LivelinessLostStatus& liveliness_lost_status);
728
729
    /**
730
     * Returns the most appropriate listener to handle the callback for the given status,
731
     * or nullptr if there is no appropriate listener.
732
     */
733
    DataWriterListener* get_listener_for(
734
            const StatusMask& status);
735
736
    void set_fragment_size_on_change(
737
            fastdds::rtps::WriteParams& wparams,
738
            fastdds::rtps::CacheChange_t* ch,
739
            const uint32_t& high_mark_for_frag);
740
741
    std::shared_ptr<IChangePool> get_change_pool() const;
742
743
    std::shared_ptr<IPayloadPool> get_payload_pool();
744
745
    bool release_payload_pool();
746
747
    ReturnCode_t check_datasharing_compatible(
748
            const fastdds::rtps::WriterAttributes& writer_attributes,
749
            bool& is_datasharing_compatible) const;
750
751
    bool get_free_payload_from_pool(
752
            uint32_t size,
753
            SerializedPayload_t& payload);
754
755
    bool add_loan(
756
            const void* const data,
757
            SerializedPayload_t& payload);
758
759
    bool check_and_remove_loan(
760
            const void* const data,
761
            SerializedPayload_t& payload);
762
763
    /**
764
     * Remove internal filtering information about a reader.
765
     * Called whenever a non-intra-process reader is unmatched.
766
     *
767
     * @param reader_guid  GUID of the reader that has been unmatched.
768
     */
769
    void remove_reader_filter(
770
            const fastdds::rtps::GUID_t& reader_guid);
771
772
    /**
773
     * Process filtering information for a reader.
774
     * Called when a new reader is matched, and whenever the discovery information of a matched reader changes.
775
     *
776
     * @param reader_guid  The GUID of the reader for which the discovery information changed.
777
     * @param reader_info  The reader's discovery information.
778
     */
779
    void process_reader_filter_info(
780
            const fastdds::rtps::GUID_t& reader_guid,
781
            const fastdds::rtps::SubscriptionBuiltinTopicData& reader_info);
782
783
    bool is_relevant(
784
            const fastdds::rtps::CacheChange_t& change,
785
            const fastdds::rtps::GUID_t& reader_guid) const override;
786
787
private:
788
789
    /**
790
     * (Re)configures the deadline timer:
791
     *  In case of infinite deadline period cancel it, for 0 warn and notify once (with max counts), and
792
     *  for non-infinite positive values store period.
793
     */
794
    void configure_deadline_timer_();
795
796
    /**
797
     * Notifies listeners that a deadline has been missed.
798
     */
799
    void notify_deadline_missed_nts_();
800
801
    void create_history(
802
            const std::shared_ptr<IPayloadPool>& payload_pool,
803
            const std::shared_ptr<IChangePool>& change_pool);
804
805
    DataWriterQos get_datawriter_qos_from_settings(
806
            const DataWriterQos& qos);
807
808
};
809
810
} /* namespace dds */
811
} /* namespace fastdds */
812
} /* namespace eprosima */
813
814
#endif //_FASTDDS_DATAWRITERIMPL_HPP_