Line data Source code
1 : #pragma once 2 : #include <cstdint> 3 : #include <iostream> 4 : #include <queue> 5 : 6 : #include "envoy/upstream/scheduler.h" 7 : 8 : #include "source/common/common/assert.h" 9 : 10 : namespace Envoy { 11 : namespace Upstream { 12 : 13 : // It's not sufficient to use trace level logging, since it becomes far too noisy for a number of 14 : // tests, so we can kill trace debug here. 15 : #define EDF_DEBUG 0 16 : 17 : #if EDF_DEBUG 18 : #define EDF_TRACE(...) ENVOY_LOG_MISC(trace, __VA_ARGS__) 19 : #else 20 : #define EDF_TRACE(...) 21 : #endif 22 : 23 : // Earliest Deadline First (EDF) scheduler 24 : // (https://en.wikipedia.org/wiki/Earliest_deadline_first_scheduling) used for weighted round robin. 25 : // Each pick from the schedule has the earliest deadline entry selected. Entries have deadlines set 26 : // at current time + 1 / weight, providing weighted round robin behavior with floating point 27 : // weights and an O(log n) pick time. 28 : template <class C> class EdfScheduler : public Scheduler<C> { 29 : public: 30 : // See scheduler.h for an explanation of each public method. 31 20 : std::shared_ptr<C> peekAgain(std::function<double(const C&)> calculate_weight) override { 32 20 : std::shared_ptr<C> ret = popEntry(); 33 20 : if (ret) { 34 20 : prepick_list_.push_back(ret); 35 20 : add(calculate_weight(*ret), ret); 36 20 : } 37 20 : return ret; 38 20 : } 39 : 40 525471 : std::shared_ptr<C> pickAndAdd(std::function<double(const C&)> calculate_weight) override { 41 525471 : while (!prepick_list_.empty()) { 42 : // In this case the entry was added back during peekAgain so don't re-add. 43 16 : std::shared_ptr<C> ret = prepick_list_.front().lock(); 44 16 : prepick_list_.pop_front(); 45 16 : if (ret) { 46 16 : return ret; 47 16 : } 48 16 : } 49 525455 : std::shared_ptr<C> ret = popEntry(); 50 525455 : if (ret) { 51 525455 : add(calculate_weight(*ret), ret); 52 525455 : } 53 525455 : return ret; 54 525471 : } 55 : 56 1167914 : void add(double weight, std::shared_ptr<C> entry) override { 57 1167914 : ASSERT(weight > 0); 58 1167914 : const double deadline = current_time_ + 1.0 / weight; 59 1167914 : EDF_TRACE("Insertion {} in queue with deadline {} and weight {}.", 60 1167914 : static_cast<const void*>(entry.get()), deadline, weight); 61 1167914 : queue_.push({deadline, order_offset_++, entry}); 62 1167914 : ASSERT(queue_.top().deadline_ >= current_time_); 63 1167914 : } 64 : 65 465 : bool empty() const override { return queue_.empty(); } 66 : 67 : private: 68 : /** 69 : * Clears expired entries and pops the next unexpired entry in the queue. 70 : */ 71 525475 : std::shared_ptr<C> popEntry() { 72 525475 : EDF_TRACE("Queue pick: queue_.size()={}, current_time_={}.", queue_.size(), current_time_); 73 525475 : while (true) { 74 525475 : if (queue_.empty()) { 75 0 : EDF_TRACE("Queue is empty."); 76 0 : return nullptr; 77 0 : } 78 525475 : const EdfEntry& edf_entry = queue_.top(); 79 : // Entry has been removed, let's see if there's another one. 80 525475 : std::shared_ptr<C> ret = edf_entry.entry_.lock(); 81 525475 : if (!ret) { 82 0 : EDF_TRACE("Entry has expired, repick."); 83 0 : queue_.pop(); 84 0 : continue; 85 0 : } 86 525475 : ASSERT(edf_entry.deadline_ >= current_time_); 87 525475 : current_time_ = edf_entry.deadline_; 88 525475 : EDF_TRACE("Picked {}, current_time_={}.", static_cast<const void*>(ret.get()), current_time_); 89 525475 : queue_.pop(); 90 525475 : return ret; 91 525475 : } 92 525475 : } 93 : 94 : struct EdfEntry { 95 : double deadline_; 96 : // Tie breaker for entries with the same deadline. This is used to provide FIFO behavior. 97 : uint64_t order_offset_; 98 : // We only hold a weak pointer, since we don't support a remove operator. This allows entries to 99 : // be lazily unloaded from the queue. 100 : std::weak_ptr<C> entry_; 101 : 102 : // Flip < direction to make this a min queue. 103 12343381 : bool operator<(const EdfEntry& other) const { 104 12343381 : return deadline_ > other.deadline_ || 105 12343381 : (deadline_ == other.deadline_ && order_offset_ > other.order_offset_); 106 12343381 : } 107 : }; 108 : 109 : // Current time in EDF scheduler. 110 : // TODO(htuch): Is it worth the small extra complexity to use integer time for performance 111 : // reasons? 112 : double current_time_{}; 113 : // Offset used during addition to break ties when entries have the same weight but should reflect 114 : // FIFO insertion order in picks. 115 : uint64_t order_offset_{}; 116 : // Min priority queue for EDF. 117 : std::priority_queue<EdfEntry> queue_; 118 : std::list<std::weak_ptr<C>> prepick_list_; 119 : }; 120 : 121 : #undef EDF_DEBUG 122 : 123 : } // namespace Upstream 124 : } // namespace Envoy