Coverage Report

Created: 2025-07-03 06:58

/src/Fast-DDS/src/cpp/fastdds/publisher/DataWriterImpl.cpp
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2019, 2020 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/*
16
 * DataWriterImpl.cpp
17
 *
18
 */
19
#include <fastdds/publisher/DataWriterImpl.hpp>
20
21
#include <functional>
22
#include <iostream>
23
24
#include <fastdds/config.hpp>
25
#include <fastdds/core/condition/StatusConditionImpl.hpp>
26
#include <fastdds/core/policy/ParameterSerializer.hpp>
27
#include <fastdds/core/policy/QosPolicyUtils.hpp>
28
#include <fastdds/dds/core/ReturnCode.hpp>
29
#include <fastdds/dds/domain/DomainParticipant.hpp>
30
#include <fastdds/dds/log/Log.hpp>
31
#include <fastdds/dds/subscriber/DataReader.hpp>
32
#include <fastdds/dds/publisher/DataWriter.hpp>
33
#include <fastdds/dds/publisher/Publisher.hpp>
34
#include <fastdds/dds/publisher/PublisherListener.hpp>
35
#include <fastdds/dds/topic/TypeSupport.hpp>
36
#include <fastdds/domain/DomainParticipantImpl.hpp>
37
#include <fastdds/publisher/filtering/DataWriterFilteredChangePool.hpp>
38
#include <fastdds/publisher/PublisherImpl.hpp>
39
#include <fastdds/rtps/builtin/data/TopicDescription.hpp>
40
#include <fastdds/rtps/common/Time_t.hpp>
41
#include <fastdds/rtps/participant/RTPSParticipant.hpp>
42
#include <fastdds/rtps/RTPSDomain.hpp>
43
#include <fastdds/rtps/writer/RTPSWriter.hpp>
44
45
#include <fastdds/utils/TypePropagation.hpp>
46
#include <rtps/builtin/liveliness/WLP.hpp>
47
#include <rtps/DataSharing/DataSharingPayloadPool.hpp>
48
#include <rtps/DataSharing/WriterPool.hpp>
49
#include <rtps/history/CacheChangePool.h>
50
#include <rtps/history/TopicPayloadPoolRegistry.hpp>
51
#include <rtps/participant/RTPSParticipantImpl.hpp>
52
#include <rtps/resources/ResourceEvent.h>
53
#include <rtps/resources/TimedEvent.h>
54
#include <rtps/RTPSDomainImpl.hpp>
55
#include <rtps/writer/BaseWriter.hpp>
56
#include <rtps/writer/StatefulWriter.hpp>
57
#include <utils/TimeConversion.hpp>
58
#include <utils/BuiltinTopicKeyConversions.hpp>
59
#ifdef FASTDDS_STATISTICS
60
#include <statistics/fastdds/domain/DomainParticipantImpl.hpp>
61
#include <statistics/types/monitorservice_types.hpp>
62
#endif //FASTDDS_STATISTICS
63
64
using namespace eprosima::fastdds;
65
using namespace eprosima::fastdds::rtps;
66
using namespace std::chrono;
67
68
namespace eprosima {
69
namespace fastdds {
70
namespace dds {
71
72
static ChangeKind_t unregister_change_kind(
73
        bool dispose,
74
        const DataWriterQos& qos)
75
0
{
76
0
    if (dispose)
77
0
    {
78
0
        return NOT_ALIVE_DISPOSED;
79
0
    }
80
81
0
    return qos.writer_data_lifecycle().autodispose_unregistered_instances ?
82
0
           NOT_ALIVE_DISPOSED_UNREGISTERED : NOT_ALIVE_UNREGISTERED;
83
0
}
84
85
static bool qos_has_pull_mode_request(
86
        const DataWriterQos& qos)
87
0
{
88
0
    auto push_mode = PropertyPolicyHelper::find_property(qos.properties(), "fastdds.push_mode");
89
0
    return (nullptr != push_mode) && ("false" == *push_mode);
90
0
}
91
92
class DataWriterImpl::LoanCollection
93
{
94
public:
95
96
    explicit LoanCollection(
97
            const PoolConfig& config)
98
0
        : loans_(get_collection_limits(config))
99
0
    {
100
0
    }
101
102
    bool add_loan(
103
            const void* const data,
104
            SerializedPayload_t& payload)
105
0
    {
106
0
        static_cast<void>(data);
107
0
        assert(data == payload.data + SerializedPayload_t::representation_header_size);
108
0
        return loans_.push_back(std::move(payload));
109
0
    }
110
111
    bool check_and_remove_loan(
112
            const void* const data,
113
            SerializedPayload_t& payload)
114
0
    {
115
0
        const octet* payload_data = static_cast<const octet*>(data) - SerializedPayload_t::representation_header_size;
116
0
        for (auto it = loans_.begin(); it != loans_.end(); ++it)
117
0
        {
118
0
            if (it->data == payload_data)
119
0
            {
120
                // Avoid releasing the payload in destructor
121
0
                payload = std::move(*it);
122
0
                loans_.erase(it);
123
0
                return true;
124
0
            }
125
0
        }
126
0
        return false;
127
0
    }
128
129
    bool is_empty() const
130
0
    {
131
0
        return loans_.empty();
132
0
    }
133
134
private:
135
136
    static ResourceLimitedContainerConfig get_collection_limits(
137
            const PoolConfig& config)
138
0
    {
139
0
        return
140
0
            {
141
0
                config.initial_size,
142
0
                config.maximum_size,
143
0
                config.initial_size == config.maximum_size ? 0u : 1u
144
0
            };
145
0
    }
146
147
    ResourceLimitedVector<SerializedPayload_t> loans_;
148
149
};
150
151
DataWriterImpl::DataWriterImpl(
152
        PublisherImpl* p,
153
        TypeSupport type,
154
        Topic* topic,
155
        const DataWriterQos& qos,
156
        DataWriterListener* listen,
157
        std::shared_ptr<fastdds::rtps::IPayloadPool> payload_pool)
158
0
    : publisher_(p)
159
0
    , type_(type)
160
0
    , topic_(topic)
161
0
    , qos_(get_datawriter_qos_from_settings(qos))
162
0
    , listener_(listen)
163
0
    , history_()
164
#pragma warning (disable : 4355 )
165
0
    , writer_listener_(this)
166
0
    , deadline_duration_us_(qos_.deadline().period.to_ns() * 1e-3)
167
0
    , lifespan_duration_us_(qos_.lifespan().duration.to_ns() * 1e-3)
168
0
{
169
0
    EndpointAttributes endpoint_attributes;
170
0
    endpoint_attributes.endpointKind = WRITER;
171
0
    endpoint_attributes.topicKind = type_->is_compute_key_provided ? WITH_KEY : NO_KEY;
172
0
    endpoint_attributes.setEntityID(qos_.endpoint().entity_id);
173
0
    endpoint_attributes.setUserDefinedID(qos_.endpoint().user_defined_id);
174
0
    fastdds::rtps::RTPSParticipantImpl::preprocess_endpoint_attributes<WRITER, 0x03, 0x02>(
175
0
        fastdds::rtps::EntityId_t::unknown(),
176
0
        publisher_->get_participant_impl()->id_counter(), endpoint_attributes, guid_.entityId);
177
0
    guid_.guidPrefix = publisher_->get_participant_impl()->guid().guidPrefix;
178
179
0
    if (payload_pool != nullptr)
180
0
    {
181
0
        is_custom_payload_pool_ = true;
182
0
        payload_pool_ = payload_pool;
183
0
    }
184
0
}
185
186
DataWriterImpl::DataWriterImpl(
187
        PublisherImpl* p,
188
        TypeSupport type,
189
        Topic* topic,
190
        const DataWriterQos& qos,
191
        const fastdds::rtps::EntityId_t& entity_id,
192
        DataWriterListener* listen)
193
0
    : publisher_(p)
194
0
    , type_(type)
195
0
    , topic_(topic)
196
0
    , qos_(get_datawriter_qos_from_settings(qos))
197
0
    , listener_(listen)
198
0
    , history_()
199
#pragma warning (disable : 4355 )
200
0
    , writer_listener_(this)
201
0
    , deadline_duration_us_(qos_.deadline().period.to_ns() * 1e-3)
202
0
    , lifespan_duration_us_(qos_.lifespan().duration.to_ns() * 1e-3)
203
0
{
204
0
    guid_ = { publisher_->get_participant_impl()->guid().guidPrefix, entity_id};
205
0
}
206
207
DataWriterQos DataWriterImpl::get_datawriter_qos_from_settings(
208
        const DataWriterQos& qos)
209
0
{
210
0
    DataWriterQos return_qos;
211
0
    if (&DATAWRITER_QOS_DEFAULT == &qos)
212
0
    {
213
0
        return_qos = publisher_->get_default_datawriter_qos();
214
0
    }
215
0
    else if (&DATAWRITER_QOS_USE_TOPIC_QOS == &qos)
216
0
    {
217
0
        return_qos = publisher_->get_default_datawriter_qos();
218
0
        publisher_->copy_from_topic_qos(return_qos, topic_->get_qos());
219
0
    }
220
0
    else
221
0
    {
222
0
        return_qos = qos;
223
0
    }
224
225
0
    return return_qos;
226
0
}
227
228
void DataWriterImpl::create_history(
229
        const std::shared_ptr<IPayloadPool>& payload_pool,
230
        const std::shared_ptr<IChangePool>& change_pool)
231
0
{
232
0
    history_.reset(new DataWriterHistory(
233
0
                payload_pool, change_pool,
234
0
                qos_.history(),
235
0
                qos_.resource_limits(),
236
0
                (type_->is_compute_key_provided ? WITH_KEY : NO_KEY),
237
0
                type_->max_serialized_type_size,
238
0
                qos_.endpoint().history_memory_policy,
239
0
                [this](
240
0
                    const InstanceHandle_t& handle) -> void
241
0
                {
242
0
                    if (nullptr != listener_)
243
0
                    {
244
0
                        listener_->on_unacknowledged_sample_removed(user_datawriter_, handle);
245
0
                    }
246
0
                }));
247
0
}
248
249
ReturnCode_t DataWriterImpl::enable()
250
0
{
251
0
    assert(writer_ == nullptr);
252
253
0
    auto history_att = DataWriterHistory::to_history_attributes(
254
0
        qos_.history(),
255
0
        qos_.resource_limits(), (type_->is_compute_key_provided ? WITH_KEY : NO_KEY), type_->max_serialized_type_size,
256
0
        qos_.endpoint().history_memory_policy);
257
0
    pool_config_ = PoolConfig::from_history_attributes(history_att);
258
259
    // When the user requested PREALLOCATED_WITH_REALLOC, but we know the type cannot
260
    // grow, we translate the policy into bare PREALLOCATED
261
0
    if (PREALLOCATED_WITH_REALLOC_MEMORY_MODE == pool_config_.memory_policy &&
262
0
            (type_->is_bounded() || type_->is_plain(data_representation_)))
263
0
    {
264
0
        pool_config_.memory_policy = PREALLOCATED_MEMORY_MODE;
265
0
    }
266
267
0
    WriterAttributes w_att;
268
0
    w_att.endpoint.durabilityKind = qos_.durability().durabilityKind();
269
0
    w_att.endpoint.endpointKind = WRITER;
270
0
    w_att.endpoint.reliabilityKind = qos_.reliability().kind == RELIABLE_RELIABILITY_QOS ? RELIABLE : BEST_EFFORT;
271
0
    w_att.endpoint.topicKind = type_->is_compute_key_provided ? WITH_KEY : NO_KEY;
272
0
    w_att.endpoint.multicastLocatorList = qos_.endpoint().multicast_locator_list;
273
0
    w_att.endpoint.unicastLocatorList = qos_.endpoint().unicast_locator_list;
274
0
    w_att.endpoint.remoteLocatorList = qos_.endpoint().remote_locator_list;
275
0
    w_att.endpoint.external_unicast_locators = qos_.endpoint().external_unicast_locators;
276
0
    w_att.endpoint.ignore_non_matching_locators = qos_.endpoint().ignore_non_matching_locators;
277
0
    w_att.mode = qos_.publish_mode().kind == SYNCHRONOUS_PUBLISH_MODE ? SYNCHRONOUS_WRITER : ASYNCHRONOUS_WRITER;
278
0
    w_att.flow_controller_name = qos_.publish_mode().flow_controller_name;
279
0
    w_att.endpoint.properties = qos_.properties();
280
0
    w_att.endpoint.ownershipKind = qos_.ownership().kind;
281
0
    w_att.endpoint.setEntityID(qos_.endpoint().entity_id);
282
0
    w_att.endpoint.setUserDefinedID(qos_.endpoint().user_defined_id);
283
0
    w_att.times = qos_.reliable_writer_qos().times;
284
0
    w_att.liveliness_kind = qos_.liveliness().kind;
285
0
    w_att.liveliness_lease_duration = qos_.liveliness().lease_duration;
286
0
    w_att.liveliness_announcement_period = qos_.liveliness().announcement_period;
287
0
    w_att.matched_readers_allocation = qos_.writer_resource_limits().matched_subscriber_allocation;
288
0
    w_att.disable_heartbeat_piggyback = qos_.reliable_writer_qos().disable_heartbeat_piggyback;
289
290
    // TODO(Ricardo) Remove in future
291
    // Insert topic_name and partitions
292
0
    Property property;
293
0
    property.name("topic_name");
294
0
    property.value(topic_->get_name().c_str());
295
0
    w_att.endpoint.properties.properties().push_back(std::move(property));
296
297
0
    std::string* endpoint_partitions = PropertyPolicyHelper::find_property(qos_.properties(), "partitions");
298
299
0
    if (endpoint_partitions)
300
0
    {
301
0
        property.name("partitions");
302
0
        property.value(*endpoint_partitions);
303
0
        w_att.endpoint.properties.properties().push_back(std::move(property));
304
0
    }
305
0
    else if (publisher_->get_qos().partition().names().size() > 0)
306
0
    {
307
0
        property.name("partitions");
308
0
        std::string partitions;
309
0
        bool is_first_partition = true;
310
0
        for (auto partition : publisher_->get_qos().partition().names())
311
0
        {
312
0
            partitions += (is_first_partition ? "" : ";") + partition;
313
0
            is_first_partition = false;
314
0
        }
315
0
        property.value(std::move(partitions));
316
0
        w_att.endpoint.properties.properties().push_back(std::move(property));
317
0
    }
318
319
0
    if (qos_.reliable_writer_qos().disable_positive_acks.enabled &&
320
0
            qos_.reliable_writer_qos().disable_positive_acks.duration != dds::c_TimeInfinite)
321
0
    {
322
0
        w_att.disable_positive_acks = true;
323
0
        w_att.keep_duration = qos_.reliable_writer_qos().disable_positive_acks.duration;
324
0
    }
325
326
0
    ReturnCode_t ret_code = check_datasharing_compatible(w_att, is_data_sharing_compatible_);
327
0
    if (ret_code != RETCODE_OK)
328
0
    {
329
0
        return ret_code;
330
0
    }
331
332
0
    if (is_data_sharing_compatible_)
333
0
    {
334
0
        DataSharingQosPolicy datasharing(qos_.data_sharing());
335
0
        if (datasharing.domain_ids().empty())
336
0
        {
337
0
            datasharing.add_domain_id(utils::default_domain_id());
338
0
        }
339
0
        w_att.endpoint.set_data_sharing_configuration(datasharing);
340
0
    }
341
0
    else
342
0
    {
343
0
        DataSharingQosPolicy datasharing;
344
0
        datasharing.off();
345
0
        w_att.endpoint.set_data_sharing_configuration(datasharing);
346
0
    }
347
348
0
    bool filtering_enabled =
349
0
            qos_.liveliness().lease_duration.is_infinite() &&
350
0
            (0 < qos_.writer_resource_limits().reader_filters_allocation.maximum);
351
352
0
    if (filtering_enabled)
353
0
    {
354
0
        std::lock_guard<std::mutex> lock(filters_mtx_);
355
0
        reader_filters_.reset(new ReaderFilterCollection(qos_.writer_resource_limits().reader_filters_allocation));
356
0
    }
357
358
    // Set Datawriter's DataRepresentationId taking into account the QoS.
359
0
    data_representation_ = qos_.representation().m_value.empty()
360
0
            || XCDR_DATA_REPRESENTATION == qos_.representation().m_value.at(0)
361
0
                    ? XCDR_DATA_REPRESENTATION : XCDR2_DATA_REPRESENTATION;
362
363
0
    auto change_pool = get_change_pool();
364
0
    if (!change_pool)
365
0
    {
366
0
        EPROSIMA_LOG_ERROR(DATA_WRITER, "Problem creating change pool for associated Writer");
367
0
        return RETCODE_ERROR;
368
0
    }
369
370
0
    auto pool = get_payload_pool();
371
0
    if (!pool)
372
0
    {
373
0
        EPROSIMA_LOG_ERROR(DATA_WRITER, "Problem creating payload pool for associated Writer");
374
0
        return RETCODE_ERROR;
375
0
    }
376
377
0
    create_history(pool, change_pool);
378
379
0
    RTPSWriter* writer =  RTPSDomainImpl::create_rtps_writer(
380
0
        publisher_->rtps_participant(),
381
0
        guid_.entityId,
382
0
        w_att,
383
0
        history_.get(),
384
0
        static_cast<WriterListener*>(&writer_listener_));
385
386
0
    if (writer != nullptr &&
387
0
            w_att.endpoint.data_sharing_configuration().kind() != DataSharingKind::OFF)
388
0
    {
389
0
        auto writer_pool = std::dynamic_pointer_cast<fastdds::rtps::WriterPool>(pool);
390
0
        if (!writer_pool || !writer_pool->is_initialized())
391
0
        {
392
0
            EPROSIMA_LOG_ERROR(DATA_WRITER, "Could not initialize DataSharing writer pool");
393
0
            RTPSDomain::removeRTPSWriter(writer);
394
0
            writer = nullptr;
395
0
        }
396
0
    }
397
398
0
    if (writer == nullptr &&
399
0
            w_att.endpoint.data_sharing_configuration().kind() == DataSharingKind::AUTO)
400
0
    {
401
0
        EPROSIMA_LOG_INFO(DATA_WRITER, "Trying with a non-datasharing pool");
402
0
        history_.reset();
403
0
        release_payload_pool();
404
0
        is_data_sharing_compatible_ = false;
405
0
        DataSharingQosPolicy datasharing;
406
0
        datasharing.off();
407
0
        w_att.endpoint.set_data_sharing_configuration(datasharing);
408
409
0
        pool = get_payload_pool();
410
0
        if (!pool)
411
0
        {
412
0
            EPROSIMA_LOG_ERROR(DATA_WRITER, "Problem creating payload pool for associated Writer");
413
0
            return RETCODE_ERROR;
414
0
        }
415
416
0
        create_history(pool, change_pool);
417
0
        writer = RTPSDomainImpl::create_rtps_writer(
418
0
            publisher_->rtps_participant(),
419
0
            guid_.entityId,
420
0
            w_att,
421
0
            history_.get(),
422
0
            static_cast<WriterListener*>(&writer_listener_));
423
0
    }
424
0
    if (writer == nullptr)
425
0
    {
426
0
        history_.reset();
427
0
        release_payload_pool();
428
0
        EPROSIMA_LOG_ERROR(DATA_WRITER, "Problem creating associated Writer");
429
0
        return RETCODE_ERROR;
430
0
    }
431
432
0
    writer_ = BaseWriter::downcast(writer);
433
434
    // Set DataWriterImpl as the implementer of the
435
    // IReaderDataFilter interface
436
0
    writer_->reader_data_filter(this);
437
438
    // In case it has been loaded from the persistence DB, rebuild instances on history
439
0
    history_->rebuild_instances();
440
441
0
    deadline_timer_ = new TimedEvent(publisher_->rtps_participant()->get_resource_event(),
442
0
                    [&]() -> bool
443
0
                    {
444
0
                        return deadline_missed();
445
0
                    },
446
0
                    qos_.deadline().period.to_ns() * 1e-6);
447
448
0
    lifespan_timer_ = new TimedEvent(publisher_->rtps_participant()->get_resource_event(),
449
0
                    [&]() -> bool
450
0
                    {
451
0
                        return lifespan_expired();
452
0
                    },
453
0
                    qos_.lifespan().duration.to_ns() * 1e-6);
454
455
    // In case it has been loaded from the persistence DB, expire old samples.
456
0
    if (qos_.lifespan().duration != dds::c_TimeInfinite)
457
0
    {
458
0
        if (lifespan_expired())
459
0
        {
460
0
            lifespan_timer_->restart_timer();
461
0
        }
462
0
    }
463
464
    // REGISTER THE WRITER
465
0
    fastdds::rtps::TopicDescription topic_desc;
466
0
    topic_desc.topic_name = topic_->get_name();
467
0
    topic_desc.type_name = topic_->get_type_name();
468
0
    publisher_->get_participant_impl()->fill_type_information(type_, topic_desc.type_information);
469
470
0
    PublicationBuiltinTopicData publication_data;
471
0
    if (get_publication_builtin_topic_data(publication_data) != RETCODE_OK)
472
0
    {
473
0
        EPROSIMA_LOG_ERROR(DATA_WRITER, "Error getting publication data. RTPS Writer not enabled.");
474
0
        return RETCODE_ERROR;
475
0
    }
476
0
    ReturnCode_t register_writer_code = publisher_->rtps_participant()->register_writer(writer_, topic_desc,
477
0
                    publication_data);
478
0
    if (register_writer_code != RETCODE_OK)
479
0
    {
480
0
        EPROSIMA_LOG_ERROR(DATA_WRITER, "Could not register writer on discovery protocols");
481
0
    }
482
483
0
    return register_writer_code;
484
0
}
485
486
void DataWriterImpl::disable()
487
0
{
488
0
    set_listener(nullptr);
489
0
    if (writer_ != nullptr)
490
0
    {
491
0
        writer_->set_listener(nullptr);
492
0
    }
493
0
}
494
495
ReturnCode_t DataWriterImpl::check_delete_preconditions()
496
0
{
497
0
    if (loans_ && !loans_->is_empty())
498
0
    {
499
0
        return RETCODE_PRECONDITION_NOT_MET;
500
0
    }
501
502
0
    return RETCODE_OK;
503
0
}
504
505
DataWriterImpl::~DataWriterImpl()
506
0
{
507
0
    delete lifespan_timer_;
508
0
    delete deadline_timer_;
509
510
0
    if (writer_ != nullptr)
511
0
    {
512
0
        EPROSIMA_LOG_INFO(DATA_WRITER, guid().entityId << " in topic: " << type_->get_name());
513
0
        RTPSDomain::removeRTPSWriter(writer_);
514
0
        release_payload_pool();
515
0
    }
516
517
0
    delete user_datawriter_;
518
0
}
519
520
ReturnCode_t DataWriterImpl::loan_sample(
521
        void*& sample,
522
        LoanInitializationKind initialization)
523
0
{
524
    // Block lowlevel writer
525
0
    auto max_blocking_time = steady_clock::now() +
526
0
            microseconds(rtps::TimeConv::Time_t2MicroSecondsInt64(qos_.reliability().max_blocking_time));
527
528
    // Type should be plain and have space for the representation header
529
0
    if (!type_->is_plain(data_representation_) ||
530
0
            SerializedPayload_t::representation_header_size > type_->max_serialized_type_size)
531
0
    {
532
0
        return RETCODE_ILLEGAL_OPERATION;
533
0
    }
534
535
    // Writer should be enabled
536
0
    if (nullptr == writer_)
537
0
    {
538
0
        return RETCODE_NOT_ENABLED;
539
0
    }
540
541
#if HAVE_STRICT_REALTIME
542
    std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex(), std::defer_lock);
543
    if (!lock.try_lock_until(max_blocking_time))
544
    {
545
        return RETCODE_TIMEOUT;
546
    }
547
#else
548
0
    static_cast<void>(max_blocking_time);
549
0
    std::lock_guard<RecursiveTimedMutex> lock(writer_->getMutex());
550
0
#endif // if HAVE_STRICT_REALTIME
551
552
    // Get one payload from the pool
553
0
    SerializedPayload_t payload;
554
0
    uint32_t size = type_->max_serialized_type_size;
555
0
    if (!get_free_payload_from_pool(size, payload))
556
0
    {
557
0
        return RETCODE_OUT_OF_RESOURCES;
558
0
    }
559
560
    // Leave payload state as if serialization has already been performed
561
0
    payload.length = size;
562
0
    payload.pos = size;
563
0
    payload.data[1] = DEFAULT_ENCAPSULATION;
564
0
    payload.encapsulation = DEFAULT_ENCAPSULATION;
565
566
    // Sample starts after representation header
567
0
    sample = payload.data + SerializedPayload_t::representation_header_size;
568
569
    // Add to loans collection
570
0
    if (!add_loan(sample, payload))
571
0
    {
572
0
        sample = nullptr;
573
0
        payload_pool_->release_payload(payload);
574
0
        return RETCODE_OUT_OF_RESOURCES;
575
0
    }
576
577
0
    switch (initialization)
578
0
    {
579
0
        default:
580
0
            EPROSIMA_LOG_WARNING(DATA_WRITER, "Using wrong LoanInitializationKind value ("
581
0
                    << static_cast<int>(initialization) << "). Using default NO_LOAN_INITIALIZATION");
582
0
            break;
583
584
0
        case LoanInitializationKind::NO_LOAN_INITIALIZATION:
585
0
            break;
586
587
0
        case LoanInitializationKind::ZERO_LOAN_INITIALIZATION:
588
0
            if (SerializedPayload_t::representation_header_size < size)
589
0
            {
590
0
                size -= SerializedPayload_t::representation_header_size;
591
0
                memset(sample, 0, size);
592
0
            }
593
0
            break;
594
595
0
        case LoanInitializationKind::CONSTRUCTED_LOAN_INITIALIZATION:
596
0
            if (!type_->construct_sample(sample))
597
0
            {
598
0
                check_and_remove_loan(sample, payload);
599
0
                payload_pool_->release_payload(payload);
600
0
                sample = nullptr;
601
0
                return RETCODE_UNSUPPORTED;
602
0
            }
603
0
            break;
604
0
    }
605
606
    // Avoid releasing the payload in destructor
607
0
    payload.payload_owner = nullptr;
608
0
    payload.data = nullptr;
609
610
0
    return RETCODE_OK;
611
0
}
612
613
ReturnCode_t DataWriterImpl::discard_loan(
614
        void*& sample)
615
0
{
616
    // Type should be plain and have space for the representation header
617
0
    if (!type_->is_plain(data_representation_) ||
618
0
            SerializedPayload_t::representation_header_size > type_->max_serialized_type_size)
619
0
    {
620
0
        return RETCODE_ILLEGAL_OPERATION;
621
0
    }
622
623
    // Writer should be enabled
624
0
    if (nullptr == writer_)
625
0
    {
626
0
        return RETCODE_NOT_ENABLED;
627
0
    }
628
629
0
    std::lock_guard<RecursiveTimedMutex> lock(writer_->getMutex());
630
631
    // Remove sample from loans collection
632
0
    SerializedPayload_t payload;
633
0
    if ((nullptr == sample) || !check_and_remove_loan(sample, payload))
634
0
    {
635
0
        return RETCODE_BAD_PARAMETER;
636
0
    }
637
638
    // Return payload to pool
639
0
    payload_pool_->release_payload(payload);
640
0
    sample = nullptr;
641
642
0
    return RETCODE_OK;
643
0
}
644
645
ReturnCode_t DataWriterImpl::write(
646
        const void* const data)
647
0
{
648
0
    if (writer_ == nullptr)
649
0
    {
650
0
        return RETCODE_NOT_ENABLED;
651
0
    }
652
653
0
    EPROSIMA_LOG_INFO(DATA_WRITER, "Writing new data");
654
0
    return create_new_change(ALIVE, data);
655
0
}
656
657
ReturnCode_t DataWriterImpl::write(
658
        const void* const data,
659
        fastdds::rtps::WriteParams& params)
660
0
{
661
0
    if (writer_ == nullptr)
662
0
    {
663
0
        return RETCODE_NOT_ENABLED;
664
0
    }
665
666
0
    EPROSIMA_LOG_INFO(DATA_WRITER, "Writing new data with WriteParams");
667
0
    return create_new_change_with_params(ALIVE, data, params);
668
0
}
669
670
ReturnCode_t DataWriterImpl::check_write_preconditions(
671
        const void* const data,
672
        const InstanceHandle_t& handle,
673
        InstanceHandle_t& instance_handle)
674
0
{
675
0
    if (writer_ == nullptr)
676
0
    {
677
0
        return RETCODE_NOT_ENABLED;
678
0
    }
679
680
0
    if (type_.get()->is_compute_key_provided)
681
0
    {
682
0
        bool is_key_protected = false;
683
#if HAVE_SECURITY
684
        is_key_protected = writer_->getAttributes().security_attributes().is_key_protected;
685
#endif // if HAVE_SECURITY
686
0
        type_.get()->compute_key(data, instance_handle, is_key_protected);
687
0
    }
688
689
    //Check if the Handle is different from the special value HANDLE_NIL and
690
    //does not correspond with the instance referred by the data
691
0
    if (handle.isDefined() && handle != instance_handle)
692
0
    {
693
0
        return RETCODE_PRECONDITION_NOT_MET;
694
0
    }
695
696
0
    return RETCODE_OK;
697
0
}
698
699
ReturnCode_t DataWriterImpl::write(
700
        const void* const data,
701
        const InstanceHandle_t& handle)
702
0
{
703
0
    InstanceHandle_t instance_handle;
704
0
    ReturnCode_t ret = check_write_preconditions(data, handle, instance_handle);
705
0
    if (RETCODE_OK == ret)
706
0
    {
707
0
        EPROSIMA_LOG_INFO(DATA_WRITER, "Writing new data with Handle");
708
0
        WriteParams wparams;
709
0
        ret = create_new_change_with_params(ALIVE, data, wparams, instance_handle);
710
0
    }
711
712
0
    return ret;
713
0
}
714
715
ReturnCode_t DataWriterImpl::write_w_timestamp(
716
        const void* const data,
717
        const InstanceHandle_t& handle,
718
        const fastdds::dds::Time_t& timestamp)
719
0
{
720
0
    InstanceHandle_t instance_handle;
721
0
    ReturnCode_t ret = RETCODE_OK;
722
0
    if (timestamp.is_infinite() || timestamp.seconds < 0)
723
0
    {
724
0
        ret = RETCODE_BAD_PARAMETER;
725
0
    }
726
727
0
    if (RETCODE_OK == ret)
728
0
    {
729
0
        ret = check_write_preconditions(data, handle, instance_handle);
730
0
    }
731
732
0
    if (RETCODE_OK == ret)
733
0
    {
734
0
        EPROSIMA_LOG_INFO(DATA_WRITER, "Writing new data with Handle and timestamp");
735
0
        WriteParams wparams;
736
0
        wparams.source_timestamp(timestamp);
737
0
        ret = create_new_change_with_params(ALIVE, data, wparams, instance_handle);
738
0
    }
739
740
0
    return ret;
741
0
}
742
743
ReturnCode_t DataWriterImpl::check_instance_preconditions(
744
        const void* const data,
745
        const InstanceHandle_t& handle,
746
        InstanceHandle_t& instance_handle)
747
0
{
748
0
    if (nullptr == writer_)
749
0
    {
750
0
        return RETCODE_NOT_ENABLED;
751
0
    }
752
753
0
    if (nullptr == data)
754
0
    {
755
0
        EPROSIMA_LOG_ERROR(DATA_WRITER, "Data pointer not valid");
756
0
        return RETCODE_BAD_PARAMETER;
757
0
    }
758
759
0
    if (!type_->is_compute_key_provided)
760
0
    {
761
0
        EPROSIMA_LOG_ERROR(DATA_WRITER, "Topic is NO_KEY, operation not permitted");
762
0
        return RETCODE_PRECONDITION_NOT_MET;
763
0
    }
764
765
0
    instance_handle = handle;
766
767
0
#if defined(NDEBUG)
768
0
    if (!instance_handle.isDefined())
769
0
#endif // if !defined(NDEBUG)
770
0
    {
771
0
        bool is_key_protected = false;
772
#if HAVE_SECURITY
773
        is_key_protected = writer_->getAttributes().security_attributes().is_key_protected;
774
#endif // if HAVE_SECURITY
775
0
        type_->compute_key(data, instance_handle, is_key_protected);
776
0
    }
777
778
#if !defined(NDEBUG)
779
    if (handle.isDefined() && instance_handle != handle)
780
    {
781
        EPROSIMA_LOG_ERROR(DATA_WRITER, "handle differs from data's key.");
782
        return RETCODE_PRECONDITION_NOT_MET;
783
    }
784
#endif // if !defined(NDEBUG)
785
786
0
    return RETCODE_OK;
787
0
}
788
789
InstanceHandle_t DataWriterImpl::register_instance(
790
        const void* const key)
791
0
{
792
    /// Preconditions
793
0
    InstanceHandle_t instance_handle;
794
0
    if (RETCODE_OK != check_instance_preconditions(key, HANDLE_NIL, instance_handle))
795
0
    {
796
0
        return HANDLE_NIL;
797
0
    }
798
799
0
    WriteParams wparams;
800
0
    return do_register_instance(key, instance_handle, wparams);
801
0
}
802
803
InstanceHandle_t DataWriterImpl::register_instance_w_timestamp(
804
        const void* const key,
805
        const fastdds::dds::Time_t& timestamp)
806
0
{
807
    /// Preconditions
808
0
    InstanceHandle_t instance_handle;
809
0
    if (timestamp.is_infinite() || timestamp.seconds < 0 ||
810
0
            (RETCODE_OK != check_instance_preconditions(key, HANDLE_NIL, instance_handle)))
811
0
    {
812
0
        return HANDLE_NIL;
813
0
    }
814
815
0
    WriteParams wparams;
816
0
    wparams.source_timestamp(timestamp);
817
0
    return do_register_instance(key, instance_handle, wparams);
818
0
}
819
820
InstanceHandle_t DataWriterImpl::do_register_instance(
821
        const void* const key,
822
        const InstanceHandle_t instance_handle,
823
        WriteParams& wparams)
824
0
{
825
    // TODO(MiguelCompany): wparams should be used when propagating the register_instance operation to the DataReader.
826
    // See redmine issue #14494
827
0
    static_cast<void>(wparams);
828
829
    // Block lowlevel writer
830
0
    auto max_blocking_time = std::chrono::steady_clock::now() +
831
0
            std::chrono::microseconds(rtps::TimeConv::Time_t2MicroSecondsInt64(qos_.reliability().max_blocking_time));
832
833
#if HAVE_STRICT_REALTIME
834
    std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex(), std::defer_lock);
835
    if (lock.try_lock_until(max_blocking_time))
836
#else
837
0
    std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
838
0
#endif // if HAVE_STRICT_REALTIME
839
0
    {
840
0
        SerializedPayload_t* payload = nullptr;
841
0
        if (history_->register_instance(instance_handle, lock, max_blocking_time, payload))
842
0
        {
843
            // Keep serialization of sample inside the instance
844
0
            assert(nullptr != payload);
845
0
            if (0 == payload->length || nullptr == payload->data)
846
0
            {
847
0
                uint32_t size = fixed_payload_size_ ? fixed_payload_size_ : type_->calculate_serialized_size(key,
848
0
                                data_representation_);
849
0
                payload->reserve(size);
850
0
                if (!type_->serialize(key, *payload, data_representation_))
851
0
                {
852
0
                    EPROSIMA_LOG_WARNING(DATA_WRITER, "Key data serialization failed");
853
854
                    // Serialization of the sample failed. Remove the instance to keep original state.
855
                    // Note that we will only end-up here if the instance has just been created, so it will be empty
856
                    // and removing its changes will remove the instance completely.
857
0
                    history_->remove_instance_changes(instance_handle, rtps::SequenceNumber_t());
858
0
                }
859
0
            }
860
0
            return instance_handle;
861
0
        }
862
0
    }
863
864
0
    return HANDLE_NIL;
865
0
}
866
867
ReturnCode_t DataWriterImpl::unregister_instance(
868
        const void* const instance,
869
        const InstanceHandle_t& handle,
870
        bool dispose)
871
0
{
872
    // Preconditions
873
0
    InstanceHandle_t ih;
874
0
    ReturnCode_t returned_value = check_instance_preconditions(instance, handle, ih);
875
0
    if (RETCODE_OK == returned_value && !history_->is_key_registered(ih))
876
0
    {
877
0
        returned_value = RETCODE_PRECONDITION_NOT_MET;
878
0
    }
879
880
    // Operation
881
0
    if (RETCODE_OK == returned_value)
882
0
    {
883
0
        WriteParams wparams;
884
0
        ChangeKind_t change_kind = unregister_change_kind(dispose, qos_);
885
0
        returned_value = create_new_change_with_params(change_kind, instance, wparams, ih);
886
0
    }
887
888
0
    return returned_value;
889
0
}
890
891
ReturnCode_t DataWriterImpl::unregister_instance_w_timestamp(
892
        const void* const instance,
893
        const InstanceHandle_t& handle,
894
        const fastdds::dds::Time_t& timestamp,
895
        bool dispose)
896
0
{
897
    // Preconditions
898
0
    InstanceHandle_t instance_handle;
899
0
    ReturnCode_t ret = RETCODE_OK;
900
0
    if (timestamp.is_infinite() || timestamp.seconds < 0)
901
0
    {
902
0
        ret = RETCODE_BAD_PARAMETER;
903
0
    }
904
0
    if (RETCODE_OK == ret)
905
0
    {
906
0
        ret = check_instance_preconditions(instance, handle, instance_handle);
907
0
    }
908
0
    if (RETCODE_OK == ret && !history_->is_key_registered(instance_handle))
909
0
    {
910
0
        ret = RETCODE_PRECONDITION_NOT_MET;
911
0
    }
912
913
    // Operation
914
0
    if (RETCODE_OK == ret)
915
0
    {
916
0
        WriteParams wparams;
917
0
        wparams.source_timestamp(timestamp);
918
0
        ChangeKind_t change_kind = unregister_change_kind(dispose, qos_);
919
0
        ret = create_new_change_with_params(change_kind, instance, wparams, instance_handle);
920
0
    }
921
922
0
    return ret;
923
0
}
924
925
ReturnCode_t DataWriterImpl::get_key_value(
926
        void* key_holder,
927
        const InstanceHandle_t& handle)
928
0
{
929
    /// Preconditions
930
0
    if (key_holder == nullptr || !handle.isDefined())
931
0
    {
932
0
        EPROSIMA_LOG_ERROR(DATA_WRITER, "Key holder pointer not valid");
933
0
        return RETCODE_BAD_PARAMETER;
934
0
    }
935
936
0
    if (!type_->is_compute_key_provided)
937
0
    {
938
0
        EPROSIMA_LOG_ERROR(DATA_WRITER, "Topic is NO_KEY, operation not permitted");
939
0
        return RETCODE_ILLEGAL_OPERATION;
940
0
    }
941
942
0
    if (writer_ == nullptr)
943
0
    {
944
0
        return RETCODE_NOT_ENABLED;
945
0
    }
946
947
    // Block lowlevel writer
948
#if HAVE_STRICT_REALTIME
949
    auto max_blocking_time = std::chrono::steady_clock::now() +
950
            std::chrono::microseconds(rtps::TimeConv::Time_t2MicroSecondsInt64(qos_.reliability().max_blocking_time));
951
    std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex(), std::defer_lock);
952
    if (!lock.try_lock_until(max_blocking_time))
953
    {
954
        return RETCODE_TIMEOUT;
955
    }
956
#else
957
0
    std::lock_guard<RecursiveTimedMutex> lock(writer_->getMutex());
958
0
#endif // if HAVE_STRICT_REALTIME
959
960
0
    SerializedPayload_t* payload = history_->get_key_value(handle);
961
0
    if (nullptr == payload)
962
0
    {
963
0
        return RETCODE_BAD_PARAMETER;
964
0
    }
965
966
0
    type_->deserialize(*payload, key_holder);
967
0
    return RETCODE_OK;
968
0
}
969
970
ReturnCode_t DataWriterImpl::create_new_change(
971
        ChangeKind_t changeKind,
972
        const void* const data)
973
0
{
974
0
    WriteParams wparams;
975
0
    return create_new_change_with_params(changeKind, data, wparams);
976
0
}
977
978
ReturnCode_t DataWriterImpl::check_new_change_preconditions(
979
        ChangeKind_t change_kind,
980
        const void* const data)
981
0
{
982
    // Preconditions
983
0
    if (data == nullptr)
984
0
    {
985
0
        EPROSIMA_LOG_ERROR(DATA_WRITER, "Data pointer not valid");
986
0
        return RETCODE_BAD_PARAMETER;
987
0
    }
988
989
0
    if (change_kind == NOT_ALIVE_UNREGISTERED
990
0
            || change_kind == NOT_ALIVE_DISPOSED
991
0
            || change_kind == NOT_ALIVE_DISPOSED_UNREGISTERED)
992
0
    {
993
0
        if (!type_->is_compute_key_provided)
994
0
        {
995
0
            EPROSIMA_LOG_ERROR(DATA_WRITER, "Topic is NO_KEY, operation not permitted");
996
0
            return RETCODE_ILLEGAL_OPERATION;
997
0
        }
998
0
    }
999
1000
0
    return RETCODE_OK;
1001
0
}
1002
1003
ReturnCode_t DataWriterImpl::perform_create_new_change(
1004
        ChangeKind_t change_kind,
1005
        const void* const data,
1006
        WriteParams& wparams,
1007
        const InstanceHandle_t& handle)
1008
0
{
1009
    // Block lowlevel writer
1010
0
    auto max_blocking_time = steady_clock::now() +
1011
0
            microseconds(rtps::TimeConv::Time_t2MicroSecondsInt64(qos_.reliability().max_blocking_time));
1012
1013
#if HAVE_STRICT_REALTIME
1014
    std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex(), std::defer_lock);
1015
    if (!lock.try_lock_until(max_blocking_time))
1016
    {
1017
        return RETCODE_TIMEOUT;
1018
    }
1019
#else
1020
0
    std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
1021
0
#endif // if HAVE_STRICT_REALTIME
1022
1023
0
    SerializedPayload_t payload;
1024
0
    bool was_loaned = check_and_remove_loan(data, payload);
1025
0
    if (!was_loaned)
1026
0
    {
1027
0
        uint32_t payload_size = fixed_payload_size_ ? fixed_payload_size_ : type_->calculate_serialized_size(
1028
0
            data, data_representation_);
1029
0
        if (!get_free_payload_from_pool(payload_size, payload))
1030
0
        {
1031
0
            return RETCODE_OUT_OF_RESOURCES;
1032
0
        }
1033
1034
0
        if ((ALIVE == change_kind) && !type_->serialize(data, payload, data_representation_))
1035
0
        {
1036
0
            EPROSIMA_LOG_WARNING(DATA_WRITER, "Data serialization returned false");
1037
0
            payload_pool_->release_payload(payload);
1038
0
            return RETCODE_ERROR;
1039
0
        }
1040
0
    }
1041
1042
0
    CacheChange_t* ch = history_->create_change(change_kind, handle);
1043
0
    if (ch != nullptr)
1044
0
    {
1045
0
        ch->serializedPayload = std::move(payload);
1046
1047
0
        bool added = false;
1048
0
        if (reader_filters_)
1049
0
        {
1050
0
            auto related_sample_identity = wparams.related_sample_identity();
1051
0
            auto filter_hook = [&related_sample_identity, this](CacheChange_t& ch)
1052
0
                    {
1053
0
                        reader_filters_->update_filter_info(static_cast<DataWriterFilteredChange&>(ch),
1054
0
                                related_sample_identity);
1055
0
                    };
1056
0
            added = history_->add_pub_change_with_commit_hook(ch, wparams, filter_hook, lock, max_blocking_time);
1057
0
        }
1058
0
        else
1059
0
        {
1060
0
            added = history_->add_pub_change(ch, wparams, lock, max_blocking_time);
1061
0
        }
1062
1063
0
        if (!added)
1064
0
        {
1065
0
            if (was_loaned)
1066
0
            {
1067
0
                payload = std::move(ch->serializedPayload);
1068
0
                add_loan(data, payload);
1069
0
            }
1070
0
            history_->release_change(ch);
1071
0
            return RETCODE_TIMEOUT;
1072
0
        }
1073
1074
0
        if (qos_.deadline().period != dds::c_TimeInfinite)
1075
0
        {
1076
0
            if (!history_->set_next_deadline(
1077
0
                        handle,
1078
0
                        steady_clock::now() + duration_cast<steady_clock::duration>(deadline_duration_us_)))
1079
0
            {
1080
0
                EPROSIMA_LOG_ERROR(DATA_WRITER, "Could not set the next deadline in the history");
1081
0
            }
1082
0
            else
1083
0
            {
1084
0
                if (timer_owner_ == handle || timer_owner_ == InstanceHandle_t())
1085
0
                {
1086
0
                    if (deadline_timer_reschedule())
1087
0
                    {
1088
0
                        deadline_timer_->cancel_timer();
1089
0
                        deadline_timer_->restart_timer();
1090
0
                    }
1091
0
                }
1092
0
            }
1093
0
        }
1094
1095
0
        if (qos_.lifespan().duration != dds::c_TimeInfinite)
1096
0
        {
1097
0
            lifespan_duration_us_ = duration<double, std::ratio<1, 1000000>>(
1098
0
                qos_.lifespan().duration.to_ns() * 1e-3);
1099
0
            lifespan_timer_->update_interval_millisec(qos_.lifespan().duration.to_ns() * 1e-6);
1100
0
            lifespan_timer_->restart_timer();
1101
0
        }
1102
1103
0
        return RETCODE_OK;
1104
0
    }
1105
1106
0
    return RETCODE_OUT_OF_RESOURCES;
1107
0
}
1108
1109
ReturnCode_t DataWriterImpl::create_new_change_with_params(
1110
        ChangeKind_t changeKind,
1111
        const void* const data,
1112
        WriteParams& wparams)
1113
0
{
1114
0
    ReturnCode_t ret_code = check_new_change_preconditions(changeKind, data);
1115
0
    if (RETCODE_OK != ret_code)
1116
0
    {
1117
0
        return ret_code;
1118
0
    }
1119
1120
0
    InstanceHandle_t handle;
1121
0
    if (type_->is_compute_key_provided)
1122
0
    {
1123
0
        bool is_key_protected = false;
1124
#if HAVE_SECURITY
1125
        is_key_protected = writer_->getAttributes().security_attributes().is_key_protected;
1126
#endif // if HAVE_SECURITY
1127
0
        type_->compute_key(data, handle, is_key_protected);
1128
0
    }
1129
1130
0
    return perform_create_new_change(changeKind, data, wparams, handle);
1131
0
}
1132
1133
ReturnCode_t DataWriterImpl::create_new_change_with_params(
1134
        ChangeKind_t changeKind,
1135
        const void* const data,
1136
        WriteParams& wparams,
1137
        const InstanceHandle_t& handle)
1138
0
{
1139
0
    ReturnCode_t ret_code = check_new_change_preconditions(changeKind, data);
1140
0
    if (RETCODE_OK != ret_code)
1141
0
    {
1142
0
        return ret_code;
1143
0
    }
1144
1145
0
    return perform_create_new_change(changeKind, data, wparams, handle);
1146
0
}
1147
1148
bool DataWriterImpl::remove_min_seq_change()
1149
0
{
1150
0
    return history_->removeMinChange();
1151
0
}
1152
1153
ReturnCode_t DataWriterImpl::clear_history(
1154
        size_t* removed)
1155
0
{
1156
0
    return (history_->removeAllChange(removed) ? RETCODE_OK : RETCODE_ERROR);
1157
0
}
1158
1159
ReturnCode_t DataWriterImpl::get_sending_locators(
1160
        rtps::LocatorList& locators) const
1161
0
{
1162
0
    if (nullptr == writer_)
1163
0
    {
1164
0
        return RETCODE_NOT_ENABLED;
1165
0
    }
1166
1167
0
    writer_->get_participant_impl()->get_sending_locators(locators);
1168
0
    return RETCODE_OK;
1169
0
}
1170
1171
const fastdds::rtps::GUID_t& DataWriterImpl::guid() const
1172
0
{
1173
0
    return guid_;
1174
0
}
1175
1176
InstanceHandle_t DataWriterImpl::get_instance_handle() const
1177
0
{
1178
0
    return guid();
1179
0
}
1180
1181
void DataWriterImpl::publisher_qos_updated()
1182
0
{
1183
0
    if (writer_ != nullptr)
1184
0
    {
1185
        //NOTIFY THE BUILTIN PROTOCOLS THAT THE WRITER HAS CHANGED
1186
0
        WriterQos wqos = qos_.get_writerqos(get_publisher()->get_qos(), topic_->get_qos());
1187
0
        publisher_->rtps_participant()->update_writer(writer_, wqos);
1188
0
    }
1189
0
}
1190
1191
ReturnCode_t DataWriterImpl::set_qos(
1192
        const DataWriterQos& qos)
1193
0
{
1194
0
    bool enabled = writer_ != nullptr;
1195
0
    const DataWriterQos& qos_to_set = (&qos == &DATAWRITER_QOS_DEFAULT) ?
1196
0
            publisher_->get_default_datawriter_qos() : qos;
1197
1198
    // Default qos is always considered consistent
1199
0
    if (&qos != &DATAWRITER_QOS_DEFAULT)
1200
0
    {
1201
0
        ReturnCode_t ret_val = check_qos_including_resource_limits(qos_to_set, type_);
1202
0
        if (RETCODE_OK != ret_val)
1203
0
        {
1204
0
            return ret_val;
1205
0
        }
1206
1207
0
        if (publisher_->get_participant()->get_qos().allocation().data_limits.max_user_data != 0 &&
1208
0
                publisher_->get_participant()->get_qos().allocation().data_limits.max_user_data <
1209
0
                qos_to_set.user_data().getValue().size())
1210
0
        {
1211
0
            return RETCODE_INCONSISTENT_POLICY;
1212
0
        }
1213
0
    }
1214
1215
0
    if (enabled && !can_qos_be_updated(qos_, qos_to_set))
1216
0
    {
1217
0
        return RETCODE_IMMUTABLE_POLICY;
1218
0
    }
1219
1220
0
    set_qos(qos_, qos_to_set, !enabled);
1221
1222
0
    if (enabled)
1223
0
    {
1224
0
        if (qos_.reliability().kind == ReliabilityQosPolicyKind::RELIABLE_RELIABILITY_QOS &&
1225
0
                qos_.reliable_writer_qos() == qos_to_set.reliable_writer_qos())
1226
0
        {
1227
            // Update times and positive_acks attributes on RTPS Layer
1228
0
            WriterAttributes w_att;
1229
0
            w_att.times = qos_.reliable_writer_qos().times;
1230
0
            w_att.disable_positive_acks = qos_.reliable_writer_qos().disable_positive_acks.enabled;
1231
0
            w_att.keep_duration = qos_.reliable_writer_qos().disable_positive_acks.duration;
1232
0
            writer_->update_attributes(w_att);
1233
0
        }
1234
1235
        //Notify the participant that a Writer has changed its QOS
1236
0
        WriterQos wqos = qos_.get_writerqos(get_publisher()->get_qos(), topic_->get_qos());
1237
0
        publisher_->rtps_participant()->update_writer(writer_, wqos);
1238
1239
        // Deadline
1240
0
        if (qos_.deadline().period != dds::c_TimeInfinite)
1241
0
        {
1242
0
            deadline_duration_us_ =
1243
0
                    duration<double, std::ratio<1, 1000000>>(qos_.deadline().period.to_ns() * 1e-3);
1244
0
            deadline_timer_->update_interval_millisec(qos_.deadline().period.to_ns() * 1e-6);
1245
0
        }
1246
0
        else
1247
0
        {
1248
0
            deadline_timer_->cancel_timer();
1249
0
        }
1250
1251
        // Lifespan
1252
0
        if (qos_.lifespan().duration != dds::c_TimeInfinite)
1253
0
        {
1254
0
            lifespan_duration_us_ =
1255
0
                    duration<double, std::ratio<1, 1000000>>(qos_.lifespan().duration.to_ns() * 1e-3);
1256
0
            lifespan_timer_->update_interval_millisec(qos_.lifespan().duration.to_ns() * 1e-6);
1257
0
        }
1258
0
        else
1259
0
        {
1260
0
            lifespan_timer_->cancel_timer();
1261
0
        }
1262
0
    }
1263
1264
0
    return RETCODE_OK;
1265
0
}
1266
1267
const DataWriterQos& DataWriterImpl::get_qos() const
1268
0
{
1269
0
    return qos_;
1270
0
}
1271
1272
ReturnCode_t DataWriterImpl::set_listener(
1273
        DataWriterListener* listener)
1274
0
{
1275
0
    std::lock_guard<std::mutex> scoped_lock(listener_mutex_);
1276
0
    listener_ = listener;
1277
0
    return RETCODE_OK;
1278
0
}
1279
1280
const DataWriterListener* DataWriterImpl::get_listener() const
1281
0
{
1282
0
    return listener_;
1283
0
}
1284
1285
Topic* DataWriterImpl::get_topic() const
1286
0
{
1287
0
    return topic_;
1288
0
}
1289
1290
const Publisher* DataWriterImpl::get_publisher() const
1291
0
{
1292
0
    return publisher_->get_publisher();
1293
0
}
1294
1295
void DataWriterImpl::InnerDataWriterListener::on_writer_matched(
1296
        RTPSWriter* /*writer*/,
1297
        const MatchingInfo& info)
1298
0
{
1299
0
    data_writer_->update_publication_matched_status(info);
1300
1301
0
    StatusMask notify_status = StatusMask::publication_matched();
1302
0
    DataWriterListener* listener = data_writer_->get_listener_for(notify_status);
1303
0
    if (listener != nullptr)
1304
0
    {
1305
0
        PublicationMatchedStatus callback_status;
1306
0
        if (RETCODE_OK == data_writer_->get_publication_matched_status(callback_status))
1307
0
        {
1308
0
            listener->on_publication_matched(data_writer_->user_datawriter_, callback_status);
1309
0
        }
1310
0
    }
1311
0
    data_writer_->user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true);
1312
0
}
1313
1314
void DataWriterImpl::InnerDataWriterListener::on_offered_incompatible_qos(
1315
        RTPSWriter* /*writer*/,
1316
        fastdds::dds::PolicyMask qos)
1317
0
{
1318
0
    data_writer_->update_offered_incompatible_qos(qos);
1319
0
    StatusMask notify_status = StatusMask::offered_incompatible_qos();
1320
0
    DataWriterListener* listener = data_writer_->get_listener_for(notify_status);
1321
0
    if (listener != nullptr)
1322
0
    {
1323
0
        OfferedIncompatibleQosStatus callback_status;
1324
0
        if (data_writer_->get_offered_incompatible_qos_status(callback_status) == RETCODE_OK)
1325
0
        {
1326
0
            listener->on_offered_incompatible_qos(data_writer_->user_datawriter_, callback_status);
1327
0
        }
1328
0
    }
1329
1330
0
#ifdef FASTDDS_STATISTICS
1331
0
    notify_status_observer(statistics::StatusKind::INCOMPATIBLE_QOS);
1332
0
#endif //FASTDDS_STATISTICS
1333
1334
0
    data_writer_->user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true);
1335
0
}
1336
1337
void DataWriterImpl::InnerDataWriterListener::on_writer_change_received_by_all(
1338
        RTPSWriter* /*writer*/,
1339
        CacheChange_t* ch)
1340
0
{
1341
0
    if (data_writer_->type_->is_compute_key_provided &&
1342
0
            (NOT_ALIVE_UNREGISTERED == ch->kind ||
1343
0
            NOT_ALIVE_DISPOSED_UNREGISTERED == ch->kind))
1344
0
    {
1345
0
        data_writer_->history_->remove_instance_changes(ch->instanceHandle, ch->sequenceNumber);
1346
0
    }
1347
0
    else if (data_writer_->qos_.durability().kind == VOLATILE_DURABILITY_QOS)
1348
0
    {
1349
0
        data_writer_->history_->remove_change_g(ch);
1350
0
    }
1351
0
}
1352
1353
void DataWriterImpl::InnerDataWriterListener::on_liveliness_lost(
1354
        fastdds::rtps::RTPSWriter* /*writer*/,
1355
        const LivelinessLostStatus& status)
1356
0
{
1357
0
    data_writer_->update_liveliness_lost_status(status);
1358
0
    StatusMask notify_status = StatusMask::liveliness_lost();
1359
0
    DataWriterListener* listener = data_writer_->get_listener_for(notify_status);
1360
0
    if (listener != nullptr)
1361
0
    {
1362
0
        LivelinessLostStatus callback_status;
1363
0
        if (RETCODE_OK == data_writer_->get_liveliness_lost_status(callback_status))
1364
0
        {
1365
0
            listener->on_liveliness_lost(data_writer_->user_datawriter_, callback_status);
1366
0
        }
1367
0
    }
1368
1369
0
#ifdef FASTDDS_STATISTICS
1370
0
    notify_status_observer(statistics::StatusKind::LIVELINESS_LOST);
1371
0
#endif //FASTDDS_STATISTICS
1372
1373
0
    data_writer_->user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true);
1374
0
}
1375
1376
void DataWriterImpl::InnerDataWriterListener::on_reader_discovery(
1377
        fastdds::rtps::RTPSWriter* writer,
1378
        fastdds::rtps::ReaderDiscoveryStatus reason,
1379
        const fastdds::rtps::GUID_t& reader_guid,
1380
        const fastdds::rtps::SubscriptionBuiltinTopicData* reader_info)
1381
0
{
1382
0
    if (!fastdds::rtps::RTPSDomainImpl::should_intraprocess_between(writer->getGuid(), reader_guid))
1383
0
    {
1384
0
        switch (reason)
1385
0
        {
1386
0
            case fastdds::rtps::ReaderDiscoveryStatus::REMOVED_READER:
1387
0
                data_writer_->remove_reader_filter(reader_guid);
1388
0
                break;
1389
1390
0
            case fastdds::rtps::ReaderDiscoveryStatus::DISCOVERED_READER:
1391
0
            case fastdds::rtps::ReaderDiscoveryStatus::CHANGED_QOS_READER:
1392
0
                data_writer_->process_reader_filter_info(reader_guid, *reader_info);
1393
0
                break;
1394
0
            default:
1395
0
                break;
1396
0
        }
1397
0
    }
1398
0
}
1399
1400
#ifdef FASTDDS_STATISTICS
1401
void DataWriterImpl::InnerDataWriterListener::notify_status_observer(
1402
        const uint32_t& status_id)
1403
0
{
1404
0
    DomainParticipantImpl* pp_impl = data_writer_->publisher_->get_participant_impl();
1405
0
    auto statistics_pp_impl = static_cast<eprosima::fastdds::statistics::dds::DomainParticipantImpl*>(pp_impl);
1406
0
    if (nullptr != statistics_pp_impl->get_status_observer())
1407
0
    {
1408
0
        if (!statistics_pp_impl->get_status_observer()->on_local_entity_status_change(data_writer_->guid(), status_id))
1409
0
        {
1410
0
            EPROSIMA_LOG_ERROR(DATA_WRITER, "Could not set entity status");
1411
0
        }
1412
0
    }
1413
0
}
1414
1415
#endif //FASTDDS_STATISTICS
1416
1417
ReturnCode_t DataWriterImpl::wait_for_acknowledgments(
1418
        const dds::Duration_t& max_wait)
1419
0
{
1420
0
    if (writer_ == nullptr)
1421
0
    {
1422
0
        return RETCODE_NOT_ENABLED;
1423
0
    }
1424
1425
0
    if (writer_->wait_for_all_acked(max_wait))
1426
0
    {
1427
0
        return RETCODE_OK;
1428
0
    }
1429
0
    return RETCODE_ERROR;
1430
0
}
1431
1432
ReturnCode_t DataWriterImpl::wait_for_acknowledgments(
1433
        const void* const instance,
1434
        const InstanceHandle_t& handle,
1435
        const dds::Duration_t& max_wait)
1436
0
{
1437
    // Preconditions
1438
0
    InstanceHandle_t ih;
1439
0
    ReturnCode_t returned_value = check_instance_preconditions(instance, handle, ih);
1440
0
    if (RETCODE_OK != returned_value)
1441
0
    {
1442
0
        return returned_value;
1443
0
    }
1444
1445
    // Block low-level writer
1446
0
    auto max_blocking_time = steady_clock::now() +
1447
0
            microseconds(rtps::TimeConv::Time_t2MicroSecondsInt64(max_wait));
1448
1449
# if HAVE_STRICT_REALTIME
1450
    std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex(), std::defer_lock);
1451
    if (!lock.try_lock_until(max_blocking_time))
1452
    {
1453
        return RETCODE_TIMEOUT;
1454
    }
1455
#else
1456
0
    std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
1457
0
#endif // HAVE_STRICT_REALTIME
1458
1459
0
    if (!history_->is_key_registered(ih))
1460
0
    {
1461
0
        return RETCODE_PRECONDITION_NOT_MET;
1462
0
    }
1463
1464
0
    if (history_->wait_for_acknowledgement_last_change(ih, lock, max_blocking_time))
1465
0
    {
1466
0
        return RETCODE_OK;
1467
0
    }
1468
1469
0
    return RETCODE_TIMEOUT;
1470
0
}
1471
1472
void DataWriterImpl::update_publication_matched_status(
1473
        const MatchingInfo& status)
1474
0
{
1475
0
    auto count_change = status.status == MATCHED_MATCHING ? 1 : -1;
1476
0
    publication_matched_status_.current_count += count_change;
1477
0
    publication_matched_status_.current_count_change += count_change;
1478
0
    if (count_change > 0)
1479
0
    {
1480
0
        publication_matched_status_.total_count += count_change;
1481
0
        publication_matched_status_.total_count_change += count_change;
1482
0
    }
1483
0
    publication_matched_status_.last_subscription_handle = status.remoteEndpointGuid;
1484
0
}
1485
1486
ReturnCode_t DataWriterImpl::get_publication_matched_status(
1487
        PublicationMatchedStatus& status)
1488
0
{
1489
0
    if (writer_ == nullptr)
1490
0
    {
1491
0
        return RETCODE_NOT_ENABLED;
1492
0
    }
1493
1494
0
    {
1495
0
        std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
1496
1497
0
        status = publication_matched_status_;
1498
0
        publication_matched_status_.current_count_change = 0;
1499
0
        publication_matched_status_.total_count_change = 0;
1500
0
    }
1501
1502
0
    user_datawriter_->get_statuscondition().get_impl()->set_status(StatusMask::publication_matched(), false);
1503
0
    return RETCODE_OK;
1504
0
}
1505
1506
ReturnCode_t DataWriterImpl::set_sample_prefilter(
1507
        std::shared_ptr<IContentFilter> prefilter)
1508
0
{
1509
0
    if (is_data_sharing_compatible_)
1510
0
    {
1511
0
        EPROSIMA_LOG_WARNING(DATA_WRITER,
1512
0
                "Data-sharing is enabled on this DataWriter, which is not compatible with sample prefiltering. \
1513
0
                 Ensure that transport is used for communicating with DataReaders.");
1514
0
    }
1515
1516
0
    std::lock_guard<std::mutex> lock(filters_mtx_);
1517
0
    sample_prefilter_ = prefilter;
1518
0
    return RETCODE_OK;
1519
0
}
1520
1521
ReturnCode_t DataWriterImpl::set_related_datareader(
1522
        const DataReader* related_reader)
1523
0
{
1524
0
    ReturnCode_t ret = RETCODE_ILLEGAL_OPERATION;
1525
1526
0
    if (nullptr == writer_)
1527
0
    {
1528
0
        if (nullptr != related_reader &&
1529
0
                related_reader->guid() != c_Guid_Unknown)
1530
0
        {
1531
0
            if (related_reader->guid().guidPrefix == guid_.guidPrefix)
1532
0
            {
1533
0
                related_datareader_key_ = related_reader->guid();
1534
0
                ret = RETCODE_OK;
1535
0
            }
1536
0
            else
1537
0
            {
1538
0
                ret = RETCODE_PRECONDITION_NOT_MET;
1539
0
            }
1540
0
        }
1541
0
        else
1542
0
        {
1543
0
            ret = RETCODE_BAD_PARAMETER;
1544
0
        }
1545
0
    }
1546
0
    return ret;
1547
0
}
1548
1549
bool DataWriterImpl::deadline_timer_reschedule()
1550
0
{
1551
0
    assert(qos_.deadline().period != dds::c_TimeInfinite);
1552
1553
0
    std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
1554
1555
0
    steady_clock::time_point next_deadline_us;
1556
0
    if (!history_->get_next_deadline(timer_owner_, next_deadline_us))
1557
0
    {
1558
0
        EPROSIMA_LOG_ERROR(DATA_WRITER, "Could not get the next deadline from the history");
1559
0
        return false;
1560
0
    }
1561
1562
0
    auto interval_ms = duration_cast<milliseconds>(next_deadline_us - steady_clock::now());
1563
0
    deadline_timer_->update_interval_millisec(static_cast<double>(interval_ms.count()));
1564
0
    return true;
1565
0
}
1566
1567
bool DataWriterImpl::deadline_missed()
1568
0
{
1569
0
    assert(qos_.deadline().period != dds::c_TimeInfinite);
1570
1571
0
    std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
1572
1573
0
    deadline_missed_status_.total_count++;
1574
0
    deadline_missed_status_.total_count_change++;
1575
0
    deadline_missed_status_.last_instance_handle = timer_owner_;
1576
0
    StatusMask notify_status = StatusMask::offered_deadline_missed();
1577
0
    auto listener = get_listener_for(notify_status);
1578
0
    if (nullptr != listener)
1579
0
    {
1580
0
        listener->on_offered_deadline_missed(user_datawriter_, deadline_missed_status_);
1581
0
        deadline_missed_status_.total_count_change = 0;
1582
0
    }
1583
1584
0
#ifdef FASTDDS_STATISTICS
1585
0
    writer_listener_.notify_status_observer(statistics::StatusKind::DEADLINE_MISSED);
1586
0
#endif //FASTDDS_STATISTICS
1587
1588
0
    user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true);
1589
1590
0
    if (!history_->set_next_deadline(
1591
0
                timer_owner_,
1592
0
                steady_clock::now() + duration_cast<steady_clock::duration>(deadline_duration_us_)))
1593
0
    {
1594
0
        EPROSIMA_LOG_ERROR(DATA_WRITER, "Could not set the next deadline in the history");
1595
0
        return false;
1596
0
    }
1597
0
    return deadline_timer_reschedule();
1598
0
}
1599
1600
ReturnCode_t DataWriterImpl::get_offered_deadline_missed_status(
1601
        OfferedDeadlineMissedStatus& status)
1602
0
{
1603
0
    if (writer_ == nullptr)
1604
0
    {
1605
0
        return RETCODE_NOT_ENABLED;
1606
0
    }
1607
1608
0
    {
1609
0
        std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
1610
1611
0
        status = deadline_missed_status_;
1612
0
        deadline_missed_status_.total_count_change = 0;
1613
0
    }
1614
1615
0
    user_datawriter_->get_statuscondition().get_impl()->set_status(StatusMask::offered_deadline_missed(), false);
1616
0
    return RETCODE_OK;
1617
0
}
1618
1619
ReturnCode_t DataWriterImpl::get_offered_incompatible_qos_status(
1620
        OfferedIncompatibleQosStatus& status)
1621
0
{
1622
0
    if (writer_ == nullptr)
1623
0
    {
1624
0
        return RETCODE_NOT_ENABLED;
1625
0
    }
1626
1627
0
    {
1628
0
        std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
1629
1630
0
        status = offered_incompatible_qos_status_;
1631
0
        offered_incompatible_qos_status_.total_count_change = 0u;
1632
0
    }
1633
1634
0
    user_datawriter_->get_statuscondition().get_impl()->set_status(StatusMask::offered_incompatible_qos(), false);
1635
0
    return RETCODE_OK;
1636
0
}
1637
1638
bool DataWriterImpl::lifespan_expired()
1639
0
{
1640
0
    std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
1641
1642
0
    fastdds::rtps::Time_t current_ts;
1643
0
    fastdds::rtps::Time_t::now(current_ts);
1644
1645
0
    CacheChange_t* earliest_change;
1646
0
    while (history_->get_earliest_change(&earliest_change))
1647
0
    {
1648
0
        fastdds::rtps::Time_t expiration_ts = earliest_change->sourceTimestamp + qos_.lifespan().duration;
1649
1650
        // Check that the earliest change has expired (the change which started the timer could have been removed from the history)
1651
0
        if (current_ts < expiration_ts)
1652
0
        {
1653
0
            fastdds::rtps::Time_t interval = expiration_ts - current_ts;
1654
0
            lifespan_timer_->update_interval_millisec(interval.to_ns() * 1e-6);
1655
0
            return true;
1656
0
        }
1657
1658
        // The earliest change has expired
1659
0
        history_->remove_change_pub(earliest_change);
1660
0
    }
1661
1662
0
    return false;
1663
0
}
1664
1665
ReturnCode_t DataWriterImpl::get_liveliness_lost_status(
1666
        LivelinessLostStatus& status)
1667
0
{
1668
0
    if (writer_ == nullptr)
1669
0
    {
1670
0
        return RETCODE_NOT_ENABLED;
1671
0
    }
1672
1673
0
    {
1674
0
        std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());
1675
1676
0
        status = liveliness_lost_status_;
1677
0
        liveliness_lost_status_.total_count_change = 0u;
1678
0
    }
1679
1680
0
    user_datawriter_->get_statuscondition().get_impl()->set_status(StatusMask::liveliness_lost(), false);
1681
0
    return RETCODE_OK;
1682
0
}
1683
1684
ReturnCode_t DataWriterImpl::assert_liveliness()
1685
0
{
1686
0
    if (writer_ == nullptr)
1687
0
    {
1688
0
        return RETCODE_NOT_ENABLED;
1689
0
    }
1690
1691
0
    if (!publisher_->rtps_participant()->wlp()->assert_liveliness(
1692
0
                writer_->getGuid(),
1693
0
                writer_->get_liveliness_kind(),
1694
0
                writer_->get_liveliness_lease_duration()))
1695
0
    {
1696
0
        EPROSIMA_LOG_ERROR(DATAWRITER, "Could not assert liveliness of writer " << writer_->getGuid());
1697
0
        return RETCODE_ERROR;
1698
0
    }
1699
1700
0
    if (qos_.liveliness().kind == MANUAL_BY_TOPIC_LIVELINESS_QOS)
1701
0
    {
1702
        // As described in the RTPS specification, if liveliness kind is manual a heartbeat must be sent
1703
        // This only applies to stateful writers, as stateless writers do not send heartbeats
1704
1705
0
        StatefulWriter* stateful_writer = dynamic_cast<StatefulWriter*>(writer_);
1706
1707
0
        if (stateful_writer != nullptr)
1708
0
        {
1709
0
            stateful_writer->send_periodic_heartbeat(true, true);
1710
0
        }
1711
0
    }
1712
0
    return RETCODE_OK;
1713
0
}
1714
1715
ReturnCode_t DataWriterImpl::get_publication_builtin_topic_data(
1716
        PublicationBuiltinTopicData& publication_data) const
1717
0
{
1718
0
    if (nullptr == writer_)
1719
0
    {
1720
0
        return RETCODE_NOT_ENABLED;
1721
0
    }
1722
1723
    // sanity checks
1724
0
    assert(nullptr != publisher_);
1725
0
    assert(nullptr != topic_);
1726
0
    assert(nullptr != publisher_->get_participant());
1727
0
    assert(nullptr != writer_->get_participant_impl());
1728
1729
0
    publication_data = PublicationBuiltinTopicData{};
1730
1731
0
    from_entity_id_to_topic_key(guid_.entityId, publication_data.key.value);
1732
0
    from_guid_prefix_to_topic_key(
1733
0
        publisher_->get_participant()->guid().guidPrefix, publication_data.participant_key.value);
1734
1735
0
    publication_data.topic_name = topic_->get_name();
1736
0
    publication_data.type_name = topic_->get_type_name();
1737
0
    publication_data.topic_kind = type_->is_compute_key_provided ? TopicKind_t::WITH_KEY : TopicKind_t::NO_KEY;
1738
1739
    // DataWriter qos
1740
0
    publication_data.durability = qos_.durability();
1741
0
    publication_data.durability_service = qos_.durability_service();
1742
0
    publication_data.deadline = qos_.deadline();
1743
0
    publication_data.latency_budget = qos_.latency_budget();
1744
0
    publication_data.liveliness = qos_.liveliness();
1745
0
    publication_data.reliability = qos_.reliability();
1746
0
    publication_data.lifespan = qos_.lifespan();
1747
0
    publication_data.user_data = qos_.user_data();
1748
0
    publication_data.ownership = qos_.ownership();
1749
0
    publication_data.ownership_strength = qos_.ownership_strength();
1750
0
    publication_data.destination_order = qos_.destination_order();
1751
1752
    // Publisher qos
1753
0
    publication_data.presentation = publisher_->qos_.presentation();
1754
0
    publication_data.partition = publisher_->qos_.partition();
1755
0
    publication_data.topic_data = topic_->get_qos().topic_data();
1756
0
    publication_data.group_data = publisher_->qos_.group_data();
1757
1758
    // XTypes 1.3
1759
0
    publisher_->get_participant_impl()->fill_type_information(type_, publication_data.type_information);
1760
0
    publication_data.representation = qos_.representation();
1761
1762
    // RPC over DDS
1763
0
    publication_data.related_datareader_key = related_datareader_key_;
1764
1765
    // eProsima extensions
1766
1767
0
    publication_data.disable_positive_acks = qos_.reliable_writer_qos().disable_positive_acks;
1768
0
    publication_data.data_sharing = qos_.data_sharing();
1769
1770
0
    if (publication_data.data_sharing.kind() != OFF &&
1771
0
            publication_data.data_sharing.domain_ids().empty())
1772
0
    {
1773
0
        publication_data.data_sharing.add_domain_id(utils::default_domain_id());
1774
0
    }
1775
1776
0
    publication_data.guid = guid();
1777
0
    publication_data.participant_guid = publisher_->get_participant()->guid();
1778
1779
0
    const std::string* pers_guid = PropertyPolicyHelper::find_property(qos_.properties(), "dds.persistence.guid");
1780
0
    if (pers_guid)
1781
0
    {
1782
        // Load persistence_guid from property
1783
0
        std::istringstream(pers_guid->c_str()) >> publication_data.persistence_guid;
1784
0
    }
1785
1786
0
    qos_.endpoint().unicast_locator_list.copy_to(publication_data.remote_locators.unicast);
1787
0
    qos_.endpoint().multicast_locator_list.copy_to(publication_data.remote_locators.multicast);
1788
0
    publication_data.max_serialized_size = type_->max_serialized_type_size;
1789
0
    publication_data.loopback_transformation =
1790
0
            writer_->get_participant_impl()->network_factory().network_configuration();
1791
1792
0
    if (!is_data_sharing_compatible_)
1793
0
    {
1794
0
        publication_data.data_sharing.off();
1795
0
    }
1796
1797
0
    const std::string* endpoint_partitions = PropertyPolicyHelper::find_property(qos_.properties(), "partitions");
1798
0
    if (endpoint_partitions)
1799
0
    {
1800
0
        std::istringstream partition_string(*endpoint_partitions);
1801
0
        std::string partition_name;
1802
0
        publication_data.partition.clear();
1803
1804
0
        while (std::getline(partition_string, partition_name, ';'))
1805
0
        {
1806
0
            publication_data.partition.push_back(partition_name.c_str());
1807
0
        }
1808
0
    }
1809
1810
0
    publication_data.history = qos_.history();
1811
1812
    // Optional QoS
1813
0
    publication_data.resource_limits = qos_.resource_limits();
1814
0
    publication_data.transport_priority = qos_.transport_priority();
1815
0
    publication_data.writer_data_lifecycle = qos_.writer_data_lifecycle();
1816
0
    publication_data.publish_mode = qos_.publish_mode();
1817
0
    publication_data.rtps_reliable_writer = qos_.reliable_writer_qos();
1818
0
    publication_data.endpoint = qos_.endpoint();
1819
0
    publication_data.writer_resource_limits = qos_.writer_resource_limits();
1820
1821
0
    return RETCODE_OK;
1822
0
}
1823
1824
OfferedIncompatibleQosStatus& DataWriterImpl::update_offered_incompatible_qos(
1825
        PolicyMask incompatible_policies)
1826
0
{
1827
0
    ++offered_incompatible_qos_status_.total_count;
1828
0
    ++offered_incompatible_qos_status_.total_count_change;
1829
0
    for (uint32_t id = 1; id < NEXT_QOS_POLICY_ID; ++id)
1830
0
    {
1831
0
        if (incompatible_policies.test(id))
1832
0
        {
1833
0
            ++offered_incompatible_qos_status_.policies[static_cast<QosPolicyId_t>(id)].count;
1834
0
            offered_incompatible_qos_status_.last_policy_id = static_cast<QosPolicyId_t>(id);
1835
0
        }
1836
0
    }
1837
0
    return offered_incompatible_qos_status_;
1838
0
}
1839
1840
LivelinessLostStatus& DataWriterImpl::update_liveliness_lost_status(
1841
        const LivelinessLostStatus& liveliness_lost_status)
1842
0
{
1843
0
    liveliness_lost_status_.total_count = liveliness_lost_status.total_count;
1844
0
    liveliness_lost_status_.total_count_change += liveliness_lost_status.total_count_change;
1845
0
    return liveliness_lost_status_;
1846
0
}
1847
1848
void DataWriterImpl::set_qos(
1849
        DataWriterQos& to,
1850
        const DataWriterQos& from,
1851
        bool update_immutable)
1852
0
{
1853
    // Check immutable policies
1854
0
    if (update_immutable)
1855
0
    {
1856
0
        if (!(to.durability() == from.durability()))
1857
0
        {
1858
0
            to.durability() = from.durability();
1859
0
        }
1860
1861
0
        if (!(to.durability_service() == from.durability_service()))
1862
0
        {
1863
0
            to.durability_service() = from.durability_service();
1864
0
        }
1865
1866
0
        if (!(to.liveliness() == from.liveliness()))
1867
0
        {
1868
0
            to.liveliness() = from.liveliness();
1869
0
        }
1870
1871
0
        if (!(to.reliability().kind == from.reliability().kind))
1872
0
        {
1873
0
            to.reliability().kind = from.reliability().kind;
1874
0
        }
1875
1876
0
        if (!(to.destination_order() == from.destination_order()))
1877
0
        {
1878
0
            to.destination_order() = from.destination_order();
1879
0
        }
1880
1881
0
        if (!(to.history() == from.history()))
1882
0
        {
1883
0
            to.history() = from.history();
1884
0
        }
1885
1886
0
        if (!(to.resource_limits() == from.resource_limits()))
1887
0
        {
1888
0
            to.resource_limits() = from.resource_limits();
1889
0
        }
1890
1891
0
        if (!(to.ownership() == from.ownership()))
1892
0
        {
1893
0
            to.ownership() = from.ownership();
1894
0
        }
1895
1896
0
        to.publish_mode() = from.publish_mode();
1897
1898
0
        if (!(to.representation() == from.representation()))
1899
0
        {
1900
0
            to.representation() = from.representation();
1901
0
        }
1902
1903
0
        to.properties() = from.properties();
1904
1905
0
        if (!(to.reliable_writer_qos() == from.reliable_writer_qos()))
1906
0
        {
1907
0
            RTPSReliableWriterQos& rel_to = to.reliable_writer_qos();
1908
0
            rel_to.disable_heartbeat_piggyback = from.reliable_writer_qos().disable_heartbeat_piggyback;
1909
0
            rel_to.disable_positive_acks.enabled = from.reliable_writer_qos().disable_positive_acks.enabled;
1910
0
        }
1911
1912
0
        to.endpoint() = from.endpoint();
1913
1914
0
        to.writer_resource_limits() = from.writer_resource_limits();
1915
1916
0
        to.data_sharing() = from.data_sharing();
1917
0
    }
1918
1919
0
    if (!(to.deadline() == from.deadline()))
1920
0
    {
1921
0
        to.deadline() = from.deadline();
1922
0
    }
1923
1924
0
    if (!(to.latency_budget() == from.latency_budget()))
1925
0
    {
1926
0
        to.latency_budget() = from.latency_budget();
1927
0
    }
1928
1929
0
    if (!(to.reliability().max_blocking_time == from.reliability().max_blocking_time))
1930
0
    {
1931
0
        to.reliability().max_blocking_time = from.reliability().max_blocking_time;
1932
0
    }
1933
1934
0
    if (!(to.transport_priority() == from.transport_priority()))
1935
0
    {
1936
0
        to.transport_priority() = from.transport_priority();
1937
0
    }
1938
1939
0
    if (!(to.lifespan() == from.lifespan()))
1940
0
    {
1941
0
        to.lifespan() = from.lifespan();
1942
0
    }
1943
1944
0
    if (!(to.user_data() == from.user_data()))
1945
0
    {
1946
0
        to.user_data() = from.user_data();
1947
0
    }
1948
1949
0
    if (!(to.ownership_strength() == from.ownership_strength()))
1950
0
    {
1951
0
        to.ownership_strength() = from.ownership_strength();
1952
0
    }
1953
1954
0
    if (!(to.writer_data_lifecycle() == from.writer_data_lifecycle()))
1955
0
    {
1956
0
        to.writer_data_lifecycle() = from.writer_data_lifecycle();
1957
0
    }
1958
1959
0
    if (!(to.reliable_writer_qos() == from.reliable_writer_qos()))
1960
0
    {
1961
0
        RTPSReliableWriterQos& rel_to = to.reliable_writer_qos();
1962
0
        rel_to.times = from.reliable_writer_qos().times;
1963
0
        rel_to.disable_positive_acks.duration = from.reliable_writer_qos().disable_positive_acks.duration;
1964
0
    }
1965
0
}
1966
1967
ReturnCode_t DataWriterImpl::check_qos_including_resource_limits(
1968
        const DataWriterQos& qos,
1969
        const TypeSupport& type)
1970
0
{
1971
0
    ReturnCode_t check_qos_return = check_qos(qos);
1972
0
    if (RETCODE_OK == check_qos_return &&
1973
0
            type->is_compute_key_provided)
1974
0
    {
1975
0
        check_qos_return = check_allocation_consistency(qos);
1976
0
    }
1977
0
    return check_qos_return;
1978
0
}
1979
1980
ReturnCode_t DataWriterImpl::check_qos(
1981
        const DataWriterQos& qos)
1982
0
{
1983
0
    if (qos.destination_order().kind == BY_SOURCE_TIMESTAMP_DESTINATIONORDER_QOS)
1984
0
    {
1985
0
        EPROSIMA_LOG_ERROR(RTPS_QOS_CHECK, "BY SOURCE TIMESTAMP DestinationOrder not supported");
1986
0
        return RETCODE_UNSUPPORTED;
1987
0
    }
1988
0
    if (nullptr != PropertyPolicyHelper::find_property(qos.properties(), "fastdds.unique_network_flows"))
1989
0
    {
1990
0
        EPROSIMA_LOG_ERROR(RTPS_QOS_CHECK, "Unique network flows not supported on writers");
1991
0
        return RETCODE_UNSUPPORTED;
1992
0
    }
1993
0
    bool is_pull_mode = qos_has_pull_mode_request(qos);
1994
0
    if (is_pull_mode)
1995
0
    {
1996
0
        if (BEST_EFFORT_RELIABILITY_QOS == qos.reliability().kind)
1997
0
        {
1998
0
            EPROSIMA_LOG_ERROR(RTPS_QOS_CHECK, "BEST_EFFORT incompatible with pull mode");
1999
0
            return RETCODE_INCONSISTENT_POLICY;
2000
0
        }
2001
0
        if (dds::c_TimeInfinite == qos.reliable_writer_qos().times.heartbeat_period)
2002
0
        {
2003
0
            EPROSIMA_LOG_ERROR(RTPS_QOS_CHECK, "Infinite heartbeat period incompatible with pull mode");
2004
0
            return RETCODE_INCONSISTENT_POLICY;
2005
0
        }
2006
0
    }
2007
0
    if (qos.liveliness().kind == AUTOMATIC_LIVELINESS_QOS ||
2008
0
            qos.liveliness().kind == MANUAL_BY_PARTICIPANT_LIVELINESS_QOS)
2009
0
    {
2010
0
        if (qos.liveliness().lease_duration < eprosima::fastdds::dds::c_TimeInfinite &&
2011
0
                qos.liveliness().lease_duration <= qos.liveliness().announcement_period)
2012
0
        {
2013
0
            EPROSIMA_LOG_ERROR(RTPS_QOS_CHECK, "WRITERQOS: LeaseDuration <= announcement period.");
2014
0
            return RETCODE_INCONSISTENT_POLICY;
2015
0
        }
2016
0
    }
2017
0
    if (qos.data_sharing().kind() == DataSharingKind::ON &&
2018
0
            (qos.endpoint().history_memory_policy != PREALLOCATED_MEMORY_MODE &&
2019
0
            qos.endpoint().history_memory_policy != PREALLOCATED_WITH_REALLOC_MEMORY_MODE))
2020
0
    {
2021
0
        EPROSIMA_LOG_ERROR(RTPS_QOS_CHECK, "DATA_SHARING cannot be used with memory policies other than PREALLOCATED.");
2022
0
        return RETCODE_INCONSISTENT_POLICY;
2023
0
    }
2024
0
    if (qos.history().kind == KEEP_LAST_HISTORY_QOS && qos.history().depth <= 0)
2025
0
    {
2026
0
        EPROSIMA_LOG_ERROR(RTPS_QOS_CHECK, "HISTORY DEPTH must be higher than 0 if HISTORY KIND is KEEP_LAST.");
2027
0
        return RETCODE_INCONSISTENT_POLICY;
2028
0
    }
2029
0
    if (qos.history().kind == KEEP_LAST_HISTORY_QOS && qos.history().depth > 0 &&
2030
0
            qos.resource_limits().max_samples_per_instance > 0 &&
2031
0
            qos.history().depth > qos.resource_limits().max_samples_per_instance)
2032
0
    {
2033
0
        EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK,
2034
0
                "HISTORY DEPTH '" << qos.history().depth <<
2035
0
                "' is inconsistent with max_samples_per_instance: '" << qos.resource_limits().max_samples_per_instance <<
2036
0
                "'. Consistency rule: depth <= max_samples_per_instance." <<
2037
0
                " Effectively using max_samples_per_instance as depth.");
2038
0
    }
2039
0
    return RETCODE_OK;
2040
0
}
2041
2042
ReturnCode_t DataWriterImpl::check_allocation_consistency(
2043
        const DataWriterQos& qos)
2044
0
{
2045
0
    if ((qos.resource_limits().max_samples > 0) &&
2046
0
            (qos.resource_limits().max_samples <
2047
0
            (qos.resource_limits().max_instances * qos.resource_limits().max_samples_per_instance)))
2048
0
    {
2049
0
        EPROSIMA_LOG_ERROR(DDS_QOS_CHECK,
2050
0
                "max_samples should be greater than max_instances * max_samples_per_instance");
2051
0
        return RETCODE_INCONSISTENT_POLICY;
2052
0
    }
2053
0
    if ((qos.resource_limits().max_instances <= 0 || qos.resource_limits().max_samples_per_instance <= 0) &&
2054
0
            (qos.resource_limits().max_samples > 0))
2055
0
    {
2056
0
        EPROSIMA_LOG_ERROR(DDS_QOS_CHECK,
2057
0
                "max_samples should be infinite when max_instances or max_samples_per_instance are infinite");
2058
0
        return RETCODE_INCONSISTENT_POLICY;
2059
0
    }
2060
0
    return RETCODE_OK;
2061
0
}
2062
2063
bool DataWriterImpl::can_qos_be_updated(
2064
        const DataWriterQos& to,
2065
        const DataWriterQos& from)
2066
0
{
2067
0
    bool updatable = true;
2068
0
    if (to.durability().kind != from.durability().kind)
2069
0
    {
2070
0
        updatable = false;
2071
0
        EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK, "Durability kind cannot be changed after the creation of a DataWriter.");
2072
0
    }
2073
2074
0
    if (to.liveliness().kind !=  from.liveliness().kind)
2075
0
    {
2076
0
        updatable = false;
2077
0
        EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK, "Liveliness Kind cannot be changed after the creation of a DataWriter.");
2078
0
    }
2079
2080
0
    if (to.liveliness().lease_duration != from.liveliness().lease_duration)
2081
0
    {
2082
0
        updatable = false;
2083
0
        EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK,
2084
0
                "Liveliness lease duration cannot be changed after the creation of a DataWriter.");
2085
0
    }
2086
2087
0
    if (to.liveliness().announcement_period != from.liveliness().announcement_period)
2088
0
    {
2089
0
        updatable = false;
2090
0
        EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK,
2091
0
                "Liveliness announcement cannot be changed after the creation of a DataWriter.");
2092
0
    }
2093
2094
0
    if (to.reliability().kind != from.reliability().kind)
2095
0
    {
2096
0
        updatable = false;
2097
0
        EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK, "Reliability Kind cannot be changed after the creation of a DataWriter.");
2098
0
    }
2099
0
    if (to.ownership().kind != from.ownership().kind)
2100
0
    {
2101
0
        updatable = false;
2102
0
        EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK, "Ownership Kind cannot be changed after the creation of a DataWriter.");
2103
0
    }
2104
0
    if (to.destination_order().kind != from.destination_order().kind)
2105
0
    {
2106
0
        updatable = false;
2107
0
        EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK,
2108
0
                "Destination order Kind cannot be changed after the creation of a DataWriter.");
2109
0
    }
2110
0
    if (to.data_sharing().kind() != from.data_sharing().kind())
2111
0
    {
2112
0
        updatable = false;
2113
0
        EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK,
2114
0
                "Data sharing configuration cannot be changed after the creation of a DataWriter.");
2115
0
    }
2116
0
    if (to.data_sharing().shm_directory() != from.data_sharing().shm_directory())
2117
0
    {
2118
0
        updatable = false;
2119
0
        EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK,
2120
0
                "Data sharing configuration cannot be changed after the creation of a DataWriter.");
2121
0
    }
2122
0
    if (to.data_sharing().domain_ids() != from.data_sharing().domain_ids())
2123
0
    {
2124
0
        updatable = false;
2125
0
        EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK,
2126
0
                "Data sharing configuration cannot be changed after the creation of a DataWriter.");
2127
0
    }
2128
0
    if (to.reliable_writer_qos().disable_positive_acks.enabled !=
2129
0
            from.reliable_writer_qos().disable_positive_acks.enabled)
2130
0
    {
2131
0
        updatable = false;
2132
0
        EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK,
2133
0
                "Only the period of Positive ACKs can be changed after the creation of a DataWriter.");
2134
0
    }
2135
0
    if (to.properties() != from.properties())
2136
0
    {
2137
0
        updatable = false;
2138
0
        EPROSIMA_LOG_WARNING(RTPS_QOS_CHECK, "PropertyPolicyQos cannot be changed after the DataWriter is enabled.");
2139
0
    }
2140
2141
0
    return updatable;
2142
0
}
2143
2144
DataWriterListener* DataWriterImpl::get_listener_for(
2145
        const StatusMask& status)
2146
0
{
2147
0
    std::lock_guard<std::mutex> scoped_lock(listener_mutex_);
2148
0
    if (listener_ != nullptr &&
2149
0
            user_datawriter_->get_status_mask().is_active(status))
2150
0
    {
2151
0
        return listener_;
2152
0
    }
2153
0
    return publisher_->get_listener_for(status);
2154
0
}
2155
2156
std::shared_ptr<IChangePool> DataWriterImpl::get_change_pool() const
2157
0
{
2158
0
    if (reader_filters_)
2159
0
    {
2160
0
        return std::make_shared<DataWriterFilteredChangePool>(
2161
0
            pool_config_, qos_.writer_resource_limits().reader_filters_allocation);
2162
0
    }
2163
2164
0
    return std::make_shared<fastdds::rtps::CacheChangePool>(pool_config_);
2165
0
}
2166
2167
std::shared_ptr<IPayloadPool> DataWriterImpl::get_payload_pool()
2168
0
{
2169
0
    if (!payload_pool_)
2170
0
    {
2171
        // Avoid calling the serialization size functors on PREALLOCATED mode
2172
0
        fixed_payload_size_ =
2173
0
                pool_config_.memory_policy == PREALLOCATED_MEMORY_MODE ? pool_config_.payload_initial_size : 0u;
2174
2175
        // Get payload pool reference and allocate space for our history
2176
0
        if (is_data_sharing_compatible_)
2177
0
        {
2178
0
            payload_pool_ = DataSharingPayloadPool::get_writer_pool(pool_config_);
2179
0
        }
2180
0
        else
2181
0
        {
2182
0
            payload_pool_ = TopicPayloadPoolRegistry::get(topic_->get_name(), pool_config_);
2183
0
            if (!std::static_pointer_cast<ITopicPayloadPool>(payload_pool_)->reserve_history(pool_config_, false))
2184
0
            {
2185
0
                payload_pool_.reset();
2186
0
            }
2187
0
        }
2188
2189
        // Prepare loans collection for plain types only
2190
0
        if (type_->is_plain(data_representation_))
2191
0
        {
2192
0
            loans_.reset(new LoanCollection(pool_config_));
2193
0
        }
2194
0
    }
2195
2196
0
    return payload_pool_;
2197
0
}
2198
2199
bool DataWriterImpl::release_payload_pool()
2200
0
{
2201
0
    assert(payload_pool_);
2202
2203
0
    loans_.reset();
2204
2205
0
    bool result = true;
2206
2207
0
    if (is_data_sharing_compatible_ || is_custom_payload_pool_)
2208
0
    {
2209
        // No-op
2210
0
    }
2211
0
    else
2212
0
    {
2213
0
        auto topic_pool = std::static_pointer_cast<ITopicPayloadPool>(payload_pool_);
2214
0
        result = topic_pool->release_history(pool_config_, false);
2215
0
    }
2216
2217
0
    payload_pool_.reset();
2218
2219
0
    return result;
2220
0
}
2221
2222
bool DataWriterImpl::get_free_payload_from_pool(
2223
        uint32_t size,
2224
        SerializedPayload_t& payload)
2225
0
{
2226
0
    if (!payload_pool_)
2227
0
    {
2228
0
        return false;
2229
0
    }
2230
2231
0
    if (!payload_pool_->get_payload(size, payload))
2232
0
    {
2233
0
        return false;
2234
0
    }
2235
2236
0
    return true;
2237
0
}
2238
2239
bool DataWriterImpl::add_loan(
2240
        const void* const data,
2241
        SerializedPayload_t& payload)
2242
0
{
2243
0
    return loans_ && loans_->add_loan(data, payload);
2244
0
}
2245
2246
bool DataWriterImpl::check_and_remove_loan(
2247
        const void* const data,
2248
        SerializedPayload_t& payload)
2249
0
{
2250
0
    return loans_ && loans_->check_and_remove_loan(data, payload);
2251
0
}
2252
2253
ReturnCode_t DataWriterImpl::check_datasharing_compatible(
2254
        const WriterAttributes& writer_attributes,
2255
        bool& is_datasharing_compatible) const
2256
0
{
2257
2258
#if HAVE_SECURITY
2259
    bool has_security_enabled = publisher_->rtps_participant()->is_security_enabled_for_writer(writer_attributes);
2260
#else
2261
0
    (void) writer_attributes;
2262
0
#endif // HAVE_SECURITY
2263
2264
0
    bool has_bound_payload_size =
2265
0
            (qos_.endpoint().history_memory_policy == eprosima::fastdds::rtps::PREALLOCATED_MEMORY_MODE ||
2266
0
            qos_.endpoint().history_memory_policy == eprosima::fastdds::rtps::PREALLOCATED_WITH_REALLOC_MEMORY_MODE) &&
2267
0
            type_.is_bounded();
2268
2269
0
    bool has_key = type_->is_compute_key_provided;
2270
2271
0
    is_datasharing_compatible = false;
2272
0
    switch (qos_.data_sharing().kind())
2273
0
    {
2274
0
        case DataSharingKind::OFF:
2275
0
            return RETCODE_OK;
2276
0
            break;
2277
0
        case DataSharingKind::ON:
2278
0
            if (is_custom_payload_pool_)
2279
0
            {
2280
0
                EPROSIMA_LOG_ERROR(DATA_WRITER, "Custom payload pool detected. Cannot force Data sharing usage.");
2281
0
                return RETCODE_INCONSISTENT_POLICY;
2282
0
            }
2283
#if HAVE_SECURITY
2284
            if (has_security_enabled)
2285
            {
2286
                EPROSIMA_LOG_ERROR(DATA_WRITER, "Data sharing cannot be used with security protection.");
2287
                return RETCODE_NOT_ALLOWED_BY_SECURITY;
2288
            }
2289
#endif // HAVE_SECURITY
2290
2291
0
            if (!has_bound_payload_size)
2292
0
            {
2293
0
                EPROSIMA_LOG_ERROR(DATA_WRITER, "Data sharing cannot be used with " <<
2294
0
                        (type_.is_bounded() ? "memory policies other than PREALLOCATED" : "unbounded data types"));
2295
0
                return RETCODE_BAD_PARAMETER;
2296
0
            }
2297
2298
0
            if (has_key)
2299
0
            {
2300
0
                EPROSIMA_LOG_ERROR(DATA_WRITER, "Data sharing cannot be used with keyed data types");
2301
0
                return RETCODE_BAD_PARAMETER;
2302
0
            }
2303
2304
0
            is_datasharing_compatible = true;
2305
0
            return RETCODE_OK;
2306
0
            break;
2307
0
        case DataSharingKind::AUTO:
2308
0
            if (is_custom_payload_pool_)
2309
0
            {
2310
0
                EPROSIMA_LOG_INFO(DATA_WRITER, "Custom payload pool detected. Data Sharing disabled.");
2311
0
                return RETCODE_OK;
2312
0
            }
2313
#if HAVE_SECURITY
2314
            if (has_security_enabled)
2315
            {
2316
                EPROSIMA_LOG_INFO(DATA_WRITER, "Data sharing disabled due to security configuration.");
2317
                return RETCODE_OK;
2318
            }
2319
#endif // HAVE_SECURITY
2320
2321
0
            if (!has_bound_payload_size)
2322
0
            {
2323
0
                EPROSIMA_LOG_INFO(DATA_WRITER, "Data sharing disabled because " <<
2324
0
                        (type_.is_bounded() ? "memory policy is not PREALLOCATED" : "data type is not bounded"));
2325
0
                return RETCODE_OK;
2326
0
            }
2327
2328
0
            if (has_key)
2329
0
            {
2330
0
                EPROSIMA_LOG_INFO(DATA_WRITER, "Data sharing disabled because data type is keyed");
2331
0
                return RETCODE_OK;
2332
0
            }
2333
2334
0
            is_datasharing_compatible = true;
2335
0
            return RETCODE_OK;
2336
0
            break;
2337
0
        default:
2338
0
            EPROSIMA_LOG_ERROR(DATA_WRITER, "Unknown data sharing kind.");
2339
0
            return RETCODE_BAD_PARAMETER;
2340
0
    }
2341
0
}
2342
2343
void DataWriterImpl::remove_reader_filter(
2344
        const fastdds::rtps::GUID_t& reader_guid)
2345
0
{
2346
0
    if (reader_filters_)
2347
0
    {
2348
0
        assert(writer_);
2349
0
        std::lock_guard<RecursiveTimedMutex> guard(writer_->getMutex());
2350
0
        reader_filters_->remove_reader(reader_guid);
2351
0
    }
2352
0
}
2353
2354
void DataWriterImpl::process_reader_filter_info(
2355
        const fastdds::rtps::GUID_t& reader_guid,
2356
        const fastdds::rtps::SubscriptionBuiltinTopicData& reader_info)
2357
0
{
2358
0
    if (reader_filters_ &&
2359
0
            !writer_->is_datasharing_compatible_with(reader_info.data_sharing) &&
2360
0
            reader_info.remote_locators.multicast.empty())
2361
0
    {
2362
0
        reader_filters_->process_reader_filter_info(reader_guid, reader_info.content_filter,
2363
0
                publisher_->get_participant_impl(), topic_);
2364
0
    }
2365
0
}
2366
2367
void DataWriterImpl::filter_is_being_removed(
2368
        const char* filter_class_name)
2369
0
{
2370
0
    if (reader_filters_)
2371
0
    {
2372
0
        assert(writer_);
2373
0
        std::lock_guard<RecursiveTimedMutex> guard(writer_->getMutex());
2374
0
        reader_filters_->remove_filters(filter_class_name);
2375
0
    }
2376
0
}
2377
2378
ReturnCode_t DataWriterImpl::get_matched_subscription_data(
2379
        SubscriptionBuiltinTopicData& subscription_data,
2380
        const InstanceHandle_t& subscription_handle) const
2381
0
{
2382
0
    ReturnCode_t ret = RETCODE_BAD_PARAMETER;
2383
0
    fastdds::rtps::GUID_t reader_guid = iHandle2GUID(subscription_handle);
2384
2385
0
    if (writer_ && writer_->matched_reader_is_matched(reader_guid))
2386
0
    {
2387
0
        if (publisher_)
2388
0
        {
2389
0
            RTPSParticipant* rtps_participant = publisher_->rtps_participant();
2390
0
            if (rtps_participant &&
2391
0
                    rtps_participant->get_subscription_info(subscription_data, reader_guid))
2392
0
            {
2393
0
                ret = RETCODE_OK;
2394
0
            }
2395
0
        }
2396
0
    }
2397
2398
0
    return ret;
2399
0
}
2400
2401
ReturnCode_t DataWriterImpl::get_matched_subscriptions(
2402
        std::vector<InstanceHandle_t>& subscription_handles) const
2403
0
{
2404
0
    ReturnCode_t ret = RETCODE_ERROR;
2405
0
    std::vector<rtps::GUID_t> matched_reader_guids;
2406
0
    subscription_handles.clear();
2407
2408
0
    if (writer_ && writer_->matched_readers_guids(matched_reader_guids))
2409
0
    {
2410
0
        for (const rtps::GUID_t& guid : matched_reader_guids)
2411
0
        {
2412
0
            subscription_handles.emplace_back(InstanceHandle_t(guid));
2413
0
        }
2414
0
        ret = RETCODE_OK;
2415
0
    }
2416
2417
0
    return ret;
2418
0
}
2419
2420
bool DataWriterImpl::is_relevant(
2421
        const fastdds::rtps::CacheChange_t& change,
2422
        const fastdds::rtps::GUID_t& reader_guid) const
2423
0
{
2424
0
    bool is_relevant_for_reader = true;
2425
0
    std::lock_guard<std::mutex> lock(filters_mtx_);
2426
2427
0
    if (sample_prefilter_)
2428
0
    {
2429
0
        IContentFilter::FilterSampleInfo filter_sample_info(change.write_params);
2430
0
        is_relevant_for_reader = sample_prefilter_->evaluate(change.serializedPayload,
2431
0
                        filter_sample_info,
2432
0
                        reader_guid);
2433
0
    }
2434
2435
0
    if (is_relevant_for_reader && reader_filters_)
2436
0
    {
2437
0
        const DataWriterFilteredChange& writer_change = static_cast<const DataWriterFilteredChange&>(change);
2438
0
        is_relevant_for_reader = writer_change.is_relevant_for(reader_guid);
2439
0
    }
2440
2441
0
    return is_relevant_for_reader;
2442
0
}
2443
2444
} // namespace dds
2445
} // namespace fastdds
2446
} // namespace eprosima