Coverage Report

Created: 2022-08-24 06:19

/src/Fast-DDS/include/fastdds/rtps/messages/RTPSMessageGroup.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
2
//
3
// Licensed under the Apache License, Version 2.0 (the "License");
4
// you may not use this file except in compliance with the License.
5
// You may obtain a copy of the License at
6
//
7
//     http://www.apache.org/licenses/LICENSE-2.0
8
//
9
// Unless required by applicable law or agreed to in writing, software
10
// distributed under the License is distributed on an "AS IS" BASIS,
11
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12
// See the License for the specific language governing permissions and
13
// limitations under the License.
14
15
/**
16
 * @file RTPSMessageGroup.h
17
 *
18
 */
19
20
#ifndef _FASTDDS_RTPS_RTPSMESSAGEGROUP_H_
21
#define _FASTDDS_RTPS_RTPSMESSAGEGROUP_H_
22
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
23
24
#include <fastdds/rtps/messages/RTPSMessageSenderInterface.hpp>
25
#include <fastdds/rtps/messages/RTPSMessageCreator.h>
26
#include <fastdds/rtps/common/FragmentNumber.h>
27
28
#include <vector>
29
#include <chrono>
30
#include <cassert>
31
#include <memory>
32
33
34
namespace eprosima {
35
namespace fastrtps {
36
namespace rtps {
37
38
class RTPSParticipantImpl;
39
class Endpoint;
40
class RTPSMessageGroup_t;
41
42
/**
43
 * RTPSMessageGroup class, used to construct an RTPS message.
44
 * @ingroup WRITER_MODULE
45
 */
46
class RTPSMessageGroup
47
{
48
public:
49
50
    /*!
51
     * Exception thrown when an operation exceeds the maximum blocking time.
52
     */
53
    class timeout : public std::runtime_error
54
    {
55
    public:
56
57
        timeout()
58
            : std::runtime_error("timeout")
59
0
        {
60
0
        }
61
62
        virtual ~timeout() = default;
63
    };
64
65
    /*!
66
     * Exception thrown when an operation exceeds the maximum bytes this object can process in the current period of
67
     * time.
68
     */
69
    class limit_exceeded : public std::runtime_error
70
    {
71
    public:
72
73
        limit_exceeded()
74
            : std::runtime_error("limit_exceeded")
75
0
        {
76
0
        }
77
78
        virtual ~limit_exceeded() = default;
79
    };
80
81
    /**
82
     * Basic constructor.
83
     * Constructs an RTPSMessageGroup that may allocate its own buffer.
84
     * @param participant Pointer to the participant sending data.
85
     * @param internal_buffer true makes this object allocate its own buffer; false makes it get a buffer
86
     * from the participant.
87
     */
88
    RTPSMessageGroup(
89
            RTPSParticipantImpl* participant,
90
            bool internal_buffer = false);
91
92
    /**
93
     * Basic constructor.
94
     * Constructs an RTPSMessageGroup allowing the destination endpoints to change.
95
     * @param participant Pointer to the participant sending data.
96
     * @param endpoint Pointer to the endpoint sending data.
97
     * @param msg_sender Pointer to message sender interface.
98
     * @param max_blocking_time_point Future time point where blocking send should end.
99
     */
100
    RTPSMessageGroup(
101
            RTPSParticipantImpl* participant,
102
            Endpoint* endpoint,
103
            RTPSMessageSenderInterface* msg_sender,
104
            std::chrono::steady_clock::time_point max_blocking_time_point =
105
            std::chrono::steady_clock::now() + std::chrono::hours(24));
106
107
    ~RTPSMessageGroup() noexcept(false);
108
109
    /**
110
     * Adds a DATA message to the group.
111
     * @param change Reference to the cache change to send.
112
     * @param expects_inline_qos True when one destination is expecting inline QOS.
113
     * @return True when message was added to the group.
114
     */
115
    bool add_data(
116
            const CacheChange_t& change,
117
            bool expects_inline_qos);
118
119
    /**
120
     * Adds a DATA_FRAG message to the group.
121
     * @param change Reference to the cache change to send.
122
     * @param fragment_number Index (1 based) of the fragment to send.
123
     * @param expects_inline_qos True when one destination is expecting inline QOS.
124
     * @return True when message was added to the group.
125
     */
126
    bool add_data_frag(
127
            const CacheChange_t& change,
128
            const uint32_t fragment_number,
129
            bool expects_inline_qos);
130
131
    /**
132
     * Adds a HEARTBEAT message to the group.
133
     * @param first_seq First available sequence number.
134
     * @param last_seq Last available sequence number.
135
     * @param count Counting identifier.
136
     * @param is_final Should final flag be set?
137
     * @param liveliness_flag Should liveliness flag be set?
138
     * @return True when message was added to the group.
139
     */
140
    bool add_heartbeat(
141
            const SequenceNumber_t& first_seq,
142
            const SequenceNumber_t& last_seq,
143
            Count_t count,
144
            bool is_final,
145
            bool liveliness_flag);
146
147
    /**
148
     * Adds one or more GAP messages to the group.
149
     * @param changes_seq_numbers Set of missed sequence numbers.
150
     * @return True when messages were added to the group.
151
     */
152
    bool add_gap(
153
            std::set<SequenceNumber_t>& changes_seq_numbers);
154
155
    /**
156
     * Adds one GAP message to the group.
157
     * @param gap_initial_sequence Start of consecutive sequence numbers.
158
     * @param gap_bitmap Bitmap of non-consecutive sequence numbers.
159
     * @return True when message was added to the group.
160
     */
161
    bool add_gap(
162
            const SequenceNumber_t& gap_initial_sequence,
163
            const SequenceNumberSet_t& gap_bitmap);
164
165
    /**
166
     * Adds one GAP message to the group.
167
     * @param gap_initial_sequence Start of consecutive sequence numbers.
168
     * @param gap_bitmap Bitmap of non-consecutive sequence numbers.
169
     * @param reader_guid GUID of the destination reader.
170
     * @return True when message was added to the group.
171
     */
172
    bool add_gap(
173
            const SequenceNumber_t& gap_initial_sequence,
174
            const SequenceNumberSet_t& gap_bitmap,
175
            const GUID_t& reader_guid);
176
177
    /**
178
     * Adds an ACKNACK message to the group.
179
     * @param seq_num_set Set of missing sequence numbers.
180
     * @param count Counting identifier.
181
     * @param final_flag Should final flag be set?
182
     * @return True when message was added to the group.
183
     */
184
    bool add_acknack(
185
            const SequenceNumberSet_t& seq_num_set,
186
            int32_t count,
187
            bool final_flag);
188
189
    /**
190
     * Adds a NACKFRAG message to the group.
191
     * @param seq_number Sequence number being nack'ed.
192
     * @param fn_state Set of missing fragment numbers.
193
     * @param count Counting identifier.
194
     * @return True when message was added to the group.
195
     */
196
    bool add_nackfrag(
197
            const SequenceNumber_t& seq_number,
198
            FragmentNumberSet_t fn_state,
199
            int32_t count);
200
201
    /**
202
     * To be used whenever destination locators/guids change between two add_xxx calls.
203
     * Automatically called inside add_xxx calls if destinations_have_changed() method of
204
     * RTPSMessageSenderInterface returns true.
205
     * May become private again with a refactor of RTPSMessageSenderInterface, adding a
206
     * group_has_been_flushed() method.
207
     */
208
    void flush_and_reset();
209
210
    /*!
211
     * Dynamically change the sender of the next RTPS submessages.
212
     *
213
     * @param endpoint Pointer to next Endpoint sender. nullptr resets object to initial state.
214
     * @param msg_sender Pointer to the RTPSMessageSenderInterface that will be used to send the next RTPS messages.
215
     * nullptr resets object to initial state.
216
     * @pre (endpoint != nullptr && msg_sender != nullptr) || (endpoint == nullptr && msg_sender == nullptr)
217
     */
218
    void sender(
219
            Endpoint* endpoint,
220
            RTPSMessageSenderInterface* msg_sender)
221
0
    {
222
0
        // Precondition: either both pointers are null or both are non-null (checked in debug builds only).
        assert((endpoint != nullptr && msg_sender != nullptr) || (endpoint == nullptr && msg_sender == nullptr));
223
0
        // When the endpoint or the sender differs from the stored one, flush_and_reset() is called
        // before the members are overwritten, i.e. while the previous pair is still in place.
        if (endpoint != endpoint_ || msg_sender != sender_)
224
0
        {
225
0
            flush_and_reset();
226
0
        }
227
0
228
0
        // Store the new pair (a no-op when nothing changed).
        endpoint_ = endpoint;
229
0
        sender_ = msg_sender;
230
0
    }
231
232
    //! Maximum fragment payload size: the largest 16-bit value minus the space reserved for headers.
    //! NOTE(review): std::numeric_limits is declared in <limits>, which is not among this header's direct includes.
233
    static inline constexpr uint32_t get_max_fragment_payload_size()
234
0
    {
235
0
        // 65535 (uint16_t max) - data_frag_header_size_ (28) - max_inline_qos_size_ (32)
        // - 3 (for better alignment) = 65472 bytes.
        return std::numeric_limits<uint16_t>::max() - data_frag_header_size_ - max_inline_qos_size_ - 3;
237
0
    }
238
239
    /**
     * Sets the value stored in sent_bytes_limitation_.
     * @param limit New limitation value.
     * NOTE(review): presumably the byte budget whose overrun is reported through limit_exceeded; how a value
     * of 0 is interpreted is not visible in this header - confirm against the implementation.
     */
    void set_sent_bytes_limitation(
240
            uint32_t limit)
241
0
    {
242
0
        sent_bytes_limitation_ = limit;
243
0
    }
244
245
    /**
     * Resets the current_sent_bytes_ counter to zero.
     */
    void reset_current_bytes_processed()
246
0
    {
247
0
        current_sent_bytes_ = 0;
248
0
    }
249
250
    /**
     * Gets the number of bytes processed so far.
     * @return current_sent_bytes_ plus the length of the message currently held in full_msg_.
     * @pre full_msg_ must not be nullptr; it is dereferenced without a check.
     */
    inline uint32_t get_current_bytes_processed() const
251
0
    {
252
0
        return current_sent_bytes_ + full_msg_->length;
253
0
    }
254
255
private:
256
257
    static constexpr uint32_t data_frag_header_size_ = 28;
258
    static constexpr uint32_t max_inline_qos_size_ = 32;
259
260
    void reset_to_header();
261
262
    void flush();
263
264
    void send();
265
266
    /**
     * Convenience overload: forwards to check_and_maybe_flush(const GuidPrefix_t&) using the destination
     * GUID prefix reported by the current sender.
     * @pre sender_ must not be nullptr; it is dereferenced without a check.
     */
    void check_and_maybe_flush()
267
0
    {
268
0
        check_and_maybe_flush(sender_->destination_guid_prefix());
269
0
    }
270
271
    void check_and_maybe_flush(
272
            const GuidPrefix_t& destination_guid_prefix);
273
274
    /**
     * Convenience overload: forwards to insert_submessage(const GuidPrefix_t&, bool) using the destination
     * GUID prefix reported by the current sender.
     * @param is_big_submessage Passed through unchanged to the two-argument overload.
     * @return The value returned by the two-argument overload.
     * @pre sender_ must not be nullptr; it is dereferenced without a check.
     */
    bool insert_submessage(
275
            bool is_big_submessage)
276
0
    {
277
0
        return insert_submessage(sender_->destination_guid_prefix(), is_big_submessage);
278
0
    }
279
280
    bool insert_submessage(
281
            const GuidPrefix_t& destination_guid_prefix,
282
            bool is_big_submessage);
283
284
    bool add_info_dst_in_buffer(
285
            CDRMessage_t* buffer,
286
            const GuidPrefix_t& destination_guid_prefix);
287
288
    bool add_info_ts_in_buffer(
289
            const Time_t& timestamp);
290
291
    bool create_gap_submessage(
292
            const SequenceNumber_t& gap_initial_sequence,
293
            const SequenceNumberSet_t& gap_bitmap,
294
            const EntityId_t& reader_id);
295
296
    RTPSMessageSenderInterface* sender_ = nullptr;
297
298
    Endpoint* endpoint_ = nullptr;
299
300
    CDRMessage_t* full_msg_ = nullptr;
301
302
    CDRMessage_t* submessage_msg_ = nullptr;
303
304
    GuidPrefix_t current_dst_;
305
306
    RTPSParticipantImpl* participant_ = nullptr;
307
308
     #if HAVE_SECURITY
309
310
    CDRMessage_t* encrypt_msg_ = nullptr;
311
312
     #endif // if HAVE_SECURITY
313
314
    std::chrono::steady_clock::time_point max_blocking_time_point_;
315
316
    bool max_blocking_time_is_set_ = false;
317
318
    std::unique_ptr<RTPSMessageGroup_t> send_buffer_;
319
320
    bool internal_buffer_ = false;
321
322
    uint32_t sent_bytes_limitation_ = 0;
323
324
    uint32_t current_sent_bytes_ = 0;
325
};
326
327
}        /* namespace rtps */
328
} /* namespace fastrtps */
329
} /* namespace eprosima */
330
331
#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
332
#endif /* _FASTDDS_RTPS_RTPSMESSAGEGROUP_H_ */