/src/Fast-DDS/src/cpp/rtps/writer/LivelinessManager.cpp
Line | Count | Source |
1 | | // Copyright 2019 Proyectos y Sistemas de Mantenimiento SL (eProsima). |
2 | | // |
3 | | // Licensed under the Apache License, Version 2.0 (the "License"); |
4 | | // you may not use this file except in compliance with the License. |
5 | | // You may obtain a copy of the License at |
6 | | // |
7 | | // http://www.apache.org/licenses/LICENSE-2.0 |
8 | | // |
9 | | // Unless required by applicable law or agreed to in writing, software |
10 | | // distributed under the License is distributed on an "AS IS" BASIS, |
11 | | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | | // See the License for the specific language governing permissions and |
13 | | // limitations under the License. |
14 | | |
15 | | #include <rtps/writer/LivelinessManager.hpp> |
16 | | |
17 | | #include <algorithm> |
18 | | |
19 | | #include <fastdds/dds/log/Log.hpp> |
20 | | |
21 | | using namespace std::chrono; |
22 | | |
23 | | namespace eprosima { |
24 | | namespace fastdds { |
25 | | namespace rtps { |
26 | | |
27 | | using LivelinessDataIterator = ResourceLimitedVector<LivelinessData>::iterator; |
28 | | |
29 | | LivelinessManager::LivelinessManager( |
30 | | const LivelinessCallback& callback, |
31 | | ResourceEvent& service, |
32 | | bool manage_automatic) |
33 | 0 | : callback_(callback) |
34 | 0 | , manage_automatic_(manage_automatic) |
35 | 0 | , writers_() |
36 | 0 | , mutex_() |
37 | 0 | , col_mutex_() |
38 | 0 | , timer_owner_(nullptr) |
39 | 0 | , timer_( |
40 | 0 | service, |
41 | 0 | [this]() -> bool |
42 | 0 | { |
43 | 0 | return timer_expired(); |
44 | 0 | }, |
45 | 0 | 0) |
46 | 0 | { |
47 | 0 | } |
48 | | |
49 | | LivelinessManager::~LivelinessManager() |
50 | 0 | { |
51 | 0 | std::lock_guard<std::mutex> _(mutex_); |
52 | 0 | timer_owner_ = nullptr; |
53 | 0 | timer_.cancel_timer(); |
54 | 0 | } |
55 | | |
56 | | bool LivelinessManager::add_writer( |
57 | | GUID_t guid, |
58 | | fastdds::dds::LivelinessQosPolicyKind kind, |
59 | | dds::Duration_t lease_duration) |
60 | 0 | { |
61 | 0 | if (!manage_automatic_ && kind == fastdds::dds::LivelinessQosPolicyKind::AUTOMATIC_LIVELINESS_QOS) |
62 | 0 | { |
63 | 0 | EPROSIMA_LOG_WARNING(RTPS_WRITER, "Liveliness manager not managing automatic writers, writer not added"); |
64 | 0 | return false; |
65 | 0 | } |
66 | | |
67 | 0 | { |
68 | | // collection guard |
69 | 0 | std::lock_guard<shared_mutex> _(col_mutex_); |
70 | | // writers_ elements guard |
71 | 0 | std::lock_guard<std::mutex> __(mutex_); |
72 | |
|
73 | 0 | for (LivelinessData& writer : writers_) |
74 | 0 | { |
75 | 0 | if (writer.guid == guid && |
76 | 0 | writer.kind == kind && |
77 | 0 | writer.lease_duration == lease_duration) |
78 | 0 | { |
79 | 0 | writer.count++; |
80 | 0 | return true; |
81 | 0 | } |
82 | 0 | } |
83 | 0 | writers_.emplace_back(guid, kind, lease_duration); |
84 | 0 | } |
85 | | |
86 | 0 | if (!calculate_next()) |
87 | 0 | { |
88 | | // TimedEvent is thread safe |
89 | 0 | timer_.cancel_timer(); |
90 | 0 | return true; |
91 | 0 | } |
92 | | |
93 | 0 | std::lock_guard<std::mutex> _(mutex_); |
94 | |
|
95 | 0 | if (timer_owner_ != nullptr) |
96 | 0 | { |
97 | | // Some times the interval could be negative if a writer expired during the call to this function |
98 | | // Once in this situation there is not much we can do but let asio timers expire immediately |
99 | 0 | auto interval = timer_owner_->time - steady_clock::now(); |
100 | 0 | timer_.update_interval_millisec((double)duration_cast<milliseconds>(interval).count()); |
101 | 0 | timer_.restart_timer(); |
102 | 0 | } |
103 | |
|
104 | 0 | return true; |
105 | 0 | } |
106 | | |
107 | | bool LivelinessManager::remove_writer( |
108 | | GUID_t guid, |
109 | | fastdds::dds::LivelinessQosPolicyKind kind, |
110 | | dds::Duration_t lease_duration, |
111 | | LivelinessData::WriterStatus& writer_status) |
112 | 0 | { |
113 | 0 | bool removed = false; |
114 | |
|
115 | 0 | { |
116 | | // collection guard |
117 | 0 | std::lock_guard<shared_mutex> _(col_mutex_); |
118 | | // writers_ elements guard |
119 | 0 | std::lock_guard<std::mutex> __(mutex_); |
120 | |
|
121 | 0 | removed = writers_.remove_if([guid, kind, lease_duration, &writer_status](LivelinessData& writer) |
122 | 0 | { |
123 | 0 | writer_status = writer.status; |
124 | 0 | return writer.guid == guid && |
125 | 0 | writer.kind == kind && |
126 | 0 | writer.lease_duration == lease_duration && |
127 | 0 | --writer.count == 0; |
128 | 0 | }); |
129 | 0 | } |
130 | |
|
131 | 0 | if (!removed) |
132 | 0 | { |
133 | 0 | return false; |
134 | 0 | } |
135 | | |
136 | 0 | std::unique_lock<std::mutex> lock(mutex_); |
137 | |
|
138 | 0 | if (timer_owner_ != nullptr) |
139 | 0 | { |
140 | 0 | lock.unlock(); |
141 | |
|
142 | 0 | if (!calculate_next()) |
143 | 0 | { |
144 | 0 | timer_.cancel_timer(); |
145 | 0 | return true; |
146 | 0 | } |
147 | | |
148 | 0 | lock.lock(); |
149 | |
|
150 | 0 | if (timer_owner_ != nullptr) |
151 | 0 | { |
152 | | // Some times the interval could be negative if a writer expired during the call to this function |
153 | | // Once in this situation there is not much we can do but let asio timers expire inmediately |
154 | 0 | auto interval = timer_owner_->time - steady_clock::now(); |
155 | 0 | timer_.update_interval_millisec((double)duration_cast<milliseconds>(interval).count()); |
156 | 0 | timer_.restart_timer(); |
157 | 0 | } |
158 | 0 | } |
159 | | |
160 | 0 | return true; |
161 | 0 | } |
162 | | |
163 | | bool LivelinessManager::assert_liveliness( |
164 | | GUID_t guid, |
165 | | fastdds::dds::LivelinessQosPolicyKind kind, |
166 | | dds::Duration_t lease_duration) |
167 | 0 | { |
168 | 0 | bool found = false; |
169 | |
|
170 | 0 | { |
171 | | // collection guard |
172 | 0 | shared_lock<shared_mutex> _(col_mutex_); |
173 | |
|
174 | 0 | for (LivelinessData& writer : writers_) |
175 | 0 | { |
176 | | // writers_ elements guard |
177 | 0 | std::unique_lock<std::mutex> lock(mutex_); |
178 | |
|
179 | 0 | if (writer.guid == guid && |
180 | 0 | writer.kind == kind && |
181 | 0 | writer.lease_duration == lease_duration) |
182 | 0 | { |
183 | 0 | lock.unlock(); |
184 | |
|
185 | 0 | found = true; |
186 | | |
187 | | // Execute the callbacks |
188 | 0 | if (writer.kind == fastdds::dds::LivelinessQosPolicyKind::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS || |
189 | 0 | writer.kind == fastdds::dds::LivelinessQosPolicyKind::AUTOMATIC_LIVELINESS_QOS) |
190 | 0 | { |
191 | 0 | for (LivelinessData& w: writers_) |
192 | 0 | { |
193 | 0 | if (w.kind == writer.kind && |
194 | 0 | w.guid.guidPrefix == guid.guidPrefix) |
195 | 0 | { |
196 | 0 | assert_writer_liveliness(w); |
197 | 0 | } |
198 | 0 | } |
199 | 0 | } |
200 | 0 | else if (writer.kind == fastdds::dds::LivelinessQosPolicyKind::MANUAL_BY_TOPIC_LIVELINESS_QOS) |
201 | 0 | { |
202 | 0 | assert_writer_liveliness(writer); |
203 | 0 | } |
204 | |
|
205 | 0 | break; |
206 | 0 | } |
207 | 0 | } |
208 | 0 | } |
209 | |
|
210 | 0 | if (!found) |
211 | 0 | { |
212 | 0 | return false; |
213 | 0 | } |
214 | | |
215 | 0 | timer_.cancel_timer(); |
216 | | |
217 | | // Updates the timer owner |
218 | 0 | if (!calculate_next()) |
219 | 0 | { |
220 | 0 | EPROSIMA_LOG_ERROR(RTPS_WRITER, "Error when restarting liveliness timer"); |
221 | 0 | return false; |
222 | 0 | } |
223 | | |
224 | 0 | std::lock_guard<std::mutex> lock(mutex_); |
225 | |
|
226 | 0 | if (timer_owner_ != nullptr) |
227 | 0 | { |
228 | | // Some times the interval could be negative if a writer expired during the call to this function |
229 | | // Once in this situation there is not much we can do but let asio timers expire inmediately |
230 | 0 | auto interval = timer_owner_->time - steady_clock::now(); |
231 | 0 | timer_.update_interval_millisec((double)duration_cast<milliseconds>(interval).count()); |
232 | 0 | timer_.restart_timer(); |
233 | 0 | } |
234 | |
|
235 | 0 | return true; |
236 | 0 | } |
237 | | |
238 | | bool LivelinessManager::assert_liveliness( |
239 | | fastdds::dds::LivelinessQosPolicyKind kind, |
240 | | GuidPrefix_t guid_prefix) |
241 | 0 | { |
242 | |
|
243 | 0 | if (!manage_automatic_ && kind == fastdds::dds::LivelinessQosPolicyKind::AUTOMATIC_LIVELINESS_QOS) |
244 | 0 | { |
245 | 0 | EPROSIMA_LOG_WARNING(RTPS_WRITER, "Liveliness manager not managing automatic writers, writer not added"); |
246 | 0 | return false; |
247 | 0 | } |
248 | | |
249 | 0 | { |
250 | | // collection guard |
251 | 0 | shared_lock<shared_mutex> _(col_mutex_); |
252 | |
|
253 | 0 | if (writers_.empty()) |
254 | 0 | { |
255 | 0 | return true; |
256 | 0 | } |
257 | | |
258 | | |
259 | 0 | for (LivelinessData& writer: writers_) |
260 | 0 | { |
261 | 0 | if (writer.kind == kind && |
262 | 0 | guid_prefix == writer.guid.guidPrefix) |
263 | 0 | { |
264 | 0 | assert_writer_liveliness(writer); |
265 | 0 | } |
266 | 0 | } |
267 | 0 | } |
268 | | |
269 | 0 | timer_.cancel_timer(); |
270 | | |
271 | | // Updates the timer owner |
272 | 0 | if (!calculate_next()) |
273 | 0 | { |
274 | 0 | EPROSIMA_LOG_INFO(RTPS_WRITER, |
275 | 0 | "Error when restarting liveliness timer: " << writers_.size() << " writers, liveliness " |
276 | 0 | << kind); |
277 | 0 | return false; |
278 | 0 | } |
279 | | |
280 | 0 | std::lock_guard<std::mutex> lock(mutex_); |
281 | |
|
282 | 0 | if (timer_owner_ != nullptr) |
283 | 0 | { |
284 | | // Some times the interval could be negative if a writer expired during the call to this function |
285 | | // Once in this situation there is not much we can do but let asio timers expire inmediately |
286 | 0 | auto interval = timer_owner_->time - steady_clock::now(); |
287 | 0 | timer_.update_interval_millisec((double)duration_cast<milliseconds>(interval).count()); |
288 | 0 | timer_.restart_timer(); |
289 | 0 | } |
290 | |
|
291 | 0 | return true; |
292 | 0 | } |
293 | | |
294 | | bool LivelinessManager::calculate_next() |
295 | 0 | { |
296 | | // Keep this lock order to prevent ABBA deadlocks |
297 | 0 | shared_lock<shared_mutex> _(col_mutex_); |
298 | 0 | std::lock_guard<std::mutex> __(mutex_); |
299 | |
|
300 | 0 | bool any_alive = false; |
301 | 0 | steady_clock::time_point min_time = steady_clock::now() + nanoseconds(dds::c_TimeInfinite.to_ns()); |
302 | |
|
303 | 0 | timer_owner_ = nullptr; |
304 | | |
305 | | // collection guard |
306 | 0 | for (LivelinessData& writer : writers_) |
307 | 0 | { |
308 | 0 | if (writer.status == LivelinessData::WriterStatus::ALIVE) |
309 | 0 | { |
310 | 0 | if (writer.time < min_time) |
311 | 0 | { |
312 | 0 | min_time = writer.time; |
313 | 0 | timer_owner_ = &writer; |
314 | 0 | } |
315 | 0 | any_alive = true; |
316 | 0 | } |
317 | 0 | } |
318 | |
|
319 | 0 | return any_alive; |
320 | 0 | } |
321 | | |
322 | | bool LivelinessManager::timer_expired() |
323 | 0 | { |
324 | 0 | std::unique_lock<std::mutex> lock(mutex_); |
325 | |
|
326 | 0 | if (timer_owner_ == nullptr) |
327 | 0 | { |
328 | 0 | EPROSIMA_LOG_ERROR(RTPS_WRITER, "Liveliness timer expired but there is no writer"); |
329 | 0 | return false; |
330 | 0 | } |
331 | 0 | else |
332 | 0 | { |
333 | 0 | timer_owner_->status = LivelinessData::WriterStatus::NOT_ALIVE; |
334 | 0 | } |
335 | | |
336 | 0 | auto guid = timer_owner_->guid; |
337 | 0 | auto kind = timer_owner_->kind; |
338 | 0 | auto lease_duration = timer_owner_->lease_duration; |
339 | |
|
340 | 0 | lock.unlock(); |
341 | |
|
342 | 0 | if (callback_ != nullptr) |
343 | 0 | { |
344 | 0 | callback_(guid, kind, lease_duration, -1, 1); |
345 | 0 | } |
346 | |
|
347 | 0 | if (calculate_next()) |
348 | 0 | { |
349 | 0 | lock.lock(); |
350 | |
|
351 | 0 | if ( timer_owner_ != nullptr) |
352 | 0 | { |
353 | | // Some times the interval could be negative if a writer expired during the call to this function |
354 | | // Once in this situation there is not much we can do but let asio timers expire inmediately |
355 | 0 | auto interval = timer_owner_->time - steady_clock::now(); |
356 | 0 | timer_.update_interval_millisec((double)duration_cast<milliseconds>(interval).count()); |
357 | |
|
358 | 0 | return true; |
359 | 0 | } |
360 | 0 | } |
361 | | |
362 | 0 | return false; |
363 | 0 | } |
364 | | |
365 | | bool LivelinessManager::is_any_alive( |
366 | | fastdds::dds::LivelinessQosPolicyKind kind) |
367 | 0 | { |
368 | | // Keep this lock order to prevent ABBA deadlocks |
369 | 0 | shared_lock<shared_mutex> _(col_mutex_); |
370 | 0 | std::lock_guard<std::mutex> __(mutex_); |
371 | |
|
372 | 0 | for (const auto& writer : writers_) |
373 | 0 | { |
374 | 0 | if (writer.kind == kind && writer.status == LivelinessData::WriterStatus::ALIVE) |
375 | 0 | { |
376 | 0 | return true; |
377 | 0 | } |
378 | 0 | } |
379 | | |
380 | 0 | return false; |
381 | 0 | } |
382 | | |
383 | | void LivelinessManager::assert_writer_liveliness( |
384 | | LivelinessData& writer) |
385 | 0 | { |
386 | | // The shared_mutex is taken, that is, the writer referenced will not be destroyed during this call |
387 | 0 | std::unique_lock<std::mutex> lock(mutex_); |
388 | |
|
389 | 0 | auto status = writer.status; |
390 | 0 | auto guid = writer.guid; |
391 | 0 | auto kind = writer.kind; |
392 | 0 | auto lease_duration = writer.lease_duration; |
393 | |
|
394 | 0 | writer.status = LivelinessData::WriterStatus::ALIVE; |
395 | 0 | writer.time = steady_clock::now() + nanoseconds(writer.lease_duration.to_ns()); |
396 | |
|
397 | 0 | lock.unlock(); |
398 | |
|
399 | 0 | if (callback_ != nullptr) |
400 | 0 | { |
401 | 0 | if (status == LivelinessData::WriterStatus::NOT_ASSERTED) |
402 | 0 | { |
403 | 0 | callback_(guid, kind, lease_duration, 1, 0); |
404 | 0 | } |
405 | 0 | else if (status == LivelinessData::WriterStatus::NOT_ALIVE) |
406 | 0 | { |
407 | 0 | callback_(guid, kind, lease_duration, 1, -1); |
408 | 0 | } |
409 | 0 | } |
410 | 0 | } |
411 | | |
412 | | const ResourceLimitedVector<LivelinessData>& LivelinessManager::get_liveliness_data() const |
413 | 0 | { |
414 | 0 | return writers_; |
415 | 0 | } |
416 | | |
417 | | } // namespace rtps |
418 | | } // namespace fastdds |
419 | | } // namespace eprosima |