/src/Fast-DDS/src/cpp/rtps/DataSharing/WriterPool.hpp
Line | Count | Source |
1 | | // Copyright 2020 Proyectos y Sistemas de Mantenimiento SL (eProsima). |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | /** |
16 | | * @file WriterPool.hpp |
17 | | */ |
18 | | |
19 | | #ifndef RTPS_DATASHARING_WRITERPOOL_HPP |
20 | | #define RTPS_DATASHARING_WRITERPOOL_HPP |
21 | | |
22 | | #include <fastdds/rtps/attributes/ResourceManagement.hpp> |
23 | | #include <fastdds/rtps/common/CacheChange.hpp> |
24 | | #include <fastdds/rtps/writer/RTPSWriter.hpp> |
25 | | #include <fastdds/dds/log/Log.hpp> |
26 | | #include <rtps/DataSharing/DataSharingPayloadPool.hpp> |
27 | | #include <utils/collections/FixedSizeQueue.hpp> |
28 | | |
29 | | #include <memory> |
30 | | |
31 | | namespace eprosima { |
32 | | namespace fastdds { |
33 | | namespace rtps { |
34 | | |
35 | | class WriterPool : public DataSharingPayloadPool |
36 | | { |
37 | | |
38 | | public: |
39 | | |
40 | | WriterPool( |
41 | | uint32_t pool_size, |
42 | | uint32_t payload_size) |
43 | 0 | : max_data_size_(payload_size) |
44 | 0 | , pool_size_(pool_size) |
45 | 0 | , free_history_size_(0) |
46 | 0 | { |
47 | 0 | } |
48 | | |
49 | | ~WriterPool() |
50 | 0 | { |
51 | 0 | EPROSIMA_LOG_INFO(DATASHARING_PAYLOADPOOL, "DataSharingPayloadPool::WriterPool destructor"); |
52 | | |
53 | | // We cannot destroy the objects in the SHM, as the Reader may still be using them. |
54 | | // We just remove the segment, and when the Reader closes it, it will be removed from the system. |
55 | 0 | if (segment_) |
56 | 0 | { |
57 | 0 | segment_->remove(); |
58 | 0 | } |
59 | 0 | } |
60 | | |
61 | | bool get_payload( |
62 | | uint32_t /*size*/, |
63 | | SerializedPayload_t& payload) override |
64 | 0 | { |
65 | 0 | if (free_payloads_.empty()) |
66 | 0 | { |
67 | 0 | return false; |
68 | 0 | } |
69 | | |
70 | 0 | PayloadNode* payload_node = free_payloads_.front(); |
71 | 0 | free_payloads_.pop_front(); |
72 | | // Reset all the metadata to signal the reader that the payload is dirty |
73 | 0 | payload_node->reset(); |
74 | |
|
75 | 0 | payload.data = payload_node->data(); |
76 | 0 | payload.max_size = max_data_size_; |
77 | 0 | payload.payload_owner = this; |
78 | |
|
79 | 0 | return true; |
80 | 0 | } |
81 | | |
82 | | bool get_payload( |
83 | | const SerializedPayload_t& data, |
84 | | SerializedPayload_t& payload) override |
85 | 0 | { |
86 | 0 | if (data.payload_owner == this) |
87 | 0 | { |
88 | 0 | payload.data = data.data; |
89 | 0 | payload.length = data.length; |
90 | 0 | payload.max_size = data.length; |
91 | 0 | payload.is_serialized_key = data.is_serialized_key; |
92 | 0 | payload.payload_owner = this; |
93 | 0 | return true; |
94 | 0 | } |
95 | 0 | else |
96 | 0 | { |
97 | 0 | if (get_payload(data.length, payload)) |
98 | 0 | { |
99 | 0 | if (!payload.copy(&data, true)) |
100 | 0 | { |
101 | 0 | release_payload(payload); |
102 | 0 | return false; |
103 | 0 | } |
104 | | |
105 | 0 | return true; |
106 | 0 | } |
107 | 0 | } |
108 | | |
109 | 0 | return false; |
110 | 0 | } |
111 | | |
112 | | bool release_payload( |
113 | | SerializedPayload_t& payload) override |
114 | 0 | { |
115 | 0 | assert(payload.payload_owner == this); |
116 | | |
117 | | // Payloads are reset on the `get` operation, the `release` leaves the data to give more chances to the reader |
118 | 0 | PayloadNode* payload_node = PayloadNode::get_from_data(payload.data); |
119 | 0 | if (payload_node->has_been_removed()) |
120 | 0 | { |
121 | 0 | advance_till_first_non_removed(); |
122 | 0 | } |
123 | 0 | else |
124 | 0 | { |
125 | 0 | free_payloads_.push_back(payload_node); |
126 | 0 | } |
127 | 0 | EPROSIMA_LOG_INFO(DATASHARING_PAYLOADPOOL, "Serialized payload released."); |
128 | |
|
129 | 0 | return DataSharingPayloadPool::release_payload(payload); |
130 | 0 | } |
131 | | |
132 | | template<typename T> |
133 | | bool init_shared_segment( |
134 | | const RTPSWriter* writer, |
135 | | const std::string& shared_dir) |
136 | 0 | { |
137 | 0 | segment_id_ = writer->getGuid(); |
138 | 0 | segment_name_ = generate_segment_name(shared_dir, segment_id_); |
139 | 0 | std::unique_ptr<T> local_segment; |
140 | 0 | size_t payload_size; |
141 | 0 | uint64_t estimated_size_for_payloads_pool; |
142 | 0 | uint64_t estimated_size_for_history; |
143 | 0 | uint32_t size_for_payloads_pool; |
144 | |
|
145 | 0 | try |
146 | 0 | { |
147 | | // We need to reserve the whole segment at once, and the underlying classes use uint32_t as size type. |
148 | | // In order to avoid overflows, we will calculate using uint64 and check the casting |
149 | 0 | bool overflow = false; |
150 | 0 | size_t per_allocation_extra_size = T::compute_per_allocation_extra_size( |
151 | 0 | alignof(PayloadNode), DataSharingPayloadPool::domain_name()); |
152 | 0 | payload_size = DataSharingPayloadPool::node_size(max_data_size_); |
153 | |
|
154 | 0 | estimated_size_for_payloads_pool = pool_size_ * payload_size; |
155 | 0 | overflow |= (estimated_size_for_payloads_pool != static_cast<uint32_t>(estimated_size_for_payloads_pool)); |
156 | 0 | size_for_payloads_pool = static_cast<uint32_t>(estimated_size_for_payloads_pool); |
157 | | |
158 | | //Reserve one extra to avoid pointer overlapping |
159 | 0 | estimated_size_for_history = (pool_size_ + 1) * sizeof(Segment::Offset); |
160 | 0 | overflow |= (estimated_size_for_history != static_cast<uint32_t>(estimated_size_for_history)); |
161 | 0 | uint32_t size_for_history = static_cast<uint32_t>(estimated_size_for_history); |
162 | |
|
163 | 0 | uint32_t descriptor_size = static_cast<uint32_t>(sizeof(PoolDescriptor)); |
164 | 0 | uint64_t estimated_segment_size = size_for_payloads_pool + per_allocation_extra_size + |
165 | 0 | size_for_history + per_allocation_extra_size + |
166 | 0 | descriptor_size + per_allocation_extra_size; |
167 | 0 | overflow |= (estimated_segment_size != static_cast<uint32_t>(estimated_segment_size)); |
168 | 0 | uint32_t segment_size = static_cast<uint32_t>(estimated_segment_size); |
169 | |
|
170 | 0 | if (overflow) |
171 | 0 | { |
172 | 0 | EPROSIMA_LOG_ERROR(DATASHARING_PAYLOADPOOL, "Failed to create segment " << segment_name_ |
173 | 0 | << |
174 | 0 | ": Segment size is too large: " << estimated_size_for_payloads_pool |
175 | 0 | << " (max is " |
176 | 0 | << (std::numeric_limits<uint32_t> |
177 | 0 | ::max)() << ")." |
178 | 0 | << |
179 | 0 | " Please reduce the maximum size of the history"); |
180 | 0 | return false; |
181 | 0 | } |
182 | | |
183 | | //Open the segment |
184 | 0 | T::remove(segment_name_); |
185 | |
|
186 | 0 | local_segment.reset( |
187 | 0 | new T(boost::interprocess::create_only, |
188 | 0 | segment_name_, |
189 | 0 | segment_size + T::EXTRA_SEGMENT_SIZE)); |
190 | 0 | } |
191 | 0 | catch (const std::exception& e) |
192 | 0 | { |
193 | 0 | EPROSIMA_LOG_ERROR(DATASHARING_PAYLOADPOOL, "Failed to create segment " << segment_name_ |
194 | 0 | << ": " << e.what()); |
195 | 0 | return false; |
196 | 0 | } |
197 | | |
198 | 0 | try |
199 | 0 | { |
200 | | // Alloc the memory for the pool |
201 | | // Cannot use 'construct' because we need to reserve extra space for the data, |
202 | | // which is not considered in sizeof(PayloadNode). |
203 | 0 | payloads_pool_ = static_cast<octet*>(local_segment->get().allocate(size_for_payloads_pool)); |
204 | | |
205 | | // Initialize each node in the pool |
206 | 0 | free_payloads_.init(pool_size_); |
207 | 0 | octet* payload = payloads_pool_; |
208 | 0 | for (uint32_t i = 0; i < pool_size_; ++i) |
209 | 0 | { |
210 | 0 | new (payload) PayloadNode(); |
211 | | |
212 | | // All payloads are free |
213 | 0 | free_payloads_.push_back(reinterpret_cast<PayloadNode*>(payload)); |
214 | |
|
215 | 0 | payload += (ptrdiff_t)payload_size; |
216 | 0 | } |
217 | | |
218 | | //Alloc the memory for the history |
219 | 0 | history_ = local_segment->get().template construct<Segment::Offset>(history_chunk_name())[pool_size_ + 1](); |
220 | | |
221 | | //Alloc the memory for the descriptor |
222 | 0 | descriptor_ = local_segment->get().template construct<PoolDescriptor>(descriptor_chunk_name())(); |
223 | | |
224 | | // Initialize the data in the descriptor |
225 | 0 | descriptor_->history_size = pool_size_ + 1; |
226 | 0 | descriptor_->notified_begin = 0u; |
227 | 0 | descriptor_->notified_end = 0u; |
228 | 0 | descriptor_->liveliness_sequence = 0u; |
229 | |
|
230 | 0 | free_history_size_ = pool_size_; |
231 | 0 | } |
232 | 0 | catch (std::exception& e) |
233 | 0 | { |
234 | 0 | T::remove(segment_name_); |
235 | |
|
236 | 0 | EPROSIMA_LOG_ERROR(DATASHARING_PAYLOADPOOL, "Failed to initialize segment " << segment_name_ |
237 | 0 | << ": " << e.what()); |
238 | 0 | return false; |
239 | 0 | } |
240 | | |
241 | 0 | segment_ = std::move(local_segment); |
242 | 0 | is_initialized_ = true; |
243 | 0 | return true; |
244 | 0 | } Unexecuted instantiation: bool eprosima::fastdds::rtps::WriterPool::init_shared_segment<eprosima::fastdds::rtps::SharedSegment<boost::interprocess::basic_managed_shared_memory<char, boost::interprocess::rbtree_best_fit<boost::interprocess::mutex_family, boost::interprocess::offset_ptr<void, unsigned int, unsigned long, 0ul>, 0ul>, boost::interprocess::iset_index>, boost::interprocess::shared_memory_object> >(eprosima::fastdds::rtps::RTPSWriter const*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) Unexecuted instantiation: bool eprosima::fastdds::rtps::WriterPool::init_shared_segment<eprosima::fastdds::rtps::SharedSegment<boost::interprocess::basic_managed_mapped_file<char, boost::interprocess::rbtree_best_fit<boost::interprocess::mutex_family, boost::interprocess::offset_ptr<void, unsigned int, unsigned long, 0ul>, 0ul>, boost::interprocess::iset_index>, boost::interprocess::file_mapping> >(eprosima::fastdds::rtps::RTPSWriter const*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) |
245 | | |
246 | | bool init_shared_memory( |
247 | | const RTPSWriter* writer, |
248 | | const std::string& shared_dir) override |
249 | 0 | { |
250 | 0 | if (shared_dir.empty()) |
251 | 0 | { |
252 | 0 | return init_shared_segment<fastdds::rtps::SharedMemSegment>(writer, shared_dir); |
253 | 0 | } |
254 | 0 | else |
255 | 0 | { |
256 | 0 | return init_shared_segment<fastdds::rtps::SharedFileSegment>(writer, shared_dir); |
257 | 0 | } |
258 | 0 | } |
259 | | |
260 | | /** |
261 | | * Fills the metadata of the shared payload from the cache change information |
262 | | * and adds the payload's offset to the shared history |
263 | | */ |
264 | | void add_to_shared_history( |
265 | | const CacheChange_t* cache_change) |
266 | 0 | { |
267 | 0 | assert(cache_change); |
268 | 0 | assert(cache_change->serializedPayload.data); |
269 | 0 | assert(cache_change->serializedPayload.payload_owner == this); |
270 | 0 | assert(free_history_size_ > 0); |
271 | | |
272 | | // Fill the payload metadata with the change info |
273 | 0 | PayloadNode* node = PayloadNode::get_from_data(cache_change->serializedPayload.data); |
274 | 0 | node->status(ALIVE); |
275 | 0 | node->data_length(cache_change->serializedPayload.length); |
276 | 0 | node->source_timestamp(cache_change->sourceTimestamp); |
277 | 0 | node->writer_GUID(cache_change->writerGUID); |
278 | 0 | node->instance_handle(cache_change->instanceHandle); |
279 | 0 | if (cache_change->write_params.related_sample_identity() != SampleIdentity::unknown()) |
280 | 0 | { |
281 | 0 | node->related_sample_identity(cache_change->write_params.related_sample_identity()); |
282 | 0 | } |
283 | | |
284 | | // Set the sequence number last, it signals the data is ready |
285 | 0 | node->sequence_number(cache_change->sequenceNumber); |
286 | | |
287 | | // Add it to the history |
288 | 0 | history_[static_cast<uint32_t>(descriptor_->notified_end)] = segment_->get_offset_from_address(node); |
289 | 0 | EPROSIMA_LOG_INFO(DATASHARING_PAYLOADPOOL, "Change added to shared history" |
290 | 0 | << " with SN " << cache_change->sequenceNumber); |
291 | 0 | advance(descriptor_->notified_end); |
292 | 0 | --free_history_size_; |
293 | 0 | } |
294 | | |
295 | | /** |
296 | | * Removes the payload's offset from the shared history |
297 | | * |
298 | | * Payloads don't need to be removed from the history in the same order |
299 | | * they where added, but a payload will not be available through @ref get_payload until all |
300 | | * payloads preceding it have been removed from the shared history. |
301 | | */ |
302 | | void remove_from_shared_history( |
303 | | const CacheChange_t* cache_change) |
304 | 0 | { |
305 | 0 | assert(cache_change); |
306 | 0 | assert(cache_change->serializedPayload.data); |
307 | 0 | assert(cache_change->serializedPayload.payload_owner == this); |
308 | 0 | assert(descriptor_->notified_end != descriptor_->notified_begin); |
309 | 0 | assert(free_history_size_ < descriptor_->history_size); |
310 | |
|
311 | 0 | EPROSIMA_LOG_INFO(DATASHARING_PAYLOADPOOL, "Change removed from shared history" |
312 | 0 | << " with SN " << cache_change->sequenceNumber); |
313 | |
|
314 | 0 | PayloadNode* payload = PayloadNode::get_from_data(cache_change->serializedPayload.data); |
315 | 0 | payload->has_been_removed(true); |
316 | 0 | } |
317 | | |
318 | | void advance_till_first_non_removed() |
319 | 0 | { |
320 | 0 | while (descriptor_->notified_begin != descriptor_->notified_end) |
321 | 0 | { |
322 | 0 | auto offset = history_[static_cast<uint32_t>(descriptor_->notified_begin)]; |
323 | 0 | auto payload = static_cast<PayloadNode*>(segment_->get_address_from_offset(offset)); |
324 | 0 | if (!payload->has_been_removed()) |
325 | 0 | { |
326 | 0 | break; |
327 | 0 | } |
328 | | |
329 | 0 | payload->has_been_removed(false); |
330 | 0 | free_payloads_.push_back(payload); |
331 | 0 | advance(descriptor_->notified_begin); |
332 | 0 | ++free_history_size_; |
333 | 0 | } |
334 | 0 | } |
335 | | |
336 | | void assert_liveliness() |
337 | 0 | { |
338 | 0 | ++descriptor_->liveliness_sequence; |
339 | 0 | } |
340 | | |
341 | | bool is_initialized() const |
342 | 0 | { |
343 | 0 | return is_initialized_; |
344 | 0 | } |
345 | | |
346 | | private: |
347 | | |
348 | | using DataSharingPayloadPool::init_shared_memory; |
349 | | |
350 | | octet* payloads_pool_; //< Shared pool of payloads |
351 | | |
352 | | uint32_t max_data_size_; //< Maximum size of the serialized payload data |
353 | | uint32_t pool_size_; //< Number of payloads in the pool |
354 | | uint32_t free_history_size_; //< Number of elements currently unused in the shared history |
355 | | |
356 | | FixedSizeQueue<PayloadNode*> free_payloads_; //< Pointers to the free payloads in the pool |
357 | | |
358 | | bool is_initialized_ = false; //< Whether the pool has been initialized on shared memory |
359 | | |
360 | | }; |
361 | | |
362 | | |
363 | | } // namespace rtps |
364 | | } // namespace fastdds |
365 | | } // namespace eprosima |
366 | | |
367 | | #endif // RTPS_DATASHARING_WRITERPOOL_HPP |