1
#pragma once
2

            
3
#include <algorithm>
4
#include <bitset>
5
#include <cstdint>
6
#include <functional>
7
#include <map>
8
#include <memory>
9
#include <string>
10

            
11
#include "envoy/common/optref.h"
12
#include "envoy/config/cluster/v3/cluster.pb.h"
13
#include "envoy/extensions/load_balancing_policies/subset/v3/subset.pb.h"
14
#include "envoy/extensions/load_balancing_policies/subset/v3/subset.pb.validate.h"
15
#include "envoy/runtime/runtime.h"
16
#include "envoy/stats/scope.h"
17
#include "envoy/stream_info/stream_info.h"
18
#include "envoy/upstream/load_balancer.h"
19

            
20
#include "source/common/common/macros.h"
21
#include "source/common/protobuf/protobuf.h"
22
#include "source/common/protobuf/utility.h"
23
#include "source/common/upstream/load_balancer_context_base.h"
24
#include "source/common/upstream/upstream_impl.h"
25
#include "source/extensions/load_balancing_policies/subset/subset_lb_config.h"
26

            
27
#include "absl/container/node_hash_map.h"
28
#include "absl/types/optional.h"
29

            
30
namespace Envoy {
31
namespace Upstream {
32

            
33
// Hash set of shared host pointers.
using HostHashSet = absl::flat_hash_set<HostSharedPtr>;
34

            
35
class SubsetLoadBalancer : public LoadBalancer, Logger::Loggable<Logger::Id::upstream> {
36
public:
37
  SubsetLoadBalancer(const SubsetLoadBalancerConfig& lb_config,
38
                     const Upstream::ClusterInfo& cluster_info, const PrioritySet& priority_set,
39
                     const PrioritySet* local_priority_set, ClusterLbStats& stats,
40
                     Stats::Scope& scope, Runtime::Loader& runtime, Random::RandomGenerator& random,
41
                     TimeSource& time_source);
42
  ~SubsetLoadBalancer() override;
43

            
44
  // Upstream::LoadBalancer
45
  HostSelectionResponse chooseHost(LoadBalancerContext* context) override;
46
  // TODO(alyssawilk) implement for non-metadata match.
47
1
  HostConstSharedPtr peekAnotherHost(LoadBalancerContext*) override { return nullptr; }
48
  // Pool selection not implemented.
49
  absl::optional<Upstream::SelectedPoolAndConnection>
50
  selectExistingConnection(Upstream::LoadBalancerContext* /*context*/,
51
                           const Upstream::Host& /*host*/,
52
1
                           std::vector<uint8_t>& /*hash_key*/) override {
53
1
    return absl::nullopt;
54
1
  }
55
  // Lifetime tracking not implemented.
56
1
  OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override {
57
1
    return {};
58
1
  }
59

            
60
10
  // Name of the child load balancer, as reported by the subset LB config.
  std::string childLoadBalancerName() const {
    return lb_config_.childLoadBalancerName();
  }
61
  using SubsetMetadata = std::vector<std::pair<std::string, Protobuf::Value>>;
62
  static std::string describeMetadata(const SubsetMetadata& kvs);
63

            
64
private:
65
  struct SubsetSelectorFallbackParams;
66

            
67
  void initSubsetAnyOnce();
68
  void initSubsetDefaultOnce();
69
  void initSubsetSelectorMap();
70
  void initSelectorFallbackSubset(const envoy::config::cluster::v3::Cluster::LbSubsetConfig::
71
                                      LbSubsetSelector::LbSubsetSelectorFallbackPolicy&);
72
  HostConstSharedPtr
73
  chooseHostForSelectorFallbackPolicy(const SubsetSelectorFallbackParams& fallback_params,
74
                                      LoadBalancerContext* context);
75

            
76
  HostConstSharedPtr chooseHostIteration(LoadBalancerContext* context);
77

            
78
  // Represents a subset of an original HostSet.
79
  class HostSubsetImpl : public HostSetImpl {
80
  public:
81
    HostSubsetImpl(const HostSet& original_host_set, bool locality_weight_aware,
82
                   bool scale_locality_weight)
83
517
        : HostSetImpl(original_host_set.priority(), original_host_set.weightedPriorityHealth(),
84
517
                      original_host_set.overprovisioningFactor()),
85
517
          original_host_set_(original_host_set), locality_weight_aware_(locality_weight_aware),
86
517
          scale_locality_weight_(scale_locality_weight) {}
87

            
88
    void update(const HostHashSet& matching_hosts, const HostVector& hosts_added,
89
                const HostVector& hosts_removed);
90
    LocalityWeightsConstSharedPtr
91
    determineLocalityWeights(const HostsPerLocality& hosts_per_locality) const;
92

            
93
  private:
94
    const HostSet& original_host_set_;
95
    const bool locality_weight_aware_;
96
    const bool scale_locality_weight_;
97
  };
98

            
99
  // Represents a subset of an original PrioritySet.
100
  class PrioritySubsetImpl : public PrioritySetImpl {
101
  public:
102
    PrioritySubsetImpl(const SubsetLoadBalancer& subset_lb, bool locality_weight_aware,
103
                       bool scale_locality_weight);
104

            
105
    void update(uint32_t priority, const HostHashSet& matching_hosts, const HostVector& hosts_added,
106
                const HostVector& hosts_removed);
107

            
108
1981
    bool empty() const { return empty_; }
109

            
110
483
    void triggerCallbacks() {
111
966
      for (size_t i = 0; i < hostSetsPerPriority().size(); ++i) {
112
483
        runReferenceUpdateCallbacks(i, {}, {});
113
483
      }
114
483
    }
115

            
116
    void updateSubset(uint32_t priority, const HostHashSet& matching_hosts,
117
791
                      const HostVector& hosts_added, const HostVector& hosts_removed) {
118
791
      reinterpret_cast<HostSubsetImpl*>(host_sets_[priority].get())
119
791
          ->update(matching_hosts, hosts_added, hosts_removed);
120
791
      runUpdateCallbacks(hosts_added, hosts_removed);
121
791
    }
122

            
123
    // Thread aware LB if applicable.
124
    ThreadAwareLoadBalancerPtr thread_aware_lb_;
125
    // Current active LB.
126
    LoadBalancerPtr lb_;
127

            
128
  protected:
129
    HostSetImplPtr createHostSet(uint32_t priority, absl::optional<bool> weighted_priority_health,
130
                                 absl::optional<uint32_t> overprovisioning_factor) override;
131

            
132
  private:
133
    const PrioritySet& original_priority_set_;
134
    const PrioritySet* original_local_priority_set_{};
135
    const bool locality_weight_aware_;
136
    const bool scale_locality_weight_;
137
    bool empty_ = true;
138
  };
139

            
140
  using HostSubsetImplPtr = std::unique_ptr<HostSubsetImpl>;
141
  using PrioritySubsetImplPtr = std::unique_ptr<PrioritySubsetImpl>;
142

            
143
  class LbSubsetEntry;
144
  struct SubsetSelectorMap;
145

            
146
  using LbSubsetEntryPtr = std::shared_ptr<LbSubsetEntry>;
147
  using SubsetSelectorMapPtr = std::shared_ptr<SubsetSelectorMap>;
148
  using ValueSubsetMap = absl::node_hash_map<HashedValue, LbSubsetEntryPtr>;
149
  using LbSubsetMap = absl::node_hash_map<std::string, ValueSubsetMap>;
150
  using SubsetSelectorFallbackParamsRef = std::reference_wrapper<SubsetSelectorFallbackParams>;
151
  using MetadataFallbacks = Protobuf::RepeatedPtrField<Protobuf::Value>;
152

            
153
public:
154
  class LoadBalancerContextWrapper : public LoadBalancerContext {
155
  public:
156
    LoadBalancerContextWrapper(LoadBalancerContext* wrapped,
157
                               const std::set<std::string>& filtered_metadata_match_criteria_names);
158

            
159
    LoadBalancerContextWrapper(LoadBalancerContext* wrapped,
160
                               Router::MetadataMatchCriteriaConstPtr metadata_match_criteria)
161
10
        : wrapped_(wrapped), metadata_match_(std::move(metadata_match_criteria)) {}
162

            
163
    LoadBalancerContextWrapper(LoadBalancerContext* wrapped,
164
                               const Protobuf::Struct& metadata_match_criteria_override);
165
    // LoadBalancerContext
166
1
    absl::optional<uint64_t> computeHashKey() override { return wrapped_->computeHashKey(); }
167
210
    const Router::MetadataMatchCriteria* metadataMatchCriteria() override {
168
210
      return metadata_match_.get();
169
210
    }
170
1
    const Network::Connection* downstreamConnection() const override {
171
1
      return wrapped_->downstreamConnection();
172
1
    }
173
    StreamInfo::StreamInfo* requestStreamInfo() const override {
174
      return wrapped_->requestStreamInfo();
175
    }
176
1
    const Http::RequestHeaderMap* downstreamHeaders() const override {
177
1
      return wrapped_->downstreamHeaders();
178
1
    }
179
    const HealthyAndDegradedLoad& determinePriorityLoad(
180
        const PrioritySet& priority_set, const HealthyAndDegradedLoad& original_priority_load,
181
100
        const Upstream::RetryPriority::PriorityMappingFunc& priority_mapping_func) override {
182
100
      return wrapped_->determinePriorityLoad(priority_set, original_priority_load,
183
100
                                             priority_mapping_func);
184
100
    }
185
100
    bool shouldSelectAnotherHost(const Host& host) override {
186
100
      return wrapped_->shouldSelectAnotherHost(host);
187
100
    }
188
101
    uint32_t hostSelectionRetryCount() const override {
189
101
      return wrapped_->hostSelectionRetryCount();
190
101
    }
191
1
    Network::Socket::OptionsSharedPtr upstreamSocketOptions() const override {
192
1
      return wrapped_->upstreamSocketOptions();
193
1
    }
194
1
    Network::TransportSocketOptionsConstSharedPtr upstreamTransportSocketOptions() const override {
195
1
      return wrapped_->upstreamTransportSocketOptions();
196
1
    }
197

            
198
1
    absl::optional<OverrideHost> overrideHostToSelect() const override {
199
1
      return wrapped_->overrideHostToSelect();
200
1
    }
201
    void onAsyncHostSelection(Upstream::HostConstSharedPtr&&, std::string&&) override {}
202
1
    void setHeadersModifier(std::function<void(Http::ResponseHeaderMap&)> modifier) override {
203
1
      wrapped_->setHeadersModifier(std::move(modifier));
204
1
    }
205

            
206
  private:
207
    LoadBalancerContext* wrapped_;
208
    Router::MetadataMatchCriteriaConstPtr metadata_match_;
209
  };
210

            
211
private:
212
  struct SubsetSelectorFallbackParams {
213
    envoy::config::cluster::v3::Cluster::LbSubsetConfig::LbSubsetSelector::
214
        LbSubsetSelectorFallbackPolicy fallback_policy_;
215
    const std::set<std::string>* fallback_keys_subset_ = nullptr;
216
  };
217

            
218
  struct SubsetSelectorMap {
219
    absl::node_hash_map<std::string, SubsetSelectorMapPtr> subset_keys_;
220
    SubsetSelectorFallbackParams fallback_params_;
221
  };
222

            
223
  class LbSubset {
224
  public:
225
505
    virtual ~LbSubset() = default;
226
    virtual HostSelectionResponse chooseHost(LoadBalancerContext* context) const PURE;
227
    virtual void pushHost(uint32_t priority, HostSharedPtr host) PURE;
228
    virtual void finalize(uint32_t priority) PURE;
229
    virtual bool active() const PURE;
230
  };
231
  using LbSubsetPtr = std::unique_ptr<LbSubset>;
232

            
233
  class PriorityLbSubset : public LbSubset {
234
  public:
235
    PriorityLbSubset(const SubsetLoadBalancer& subset_lb, bool locality_weight_aware,
236
                     bool scale_locality_weight)
237
483
        : subset_(subset_lb, locality_weight_aware, scale_locality_weight) {}
238

            
239
    // Subset
240
2976
    HostSelectionResponse chooseHost(LoadBalancerContext* context) const override {
241
2976
      return subset_.lb_->chooseHost(context);
242
2976
    }
243
1144
    void pushHost(uint32_t priority, HostSharedPtr host) override {
244
1618
      while (host_sets_.size() <= priority) {
245
474
        host_sets_.push_back({HostHashSet(), HostHashSet()});
246
474
      }
247
1144
      host_sets_[priority].second.emplace(std::move(host));
248
1144
    }
249
    // Called after pushHost. Update subset by the hosts that pushed in the pushHost. If no any host
250
    // is pushed then subset_ will be set to empty.
251
    void finalize(uint32_t priority) override;
252

            
253
1981
    bool active() const override { return !subset_.empty(); }
254

            
255
    std::vector<std::pair<HostHashSet, HostHashSet>> host_sets_;
256
    PrioritySubsetImpl subset_;
257
  };
258

            
259
  class SingleHostLbSubset : public LbSubset {
260
    // Subset
261
32
    HostSelectionResponse chooseHost(LoadBalancerContext*) const override { return subset_; }
262
    // This is called at most once for every update for single host subset.
263
25
    void pushHost(uint32_t priority, HostSharedPtr host) override {
264
25
      new_hosts_[priority] = std::move(host);
265
25
    }
266
    // Called after pushHost. Update subset by the host that pushed in the pushHost. If no any host
267
    // is pushed then subset_ will be set to nullptr.
268
27
    void finalize(uint32_t priority) override {
269
27
      if (auto iter = new_hosts_.find(priority); iter == new_hosts_.end()) {
270
        // No any host for current subset and priority. Try remove record in the hosts_.
271
2
        hosts_.erase(priority);
272
25
      } else {
273
        // Single host is set for current subset and priority.
274
25
        hosts_[priority] = std::move(iter->second);
275
25
        new_hosts_.erase(priority);
276
25
      }
277

            
278
27
      if (hosts_.empty()) {
279
2
        subset_ = nullptr;
280
2
        return;
281
2
      }
282

            
283
25
      subset_ = hosts_.begin()->second;
284
25
    }
285
59
    bool active() const override { return subset_ != nullptr; }
286

            
287
    // We will update subsets for every priority separately and these simple map can help us
288
    // to ensure which priority has valid host quickly.
289
    std::map<uint32_t, HostSharedPtr> hosts_;
290
    std::map<uint32_t, HostSharedPtr> new_hosts_;
291
    HostConstSharedPtr subset_;
292
  };
293

            
294
  // Entry in the subset hierarchy.
295
  class LbSubsetEntry {
296
  public:
297
594
    LbSubsetEntry() = default;
298

            
299
3655
    bool initialized() const { return lb_subset_ != nullptr; }
300
2195
    bool active() const { return initialized() && lb_subset_->active(); }
301
121
    bool hasChildren() const { return !children_.empty(); }
302

            
303
    LbSubsetMap children_;
304

            
305
    // Only initialized if a match exists at this level.
306
    LbSubsetPtr lb_subset_;
307

            
308
    // Used to quick check if entry is single host subset entry or not.
309
    bool single_host_subset_{};
310
  };
311

            
312
  void initLbSubsetEntryOnce(LbSubsetEntryPtr& entry, bool single_host_subset);
313

            
314
  // Create filtered default subset (if necessary) and other subsets based on current hosts.
315
  void refreshSubsets();
316
  void refreshSubsets(uint32_t priority);
317

            
318
  // Called by HostSet::MemberUpdateCb
319
  void update(uint32_t priority, const HostVector& all_hosts);
320

            
321
  void updateFallbackSubset(uint32_t priority, const HostVector& all_hosts);
322
  void processSubsets(uint32_t priority, const HostVector& all_hosts);
323

            
324
  HostConstSharedPtr tryChooseHostFromContext(LoadBalancerContext* context, bool& host_chosen);
325

            
326
  absl::optional<SubsetSelectorFallbackParamsRef>
327
  tryFindSelectorFallbackParams(LoadBalancerContext* context);
328

            
329
  bool hostMatches(const SubsetMetadata& kvs, const Host& host);
330

            
331
  LbSubsetEntryPtr
332
  findSubset(const std::vector<Router::MetadataMatchCriterionConstSharedPtr>& matches);
333

            
334
  LbSubsetEntryPtr findOrCreateLbSubsetEntry(LbSubsetMap& subsets, const SubsetMetadata& kvs,
335
                                             uint32_t idx);
336
  void forEachSubset(LbSubsetMap& subsets, std::function<void(LbSubsetEntryPtr&)> cb);
337
  void purgeEmptySubsets(LbSubsetMap& subsets);
338

            
339
  std::vector<SubsetMetadata> extractSubsetMetadata(const std::set<std::string>& subset_keys,
340
                                                    const Host& host);
341
  HostConstSharedPtr chooseHostWithMetadataFallbacks(LoadBalancerContext* context,
342
                                                     const MetadataFallbacks& metadata_fallbacks);
343
  const Protobuf::Value* getMetadataFallbackList(LoadBalancerContext* context) const;
344
  LoadBalancerContextWrapper removeMetadataFallbackList(LoadBalancerContext* context);
345

            
346
  const SubsetLoadBalancerConfig& lb_config_;
347
  const Upstream::ClusterInfo& cluster_info_;
348
  ClusterLbStats& stats_;
349
  Stats::Scope& scope_;
350
  Runtime::Loader& runtime_;
351
  Random::RandomGenerator& random_;
352
  TimeSource& time_source_;
353

            
354
  const envoy::config::cluster::v3::Cluster::LbSubsetConfig::LbSubsetFallbackPolicy
355
      fallback_policy_;
356
  const envoy::config::cluster::v3::Cluster::LbSubsetConfig::LbSubsetMetadataFallbackPolicy
357
      metadata_fallback_policy_;
358
  const SubsetMetadata default_subset_metadata_;
359
  std::vector<SubsetSelectorPtr> subset_selectors_;
360

            
361
  const PrioritySet& original_priority_set_;
362
  const PrioritySet* original_local_priority_set_;
363
  Common::CallbackHandlePtr original_priority_set_callback_handle_;
364

            
365
  LbSubsetEntryPtr subset_any_;
366
  LbSubsetEntryPtr subset_default_;
367

            
368
  // Reference to subset_any_ or subset_default_.
369
  LbSubsetEntryPtr fallback_subset_;
370
  LbSubsetEntryPtr panic_mode_subset_;
371

            
372
  // Forms a trie-like structure. Requires lexically sorted Host and Route metadata.
373
  LbSubsetMap subsets_;
374
  // Forms a trie-like structure of lexically sorted keys+fallback policy from subset
375
  // selectors configuration
376
  SubsetSelectorMapPtr selectors_;
377

            
378
  Stats::Gauge* single_duplicate_stat_{};
379

            
380
  // Keep small members (bools and enums) at the end of class, to reduce alignment overhead.
381
  const bool locality_weight_aware_ : 1;
382
  const bool scale_locality_weight_ : 1;
383
  const bool list_as_any_ : 1;
384
  const bool allow_redundant_keys_{};
385
};
386

            
387
} // namespace Upstream
388
} // namespace Envoy