1
#include "source/common/event/scaled_range_timer_manager_impl.h"
2

            
3
#include <chrono>
4
#include <cmath>
5
#include <memory>
6

            
7
#include "envoy/event/timer.h"
8

            
9
#include "source/common/common/assert.h"
10
#include "source/common/common/scope_tracker.h"
11

            
12
namespace Envoy {
13
namespace Event {
14

            
15
/**
16
 * Implementation of Timer that can be scaled by the backing manager object.
17
 *
18
 * Instances of this class exist in one of 3 states:
19
 *  - inactive: not enabled
20
 *  - waiting-for-min: enabled, min timeout not elapsed
21
 *  - scaling-max: enabled, min timeout elapsed, max timeout not elapsed
22
 *
23
 * The allowed state transitions are:
24
 *  - inactive -> waiting-for-min
25
 *  - waiting-for-min -> scaling-max | inactive
26
 *  - scaling-max -> inactive
27
 *
28
 * Some methods combine multiple state transitions; enableTimer(0, max) on a
29
 * timer in the scaling-max state will logically execute the transition sequence
30
 * [scaling-max -> inactive -> waiting-for-min -> scaling-max] in a single
31
 * method call. The waiting-for-min transitions are elided for efficiency.
32
 */
33
class ScaledRangeTimerManagerImpl::RangeTimerImpl final : public Timer {
34
public:
35
  RangeTimerImpl(ScaledTimerMinimum minimum, TimerCb callback, ScaledRangeTimerManagerImpl& manager)
36
115571
      : minimum_(minimum), manager_(manager), callback_(std::move(callback)),
37
115571
        min_duration_timer_(manager.dispatcher_.createTimer([this] { onMinTimerComplete(); })) {}
38

            
39
115571
  ~RangeTimerImpl() override { disableTimer(); }
40

            
41
931649
  void disableTimer() override {
42
931649
    struct Dispatch {
43
931649
      Dispatch(RangeTimerImpl& timer) : timer_(timer) {}
44
931649
      RangeTimerImpl& timer_;
45
931649
      void operator()(const Inactive&) {}
46
931649
      void operator()(const WaitingForMin&) { timer_.min_duration_timer_->disableTimer(); }
47
931649
      void operator()(ScalingMax& active) { timer_.manager_.removeTimer(active.handle_); }
48
931649
    };
49
931649
    absl::visit(Dispatch(*this), state_);
50
931649
    state_.emplace<Inactive>();
51
931649
    scope_ = nullptr;
52
931649
  }
53

            
54
  void enableTimer(const std::chrono::milliseconds max_ms,
55
607281
                   const ScopeTrackedObject* scope) override {
56
607281
    disableTimer();
57
607281
    scope_ = scope;
58
607281
    const std::chrono::milliseconds min_ms = std::min(minimum_.computeMinimum(max_ms), max_ms);
59
607281
    ENVOY_LOG_MISC(trace, "enableTimer called on {} for {}ms, min is {}ms",
60
607281
                   static_cast<void*>(this), max_ms.count(), min_ms.count());
61
607281
    if (min_ms <= std::chrono::milliseconds::zero()) {
62
      // If the duration spread (max - min) is zero, skip over the waiting-for-min and straight to
63
      // the scaling-max state.
64
6
      auto handle = manager_.activateTimer(max_ms, *this);
65
6
      state_.emplace<ScalingMax>(handle);
66
607275
    } else {
67
607275
      state_.emplace<WaitingForMin>(max_ms - min_ms);
68
607275
      min_duration_timer_->enableTimer(std::min(max_ms, min_ms));
69
607275
    }
70
607281
  }
71

            
72
  void enableHRTimer(std::chrono::microseconds us,
73
                     const ScopeTrackedObject* object = nullptr) override {
74
    enableTimer(std::chrono::duration_cast<std::chrono::milliseconds>(us), object);
75
  }
76

            
77
29
  bool enabled() override { return !absl::holds_alternative<Inactive>(state_); }
78

            
79
347
  void trigger() {
80
347
    ASSERT(manager_.dispatcher_.isThreadSafe());
81
347
    ASSERT(!absl::holds_alternative<Inactive>(state_));
82
347
    ENVOY_LOG_MISC(trace, "RangeTimerImpl triggered: {}", static_cast<void*>(this));
83
347
    state_.emplace<Inactive>();
84
347
    if (scope_ == nullptr) {
85
344
      callback_();
86
344
    } else {
87
3
      ScopeTrackerScopeState scope(scope_, manager_.dispatcher_);
88
3
      scope_ = nullptr;
89
3
      callback_();
90
3
    }
91
347
  }
92

            
93
private:
94
  struct Inactive {};
95

            
96
  struct WaitingForMin {
97
    WaitingForMin(std::chrono::milliseconds scalable_duration)
98
607275
        : scalable_duration_(scalable_duration) {}
99

            
100
    // The amount of time between this enabled timer's max and min, which should
101
    // be scaled by the current scale factor.
102
    const std::chrono::milliseconds scalable_duration_;
103
  };
104

            
105
  struct ScalingMax {
106
353
    ScalingMax(ScaledRangeTimerManagerImpl::ScalingTimerHandle handle) : handle_(handle) {}
107

            
108
    // A handle that can be used to disable the timer.
109
    ScaledRangeTimerManagerImpl::ScalingTimerHandle handle_;
110
  };
111

            
112
  /**
113
   * This is called when the min timer expires, on the dispatcher for the manager. It registers with
114
   * the manager so the duration can be scaled, unless the duration is zero in which case it just
115
   * triggers the callback right away.
116
   */
117
347
  void onMinTimerComplete() {
118
347
    ASSERT(manager_.dispatcher_.isThreadSafe());
119
347
    ENVOY_LOG_MISC(trace, "min timer complete for {}", static_cast<void*>(this));
120
347
    ASSERT(absl::holds_alternative<WaitingForMin>(state_));
121
347
    const WaitingForMin& waiting = absl::get<WaitingForMin>(state_);
122

            
123
    // This
124
347
    if (waiting.scalable_duration_ < std::chrono::milliseconds::zero()) {
125
      trigger();
126
347
    } else {
127
347
      state_.emplace<ScalingMax>(manager_.activateTimer(waiting.scalable_duration_, *this));
128
347
    }
129
347
  }
130

            
131
  const ScaledTimerMinimum minimum_;
132
  ScaledRangeTimerManagerImpl& manager_;
133
  const TimerCb callback_;
134
  const TimerPtr min_duration_timer_;
135

            
136
  absl::variant<Inactive, WaitingForMin, ScalingMax> state_;
137
  const ScopeTrackedObject* scope_;
138
};
139

            
140
// Builds a manager bound to `dispatcher`. When the caller passes no table of timer minimums, an
// empty table is allocated so the member is never null. The scale factor starts out at 1.0.
ScaledRangeTimerManagerImpl::ScaledRangeTimerManagerImpl(
    Dispatcher& dispatcher, const ScaledTimerTypeMapConstSharedPtr& timer_minimums)
    : dispatcher_(dispatcher),
      timer_minimums_(timer_minimums == nullptr ? std::make_shared<ScaledTimerTypeMap>()
                                                : timer_minimums),
      scale_factor_(1.0) {}
146

            
147
77833
ScaledRangeTimerManagerImpl::~ScaledRangeTimerManagerImpl() {
  // Timers handed out by this manager must be gone before the manager itself is destroyed. An
  // empty queue set is a necessary condition for that, though it does not prove it on its own.
  ASSERT(queues_.empty());
}
152

            
153
115529
// Creates a scaled timer for `timer_type`, using the minimum configured for that type. Types with
// no configured entry fall back to a scaled minimum of UnitFloat::max().
TimerPtr ScaledRangeTimerManagerImpl::createTimer(ScaledTimerType timer_type, TimerCb callback) {
  const auto configured = timer_minimums_->find(timer_type);
  if (configured != timer_minimums_->end()) {
    return createTimer(configured->second, std::move(callback));
  }

  // No entry for this timer type: use the fallback minimum.
  const Event::ScaledTimerMinimum fallback =
      Event::ScaledTimerMinimum(Event::ScaledMinimum(UnitFloat::max()));
  return createTimer(fallback, std::move(callback));
}
161

            
162
115571
// Creates a range timer that uses `minimum` to derive its minimum duration and runs `callback`
// when it fires. The returned timer keeps a reference to this manager.
TimerPtr ScaledRangeTimerManagerImpl::createTimer(ScaledTimerMinimum minimum, TimerCb callback) {
  // `callback` is a by-value sink parameter; move it along instead of making another copy.
  return std::make_unique<RangeTimerImpl>(minimum, std::move(callback), *this);
}
165

            
166
124
// Installs a new scale factor and re-arms the timer of every existing queue so that the head
// entry of each queue is re-evaluated against the new factor.
void ScaledRangeTimerManagerImpl::setScaleFactor(UnitFloat scale_factor) {
  scale_factor_ = scale_factor;

  // Use a single timestamp so every queue is rescheduled relative to the same instant.
  const MonotonicTime current_time = dispatcher_.approximateMonotonicTime();
  for (auto& queue_ptr : queues_) {
    resetQueueTimer(*queue_ptr, current_time);
  }
}
173

            
174
ScaledRangeTimerManagerImpl::Queue::Item::Item(RangeTimerImpl& timer, MonotonicTime active_time)
175
353
    : timer_(timer), active_time_(active_time) {}
176

            
177
ScaledRangeTimerManagerImpl::Queue::Queue(std::chrono::milliseconds duration,
178
                                          ScaledRangeTimerManagerImpl& manager,
179
                                          Dispatcher& dispatcher)
180
344
    : duration_(duration),
181
344
      timer_(dispatcher.createTimer([this, &manager] { manager.onQueueTimerFired(*this); })) {}
182

            
183
ScaledRangeTimerManagerImpl::ScalingTimerHandle::ScalingTimerHandle(Queue& queue,
184
                                                                    Queue::Iterator iterator)
185
353
    : queue_(queue), iterator_(iterator) {}
186

            
187
// Returns the time at which `item` should fire: its activation time plus `duration` multiplied
// by the scale factor's value.
MonotonicTime ScaledRangeTimerManagerImpl::computeTriggerTime(const Queue::Item& item,
                                                              std::chrono::milliseconds duration,
                                                              UnitFloat scale_factor) {
  const auto scaled_duration = duration * scale_factor.value();
  const auto offset = std::chrono::duration_cast<MonotonicTime::duration>(scaled_duration);
  return item.active_time_ + offset;
}
193

            
194
// Registers `range_timer` in the queue for `duration`, creating that queue on first use, and
// returns a handle that removeTimer() can later use to take the entry out again.
ScaledRangeTimerManagerImpl::ScalingTimerHandle
ScaledRangeTimerManagerImpl::activateTimer(std::chrono::milliseconds duration,
                                           RangeTimerImpl& range_timer) {
  // Ensure this is being called on the same dispatcher.
  ASSERT(dispatcher_.isThreadSafe());

  // Look up the queue that matches the (max - min) duration of the range timer, creating it if
  // this is the first timer with that duration.
  auto queue_it = queues_.find(duration);
  if (queue_it == queues_.end()) {
    queue_it = queues_.emplace(std::make_unique<Queue>(duration, *this, dispatcher_)).first;
  }
  Queue& target = **queue_it;
  auto& pending = target.range_timers_;

  // Append at the back. Every timer in this queue has the same duration and activation times
  // are monotonic, so appending keeps the queue sorted by trigger time.
  const auto position =
      pending.emplace(pending.end(), range_timer, dispatcher_.approximateMonotonicTime());

  // If the new entry is the only one, it is also the head of the queue, so the queue's timer
  // has to be armed for it.
  if (pending.size() == 1) {
    resetQueueTimer(target, dispatcher_.approximateMonotonicTime());
  }

  return {target, position};
}
219

            
220
6
// Takes the entry identified by `handle` out of its queue, then either drops the now-empty
// queue or re-arms the queue's timer if the head entry changed.
void ScaledRangeTimerManagerImpl::removeTimer(ScalingTimerHandle handle) {
  // Ensure this is being called on the same dispatcher.
  ASSERT(dispatcher_.isThreadSafe());

  Queue& owner = handle.queue_;
  auto& pending = owner.range_timers_;

  // This has to be checked before erasing, since the iterator is invalid afterwards.
  const bool removed_head = (handle.iterator_ == pending.begin());
  pending.erase(handle.iterator_);

  if (!pending.empty()) {
    // The queue's timer tracks the expiration time of the first range timer, so it only needs
    // adjusting if the first timer is the one that was removed.
    if (removed_head) {
      resetQueueTimer(owner, dispatcher_.approximateMonotonicTime());
    }
    return;
  }

  // Don't keep around empty queues. The exception is a queue whose entries are currently being
  // processed: `onQueueTimerFired` erases it once it has finished with the entries.
  if (!owner.processing_timers_) {
    queues_.erase(owner);
  }
}
242

            
243
408
// Arms the queue's timer so that it fires when the entry at the front of the queue is due,
// given the current scale factor. If that time is already in the past, the timer is armed with
// a zero delay.
void ScaledRangeTimerManagerImpl::resetQueueTimer(Queue& queue, MonotonicTime now) {
  ASSERT(!queue.range_timers_.empty());

  const MonotonicTime head_deadline =
      computeTriggerTime(queue.range_timers_.front(), queue.duration_, scale_factor_);

  std::chrono::milliseconds delay = std::chrono::milliseconds::zero();
  if (head_deadline >= now) {
    delay = std::chrono::duration_cast<std::chrono::milliseconds>(head_deadline - now);
  }
  queue.timer_->enableTimer(delay);
}
254

            
255
341
// Runs when a queue's timer expires: fires every entry at the front of the queue that is due
// under the current scale factor, then either drops the emptied queue or re-arms its timer for
// the new head entry.
void ScaledRangeTimerManagerImpl::onQueueTimerFired(Queue& queue) {
  auto& pending = queue.range_timers_;
  ASSERT(!pending.empty());
  const MonotonicTime now = dispatcher_.approximateMonotonicTime();

  // While this flag is set, removeTimer() leaves an emptied queue in place instead of erasing
  // it, so `queue` stays valid for the whole loop.
  queue.processing_timers_ = true;
  while (!pending.empty()) {
    // Stop at the first entry that isn't supposed to have expired yet.
    if (computeTriggerTime(pending.front(), queue.duration_, scale_factor_) > now) {
      break;
    }
    // Take the entry out of the queue before triggering it.
    auto expired = std::move(pending.front());
    pending.pop_front();
    expired.timer_.trigger();
  }
  queue.processing_timers_ = false;

  if (!pending.empty()) {
    resetQueueTimer(queue, now);
    return;
  }

  // Maintain the invariant that queues are never empty.
  queues_.erase(queue);
}
278

            
279
} // namespace Event
280
} // namespace Envoy