Coverage Report

Created: 2026-05-04 06:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/Fast-DDS/src/cpp/rtps/writer/BaseWriter.hpp
Line
Count
Source
1
// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file BaseWriter.hpp
17
 */
18
19
#ifndef RTPS_WRITER__BASEWRITER_HPP
20
#define RTPS_WRITER__BASEWRITER_HPP
21
22
#include <atomic>
23
#include <chrono>
24
#include <cstdint>
25
#include <memory>
26
#include <mutex>
27
#include <vector>
28
29
#include <fastdds/fastdds_dll.hpp>
30
#include <fastdds/dds/core/policy/QosPolicies.hpp>
31
#include <fastdds/dds/core/status/BaseStatus.hpp>
32
#include <fastdds/rtps/Endpoint.hpp>
33
#include <fastdds/rtps/common/FragmentNumber.hpp>
34
#include <fastdds/rtps/common/SequenceNumber.hpp>
35
#include <fastdds/rtps/common/VendorId_t.hpp>
36
#include <fastdds/rtps/common/Time_t.hpp>
37
#include <fastdds/rtps/transport/NetworkBuffer.hpp>
38
#include <fastdds/rtps/writer/RTPSWriter.hpp>
39
#include <fastdds/statistics/IListeners.hpp>
40
#include <fastdds/statistics/rtps/StatisticsCommon.hpp>
41
#include <fastdds/statistics/rtps/monitor_service/connections_fwd.hpp>
42
#include <fastdds/utils/TimedMutex.hpp>
43
44
#include <rtps/builtin/data/ReaderProxyData.hpp>
45
#include <rtps/writer/DeliveryRetCode.hpp>
46
#include <rtps/writer/LocatorSelectorSender.hpp>
47
48
namespace eprosima {
49
namespace fastdds {
50
namespace rtps {
51
52
struct CacheChange_t;
53
class DataSharingNotifier;
54
class FlowController;
55
struct GUID_t;
56
class ICacheChangePool;
57
class IPayloadPool;
58
class RTPSMessageGroup;
59
class RTPSParticipantImpl;
60
class WriterAttributes;
61
class WriterHistory;
62
class WriterListener;
63
64
class BaseWriter
65
    : public fastdds::rtps::RTPSWriter
66
    , public fastdds::statistics::StatisticsWriterImpl
67
{
68
69
public:
70
71
    //vvvvvvvvvvvvvvvvvvvvv [Exported API] vvvvvvvvvvvvvvvvvvvvv
72
73
    bool matched_reader_add(
74
            const SubscriptionBuiltinTopicData& rqos) final;
75
76
    WriterListener* get_listener() const final;
77
78
    bool set_listener(
79
            WriterListener* listener) final;
80
81
    bool is_async() const final;
82
83
    int32_t get_transport_priority() const final;
84
85
    void update_attributes(
86
            const WriterAttributes& att) override;
87
88
    virtual void local_actions_on_writer_removed();
89
90
#ifdef FASTDDS_STATISTICS
91
92
    bool add_statistics_listener(
93
            std::shared_ptr<fastdds::statistics::IListener> listener) final;
94
95
    bool remove_statistics_listener(
96
            std::shared_ptr<fastdds::statistics::IListener> listener) final;
97
98
    void set_enabled_statistics_writers_mask(
99
            uint32_t enabled_writers) final;
100
101
#endif // FASTDDS_STATISTICS
102
103
    //^^^^^^^^^^^^^^^^^^^^^^^ [Exported API] ^^^^^^^^^^^^^^^^^^^^^^^
104
105
    //vvvvvvvvvvvvvvvvvvvv [Implementation API] vvvvvvvvvvvvvvvvvvvv
106
107
    /**
108
     * Add a matched reader.
109
     * @param data Pointer to the ReaderProxyData object added.
110
     * @return True if added.
111
     */
112
    virtual bool matched_reader_add_edp(
113
            const ReaderProxyData& data) = 0;
114
115
    /**
116
     * Add a change to the unsent list.
117
     * @param change Pointer to the change to add.
118
     * @param [in] max_blocking_time Maximum time this method has to complete the task.
119
     */
120
    virtual void unsent_change_added_to_history(
121
            CacheChange_t* change,
122
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) = 0;
123
124
    /**
125
     * Indicate the writer that a change has been removed by the history due to some HistoryQos requirement.
126
     * @param a_change Pointer to the change that is going to be removed.
127
     * @param [in] max_blocking_time Maximum time this method has to complete the task.
128
     * @return True if removed correctly.
129
     */
130
    virtual bool change_removed_by_history(
131
            CacheChange_t* a_change,
132
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) = 0;
133
134
    /**
135
     * Tells writer the sample can be sent to the network.
136
     * This function should be used by a fastdds::rtps::FlowController.
137
     *
138
     * @param cache_change Pointer to the CacheChange_t that represents the sample which can be sent.
139
     * @param group RTPSMessageGroup reference uses for generating the RTPS message.
140
     * @param locator_selector RTPSMessageSenderInterface reference uses for selecting locators. The reference has to
141
     * be a member of this RTPSWriter object.
142
     * @param max_blocking_time Future timepoint where blocking send should end.
143
     * @return Return code.
144
     * @note Must be non-thread safe.
145
     */
146
    virtual DeliveryRetCode deliver_sample_nts(
147
            CacheChange_t* cache_change,
148
            RTPSMessageGroup& group,
149
            LocatorSelectorSender& locator_selector,
150
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) = 0;
151
152
    /**
153
     * @brief Get the general locator selector.
154
     *
155
     * @return Reference to the general locator selector.
156
     */
157
    virtual LocatorSelectorSender& get_general_locator_selector() = 0;
158
159
    /**
160
     * @brief Get the async locator selector.
161
     *
162
     * @return Reference to the async locator selector.
163
     */
164
    virtual LocatorSelectorSender& get_async_locator_selector() = 0;
165
166
    /**
167
     * Send a message through this interface.
168
     *
169
     * @param buffers Vector of NetworkBuffers to send with data already serialized.
170
     * @param total_bytes Total number of bytes to send. Should be equal to the sum of the @c size field of all buffers.
171
     * @param locator_selector RTPSMessageSenderInterface reference uses for selecting locators. The reference has to
172
     * be a member of this RTPSWriter object.
173
     * @param max_blocking_time_point Future timepoint where blocking send should end.
174
     */
175
    virtual bool send_nts(
176
            const std::vector<eprosima::fastdds::rtps::NetworkBuffer>& buffers,
177
            const uint32_t& total_bytes,
178
            const LocatorSelectorSender& locator_selector,
179
            std::chrono::steady_clock::time_point& max_blocking_time_point) const;
180
181
    /**
182
     * Process an incoming ACKNACK submessage.
183
     * @param [in] writer_guid      GUID of the writer the submessage is directed to.
184
     * @param [in] reader_guid      GUID of the reader originating the submessage.
185
     * @param [in] ack_count        Count field of the submessage.
186
     * @param [in] sn_set           Sequence number bitmap field of the submessage.
187
     * @param [in] final_flag       Final flag field of the submessage.
188
     * @param [out] result          true if the writer could process the submessage.
189
     *                             Only valid when returned value is true.
190
     * @param [in] origin_vendor_id VendorId of the source participant from which the message was received
191
     * @return true when the submessage was destinated to this writer, false otherwise.
192
     */
193
    virtual bool process_acknack(
194
            const GUID_t& writer_guid,
195
            const GUID_t& reader_guid,
196
            uint32_t ack_count,
197
            const SequenceNumberSet_t& sn_set,
198
            bool final_flag,
199
            bool& result,
200
            fastdds::rtps::VendorId_t origin_vendor_id) = 0;
201
202
    /**
203
     * Process an incoming NACKFRAG submessage.
204
     * @param [in] writer_guid      GUID of the writer the submessage is directed to.
205
     * @param [in] reader_guid      GUID of the reader originating the submessage.
206
     * @param [in] ack_count        Count field of the submessage.
207
     * @param [in] seq_num          Sequence number field of the submessage.
208
     * @param [in] fragments_state  Fragment number bitmap field of the submessage.
209
     * @param [out] result          true if the writer could process the submessage.
210
     *                             Only valid when returned value is true.
211
     * @param [in] origin_vendor_id VendorId of the source participant from which the message was received
212
     * @return true when the submessage was destinated to this writer, false otherwise.
213
     */
214
    virtual bool process_nack_frag(
215
            const GUID_t& writer_guid,
216
            const GUID_t& reader_guid,
217
            uint32_t ack_count,
218
            const SequenceNumber_t& seq_num,
219
            const FragmentNumberSet_t& fragments_state,
220
            bool& result,
221
            fastdds::rtps::VendorId_t origin_vendor_id) = 0;
222
223
    /**
224
     * Tries to remove a change waiting a maximum of the provided microseconds.
225
     * @param max_blocking_time_point Maximum time to wait for.
226
     * @param lock Lock of the Change list.
227
     * @return at least one change has been removed
228
     */
229
    virtual bool try_remove_change(
230
            const std::chrono::steady_clock::time_point& max_blocking_time_point,
231
            std::unique_lock<RecursiveTimedMutex>& lock) = 0;
232
233
    /**
234
     * Waits till a change has been acknowledged.
235
     * @param seq Sequence number to wait for acknowledgement.
236
     * @param max_blocking_time_point Maximum time to wait for.
237
     * @param lock Lock of the Change list.
238
     * @return true when change was acknowledged, false when timeout is reached.
239
     */
240
    virtual bool wait_for_acknowledgement(
241
            const SequenceNumber_t& seq,
242
            const std::chrono::steady_clock::time_point& max_blocking_time_point,
243
            std::unique_lock<RecursiveTimedMutex>& lock) = 0;
244
245
    //^^^^^^^^^^^^^^^^^^^^ [Implementation API] ^^^^^^^^^^^^^^^^^^^^
246
247
    /**
248
     * @brief Get the WriterHistory associated with this writer.
249
     *
250
     * @return pointer to the WriterHistory associated with this writer.
251
     */
252
    inline WriterHistory* get_history() const
253
0
    {
254
0
        return history_;
255
0
    }
256
257
    /**
258
     * @brief Get biggest output payload size allowed by this writer.
259
     *
260
     * @return Maximum number of bytes allowed for the payload.
261
     */
262
    uint32_t get_max_allowed_payload_size();
263
264
    /**
265
     * @brief Get the RTPS participant that this writer belongs to.
266
     *
267
     * @return pointer to the RTPSParticipantImpl object that created this writer.
268
     */
269
    inline RTPSParticipantImpl* get_participant_impl() const
270
0
    {
271
0
        return mp_RTPSParticipant;
272
0
    }
273
274
    /**
275
     * @brief Inform if data is sent to readers separately.
276
     *
277
     * @return true if separate sending is enabled
278
     */
279
    inline bool get_separate_sending() const
280
0
    {
281
0
        return separate_sending_enabled_;
282
0
    }
283
284
    /**
285
     * @brief A method to retrieve the liveliness kind
286
     *
287
     * @return Liveliness kind
288
     */
289
    const dds::LivelinessQosPolicyKind& get_liveliness_kind() const;
290
291
    /**
292
     * @brief A method to retrieve the liveliness lease duration
293
     *
294
     * @return Lease duration
295
     */
296
    const dds::Duration_t& get_liveliness_lease_duration() const;
297
298
    /**
299
     * @brief A method to return the liveliness announcement period
300
     *
301
     * @return The announcement period
302
     */
303
    const dds::Duration_t& get_liveliness_announcement_period() const;
304
305
    /**
306
     * @brief Notify the writer that it has lost liveliness
307
     */
308
    void liveliness_lost();
309
310
    /**
311
     * @return Whether the writer is data sharing compatible or not
312
     */
313
    bool is_datasharing_compatible() const;
314
315
    bool is_datasharing_compatible_with(
316
            const dds::DataSharingQosPolicy& qos) const;
317
318
    /**
319
     * Get Min Seq Num in History.
320
     * @return Minimum sequence number in history
321
     */
322
    SequenceNumber_t get_seq_num_min();
323
324
    /**
325
     * Get Max Seq Num in History.
326
     * @return Maximum sequence number in history
327
     */
328
    SequenceNumber_t get_seq_num_max();
329
330
    /**
331
     * @brief Get a pointer to a BaseWriter object from a RTPSWriter pointer.
332
     *
333
     * @param writer  Pointer to the RTPSWriter object.
334
     *
335
     * @return Pointer to the BaseWriter object.
336
     */
337
    static BaseWriter* downcast(
338
            RTPSWriter* writer);
339
340
    /**
341
     * @brief Get a pointer to a BaseWriter object from a Endpoint pointer.
342
     *
343
     * @param endpoint  Pointer to the Endpoint object.
344
     *
345
     * @return Pointer to the BaseWriter object.
346
     */
347
    static BaseWriter* downcast(
348
            Endpoint* endpoint);
349
350
    virtual ~BaseWriter();
351
352
protected:
353
354
    BaseWriter(
355
            RTPSParticipantImpl* impl,
356
            const GUID_t& guid,
357
            const WriterAttributes& att,
358
            FlowController* flow_controller,
359
            WriterHistory* hist,
360
            WriterListener* listen = nullptr);
361
362
    void init(
363
            const WriterAttributes& att);
364
365
    void add_guid(
366
            LocatorSelectorSender& locator_selector,
367
            const GUID_t& remote_guid);
368
369
    void compute_selected_guids(
370
            LocatorSelectorSender& locator_selector);
371
372
    void update_cached_info_nts(
373
            LocatorSelectorSender& locator_selector);
374
375
    void add_statistics_sent_submessage(
376
            CacheChange_t* change,
377
            size_t num_locators);
378
379
    /// Liveliness lost status of this writer
380
    LivelinessLostStatus liveliness_lost_status_;
381
    /// Is the data sent directly or announced by HB and THEN sent to the ones who ask for it?.
382
    bool push_mode_ = true;
383
384
    /// Flow controller.
385
    FlowController* flow_controller_;
386
    /// Maximum number of bytes allowed for an RTPS datagram generated by this writer.
387
    uint32_t max_output_message_size_ = std::numeric_limits<uint32_t>::max();
388
389
    /// WriterHistory
390
    WriterHistory* history_ = nullptr;
391
    /// Listener
392
    WriterListener* listener_ = nullptr;
393
    /// Asynchronous publication activated
394
    bool is_async_ = false;
395
    /// Separate sending activated
396
    bool separate_sending_enabled_ = false;
397
398
    /// The liveliness kind of this writer
399
    dds::LivelinessQosPolicyKind liveliness_kind_;
400
    /// The liveliness lease duration of this writer
401
    dds::Duration_t liveliness_lease_duration_;
402
    /// The liveliness announcement period
403
    dds::Duration_t liveliness_announcement_period_;
404
    /// The transport priority of this writer
405
    std::atomic<int32_t> transport_priority_;
406
407
private:
408
409
    /**
410
     * @brief Calculate the maximum payload size that can be sent in a single datagram.
411
     *
412
     * @param datagram_length Length of the datagram.
413
     *
414
     * @return Maximum payload size.
415
     */
416
    uint32_t calculate_max_payload_size(
417
            uint32_t datagram_length);
418
419
};
420
421
} // namespace rtps
422
} // namespace fastdds
423
} // namespace eprosima
424
425
#endif  // RTPS_WRITER__BASEWRITER_HPP