LCOV - code coverage report
Current view: top level - source/common/upstream - edf_scheduler.h (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 49 56 87.5 %
Date: 2024-01-05 06:35:25 Functions: 6 12 50.0 %

          Line data    Source code
       1             : #pragma once
       2             : #include <cstdint>
       3             : #include <iostream>
       4             : #include <queue>
       5             : 
       6             : #include "envoy/upstream/scheduler.h"
       7             : 
       8             : #include "source/common/common/assert.h"
       9             : 
      10             : namespace Envoy {
      11             : namespace Upstream {
      12             : 
      13             : // It's not sufficient to use trace level logging, since it becomes far too noisy for a number of
      14             : // tests, so we can kill trace debug here.
      15             : #define EDF_DEBUG 0
      16             : 
      17             : #if EDF_DEBUG
      18             : #define EDF_TRACE(...) ENVOY_LOG_MISC(trace, __VA_ARGS__)
      19             : #else
      20             : #define EDF_TRACE(...)
      21             : #endif
      22             : 
      23             : // Earliest Deadline First (EDF) scheduler
      24             : // (https://en.wikipedia.org/wiki/Earliest_deadline_first_scheduling) used for weighted round robin.
      25             : // Each pick from the schedule has the earliest deadline entry selected. Entries have deadlines set
      26             : // at current time + 1 / weight, providing weighted round robin behavior with floating point
      27             : // weights and an O(log n) pick time.
      28             : template <class C> class EdfScheduler : public Scheduler<C> {
      29             : public:
      30             :   // See scheduler.h for an explanation of each public method.
      31          20 :   std::shared_ptr<C> peekAgain(std::function<double(const C&)> calculate_weight) override {
      32          20 :     std::shared_ptr<C> ret = popEntry();
      33          20 :     if (ret) {
      34          20 :       prepick_list_.push_back(ret);
      35          20 :       add(calculate_weight(*ret), ret);
      36          20 :     }
      37          20 :     return ret;
      38          20 :   }
      39             : 
      40      525471 :   std::shared_ptr<C> pickAndAdd(std::function<double(const C&)> calculate_weight) override {
      41      525471 :     while (!prepick_list_.empty()) {
      42             :       // In this case the entry was added back during peekAgain so don't re-add.
      43          16 :       std::shared_ptr<C> ret = prepick_list_.front().lock();
      44          16 :       prepick_list_.pop_front();
      45          16 :       if (ret) {
      46          16 :         return ret;
      47          16 :       }
      48          16 :     }
      49      525455 :     std::shared_ptr<C> ret = popEntry();
      50      525455 :     if (ret) {
      51      525455 :       add(calculate_weight(*ret), ret);
      52      525455 :     }
      53      525455 :     return ret;
      54      525471 :   }
      55             : 
      56     1167914 :   void add(double weight, std::shared_ptr<C> entry) override {
      57     1167914 :     ASSERT(weight > 0);
      58     1167914 :     const double deadline = current_time_ + 1.0 / weight;
      59     1167914 :     EDF_TRACE("Insertion {} in queue with deadline {} and weight {}.",
      60     1167914 :               static_cast<const void*>(entry.get()), deadline, weight);
      61     1167914 :     queue_.push({deadline, order_offset_++, entry});
      62     1167914 :     ASSERT(queue_.top().deadline_ >= current_time_);
      63     1167914 :   }
      64             : 
      65         465 :   bool empty() const override { return queue_.empty(); }
      66             : 
      67             : private:
      68             :   /**
      69             :    * Clears expired entries and pops the next unexpired entry in the queue.
      70             :    */
      71      525475 :   std::shared_ptr<C> popEntry() {
      72      525475 :     EDF_TRACE("Queue pick: queue_.size()={}, current_time_={}.", queue_.size(), current_time_);
      73      525475 :     while (true) {
      74      525475 :       if (queue_.empty()) {
      75           0 :         EDF_TRACE("Queue is empty.");
      76           0 :         return nullptr;
      77           0 :       }
      78      525475 :       const EdfEntry& edf_entry = queue_.top();
      79             :       // Entry has been removed, let's see if there's another one.
      80      525475 :       std::shared_ptr<C> ret = edf_entry.entry_.lock();
      81      525475 :       if (!ret) {
      82           0 :         EDF_TRACE("Entry has expired, repick.");
      83           0 :         queue_.pop();
      84           0 :         continue;
      85           0 :       }
      86      525475 :       ASSERT(edf_entry.deadline_ >= current_time_);
      87      525475 :       current_time_ = edf_entry.deadline_;
      88      525475 :       EDF_TRACE("Picked {}, current_time_={}.", static_cast<const void*>(ret.get()), current_time_);
      89      525475 :       queue_.pop();
      90      525475 :       return ret;
      91      525475 :     }
      92      525475 :   }
      93             : 
      94             :   struct EdfEntry {
      95             :     double deadline_;
      96             :     // Tie breaker for entries with the same deadline. This is used to provide FIFO behavior.
      97             :     uint64_t order_offset_;
      98             :     // We only hold a weak pointer, since we don't support a remove operator. This allows entries to
      99             :     // be lazily unloaded from the queue.
     100             :     std::weak_ptr<C> entry_;
     101             : 
     102             :     // Flip < direction to make this a min queue.
     103    12343381 :     bool operator<(const EdfEntry& other) const {
     104    12343381 :       return deadline_ > other.deadline_ ||
     105    12343381 :              (deadline_ == other.deadline_ && order_offset_ > other.order_offset_);
     106    12343381 :     }
     107             :   };
     108             : 
     109             :   // Current time in EDF scheduler.
     110             :   // TODO(htuch): Is it worth the small extra complexity to use integer time for performance
     111             :   // reasons?
     112             :   double current_time_{};
     113             :   // Offset used during addition to break ties when entries have the same weight but should reflect
     114             :   // FIFO insertion order in picks.
     115             :   uint64_t order_offset_{};
     116             :   // Min priority queue for EDF.
     117             :   std::priority_queue<EdfEntry> queue_;
     118             :   std::list<std::weak_ptr<C>> prepick_list_;
     119             : };
     120             : 
     121             : #undef EDF_DEBUG
     122             : 
     123             : } // namespace Upstream
     124             : } // namespace Envoy

Generated by: LCOV version 1.15