Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/source/common/upstream/edf_scheduler.h
Line
Count
Source (jump to first uncovered line)
1
#pragma once
2
#include <cstdint>
3
#include <iostream>
4
#include <queue>
5
6
#include "envoy/upstream/scheduler.h"
7
8
#include "source/common/common/assert.h"
9
10
namespace Envoy {
11
namespace Upstream {
12
13
// It's not sufficient to use trace level logging, since it becomes far too noisy for a number of
14
// tests, so EDF_TRACE is compiled out entirely unless EDF_DEBUG is set to a non-zero value.
15
#define EDF_DEBUG 0
16
17
#if EDF_DEBUG
18
#define EDF_TRACE(...) ENVOY_LOG_MISC(trace, __VA_ARGS__)
19
#else
20
#define EDF_TRACE(...)
21
#endif
22
23
// Earliest Deadline First (EDF) scheduler
24
// (https://en.wikipedia.org/wiki/Earliest_deadline_first_scheduling) used for weighted round robin.
25
// Each pick from the schedule has the earliest deadline entry selected. Entries have deadlines set
26
// at current time + 1 / weight, providing weighted round robin behavior with floating point
27
// weights and an O(log n) pick time.
28
template <class C> class EdfScheduler : public Scheduler<C> {
29
public:
30
0
  EdfScheduler() = default;
Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::EdfScheduler()
Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::EdfScheduler()
31
32
  // See scheduler.h for an explanation of each public method.
33
833
  std::shared_ptr<C> peekAgain(std::function<double(const C&)> calculate_weight) override {
34
833
    std::shared_ptr<C> ret = popEntry();
35
833
    if (ret) {
36
833
      prepick_list_.push_back(ret);
37
833
      add(calculate_weight(*ret), ret);
38
833
    }
39
833
    return ret;
40
833
  }
Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::peekAgain(std::__1::function<double (Envoy::Upstream::HostSetImpl::LocalityEntry const&)>)
Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::peekAgain(std::__1::function<double (Envoy::Upstream::Host const&)>)
Line
Count
Source
33
833
  std::shared_ptr<C> peekAgain(std::function<double(const C&)> calculate_weight) override {
34
833
    std::shared_ptr<C> ret = popEntry();
35
833
    if (ret) {
36
833
      prepick_list_.push_back(ret);
37
833
      add(calculate_weight(*ret), ret);
38
833
    }
39
833
    return ret;
40
833
  }
41
42
12.1M
  std::shared_ptr<C> pickAndAdd(std::function<double(const C&)> calculate_weight) override {
43
12.1M
    while (!prepick_list_.empty()) {
44
      // In this case the entry was added back during peekAgain so don't re-add.
45
125
      std::shared_ptr<C> ret = prepick_list_.front().lock();
46
125
      prepick_list_.pop_front();
47
125
      if (ret) {
48
125
        return ret;
49
125
      }
50
125
    }
51
12.1M
    std::shared_ptr<C> ret = popEntry();
52
12.1M
    if (ret) {
53
12.1M
      add(calculate_weight(*ret), ret);
54
12.1M
    }
55
12.1M
    return ret;
56
12.1M
  }
Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::pickAndAdd(std::__1::function<double (Envoy::Upstream::HostSetImpl::LocalityEntry const&)>)
Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::pickAndAdd(std::__1::function<double (Envoy::Upstream::Host const&)>)
Line
Count
Source
42
12.1M
  std::shared_ptr<C> pickAndAdd(std::function<double(const C&)> calculate_weight) override {
43
12.1M
    while (!prepick_list_.empty()) {
44
      // In this case the entry was added back during peekAgain so don't re-add.
45
125
      std::shared_ptr<C> ret = prepick_list_.front().lock();
46
125
      prepick_list_.pop_front();
47
125
      if (ret) {
48
125
        return ret;
49
125
      }
50
125
    }
51
12.1M
    std::shared_ptr<C> ret = popEntry();
52
12.1M
    if (ret) {
53
12.1M
      add(calculate_weight(*ret), ret);
54
12.1M
    }
55
12.1M
    return ret;
56
12.1M
  }
57
58
12.1M
  void add(double weight, std::shared_ptr<C> entry) override {
59
12.1M
    ASSERT(weight > 0);
60
12.1M
    const double deadline = current_time_ + 1.0 / weight;
61
12.1M
    EDF_TRACE("Insertion {} in queue with deadline {} and weight {}.",
62
12.1M
              static_cast<const void*>(entry.get()), deadline, weight);
63
12.1M
    queue_.push({deadline, order_offset_++, entry});
64
12.1M
    ASSERT(queue_.top().deadline_ >= current_time_);
65
12.1M
  }
Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::add(double, std::__1::shared_ptr<Envoy::Upstream::HostSetImpl::LocalityEntry>)
Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::add(double, std::__1::shared_ptr<Envoy::Upstream::Host>)
Line
Count
Source
58
12.1M
  void add(double weight, std::shared_ptr<C> entry) override {
59
12.1M
    ASSERT(weight > 0);
60
12.1M
    const double deadline = current_time_ + 1.0 / weight;
61
12.1M
    EDF_TRACE("Insertion {} in queue with deadline {} and weight {}.",
62
12.1M
              static_cast<const void*>(entry.get()), deadline, weight);
63
12.1M
    queue_.push({deadline, order_offset_++, entry});
64
12.1M
    ASSERT(queue_.top().deadline_ >= current_time_);
65
12.1M
  }
66
67
0
  bool empty() const override { return queue_.empty(); }
Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::empty() const
Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::empty() const
68
69
  // Creates an EdfScheduler with the given weights and their corresponding
70
  // entries, and emulating a number of initial picks to be performed. Note that
71
  // the internal state of the scheduler will be very similar to creating an empty
72
  // scheduler, adding the entries one after the other, and then performing
73
  // "picks" pickAndAdd operation without modifying the entries' weights.
74
  // The only thing that may be different is that entries with the same weight
75
  // may be chosen a bit differently (the order_offset_ values may be different).
76
  // Breaking the ties of same weight entries will be kept in future picks from
77
  // the scheduler.
78
  static EdfScheduler<C> createWithPicks(const std::vector<std::shared_ptr<C>>& entries,
79
                                         std::function<double(const C&)> calculate_weight,
80
5.24k
                                         uint32_t picks) {
81
    // Limiting the number of picks, as over 400M picks should be sufficient
82
    // for most scenarios.
83
5.24k
    picks = picks % 429496729; // % UINT_MAX/10
84
5.24k
    EDF_TRACE("Creating an EDF-scheduler with {} weights and {} pre-picks.", entries.size(), picks);
85
    // Assume no non-positive weights.
86
5.24k
    ASSERT(std::none_of(entries.cbegin(), entries.cend(),
87
5.24k
                        [&calculate_weight](const std::shared_ptr<C>& entry) {
88
5.24k
                          return calculate_weight(*entry) <= 0;
89
5.24k
                        }));
90
91
    // Nothing to do if there are no entries.
92
5.24k
    if (entries.empty()) {
93
0
      return EdfScheduler<C>();
94
0
    }
95
96
    // Augment the weight computation to add some epsilon to each entry's
97
    // weight to avoid cases where weights are multiplies of each other. For
98
    // example if there are 2 weights: 25 and 75, and picks=23, then the
99
    // floor_picks will be {5, 17} (respectively), and the deadlines will be
100
    // {0.24000000000000002 and 0.24} (respectively). This small difference will
101
    // cause a "wrong" pick compared to when starting from an empty scheduler
102
    // and picking 23 times. Adding a small value to each weight circumvents
103
    // this problem. This was added as a result of the following comment:
104
    // https://github.com/envoyproxy/envoy/pull/31592#issuecomment-1877663769.
105
79.9M
    auto aug_calculate_weight = [&calculate_weight](const C& entry) -> double {
106
79.9M
      return calculate_weight(entry) + 1e-13;
107
79.9M
    };
Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::createWithPicks(std::__1::vector<std::__1::shared_ptr<Envoy::Upstream::HostSetImpl::LocalityEntry>, std::__1::allocator<std::__1::shared_ptr<Envoy::Upstream::HostSetImpl::LocalityEntry> > > const&, std::__1::function<double (Envoy::Upstream::HostSetImpl::LocalityEntry const&)>, unsigned int)::{lambda(Envoy::Upstream::HostSetImpl::LocalityEntry const&)#1}::operator()(Envoy::Upstream::HostSetImpl::LocalityEntry const&) const
Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::createWithPicks(std::__1::vector<std::__1::shared_ptr<Envoy::Upstream::Host>, std::__1::allocator<std::__1::shared_ptr<Envoy::Upstream::Host> > > const&, std::__1::function<double (Envoy::Upstream::Host const&)>, unsigned int)::{lambda(Envoy::Upstream::Host const&)#1}::operator()(Envoy::Upstream::Host const&) const
Line
Count
Source
105
79.9M
    auto aug_calculate_weight = [&calculate_weight](const C& entry) -> double {
106
79.9M
      return calculate_weight(entry) + 1e-13;
107
79.9M
    };
108
109
    // Let weights {w_1, w_2, ..., w_N} be the per-entry weight where (w_i > 0),
110
    // W = sum(w_i), and P be the number of times to "pick" from the scheduler.
111
    // Let p'_i = floor(P * w_i/W), then the number of times each entry is being
112
    // picked is p_i >= p'_i. Note that 0 <= P - sum(p'_i) < N.
113
    //
114
    // The following code does P picks, by first emulating p'_i picks for each
115
    // entry, and then executing the leftover P - sum(p'_i) picks.
116
5.24k
    double weights_sum = std::accumulate(
117
5.24k
        entries.cbegin(), entries.cend(), 0.0,
118
26.6M
        [&aug_calculate_weight](double sum_so_far, const std::shared_ptr<C>& entry) {
119
26.6M
          return sum_so_far + aug_calculate_weight(*entry);
120
26.6M
        });
Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::createWithPicks(std::__1::vector<std::__1::shared_ptr<Envoy::Upstream::HostSetImpl::LocalityEntry>, std::__1::allocator<std::__1::shared_ptr<Envoy::Upstream::HostSetImpl::LocalityEntry> > > const&, std::__1::function<double (Envoy::Upstream::HostSetImpl::LocalityEntry const&)>, unsigned int)::{lambda(double, std::__1::shared_ptr<Envoy::Upstream::HostSetImpl::LocalityEntry> const&)#1}::operator()(double, std::__1::shared_ptr<Envoy::Upstream::HostSetImpl::LocalityEntry> const&) const
Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::createWithPicks(std::__1::vector<std::__1::shared_ptr<Envoy::Upstream::Host>, std::__1::allocator<std::__1::shared_ptr<Envoy::Upstream::Host> > > const&, std::__1::function<double (Envoy::Upstream::Host const&)>, unsigned int)::{lambda(double, std::__1::shared_ptr<Envoy::Upstream::Host> const&)#1}::operator()(double, std::__1::shared_ptr<Envoy::Upstream::Host> const&) const
Line
Count
Source
118
26.6M
        [&aug_calculate_weight](double sum_so_far, const std::shared_ptr<C>& entry) {
119
26.6M
          return sum_so_far + aug_calculate_weight(*entry);
120
26.6M
        });
121
5.24k
    std::vector<uint32_t> floor_picks;
122
5.24k
    floor_picks.reserve(entries.size());
123
5.24k
    std::transform(entries.cbegin(), entries.cend(), std::back_inserter(floor_picks),
124
26.6M
                   [picks, weights_sum, &aug_calculate_weight](const std::shared_ptr<C>& entry) {
125
                     // Getting the lower-bound by casting to an integer.
126
26.6M
                     return static_cast<uint32_t>(aug_calculate_weight(*entry) * picks /
127
26.6M
                                                  weights_sum);
128
26.6M
                   });
Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::createWithPicks(std::__1::vector<std::__1::shared_ptr<Envoy::Upstream::HostSetImpl::LocalityEntry>, std::__1::allocator<std::__1::shared_ptr<Envoy::Upstream::HostSetImpl::LocalityEntry> > > const&, std::__1::function<double (Envoy::Upstream::HostSetImpl::LocalityEntry const&)>, unsigned int)::{lambda(std::__1::shared_ptr<Envoy::Upstream::HostSetImpl::LocalityEntry> const&)#2}::operator()(std::__1::shared_ptr<Envoy::Upstream::HostSetImpl::LocalityEntry> const&) const
Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::createWithPicks(std::__1::vector<std::__1::shared_ptr<Envoy::Upstream::Host>, std::__1::allocator<std::__1::shared_ptr<Envoy::Upstream::Host> > > const&, std::__1::function<double (Envoy::Upstream::Host const&)>, unsigned int)::{lambda(std::__1::shared_ptr<Envoy::Upstream::Host> const&)#2}::operator()(std::__1::shared_ptr<Envoy::Upstream::Host> const&) const
Line
Count
Source
124
26.6M
                   [picks, weights_sum, &aug_calculate_weight](const std::shared_ptr<C>& entry) {
125
                     // Getting the lower-bound by casting to an integer.
126
26.6M
                     return static_cast<uint32_t>(aug_calculate_weight(*entry) * picks /
127
26.6M
                                                  weights_sum);
128
26.6M
                   });
129
130
    // Pre-compute the priority-queue entries to use an O(N) initialization c'tor.
131
5.24k
    std::vector<EdfEntry> scheduler_entries;
132
5.24k
    scheduler_entries.reserve(entries.size());
133
5.24k
    uint32_t picks_so_far = 0;
134
5.24k
    double max_pick_time = 0.0;
135
    // Emulate a per-entry addition to a deadline that is applicable to N picks.
136
26.6M
    for (size_t i = 0; i < entries.size(); ++i) {
137
      // Add the entry with p'_i picks. As there were p'_i picks, the entry's
138
      // next deadline is (p'_i + 1) / w_i.
139
26.6M
      const double weight = aug_calculate_weight(*entries[i]);
140
      // While validating the algorithm there were a few cases where the math
141
      // and floating-point arithmetic did not agree (specifically floor(A*B)
142
      // was greater than A*B). The following if statement solves the problem by
143
      // reducing floor-picks for the entry, which may result in more iterations
144
      // in the code after the loop.
145
26.6M
      if ((floor_picks[i] > 0) && (floor_picks[i] / weight >= picks / weights_sum)) {
146
989
        floor_picks[i]--;
147
989
      }
148
26.6M
      const double pick_time = floor_picks[i] / weight;
149
26.6M
      const double deadline = (floor_picks[i] + 1) / weight;
150
26.6M
      EDF_TRACE("Insertion {} in queue with emualted {} picks, deadline {} and weight {}.",
151
26.6M
                static_cast<const void*>(entries[i].get()), floor_picks[i], deadline, weight);
152
26.6M
      scheduler_entries.emplace_back(EdfEntry{deadline, i, entries[i]});
153
26.6M
      max_pick_time = std::max(max_pick_time, pick_time);
154
26.6M
      picks_so_far += floor_picks[i];
155
26.6M
    }
156
    // The scheduler's current_time_ needs to be the largest time that some entry was picked.
157
5.24k
    EdfScheduler<C> scheduler(std::move(scheduler_entries), max_pick_time, entries.size());
158
5.24k
    ASSERT(scheduler.queue_.top().deadline_ >= scheduler.current_time_);
159
160
    // Left to do some picks, execute them one after the other.
161
5.24k
    EDF_TRACE("Emulated {} picks in init step, {} picks remaining for one after the other step",
162
5.24k
              picks_so_far, picks - picks_so_far);
163
12.1M
    while (picks_so_far < picks) {
164
12.1M
      scheduler.pickAndAdd(calculate_weight);
165
12.1M
      picks_so_far++;
166
12.1M
    }
167
5.24k
    return scheduler;
168
5.24k
  }
Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::createWithPicks(std::__1::vector<std::__1::shared_ptr<Envoy::Upstream::HostSetImpl::LocalityEntry>, std::__1::allocator<std::__1::shared_ptr<Envoy::Upstream::HostSetImpl::LocalityEntry> > > const&, std::__1::function<double (Envoy::Upstream::HostSetImpl::LocalityEntry const&)>, unsigned int)
Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::createWithPicks(std::__1::vector<std::__1::shared_ptr<Envoy::Upstream::Host>, std::__1::allocator<std::__1::shared_ptr<Envoy::Upstream::Host> > > const&, std::__1::function<double (Envoy::Upstream::Host const&)>, unsigned int)
Line
Count
Source
80
5.24k
                                         uint32_t picks) {
81
    // Limiting the number of picks, as over 400M picks should be sufficient
82
    // for most scenarios.
83
5.24k
    picks = picks % 429496729; // % UINT_MAX/10
84
5.24k
    EDF_TRACE("Creating an EDF-scheduler with {} weights and {} pre-picks.", entries.size(), picks);
85
    // Assume no non-positive weights.
86
5.24k
    ASSERT(std::none_of(entries.cbegin(), entries.cend(),
87
5.24k
                        [&calculate_weight](const std::shared_ptr<C>& entry) {
88
5.24k
                          return calculate_weight(*entry) <= 0;
89
5.24k
                        }));
90
91
    // Nothing to do if there are no entries.
92
5.24k
    if (entries.empty()) {
93
0
      return EdfScheduler<C>();
94
0
    }
95
96
    // Augment the weight computation to add some epsilon to each entry's
97
    // weight to avoid cases where weights are multiplies of each other. For
98
    // example if there are 2 weights: 25 and 75, and picks=23, then the
99
    // floor_picks will be {5, 17} (respectively), and the deadlines will be
100
    // {0.24000000000000002 and 0.24} (respectively). This small difference will
101
    // cause a "wrong" pick compared to when starting from an empty scheduler
102
    // and picking 23 times. Adding a small value to each weight circumvents
103
    // this problem. This was added as a result of the following comment:
104
    // https://github.com/envoyproxy/envoy/pull/31592#issuecomment-1877663769.
105
5.24k
    auto aug_calculate_weight = [&calculate_weight](const C& entry) -> double {
106
5.24k
      return calculate_weight(entry) + 1e-13;
107
5.24k
    };
108
109
    // Let weights {w_1, w_2, ..., w_N} be the per-entry weight where (w_i > 0),
110
    // W = sum(w_i), and P be the number of times to "pick" from the scheduler.
111
    // Let p'_i = floor(P * w_i/W), then the number of times each entry is being
112
    // picked is p_i >= p'_i. Note that 0 <= P - sum(p'_i) < N.
113
    //
114
    // The following code does P picks, by first emulating p'_i picks for each
115
    // entry, and then executing the leftover P - sum(p'_i) picks.
116
5.24k
    double weights_sum = std::accumulate(
117
5.24k
        entries.cbegin(), entries.cend(), 0.0,
118
5.24k
        [&aug_calculate_weight](double sum_so_far, const std::shared_ptr<C>& entry) {
119
5.24k
          return sum_so_far + aug_calculate_weight(*entry);
120
5.24k
        });
121
5.24k
    std::vector<uint32_t> floor_picks;
122
5.24k
    floor_picks.reserve(entries.size());
123
5.24k
    std::transform(entries.cbegin(), entries.cend(), std::back_inserter(floor_picks),
124
5.24k
                   [picks, weights_sum, &aug_calculate_weight](const std::shared_ptr<C>& entry) {
125
                     // Getting the lower-bound by casting to an integer.
126
5.24k
                     return static_cast<uint32_t>(aug_calculate_weight(*entry) * picks /
127
5.24k
                                                  weights_sum);
128
5.24k
                   });
129
130
    // Pre-compute the priority-queue entries to use an O(N) initialization c'tor.
131
5.24k
    std::vector<EdfEntry> scheduler_entries;
132
5.24k
    scheduler_entries.reserve(entries.size());
133
5.24k
    uint32_t picks_so_far = 0;
134
5.24k
    double max_pick_time = 0.0;
135
    // Emulate a per-entry addition to a deadline that is applicable to N picks.
136
26.6M
    for (size_t i = 0; i < entries.size(); ++i) {
137
      // Add the entry with p'_i picks. As there were p'_i picks, the entry's
138
      // next deadline is (p'_i + 1) / w_i.
139
26.6M
      const double weight = aug_calculate_weight(*entries[i]);
140
      // While validating the algorithm there were a few cases where the math
141
      // and floating-point arithmetic did not agree (specifically floor(A*B)
142
      // was greater than A*B). The following if statement solves the problem by
143
      // reducing floor-picks for the entry, which may result in more iterations
144
      // in the code after the loop.
145
26.6M
      if ((floor_picks[i] > 0) && (floor_picks[i] / weight >= picks / weights_sum)) {
146
989
        floor_picks[i]--;
147
989
      }
148
26.6M
      const double pick_time = floor_picks[i] / weight;
149
26.6M
      const double deadline = (floor_picks[i] + 1) / weight;
150
26.6M
      EDF_TRACE("Insertion {} in queue with emualted {} picks, deadline {} and weight {}.",
151
26.6M
                static_cast<const void*>(entries[i].get()), floor_picks[i], deadline, weight);
152
26.6M
      scheduler_entries.emplace_back(EdfEntry{deadline, i, entries[i]});
153
26.6M
      max_pick_time = std::max(max_pick_time, pick_time);
154
26.6M
      picks_so_far += floor_picks[i];
155
26.6M
    }
156
    // The scheduler's current_time_ needs to be the largest time that some entry was picked.
157
5.24k
    EdfScheduler<C> scheduler(std::move(scheduler_entries), max_pick_time, entries.size());
158
5.24k
    ASSERT(scheduler.queue_.top().deadline_ >= scheduler.current_time_);
159
160
    // Left to do some picks, execute them one after the other.
161
5.24k
    EDF_TRACE("Emulated {} picks in init step, {} picks remaining for one after the other step",
162
5.24k
              picks_so_far, picks - picks_so_far);
163
12.1M
    while (picks_so_far < picks) {
164
12.1M
      scheduler.pickAndAdd(calculate_weight);
165
12.1M
      picks_so_far++;
166
12.1M
    }
167
5.24k
    return scheduler;
168
5.24k
  }
169
170
private:
171
  friend class EdfSchedulerTest;
172
173
  /**
174
   * Clears expired entries and pops the next unexpired entry in the queue.
175
   */
176
12.1M
  std::shared_ptr<C> popEntry() {
177
12.1M
    EDF_TRACE("Queue pick: queue_.size()={}, current_time_={}.", queue_.size(), current_time_);
178
12.1M
    while (true) {
179
12.1M
      if (queue_.empty()) {
180
0
        EDF_TRACE("Queue is empty.");
181
0
        return nullptr;
182
0
      }
183
12.1M
      const EdfEntry& edf_entry = queue_.top();
184
      // Entry has been removed, let's see if there's another one.
185
12.1M
      std::shared_ptr<C> ret = edf_entry.entry_.lock();
186
12.1M
      if (!ret) {
187
0
        EDF_TRACE("Entry has expired, repick.");
188
0
        queue_.pop();
189
0
        continue;
190
0
      }
191
12.1M
      ASSERT(edf_entry.deadline_ >= current_time_);
192
12.1M
      current_time_ = edf_entry.deadline_;
193
12.1M
      EDF_TRACE("Picked {}, current_time_={}.", static_cast<const void*>(ret.get()), current_time_);
194
12.1M
      queue_.pop();
195
12.1M
      return ret;
196
12.1M
    }
197
12.1M
  }
Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::popEntry()
Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::popEntry()
Line
Count
Source
176
12.1M
  std::shared_ptr<C> popEntry() {
177
12.1M
    EDF_TRACE("Queue pick: queue_.size()={}, current_time_={}.", queue_.size(), current_time_);
178
12.1M
    while (true) {
179
12.1M
      if (queue_.empty()) {
180
0
        EDF_TRACE("Queue is empty.");
181
0
        return nullptr;
182
0
      }
183
12.1M
      const EdfEntry& edf_entry = queue_.top();
184
      // Entry has been removed, let's see if there's another one.
185
12.1M
      std::shared_ptr<C> ret = edf_entry.entry_.lock();
186
12.1M
      if (!ret) {
187
0
        EDF_TRACE("Entry has expired, repick.");
188
0
        queue_.pop();
189
0
        continue;
190
0
      }
191
12.1M
      ASSERT(edf_entry.deadline_ >= current_time_);
192
12.1M
      current_time_ = edf_entry.deadline_;
193
12.1M
      EDF_TRACE("Picked {}, current_time_={}.", static_cast<const void*>(ret.get()), current_time_);
194
12.1M
      queue_.pop();
195
12.1M
      return ret;
196
12.1M
    }
197
12.1M
  }
198
199
  struct EdfEntry {
200
    double deadline_;
201
    // Tie breaker for entries with the same deadline. This is used to provide FIFO behavior.
202
    uint64_t order_offset_;
203
    // We only hold a weak pointer, since we don't support a remove operator. This allows entries to
204
    // be lazily unloaded from the queue.
205
    std::weak_ptr<C> entry_;
206
207
    // Flip < direction to make this a min queue.
208
223M
    bool operator<(const EdfEntry& other) const {
209
223M
      return deadline_ > other.deadline_ ||
210
223M
             (deadline_ == other.deadline_ && order_offset_ > other.order_offset_);
211
223M
    }
Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::EdfEntry::operator<(Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::EdfEntry const&) const
Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::EdfEntry::operator<(Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::EdfEntry const&) const
Line
Count
Source
208
223M
    bool operator<(const EdfEntry& other) const {
209
223M
      return deadline_ > other.deadline_ ||
210
223M
             (deadline_ == other.deadline_ && order_offset_ > other.order_offset_);
211
223M
    }
212
  };
213
214
  EdfScheduler(std::vector<EdfEntry>&& scheduler_entries, double current_time,
215
               uint32_t order_offset)
216
      : current_time_(current_time), order_offset_(order_offset),
217
5.24k
        queue_(scheduler_entries.cbegin(), scheduler_entries.cend()) {}
Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::EdfScheduler(std::__1::vector<Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::EdfEntry, std::__1::allocator<Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::EdfEntry> >&&, double, unsigned int)
Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::EdfScheduler(std::__1::vector<Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::EdfEntry, std::__1::allocator<Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host>::EdfEntry> >&&, double, unsigned int)
Line
Count
Source
217
5.24k
        queue_(scheduler_entries.cbegin(), scheduler_entries.cend()) {}
218
219
  // Current time in EDF scheduler.
220
  // TODO(htuch): Is it worth the small extra complexity to use integer time for performance
221
  // reasons?
222
  double current_time_{};
223
  // Offset used during addition to break ties when entries have the same weight but should reflect
224
  // FIFO insertion order in picks.
225
  uint64_t order_offset_{};
226
  // Min priority queue for EDF.
227
  std::priority_queue<EdfEntry> queue_;
228
  std::list<std::weak_ptr<C>> prepick_list_;
229
};
230
231
#undef EDF_DEBUG
232
233
} // namespace Upstream
234
} // namespace Envoy