/proc/self/cwd/source/common/upstream/edf_scheduler.h
Line | Count | Source (jump to first uncovered line) |
1 | | #pragma once |
2 | | #include <cstdint> |
3 | | #include <iostream> |
4 | | #include <queue> |
5 | | |
6 | | #include "envoy/upstream/scheduler.h" |
7 | | |
8 | | #include "source/common/common/assert.h" |
9 | | |
10 | | namespace Envoy { |
11 | | namespace Upstream { |
12 | | |
// Trace-level logging alone is not a fine enough switch for this file: the scheduler's tracing is
// far too noisy for a number of tests. EDF_TRACE therefore expands to nothing unless EDF_DEBUG is
// set to a non-zero value, in which case it forwards its arguments to ENVOY_LOG_MISC at trace
// level.
#define EDF_DEBUG 0

#if !EDF_DEBUG
// Tracing disabled: every EDF_TRACE(...) call site, including its arguments, is compiled out.
#define EDF_TRACE(...)
#else
#define EDF_TRACE(...) ENVOY_LOG_MISC(trace, __VA_ARGS__)
#endif
22 | | |
23 | | // Earliest Deadline First (EDF) scheduler |
24 | | // (https://en.wikipedia.org/wiki/Earliest_deadline_first_scheduling) used for weighted round robin. |
25 | | // Each pick from the schedule has the earliest deadline entry selected. Entries have deadlines set |
26 | | // at current time + 1 / weight, providing weighted round robin behavior with floating point |
27 | | // weights and an O(log n) pick time. |
28 | | template <class C> class EdfScheduler : public Scheduler<C> { |
29 | | public: |
30 | | // See scheduler.h for an explanation of each public method. |
31 | 979 | std::shared_ptr<C> peekAgain(std::function<double(const C&)> calculate_weight) override { |
32 | 979 | std::shared_ptr<C> ret = popEntry(); |
33 | 979 | if (ret) { |
34 | 979 | prepick_list_.push_back(ret); |
35 | 979 | add(calculate_weight(*ret), ret); |
36 | 979 | } |
37 | 979 | return ret; |
38 | 979 | } Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host const>::peekAgain(std::__1::function<double (Envoy::Upstream::Host const&)>) Line | Count | Source | 31 | 979 | std::shared_ptr<C> peekAgain(std::function<double(const C&)> calculate_weight) override { | 32 | 979 | std::shared_ptr<C> ret = popEntry(); | 33 | 979 | if (ret) { | 34 | 979 | prepick_list_.push_back(ret); | 35 | 979 | add(calculate_weight(*ret), ret); | 36 | 979 | } | 37 | 979 | return ret; | 38 | 979 | } |
Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::peekAgain(std::__1::function<double (Envoy::Upstream::HostSetImpl::LocalityEntry const&)>) |
39 | | |
40 | 13.9M | std::shared_ptr<C> pickAndAdd(std::function<double(const C&)> calculate_weight) override { |
41 | 13.9M | while (!prepick_list_.empty()) { |
42 | | // In this case the entry was added back during peekAgain so don't re-add. |
43 | 240 | std::shared_ptr<C> ret = prepick_list_.front().lock(); |
44 | 240 | prepick_list_.pop_front(); |
45 | 240 | if (ret) { |
46 | 240 | return ret; |
47 | 240 | } |
48 | 240 | } |
49 | 13.9M | std::shared_ptr<C> ret = popEntry(); |
50 | 13.9M | if (ret) { |
51 | 13.9M | add(calculate_weight(*ret), ret); |
52 | 13.9M | } |
53 | 13.9M | return ret; |
54 | 13.9M | } Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host const>::pickAndAdd(std::__1::function<double (Envoy::Upstream::Host const&)>) Line | Count | Source | 40 | 13.9M | std::shared_ptr<C> pickAndAdd(std::function<double(const C&)> calculate_weight) override { | 41 | 13.9M | while (!prepick_list_.empty()) { | 42 | | // In this case the entry was added back during peekAgain so don't re-add. | 43 | 240 | std::shared_ptr<C> ret = prepick_list_.front().lock(); | 44 | 240 | prepick_list_.pop_front(); | 45 | 240 | if (ret) { | 46 | 240 | return ret; | 47 | 240 | } | 48 | 240 | } | 49 | 13.9M | std::shared_ptr<C> ret = popEntry(); | 50 | 13.9M | if (ret) { | 51 | 13.9M | add(calculate_weight(*ret), ret); | 52 | 13.9M | } | 53 | 13.9M | return ret; | 54 | 13.9M | } |
Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::pickAndAdd(std::__1::function<double (Envoy::Upstream::HostSetImpl::LocalityEntry const&)>) |
55 | | |
56 | 33.0M | void add(double weight, std::shared_ptr<C> entry) override { |
57 | 33.0M | ASSERT(weight > 0); |
58 | 33.0M | const double deadline = current_time_ + 1.0 / weight; |
59 | 33.0M | EDF_TRACE("Insertion {} in queue with deadline {} and weight {}.", |
60 | 33.0M | static_cast<const void*>(entry.get()), deadline, weight); |
61 | 33.0M | queue_.push({deadline, order_offset_++, entry}); |
62 | 33.0M | ASSERT(queue_.top().deadline_ >= current_time_); |
63 | 33.0M | } Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host const>::add(double, std::__1::shared_ptr<Envoy::Upstream::Host const>) Line | Count | Source | 56 | 33.0M | void add(double weight, std::shared_ptr<C> entry) override { | 57 | 33.0M | ASSERT(weight > 0); | 58 | 33.0M | const double deadline = current_time_ + 1.0 / weight; | 59 | 33.0M | EDF_TRACE("Insertion {} in queue with deadline {} and weight {}.", | 60 | 33.0M | static_cast<const void*>(entry.get()), deadline, weight); | 61 | 33.0M | queue_.push({deadline, order_offset_++, entry}); | 62 | 33.0M | ASSERT(queue_.top().deadline_ >= current_time_); | 63 | 33.0M | } |
Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::add(double, std::__1::shared_ptr<Envoy::Upstream::HostSetImpl::LocalityEntry>) |
64 | | |
65 | 9.80k | bool empty() const override { return queue_.empty(); }Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host const>::empty() const Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::empty() const Line | Count | Source | 65 | 9.80k | bool empty() const override { return queue_.empty(); } |
|
66 | | |
67 | | private: |
68 | | /** |
69 | | * Clears expired entries and pops the next unexpired entry in the queue. |
70 | | */ |
71 | 13.9M | std::shared_ptr<C> popEntry() { |
72 | 13.9M | EDF_TRACE("Queue pick: queue_.size()={}, current_time_={}.", queue_.size(), current_time_); |
73 | 13.9M | while (true) { |
74 | 13.9M | if (queue_.empty()) { |
75 | 177 | EDF_TRACE("Queue is empty."); |
76 | 177 | return nullptr; |
77 | 177 | } |
78 | 13.9M | const EdfEntry& edf_entry = queue_.top(); |
79 | | // Entry has been removed, let's see if there's another one. |
80 | 13.9M | std::shared_ptr<C> ret = edf_entry.entry_.lock(); |
81 | 13.9M | if (!ret) { |
82 | 0 | EDF_TRACE("Entry has expired, repick."); |
83 | 0 | queue_.pop(); |
84 | 0 | continue; |
85 | 0 | } |
86 | 13.9M | ASSERT(edf_entry.deadline_ >= current_time_); |
87 | 13.9M | current_time_ = edf_entry.deadline_; |
88 | 13.9M | EDF_TRACE("Picked {}, current_time_={}.", static_cast<const void*>(ret.get()), current_time_); |
89 | 13.9M | queue_.pop(); |
90 | 13.9M | return ret; |
91 | 13.9M | } |
92 | 13.9M | } Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host const>::popEntry() Line | Count | Source | 71 | 13.9M | std::shared_ptr<C> popEntry() { | 72 | 13.9M | EDF_TRACE("Queue pick: queue_.size()={}, current_time_={}.", queue_.size(), current_time_); | 73 | 13.9M | while (true) { | 74 | 13.9M | if (queue_.empty()) { | 75 | 177 | EDF_TRACE("Queue is empty."); | 76 | 177 | return nullptr; | 77 | 177 | } | 78 | 13.9M | const EdfEntry& edf_entry = queue_.top(); | 79 | | // Entry has been removed, let's see if there's another one. | 80 | 13.9M | std::shared_ptr<C> ret = edf_entry.entry_.lock(); | 81 | 13.9M | if (!ret) { | 82 | 0 | EDF_TRACE("Entry has expired, repick."); | 83 | 0 | queue_.pop(); | 84 | 0 | continue; | 85 | 0 | } | 86 | 13.9M | ASSERT(edf_entry.deadline_ >= current_time_); | 87 | 13.9M | current_time_ = edf_entry.deadline_; | 88 | 13.9M | EDF_TRACE("Picked {}, current_time_={}.", static_cast<const void*>(ret.get()), current_time_); | 89 | 13.9M | queue_.pop(); | 90 | 13.9M | return ret; | 91 | 13.9M | } | 92 | 13.9M | } |
Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::popEntry() |
93 | | |
94 | | struct EdfEntry { |
95 | | double deadline_; |
96 | | // Tie breaker for entries with the same deadline. This is used to provide FIFO behavior. |
97 | | uint64_t order_offset_; |
98 | | // We only hold a weak pointer, since we don't support a remove operator. This allows entries to |
99 | | // be lazily unloaded from the queue. |
100 | | std::weak_ptr<C> entry_; |
101 | | |
102 | | // Flip < direction to make this a min queue. |
103 | 426M | bool operator<(const EdfEntry& other) const { |
104 | 426M | return deadline_ > other.deadline_ || |
105 | 426M | (deadline_ == other.deadline_ && order_offset_ > other.order_offset_); |
106 | 426M | } Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host const>::EdfEntry::operator<(Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host const>::EdfEntry const&) const Line | Count | Source | 103 | 426M | bool operator<(const EdfEntry& other) const { | 104 | 426M | return deadline_ > other.deadline_ || | 105 | 426M | (deadline_ == other.deadline_ && order_offset_ > other.order_offset_); | 106 | 426M | } |
Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::EdfEntry::operator<(Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::EdfEntry const&) const |
107 | | }; |
108 | | |
109 | | // Current time in EDF scheduler. |
110 | | // TODO(htuch): Is it worth the small extra complexity to use integer time for performance |
111 | | // reasons? |
112 | | double current_time_{}; |
113 | | // Offset used during addition to break ties when entries have the same weight but should reflect |
114 | | // FIFO insertion order in picks. |
115 | | uint64_t order_offset_{}; |
116 | | // Min priority queue for EDF. |
117 | | std::priority_queue<EdfEntry> queue_; |
118 | | std::list<std::weak_ptr<C>> prepick_list_; |
119 | | }; |
120 | | |
121 | | #undef EDF_DEBUG |
122 | | |
123 | | } // namespace Upstream |
124 | | } // namespace Envoy |