/proc/self/cwd/source/common/upstream/edf_scheduler.h
Line | Count | Source (jump to first uncovered line) |
1 | | #pragma once |
2 | | #include <cstdint> |
3 | | #include <iostream> |
4 | | #include <queue> |
5 | | |
6 | | #include "envoy/upstream/scheduler.h" |
7 | | |
8 | | #include "source/common/common/assert.h" |
9 | | |
10 | | namespace Envoy { |
11 | | namespace Upstream { |
12 | | |
13 | | // It's not sufficient to use trace level logging, since it becomes far too noisy for a number of |
14 | | // tests, so we can kill trace debug here. |
15 | | #define EDF_DEBUG 0 |
16 | | |
17 | | #if EDF_DEBUG |
18 | | #define EDF_TRACE(...) ENVOY_LOG_MISC(trace, __VA_ARGS__) |
19 | | #else |
20 | | #define EDF_TRACE(...) |
21 | | #endif |
22 | | |
23 | | // Earliest Deadline First (EDF) scheduler |
24 | | // (https://en.wikipedia.org/wiki/Earliest_deadline_first_scheduling) used for weighted round robin. |
25 | | // Each pick from the schedule has the earliest deadline entry selected. Entries have deadlines set |
26 | | // at current time + 1 / weight, providing weighted round robin behavior with floating point |
27 | | // weights and an O(log n) pick time. |
28 | | template <class C> class EdfScheduler : public Scheduler<C> { |
29 | | public: |
30 | 0 | EdfScheduler() = default; Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::EdfScheduler() Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::EdfScheduler() |
31 | | |
32 | | // See scheduler.h for an explanation of each public method. |
33 | 833 | std::shared_ptr<C> peekAgain(std::function<double(const C&)> calculate_weight) override { |
34 | 833 | std::shared_ptr<C> ret = popEntry(); |
35 | 833 | if (ret) { |
36 | 833 | prepick_list_.push_back(ret); |
37 | 833 | add(calculate_weight(*ret), ret); |
38 | 833 | } |
39 | 833 | return ret; |
40 | 833 | } Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::peekAgain(std::__1::function<double (Envoy::Upstream::HostSetImpl::LocalityEntry const&)>) Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::peekAgain(std::__1::function<double (Envoy::Upstream::Host const&)>) Line | Count | Source | 33 | 833 | std::shared_ptr<C> peekAgain(std::function<double(const C&)> calculate_weight) override { | 34 | 833 | std::shared_ptr<C> ret = popEntry(); | 35 | 833 | if (ret) { | 36 | 833 | prepick_list_.push_back(ret); | 37 | 833 | add(calculate_weight(*ret), ret); | 38 | 833 | } | 39 | 833 | return ret; | 40 | 833 | } |
|
41 | | |
42 | 12.1M | std::shared_ptr<C> pickAndAdd(std::function<double(const C&)> calculate_weight) override { |
43 | 12.1M | while (!prepick_list_.empty()) { |
44 | | // In this case the entry was added back during peekAgain so don't re-add. |
45 | 125 | std::shared_ptr<C> ret = prepick_list_.front().lock(); |
46 | 125 | prepick_list_.pop_front(); |
47 | 125 | if (ret) { |
48 | 125 | return ret; |
49 | 125 | } |
50 | 125 | } |
51 | 12.1M | std::shared_ptr<C> ret = popEntry(); |
52 | 12.1M | if (ret) { |
53 | 12.1M | add(calculate_weight(*ret), ret); |
54 | 12.1M | } |
55 | 12.1M | return ret; |
56 | 12.1M | } Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::pickAndAdd(std::__1::function<double (Envoy::Upstream::HostSetImpl::LocalityEntry const&)>) Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::pickAndAdd(std::__1::function<double (Envoy::Upstream::Host const&)>) Line | Count | Source | 42 | 12.1M | std::shared_ptr<C> pickAndAdd(std::function<double(const C&)> calculate_weight) override { | 43 | 12.1M | while (!prepick_list_.empty()) { | 44 | | // In this case the entry was added back during peekAgain so don't re-add. | 45 | 125 | std::shared_ptr<C> ret = prepick_list_.front().lock(); | 46 | 125 | prepick_list_.pop_front(); | 47 | 125 | if (ret) { | 48 | 125 | return ret; | 49 | 125 | } | 50 | 125 | } | 51 | 12.1M | std::shared_ptr<C> ret = popEntry(); | 52 | 12.1M | if (ret) { | 53 | 12.1M | add(calculate_weight(*ret), ret); | 54 | 12.1M | } | 55 | 12.1M | return ret; | 56 | 12.1M | } |
|
57 | | |
58 | 12.1M | void add(double weight, std::shared_ptr<C> entry) override { |
59 | 12.1M | ASSERT(weight > 0); |
60 | 12.1M | const double deadline = current_time_ + 1.0 / weight; |
61 | 12.1M | EDF_TRACE("Insertion {} in queue with deadline {} and weight {}.", |
62 | 12.1M | static_cast<const void*>(entry.get()), deadline, weight); |
63 | 12.1M | queue_.push({deadline, order_offset_++, entry}); |
64 | 12.1M | ASSERT(queue_.top().deadline_ >= current_time_); |
65 | 12.1M | } Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::add(double, std::__1::shared_ptr<Envoy::Upstream::HostSetImpl::LocalityEntry>) Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::add(double, std::__1::shared_ptr<Envoy::Upstream::Host>) Line | Count | Source | 58 | 12.1M | void add(double weight, std::shared_ptr<C> entry) override { | 59 | 12.1M | ASSERT(weight > 0); | 60 | 12.1M | const double deadline = current_time_ + 1.0 / weight; | 61 | 12.1M | EDF_TRACE("Insertion {} in queue with deadline {} and weight {}.", | 62 | 12.1M | static_cast<const void*>(entry.get()), deadline, weight); | 63 | 12.1M | queue_.push({deadline, order_offset_++, entry}); | 64 | 12.1M | ASSERT(queue_.top().deadline_ >= current_time_); | 65 | 12.1M | } |
|
66 | | |
67 | 0 | bool empty() const override { return queue_.empty(); } Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::empty() const Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::empty() const |
68 | | |
69 | | // Creates an EdfScheduler with the given weights and their corresponding |
70 | | // entries, and emulating a number of initial picks to be performed. Note that |
71 | | // the internal state of the scheduler will be very similar to creating an empty |
72 | | // scheduler, adding the entries one after the other, and then performing |
73 | | // "picks" pickAndAdd operation without modifying the entries' weights. |
74 | | // The only thing that may be different is that entries with the same weight |
75 | | // may be chosen a bit differently (the order_offset_ values may be different). |
76 | | // Breaking the ties of same weight entries will be kept in future picks from |
77 | | // the scheduler. |
78 | | static EdfScheduler<C> createWithPicks(const std::vector<std::shared_ptr<C>>& entries, |
79 | | std::function<double(const C&)> calculate_weight, |
80 | 5.24k | uint32_t picks) { |
81 | | // Limiting the number of picks, as over 400M picks should be sufficient |
82 | | // for most scenarios. |
83 | 5.24k | picks = picks % 429496729; // % UINT_MAX/10 |
84 | 5.24k | EDF_TRACE("Creating an EDF-scheduler with {} weights and {} pre-picks.", entries.size(), picks); |
85 | | // Assume no non-positive weights. |
86 | 5.24k | ASSERT(std::none_of(entries.cbegin(), entries.cend(), |
87 | 5.24k | [&calculate_weight](const std::shared_ptr<C>& entry) { |
88 | 5.24k | return calculate_weight(*entry) <= 0; |
89 | 5.24k | })); |
90 | | |
91 | | // Nothing to do if there are no entries. |
92 | 5.24k | if (entries.empty()) { |
93 | 0 | return EdfScheduler<C>(); |
94 | 0 | } |
95 | | |
96 | | // Augment the weight computation to add some epsilon to each entry's |
97 | | // weight to avoid cases where weights are multiplies of each other. For |
98 | | // example if there are 2 weights: 25 and 75, and picks=23, then the |
99 | | // floor_picks will be {5, 17} (respectively), and the deadlines will be |
100 | | // {0.24000000000000002 and 0.24} (respectively). This small difference will |
101 | | // cause a "wrong" pick compared to when starting from an empty scheduler |
102 | | // and picking 23 times. Adding a small value to each weight circumvents |
103 | | // this problem. This was added as a result of the following comment: |
104 | | // https://github.com/envoyproxy/envoy/pull/31592#issuecomment-1877663769. |
105 | 79.9M | auto aug_calculate_weight = [&calculate_weight](const C& entry) -> double { |
106 | 79.9M | return calculate_weight(entry) + 1e-13; |
107 | 79.9M | }; Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::createWithPicks(std::__1::vector<std::__1::shared_ptr<Envoy::Upstream::HostSetImpl::LocalityEntry>, std::__1::allocator<std::__1::shared_ptr<Envoy::Upstream::HostSetImpl::LocalityEntry> > > const&, std::__1::function<double (Envoy::Upstream::HostSetImpl::LocalityEntry const&)>, unsigned int)::{lambda(Envoy::Upstream::HostSetImpl::LocalityEntry const&)#1}::operator()(Envoy::Upstream::HostSetImpl::LocalityEntry const&) const Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::createWithPicks(std::__1::vector<std::__1::shared_ptr<Envoy::Upstream::Host>, std::__1::allocator<std::__1::shared_ptr<Envoy::Upstream::Host> > > const&, std::__1::function<double (Envoy::Upstream::Host const&)>, unsigned int)::{lambda(Envoy::Upstream::Host const&)#1}::operator()(Envoy::Upstream::Host const&) const Line | Count | Source | 105 | 79.9M | auto aug_calculate_weight = [&calculate_weight](const C& entry) -> double { | 106 | 79.9M | return calculate_weight(entry) + 1e-13; | 107 | 79.9M | }; |
|
108 | | |
109 | | // Let weights {w_1, w_2, ..., w_N} be the per-entry weight where (w_i > 0), |
110 | | // W = sum(w_i), and P be the number of times to "pick" from the scheduler. |
111 | | // Let p'_i = floor(P * w_i/W), then the number of times each entry is being |
112 | | // picked is p_i >= p'_i. Note that 0 <= P - sum(p'_i) < N. |
113 | | // |
114 | | // The following code does P picks, by first emulating p'_i picks for each |
115 | | // entry, and then executing the leftover P - sum(p'_i) picks. |
116 | 5.24k | double weights_sum = std::accumulate( |
117 | 5.24k | entries.cbegin(), entries.cend(), 0.0, |
118 | 26.6M | [&aug_calculate_weight](double sum_so_far, const std::shared_ptr<C>& entry) { |
119 | 26.6M | return sum_so_far + aug_calculate_weight(*entry); |
120 | 26.6M | }); Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::createWithPicks(std::__1::vector<std::__1::shared_ptr<Envoy::Upstream::HostSetImpl::LocalityEntry>, std::__1::allocator<std::__1::shared_ptr<Envoy::Upstream::HostSetImpl::LocalityEntry> > > const&, std::__1::function<double (Envoy::Upstream::HostSetImpl::LocalityEntry const&)>, unsigned int)::{lambda(double, std::__1::shared_ptr<Envoy::Upstream::HostSetImpl::LocalityEntry> const&)#1}::operator()(double, std::__1::shared_ptr<Envoy::Upstream::HostSetImpl::LocalityEntry> const&) const Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::createWithPicks(std::__1::vector<std::__1::shared_ptr<Envoy::Upstream::Host>, std::__1::allocator<std::__1::shared_ptr<Envoy::Upstream::Host> > > const&, std::__1::function<double (Envoy::Upstream::Host const&)>, unsigned int)::{lambda(double, std::__1::shared_ptr<Envoy::Upstream::Host> const&)#1}::operator()(double, std::__1::shared_ptr<Envoy::Upstream::Host> const&) const Line | Count | Source | 118 | 26.6M | [&aug_calculate_weight](double sum_so_far, const std::shared_ptr<C>& entry) { | 119 | 26.6M | return sum_so_far + aug_calculate_weight(*entry); | 120 | 26.6M | }); |
|
121 | 5.24k | std::vector<uint32_t> floor_picks; |
122 | 5.24k | floor_picks.reserve(entries.size()); |
123 | 5.24k | std::transform(entries.cbegin(), entries.cend(), std::back_inserter(floor_picks), |
124 | 26.6M | [picks, weights_sum, &aug_calculate_weight](const std::shared_ptr<C>& entry) { |
125 | | // Getting the lower-bound by casting to an integer. |
126 | 26.6M | return static_cast<uint32_t>(aug_calculate_weight(*entry) * picks / |
127 | 26.6M | weights_sum); |
128 | 26.6M | }); Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::createWithPicks(std::__1::vector<std::__1::shared_ptr<Envoy::Upstream::HostSetImpl::LocalityEntry>, std::__1::allocator<std::__1::shared_ptr<Envoy::Upstream::HostSetImpl::LocalityEntry> > > const&, std::__1::function<double (Envoy::Upstream::HostSetImpl::LocalityEntry const&)>, unsigned int)::{lambda(std::__1::shared_ptr<Envoy::Upstream::HostSetImpl::LocalityEntry> const&)#2}::operator()(std::__1::shared_ptr<Envoy::Upstream::HostSetImpl::LocalityEntry> const&) const Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::createWithPicks(std::__1::vector<std::__1::shared_ptr<Envoy::Upstream::Host>, std::__1::allocator<std::__1::shared_ptr<Envoy::Upstream::Host> > > const&, std::__1::function<double (Envoy::Upstream::Host const&)>, unsigned int)::{lambda(std::__1::shared_ptr<Envoy::Upstream::Host> const&)#2}::operator()(std::__1::shared_ptr<Envoy::Upstream::Host> const&) const Line | Count | Source | 124 | 26.6M | [picks, weights_sum, &aug_calculate_weight](const std::shared_ptr<C>& entry) { | 125 | | // Getting the lower-bound by casting to an integer. | 126 | 26.6M | return static_cast<uint32_t>(aug_calculate_weight(*entry) * picks / | 127 | 26.6M | weights_sum); | 128 | 26.6M | }); |
|
129 | | |
130 | | // Pre-compute the priority-queue entries to use an O(N) initialization c'tor. |
131 | 5.24k | std::vector<EdfEntry> scheduler_entries; |
132 | 5.24k | scheduler_entries.reserve(entries.size()); |
133 | 5.24k | uint32_t picks_so_far = 0; |
134 | 5.24k | double max_pick_time = 0.0; |
135 | | // Emulate a per-entry addition to a deadline that is applicable to N picks. |
136 | 26.6M | for (size_t i = 0; i < entries.size(); ++i) { |
137 | | // Add the entry with p'_i picks. As there were p'_i picks, the entry's |
138 | | // next deadline is (p'_i + 1) / w_i. |
139 | 26.6M | const double weight = aug_calculate_weight(*entries[i]); |
140 | | // While validating the algorithm there were a few cases where the math |
141 | | // and floating-point arithmetic did not agree (specifically floor(A*B) |
142 | | // was greater than A*B). The following if statement solves the problem by |
143 | | // reducing floor-picks for the entry, which may result in more iterations |
144 | | // in the code after the loop. |
145 | 26.6M | if ((floor_picks[i] > 0) && (floor_picks[i] / weight >= picks / weights_sum)) { |
146 | 989 | floor_picks[i]--; |
147 | 989 | } |
148 | 26.6M | const double pick_time = floor_picks[i] / weight; |
149 | 26.6M | const double deadline = (floor_picks[i] + 1) / weight; |
150 | 26.6M | EDF_TRACE("Insertion {} in queue with emualted {} picks, deadline {} and weight {}.", |
151 | 26.6M | static_cast<const void*>(entries[i].get()), floor_picks[i], deadline, weight); |
152 | 26.6M | scheduler_entries.emplace_back(EdfEntry{deadline, i, entries[i]}); |
153 | 26.6M | max_pick_time = std::max(max_pick_time, pick_time); |
154 | 26.6M | picks_so_far += floor_picks[i]; |
155 | 26.6M | } |
156 | | // The scheduler's current_time_ needs to be the largest time that some entry was picked. |
157 | 5.24k | EdfScheduler<C> scheduler(std::move(scheduler_entries), max_pick_time, entries.size()); |
158 | 5.24k | ASSERT(scheduler.queue_.top().deadline_ >= scheduler.current_time_); |
159 | | |
160 | | // Left to do some picks, execute them one after the other. |
161 | 5.24k | EDF_TRACE("Emulated {} picks in init step, {} picks remaining for one after the other step", |
162 | 5.24k | picks_so_far, picks - picks_so_far); |
163 | 12.1M | while (picks_so_far < picks) { |
164 | 12.1M | scheduler.pickAndAdd(calculate_weight); |
165 | 12.1M | picks_so_far++; |
166 | 12.1M | } |
167 | 5.24k | return scheduler; |
168 | 5.24k | } Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::createWithPicks(std::__1::vector<std::__1::shared_ptr<Envoy::Upstream::HostSetImpl::LocalityEntry>, std::__1::allocator<std::__1::shared_ptr<Envoy::Upstream::HostSetImpl::LocalityEntry> > > const&, std::__1::function<double (Envoy::Upstream::HostSetImpl::LocalityEntry const&)>, unsigned int) Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::createWithPicks(std::__1::vector<std::__1::shared_ptr<Envoy::Upstream::Host>, std::__1::allocator<std::__1::shared_ptr<Envoy::Upstream::Host> > > const&, std::__1::function<double (Envoy::Upstream::Host const&)>, unsigned int) Line | Count | Source | 80 | 5.24k | uint32_t picks) { | 81 | | // Limiting the number of picks, as over 400M picks should be sufficient | 82 | | // for most scenarios. | 83 | 5.24k | picks = picks % 429496729; // % UINT_MAX/10 | 84 | 5.24k | EDF_TRACE("Creating an EDF-scheduler with {} weights and {} pre-picks.", entries.size(), picks); | 85 | | // Assume no non-positive weights. | 86 | 5.24k | ASSERT(std::none_of(entries.cbegin(), entries.cend(), | 87 | 5.24k | [&calculate_weight](const std::shared_ptr<C>& entry) { | 88 | 5.24k | return calculate_weight(*entry) <= 0; | 89 | 5.24k | })); | 90 | | | 91 | | // Nothing to do if there are no entries. | 92 | 5.24k | if (entries.empty()) { | 93 | 0 | return EdfScheduler<C>(); | 94 | 0 | } | 95 | | | 96 | | // Augment the weight computation to add some epsilon to each entry's | 97 | | // weight to avoid cases where weights are multiplies of each other. For | 98 | | // example if there are 2 weights: 25 and 75, and picks=23, then the | 99 | | // floor_picks will be {5, 17} (respectively), and the deadlines will be | 100 | | // {0.24000000000000002 and 0.24} (respectively). This small difference will | 101 | | // cause a "wrong" pick compared to when starting from an empty scheduler | 102 | | // and picking 23 times. Adding a small value to each weight circumvents | 103 | | // this problem. This was added as a result of the following comment: | 104 | | // https://github.com/envoyproxy/envoy/pull/31592#issuecomment-1877663769. | 105 | 5.24k | auto aug_calculate_weight = [&calculate_weight](const C& entry) -> double { | 106 | 5.24k | return calculate_weight(entry) + 1e-13; | 107 | 5.24k | }; | 108 | | | 109 | | // Let weights {w_1, w_2, ..., w_N} be the per-entry weight where (w_i > 0), | 110 | | // W = sum(w_i), and P be the number of times to "pick" from the scheduler. | 111 | | // Let p'_i = floor(P * w_i/W), then the number of times each entry is being | 112 | | // picked is p_i >= p'_i. Note that 0 <= P - sum(p'_i) < N. | 113 | | // | 114 | | // The following code does P picks, by first emulating p'_i picks for each | 115 | | // entry, and then executing the leftover P - sum(p'_i) picks. | 116 | 5.24k | double weights_sum = std::accumulate( | 117 | 5.24k | entries.cbegin(), entries.cend(), 0.0, | 118 | 5.24k | [&aug_calculate_weight](double sum_so_far, const std::shared_ptr<C>& entry) { | 119 | 5.24k | return sum_so_far + aug_calculate_weight(*entry); | 120 | 5.24k | }); | 121 | 5.24k | std::vector<uint32_t> floor_picks; | 122 | 5.24k | floor_picks.reserve(entries.size()); | 123 | 5.24k | std::transform(entries.cbegin(), entries.cend(), std::back_inserter(floor_picks), | 124 | 5.24k | [picks, weights_sum, &aug_calculate_weight](const std::shared_ptr<C>& entry) { | 125 | | // Getting the lower-bound by casting to an integer. | 126 | 5.24k | return static_cast<uint32_t>(aug_calculate_weight(*entry) * picks / | 127 | 5.24k | weights_sum); | 128 | 5.24k | }); | 129 | | | 130 | | // Pre-compute the priority-queue entries to use an O(N) initialization c'tor. | 131 | 5.24k | std::vector<EdfEntry> scheduler_entries; | 132 | 5.24k | scheduler_entries.reserve(entries.size()); | 133 | 5.24k | uint32_t picks_so_far = 0; | 134 | 5.24k | double max_pick_time = 0.0; | 135 | | // Emulate a per-entry addition to a deadline that is applicable to N picks. | 136 | 26.6M | for (size_t i = 0; i < entries.size(); ++i) { | 137 | | // Add the entry with p'_i picks. As there were p'_i picks, the entry's | 138 | | // next deadline is (p'_i + 1) / w_i. | 139 | 26.6M | const double weight = aug_calculate_weight(*entries[i]); | 140 | | // While validating the algorithm there were a few cases where the math | 141 | | // and floating-point arithmetic did not agree (specifically floor(A*B) | 142 | | // was greater than A*B). The following if statement solves the problem by | 143 | | // reducing floor-picks for the entry, which may result in more iterations | 144 | | // in the code after the loop. | 145 | 26.6M | if ((floor_picks[i] > 0) && (floor_picks[i] / weight >= picks / weights_sum)) { | 146 | 989 | floor_picks[i]--; | 147 | 989 | } | 148 | 26.6M | const double pick_time = floor_picks[i] / weight; | 149 | 26.6M | const double deadline = (floor_picks[i] + 1) / weight; | 150 | 26.6M | EDF_TRACE("Insertion {} in queue with emualted {} picks, deadline {} and weight {}.", | 151 | 26.6M | static_cast<const void*>(entries[i].get()), floor_picks[i], deadline, weight); | 152 | 26.6M | scheduler_entries.emplace_back(EdfEntry{deadline, i, entries[i]}); | 153 | 26.6M | max_pick_time = std::max(max_pick_time, pick_time); | 154 | 26.6M | picks_so_far += floor_picks[i]; | 155 | 26.6M | } | 156 | | // The scheduler's current_time_ needs to be the largest time that some entry was picked. | 157 | 5.24k | EdfScheduler<C> scheduler(std::move(scheduler_entries), max_pick_time, entries.size()); | 158 | 5.24k | ASSERT(scheduler.queue_.top().deadline_ >= scheduler.current_time_); | 159 | | | 160 | | // Left to do some picks, execute them one after the other. | 161 | 5.24k | EDF_TRACE("Emulated {} picks in init step, {} picks remaining for one after the other step", | 162 | 5.24k | picks_so_far, picks - picks_so_far); | 163 | 12.1M | while (picks_so_far < picks) { | 164 | 12.1M | scheduler.pickAndAdd(calculate_weight); | 165 | 12.1M | picks_so_far++; | 166 | 12.1M | } | 167 | 5.24k | return scheduler; | 168 | 5.24k | } |
|
169 | | |
170 | | private: |
171 | | friend class EdfSchedulerTest; |
172 | | |
173 | | /** |
174 | | * Clears expired entries and pops the next unexpired entry in the queue. |
175 | | */ |
176 | 12.1M | std::shared_ptr<C> popEntry() { |
177 | 12.1M | EDF_TRACE("Queue pick: queue_.size()={}, current_time_={}.", queue_.size(), current_time_); |
178 | 12.1M | while (true) { |
179 | 12.1M | if (queue_.empty()) { |
180 | 0 | EDF_TRACE("Queue is empty."); |
181 | 0 | return nullptr; |
182 | 0 | } |
183 | 12.1M | const EdfEntry& edf_entry = queue_.top(); |
184 | | // Entry has been removed, let's see if there's another one. |
185 | 12.1M | std::shared_ptr<C> ret = edf_entry.entry_.lock(); |
186 | 12.1M | if (!ret) { |
187 | 0 | EDF_TRACE("Entry has expired, repick."); |
188 | 0 | queue_.pop(); |
189 | 0 | continue; |
190 | 0 | } |
191 | 12.1M | ASSERT(edf_entry.deadline_ >= current_time_); |
192 | 12.1M | current_time_ = edf_entry.deadline_; |
193 | 12.1M | EDF_TRACE("Picked {}, current_time_={}.", static_cast<const void*>(ret.get()), current_time_); |
194 | 12.1M | queue_.pop(); |
195 | 12.1M | return ret; |
196 | 12.1M | } |
197 | 12.1M | } Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::popEntry() Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::popEntry() Line | Count | Source | 176 | 12.1M | std::shared_ptr<C> popEntry() { | 177 | 12.1M | EDF_TRACE("Queue pick: queue_.size()={}, current_time_={}.", queue_.size(), current_time_); | 178 | 12.1M | while (true) { | 179 | 12.1M | if (queue_.empty()) { | 180 | 0 | EDF_TRACE("Queue is empty."); | 181 | 0 | return nullptr; | 182 | 0 | } | 183 | 12.1M | const EdfEntry& edf_entry = queue_.top(); | 184 | | // Entry has been removed, let's see if there's another one. | 185 | 12.1M | std::shared_ptr<C> ret = edf_entry.entry_.lock(); | 186 | 12.1M | if (!ret) { | 187 | 0 | EDF_TRACE("Entry has expired, repick."); | 188 | 0 | queue_.pop(); | 189 | 0 | continue; | 190 | 0 | } | 191 | 12.1M | ASSERT(edf_entry.deadline_ >= current_time_); | 192 | 12.1M | current_time_ = edf_entry.deadline_; | 193 | 12.1M | EDF_TRACE("Picked {}, current_time_={}.", static_cast<const void*>(ret.get()), current_time_); | 194 | 12.1M | queue_.pop(); | 195 | 12.1M | return ret; | 196 | 12.1M | } | 197 | 12.1M | } |
|
198 | | |
199 | | struct EdfEntry { |
200 | | double deadline_; |
201 | | // Tie breaker for entries with the same deadline. This is used to provide FIFO behavior. |
202 | | uint64_t order_offset_; |
203 | | // We only hold a weak pointer, since we don't support a remove operator. This allows entries to |
204 | | // be lazily unloaded from the queue. |
205 | | std::weak_ptr<C> entry_; |
206 | | |
207 | | // Flip < direction to make this a min queue. |
208 | 223M | bool operator<(const EdfEntry& other) const { |
209 | 223M | return deadline_ > other.deadline_ || |
210 | 223M | (deadline_ == other.deadline_ && order_offset_ > other.order_offset_); |
211 | 223M | } Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::EdfEntry::operator<(Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::EdfEntry const&) const Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::EdfEntry::operator<(Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::EdfEntry const&) const Line | Count | Source | 208 | 223M | bool operator<(const EdfEntry& other) const { | 209 | 223M | return deadline_ > other.deadline_ || | 210 | 223M | (deadline_ == other.deadline_ && order_offset_ > other.order_offset_); | 211 | 223M | } |
|
212 | | }; |
213 | | |
214 | | EdfScheduler(std::vector<EdfEntry>&& scheduler_entries, double current_time, |
215 | | uint32_t order_offset) |
216 | | : current_time_(current_time), order_offset_(order_offset), |
217 | 5.24k | queue_(scheduler_entries.cbegin(), scheduler_entries.cend()) {} Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::EdfScheduler(std::__1::vector<Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::EdfEntry, std::__1::allocator<Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::EdfEntry> >&&, double, unsigned int) Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::EdfScheduler(std::__1::vector<Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::EdfEntry, std::__1::allocator<Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::EdfEntry> >&&, double, unsigned int) Line | Count | Source | 217 | 5.24k | queue_(scheduler_entries.cbegin(), scheduler_entries.cend()) {} |
|
218 | | |
219 | | // Current time in EDF scheduler. |
220 | | // TODO(htuch): Is it worth the small extra complexity to use integer time for performance |
221 | | // reasons? |
222 | | double current_time_{}; |
223 | | // Offset used during addition to break ties when entries have the same weight but should reflect |
224 | | // FIFO insertion order in picks. |
225 | | uint64_t order_offset_{}; |
226 | | // Min priority queue for EDF. |
227 | | std::priority_queue<EdfEntry> queue_; |
228 | | std::list<std::weak_ptr<C>> prepick_list_; |
229 | | }; |
230 | | |
231 | | #undef EDF_DEBUG |
232 | | |
233 | | } // namespace Upstream |
234 | | } // namespace Envoy |