Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/common/upstream/edf_scheduler.h
Line
Count
Source (jump to first uncovered line)
1
#pragma once
2
#include <cstdint>
3
#include <iostream>
4
#include <queue>
5
6
#include "envoy/upstream/scheduler.h"
7
8
#include "source/common/common/assert.h"
9
10
namespace Envoy {
11
namespace Upstream {
12
13
// It's not sufficient to use trace level logging, since it becomes far too noisy for a number of
14
// tests, so we can kill trace debug here.
15
// Compile-time switch for the scheduler's trace output: a non-zero value enables EDF_TRACE below.
#define EDF_DEBUG 0
16
17
#if EDF_DEBUG
18
// With EDF_DEBUG enabled, forward the arguments to ENVOY_LOG_MISC at trace level.
#define EDF_TRACE(...) ENVOY_LOG_MISC(trace, __VA_ARGS__)
19
#else
20
// With EDF_DEBUG disabled, EDF_TRACE expands to nothing, so its arguments are never evaluated.
#define EDF_TRACE(...)
21
#endif
22
23
// Earliest Deadline First (EDF) scheduler
24
// (https://en.wikipedia.org/wiki/Earliest_deadline_first_scheduling) used for weighted round robin.
25
// Each pick from the schedule has the earliest deadline entry selected. Entries have deadlines set
26
// at current time + 1 / weight, providing weighted round robin behavior with floating point
27
// weights and an O(log n) pick time.
28
template <class C> class EdfScheduler : public Scheduler<C> {
29
public:
30
  // See scheduler.h for an explanation of each public method.
31
979
  /**
   * Pops the next live entry via popEntry(), records it in prepick_list_ so that a later
   * pickAndAdd() hands it out first, and re-inserts it into the queue with a freshly calculated
   * weight.
   * @param calculate_weight callback returning the weight used to re-add the popped entry; add()
   *        asserts that the value is > 0.
   * @return the popped entry, or nullptr when popEntry() finds no live entry.
   */
  std::shared_ptr<C> peekAgain(std::function<double(const C&)> calculate_weight) override {
32
979
    std::shared_ptr<C> ret = popEntry();
33
979
    if (ret) {
34
979
      // Remember the entry (weakly) so pickAndAdd() returns it before touching the queue again.
      prepick_list_.push_back(ret);
35
979
      // Re-schedule right away; pickAndAdd() will therefore not re-add prepicked entries.
      add(calculate_weight(*ret), ret);
36
979
    }
37
979
    return ret;
38
979
  }
Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host const>::peekAgain(std::__1::function<double (Envoy::Upstream::Host const&)>)
Line
Count
Source
31
979
  /**
   * Pops the next live entry via popEntry(), records it in prepick_list_ so that a later
   * pickAndAdd() hands it out first, and re-inserts it into the queue with a freshly calculated
   * weight.
   * @param calculate_weight callback returning the weight used to re-add the popped entry; add()
   *        asserts that the value is > 0.
   * @return the popped entry, or nullptr when popEntry() finds no live entry.
   */
  std::shared_ptr<C> peekAgain(std::function<double(const C&)> calculate_weight) override {
32
979
    std::shared_ptr<C> ret = popEntry();
33
979
    if (ret) {
34
979
      // Remember the entry (weakly) so pickAndAdd() returns it before touching the queue again.
      prepick_list_.push_back(ret);
35
979
      // Re-schedule right away; pickAndAdd() will therefore not re-add prepicked entries.
      add(calculate_weight(*ret), ret);
36
979
    }
37
979
    return ret;
38
979
  }
Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::peekAgain(std::__1::function<double (Envoy::Upstream::HostSetImpl::LocalityEntry const&)>)
39
40
13.9M
  /**
   * Returns the next entry to use. Entries previously handed out by peekAgain() are served first,
   * in the order they were peeked; only when none of them is still alive is the queue consulted,
   * in which case the popped entry is re-added with a freshly calculated weight.
   * @param calculate_weight callback returning the weight used to re-add an entry popped from the
   *        queue; it is not invoked when the result comes from prepick_list_.
   * @return the selected entry, or nullptr when neither prepick_list_ nor the queue yields a live
   *         entry.
   */
  std::shared_ptr<C> pickAndAdd(std::function<double(const C&)> calculate_weight) override {
41
13.9M
    while (!prepick_list_.empty()) {
42
      // In this case the entry was added back during peekAgain so don't re-add.
43
240
      std::shared_ptr<C> ret = prepick_list_.front().lock();
44
240
      prepick_list_.pop_front();
45
240
      // A null result means the prepicked entry was destroyed in the meantime; try the next one.
      if (ret) {
46
240
        return ret;
47
240
      }
48
240
    }
49
13.9M
    // No usable prepicked entry: take the earliest-deadline entry and put it back in the queue.
    std::shared_ptr<C> ret = popEntry();
50
13.9M
    if (ret) {
51
13.9M
      add(calculate_weight(*ret), ret);
52
13.9M
    }
53
13.9M
    return ret;
54
13.9M
  }
Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host const>::pickAndAdd(std::__1::function<double (Envoy::Upstream::Host const&)>)
Line
Count
Source
40
13.9M
  /**
   * Returns the next entry to use. Entries previously handed out by peekAgain() are served first,
   * in the order they were peeked; only when none of them is still alive is the queue consulted,
   * in which case the popped entry is re-added with a freshly calculated weight.
   * @param calculate_weight callback returning the weight used to re-add an entry popped from the
   *        queue; it is not invoked when the result comes from prepick_list_.
   * @return the selected entry, or nullptr when neither prepick_list_ nor the queue yields a live
   *         entry.
   */
  std::shared_ptr<C> pickAndAdd(std::function<double(const C&)> calculate_weight) override {
41
13.9M
    while (!prepick_list_.empty()) {
42
      // In this case the entry was added back during peekAgain so don't re-add.
43
240
      std::shared_ptr<C> ret = prepick_list_.front().lock();
44
240
      prepick_list_.pop_front();
45
240
      // A null result means the prepicked entry was destroyed in the meantime; try the next one.
      if (ret) {
46
240
        return ret;
47
240
      }
48
240
    }
49
13.9M
    // No usable prepicked entry: take the earliest-deadline entry and put it back in the queue.
    std::shared_ptr<C> ret = popEntry();
50
13.9M
    if (ret) {
51
13.9M
      add(calculate_weight(*ret), ret);
52
13.9M
    }
53
13.9M
    return ret;
54
13.9M
  }
Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::pickAndAdd(std::__1::function<double (Envoy::Upstream::HostSetImpl::LocalityEntry const&)>)
55
56
33.0M
  /**
   * Inserts an entry into the queue with deadline current_time_ + 1 / weight, so a larger weight
   * yields an earlier deadline. The queue stores only a weak reference to the entry.
   * @param weight scheduling weight; asserted to be > 0.
   * @param entry the entry to schedule.
   */
  void add(double weight, std::shared_ptr<C> entry) override {
57
33.0M
    ASSERT(weight > 0);
58
33.0M
    const double deadline = current_time_ + 1.0 / weight;
59
33.0M
    EDF_TRACE("Insertion {} in queue with deadline {} and weight {}.",
60
33.0M
              static_cast<const void*>(entry.get()), deadline, weight);
61
33.0M
    // order_offset_ increases on every insertion and serves as the FIFO tie breaker for entries
    // that share a deadline.
    queue_.push({deadline, order_offset_++, entry});
62
33.0M
    // Invariant: the earliest deadline in the queue never lies before the scheduler's clock.
    ASSERT(queue_.top().deadline_ >= current_time_);
63
33.0M
  }
Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host const>::add(double, std::__1::shared_ptr<Envoy::Upstream::Host const>)
Line
Count
Source
56
33.0M
  /**
   * Inserts an entry into the queue with deadline current_time_ + 1 / weight, so a larger weight
   * yields an earlier deadline. The queue stores only a weak reference to the entry.
   * @param weight scheduling weight; asserted to be > 0.
   * @param entry the entry to schedule.
   */
  void add(double weight, std::shared_ptr<C> entry) override {
57
33.0M
    ASSERT(weight > 0);
58
33.0M
    const double deadline = current_time_ + 1.0 / weight;
59
33.0M
    EDF_TRACE("Insertion {} in queue with deadline {} and weight {}.",
60
33.0M
              static_cast<const void*>(entry.get()), deadline, weight);
61
33.0M
    // order_offset_ increases on every insertion and serves as the FIFO tie breaker for entries
    // that share a deadline.
    queue_.push({deadline, order_offset_++, entry});
62
33.0M
    // Invariant: the earliest deadline in the queue never lies before the scheduler's clock.
    ASSERT(queue_.top().deadline_ >= current_time_);
63
33.0M
  }
Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::add(double, std::__1::shared_ptr<Envoy::Upstream::HostSetImpl::LocalityEntry>)
64
65
9.80k
  // Reports whether the queue holds no entries at all. Expired entries that popEntry() has not yet
  // discarded still count as present, and prepick_list_ is not consulted.
  bool empty() const override { return queue_.empty(); }
Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host const>::empty() const
Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::empty() const
Line
Count
Source
65
9.80k
  // Reports whether the queue holds no entries at all. Expired entries that popEntry() has not yet
  // discarded still count as present, and prepick_list_ is not consulted.
  bool empty() const override { return queue_.empty(); }
66
67
private:
68
  /**
69
   * Clears expired entries and pops the next unexpired entry in the queue.
   * On success the scheduler clock (current_time_) is advanced to the popped entry's deadline.
   * The popped entry is not re-added here; callers do that via add().
   * @return the live entry with the earliest deadline, or nullptr if the queue runs empty.
70
   */
71
13.9M
  std::shared_ptr<C> popEntry() {
72
13.9M
    EDF_TRACE("Queue pick: queue_.size()={}, current_time_={}.", queue_.size(), current_time_);
73
13.9M
    while (true) {
74
13.9M
      if (queue_.empty()) {
75
177
        EDF_TRACE("Queue is empty.");
76
177
        return nullptr;
77
177
      }
78
13.9M
      // edf_entry refers to the element owned by the queue, so it must not be used after
      // queue_.pop().
      const EdfEntry& edf_entry = queue_.top();
79
      // lock() yields null if the entry has been removed (destroyed); in that case discard it and
      // see if there's another one.
80
13.9M
      std::shared_ptr<C> ret = edf_entry.entry_.lock();
81
13.9M
      if (!ret) {
82
0
        EDF_TRACE("Entry has expired, repick.");
83
0
        queue_.pop();
84
0
        continue;
85
0
      }
86
13.9M
      ASSERT(edf_entry.deadline_ >= current_time_);
87
13.9M
      // Advance the scheduler clock to the picked deadline before the entry is popped.
      current_time_ = edf_entry.deadline_;
88
13.9M
      EDF_TRACE("Picked {}, current_time_={}.", static_cast<const void*>(ret.get()), current_time_);
89
13.9M
      queue_.pop();
90
13.9M
      return ret;
91
13.9M
    }
92
13.9M
  }
Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host const>::popEntry()
Line
Count
Source
71
13.9M
  // Discards expired entries from the top of the queue and pops the next live one. On success the
  // scheduler clock (current_time_) is advanced to the popped entry's deadline; the entry is not
  // re-added here. Returns nullptr if the queue runs empty.
  std::shared_ptr<C> popEntry() {
72
13.9M
    EDF_TRACE("Queue pick: queue_.size()={}, current_time_={}.", queue_.size(), current_time_);
73
13.9M
    while (true) {
74
13.9M
      if (queue_.empty()) {
75
177
        EDF_TRACE("Queue is empty.");
76
177
        return nullptr;
77
177
      }
78
13.9M
      // edf_entry refers to the element owned by the queue, so it must not be used after
      // queue_.pop().
      const EdfEntry& edf_entry = queue_.top();
79
      // lock() yields null if the entry has been removed (destroyed); in that case discard it and
      // see if there's another one.
80
13.9M
      std::shared_ptr<C> ret = edf_entry.entry_.lock();
81
13.9M
      if (!ret) {
82
0
        EDF_TRACE("Entry has expired, repick.");
83
0
        queue_.pop();
84
0
        continue;
85
0
      }
86
13.9M
      ASSERT(edf_entry.deadline_ >= current_time_);
87
13.9M
      // Advance the scheduler clock to the picked deadline before the entry is popped.
      current_time_ = edf_entry.deadline_;
88
13.9M
      EDF_TRACE("Picked {}, current_time_={}.", static_cast<const void*>(ret.get()), current_time_);
89
13.9M
      queue_.pop();
90
13.9M
      return ret;
91
13.9M
    }
92
13.9M
  }
Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::popEntry()
93
94
  struct EdfEntry {
95
    double deadline_;
96
    // Tie breaker for entries with the same deadline. This is used to provide FIFO behavior.
97
    uint64_t order_offset_;
98
    // We only hold a weak pointer, since we don't support a remove operator. This allows entries to
99
    // be lazily unloaded from the queue.
100
    std::weak_ptr<C> entry_;
101
102
    // Flip < direction to make this a min queue.
103
426M
    // Returns true when this entry has the later deadline, or the same deadline and a larger
    // order_offset_. std::priority_queue keeps its greatest element on top, so with this inverted
    // comparison the top is the earliest deadline, with ties resolved in insertion order.
    bool operator<(const EdfEntry& other) const {
104
426M
      return deadline_ > other.deadline_ ||
105
426M
             (deadline_ == other.deadline_ && order_offset_ > other.order_offset_);
106
426M
    }
Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host const>::EdfEntry::operator<(Envoy::Upstream::EdfScheduler<Envoy::Upstream::Host const>::EdfEntry const&) const
Line
Count
Source
103
426M
    // Returns true when this entry has the later deadline, or the same deadline and a larger
    // order_offset_. std::priority_queue keeps its greatest element on top, so with this inverted
    // comparison the top is the earliest deadline, with ties resolved in insertion order.
    bool operator<(const EdfEntry& other) const {
104
426M
      return deadline_ > other.deadline_ ||
105
426M
             (deadline_ == other.deadline_ && order_offset_ > other.order_offset_);
106
426M
    }
Unexecuted instantiation: Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::EdfEntry::operator<(Envoy::Upstream::EdfScheduler<Envoy::Upstream::HostSetImpl::LocalityEntry>::EdfEntry const&) const
107
  };
108
109
  // Current time in EDF scheduler.
110
  // TODO(htuch): Is it worth the small extra complexity to use integer time for performance
111
  // reasons?
112
  double current_time_{};
113
  // Offset used during addition to break ties when entries have the same weight but should reflect
114
  // FIFO insertion order in picks.
115
  uint64_t order_offset_{};
116
  // Min priority queue for EDF.
117
  std::priority_queue<EdfEntry> queue_;
118
  // Entries handed out by peekAgain() that pickAndAdd() must return, oldest first, before it pops
  // from queue_. Held weakly, so an entry destroyed in the meantime is simply skipped.
  std::list<std::weak_ptr<C>> prepick_list_;
119
};
120
121
// NOTE(review): only EDF_DEBUG is undefined here; no matching #undef for EDF_TRACE is visible in
// this file, so EDF_TRACE stays defined for code that includes this header — confirm intended.
#undef EDF_DEBUG
122
123
} // namespace Upstream
124
} // namespace Envoy