1
#pragma once
2

            
3
#include <atomic>
4
#include <cstdint>
5
#include <memory>
6
#include <string>
7

            
8
#include "envoy/common/resource.h"
9
#include "envoy/runtime/runtime.h"
10
#include "envoy/upstream/resource_manager.h"
11
#include "envoy/upstream/upstream.h"
12

            
13
#include "source/common/common/assert.h"
14
#include "source/common/common/basic_resource_impl.h"
15

            
16
namespace Envoy {
17
namespace Upstream {
18

            
19
struct ManagedResourceImpl : public BasicResourceLimitImpl {
20
  ManagedResourceImpl(uint64_t max, Runtime::Loader& runtime, const std::string& runtime_key,
21
                      Stats::Gauge& open_gauge, Stats::Gauge& remaining)
22
1324079
      : BasicResourceLimitImpl(max, runtime, runtime_key), open_gauge_(open_gauge),
23
1324079
        remaining_(remaining) {
24
1324079
    remaining_.set(max);
25
1324079
  }
26

            
27
  // BasicResourceLimitImpl
28
126208
  void inc() override {
29
126208
    BasicResourceLimitImpl::inc();
30
126208
    updateRemaining();
31
126208
    open_gauge_.set(BasicResourceLimitImpl::canCreate() ? 0 : 1);
32
126208
  }
33
149386
  void decBy(uint64_t amount) override {
34
149386
    BasicResourceLimitImpl::decBy(amount);
35
149386
    updateRemaining();
36
149386
    open_gauge_.set(BasicResourceLimitImpl::canCreate() ? 0 : 1);
37
149386
  }
38

            
39
  /**
40
   * We set the gauge instead of incrementing and decrementing because,
41
   * though atomics are used, it is possible for the current resource count
42
   * to be greater than the supplied max.
43
   */
44
275594
  void updateRemaining() {
45
    /**
46
     * We cannot use std::max here because max() and current_ are
47
     * unsigned and subtracting them may overflow.
48
     */
49
275594
    const uint64_t current_copy = current_;
50
275594
    remaining_.set(max() > current_copy ? max() - current_copy : 0);
51
275594
  }
52

            
53
  /**
54
   * A gauge to notify the live circuit breaker state. The gauge is set to 0
55
   * to notify that the circuit breaker is not yet triggered.
56
   */
57
  Stats::Gauge& open_gauge_;
58

            
59
  /**
60
   * The number of resources remaining before the circuit breaker opens.
61
   */
62
  Stats::Gauge& remaining_;
63
};
64

            
65
/**
66
 * Implementation of ResourceManager.
67
 * NOTE: This implementation makes some assumptions which favor simplicity over correctness.
68
 * 1) Primarily, it assumes that traffic will be mostly balanced over all the worker threads since
69
 *    no attempt is made to balance resources between them. It is possible that starvation can
70
 *    occur during high contention.
71
 * 2) Though atomics are used, it is possible for resources to temporarily go above the supplied
72
 *    maximums. This should not affect overall behavior.
73
 */
74
class ResourceManagerImpl : public ResourceManager {
75
public:
76
  ResourceManagerImpl(Runtime::Loader& runtime, const std::string& runtime_key,
77
                      uint64_t max_connections, uint64_t max_pending_requests,
78
                      uint64_t max_requests, uint64_t max_retries, uint64_t max_connection_pools,
79
                      uint64_t max_connections_per_host, ClusterCircuitBreakersStats cb_stats,
80
                      absl::optional<double> budget_percent,
81
                      absl::optional<uint32_t> min_retry_concurrency)
82
264782
      : connections_(max_connections, runtime, runtime_key + "max_connections", cb_stats.cx_open_,
83
264782
                     cb_stats.remaining_cx_),
84
264782
        pending_requests_(max_pending_requests, runtime, runtime_key + "max_pending_requests",
85
264782
                          cb_stats.rq_pending_open_, cb_stats.remaining_pending_),
86
264782
        requests_(max_requests, runtime, runtime_key + "max_requests", cb_stats.rq_open_,
87
264782
                  cb_stats.remaining_rq_),
88
264782
        connection_pools_(max_connection_pools, runtime, runtime_key + "max_connection_pools",
89
264782
                          cb_stats.cx_pool_open_, cb_stats.remaining_cx_pools_),
90
264782
        max_connections_per_host_(max_connections_per_host),
91
264782
        retries_(budget_percent, min_retry_concurrency, max_retries, runtime,
92
264782
                 runtime_key + "retry_budget.", runtime_key + "max_retries",
93
264782
                 cb_stats.rq_retry_open_, cb_stats.remaining_retries_, requests_,
94
264782
                 pending_requests_) {}
95

            
96
  // Upstream::ResourceManager
97
96452
  ResourceLimit& connections() override { return connections_; }
98
95716
  ResourceLimit& pendingRequests() override { return pending_requests_; }
99
144498
  ResourceLimit& requests() override { return requests_; }
100
986
  ResourceLimit& retries() override { return retries_; }
101
46860
  ResourceLimit& connectionPools() override { return connection_pools_; }
102
31046
  uint64_t maxConnectionsPerHost() override { return max_connections_per_host_; }
103

            
104
private:
105
  class RetryBudgetImpl : public ResourceLimit {
106
  public:
107
    RetryBudgetImpl(absl::optional<double> budget_percent,
108
                    absl::optional<uint32_t> min_retry_concurrency, uint64_t max_retries,
109
                    Runtime::Loader& runtime, const std::string& retry_budget_runtime_key,
110
                    const std::string& max_retries_runtime_key, Stats::Gauge& open_gauge,
111
                    Stats::Gauge& remaining, const ResourceLimit& requests,
112
                    const ResourceLimit& pending_requests)
113
264782
        : runtime_(runtime),
114
264782
          max_retry_resource_(max_retries, runtime, max_retries_runtime_key, open_gauge, remaining),
115
264782
          budget_percent_(budget_percent), min_retry_concurrency_(min_retry_concurrency),
116
264782
          budget_percent_key_(retry_budget_runtime_key + "budget_percent"),
117
264782
          min_retry_concurrency_key_(retry_budget_runtime_key + "min_retry_concurrency"),
118
264782
          requests_(requests), pending_requests_(pending_requests), remaining_(remaining) {}
119

            
120
    // Envoy::ResourceLimit
121
308
    bool canCreate() override {
122
308
      if (!useRetryBudget()) {
123
299
        return max_retry_resource_.canCreate();
124
299
      }
125
9
      clearRemainingGauge();
126
9
      return count() < max();
127
308
    }
128
309
    void inc() override {
129
309
      max_retry_resource_.inc();
130
309
      clearRemainingGauge();
131
309
    }
132
309
    void dec() override {
133
309
      max_retry_resource_.dec();
134
309
      clearRemainingGauge();
135
309
    }
136
    void decBy(uint64_t amount) override {
137
      max_retry_resource_.decBy(amount);
138
      clearRemainingGauge();
139
    }
140
65
    uint64_t max() override {
141
65
      if (!useRetryBudget()) {
142
54
        return max_retry_resource_.max();
143
54
      }
144

            
145
11
      const uint64_t current_active = requests_.count() + pending_requests_.count();
146
11
      const double budget_percent = runtime_.snapshot().getDouble(
147
11
          budget_percent_key_, budget_percent_ ? *budget_percent_ : 20.0);
148
11
      const uint32_t min_retry_concurrency = runtime_.snapshot().getInteger(
149
11
          min_retry_concurrency_key_, min_retry_concurrency_ ? *min_retry_concurrency_ : 3);
150

            
151
11
      clearRemainingGauge();
152

            
153
      // We enforce that the retry concurrency is never allowed to go below the
154
      // min_retry_concurrency, even if the configured percent of the current active requests
155
      // yields a value that is smaller.
156
11
      return std::max<uint64_t>(budget_percent / 100.0 * current_active, min_retry_concurrency);
157
65
    }
158
13
    uint64_t count() const override { return max_retry_resource_.count(); }
159

            
160
  private:
161
1011
    bool useRetryBudget() const {
162
1011
      return runtime_.snapshot().get(budget_percent_key_).has_value() ||
163
1011
             runtime_.snapshot().get(min_retry_concurrency_key_).has_value() || budget_percent_ ||
164
1011
             min_retry_concurrency_;
165
1011
    }
166

            
167
    // If the retry budget is in use, the stats tracking remaining retries do not make sense since
168
    // they would dependent on other resources that can change without a call to this object.
169
    // Therefore, the gauge should just be reset to 0.
170
638
    void clearRemainingGauge() {
171
638
      if (useRetryBudget()) {
172
44
        remaining_.set(0);
173
44
      }
174
638
    }
175

            
176
    Runtime::Loader& runtime_;
177
    // The max_retry resource is nested within the budget to maintain state if the retry budget is
178
    // toggled.
179
    ManagedResourceImpl max_retry_resource_;
180
    const absl::optional<double> budget_percent_;
181
    const absl::optional<uint32_t> min_retry_concurrency_;
182
    const std::string budget_percent_key_;
183
    const std::string min_retry_concurrency_key_;
184
    const ResourceLimit& requests_;
185
    const ResourceLimit& pending_requests_;
186
    Stats::Gauge& remaining_;
187
  };
188

            
189
  ManagedResourceImpl connections_;
190
  ManagedResourceImpl pending_requests_;
191
  ManagedResourceImpl requests_;
192
  ManagedResourceImpl connection_pools_;
193
  uint64_t max_connections_per_host_;
194
  RetryBudgetImpl retries_;
195
};
196

            
197
// Uniquely-owning pointer to a ResourceManagerImpl.
using ResourceManagerImplPtr = std::unique_ptr<ResourceManagerImpl>;
198

            
199
} // namespace Upstream
200
} // namespace Envoy