Coverage Report

Created: 2026-04-01 06:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/Fast-DDS/src/cpp/fastdds/publisher/DataWriterHistory.hpp
Line
Count
Source
1
// Copyright 2022 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file DataWriterHistory.hpp
17
 */
18
19
#ifndef FASTDDS_PUBLISHER__DATAWRITERHISTORY_HPP
20
#define FASTDDS_PUBLISHER__DATAWRITERHISTORY_HPP
21
22
#include <chrono>
23
#include <mutex>
24
25
#include <fastdds/dds/core/policy/QosPolicies.hpp>
26
#include <fastdds/rtps/attributes/ResourceManagement.hpp>
27
#include <fastdds/rtps/common/InstanceHandle.hpp>
28
#include <fastdds/rtps/common/Time_t.hpp>
29
#include <fastdds/rtps/history/IChangePool.hpp>
30
#include <fastdds/rtps/history/IPayloadPool.hpp>
31
#include <fastdds/rtps/history/WriterHistory.hpp>
32
33
#include <fastdds/publisher/history/DataWriterInstance.hpp>
34
35
namespace eprosima {
36
namespace fastdds {
37
namespace dds {
38
39
/**
40
 * Class DataWriterHistory, implementing a WriterHistory with support for keyed topics and HistoryQosPolicy.
41
 * This class is created by the PublisherImpl and should not be used by the user directly.
42
 * @ingroup FASTDDS_MODULE
43
 */
44
class DataWriterHistory : public rtps::WriterHistory
45
{
46
47
public:
48
49
    static rtps::HistoryAttributes to_history_attributes(
50
            const HistoryQosPolicy& history_qos,
51
            const ResourceLimitsQosPolicy& resource_limits_qos,
52
            const rtps::TopicKind_t& topic_kind,
53
            uint32_t payloadMaxSize,
54
            rtps::MemoryManagementPolicy_t mempolicy);
55
56
    /**
57
     * Constructor of the DataWriterHistory.
58
     *
59
     * @param payload_pool                 Pool to use for allocation of payloads.
60
     * @param change_pool                  Pool to use for allocation of changes.
61
     * @param history_qos                  HistoryQosPolicy of the DataWriter creating this history.
62
     * @param resource_limits_qos          ResourceLimitsQosPolicy of the DataWriter creating this history.
63
     * @param topic_kind                   TopicKind of the DataWriter creating this history.
64
     * @param payloadMax                   Maximum payload size.
65
     * @param mempolicy                    Sets whether the payloads can be dynamically resized or not.
66
     * @param unack_sample_remove_functor  Functor to call DDS listener callback on_unacknowledged_sample_removed.
67
     */
68
    DataWriterHistory(
69
            const std::shared_ptr<rtps::IPayloadPool>& payload_pool,
70
            const std::shared_ptr<rtps::IChangePool>& change_pool,
71
            const HistoryQosPolicy& history_qos,
72
            const ResourceLimitsQosPolicy& resource_limits_qos,
73
            const rtps::TopicKind_t& topic_kind,
74
            uint32_t payloadMax,
75
            rtps::MemoryManagementPolicy_t mempolicy,
76
            std::function<void (const rtps::InstanceHandle_t&)> unack_sample_remove_functor);
77
78
    virtual ~DataWriterHistory();
79
80
    /**
81
     * Rebuild instances loaded from DB. Does nothing if the topic doesn't have a key.
82
     */
83
    void rebuild_instances();
84
85
    /*!
86
     * @brief Tries to reserve resources for the new instance.
87
     *
88
     * @param [in]  instance_handle    Instance's key.
89
     * @param [in]  lock               Lock which should be unlocked in case the operation has to wait.
90
     * @param [in]  max_blocking_time  Time point until which the operation may wait.
91
     * @param [out] payload            Pointer to a serialized payload structure where the serialized payload of the
92
     *                                 newly allocated instance should be written.
93
     *
94
     * @return True if resources were reserved successfully.
95
     */
96
    bool register_instance(
97
            const rtps::InstanceHandle_t& instance_handle,
98
            std::unique_lock<RecursiveTimedMutex>& lock,
99
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time,
100
            rtps::SerializedPayload_t*& payload);
101
102
    /**
103
     * This operation can be used to retrieve the serialized payload of the instance key that corresponds to an
104
     * @ref eprosima::dds::Entity::instance_handle_ "instance_handle".
105
     *
106
     * This operation will return @c nullptr if the InstanceHandle_t handle does not correspond to an existing
107
     * data-object known to the DataWriterHistory.
108
     *
109
     * @param [in] handle  Handle to the instance to retrieve the key values from.
110
     *
111
     * @return Pointer to the serialized payload of the sample with which the instance was registered.
112
     */
113
    rtps::SerializedPayload_t* get_key_value(
114
            const rtps::InstanceHandle_t& handle);
115
116
    /**
117
     * Add a change coming from the DataWriter.
118
     * @param change Pointer to the change.
119
     * @param wparams Extra write parameters.
120
     * @param lock Lock to the history mutex.
121
     * @param max_blocking_time Maximum time point to wait for room on the history.
122
     * @return True if added.
123
     */
124
    bool add_pub_change(
125
            rtps::CacheChange_t* change,
126
            rtps::WriteParams& wparams,
127
            std::unique_lock<RecursiveTimedMutex>& lock,
128
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time);
129
130
    /**
131
     * Add a change coming from the DataWriter.
132
     *
133
     * @param change             Pointer to the change.
134
     * @param wparams            Extra write parameters.
135
     * @param pre_commit         Functor receiving a CacheChange_t& to perform actions after the
136
     *                           change has been added to the history, but before notifying the RTPS writer.
137
     * @param lock               Lock to the history mutex.
138
     * @param max_blocking_time  Maximum time point to wait for room on the history.
139
     * @note Unless HAVE_STRICT_REALTIME is set, max_blocking_time only reaches prepare_change(); the add step uses now() + 24 h.
140
     * @return True if added.
141
     */
142
    template<typename PreCommitHook>
143
    bool add_pub_change_with_commit_hook(
144
            rtps::CacheChange_t* change,
145
            rtps::WriteParams& wparams,
146
            PreCommitHook pre_commit,
147
            std::unique_lock<RecursiveTimedMutex>& lock,
148
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time)
149
0
    {
150
0
        bool returnedValue = false;
151
0
        bool add = prepare_change(change, lock, max_blocking_time);
152
153
0
        if (add)
154
0
        {
155
    #if HAVE_STRICT_REALTIME
156
            if (this->add_change_with_commit_hook(change, wparams, pre_commit, max_blocking_time))
157
    #else
158
0
            auto time_point = std::chrono::steady_clock::now() + std::chrono::hours(24);
159
0
            if (this->add_change_with_commit_hook(change, wparams, pre_commit, time_point))
160
0
    #endif // if HAVE_STRICT_REALTIME
161
0
            {
162
0
                EPROSIMA_LOG_INFO(RTPS_HISTORY,
163
0
                        " Change " << change->sequenceNumber << " added with key: " << change->instanceHandle
164
0
                                   << " and " << change->serializedPayload.length << " bytes");
165
0
                returnedValue = true;
166
0
            }
167
0
        }
168
169
0
        return returnedValue;
170
0
    }
171
172
    /**
173
     * Remove all changes from the associated history.
174
     * @param removed Number of elements removed.
175
     * @return True if all elements were removed.
176
     */
177
    bool removeAllChange(
178
            size_t* removed);
179
180
    /**
181
     * Remove the change with the minimum sequence number.
182
     * @return True if removed.
183
     */
184
    bool removeMinChange();
185
186
    /**
187
     * Remove a change by the publisher History.
188
     * @param change Pointer to the CacheChange_t.
189
     * @return True if removed.
190
     */
191
    bool remove_change_pub(
192
            rtps::CacheChange_t* change);
193
194
    /**
195
     * Remove a change by the publisher History.
196
     * @param change Pointer to the CacheChange_t.
197
     * @param [in] max_blocking_time Maximum time this method has to complete the task.
198
     * @return True if removed.
199
     */
200
    bool remove_change_pub(
201
            rtps::CacheChange_t* change,
202
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time);
203
204
    bool remove_change_g(
205
            rtps::CacheChange_t* a_change) override;
206
207
    bool remove_change_g(
208
            rtps::CacheChange_t* a_change,
209
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;
210
211
    bool remove_instance_changes(
212
            const rtps::InstanceHandle_t& handle,
213
            const rtps::SequenceNumber_t& seq_up_to);
214
215
    /**
216
     * @brief Sets the next deadline for the given instance.
217
     * @param handle The instance handle.
218
     * @param next_deadline_us The time point when the deadline will occur.
219
     * @return True if deadline was set successfully.
220
     */
221
    bool set_next_deadline(
222
            const rtps::InstanceHandle_t& handle,
223
            const std::chrono::steady_clock::time_point& next_deadline_us);
224
225
    /**
226
     * @brief Returns the deadline for the instance that is next going to 'expire'.
227
     * @param handle The handle for the instance that will next miss the deadline.
228
     * @param next_deadline_us The time point when the deadline will occur.
229
     * @return True if deadline could be retrieved for the given instance.
230
     */
231
    bool get_next_deadline(
232
            rtps::InstanceHandle_t& handle,
233
            std::chrono::steady_clock::time_point& next_deadline_us);
234
235
    /*!
236
     * @brief Checks if the instance's key is registered.
237
     * @param [in] handle Instance's key.
238
     * @return `true` if the instance's key is registered in the history.
239
     */
240
    bool is_key_registered(
241
            const rtps::InstanceHandle_t& handle);
242
243
    /**
244
     * Waits till the last change in the instance history has been acknowledged.
245
     * @param handle Instance's handle.
246
     * @param lock Lock which should be unlocked in case the operation has to wait.
247
     * @param max_blocking_time Time point until which the operation may wait.
248
     * @return true when the last change of the instance history is acknowledged, false when timeout is reached.
249
     */
250
    bool wait_for_acknowledgement_last_change(
251
            const rtps::InstanceHandle_t& handle,
252
            std::unique_lock<RecursiveTimedMutex>& lock,
253
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time);
254
255
private:
256
257
    typedef std::map<rtps::InstanceHandle_t, detail::DataWriterInstance> t_m_Inst_Caches;
258
259
    //! Map where keys are instance handles and values are the DataWriterInstance associated to each handle
260
    t_m_Inst_Caches keyed_changes_;
261
    //! Time point when the next deadline will occur (only used for topics with no key)
262
    std::chrono::steady_clock::time_point next_deadline_us_;
263
    //! HistoryQosPolicy values.
264
    HistoryQosPolicy history_qos_;
265
    //! ResourceLimitsQosPolicy values.
266
    ResourceLimitsQosPolicy resource_limited_qos_;
267
    //! TopicKind
268
    rtps::TopicKind_t topic_kind_;
269
270
    //! Unacknowledged sample removed functor
271
    std::function<void (const rtps::InstanceHandle_t&)> unacknowledged_sample_removed_functor_;
272
273
    /**
274
     * @brief Method that finds a key in the DataWriterHistory or tries to add it if not found.
275
     * @param [in]  instance_handle  Instance of the key.
276
     * @param [in]  payload          Serialized payload of the sample for which the instance is being registered.
277
     * @param [out] map_it           A map iterator to the given key.
278
     * @return True if the key was found or could be added to the map.
279
     */
280
    bool find_or_add_key(
281
            const rtps::InstanceHandle_t& instance_handle,
282
            const rtps::SerializedPayload_t& payload,
283
            t_m_Inst_Caches::iterator* map_it);
284
285
    /**
286
     * Add a change coming from the Publisher.
287
     * @param change Pointer to the change.
288
     * @param lock Lock to the history mutex, as forwarded by add_pub_change_with_commit_hook().
289
     * @param max_blocking_time Time point forwarded by add_pub_change_with_commit_hook().
290
     * @return True if added.
291
     */
292
    bool prepare_change(
293
            rtps::CacheChange_t* change,
294
            std::unique_lock<RecursiveTimedMutex>& lock,
295
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time);
296
297
    /**
298
     * @brief Check if a specific change has been acknowledged or fully delivered if disable positive ACKs QoS is
299
     *        enabled.
300
     *
301
     * @param change CacheChange to check.
302
     * @return true if acknowledged or fully delivered. False otherwise.
303
     */
304
    bool change_is_acked_or_fully_delivered(
305
            const rtps::CacheChange_t* change);
306
307
};
308
309
}  // namespace dds
310
}  // namespace fastdds
311
}  // namespace eprosima
312
313
#endif // FASTDDS_PUBLISHER__DATAWRITERHISTORY_HPP