LCOV - code coverage report
Current view: top level - source/common/event - scaled_range_timer_manager_impl.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 42 144 29.2 %
Date: 2024-01-05 06:35:25 Functions: 12 29 41.4 %

          Line data    Source code
       1             : #include "source/common/event/scaled_range_timer_manager_impl.h"
       2             : 
       3             : #include <chrono>
       4             : #include <cmath>
       5             : #include <memory>
       6             : 
       7             : #include "envoy/event/timer.h"
       8             : 
       9             : #include "source/common/common/assert.h"
      10             : #include "source/common/common/scope_tracker.h"
      11             : 
      12             : namespace Envoy {
      13             : namespace Event {
      14             : 
      15             : /**
      16             :  * Implementation of Timer that can be scaled by the backing manager object.
      17             :  *
      18             :  * Instances of this class exist in one of 3 states:
      19             :  *  - inactive: not enabled
      20             :  *  - waiting-for-min: enabled, min timeout not elapsed
      21             :  *  - scaling-max: enabled, min timeout elapsed, max timeout not elapsed
      22             :  *
      23             :  * The allowed state transitions are:
      24             :  *  - inactive -> waiting-for-min
      25             :  *  - waiting-for-min -> scaling-max | inactive
      26             :  *  - scaling-max -> inactive
      27             :  *
      28             :  * Some methods combine multiple state transitions; enableTimer(0, max) on a
      29             :  * timer in the scaling-max state will logically execute the transition sequence
      30             :  * [scaling-max -> inactive -> waiting-for-min -> scaling-max] in a single
      31             :  * method call. The waiting-for-min transitions are elided for efficiency.
      32             :  */
      33             : class ScaledRangeTimerManagerImpl::RangeTimerImpl final : public Timer {
      34             : public:
      35             :   RangeTimerImpl(ScaledTimerMinimum minimum, TimerCb callback, ScaledRangeTimerManagerImpl& manager)
      36             :       : minimum_(minimum), manager_(manager), callback_(std::move(callback)),
      37        1298 :         min_duration_timer_(manager.dispatcher_.createTimer([this] { onMinTimerComplete(); })) {}
      38             : 
      39        1298 :   ~RangeTimerImpl() override { disableTimer(); } // Unregisters from the manager if still queued.
      40             : 
      41        6115 :   void disableTimer() override { // Cancels whatever the current state armed; ends up inactive.
      42        6115 :     struct Dispatch { // Visitor with one overload per state_ alternative.
      43        6115 :       Dispatch(RangeTimerImpl& timer) : timer_(timer) {}
      44        6115 :       RangeTimerImpl& timer_;
      45        6115 :       void operator()(const Inactive&) {}
      46        6115 :       void operator()(const WaitingForMin&) { timer_.min_duration_timer_->disableTimer(); }
      47        6115 :       void operator()(ScalingMax& active) { timer_.manager_.removeTimer(active.handle_); }
      48        6115 :     };
      49        6115 :     absl::visit(Dispatch(*this), state_);
      50        6115 :     state_.emplace<Inactive>();
      51        6115 :     scope_ = nullptr;
      52        6115 :   }
      53             : 
      54             :   void enableTimer(const std::chrono::milliseconds max_ms,
      55        3005 :                    const ScopeTrackedObject* scope) override {
      56        3005 :     disableTimer(); // Restart from inactive, whatever state the timer was in.
      57        3005 :     scope_ = scope;
      58        3005 :     const std::chrono::milliseconds min_ms = std::min(minimum_.computeMinimum(max_ms), max_ms);
      59        3005 :     ENVOY_LOG_MISC(trace, "enableTimer called on {} for {}ms, min is {}ms",
      60        3005 :                    static_cast<void*>(this), max_ms.count(), min_ms.count());
      61        3005 :     if (min_ms <= std::chrono::milliseconds::zero()) {
      62             :       // If the minimum is zero (or negative) there is no min timeout to wait for, so skip the
      63             :       // waiting-for-min state and go straight to scaling-max with the full max duration.
      64           0 :       auto handle = manager_.activateTimer(max_ms, *this);
      65           0 :       state_.emplace<ScalingMax>(handle);
      66        3005 :     } else {
      67        3005 :       state_.emplace<WaitingForMin>(max_ms - min_ms); // Remember the scalable remainder.
      68        3005 :       min_duration_timer_->enableTimer(min_ms); // min_ms was already clamped to max_ms above.
      69        3005 :     }
      70        3005 :   }
      71             : 
      72             :   void enableHRTimer(std::chrono::microseconds us,
      73           0 :                      const ScopeTrackedObject* object = nullptr) override {
      74           0 :     enableTimer(std::chrono::duration_cast<std::chrono::milliseconds>(us), object); // Truncates to ms.
      75           0 :   }
      76             : 
      77           0 :   bool enabled() override { return !absl::holds_alternative<Inactive>(state_); }
      78             : 
      79           0 :   void trigger() { // Runs the callback; called from the manager's queue or onMinTimerComplete().
      80           0 :     ASSERT(manager_.dispatcher_.isThreadSafe());
      81           0 :     ASSERT(!absl::holds_alternative<Inactive>(state_));
      82           0 :     ENVOY_LOG_MISC(trace, "RangeTimerImpl triggered: {}", static_cast<void*>(this));
      83           0 :     state_.emplace<Inactive>(); // Go inactive first, so enabled() is false inside the callback.
      84           0 :     if (scope_ == nullptr) {
      85           0 :       callback_();
      86           0 :     } else {
      87           0 :       ScopeTrackerScopeState scope(scope_, manager_.dispatcher_);
      88           0 :       scope_ = nullptr; // The tracked object only applies to this one firing.
      89           0 :       callback_();
      90           0 :     }
      91           0 :   }
      92             : 
      93             : private:
      94             :   struct Inactive {};
      95             : 
      96             :   struct WaitingForMin {
      97             :     WaitingForMin(std::chrono::milliseconds scalable_duration)
      98        3005 :         : scalable_duration_(scalable_duration) {}
      99             : 
     100             :     // The amount of time between this enabled timer's max and min, which should
     101             :     // be scaled by the current scale factor.
     102             :     const std::chrono::milliseconds scalable_duration_;
     103             :   };
     104             : 
     105             :   struct ScalingMax {
     106           0 :     ScalingMax(ScaledRangeTimerManagerImpl::ScalingTimerHandle handle) : handle_(handle) {}
     107             : 
     108             :     // A handle that can be used to disable the timer.
     109             :     ScaledRangeTimerManagerImpl::ScalingTimerHandle handle_;
     110             :   };
     111             : 
     112             :   /**
     113             :    * This is called when the min timer expires, on the dispatcher for the manager. It registers with
     114             :    * the manager so the remaining (max - min) duration can be scaled. A zero duration is registered
     115             :    * as well (in a zero-length queue); only a negative duration triggers the callback right away.
     116             :    */
     117           0 :   void onMinTimerComplete() {
     118           0 :     ASSERT(manager_.dispatcher_.isThreadSafe());
     119           0 :     ENVOY_LOG_MISC(trace, "min timer complete for {}", static_cast<void*>(this));
     120           0 :     ASSERT(absl::holds_alternative<WaitingForMin>(state_));
     121           0 :     const WaitingForMin& waiting = absl::get<WaitingForMin>(state_);
     122             : 
     123             :     // Defensive only: enableTimer() clamps min to max, so (max - min) is never negative here.
     124           0 :     if (waiting.scalable_duration_ < std::chrono::milliseconds::zero()) {
     125           0 :       trigger();
     126           0 :     } else {
     127           0 :       state_.emplace<ScalingMax>(manager_.activateTimer(waiting.scalable_duration_, *this));
     128           0 :     }
     129           0 :   }
     130             : 
     131             :   const ScaledTimerMinimum minimum_;
     132             :   ScaledRangeTimerManagerImpl& manager_;
     133             :   const TimerCb callback_;
     134             :   const TimerPtr min_duration_timer_;
     135             : 
     136             :   absl::variant<Inactive, WaitingForMin, ScalingMax> state_;
     137             :   const ScopeTrackedObject* scope_{nullptr}; // Set by enableTimer(); null whenever inactive.
     138             : };
     139             : 
     140             : ScaledRangeTimerManagerImpl::ScaledRangeTimerManagerImpl(
     141             :     Dispatcher& dispatcher, const ScaledTimerTypeMapConstSharedPtr& timer_minimums)
     142             :     : dispatcher_(dispatcher),
     143             :       timer_minimums_(timer_minimums != nullptr ? timer_minimums
     144             :                                                 : std::make_shared<ScaledTimerTypeMap>()), // Null => empty map.
     145        1498 :       scale_factor_(1.0) {} // Start unscaled: queued timers run for their full duration.
     146             : 
     147        1498 : ScaledRangeTimerManagerImpl::~ScaledRangeTimerManagerImpl() {
     148             :   // Scaled timers created by the manager shouldn't outlive it. Having no queues left is necessary
     149             :   // but not sufficient to guarantee that: timers still waiting on their minimum are in no queue.
     150        1498 :   ASSERT(queues_.empty());
     151        1498 : }
     152             : 
     153        1298 : TimerPtr ScaledRangeTimerManagerImpl::createTimer(ScaledTimerType timer_type, TimerCb callback) {
     154        1298 :   const auto minimum_it = timer_minimums_->find(timer_type); // Per-type minimum, if configured.
     155        1298 :   const Event::ScaledTimerMinimum minimum =
     156        1298 :       minimum_it != timer_minimums_->end() // Unlisted types fall back to UnitFloat::max().
     157        1298 :           ? minimum_it->second
     158        1298 :           : Event::ScaledTimerMinimum(Event::ScaledMinimum(UnitFloat::max())); // NOTE(review): presumably min == max; confirm.
     159        1298 :   return createTimer(minimum, std::move(callback)); // Delegate to the overload taking a minimum.
     160        1298 : }
     161             : 
     162        1298 : TimerPtr ScaledRangeTimerManagerImpl::createTimer(ScaledTimerMinimum minimum, TimerCb callback) {
     163        1298 :   return std::make_unique<RangeTimerImpl>(minimum, std::move(callback), *this); // callback is a by-value sink: move, don't copy.
     164        1298 : }
     165             : 
     166           0 : void ScaledRangeTimerManagerImpl::setScaleFactor(UnitFloat scale_factor) {
     167           0 :   const MonotonicTime now = dispatcher_.approximateMonotonicTime(); // One timestamp for all queues.
     168           0 :   scale_factor_ = scale_factor;
     169           0 :   for (auto& queue : queues_) { // Re-arm every queue timer against the new factor.
     170           0 :     resetQueueTimer(*queue, now);
     171           0 :   }
     172           0 : }
     173             : 
     174             : ScaledRangeTimerManagerImpl::Queue::Item::Item(RangeTimerImpl& timer, MonotonicTime active_time)
     175           0 :     : timer_(timer), active_time_(active_time) {} // active_time: when the timer joined its queue.
     176             : 
     177             : ScaledRangeTimerManagerImpl::Queue::Queue(std::chrono::milliseconds duration,
     178             :                                           ScaledRangeTimerManagerImpl& manager,
     179             :                                           Dispatcher& dispatcher)
     180             :     : duration_(duration), // The unscaled duration shared by every timer in this queue.
     181           0 :       timer_(dispatcher.createTimer([this, &manager] { manager.onQueueTimerFired(*this); })) {}
     182             : 
     183             : ScaledRangeTimerManagerImpl::ScalingTimerHandle::ScalingTimerHandle(Queue& queue,
     184             :                                                                     Queue::Iterator iterator)
     185           0 :     : queue_(queue), iterator_(iterator) {} // One queue entry: the queue plus a position in it.
     186             : 
     187             : MonotonicTime ScaledRangeTimerManagerImpl::computeTriggerTime(const Queue::Item& item,
     188             :                                                               std::chrono::milliseconds duration,
     189           0 :                                                               UnitFloat scale_factor) {
     190           0 :   return item.active_time_ + // Expiry = time the item was queued + duration scaled by the factor.
     191           0 :          std::chrono::duration_cast<MonotonicTime::duration>(duration * scale_factor.value());
     192           0 : }
     193             : 
     194             : ScaledRangeTimerManagerImpl::ScalingTimerHandle
     195             : ScaledRangeTimerManagerImpl::activateTimer(std::chrono::milliseconds duration,
     196           0 :                                            RangeTimerImpl& range_timer) {
     197             :   // Ensure this is being called on the same dispatcher.
     198           0 :   ASSERT(dispatcher_.isThreadSafe());
     199             : 
     200             :   // Find the matching queue for the (max - min) duration of the range timer; if there isn't one,
     201             :   // create it.
     202           0 :   auto it = queues_.find(duration);
     203           0 :   if (it == queues_.end()) {
     204           0 :     auto queue = std::make_unique<Queue>(duration, *this, dispatcher_);
     205           0 :     it = queues_.emplace(std::move(queue)).first;
     206           0 :   }
     207           0 :   Queue& queue = **it;
     208             : 
     209             :   // Put the timer at the back of the queue. Since the timer has the same maximum duration as all
     210             :   // the other timers in the queue, and since the activation times are monotonic, the queue stays in
     211             :   // sorted order.
     212           0 :   queue.range_timers_.emplace_back(range_timer, dispatcher_.approximateMonotonicTime());
     213           0 :   if (queue.range_timers_.size() == 1) { // New entry is the front: arm the queue timer for it.
     214           0 :     resetQueueTimer(queue, dispatcher_.approximateMonotonicTime());
     215           0 :   }
     216             : 
     217           0 :   return {queue, --queue.range_timers_.end()}; // Handle refers to the entry just appended.
     218           0 : }
     219             : 
     220           0 : void ScaledRangeTimerManagerImpl::removeTimer(ScalingTimerHandle handle) {
     221             :   // Ensure this is being called on the same dispatcher.
     222           0 :   ASSERT(dispatcher_.isThreadSafe());
     223             : 
     224           0 :   const bool was_front = handle.queue_.range_timers_.begin() == handle.iterator_; // Check before erase.
     225           0 :   handle.queue_.range_timers_.erase(handle.iterator_);
     226             :   // Don't keep around empty queues.
     227           0 :   if (handle.queue_.range_timers_.empty()) {
     228             :     // Skip erasing the queue if we're in the middle of processing timers for the queue. The
     229             :     // queue will be erased in `onQueueTimerFired` after the queue entries have been processed.
     230           0 :     if (!handle.queue_.processing_timers_) {
     231           0 :       queues_.erase(handle.queue_);
     232           0 :     }
     233           0 :     return;
     234           0 :   }
     235             : 
     236             :   // The queue's timer tracks the expiration time of the first range timer, so it only needs
     237             :   // adjusting if the first timer is the one that was removed.
     238           0 :   if (was_front) {
     239           0 :     resetQueueTimer(handle.queue_, dispatcher_.approximateMonotonicTime());
     240           0 :   }
     241           0 : }
     242             : 
     243           0 : void ScaledRangeTimerManagerImpl::resetQueueTimer(Queue& queue, MonotonicTime now) {
     244           0 :   ASSERT(!queue.range_timers_.empty()); // Needs a front entry to compute the deadline from.
     245           0 :   const MonotonicTime trigger_time =
     246           0 :       computeTriggerTime(queue.range_timers_.front(), queue.duration_, scale_factor_);
     247           0 :   if (trigger_time < now) { // Already overdue: arm with a zero delay.
     248           0 :     queue.timer_->enableTimer(std::chrono::milliseconds::zero());
     249           0 :   } else { // Otherwise wait out the remaining time, truncated to whole milliseconds.
     250           0 :     queue.timer_->enableTimer(
     251           0 :         std::chrono::duration_cast<std::chrono::milliseconds>(trigger_time - now));
     252           0 :   }
     253           0 : }
     254             : 
     255           0 : void ScaledRangeTimerManagerImpl::onQueueTimerFired(Queue& queue) {
     256           0 :   auto& timers = queue.range_timers_;
     257           0 :   ASSERT(!timers.empty());
     258           0 :   const MonotonicTime now = dispatcher_.approximateMonotonicTime();
     259             : 
     260             :   // Pop and trigger timers until the one at the front isn't supposed to have expired yet (given the
     261             :   // current scale factor).
     262           0 :   queue.processing_timers_ = true; // Tells removeTimer() not to erase this queue during the loop.
     263           0 :   while (!timers.empty() &&
     264           0 :          computeTriggerTime(timers.front(), queue.duration_, scale_factor_) <= now) {
     265           0 :     auto item = std::move(queue.range_timers_.front());
     266           0 :     queue.range_timers_.pop_front(); // Remove the entry before its callback runs.
     267           0 :     item.timer_.trigger();
     268           0 :   }
     269           0 :   queue.processing_timers_ = false;
     270             : 
     271           0 :   if (queue.range_timers_.empty()) {
     272             :     // Maintain the invariant that queues are never empty.
     273           0 :     queues_.erase(queue);
     274           0 :   } else {
     275           0 :     resetQueueTimer(queue, now); // Re-arm for the new front entry.
     276           0 :   }
     277           0 : }
     278             : 
     279             : } // namespace Event
     280             : } // namespace Envoy

Generated by: LCOV version 1.15