1
#include "source/extensions/retry/priority/previous_priorities/previous_priorities.h"
2

            
3
namespace Envoy {
4
namespace Extensions {
5
namespace Retry {
6
namespace Priority {
7

            
8
const Upstream::HealthyAndDegradedLoad& PreviousPrioritiesRetryPriority::determinePriorityLoad(
9
    StreamInfo::StreamInfo*, const Upstream::PrioritySet& priority_set,
10
    const Upstream::HealthyAndDegradedLoad& original_priority_load,
11
25
    const PriorityMappingFunc& priority_mapping_func) {
12
  // If we've not seen enough retries to modify the priority load, just
13
  // return the original.
14
  // If this retry should trigger an update, recalculate the priority load by excluding attempted
15
  // priorities.
16
25
  if (attempted_hosts_.size() < update_frequency_) {
17
7
    return original_priority_load;
18
18
  } else if (attempted_hosts_.size() % update_frequency_ == 0) {
19
18
    if (excluded_priorities_.size() < priority_set.hostSetsPerPriority().size()) {
20
9
      excluded_priorities_.resize(priority_set.hostSetsPerPriority().size());
21
9
    }
22

            
23
31
    for (const auto& host : attempted_hosts_) {
24
31
      absl::optional<uint32_t> mapped_host_priority = priority_mapping_func(*host);
25
31
      if (mapped_host_priority.has_value()) {
26
30
        excluded_priorities_[mapped_host_priority.value()] = true;
27
30
      }
28
31
    }
29

            
30
18
    if (!adjustForAttemptedPriorities(priority_set)) {
31
1
      return original_priority_load;
32
1
    }
33
18
  }
34

            
35
17
  return per_priority_load_;
36
25
}
37

            
38
bool PreviousPrioritiesRetryPriority::adjustForAttemptedPriorities(
39
18
    const Upstream::PrioritySet& priority_set) {
40
46
  for (auto& host_set : priority_set.hostSetsPerPriority()) {
41
46
    recalculatePerPriorityState(host_set->priority(), priority_set);
42
46
  }
43

            
44
18
  std::vector<uint32_t> adjusted_per_priority_health(per_priority_health_.get().size(), 0);
45
18
  std::vector<uint32_t> adjusted_per_priority_degraded(per_priority_degraded_.get().size(), 0);
46
18
  auto total_availability =
47
18
      adjustedAvailability(adjusted_per_priority_health, adjusted_per_priority_degraded);
48

            
49
  // If there are no available priorities left, we reset the attempted priorities and recompute the
50
  // adjusted availability.
51
  // This allows us to fall back to the unmodified priority load when we run out of priorities
52
  // instead of failing to route requests.
53
18
  if (total_availability == 0) {
54
16
    for (auto excluded_priority : excluded_priorities_) {
55
16
      excluded_priority = false;
56
16
    }
57
6
    attempted_hosts_.clear();
58
6
    total_availability =
59
6
        adjustedAvailability(adjusted_per_priority_health, adjusted_per_priority_degraded);
60
6
  }
61

            
62
  // If total availability is still zero at this point, it must mean that all clusters are
63
  // completely unavailable. If so, fall back to using the original priority loads. This maintains
64
  // whatever handling the default LB uses when all priorities are unavailable.
65
18
  if (total_availability == 0) {
66
1
    return false;
67
1
  }
68

            
69
17
  std::fill(per_priority_load_.healthy_priority_load_.get().begin(),
70
17
            per_priority_load_.healthy_priority_load_.get().end(), 0);
71
17
  std::fill(per_priority_load_.degraded_priority_load_.get().begin(),
72
17
            per_priority_load_.degraded_priority_load_.get().end(), 0);
73

            
74
  // TODO(snowp): This code is basically distributeLoad from load_balancer_impl.cc, should probably
75
  // reuse that.
76

            
77
  // We then adjust the load by rebalancing priorities with the adjusted availability values.
78
17
  size_t total_load = 100;
79
  // The outer loop is used to eliminate rounding errors: any remaining load will be assigned to the
80
  // first availability priority.
81
34
  while (total_load != 0) {
82
60
    for (size_t i = 0; i < adjusted_per_priority_health.size(); ++i) {
83
      // Now assign as much load as possible to the high priority levels and cease assigning load
84
      // when total_load runs out.
85
43
      const auto delta = std::min<uint32_t>(total_load, adjusted_per_priority_health[i] * 100 /
86
43
                                                            total_availability);
87
43
      per_priority_load_.healthy_priority_load_.get()[i] += delta;
88
43
      total_load -= delta;
89
43
    }
90

            
91
60
    for (size_t i = 0; i < adjusted_per_priority_degraded.size(); ++i) {
92
      // Now assign as much load as possible to the high priority levels and cease assigning load
93
      // when total_load runs out.
94
43
      const auto delta = std::min<uint32_t>(total_load, adjusted_per_priority_degraded[i] * 100 /
95
43
                                                            total_availability);
96
43
      per_priority_load_.degraded_priority_load_.get()[i] += delta;
97
43
      total_load -= delta;
98
43
    }
99
17
  }
100

            
101
17
  return true;
102
18
}
103

            
104
uint32_t PreviousPrioritiesRetryPriority::adjustedAvailability(
105
    std::vector<uint32_t>& adjusted_per_priority_health,
106
24
    std::vector<uint32_t>& adjusted_per_priority_degraded) const {
107
  // Create an adjusted view of the priorities, where attempted priorities are given a zero load.
108
  // Create an adjusted health view of the priorities, where attempted priorities are
109
  // given a zero weight.
110
24
  uint32_t total_availability = 0;
111

            
112
24
  ASSERT(per_priority_health_.get().size() == per_priority_degraded_.get().size());
113

            
114
86
  for (size_t i = 0; i < per_priority_health_.get().size(); ++i) {
115
62
    if (!excluded_priorities_[i]) {
116
33
      adjusted_per_priority_health[i] = per_priority_health_.get()[i];
117
33
      adjusted_per_priority_degraded[i] = per_priority_degraded_.get()[i];
118
33
      total_availability += per_priority_health_.get()[i];
119
33
      total_availability += per_priority_degraded_.get()[i];
120
33
    } else {
121
29
      adjusted_per_priority_health[i] = 0;
122
29
      adjusted_per_priority_degraded[i] = 0;
123
29
    }
124
62
  }
125

            
126
24
  return std::min(total_availability, 100u);
127
24
}
128

            
129
} // namespace Priority
130
} // namespace Retry
131
} // namespace Extensions
132
} // namespace Envoy