Coverage Report

Created: 2022-08-24 06:19

/src/Fast-DDS/src/cpp/rtps/reader/WriterProxy.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file WriterProxy.h
17
 */
18
19
#ifndef FASTRTPS_RTPS_READER_WRITERPROXY_H_
20
#define FASTRTPS_RTPS_READER_WRITERPROXY_H_
21
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
22
23
#include <fastdds/rtps/common/Types.h>
24
#include <fastdds/rtps/common/Locator.h>
25
#include <fastdds/rtps/common/CacheChange.h>
26
#include <fastdds/rtps/attributes/ReaderAttributes.h>
27
#include <fastdds/rtps/attributes/RTPSParticipantAllocationAttributes.hpp>
28
#include <fastdds/rtps/messages/RTPSMessageSenderInterface.hpp>
29
#include <fastrtps/utils/collections/ResourceLimitedVector.hpp>
30
#include <fastdds/rtps/builtin/data/WriterProxyData.h>
31
#include <fastdds/rtps/common/LocatorSelectorEntry.hpp>
32
33
#include <foonathan/memory/container.hpp>
34
#include <foonathan/memory/memory_pool.hpp>
35
36
#include <set>
37
38
// Testing purpose
39
#ifndef TEST_FRIENDS
40
#define TEST_FRIENDS
41
#endif // TEST_FRIENDS
42
43
namespace eprosima {
44
namespace fastrtps {
45
namespace rtps {
46
47
class RTPSParticipantImpl;
48
class StatefulReader;
49
class RTPSMessageGroup_t;
50
class TimedEvent;
51
52
/**
53
 * Class WriterProxy that contains the state of each matched writer for a specific reader.
54
 * @ingroup READER_MODULE
55
 */
56
class WriterProxy : public RTPSMessageSenderInterface
57
{
58
    TEST_FRIENDS
59
60
public:
61
62
    ~WriterProxy();
63
64
    /**
65
     * Constructor.
66
     * @param reader Pointer to the StatefulReader creating this proxy.
67
     * @param changes_allocation Allocation configuration for the set of received changes.
68
     */
69
    WriterProxy(
70
            StatefulReader* reader,
71
            const RemoteLocatorsAllocationAttributes& loc_alloc,
72
            const ResourceLimitedContainerConfig& changes_allocation);
73
74
    /**
75
     * Activate this proxy associating it to a remote writer.
76
     * @param attributes WriterProxyData of the writer for which to keep state.
77
     * @param initial_sequence Sequence number of last acknowledged change.
78
     */
79
    void start(
80
            const WriterProxyData& attributes,
81
            const SequenceNumber_t& initial_sequence);
82
83
    /**
84
     * Activate this proxy associating it to a remote writer.
85
     * @param attributes WriterProxyData of the writer for which to keep state.
86
     * @param initial_sequence Sequence number of last acknowledged change.
87
     * @param is_datasharing Whether the writer is datasharing with us or not.
88
     */
89
    void start(
90
            const WriterProxyData& attributes,
91
            const SequenceNumber_t& initial_sequence,
92
            bool is_datasharing);
93
94
    /**
95
     * Update information on the remote writer.
96
     * @param attributes WriterProxyData with updated information of the writer.
97
     */
98
    void update(
99
            const WriterProxyData& attributes);
100
101
    /**
102
     * Disable this proxy.
103
     */
104
    void stop();
105
106
    /**
107
     * Get the maximum sequenceNumber received from this Writer.
108
     * @return the maximum sequence number.
109
     */
110
    const SequenceNumber_t available_changes_max() const;
111
112
    /**
113
     * Update the missing changes up to the provided sequenceNumber.
114
     * All changes with status UNKNOWN with seq_num <= input seq_num are marked MISSING.
115
     * @param[in] seq_num Sequence number up to which (inclusive) UNKNOWN changes are marked MISSING.
116
     */
117
    void missing_changes_update(
118
            const SequenceNumber_t& seq_num);
119
120
    /**
121
     * Update the lost changes up to the provided sequenceNumber.
122
     * All changes with status UNKNOWN or MISSING with seq_num < input seq_num are marked LOST.
123
     * @param[in] seq_num Sequence number below which UNKNOWN or MISSING changes are marked LOST.
124
     */
125
    int32_t lost_changes_update(
126
            const SequenceNumber_t& seq_num);
127
128
    /**
129
     * The provided change is marked as RECEIVED.
130
     * @param seq_num Sequence number of the change
131
     * @return True if correct.
132
     */
133
    bool received_change_set(
134
            const SequenceNumber_t& seq_num);
135
136
    /**
137
     * Set a change as RECEIVED and NOT RELEVANT.
138
     * @param seq_num Sequence number of the change
139
     * @return true on success
140
     */
141
    bool irrelevant_change_set(
142
            const SequenceNumber_t& seq_num);
143
144
    /**
145
     * Check if this proxy has any missing change.
146
     * @return true when there is at least one missing change on this proxy.
147
     */
148
    bool are_there_missing_changes() const;
149
150
    /**
151
     * The method returns a SequenceNumberSet_t containing the sequence number of all missing changes.
152
     * @return Sequence number set of missing changes.
153
     */
154
    SequenceNumberSet_t missing_changes() const;
155
156
    /**
157
     * Get the number of missing changes up to a certain sequence number.
158
     * @param seq_num Sequence number limiting the query.
159
     *                Only changes with a sequence number less than this one will be considered.
160
     * @return the number of missing changes with a sequence number less than seq_num.
161
     */
162
    size_t unknown_missing_changes_up_to(
163
            const SequenceNumber_t& seq_num) const;
164
165
    /**
166
     * Get the GUID of the writer represented by this proxy.
167
     * @return const reference to the GUID of the writer represented by this proxy.
168
     */
169
    inline const GUID_t& guid() const
170
0
    {
171
0
        return locators_entry_.remote_guid;
172
0
    }
173
174
    inline const GUID_t& persistence_guid() const
175
0
    {
176
0
        return persistence_guid_;
177
0
    }
178
179
    inline LivelinessQosPolicyKind liveliness_kind() const
180
0
    {
181
0
        return liveliness_kind_;
182
0
    }
183
184
    /**
185
     * Get the ownership strength of the writer represented by this proxy.
186
     * @return ownership strength of the writer represented by this proxy.
187
     */
188
    inline uint32_t ownership_strength() const
189
0
    {
190
0
        return ownership_strength_;
191
0
    }
192
193
    /**
194
     * Get the locators that should be used to send data to the writer represented by this proxy.
195
     * @return the locators that should be used to send data to the writer represented by this proxy.
196
     */
197
    inline const ResourceLimitedVector<Locator_t>& remote_locators_shrinked() const
198
0
    {
199
0
        return locators_entry_.unicast.empty() ?
200
0
               locators_entry_.multicast :
201
0
               locators_entry_.unicast;
202
0
    }
203
204
    /**
205
     * Check if the writer is alive
206
     * @return true if the writer is alive
207
     */
208
    inline bool is_alive() const
209
0
    {
210
0
        return is_alive_;
211
0
    }
212
213
    /*!
214
     * @brief Returns number of ChangeFromWriter_t managed currently by the WriterProxy.
215
     * @return Number of ChangeFromWriter_t managed currently by the WriterProxy.
216
     */
217
    size_t number_of_changes_from_writer() const;
218
219
    /*!
220
     * @brief Returns next SequenceNumber_t to be notified.
221
     * @return Next SequenceNumber_t to be notified or invalid SequenceNumber_t
222
     * if there is no SequenceNumber_t to be notified.
223
     */
224
    SequenceNumber_t next_cache_change_to_be_notified();
225
226
    /**
227
     * Checks whether a cache change was already received from this proxy.
228
     * @param[in] seq_num Sequence number of the cache change to check.
229
     * @return true if the cache change was received, false otherwise.
230
     */
231
    bool change_was_received(
232
            const SequenceNumber_t& seq_num) const;
233
234
    /**
235
     * Sends a preemptive acknack to the writer represented by this proxy.
236
     */
237
    bool perform_initial_ack_nack();
238
239
    /**
240
     * Sends the necessary acknack and nackfrag messages to answer the last received heartbeat message.
241
     */
242
    void perform_heartbeat_response();
243
244
    /**
245
     * Process an incoming heartbeat from the writer represented by this proxy.
246
     * @param count Count field of the heartbeat message.
247
     * @param first_seq First sequence field of the heartbeat message.
248
     * @param last_seq Last sequence field of the heartbeat message.
249
     * @param final_flag Final flag of the heartbeat message.
250
     * @param liveliness_flag Liveliness flag of the heartbeat message.
251
     * @param disable_positive True if positive ACKs are disabled.
252
     * @param [out] assert_liveliness Returns true when liveliness should be asserted on this writer
253
     * @return true if the message is processed, false if the message is ignored.
254
     */
255
    bool process_heartbeat(
256
            uint32_t count,
257
            const SequenceNumber_t& first_seq,
258
            const SequenceNumber_t& last_seq,
259
            bool final_flag,
260
            bool liveliness_flag,
261
            bool disable_positive,
262
            bool& assert_liveliness,
263
            int32_t& current_sample_lost);
264
265
    /**
266
     * Set a new value for the interval of the heartbeat response event.
267
     * @param interval New interval value.
268
     */
269
    void update_heartbeat_response_interval(
270
            const Duration_t& interval);
271
272
    /**
273
     * Check if the destinations managed by this sender interface have changed.
274
     *
275
     * @return true if destinations have changed, false otherwise.
276
     */
277
    virtual bool destinations_have_changed() const override
278
0
    {
279
0
        return false;
280
0
    }
281
282
    /**
283
     * Get a GUID prefix representing all destinations.
284
     *
285
     * @return When all the destinations share the same prefix (i.e. belong to the same participant)
286
     * that prefix is returned. When there are no destinations, or they belong to different
287
     * participants, c_GuidPrefix_Unknown is returned.
288
     */
289
    virtual GuidPrefix_t destination_guid_prefix() const override
290
0
    {
291
0
        return guid_prefix_as_vector_.at(0);
292
0
    }
293
294
    /**
295
     * Get the GUID prefix of all the destination participants.
296
     *
297
     * @return a const reference to a vector with the GUID prefix of all destination participants.
298
     */
299
    virtual const std::vector<GuidPrefix_t>& remote_participants() const override
300
0
    {
301
0
        return guid_prefix_as_vector_;
302
0
    }
303
304
    /**
305
     * Get the GUID of all destinations.
306
     *
307
     * @return a const reference to a vector with the GUID of all destinations.
308
     */
309
    virtual const std::vector<GUID_t>& remote_guids() const override
310
0
    {
311
0
        return guid_as_vector_;
312
0
    }
313
314
    /**
315
     * Send a message through this interface.
316
     *
317
     * @param message Pointer to the buffer with the message already serialized.
318
     * @param max_blocking_time_point Future timepoint where blocking send should end.
319
     */
320
    virtual bool send(
321
            CDRMessage_t* message,
322
            std::chrono::steady_clock::time_point max_blocking_time_point) const override;
323
324
    bool is_on_same_process() const
325
0
    {
326
0
        return is_on_same_process_;
327
0
    }
328
329
    bool is_datasharing_writer() const
330
0
    {
331
0
        return is_datasharing_writer_;
332
0
    }
333
334
    /*
335
     * Do nothing.
336
     * This object is always protected by the reader's mutex.
337
     */
338
    void lock() override
339
0
    {
340
0
    }
341
342
    /*
343
     * Do nothing.
344
     * This object is always protected by the reader's mutex.
345
     */
346
    void unlock() override
347
0
    {
348
0
    }
349
350
private:
351
352
    /**
353
     * Set initial value for last acked sequence number.
354
     * @param[in] seq_num last acked sequence number.
355
     */
356
    void loaded_from_storage(
357
            const SequenceNumber_t& seq_num);
358
359
    bool received_change_set(
360
            const SequenceNumber_t& seq_num,
361
            bool is_relevance);
362
363
    void cleanup();
364
365
    void clear();
366
367
    //! Pointer to associated StatefulReader.
368
    StatefulReader* reader_;
369
    //!Timed event to postpone the heartbeatResponse.
370
    TimedEvent* heartbeat_response_;
371
    //! Timed event to send initial acknack.
372
    TimedEvent* initial_acknack_;
373
    //! Last heartbeat count.
374
    std::atomic<uint32_t> last_heartbeat_count_;
375
    //!Indicates if the heartbeat has the final flag set.
376
    std::atomic<bool> heartbeat_final_flag_;
377
    //!Is the writer alive
378
    bool is_alive_;
379
380
    using pool_allocator_t =
381
            foonathan::memory::memory_pool<foonathan::memory::node_pool, foonathan::memory::heap_allocator>;
382
383
    //! Memory pool allocator for changes_received_
384
    pool_allocator_t changes_pool_;
385
    //! Set containing the sequence numbers of the received changes.
386
    foonathan::memory::set<SequenceNumber_t, pool_allocator_t> changes_received_;
387
    //! Sequence number of the highest available change
388
    SequenceNumber_t changes_from_writer_low_mark_;
389
    //! Highest sequence number informed by writer
390
    SequenceNumber_t max_sequence_number_;
391
    //! Store last CacheChange_t notified.
392
    SequenceNumber_t last_notified_;
393
    //!To fool RTPSMessageGroup when using this proxy as single destination
394
    ResourceLimitedVector<GUID_t> guid_as_vector_;
395
    //!To fool RTPSMessageGroup when using this proxy as single destination
396
    ResourceLimitedVector<GuidPrefix_t> guid_prefix_as_vector_;
397
    //! Is the writer on the same process
398
    bool is_on_same_process_;
399
    //! Taken from QoS
400
    uint32_t ownership_strength_;
401
    //! Taken from QoS
402
    LivelinessQosPolicyKind liveliness_kind_;
403
    //! Taken from proxy data
404
    GUID_t persistence_guid_;
405
    //! Taken from proxy data
406
    LocatorSelectorEntry locators_entry_;
407
    //! Is the writer datasharing
408
    bool is_datasharing_writer_;
409
    //! Whether at least one heartbeat was received.
410
    bool received_at_least_one_heartbeat_;
411
412
    using ChangeIterator = decltype(changes_received_)::iterator;
413
414
#if !defined(NDEBUG) && defined(FASTRTPS_SOURCE) && defined(__unix__)
415
    int get_mutex_owner() const;
416
417
    int get_thread_id() const;
418
#endif // if !defined(NDEBUG) && defined(FASTRTPS_SOURCE) && defined(__unix__)
419
};
420
421
} /* namespace rtps */
422
} /* namespace fastrtps */
423
} /* namespace eprosima */
424
425
#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
426
#endif /* FASTRTPS_RTPS_READER_WRITERPROXY_H_ */