Line data Source code
1 : #include "source/common/event/scaled_range_timer_manager_impl.h"
2 :
3 : #include <chrono>
4 : #include <cmath>
5 : #include <memory>
6 :
7 : #include "envoy/event/timer.h"
8 :
9 : #include "source/common/common/assert.h"
10 : #include "source/common/common/scope_tracker.h"
11 :
12 : namespace Envoy {
13 : namespace Event {
14 :
15 : /**
16 : * Implementation of Timer that can be scaled by the backing manager object.
17 : *
18 : * Instances of this class exist in one of 3 states:
19 : * - inactive: not enabled
20 : * - waiting-for-min: enabled, min timeout not elapsed
21 : * - scaling-max: enabled, min timeout elapsed, max timeout not elapsed
22 : *
23 : * The allowed state transitions are:
24 : * - inactive -> waiting-for-min
25 : * - waiting-for-min -> scaling-max | inactive
26 : * - scaling-max -> inactive
27 : *
28 : * Some methods combine multiple state transitions; enableTimer(0, max) on a
29 : * timer in the scaling-max state will logically execute the transition sequence
30 : * [scaling-max -> inactive -> waiting-for-min -> scaling-max] in a single
31 : * method call. The waiting-for-min transitions are elided for efficiency.
32 : */
33 : class ScaledRangeTimerManagerImpl::RangeTimerImpl final : public Timer {
34 : public:
35 : RangeTimerImpl(ScaledTimerMinimum minimum, TimerCb callback, ScaledRangeTimerManagerImpl& manager)
36 : : minimum_(minimum), manager_(manager), callback_(std::move(callback)),
37 1298 : min_duration_timer_(manager.dispatcher_.createTimer([this] { onMinTimerComplete(); })) {}
38 :
39 1298 : ~RangeTimerImpl() override { disableTimer(); }
40 :
41 6115 : void disableTimer() override {
42 6115 : struct Dispatch {
43 6115 : Dispatch(RangeTimerImpl& timer) : timer_(timer) {}
44 6115 : RangeTimerImpl& timer_;
45 6115 : void operator()(const Inactive&) {}
46 6115 : void operator()(const WaitingForMin&) { timer_.min_duration_timer_->disableTimer(); }
47 6115 : void operator()(ScalingMax& active) { timer_.manager_.removeTimer(active.handle_); }
48 6115 : };
49 6115 : absl::visit(Dispatch(*this), state_);
50 6115 : state_.emplace<Inactive>();
51 6115 : scope_ = nullptr;
52 6115 : }
53 :
54 : void enableTimer(const std::chrono::milliseconds max_ms,
55 3005 : const ScopeTrackedObject* scope) override {
56 3005 : disableTimer();
57 3005 : scope_ = scope;
58 3005 : const std::chrono::milliseconds min_ms = std::min(minimum_.computeMinimum(max_ms), max_ms);
59 3005 : ENVOY_LOG_MISC(trace, "enableTimer called on {} for {}ms, min is {}ms",
60 3005 : static_cast<void*>(this), max_ms.count(), min_ms.count());
61 3005 : if (min_ms <= std::chrono::milliseconds::zero()) {
62 : // If the duration spread (max - min) is zero, skip over the waiting-for-min and straight to
63 : // the scaling-max state.
64 0 : auto handle = manager_.activateTimer(max_ms, *this);
65 0 : state_.emplace<ScalingMax>(handle);
66 3005 : } else {
67 3005 : state_.emplace<WaitingForMin>(max_ms - min_ms);
68 3005 : min_duration_timer_->enableTimer(std::min(max_ms, min_ms));
69 3005 : }
70 3005 : }
71 :
72 : void enableHRTimer(std::chrono::microseconds us,
73 0 : const ScopeTrackedObject* object = nullptr) override {
74 0 : enableTimer(std::chrono::duration_cast<std::chrono::milliseconds>(us), object);
75 0 : }
76 :
77 0 : bool enabled() override { return !absl::holds_alternative<Inactive>(state_); }
78 :
79 0 : void trigger() {
80 0 : ASSERT(manager_.dispatcher_.isThreadSafe());
81 0 : ASSERT(!absl::holds_alternative<Inactive>(state_));
82 0 : ENVOY_LOG_MISC(trace, "RangeTimerImpl triggered: {}", static_cast<void*>(this));
83 0 : state_.emplace<Inactive>();
84 0 : if (scope_ == nullptr) {
85 0 : callback_();
86 0 : } else {
87 0 : ScopeTrackerScopeState scope(scope_, manager_.dispatcher_);
88 0 : scope_ = nullptr;
89 0 : callback_();
90 0 : }
91 0 : }
92 :
93 : private:
94 : struct Inactive {};
95 :
96 : struct WaitingForMin {
97 : WaitingForMin(std::chrono::milliseconds scalable_duration)
98 3005 : : scalable_duration_(scalable_duration) {}
99 :
100 : // The amount of time between this enabled timer's max and min, which should
101 : // be scaled by the current scale factor.
102 : const std::chrono::milliseconds scalable_duration_;
103 : };
104 :
105 : struct ScalingMax {
106 0 : ScalingMax(ScaledRangeTimerManagerImpl::ScalingTimerHandle handle) : handle_(handle) {}
107 :
108 : // A handle that can be used to disable the timer.
109 : ScaledRangeTimerManagerImpl::ScalingTimerHandle handle_;
110 : };
111 :
112 : /**
113 : * This is called when the min timer expires, on the dispatcher for the manager. It registers with
114 : * the manager so the duration can be scaled, unless the duration is zero in which case it just
115 : * triggers the callback right away.
116 : */
117 0 : void onMinTimerComplete() {
118 0 : ASSERT(manager_.dispatcher_.isThreadSafe());
119 0 : ENVOY_LOG_MISC(trace, "min timer complete for {}", static_cast<void*>(this));
120 0 : ASSERT(absl::holds_alternative<WaitingForMin>(state_));
121 0 : const WaitingForMin& waiting = absl::get<WaitingForMin>(state_);
122 :
123 : // This
124 0 : if (waiting.scalable_duration_ < std::chrono::milliseconds::zero()) {
125 0 : trigger();
126 0 : } else {
127 0 : state_.emplace<ScalingMax>(manager_.activateTimer(waiting.scalable_duration_, *this));
128 0 : }
129 0 : }
130 :
131 : const ScaledTimerMinimum minimum_;
132 : ScaledRangeTimerManagerImpl& manager_;
133 : const TimerCb callback_;
134 : const TimerPtr min_duration_timer_;
135 :
136 : absl::variant<Inactive, WaitingForMin, ScalingMax> state_;
137 : const ScopeTrackedObject* scope_;
138 : };
139 :
140 : ScaledRangeTimerManagerImpl::ScaledRangeTimerManagerImpl(
141 : Dispatcher& dispatcher, const ScaledTimerTypeMapConstSharedPtr& timer_minimums)
142 : : dispatcher_(dispatcher),
143 : timer_minimums_(timer_minimums != nullptr ? timer_minimums
144 : : std::make_shared<ScaledTimerTypeMap>()),
145 1498 : scale_factor_(1.0) {}
146 :
147 1498 : ScaledRangeTimerManagerImpl::~ScaledRangeTimerManagerImpl() {
148 : // Scaled timers created by the manager shouldn't outlive it. This is
149 : // necessary but not sufficient to guarantee that.
150 1498 : ASSERT(queues_.empty());
151 1498 : }
152 :
153 1298 : TimerPtr ScaledRangeTimerManagerImpl::createTimer(ScaledTimerType timer_type, TimerCb callback) {
154 1298 : const auto minimum_it = timer_minimums_->find(timer_type);
155 1298 : const Event::ScaledTimerMinimum minimum =
156 1298 : minimum_it != timer_minimums_->end()
157 1298 : ? minimum_it->second
158 1298 : : Event::ScaledTimerMinimum(Event::ScaledMinimum(UnitFloat::max()));
159 1298 : return createTimer(minimum, std::move(callback));
160 1298 : }
161 :
162 1298 : TimerPtr ScaledRangeTimerManagerImpl::createTimer(ScaledTimerMinimum minimum, TimerCb callback) {
163 1298 : return std::make_unique<RangeTimerImpl>(minimum, callback, *this);
164 1298 : }
165 :
166 0 : void ScaledRangeTimerManagerImpl::setScaleFactor(UnitFloat scale_factor) {
167 0 : const MonotonicTime now = dispatcher_.approximateMonotonicTime();
168 0 : scale_factor_ = scale_factor;
169 0 : for (auto& queue : queues_) {
170 0 : resetQueueTimer(*queue, now);
171 0 : }
172 0 : }
173 :
174 : ScaledRangeTimerManagerImpl::Queue::Item::Item(RangeTimerImpl& timer, MonotonicTime active_time)
175 0 : : timer_(timer), active_time_(active_time) {}
176 :
177 : ScaledRangeTimerManagerImpl::Queue::Queue(std::chrono::milliseconds duration,
178 : ScaledRangeTimerManagerImpl& manager,
179 : Dispatcher& dispatcher)
180 : : duration_(duration),
181 0 : timer_(dispatcher.createTimer([this, &manager] { manager.onQueueTimerFired(*this); })) {}
182 :
183 : ScaledRangeTimerManagerImpl::ScalingTimerHandle::ScalingTimerHandle(Queue& queue,
184 : Queue::Iterator iterator)
185 0 : : queue_(queue), iterator_(iterator) {}
186 :
187 : MonotonicTime ScaledRangeTimerManagerImpl::computeTriggerTime(const Queue::Item& item,
188 : std::chrono::milliseconds duration,
189 0 : UnitFloat scale_factor) {
190 0 : return item.active_time_ +
191 0 : std::chrono::duration_cast<MonotonicTime::duration>(duration * scale_factor.value());
192 0 : }
193 :
194 : ScaledRangeTimerManagerImpl::ScalingTimerHandle
195 : ScaledRangeTimerManagerImpl::activateTimer(std::chrono::milliseconds duration,
196 0 : RangeTimerImpl& range_timer) {
197 : // Ensure this is being called on the same dispatcher.
198 0 : ASSERT(dispatcher_.isThreadSafe());
199 :
200 : // Find the matching queue for the (max - min) duration of the range timer; if there isn't one,
201 : // create it.
202 0 : auto it = queues_.find(duration);
203 0 : if (it == queues_.end()) {
204 0 : auto queue = std::make_unique<Queue>(duration, *this, dispatcher_);
205 0 : it = queues_.emplace(std::move(queue)).first;
206 0 : }
207 0 : Queue& queue = **it;
208 :
209 : // Put the timer at the back of the queue. Since the timer has the same maximum duration as all
210 : // the other timers in the queue, and since the activation times are monotonic, the queue stays in
211 : // sorted order.
212 0 : queue.range_timers_.emplace_back(range_timer, dispatcher_.approximateMonotonicTime());
213 0 : if (queue.range_timers_.size() == 1) {
214 0 : resetQueueTimer(queue, dispatcher_.approximateMonotonicTime());
215 0 : }
216 :
217 0 : return {queue, --queue.range_timers_.end()};
218 0 : }
219 :
220 0 : void ScaledRangeTimerManagerImpl::removeTimer(ScalingTimerHandle handle) {
221 : // Ensure this is being called on the same dispatcher.
222 0 : ASSERT(dispatcher_.isThreadSafe());
223 :
224 0 : const bool was_front = handle.queue_.range_timers_.begin() == handle.iterator_;
225 0 : handle.queue_.range_timers_.erase(handle.iterator_);
226 : // Don't keep around empty queues
227 0 : if (handle.queue_.range_timers_.empty()) {
228 : // Skip erasing the queue if we're in the middle of processing timers for the queue. The
229 : // queue will be erased in `onQueueTimerFired` after the queue entries have been processed.
230 0 : if (!handle.queue_.processing_timers_) {
231 0 : queues_.erase(handle.queue_);
232 0 : }
233 0 : return;
234 0 : }
235 :
236 : // The queue's timer tracks the expiration time of the first range timer, so it only needs
237 : // adjusting if the first timer is the one that was removed.
238 0 : if (was_front) {
239 0 : resetQueueTimer(handle.queue_, dispatcher_.approximateMonotonicTime());
240 0 : }
241 0 : }
242 :
243 0 : void ScaledRangeTimerManagerImpl::resetQueueTimer(Queue& queue, MonotonicTime now) {
244 0 : ASSERT(!queue.range_timers_.empty());
245 0 : const MonotonicTime trigger_time =
246 0 : computeTriggerTime(queue.range_timers_.front(), queue.duration_, scale_factor_);
247 0 : if (trigger_time < now) {
248 0 : queue.timer_->enableTimer(std::chrono::milliseconds::zero());
249 0 : } else {
250 0 : queue.timer_->enableTimer(
251 0 : std::chrono::duration_cast<std::chrono::milliseconds>(trigger_time - now));
252 0 : }
253 0 : }
254 :
255 0 : void ScaledRangeTimerManagerImpl::onQueueTimerFired(Queue& queue) {
256 0 : auto& timers = queue.range_timers_;
257 0 : ASSERT(!timers.empty());
258 0 : const MonotonicTime now = dispatcher_.approximateMonotonicTime();
259 :
260 : // Pop and trigger timers until the one at the front isn't supposed to have expired yet (given the
261 : // current scale factor).
262 0 : queue.processing_timers_ = true;
263 0 : while (!timers.empty() &&
264 0 : computeTriggerTime(timers.front(), queue.duration_, scale_factor_) <= now) {
265 0 : auto item = std::move(queue.range_timers_.front());
266 0 : queue.range_timers_.pop_front();
267 0 : item.timer_.trigger();
268 0 : }
269 0 : queue.processing_timers_ = false;
270 :
271 0 : if (queue.range_timers_.empty()) {
272 : // Maintain the invariant that queues are never empty.
273 0 : queues_.erase(queue);
274 0 : } else {
275 0 : resetQueueTimer(queue, now);
276 0 : }
277 0 : }
278 :
279 : } // namespace Event
280 : } // namespace Envoy
|