/src/Fast-DDS/src/cpp/rtps/DataSharing/DataSharingPayloadPool.hpp
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright 2020 Proyectos y Sistemas de Mantenimiento SL (eProsima). |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | /** |
16 | | * @file DataSharingPayloadPool.hpp |
17 | | */ |
18 | | |
19 | | #ifndef FASTDDS_RTPS_DATASHARING__DATASHARINGPAYLOADPOOL_HPP |
20 | | #define FASTDDS_RTPS_DATASHARING__DATASHARINGPAYLOADPOOL_HPP |
21 | | |
#include <fastdds/rtps/common/CacheChange.hpp>
#include <fastdds/rtps/history/IPayloadPool.hpp>
#include <fastdds/dds/log/Log.hpp>
#include <rtps/history/PoolConfig.h>
#include <utils/shared_memory/SharedDir.hpp>
#include <utils/shared_memory/SharedMemSegment.hpp>

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <sstream>
#include <string>
30 | | |
31 | | namespace eprosima { |
32 | | namespace fastdds { |
33 | | namespace rtps { |
34 | | |
35 | | class RTPSWriter; |
36 | | |
37 | | class DataSharingPayloadPool : public IPayloadPool |
38 | | { |
39 | | |
40 | | protected: |
41 | | |
42 | | class PayloadNode; |
43 | | |
44 | | public: |
45 | | |
46 | | using Segment = fastdds::rtps::SharedSegmentBase; |
47 | | using sharable_mutex = Segment::sharable_mutex; |
48 | | template <class M> |
49 | | using sharable_lock = Segment::sharable_lock<M>; |
50 | | |
51 | 0 | DataSharingPayloadPool() = default; |
52 | | |
53 | 0 | ~DataSharingPayloadPool() = default; |
54 | | |
55 | | virtual bool release_payload( |
56 | | SerializedPayload_t& payload) override; |
57 | | |
58 | | static std::shared_ptr<DataSharingPayloadPool> get_reader_pool( |
59 | | bool is_reader_volatile); |
60 | | |
61 | | static std::shared_ptr<DataSharingPayloadPool> get_writer_pool( |
62 | | const PoolConfig& config); |
63 | | |
64 | | static std::string get_default_directory() |
65 | 0 | { |
66 | 0 | std::string dir; |
67 | 0 | fastdds::rtps::SharedDir::get_default_shared_dir(dir); |
68 | 0 | return dir; |
69 | 0 | } |
70 | | |
71 | | virtual bool init_shared_memory( |
72 | | const GUID_t& /*writer_guid*/, |
73 | | const std::string& /*shared_dir*/) |
74 | 0 | { |
75 | | // Default implementation is NOP |
76 | | // will be overriden by children if needed |
77 | 0 | return false; |
78 | 0 | } |
79 | | |
80 | | virtual bool init_shared_memory( |
81 | | const RTPSWriter* /*writer*/, |
82 | | const std::string& /*shared_dir*/) |
83 | 0 | { |
84 | | // Default implementation is NOP |
85 | | // will be overriden by children if needed |
86 | 0 | return false; |
87 | 0 | } |
88 | | |
89 | | constexpr static const char* domain_name() |
90 | 0 | { |
91 | 0 | return "fast_datasharing"; |
92 | 0 | } |
93 | | |
94 | | constexpr static const char* descriptor_chunk_name() |
95 | 0 | { |
96 | 0 | return "descriptor"; |
97 | 0 | } |
98 | | |
99 | | constexpr static const char* history_chunk_name() |
100 | 0 | { |
101 | 0 | return "history"; |
102 | 0 | } |
103 | | |
104 | | uint32_t history_size() const |
105 | 0 | { |
106 | 0 | return descriptor_->history_size; |
107 | 0 | } |
108 | | |
109 | | /** |
110 | | * Advances an index to the history to the next position |
111 | | */ |
112 | | void advance( |
113 | | uint64_t& index) const; |
114 | | |
115 | | /** |
116 | | * The index of the first valid position in the history |
117 | | */ |
118 | | uint64_t begin() const; |
119 | | |
120 | | /** |
121 | | * The index of one past the last valid position in the history |
122 | | */ |
123 | | uint64_t end() const; |
124 | | |
125 | | /** |
126 | | * Whether the history is empty or not |
127 | | */ |
128 | | bool empty() const; |
129 | | |
130 | | const GUID_t& writer() const; |
131 | | |
132 | | uint32_t last_liveliness_sequence() const; |
133 | | |
134 | | static bool check_sequence_number( |
135 | | const octet* data, |
136 | | const SequenceNumber_t& sn); |
137 | | |
138 | | bool is_sample_valid( |
139 | | const CacheChange_t& change) const; |
140 | | |
141 | | protected: |
142 | | |
143 | | #pragma warning(push) |
144 | | #pragma warning(disable:4324) |
145 | | class alignas (8) PayloadNode |
146 | | { |
147 | | |
148 | | struct PayloadNodeMetaData |
149 | | { |
150 | | public: |
151 | | |
152 | | PayloadNodeMetaData() |
153 | 0 | : status(fastdds::rtps::ChangeKind_t::ALIVE) |
154 | 0 | , has_been_removed(0) |
155 | 0 | , data_length(0) |
156 | 0 | , sequence_number(c_SequenceNumber_Unknown) |
157 | 0 | , writer_GUID(c_Guid_Unknown) |
158 | 0 | , instance_handle(c_InstanceHandle_Unknown) |
159 | 0 | { |
160 | 0 | } |
161 | | |
162 | | ~PayloadNodeMetaData() = default; |
163 | | |
164 | | // writer/instance status |
165 | | uint8_t status; |
166 | | |
167 | | // Has this payload been removed from the shared history? |
168 | | uint8_t has_been_removed; |
169 | | |
170 | | // Encapsulation of the data |
171 | | uint16_t encapsulation; |
172 | | |
173 | | // Actual data size of the payload. Must be less than the configured maximum |
174 | | uint32_t data_length; |
175 | | |
176 | | // Writer's timestamp |
177 | | Time_t source_timestamp; |
178 | | |
179 | | // Sequence number of the payload inside the writer |
180 | | std::atomic<SequenceNumber_t> sequence_number; |
181 | | |
182 | | // GUID of the writer that created the payload |
183 | | GUID_t writer_GUID; |
184 | | |
185 | | // Instance handel for the change |
186 | | InstanceHandle_t instance_handle; |
187 | | |
188 | | // Related sample identity for the change |
189 | | fastdds::rtps::SampleIdentity related_sample_identity; |
190 | | |
191 | | // Mutex for shared read / exclusive write access to the payload |
192 | | sharable_mutex mutex; |
193 | | |
194 | | }; |
195 | | |
196 | | public: |
197 | | |
198 | | // Payload data comes after the metadata |
199 | | static constexpr size_t data_offset = sizeof(PayloadNodeMetaData); |
200 | | |
201 | | |
202 | 0 | PayloadNode() = default; |
203 | | |
204 | | ~PayloadNode() = default; |
205 | | |
206 | | void reset() |
207 | 0 | { |
208 | | // Reset the sequence number first, it signals the data is not valid anymore |
209 | 0 | metadata_.sequence_number.store(c_SequenceNumber_Unknown, std::memory_order_relaxed); |
210 | 0 | metadata_.status = fastdds::rtps::ChangeKind_t::ALIVE; |
211 | 0 | metadata_.has_been_removed = 0; |
212 | 0 | metadata_.data_length = 0; |
213 | 0 | metadata_.writer_GUID = c_Guid_Unknown; |
214 | 0 | metadata_.instance_handle = c_InstanceHandle_Unknown; |
215 | 0 | metadata_.related_sample_identity = fastdds::rtps::SampleIdentity(); |
216 | 0 | } |
217 | | |
218 | | static const PayloadNode* get_from_data( |
219 | | const octet* data) |
220 | 0 | { |
221 | 0 | return reinterpret_cast<const PayloadNode*>(data - data_offset); |
222 | 0 | } |
223 | | |
224 | | static PayloadNode* get_from_data( |
225 | | octet* data) |
226 | 0 | { |
227 | 0 | return reinterpret_cast<PayloadNode*>(data - data_offset); |
228 | 0 | } |
229 | | |
230 | | octet* data() |
231 | 0 | { |
232 | 0 | return reinterpret_cast<octet*>(this) + data_offset; |
233 | 0 | } |
234 | | |
235 | | uint32_t data_length() const |
236 | 0 | { |
237 | 0 | return metadata_.data_length; |
238 | 0 | } |
239 | | |
240 | | void data_length( |
241 | | uint32_t length) |
242 | 0 | { |
243 | 0 | metadata_.data_length = length; |
244 | 0 | } |
245 | | |
246 | | uint16_t encapsulation() const |
247 | 0 | { |
248 | 0 | return metadata_.encapsulation; |
249 | 0 | } |
250 | | |
251 | | void encapsulation( |
252 | | uint16_t encapsulation) |
253 | 0 | { |
254 | 0 | metadata_.encapsulation = encapsulation; |
255 | 0 | } |
256 | | |
257 | | GUID_t writer_GUID() const |
258 | 0 | { |
259 | 0 | return metadata_.writer_GUID; |
260 | 0 | } |
261 | | |
262 | | void writer_GUID( |
263 | | const GUID_t& guid) |
264 | 0 | { |
265 | 0 | metadata_.writer_GUID = guid; |
266 | 0 | } |
267 | | |
268 | | SequenceNumber_t sequence_number() const |
269 | 0 | { |
270 | 0 | SequenceNumber_t value = metadata_.sequence_number.load(std::memory_order_relaxed); |
271 | 0 | return value; |
272 | 0 | } |
273 | | |
274 | | void sequence_number( |
275 | | SequenceNumber_t sequence_number) |
276 | 0 | { |
277 | 0 | metadata_.sequence_number.store(sequence_number, std::memory_order_relaxed); |
278 | 0 | } |
279 | | |
280 | | Time_t source_timestamp() const |
281 | 0 | { |
282 | 0 | return metadata_.source_timestamp; |
283 | 0 | } |
284 | | |
285 | | void source_timestamp( |
286 | | Time_t timestamp) |
287 | 0 | { |
288 | 0 | metadata_.source_timestamp = timestamp; |
289 | 0 | } |
290 | | |
291 | | InstanceHandle_t instance_handle() const |
292 | 0 | { |
293 | 0 | return metadata_.instance_handle; |
294 | 0 | } |
295 | | |
296 | | void instance_handle( |
297 | | InstanceHandle_t handle) |
298 | 0 | { |
299 | 0 | metadata_.instance_handle = handle; |
300 | 0 | } |
301 | | |
302 | | uint8_t status() const |
303 | 0 | { |
304 | 0 | return metadata_.status; |
305 | 0 | } |
306 | | |
307 | | void status( |
308 | | uint8_t status) |
309 | 0 | { |
310 | 0 | metadata_.status = status; |
311 | 0 | } |
312 | | |
313 | | bool has_been_removed() const |
314 | 0 | { |
315 | 0 | return metadata_.has_been_removed == 1; |
316 | 0 | } |
317 | | |
318 | | void has_been_removed( |
319 | | bool removed) |
320 | 0 | { |
321 | 0 | metadata_.has_been_removed = removed ? 1 : 0; |
322 | 0 | } |
323 | | |
324 | | fastdds::rtps::SampleIdentity related_sample_identity() const |
325 | 0 | { |
326 | 0 | return metadata_.related_sample_identity; |
327 | 0 | } |
328 | | |
329 | | void related_sample_identity( |
330 | | fastdds::rtps::SampleIdentity identity) |
331 | 0 | { |
332 | 0 | metadata_.related_sample_identity = identity; |
333 | 0 | } |
334 | | |
335 | | private: |
336 | | |
337 | | PayloadNodeMetaData metadata_; |
338 | | |
339 | | }; |
340 | | |
341 | | struct alignas (8) PoolDescriptor |
342 | | { |
343 | | uint32_t history_size; //< Number of payloads in the history |
344 | | uint64_t notified_begin; //< The index of the oldest history entry already notified (ready to read) |
345 | | uint64_t notified_end; //< The index of the history entry that will be notified next |
346 | | uint32_t liveliness_sequence; //< The ID of the last liveliness assertion sent by the writer |
347 | | }; |
348 | | #pragma warning(pop) |
349 | | |
350 | | static std::string generate_segment_name( |
351 | | const std::string& shared_dir, |
352 | | const GUID_t& writer_guid) |
353 | 0 | { |
354 | 0 | std::stringstream ss; |
355 | 0 | if (!shared_dir.empty()) |
356 | 0 | { |
357 | 0 | ss << shared_dir << "/"; |
358 | 0 | } |
359 | |
|
360 | 0 | ss << DataSharingPayloadPool::domain_name() << "_" << writer_guid.guidPrefix << "_" << writer_guid.entityId; |
361 | 0 | return ss.str(); |
362 | 0 | } |
363 | | |
364 | | static size_t node_size ( |
365 | | size_t payload_size) |
366 | 0 | { |
367 | 0 | return (payload_size + PayloadNode::data_offset + alignof(PayloadNode) - 1) |
368 | 0 | & ~(alignof(PayloadNode) - 1); |
369 | 0 | } |
370 | | |
371 | | GUID_t segment_id_; //< The ID of the segment |
372 | | std::string segment_name_; //< Segment name |
373 | | |
374 | | std::unique_ptr<Segment> segment_; //< Shared memory segment |
375 | | |
376 | | Segment::Offset* history_; //< Offsets of the payloads that are currently in the writer's history |
377 | | PoolDescriptor* descriptor_; //< Shared descriptor of the pool |
378 | | |
379 | | }; |
380 | | |
381 | | |
382 | | } // namespace rtps |
383 | | } // namespace fastdds |
384 | | } // namespace eprosima |
385 | | |
386 | | #endif // FASTDDS_RTPS_DATASHARING__DATASHARINGPAYLOADPOOL_HPP |