1
#pragma once
2
#include <algorithm>
#include <cstdint>
#include <functional>
#include <iosfwd>
#include <iterator>
#include <list>
#include <memory>
#include <numeric>
#include <queue>
#include <vector>

#include "envoy/upstream/scheduler.h"

#include "source/common/common/assert.h"
9

            
10
namespace Envoy {
11
namespace Upstream {
12

            
13
// EDF tracing has its own compile-time switch: plain trace-level logging would make a number of
// tests far too noisy. With EDF_DEBUG non-zero, EDF_TRACE(...) forwards to ENVOY_LOG_MISC at trace
// level; with EDF_DEBUG == 0 it expands to nothing (its arguments are not evaluated).
#define EDF_DEBUG 0

#if !EDF_DEBUG
#define EDF_TRACE(...)
#else
#define EDF_TRACE(...) ENVOY_LOG_MISC(trace, __VA_ARGS__)
#endif
22

            
23
// Earliest Deadline First (EDF) scheduler
24
// (https://en.wikipedia.org/wiki/Earliest_deadline_first_scheduling) used for weighted round robin.
25
// Each pick from the schedule has the earliest deadline entry selected. Entries have deadlines set
26
// at current time + 1 / weight, providing weighted round robin behavior with floating point
27
// weights and an O(log n) pick time.
28
template <class C> class EdfScheduler : public Scheduler<C> {
29
public:
30
15
  EdfScheduler() = default;
31

            
32
  // See scheduler.h for an explanation of each public method.
33
24658
  std::shared_ptr<C> peekAgain(std::function<double(const C&)> calculate_weight) override {
34
24658
    std::shared_ptr<C> ret = popEntry();
35
24658
    if (ret) {
36
24655
      prepick_list_.push_back(ret);
37
24655
      add(calculate_weight(*ret), ret);
38
24655
    }
39
24658
    return ret;
40
24658
  }
41

            
42
5206489
  std::shared_ptr<C> pickAndAdd(std::function<double(const C&)> calculate_weight) override {
43
5206492
    while (!prepick_list_.empty()) {
44
      // In this case the entry was added back during peekAgain so don't re-add.
45
24654
      std::shared_ptr<C> ret = prepick_list_.front().lock();
46
24654
      prepick_list_.pop_front();
47
24654
      if (ret) {
48
24651
        return ret;
49
24651
      }
50
24654
    }
51
5181838
    std::shared_ptr<C> ret = popEntry();
52
5181838
    if (ret) {
53
5181835
      add(calculate_weight(*ret), ret);
54
5181835
    }
55
5181838
    return ret;
56
5206489
  }
57

            
58
5207777
  void add(double weight, std::shared_ptr<C> entry) override {
59
5207777
    ASSERT(weight > 0);
60
5207777
    const double deadline = current_time_ + 1.0 / weight;
61
5207777
    EDF_TRACE("Insertion {} in queue with deadline {} and weight {}.",
62
5207777
              static_cast<const void*>(entry.get()), deadline, weight);
63
5207777
    queue_.push({deadline, order_offset_++, entry});
64
5207777
    ASSERT(queue_.top().deadline_ >= current_time_);
65
5207777
  }
66

            
67
1548
  bool empty() const override { return queue_.empty(); }
68

            
69
  // Creates an EdfScheduler with the given weights and their corresponding
70
  // entries, and emulating a number of initial picks to be performed. Note that
71
  // the internal state of the scheduler will be very similar to creating an empty
72
  // scheduler, adding the entries one after the other, and then performing
73
  // "picks" pickAndAdd operation without modifying the entries' weights.
74
  // The only thing that may be different is that entries with the same weight
75
  // may be chosen a bit differently (the order_offset_ values may be different).
76
  // Breaking the ties of same weight entries will be kept in future picks from
77
  // the scheduler.
78
  static EdfScheduler<C> createWithPicks(const std::vector<std::shared_ptr<C>>& entries,
79
                                         std::function<double(const C&)> calculate_weight,
80
1600311
                                         uint32_t picks) {
81
    // Limiting the number of picks, as over 400M picks should be sufficient
82
    // for most scenarios.
83
1600311
    picks = picks % 429496729; // % UINT_MAX/10
84
1600311
    EDF_TRACE("Creating an EDF-scheduler with {} weights and {} pre-picks.", entries.size(), picks);
85
    // Assume no non-positive weights.
86
1600311
    ASSERT(std::none_of(entries.cbegin(), entries.cend(),
87
1600311
                        [&calculate_weight](const std::shared_ptr<C>& entry) {
88
1600311
                          return calculate_weight(*entry) <= 0;
89
1600311
                        }));
90

            
91
    // Nothing to do if there are no entries.
92
1600311
    if (entries.empty()) {
93
1
      return EdfScheduler<C>();
94
1
    }
95

            
96
    // Augment the weight computation to add some epsilon to each entry's
97
    // weight to avoid cases where weights are multiplies of each other. For
98
    // example if there are 2 weights: 25 and 75, and picks=23, then the
99
    // floor_picks will be {5, 17} (respectively), and the deadlines will be
100
    // {0.24000000000000002 and 0.24} (respectively). This small difference will
101
    // cause a "wrong" pick compared to when starting from an empty scheduler
102
    // and picking 23 times. Adding a small value to each weight circumvents
103
    // this problem. This was added as a result of the following comment:
104
    // https://github.com/envoyproxy/envoy/pull/31592#issuecomment-1877663769.
105
3601628
    auto aug_calculate_weight = [&calculate_weight](const C& entry) -> double {
106
3601628
      return calculate_weight(entry) + 1e-13;
107
3601628
    };
108

            
109
    // Take a snapshot of entry weights so they remain consistent during scheduling.
110
1600310
    std::vector<double> weights;
111
1600310
    weights.reserve(entries.size());
112
1600310
    std::transform(entries.cbegin(), entries.cend(), std::back_inserter(weights),
113
3601628
                   [&aug_calculate_weight](const std::shared_ptr<C>& entry) {
114
3601628
                     return aug_calculate_weight(*entry);
115
3601628
                   });
116
    // Let weights {w_1, w_2, ..., w_N} be the per-entry weight where (w_i > 0),
117
    // W = sum(w_i), and P be the number of times to "pick" from the scheduler.
118
    // Let p'_i = floor(P * w_i/W), then the number of times each entry is being
119
    // picked is p_i >= p'_i. Note that 0 <= P - sum(p'_i) < N.
120
    //
121
    // The following code does P picks, by first emulating p'_i picks for each
122
    // entry, and then executing the leftover P - sum(p'_i) picks.
123
1600310
    const double weights_sum = std::accumulate(weights.cbegin(), weights.cend(), 0.0);
124
1600310
    std::vector<uint32_t> floor_picks;
125
1600310
    floor_picks.reserve(entries.size());
126
1600310
    std::transform(weights.cbegin(), weights.cend(), std::back_inserter(floor_picks),
127
3601628
                   [picks, weights_sum](const double& weight) {
128
                     // Getting the lower-bound by casting to an integer.
129
3601628
                     return static_cast<uint32_t>(weight * picks / weights_sum);
130
3601628
                   });
131

            
132
    // Pre-compute the priority-queue entries to use an O(N) initialization c'tor.
133
1600310
    std::vector<EdfEntry> scheduler_entries;
134
1600310
    scheduler_entries.reserve(entries.size());
135
1600310
    uint32_t picks_so_far = 0;
136
1600310
    double max_pick_time = 0.0;
137
    // Emulate a per-entry addition to a deadline that is applicable to N picks.
138
5201938
    for (size_t i = 0; i < entries.size(); ++i) {
139
      // Add the entry with p'_i picks. As there were p'_i picks, the entry's
140
      // next deadline is (p'_i + 1) / w_i.
141
3601628
      const double weight = weights[i];
142
      // While validating the algorithm there were a few cases where the math
143
      // and floating-point arithmetic did not agree (specifically floor(A*B)
144
      // was greater than A*B). The following if statement solves the problem by
145
      // reducing floor-picks for the entry, which may result in more iterations
146
      // in the code after the loop.
147
3601628
      if ((floor_picks[i] > 0) && (floor_picks[i] / weight >= picks / weights_sum)) {
148
371756
        floor_picks[i]--;
149
371756
      }
150
3601628
      const double pick_time = floor_picks[i] / weight;
151
3601628
      const double deadline = (floor_picks[i] + 1) / weight;
152
3601628
      EDF_TRACE("Insertion {} in queue with emualted {} picks, deadline {} and weight {}.",
153
3601628
                static_cast<const void*>(entries[i].get()), floor_picks[i], deadline, weight);
154
3601628
      scheduler_entries.emplace_back(EdfEntry{deadline, i, entries[i]});
155
3601628
      max_pick_time = std::max(max_pick_time, pick_time);
156
3601628
      picks_so_far += floor_picks[i];
157
3601628
    }
158
    // The scheduler's current_time_ needs to be the largest time that some entry was picked.
159
1600310
    EdfScheduler<C> scheduler(std::move(scheduler_entries), max_pick_time, entries.size());
160
1600310
    ASSERT(scheduler.queue_.top().deadline_ >= scheduler.current_time_);
161

            
162
    // Left to do some picks, execute them one after the other.
163
1600310
    EDF_TRACE("Emulated {} picks in init step, {} picks remaining for one after the other step",
164
1600310
              picks_so_far, picks - picks_so_far);
165
3560517
    while (picks_so_far < picks) {
166
1960207
      scheduler.pickAndAdd(calculate_weight);
167
1960207
      picks_so_far++;
168
1960207
    }
169
1600310
    return scheduler;
170
1600311
  }
171

            
172
private:
173
  friend class EdfSchedulerTest;
174

            
175
  /**
176
   * Clears expired entries and pops the next unexpired entry in the queue.
177
   */
178
5206496
  std::shared_ptr<C> popEntry() {
179
5206496
    EDF_TRACE("Queue pick: queue_.size()={}, current_time_={}.", queue_.size(), current_time_);
180
5206499
    while (true) {
181
5206499
      if (queue_.empty()) {
182
6
        EDF_TRACE("Queue is empty.");
183
6
        return nullptr;
184
6
      }
185
5206493
      const EdfEntry& edf_entry = queue_.top();
186
      // Entry has been removed, let's see if there's another one.
187
5206493
      std::shared_ptr<C> ret = edf_entry.entry_.lock();
188
5206493
      if (!ret) {
189
3
        EDF_TRACE("Entry has expired, repick.");
190
3
        queue_.pop();
191
3
        continue;
192
3
      }
193
5206490
      ASSERT(edf_entry.deadline_ >= current_time_);
194
5206490
      current_time_ = edf_entry.deadline_;
195
5206490
      EDF_TRACE("Picked {}, current_time_={}.", static_cast<const void*>(ret.get()), current_time_);
196
5206490
      queue_.pop();
197
5206490
      return ret;
198
5206493
    }
199
5206496
  }
200

            
201
  struct EdfEntry {
202
    double deadline_;
203
    // Tie breaker for entries with the same deadline. This is used to provide FIFO behavior.
204
    uint64_t order_offset_;
205
    // We only hold a weak pointer, since we don't support a remove operator. This allows entries to
206
    // be lazily unloaded from the queue.
207
    std::weak_ptr<C> entry_;
208

            
209
    // Flip < direction to make this a min queue.
210
16606042
    bool operator<(const EdfEntry& other) const {
211
16606042
      return deadline_ > other.deadline_ ||
212
16606042
             (deadline_ == other.deadline_ && order_offset_ > other.order_offset_);
213
16606042
    }
214
  };
215

            
216
  EdfScheduler(std::vector<EdfEntry>&& scheduler_entries, double current_time,
217
               uint32_t order_offset)
218
1600310
      : current_time_(current_time), order_offset_(order_offset),
219
1600310
        queue_(scheduler_entries.cbegin(), scheduler_entries.cend()) {}
220

            
221
  // Current time in EDF scheduler.
222
  // TODO(htuch): Is it worth the small extra complexity to use integer time for performance
223
  // reasons?
224
  double current_time_{};
225
  // Offset used during addition to break ties when entries have the same weight but should reflect
226
  // FIFO insertion order in picks.
227
  uint64_t order_offset_{};
228
  // Min priority queue for EDF.
229
  std::priority_queue<EdfEntry> queue_;
230
  std::list<std::weak_ptr<C>> prepick_list_;
231
};
232

            
233
// EDF_DEBUG only configures tracing inside this header; drop it so including files never see it.
#ifdef EDF_DEBUG
#undef EDF_DEBUG
#endif
234

            
235
} // namespace Upstream
236
} // namespace Envoy