/src/opendnp3/cpp/lib/src/master/MasterSchedulerBackend.cpp
Line | Count | Source |
1 | | /* |
2 | | * Copyright 2013-2022 Step Function I/O, LLC |
3 | | * |
4 | | * Licensed to Green Energy Corp (www.greenenergycorp.com) and Step Function I/O |
5 | | * LLC (https://stepfunc.io) under one or more contributor license agreements. |
6 | | * See the NOTICE file distributed with this work for additional information |
7 | | * regarding copyright ownership. Green Energy Corp and Step Function I/O LLC license |
8 | | * this file to you under the Apache License, Version 2.0 (the "License"); you |
9 | | * may not use this file except in compliance with the License. You may obtain |
10 | | * a copy of the License at: |
11 | | * |
12 | | * http://www.apache.org/licenses/LICENSE-2.0 |
13 | | * |
14 | | * Unless required by applicable law or agreed to in writing, software |
15 | | * distributed under the License is distributed on an "AS IS" BASIS, |
16 | | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
17 | | * See the License for the specific language governing permissions and |
18 | | * limitations under the License. |
19 | | */ |
20 | | #include "MasterSchedulerBackend.h" |
21 | | |
22 | | #include <algorithm> |
23 | | |
24 | | namespace opendnp3 |
25 | | { |
26 | | |
27 | 6.78k | MasterSchedulerBackend::MasterSchedulerBackend(const std::shared_ptr<exe4cpp::IExecutor>& executor) : executor(executor) |
28 | 6.78k | { |
29 | 6.78k | } |
30 | | |
31 | | void MasterSchedulerBackend::Shutdown() |
32 | 6.78k | { |
33 | 6.78k | this->isShutdown = true; |
34 | 6.78k | this->tasks.clear(); |
35 | 6.78k | this->current.Clear(); |
36 | 6.78k | this->taskTimer.cancel(); |
37 | 6.78k | this->taskStartTimeout.cancel(); |
38 | 6.78k | this->executor.reset(); |
39 | 6.78k | } |
40 | | |
41 | | void MasterSchedulerBackend::Add(const std::shared_ptr<IMasterTask>& task, IMasterTaskRunner& runner) |
42 | 40.7k | { |
43 | 40.7k | if (this->isShutdown) |
44 | 0 | return; |
45 | | |
46 | 40.7k | this->tasks.emplace_back(task, runner); |
47 | 40.7k | this->PostCheckForTaskRun(); |
48 | 40.7k | } |
49 | | |
50 | | void MasterSchedulerBackend::SetRunnerOffline(const IMasterTaskRunner& runner) |
51 | 0 | { |
52 | 0 | if (this->isShutdown) |
53 | 0 | return; |
54 | | |
55 | 0 | const auto now = Timestamp(this->executor->get_time()); |
56 | |
|
57 | 0 | auto checkForOwnership = [now, &runner](const Record& record) -> bool { |
58 | 0 | if (record.BelongsTo(runner)) |
59 | 0 | { |
60 | 0 | if (!record.task->IsRecurring()) |
61 | 0 | { |
62 | 0 | record.task->OnLowerLayerClose(now); |
63 | 0 | } |
64 | |
|
65 | 0 | return true; |
66 | 0 | } |
67 | | |
68 | 0 | return false; |
69 | 0 | }; |
70 | |
|
71 | 0 | if (this->current && checkForOwnership(this->current)) |
72 | 0 | this->current.Clear(); |
73 | | |
74 | | // move erase idiom |
75 | 0 | this->tasks.erase(std::remove_if(this->tasks.begin(), this->tasks.end(), checkForOwnership), this->tasks.end()); |
76 | |
|
77 | 0 | this->PostCheckForTaskRun(); |
78 | 0 | } |
79 | | |
80 | | bool MasterSchedulerBackend::CompleteCurrentFor(const IMasterTaskRunner& runner) |
81 | 14 | { |
82 | | // no active task |
83 | 14 | if (!this->current) |
84 | 0 | return false; |
85 | | |
86 | | // active task not for this runner |
87 | 14 | if (!this->current.BelongsTo(runner)) |
88 | 0 | return false; |
89 | | |
90 | 14 | if (this->current.task->IsRecurring()) |
91 | 14 | { |
92 | 14 | this->Add(this->current.task, *this->current.runner); |
93 | 14 | } |
94 | | |
95 | 14 | this->current.Clear(); |
96 | | |
97 | 14 | this->PostCheckForTaskRun(); |
98 | | |
99 | 14 | return true; |
100 | 14 | } |
101 | | |
102 | | void MasterSchedulerBackend::Demand(const std::shared_ptr<IMasterTask>& task) |
103 | 0 | { |
104 | 0 | auto callback = [this, task, self = shared_from_this()]() { |
105 | 0 | task->SetMinExpiration(); |
106 | 0 | this->CheckForTaskRun(); |
107 | 0 | }; |
108 | |
|
109 | 0 | this->executor->post(callback); |
110 | 0 | } |
111 | | |
112 | | void MasterSchedulerBackend::Evaluate() |
113 | 4.21k | { |
114 | 4.21k | this->PostCheckForTaskRun(); |
115 | 4.21k | } |
116 | | |
117 | | void MasterSchedulerBackend::PostCheckForTaskRun() |
118 | 44.9k | { |
119 | 44.9k | if (!this->taskCheckPending) |
120 | 10.2k | { |
121 | 10.2k | this->taskCheckPending = true; |
122 | 10.2k | this->executor->post([this, self = shared_from_this()]() { this->CheckForTaskRun(); }); |
123 | 10.2k | } |
124 | 44.9k | } |
125 | | |
126 | | bool MasterSchedulerBackend::CheckForTaskRun() |
127 | 10.2k | { |
128 | 10.2k | if (this->isShutdown) |
129 | 0 | return false; |
130 | | |
131 | 10.2k | this->taskCheckPending = false; |
132 | | |
133 | 10.2k | this->RestartTimeoutTimer(); |
134 | | |
135 | 10.2k | if (this->current) |
136 | 3.43k | return false; |
137 | | |
138 | 6.79k | const auto now = Timestamp(this->executor->get_time()); |
139 | | |
140 | | // try to find a task that can run |
141 | 6.79k | auto current = this->tasks.begin(); |
142 | 6.79k | auto best_task = current; |
143 | 6.79k | if (current == this->tasks.end()) |
144 | 0 | return false; |
145 | 6.79k | ++current; |
146 | | |
147 | 40.7k | while (current != this->tasks.end()) |
148 | 33.9k | { |
149 | 33.9k | if (GetBestTaskToRun(now, *best_task, *current) == Comparison::RIGHT) |
150 | 13.5k | { |
151 | 13.5k | best_task = current; |
152 | 13.5k | } |
153 | | |
154 | 33.9k | ++current; |
155 | 33.9k | } |
156 | | |
157 | | // is the task runnable now? |
158 | 6.79k | const auto is_expired = now >= best_task->task->ExpirationTime(); |
159 | 6.79k | if (is_expired) |
160 | 6.79k | { |
161 | 6.79k | this->current = *best_task; |
162 | 6.79k | this->tasks.erase(best_task); |
163 | 6.79k | this->current.runner->Run(this->current.task); |
164 | | |
165 | 6.79k | return true; |
166 | 6.79k | } |
167 | | |
168 | 0 | auto callback = [this, self = shared_from_this()]() { this->CheckForTaskRun(); }; |
169 | |
|
170 | 0 | this->taskTimer.cancel(); |
171 | 0 | this->taskTimer = this->executor->start(best_task->task->ExpirationTime().value, callback); |
172 | |
|
173 | 0 | return false; |
174 | 6.79k | } |
175 | | |
176 | | void MasterSchedulerBackend::RestartTimeoutTimer() |
177 | 10.2k | { |
178 | 10.2k | if (this->isShutdown) |
179 | 0 | return; |
180 | | |
181 | 10.2k | auto min = Timestamp::Max(); |
182 | | |
183 | 10.2k | for (auto& record : this->tasks) |
184 | 57.9k | { |
185 | 57.9k | if (!record.task->IsRecurring() && (record.task->StartExpirationTime() < min)) |
186 | 0 | { |
187 | 0 | min = record.task->StartExpirationTime(); |
188 | 0 | } |
189 | 57.9k | } |
190 | | |
191 | 10.2k | this->taskStartTimeout.cancel(); |
192 | 10.2k | if (min != Timestamp::Max()) |
193 | 0 | { |
194 | 0 | this->taskStartTimeout.cancel(); |
195 | 0 | this->taskStartTimeout |
196 | 0 | = this->executor->start(min.value, [this, self = shared_from_this()]() { this->TimeoutTasks(); }); |
197 | 0 | } |
198 | 10.2k | } |
199 | | |
200 | | void MasterSchedulerBackend::TimeoutTasks() |
201 | 0 | { |
202 | 0 | if (this->isShutdown) |
203 | 0 | return; |
204 | | |
205 | | // find the minimum start timeout value |
206 | 0 | auto isTimedOut = [now = Timestamp(this->executor->get_time())](const Record& record) -> bool { |
207 | 0 | if (record.task->IsRecurring() || record.task->StartExpirationTime() > now) |
208 | 0 | { |
209 | 0 | return false; |
210 | 0 | } |
211 | | |
212 | 0 | record.task->OnStartTimeout(now); |
213 | |
|
214 | 0 | return true; |
215 | 0 | }; |
216 | | |
217 | | // erase-remove idion (https://en.wikipedia.org/wiki/Erase-remove_idiom) |
218 | 0 | this->tasks.erase(std::remove_if(this->tasks.begin(), this->tasks.end(), isTimedOut), this->tasks.end()); |
219 | |
|
220 | 0 | this->RestartTimeoutTimer(); |
221 | 0 | } |
222 | | |
223 | | MasterSchedulerBackend::Comparison MasterSchedulerBackend::GetBestTaskToRun(const Timestamp& now, |
224 | | const Record& left, |
225 | | const Record& right) |
226 | 33.9k | { |
227 | 33.9k | const auto BEST_ENABLED_STATUS = CompareEnabledStatus(left, right); |
228 | | |
229 | 33.9k | if (BEST_ENABLED_STATUS != Comparison::SAME) |
230 | 13.6k | { |
231 | | // if one task is disabled, return the other task |
232 | 13.6k | return BEST_ENABLED_STATUS; |
233 | 13.6k | } |
234 | | |
235 | 20.3k | const auto BEST_BLOCKED_STATUS = CompareBlockedStatus(left, right); |
236 | | |
237 | 20.3k | if (BEST_BLOCKED_STATUS != Comparison::SAME) |
238 | 16 | { |
239 | | // if one task is blocked and the other isn't, return the unblocked task |
240 | 16 | return BEST_BLOCKED_STATUS; |
241 | 16 | } |
242 | | |
243 | 20.3k | const auto EARLIEST_EXPIRATION = CompareTime(now, left, right); |
244 | 20.3k | const auto BEST_PRIORITY = ComparePriority(left, right); |
245 | | |
246 | | // if the expiration times are the same, break based on priority, otherwise go with the expiration time |
247 | 20.3k | return (EARLIEST_EXPIRATION == Comparison::SAME) ? BEST_PRIORITY : EARLIEST_EXPIRATION; |
248 | 20.3k | } |
249 | | |
250 | | MasterSchedulerBackend::Comparison MasterSchedulerBackend::CompareTime(const Timestamp& now, |
251 | | const Record& left, |
252 | | const Record& right) |
253 | 20.3k | { |
254 | | // if tasks are already expired, the effective expiration time is NOW |
255 | 20.3k | const auto leftTime = left.task->IsExpired(now) ? now : left.task->ExpirationTime(); |
256 | 20.3k | const auto rightTime = right.task->IsExpired(now) ? now : right.task->ExpirationTime(); |
257 | | |
258 | 20.3k | if (leftTime < rightTime) |
259 | 0 | { |
260 | 0 | return Comparison::LEFT; |
261 | 0 | } |
262 | 20.3k | if (rightTime < leftTime) |
263 | 0 | { |
264 | 0 | return Comparison::RIGHT; |
265 | 0 | } |
266 | 20.3k | else |
267 | 20.3k | { |
268 | 20.3k | return Comparison::SAME; |
269 | 20.3k | } |
270 | 20.3k | } |
271 | | |
272 | | MasterSchedulerBackend::Comparison MasterSchedulerBackend::CompareEnabledStatus(const Record& left, const Record& right) |
273 | 33.9k | { |
274 | 33.9k | if (left.task->ExpirationTime() == Timestamp::Max()) // left is disabled, check the right |
275 | 13.5k | { |
276 | 13.5k | return right.task->ExpirationTime() == Timestamp::Max() ? Comparison::SAME : Comparison::RIGHT; |
277 | 13.5k | } |
278 | 20.4k | if (right.task->ExpirationTime() == Timestamp::Max()) // left is enabled, right is disabled |
279 | 6.81k | { |
280 | 6.81k | return Comparison::LEFT; |
281 | 6.81k | } |
282 | 13.5k | else |
283 | 13.5k | { |
284 | | // both tasks are enabled |
285 | 13.5k | return Comparison::SAME; |
286 | 13.5k | } |
287 | 20.4k | } |
288 | | |
289 | | MasterSchedulerBackend::Comparison MasterSchedulerBackend::CompareBlockedStatus(const Record& left, const Record& right) |
290 | 20.3k | { |
291 | 20.3k | if (left.task->IsBlocked()) |
292 | 8 | { |
293 | 8 | return right.task->IsBlocked() ? Comparison::SAME : Comparison::RIGHT; |
294 | 8 | } |
295 | | |
296 | 20.3k | return right.task->IsBlocked() ? Comparison::LEFT : Comparison::SAME; |
297 | 20.3k | } |
298 | | |
299 | | MasterSchedulerBackend::Comparison MasterSchedulerBackend::ComparePriority(const Record& left, const Record& right) |
300 | 20.3k | { |
301 | 20.3k | if (left.task->Priority() < right.task->Priority()) |
302 | 13.5k | { |
303 | 13.5k | return Comparison::LEFT; |
304 | 13.5k | } |
305 | 6.78k | if (right.task->Priority() < left.task->Priority()) |
306 | 6.78k | { |
307 | 6.78k | return Comparison::RIGHT; |
308 | 6.78k | } |
309 | 0 | else |
310 | 0 | { |
311 | 0 | return Comparison::SAME; |
312 | 0 | } |
313 | 6.78k | } |
314 | | |
315 | | } // namespace opendnp3 |