/src/Fast-DDS/include/fastdds/rtps/messages/RTPSMessageGroup.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | /** |
16 | | * @file RTPSMessageGroup.h |
17 | | * |
18 | | */ |
19 | | |
20 | | #ifndef _FASTDDS_RTPS_RTPSMESSAGEGROUP_H_ |
21 | | #define _FASTDDS_RTPS_RTPSMESSAGEGROUP_H_ |
22 | | #ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC |
23 | | |
24 | | #include <fastdds/rtps/messages/RTPSMessageSenderInterface.hpp> |
25 | | #include <fastdds/rtps/messages/RTPSMessageCreator.h> |
26 | | #include <fastdds/rtps/common/FragmentNumber.h> |
27 | | |
28 | | #include <vector> |
29 | | #include <chrono> |
30 | | #include <cassert> |
31 | | #include <memory> |
32 | | |
33 | | |
34 | | namespace eprosima { |
35 | | namespace fastrtps { |
36 | | namespace rtps { |
37 | | |
38 | | class RTPSParticipantImpl; |
39 | | class Endpoint; |
40 | | class RTPSMessageGroup_t; |
41 | | |
42 | | /** |
43 | | * RTPSMessageGroup class, used to build RTPS messages out of the submessages added through its add_xxx methods. |
44 | | * @ingroup WRITER_MODULE |
45 | | */ |
46 | | class RTPSMessageGroup |
47 | | { |
48 | | public: |
49 | | |
50 | | /*! |
51 | | * Exception thrown when an operation exceeds the maximum blocking time. |
52 | | */ |
53 | | class timeout : public std::runtime_error |
54 | | { |
55 | | public: |
56 | | /** Builds the exception with the fixed what() text: timeout. */ |
57 | | timeout() |
58 | | : std::runtime_error("timeout") |
59 | 0 | { |
60 | 0 | } |
61 | | |
62 | | virtual ~timeout() = default; |
63 | | }; |
64 | | |
65 | | /*! |
66 | | * Exception thrown when an operation exceeds the maximum number of bytes this object can process in the current |
67 | | * period of time. |
68 | | */ |
69 | | class limit_exceeded : public std::runtime_error |
70 | | { |
71 | | public: |
72 | | /** Builds the exception with the fixed what() text: limit_exceeded. */ |
73 | | limit_exceeded() |
74 | | : std::runtime_error("limit_exceeded") |
75 | 0 | { |
76 | 0 | } |
77 | | |
78 | | virtual ~limit_exceeded() = default; |
79 | | }; |
80 | | |
81 | | /** |
82 | | * Basic constructor. |
83 | | * Constructs an RTPSMessageGroup that can optionally allocate its own buffer. |
84 | | * @param participant Pointer to the participant sending data. |
85 | | * @param internal_buffer true tells this object to allocate its own buffer. false (the default) tells it to get |
86 | | * a buffer from the participant. |
87 | | */ |
88 | | RTPSMessageGroup( |
89 | | RTPSParticipantImpl* participant, |
90 | | bool internal_buffer = false); |
91 | | |
92 | | /** |
93 | | * Basic constructor. |
94 | | * Constructs an RTPSMessageGroup allowing the destination endpoints to change. |
95 | | * @param participant Pointer to the participant sending data. |
96 | | * @param endpoint Pointer to the endpoint sending data. |
97 | | * @param msg_sender Pointer to message sender interface. |
98 | | * @param max_blocking_time_point Future time point where blocking send should end. Defaults to 24 hours after the call. |
99 | | */ |
100 | | RTPSMessageGroup( |
101 | | RTPSParticipantImpl* participant, |
102 | | Endpoint* endpoint, |
103 | | RTPSMessageSenderInterface* msg_sender, |
104 | | std::chrono::steady_clock::time_point max_blocking_time_point = |
105 | | std::chrono::steady_clock::now() + std::chrono::hours(24)); |
106 | | /** Destructor. Declared noexcept(false), so exceptions are allowed to propagate out of it. */ |
107 | | ~RTPSMessageGroup() noexcept(false); |
108 | | |
109 | | /** |
110 | | * Adds a DATA message to the group. |
111 | | * @param change Reference to the cache change to send. |
112 | | * @param expects_inline_qos True when one destination is expecting inline QOS. |
113 | | * @return True when message was added to the group. |
114 | | */ |
115 | | bool add_data( |
116 | | const CacheChange_t& change, |
117 | | bool expects_inline_qos); |
118 | | |
119 | | /** |
120 | | * Adds a DATA_FRAG message to the group. |
121 | | * @param change Reference to the cache change to send. |
122 | | * @param fragment_number Index (1 based) of the fragment to send. |
123 | | * @param expects_inline_qos True when one destination is expecting inline QOS. |
124 | | * @return True when message was added to the group. |
125 | | */ |
126 | | bool add_data_frag( |
127 | | const CacheChange_t& change, |
128 | | const uint32_t fragment_number, |
129 | | bool expects_inline_qos); |
130 | | |
131 | | /** |
132 | | * Adds a HEARTBEAT message to the group. |
133 | | * @param first_seq First available sequence number. |
134 | | * @param last_seq Last available sequence number. |
135 | | * @param count Counting identifier. |
136 | | * @param is_final Should final flag be set? |
137 | | * @param liveliness_flag Should liveliness flag be set? |
138 | | * @return True when message was added to the group. |
139 | | */ |
140 | | bool add_heartbeat( |
141 | | const SequenceNumber_t& first_seq, |
142 | | const SequenceNumber_t& last_seq, |
143 | | Count_t count, |
144 | | bool is_final, |
145 | | bool liveliness_flag); |
146 | | |
147 | | /** |
148 | | * Adds one or more GAP messages to the group. |
149 | | * @param changes_seq_numbers Set of missed sequence numbers (taken by non-const reference). |
150 | | * @return True when messages were added to the group. |
151 | | */ |
152 | | bool add_gap( |
153 | | std::set<SequenceNumber_t>& changes_seq_numbers); |
154 | | |
155 | | /** |
156 | | * Adds one GAP message to the group. |
157 | | * @param gap_initial_sequence Start of consecutive sequence numbers. |
158 | | * @param gap_bitmap Bitmap of non-consecutive sequence numbers. |
159 | | * @return True when message was added to the group. |
160 | | */ |
161 | | bool add_gap( |
162 | | const SequenceNumber_t& gap_initial_sequence, |
163 | | const SequenceNumberSet_t& gap_bitmap); |
164 | | |
165 | | /** |
166 | | * Adds one GAP message, addressed to a specific reader, to the group. |
167 | | * @param gap_initial_sequence Start of consecutive sequence numbers. |
168 | | * @param gap_bitmap Bitmap of non-consecutive sequence numbers. |
169 | | * @param reader_guid GUID of the destination reader. |
170 | | * @return True when message was added to the group. |
171 | | */ |
172 | | bool add_gap( |
173 | | const SequenceNumber_t& gap_initial_sequence, |
174 | | const SequenceNumberSet_t& gap_bitmap, |
175 | | const GUID_t& reader_guid); |
176 | | |
177 | | /** |
178 | | * Adds an ACKNACK message to the group. |
179 | | * @param seq_num_set Set of missing sequence numbers. |
180 | | * @param count Counting identifier. |
181 | | * @param final_flag Should final flag be set? |
182 | | * @return True when message was added to the group. |
183 | | */ |
184 | | bool add_acknack( |
185 | | const SequenceNumberSet_t& seq_num_set, |
186 | | int32_t count, |
187 | | bool final_flag); |
188 | | |
189 | | /** |
190 | | * Adds a NACKFRAG message to the group. |
191 | | * @param seq_number Sequence number being nack'ed. |
192 | | * @param fn_state Set of missing fragment numbers (passed by value). |
193 | | * @param count Counting identifier. |
194 | | * @return True when message was added to the group. |
195 | | */ |
196 | | bool add_nackfrag( |
197 | | const SequenceNumber_t& seq_number, |
198 | | FragmentNumberSet_t fn_state, |
199 | | int32_t count); |
200 | | |
201 | | /** |
202 | | * To be used whenever destination locators/guids change between two add_xxx calls. |
203 | | * Automatically called inside add_xxx calls if the destinations_have_changed() method of |
204 | | * RTPSMessageSenderInterface returns true. |
205 | | * May become private again with a refactor of RTPSMessageSenderInterface, adding a |
206 | | * group_has_been_flushed() method. |
207 | | */ |
208 | | void flush_and_reset(); |
209 | | |
210 | | /*! |
211 | | * Dynamically change the sender of the next RTPS submessages. When either argument differs from the value currently |
212 | | * stored, flush_and_reset() is called before the new endpoint and sender are stored. |
213 | | * @param endpoint Pointer to next Endpoint sender. nullptr resets object to initial state. |
214 | | * @param msg_sender Pointer to the RTPSMessageSenderInterface that will be used to send next RTPS messages. |
215 | | * nullptr resets object to initial state. |
216 | | * @pre (endpoint != nullptr && msg_sender != nullptr) || (endpoint == nullptr && msg_sender == nullptr) |
217 | | */ |
218 | | void sender( |
219 | | Endpoint* endpoint, |
220 | | RTPSMessageSenderInterface* msg_sender) |
221 | 0 | { |
222 | 0 | assert((endpoint != nullptr && msg_sender != nullptr) || (endpoint == nullptr && msg_sender == nullptr)); |
223 | 0 | if (endpoint != endpoint_ || msg_sender != sender_) |
224 | 0 | { |
225 | 0 | flush_and_reset(); |
226 | 0 | } |
227 | 0 |

228 | 0 | endpoint_ = endpoint; |
229 | 0 | sender_ = msg_sender; |
230 | 0 | } |
231 | | |
232 | | //! Maximum fragment payload: uint16_t maximum minus data_frag_header_size_, max_inline_qos_size_ and 3 more bytes |
233 | | static inline constexpr uint32_t get_max_fragment_payload_size() |
234 | 0 | { |
235 | 0 | // Max fragment is 64KBytes_max - header - inlineqos - 3(for better alignment) |
236 | 0 | return std::numeric_limits<uint16_t>::max() - data_frag_header_size_ - max_inline_qos_size_ - 3; |
237 | 0 | } |
238 | | /** Stores limit in sent_bytes_limitation_. NOTE(review): presumably the budget behind limit_exceeded; enforcement is not visible here. */ |
239 | | void set_sent_bytes_limitation( |
240 | | uint32_t limit) |
241 | 0 | { |
242 | 0 | sent_bytes_limitation_ = limit; |
243 | 0 | } |
244 | | /** Resets the current_sent_bytes_ counter to zero. */ |
245 | | void reset_current_bytes_processed() |
246 | 0 | { |
247 | 0 | current_sent_bytes_ = 0; |
248 | 0 | } |
249 | | /** Returns current_sent_bytes_ plus the length of full_msg_. full_msg_ is dereferenced without a null check. */ |
250 | | inline uint32_t get_current_bytes_processed() const |
251 | 0 | { |
252 | 0 | return current_sent_bytes_ + full_msg_->length; |
253 | 0 | } |
254 | | |
255 | | private: |
256 | | /** Sizes subtracted from the uint16_t maximum by get_max_fragment_payload_size(). */ |
257 | | static constexpr uint32_t data_frag_header_size_ = 28; |
258 | | static constexpr uint32_t max_inline_qos_size_ = 32; |
259 | | |
260 | | void reset_to_header(); |
261 | | |
262 | | void flush(); |
263 | | |
264 | | void send(); |
265 | | /** Overload that forwards sender_->destination_guid_prefix(); sender_ is dereferenced without a null check. */ |
266 | | void check_and_maybe_flush() |
267 | 0 | { |
268 | 0 | check_and_maybe_flush(sender_->destination_guid_prefix()); |
269 | 0 | } |
270 | | |
271 | | void check_and_maybe_flush( |
272 | | const GuidPrefix_t& destination_guid_prefix); |
273 | | /** Overload that forwards sender_->destination_guid_prefix(); sender_ is dereferenced without a null check. */ |
274 | | bool insert_submessage( |
275 | | bool is_big_submessage) |
276 | 0 | { |
277 | 0 | return insert_submessage(sender_->destination_guid_prefix(), is_big_submessage); |
278 | 0 | } |
279 | | |
280 | | bool insert_submessage( |
281 | | const GuidPrefix_t& destination_guid_prefix, |
282 | | bool is_big_submessage); |
283 | | |
284 | | bool add_info_dst_in_buffer( |
285 | | CDRMessage_t* buffer, |
286 | | const GuidPrefix_t& destination_guid_prefix); |
287 | | |
288 | | bool add_info_ts_in_buffer( |
289 | | const Time_t& timestamp); |
290 | | |
291 | | bool create_gap_submessage( |
292 | | const SequenceNumber_t& gap_initial_sequence, |
293 | | const SequenceNumberSet_t& gap_bitmap, |
294 | | const EntityId_t& reader_id); |
295 | | /** Message sender interface currently in use; assigned by sender(). */ |
296 | | RTPSMessageSenderInterface* sender_ = nullptr; |
297 | | /** Endpoint currently sending; assigned by sender(). */ |
298 | | Endpoint* endpoint_ = nullptr; |
299 | | /** Message whose length is added to the byte count in get_current_bytes_processed(). */ |
300 | | CDRMessage_t* full_msg_ = nullptr; |
301 | | |
302 | | CDRMessage_t* submessage_msg_ = nullptr; |
303 | | |
304 | | GuidPrefix_t current_dst_; |
305 | | |
306 | | RTPSParticipantImpl* participant_ = nullptr; |
307 | | |
308 | | #if HAVE_SECURITY |
309 | | |
310 | | CDRMessage_t* encrypt_msg_ = nullptr; |
311 | | |
312 | | #endif // if HAVE_SECURITY |
313 | | |
314 | | std::chrono::steady_clock::time_point max_blocking_time_point_; |
315 | | |
316 | | bool max_blocking_time_is_set_ = false; |
317 | | |
318 | | std::unique_ptr<RTPSMessageGroup_t> send_buffer_; |
319 | | |
320 | | bool internal_buffer_ = false; |
321 | | /** Value stored by set_sent_bytes_limitation(). */ |
322 | | uint32_t sent_bytes_limitation_ = 0; |
323 | | /** Counter zeroed by reset_current_bytes_processed() and read by get_current_bytes_processed(). */ |
324 | | uint32_t current_sent_bytes_ = 0; |
325 | | }; |
326 | | |
327 | | } /* namespace rtps */ |
328 | | } /* namespace fastrtps */ |
329 | | } /* namespace eprosima */ |
330 | | |
331 | | #endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC |
332 | | #endif /* _FASTDDS_RTPS_RTPSMESSAGEGROUP_H_ */ |