1
#include "source/extensions/load_balancing_policies/subset/subset_lb.h"
2

            
3
#include <memory>
4

            
5
#include "envoy/common/optref.h"
6
#include "envoy/config/cluster/v3/cluster.pb.h"
7
#include "envoy/config/core/v3/base.pb.h"
8
#include "envoy/runtime/runtime.h"
9

            
10
#include "source/common/common/assert.h"
11
#include "source/common/config/metadata.h"
12
#include "source/common/config/well_known_names.h"
13
#include "source/common/protobuf/utility.h"
14

            
15
#include "absl/container/node_hash_set.h"
16

            
17
namespace Envoy {
18
namespace Upstream {
19

            
20
namespace {
21

            
22
/**
23
 * Iterates all the selectors and finds the first one that match_criteria contains all the keys
24
 * of the selector. Returns nullptr if no selector matches, otherwise returns sub match criteria
25
 * that contains only the keys of the selector.
26
 */
27
Router::MetadataMatchCriteriaConstPtr
28
filterCriteriaBySelectors(const std::vector<SubsetSelectorPtr>& subset_selectors,
29
11
                          const Router::MetadataMatchCriteria* match_criteria) {
30
11
  if (match_criteria != nullptr) {
31
23
    for (const auto& selector : subset_selectors) {
32
23
      const auto& selector_keys = selector->selectorKeys();
33
23
      auto sub_match_criteria = match_criteria->filterMatchCriteria(selector_keys);
34
23
      if (sub_match_criteria != nullptr &&
35
23
          sub_match_criteria->metadataMatchCriteria().size() == selector_keys.size()) {
36
10
        return sub_match_criteria;
37
10
      }
38
23
    }
39
10
  }
40
1
  return nullptr;
41
11
}
42

            
43
} // namespace
44

            
45
using HostPredicate = std::function<bool(const Host&)>;
46

            
47
SubsetLoadBalancer::SubsetLoadBalancer(const SubsetLoadBalancerConfig& lb_config,
                                       const Upstream::ClusterInfo& cluster_info,
                                       const PrioritySet& priority_set,
                                       const PrioritySet* local_priority_set, ClusterLbStats& stats,
                                       Stats::Scope& scope, Runtime::Loader& runtime,
                                       Random::RandomGenerator& random, TimeSource& time_source)
    : lb_config_(lb_config), cluster_info_(cluster_info), stats_(stats), scope_(scope),
      runtime_(runtime), random_(random), time_source_(time_source),
      fallback_policy_(lb_config_.subsetInfo().fallbackPolicy()),
      metadata_fallback_policy_(lb_config_.subsetInfo().metadataFallbackPolicy()),
      default_subset_metadata_(lb_config_.subsetInfo().defaultSubset().fields().begin(),
                               lb_config_.subsetInfo().defaultSubset().fields().end()),
      subset_selectors_(lb_config_.subsetInfo().subsetSelectors()),
      original_priority_set_(priority_set), original_local_priority_set_(local_priority_set),
      locality_weight_aware_(lb_config_.subsetInfo().localityWeightAware()),
      scale_locality_weight_(lb_config_.subsetInfo().scaleLocalityWeight()),
      list_as_any_(lb_config_.subsetInfo().listAsAny()),
      allow_redundant_keys_(lb_config_.subsetInfo().allowRedundantKeys()) {
  ASSERT(lb_config_.subsetInfo().isEnabled());

  // Wire up the cluster-wide fallback subset: either "any endpoint" or the subset matching the
  // configured default-subset metadata.
  if (fallback_policy_ != envoy::config::cluster::v3::Cluster::LbSubsetConfig::NO_FALLBACK) {
    if (fallback_policy_ == envoy::config::cluster::v3::Cluster::LbSubsetConfig::ANY_ENDPOINT) {
      ENVOY_LOG(debug, "subset lb: creating any-endpoint fallback load balancer");
      initSubsetAnyOnce();
      fallback_subset_ = subset_any_;
    } else {
      ENVOY_LOG(debug, "subset lb: creating fallback load balancer for {}",
                describeMetadata(default_subset_metadata_));
      initSubsetDefaultOnce();
      fallback_subset_ = subset_default_;
    }
  }

  // Panic mode always falls back to the "any endpoint" subset.
  if (lb_config_.subsetInfo().panicModeAny()) {
    initSubsetAnyOnce();
    panic_mode_subset_ = subset_any_;
  }

  initSubsetSelectorMap();

  // Create filtered default subset (if necessary) and other subsets based on current hosts.
  refreshSubsets();

  // Configure future updates: rebuild the updated priority's subsets and drop any subsets that
  // became empty as a result.
  original_priority_set_callback_handle_ = priority_set.addPriorityUpdateCb(
      [this](uint32_t priority, const HostVector&, const HostVector&) {
        refreshSubsets(priority);
        purgeEmptySubsets(subsets_);
      });
}
97

            
98
210
SubsetLoadBalancer::~SubsetLoadBalancer() {
  // Walk every subset and retire the gauges for each one that is still active so stats reflect
  // reality after this load balancer is destroyed.
  forEachSubset(subsets_, [this](const LbSubsetEntryPtr& entry) {
    if (entry->active()) {
      stats_.lb_subsets_removed_.inc();
      stats_.lb_subsets_active_.dec();
    }
  });
}
107

            
108
210
void SubsetLoadBalancer::refreshSubsets() {
109
210
  for (auto& host_set : original_priority_set_.hostSetsPerPriority()) {
110
210
    update(host_set->priority(), host_set->hosts());
111
210
  }
112
210
}
113

            
114
231
void SubsetLoadBalancer::refreshSubsets(uint32_t priority) {
115
231
  const auto& host_sets = original_priority_set_.hostSetsPerPriority();
116
231
  ASSERT(priority < host_sets.size());
117
231
  update(priority, host_sets[priority]->hosts());
118
231
}
119

            
120
96
void SubsetLoadBalancer::initSubsetAnyOnce() {
121
96
  if (!subset_any_) {
122
96
    subset_any_ = std::make_shared<LbSubsetEntry>();
123
96
    subset_any_->lb_subset_ =
124
96
        std::make_unique<PriorityLbSubset>(*this, locality_weight_aware_, scale_locality_weight_);
125
96
  }
126
96
}
127

            
128
22
void SubsetLoadBalancer::initSubsetDefaultOnce() {
129
22
  if (!subset_default_) {
130
22
    subset_default_ = std::make_shared<LbSubsetEntry>();
131
22
    subset_default_->lb_subset_ =
132
22
        std::make_unique<PriorityLbSubset>(*this, locality_weight_aware_, scale_locality_weight_);
133
22
  }
134
22
}
135

            
136
210
void SubsetLoadBalancer::initSubsetSelectorMap() {
137
210
  selectors_ = std::make_shared<SubsetSelectorMap>();
138
210
  SubsetSelectorMapPtr selectors;
139
226
  for (const auto& subset_selector : subset_selectors_) {
140
226
    const auto& selector_keys = subset_selector->selectorKeys();
141
226
    const auto& selector_fallback_policy = subset_selector->fallbackPolicy();
142
226
    const auto& selector_fallback_keys_subset = subset_selector->fallbackKeysSubset();
143

            
144
226
    uint32_t pos = 0;
145
226
    selectors = selectors_;
146
310
    for (const auto& key : selector_keys) {
147
310
      const auto& selector_it = selectors->subset_keys_.find(key);
148
310
      pos++;
149
310
      if (selector_it == selectors->subset_keys_.end()) {
150
296
        selectors->subset_keys_.emplace(std::make_pair(key, std::make_shared<SubsetSelectorMap>()));
151
296
        const auto& child_selector = selectors->subset_keys_.find(key);
152
        // if this is last key for given selector, check if it has fallback specified
153
296
        if (pos == selector_keys.size()) {
154
222
          child_selector->second->fallback_params_.fallback_policy_ = selector_fallback_policy;
155
222
          child_selector->second->fallback_params_.fallback_keys_subset_ =
156
222
              &selector_fallback_keys_subset;
157
222
          initSelectorFallbackSubset(selector_fallback_policy);
158
222
        }
159
296
        selectors = child_selector->second;
160
296
      } else {
161
14
        selectors = selector_it->second;
162
14
      }
163
310
    }
164
226
    selectors = selectors_;
165
226
  }
166
210
}
167

            
168
void SubsetLoadBalancer::initSelectorFallbackSubset(
169
    const envoy::config::cluster::v3::Cluster::LbSubsetConfig::LbSubsetSelector::
170
222
        LbSubsetSelectorFallbackPolicy& fallback_policy) {
171
222
  if (fallback_policy ==
172
222
          envoy::config::cluster::v3::Cluster::LbSubsetConfig::LbSubsetSelector::ANY_ENDPOINT &&
173
222
      subset_any_ == nullptr) {
174
4
    ENVOY_LOG(debug, "subset lb: creating any-endpoint fallback load balancer for selector");
175
4
    initSubsetAnyOnce();
176
218
  } else if (fallback_policy == envoy::config::cluster::v3::Cluster::LbSubsetConfig::
177
218
                                    LbSubsetSelector::DEFAULT_SUBSET &&
178
218
             subset_default_ == nullptr) {
179
4
    ENVOY_LOG(debug, "subset lb: creating default subset fallback load balancer for selector");
180
4
    initSubsetDefaultOnce();
181
4
  }
182
222
}
183

            
184
3055
HostSelectionResponse SubsetLoadBalancer::chooseHost(LoadBalancerContext* context) {
  // FALLBACK_LIST is the only metadata fallback policy that needs special treatment here;
  // everything else goes straight to regular subset selection.
  if (metadata_fallback_policy_ !=
      envoy::config::cluster::v3::
          Cluster_LbSubsetConfig_LbSubsetMetadataFallbackPolicy_FALLBACK_LIST) {
    return chooseHostIteration(context);
  }

  const Protobuf::Value* metadata_fallbacks = getMetadataFallbackList(context);
  if (metadata_fallbacks == nullptr) {
    // No fallback list present in the request metadata: behave as if the policy was unset.
    return chooseHostIteration(context);
  }

  // Strip the fallback-list criterion from the context, then try each fallback entry in order.
  LoadBalancerContextWrapper context_no_metadata_fallback = removeMetadataFallbackList(context);
  return chooseHostWithMetadataFallbacks(&context_no_metadata_fallback,
                                         metadata_fallbacks->list_value().values());
}
199

            
200
// Tries the regular subset selection once per metadata fallback entry, in order, returning the
// first non-null result. An empty fallback list degenerates to a single plain selection.
HostConstSharedPtr
SubsetLoadBalancer::chooseHostWithMetadataFallbacks(LoadBalancerContext* context,
                                                    const MetadataFallbacks& metadata_fallbacks) {
  if (metadata_fallbacks.empty()) {
    return chooseHostIteration(context);
  }

  for (const auto& metadata_override : metadata_fallbacks) {
    // Each iteration re-runs selection with the override's struct as the match criteria.
    LoadBalancerContextWrapper overridden_context(context, metadata_override.struct_value());
    const HostConstSharedPtr host = chooseHostIteration(&overridden_context);
    if (host != nullptr) {
      return host;
    }
  }
  return nullptr;
}
217

            
218
// assumes context->metadataMatchCriteria() is not null and there is 'fallback_list' criterion
219
SubsetLoadBalancer::LoadBalancerContextWrapper
220
38
SubsetLoadBalancer::removeMetadataFallbackList(LoadBalancerContext* context) {
221
38
  ASSERT(context->metadataMatchCriteria());
222
38
  const auto& match_criteria = context->metadataMatchCriteria()->metadataMatchCriteria();
223

            
224
38
  std::set<std::string> to_preserve;
225
54
  for (const auto& criterion : match_criteria) {
226
54
    if (criterion->name() != Config::MetadataEnvoyLbKeys::get().FALLBACK_LIST) {
227
16
      to_preserve.emplace(criterion->name());
228
16
    }
229
54
  }
230
38
  return {context, to_preserve};
231
38
}
232

            
233
// Returns the value of the 'fallback_list' criterion from the context's metadata match criteria,
// or nullptr when the context/criteria are absent or the criterion is not present.
const Protobuf::Value*
SubsetLoadBalancer::getMetadataFallbackList(LoadBalancerContext* context) const {
  if (context == nullptr) {
    return nullptr;
  }
  const Router::MetadataMatchCriteria* match_criteria = context->metadataMatchCriteria();
  if (match_criteria == nullptr) {
    return nullptr;
  }

  for (const auto& criterion : match_criteria->metadataMatchCriteria()) {
    if (criterion->name() == Config::MetadataEnvoyLbKeys::get().FALLBACK_LIST) {
      return &criterion->value().value();
    } // TODO(MarcinFalkowski): optimization: stop iteration when lexically after 'fallback_list'
  }
  return nullptr;
}
250

            
251
3103
// One full pass of subset selection: exact subset match first, then the matching selector's
// per-selector fallback policy, then the cluster-wide fallback subset, then panic mode.
HostConstSharedPtr SubsetLoadBalancer::chooseHostIteration(LoadBalancerContext* context) {
  if (context) {
    LoadBalancerContext* actual_used_context = context;
    std::unique_ptr<LoadBalancerContextWrapper> actual_used_context_wrapper;

    if (allow_redundant_keys_) {
      // If redundant keys are allowed, pre-filter the metadata match criteria by the selectors
      // so extra keys do not prevent a subset match.
      auto actual_used_criteria =
          filterCriteriaBySelectors(subset_selectors_, context->metadataMatchCriteria());
      if (actual_used_criteria != nullptr) {
        actual_used_context_wrapper =
            std::make_unique<LoadBalancerContextWrapper>(context, std::move(actual_used_criteria));
        actual_used_context = actual_used_context_wrapper.get();
      }
    }

    bool host_chosen;
    HostConstSharedPtr host = tryChooseHostFromContext(actual_used_context, host_chosen);
    if (host_chosen) {
      // Subset lookup succeeded, return this result even if it's nullptr.
      return host;
    }
    // Otherwise check if there is a fallback policy configured for the given route metadata.
    absl::optional<SubsetSelectorFallbackParamsRef> selector_fallback_params =
        tryFindSelectorFallbackParams(actual_used_context);
    if (selector_fallback_params &&
        selector_fallback_params->get().fallback_policy_ !=
            envoy::config::cluster::v3::Cluster::LbSubsetConfig::LbSubsetSelector::NOT_DEFINED) {
      // Return a result according to the configured per-selector fallback policy.
      return chooseHostForSelectorFallbackPolicy(*selector_fallback_params, actual_used_context);
    }
  }

  // No subset match (or no context): consult the cluster-wide fallback subset, if any.
  if (fallback_subset_ == nullptr) {
    return nullptr;
  }

  HostSelectionResponse fallback_response = fallback_subset_->lb_subset_->chooseHost(context);
  if (fallback_response.host || fallback_response.cancelable) {
    stats_.lb_subsets_fallback_.inc();
    return fallback_response.host;
  }

  // Last resort: the panic-mode subset (any-endpoint), when configured.
  if (panic_mode_subset_ != nullptr) {
    HostSelectionResponse panic_response = panic_mode_subset_->lb_subset_->chooseHost(context);
    if (panic_response.host || panic_response.cancelable) {
      stats_.lb_subsets_fallback_panic_.inc();
      return panic_response.host;
    }
  }

  return nullptr;
}
305

            
306
// Walks the selector-key trie built by initSubsetSelectorMap() following the context's
// (lexically sorted) metadata match criteria names. A walk that consumes every criterion yields
// that node's fallback params; any miss or partial walk yields nullopt.
absl::optional<SubsetLoadBalancer::SubsetSelectorFallbackParamsRef>
SubsetLoadBalancer::tryFindSelectorFallbackParams(LoadBalancerContext* context) {
  const Router::MetadataMatchCriteria* match_criteria = context->metadataMatchCriteria();
  if (!match_criteria) {
    return absl::nullopt;
  }
  // Bind a const reference instead of `const auto`: the original copied the whole vector of
  // shared_ptr criteria on this hot path. A const reference is correct whether the accessor
  // returns by reference or by value (lifetime extension covers the latter).
  const auto& match_criteria_vec = match_criteria->metadataMatchCriteria();
  SubsetSelectorMapPtr selectors = selectors_;
  if (selectors == nullptr) {
    return absl::nullopt;
  }
  for (uint32_t i = 0; i < match_criteria_vec.size(); i++) {
    const Router::MetadataMatchCriterion& match_criterion = *match_criteria_vec[i];
    const auto& subset_it = selectors->subset_keys_.find(match_criterion.name());
    if (subset_it == selectors->subset_keys_.end()) {
      // No subsets with this key (at this level in the hierarchy).
      break;
    }

    if (i + 1 == match_criteria_vec.size()) {
      // We've reached the end of the criteria, and they all matched.
      return subset_it->second->fallback_params_;
    }
    selectors = subset_it->second;
  }

  return absl::nullopt;
}
334

            
335
// Applies a matched selector's fallback policy: select among any endpoint, among the default
// subset, or re-run selection with the criteria reduced to the configured keys subset.
HostConstSharedPtr SubsetLoadBalancer::chooseHostForSelectorFallbackPolicy(
    const SubsetSelectorFallbackParams& fallback_params, LoadBalancerContext* context) {
  using Selector = envoy::config::cluster::v3::Cluster::LbSubsetConfig::LbSubsetSelector;
  const auto& fallback_policy = fallback_params.fallback_policy_;

  if (fallback_policy == Selector::ANY_ENDPOINT && subset_any_ != nullptr) {
    return Upstream::LoadBalancer::onlyAllowSynchronousHostSelection(
        subset_any_->lb_subset_->chooseHost(context));
  }
  if (fallback_policy == Selector::DEFAULT_SUBSET && subset_default_ != nullptr) {
    return Upstream::LoadBalancer::onlyAllowSynchronousHostSelection(
        subset_default_->lb_subset_->chooseHost(context));
  }
  if (fallback_policy == Selector::KEYS_SUBSET) {
    ASSERT(fallback_params.fallback_keys_subset_);
    auto filtered_context = std::make_unique<LoadBalancerContextWrapper>(
        context, *fallback_params.fallback_keys_subset_);
    // Perform whole subset load balancing again with reduced metadata match criteria.
    return chooseHostIteration(filtered_context.get());
  }
  return {nullptr};
}
359

            
360
// Find a host from the subsets. Sets host_chosen to false and returns nullptr if the context has
361
// no metadata match criteria, if there is no matching subset, or if the matching subset contains
362
// no hosts (ignoring health). Otherwise, host_chosen is true and the returns HostConstSharedPtr
363
// is from the subset's load balancer (technically, it may still be nullptr).
364
HostConstSharedPtr SubsetLoadBalancer::tryChooseHostFromContext(LoadBalancerContext* context,
365
3035
                                                                bool& host_chosen) {
366
3035
  host_chosen = false;
367
3035
  const Router::MetadataMatchCriteria* match_criteria = context->metadataMatchCriteria();
368
3035
  if (!match_criteria) {
369
3
    return {nullptr};
370
3
  }
371

            
372
  // Route has metadata match criteria defined, see if we have a matching subset.
373
3032
  LbSubsetEntryPtr entry = findSubset(match_criteria->metadataMatchCriteria());
374
3032
  if (entry == nullptr || !entry->active()) {
375
    // No matching subset or subset not active: use fallback policy.
376
1658
    return {nullptr};
377
1658
  }
378

            
379
1374
  host_chosen = true;
380
1374
  stats_.lb_subsets_selected_.inc();
381
1374
  return Upstream::LoadBalancer::onlyAllowSynchronousHostSelection(
382
1374
      entry->lb_subset_->chooseHost(context));
383
3032
}
384

            
385
// Iterates over the given metadata match criteria (which must be lexically sorted by key) and
386
// find a matching LbSubsetEntryPtr, if any.
387
SubsetLoadBalancer::LbSubsetEntryPtr SubsetLoadBalancer::findSubset(
388
3032
    const std::vector<Router::MetadataMatchCriterionConstSharedPtr>& match_criteria) {
389
3032
  const LbSubsetMap* subsets = &subsets_;
390

            
391
  // Because the match_criteria and the host metadata used to populate subsets_ are sorted in the
392
  // same order, we can iterate over the criteria and perform a lookup for each key and value,
393
  // starting with the root LbSubsetMap and using the previous iteration's LbSubsetMap thereafter
394
  // (tracked in subsets). If ever a criterion's key or value is not found, there is no subset for
395
  // this criteria. If we reach the last criterion, we've found the LbSubsetEntry for the
396
  // criteria, which may or may not have a subset attached to it.
397
3134
  for (uint32_t i = 0; i < match_criteria.size(); i++) {
398
3130
    const Router::MetadataMatchCriterion& match_criterion = *match_criteria[i];
399
3130
    const auto& subset_it = subsets->find(match_criterion.name());
400
3130
    if (subset_it == subsets->end()) {
401
      // No subsets with this key (at this level in the hierarchy).
402
1527
      break;
403
1527
    }
404

            
405
1603
    const ValueSubsetMap& vs_map = subset_it->second;
406
1603
    const auto& vs_it = vs_map.find(match_criterion.value());
407
1603
    if (vs_it == vs_map.end()) {
408
      // No subsets with this value.
409
127
      break;
410
127
    }
411

            
412
1476
    const LbSubsetEntryPtr& entry = vs_it->second;
413
1476
    if (i + 1 == match_criteria.size()) {
414
      // We've reached the end of the criteria, and they all matched.
415
1374
      return entry;
416
1374
    }
417

            
418
102
    subsets = &entry->children_;
419
102
  }
420

            
421
1658
  return nullptr;
422
3032
}
423

            
424
441
void SubsetLoadBalancer::updateFallbackSubset(uint32_t priority, const HostVector& all_hosts) {
425
441
  auto update_func = [priority, &all_hosts](LbSubsetPtr& subset, const HostPredicate& predicate) {
426
523
    for (const auto& host : all_hosts) {
427
522
      if (predicate(*host)) {
428
437
        subset->pushHost(priority, host);
429
437
      }
430
522
    }
431
268
    subset->finalize(priority);
432
268
  };
433

            
434
441
  if (subset_any_ != nullptr) {
435
365
    update_func(subset_any_->lb_subset_, [](const Host&) { return true; });
436
217
  }
437

            
438
441
  if (subset_default_ != nullptr) {
439
51
    HostPredicate predicate = std::bind(&SubsetLoadBalancer::hostMatches, this,
440
51
                                        default_subset_metadata_, std::placeholders::_1);
441
51
    update_func(subset_default_->lb_subset_, predicate);
442
51
  }
443

            
444
441
  if (fallback_subset_ == nullptr) {
445
190
    ENVOY_LOG(debug, "subset lb: fallback load balancer disabled");
446
190
    return;
447
190
  }
448

            
449
  // Same thing for the panic mode subset.
450
251
  ASSERT(panic_mode_subset_ == nullptr || panic_mode_subset_ == subset_any_);
451
251
}
452

            
453
748
// Creates the per-entry subset load balancer on first use: a single-host subset or a full
// priority-aware subset, depending on the selector's configuration. Idempotent.
void SubsetLoadBalancer::initLbSubsetEntryOnce(LbSubsetEntryPtr& entry, bool single_host_subset) {
  ASSERT(entry != nullptr);
  if (entry->initialized()) {
    return;
  }

  entry->single_host_subset_ = single_host_subset;
  if (single_host_subset) {
    entry->lb_subset_ = std::make_unique<SingleHostLbSubset>();
  } else {
    entry->lb_subset_ =
        std::make_unique<PriorityLbSubset>(*this, locality_weight_aware_, scale_locality_weight_);
  }

  stats_.lb_subsets_active_.inc();
  stats_.lb_subsets_created_.inc();
}
471

            
472
// Iterates all the hosts of specified priority, looking up an LbSubsetEntryPtr for each and add
473
// hosts to related entry. Because the metadata of host can be updated inlined, we must evaluate
474
// every hosts for every update.
475
441
void SubsetLoadBalancer::processSubsets(uint32_t priority, const HostVector& all_hosts) {
476
441
  absl::flat_hash_set<const LbSubsetEntry*> single_host_entries;
477
441
  uint64_t collision_count_of_single_host_entries{};
478

            
479
918
  for (const auto& host : all_hosts) {
480
1052
    for (const auto& subset_selector : subset_selectors_) {
481
1052
      const auto& keys = subset_selector->selectorKeys();
482
      // For each host, for each subset key, attempt to extract the metadata corresponding to the
483
      // key from the host.
484
1052
      std::vector<SubsetMetadata> all_kvs = extractSubsetMetadata(keys, *host);
485
1052
      for (const auto& kvs : all_kvs) {
486
        // The host has metadata for each key, find or create its subset.
487
748
        auto entry = findOrCreateLbSubsetEntry(subsets_, kvs, 0);
488
748
        initLbSubsetEntryOnce(entry, subset_selector->singleHostPerSubset());
489

            
490
748
        if (entry->single_host_subset_) {
491
41
          if (single_host_entries.contains(entry.get())) {
492
16
            collision_count_of_single_host_entries++;
493
16
            continue;
494
16
          }
495
25
          single_host_entries.emplace(entry.get());
496
25
        }
497

            
498
732
        entry->lb_subset_->pushHost(priority, host);
499
732
      }
500
1052
    }
501
889
  }
502

            
503
  // This stat isn't added to `ClusterTrafficStats` because it wouldn't be used for nearly all
504
  // clusters, and is only set during configuration updates, not in the data path, so performance
505
  // of looking up the stat isn't critical.
506
441
  if (single_duplicate_stat_ == nullptr) {
507
210
    Stats::StatNameManagedStorage name_storage("lb_subsets_single_host_per_subset_duplicate",
508
210
                                               scope_.symbolTable());
509

            
510
210
    single_duplicate_stat_ = &Stats::Utility::gaugeFromElements(
511
210
        scope_, {name_storage.statName()}, Stats::Gauge::ImportMode::Accumulate);
512
210
  }
513
441
  single_duplicate_stat_->set(collision_count_of_single_host_entries);
514

            
515
  // Finalize updates after all the hosts are evaluated.
516
786
  forEachSubset(subsets_, [priority](LbSubsetEntryPtr entry) {
517
657
    if (entry->initialized()) {
518
550
      entry->lb_subset_->finalize(priority);
519
550
    }
520
657
  });
521
441
}
522

            
523
// Given the latest all hosts, update all subsets for this priority level, creating new subsets as
524
// necessary.
525
441
void SubsetLoadBalancer::update(uint32_t priority, const HostVector& all_hosts) {
526
441
  updateFallbackSubset(priority, all_hosts);
527
441
  processSubsets(priority, all_hosts);
528
441
}
529

            
530
158
bool SubsetLoadBalancer::hostMatches(const SubsetMetadata& kvs, const Host& host) {
531
158
  return Config::Metadata::metadataLabelMatch(
532
158
      kvs, host.metadata().get(), Config::MetadataFilters::get().ENVOY_LB, list_as_any_);
533
158
}
534

            
535
// Iterates over subset_keys looking up values from the given host's metadata. Each key-value pair
536
// is appended to kvs. Returns a non-empty value if the host has a value for each key.
537
std::vector<SubsetLoadBalancer::SubsetMetadata>
538
SubsetLoadBalancer::extractSubsetMetadata(const std::set<std::string>& subset_keys,
539
1052
                                          const Host& host) {
540
1052
  std::vector<SubsetMetadata> all_kvs;
541
1052
  if (!host.metadata()) {
542
171
    return all_kvs;
543
171
  }
544
881
  const envoy::config::core::v3::Metadata& metadata = *host.metadata();
545
881
  const auto& filter_it = metadata.filter_metadata().find(Config::MetadataFilters::get().ENVOY_LB);
546
881
  if (filter_it == metadata.filter_metadata().end()) {
547
12
    return all_kvs;
548
12
  }
549

            
550
869
  const auto& fields = filter_it->second.fields();
551
1043
  for (const auto& key : subset_keys) {
552
1043
    const auto it = fields.find(key);
553
1043
    if (it == fields.end()) {
554
139
      all_kvs.clear();
555
139
      break;
556
139
    }
557

            
558
904
    if (list_as_any_ && it->second.kind_case() == Protobuf::Value::kListValue) {
559
      // If the list of kvs is empty, we initialize one kvs for each value in the list.
560
      // Otherwise, we branch the list of kvs by generating one new kvs per old kvs per
561
      // new value.
562
      //
563
      // For example, two kvs (<a=1>, <a=2>) joined with the kv foo=[bar,baz] results in four kvs:
564
      //   <a=1,foo=bar>
565
      //   <a=1,foo=baz>
566
      //   <a=2,foo=bar>
567
      //   <a=2,foo=baz>
568
14
      if (all_kvs.empty()) {
569
20
        for (const auto& v : it->second.list_value().values()) {
570
20
          all_kvs.emplace_back(SubsetMetadata({make_pair(key, v)}));
571
20
        }
572
10
      } else {
573
4
        std::vector<SubsetMetadata> new_kvs;
574
8
        for (const auto& kvs : all_kvs) {
575
16
          for (const auto& v : it->second.list_value().values()) {
576
16
            auto kv_copy = kvs;
577
16
            kv_copy.emplace_back(make_pair(key, v));
578
16
            new_kvs.emplace_back(kv_copy);
579
16
          }
580
8
        }
581
4
        all_kvs = new_kvs;
582
4
      }
583

            
584
890
    } else {
585
890
      if (all_kvs.empty()) {
586
724
        all_kvs.emplace_back(SubsetMetadata({std::make_pair(key, it->second)}));
587
744
      } else {
588
166
        for (auto& kvs : all_kvs) {
589
166
          kvs.emplace_back(std::make_pair(key, it->second));
590
166
        }
591
166
      }
592
890
    }
593
904
  }
594

            
595
869
  return all_kvs;
596
881
}
597

            
598
5
// Renders the key-value pairs as "k1=v1, k2=v2" (values as JSON) for log messages.
std::string SubsetLoadBalancer::describeMetadata(const SubsetLoadBalancer::SubsetMetadata& kvs) {
  if (kvs.empty()) {
    return "<no metadata>";
  }

  std::ostringstream buf;
  const char* delimiter = "";
  for (const auto& kv : kvs) {
    const Protobuf::Value& value = kv.second;
    buf << delimiter << kv.first << "=" << MessageUtil::getJsonStringFromMessageOrError(value);
    delimiter = ", ";
  }
  return buf.str();
}
617

            
618
// Given a vector of key-values (from extractSubsetMetadata), recursively finds the matching
619
// LbSubsetEntryPtr.
620
SubsetLoadBalancer::LbSubsetEntryPtr
621
SubsetLoadBalancer::findOrCreateLbSubsetEntry(LbSubsetMap& subsets, const SubsetMetadata& kvs,
622
930
                                              uint32_t idx) {
623
930
  ASSERT(idx < kvs.size());
624

            
625
930
  const std::string& name = kvs[idx].first;
626
930
  const Protobuf::Value& pb_value = kvs[idx].second;
627
930
  const HashedValue value(pb_value);
628

            
629
930
  LbSubsetEntryPtr entry;
630

            
631
930
  const auto kv_it = subsets.find(name);
632

            
633
930
  if (kv_it != subsets.end()) {
634
641
    ValueSubsetMap& value_subset_map = kv_it->second;
635
641
    const auto vs_it = value_subset_map.find(value);
636
641
    if (vs_it != value_subset_map.end()) {
637
454
      entry = vs_it->second;
638
454
    }
639
641
  }
640

            
641
930
  if (!entry) {
642
    // Not found. Create an uninitialized entry.
643
476
    entry = std::make_shared<LbSubsetEntry>();
644
476
    if (kv_it != subsets.end()) {
645
187
      ValueSubsetMap& value_subset_map = kv_it->second;
646
187
      value_subset_map.emplace(value, entry);
647
289
    } else {
648
289
      ValueSubsetMap value_subset_map = {{value, entry}};
649
289
      subsets.emplace(name, value_subset_map);
650
289
    }
651
476
  }
652

            
653
930
  idx++;
654
930
  if (idx == kvs.size()) {
655
    // We've matched all the key-values, return the entry.
656
748
    return entry;
657
748
  }
658

            
659
182
  return findOrCreateLbSubsetEntry(entry->children_, kvs, idx);
660
930
}
661

            
662
// Invokes cb for each LbSubsetEntryPtr in subsets.
663
void SubsetLoadBalancer::forEachSubset(LbSubsetMap& subsets,
664
1729
                                       std::function<void(LbSubsetEntryPtr&)> cb) {
665
1729
  for (auto& vsm : subsets) {
666
1078
    for (auto& em : vsm.second) {
667
1078
      LbSubsetEntryPtr entry = em.second;
668
1078
      cb(entry);
669
1078
      forEachSubset(entry->children_, cb);
670
1078
    }
671
683
  }
672
1729
}
673

            
674
631
// Recursively prunes subset entries that are neither active themselves nor have
// any remaining children, updating the subset gauges for entries that had been
// counted as active.
void SubsetLoadBalancer::purgeEmptySubsets(LbSubsetMap& subsets) {
  auto key_it = subsets.begin();
  while (key_it != subsets.end()) {
    auto& value_map = key_it->second;
    auto value_it = value_map.begin();
    while (value_it != value_map.end()) {
      LbSubsetEntryPtr entry = value_it->second;

      // Prune descendants first so hasChildren() below reflects the post-purge state.
      purgeEmptySubsets(entry->children_);

      if (entry->active() || entry->hasChildren()) {
        ++value_it;
      } else {
        // Only initialized entries were accounted for in the active gauge.
        if (entry->initialized()) {
          stats_.lb_subsets_active_.dec();
          stats_.lb_subsets_removed_.inc();
        }
        // Advance past the doomed node before erasing it; erasing one element
        // does not invalidate other iterators in these node-based maps.
        const auto doomed_value = value_it;
        ++value_it;
        value_map.erase(doomed_value);
      }
    }

    if (value_map.empty()) {
      // Drop the now-empty per-key value map entirely.
      const auto doomed_key = key_it;
      ++key_it;
      subsets.erase(doomed_key);
    } else {
      ++key_it;
    }
  }
}
706

            
707
// Initialize a new HostSubsetImpl and LoadBalancer from the SubsetLoadBalancer, filtering hosts
// with the given predicate.
SubsetLoadBalancer::PrioritySubsetImpl::PrioritySubsetImpl(const SubsetLoadBalancer& subset_lb,
                                                           bool locality_weight_aware,
                                                           bool scale_locality_weight)
    : original_priority_set_(subset_lb.original_priority_set_),
      original_local_priority_set_(subset_lb.original_local_priority_set_),
      locality_weight_aware_(locality_weight_aware), scale_locality_weight_(scale_locality_weight) {
  // Create at least one host set. This must happen before the load balancer is
  // created below so priority 0 exists when the LB inspects this priority set.
  getOrCreateHostSet(0);

  // Build the thread-aware LB for this subset from the configured child LB policy.
  thread_aware_lb_ =
      subset_lb.lb_config_.createLoadBalancer(subset_lb.cluster_info_, *this, subset_lb.runtime_,
                                              subset_lb.random_, subset_lb.time_source_);
  ASSERT(thread_aware_lb_ != nullptr);
  THROW_IF_NOT_OK(thread_aware_lb_->initialize());
  // Instantiate the worker-local LB from the thread-aware LB's factory, bound to
  // this subset's priority set and the original local priority set (if any).
  lb_ = thread_aware_lb_->factory()->create({*this, original_local_priority_set_});

  // Notify any registered member-update callbacks about the initial state.
  triggerCallbacks();
}
727

            
728
// Given all hosts that belong in this subset, hosts_added and hosts_removed, update the
// underlying HostSet. The hosts_added Hosts and hosts_removed Hosts have been filtered to match
// hosts that belong in this subset.
void SubsetLoadBalancer::HostSubsetImpl::update(const HostHashSet& matching_hosts,
                                                const HostVector& hosts_added,
                                                const HostVector& hosts_removed) {
  // Membership check against the precomputed matching set; avoids re-evaluating
  // the subset predicate for every host list traversed below.
  auto cached_predicate = [&matching_hosts](const auto& host) {
    return matching_hosts.count(&host) == 1;
  };

  // TODO(snowp): If we had a unhealthyHosts() function we could avoid potentially traversing
  // the list of hosts twice.
  // Filter the original set's full host list down to this subset's members.
  auto hosts = std::make_shared<HostVector>();
  hosts->reserve(original_host_set_.hosts().size());
  for (const auto& host : original_host_set_.hosts()) {
    if (cached_predicate(*host)) {
      hosts->emplace_back(host);
    }
  }

  // Same filtering applied to the healthy host list.
  auto healthy_hosts = std::make_shared<HealthyHostVector>();
  healthy_hosts->get().reserve(original_host_set_.healthyHosts().size());
  for (const auto& host : original_host_set_.healthyHosts()) {
    if (cached_predicate(*host)) {
      healthy_hosts->get().emplace_back(host);
    }
  }

  // ... and to the degraded host list.
  auto degraded_hosts = std::make_shared<DegradedHostVector>();
  degraded_hosts->get().reserve(original_host_set_.degradedHosts().size());
  for (const auto& host : original_host_set_.degradedHosts()) {
    if (cached_predicate(*host)) {
      degraded_hosts->get().emplace_back(host);
    }
  }

  // ... and to the excluded host list.
  auto excluded_hosts = std::make_shared<ExcludedHostVector>();
  excluded_hosts->get().reserve(original_host_set_.excludedHosts().size());
  for (const auto& host : original_host_set_.excludedHosts()) {
    if (cached_predicate(*host)) {
      excluded_hosts->get().emplace_back(host);
    }
  }

  // If we only have one locality we can avoid the first call to filter() by
  // just creating a new HostsPerLocality from the list of all hosts.
  HostsPerLocalityConstSharedPtr hosts_per_locality;

  if (original_host_set_.hostsPerLocality().get().size() == 1) {
    hosts_per_locality = std::make_shared<HostsPerLocalityImpl>(
        *hosts, original_host_set_.hostsPerLocality().hasLocalLocality());
  } else {
    hosts_per_locality = original_host_set_.hostsPerLocality().filter({cached_predicate})[0];
  }

  // The health-classified per-locality views always go through filter().
  auto healthy_hosts_per_locality =
      original_host_set_.healthyHostsPerLocality().filter({cached_predicate})[0];
  auto degraded_hosts_per_locality =
      original_host_set_.degradedHostsPerLocality().filter({cached_predicate})[0];
  auto excluded_hosts_per_locality =
      original_host_set_.excludedHostsPerLocality().filter({cached_predicate})[0];

  // Commit the filtered views to the underlying HostSetImpl, carrying over the
  // original set's weighted priority health and overprovisioning factor unchanged.
  HostSetImpl::updateHosts(
      HostSetImpl::updateHostsParams(
          hosts, hosts_per_locality, healthy_hosts, healthy_hosts_per_locality, degraded_hosts,
          degraded_hosts_per_locality, excluded_hosts, excluded_hosts_per_locality),
      determineLocalityWeights(*hosts_per_locality), hosts_added, hosts_removed,
      original_host_set_.weightedPriorityHealth(), original_host_set_.overprovisioningFactor());
}
797

            
798
// Computes the locality weights for this subset. Returns the original weights
// unchanged, weights scaled to the fraction of hosts retained per locality, or
// an empty pointer when locality weighting does not apply.
LocalityWeightsConstSharedPtr SubsetLoadBalancer::HostSubsetImpl::determineLocalityWeights(
    const HostsPerLocality& hosts_per_locality) const {
  if (!locality_weight_aware_) {
    return {};
  }
  if (!scale_locality_weight_) {
    return original_host_set_.localityWeights();
  }
  // E.g. we can be here in static clusters with actual locality weighting before pre-init
  // completes.
  if (!original_host_set_.localityWeights()) {
    return {};
  }

  const auto& full_hosts_per_locality = original_host_set_.hostsPerLocality().get();
  const auto& full_weights = *original_host_set_.localityWeights();

  // Weights default to zero; only localities that still have hosts are assigned.
  auto scaled_weights = std::make_shared<LocalityWeights>(full_weights.size());
  for (uint32_t i = 0; i < full_weights.size(); ++i) {
    const auto full_locality_size = full_hosts_per_locality[i].size();
    // An originally-empty locality keeps weight zero.
    if (full_locality_size == 0) {
      continue;
    }
    // Scale proportionally to how many of the locality's hosts survived the
    // subset predicate.
    (*scaled_weights)[i] = std::round(
        float(full_weights[i] * hosts_per_locality.get()[i].size()) / full_locality_size);
  }
  return scaled_weights;
}
831

            
832
// Creates the HostSubsetImpl backing the given priority, mirroring the settings
// of the corresponding host set in the original priority set. The optional
// overrides, when supplied by the caller, must already agree with the original
// host set's values (asserted below); they are otherwise ignored.
HostSetImplPtr SubsetLoadBalancer::PrioritySubsetImpl::createHostSet(
    uint32_t priority, absl::optional<bool> weighted_priority_health,
    absl::optional<uint32_t> overprovisioning_factor) {
  // Use original hostset's overprovisioning_factor.
  RELEASE_ASSERT(priority < original_priority_set_.hostSetsPerPriority().size(), "");

  const HostSetPtr& host_set = original_priority_set_.hostSetsPerPriority()[priority];

  ASSERT(!overprovisioning_factor.has_value() ||
         overprovisioning_factor.value() == host_set->overprovisioningFactor());
  ASSERT(!weighted_priority_health.has_value() ||
         weighted_priority_health.value() == host_set->weightedPriorityHealth());
  // Prefer make_unique over a raw `new` wrapped in the pointer type; the
  // unique_ptr<HostSubsetImpl> converts to HostSetImplPtr via the base class.
  return std::make_unique<HostSubsetImpl>(*host_set, locality_weight_aware_,
                                          scale_locality_weight_);
}
847

            
848
void SubsetLoadBalancer::PrioritySubsetImpl::update(uint32_t priority,
849
                                                    const HostHashSet& matching_hosts,
850
                                                    const HostVector& hosts_added,
851
791
                                                    const HostVector& hosts_removed) {
852
791
  const auto& host_subset = getOrCreateHostSet(priority);
853
791
  updateSubset(priority, matching_hosts, hosts_added, hosts_removed);
854

            
855
791
  if (host_subset.hosts().empty() != empty_) {
856
540
    empty_ = true;
857
544
    for (auto& host_set : hostSetsPerPriority()) {
858
544
      empty_ &= host_set->hosts().empty();
859
544
    }
860
540
  }
861

            
862
  // Create a new worker local LB if needed.
863
  // TODO(mattklein123): See the PrioritySubsetImpl constructor for additional comments on how
864
  // we can do better here.
865
791
  if (thread_aware_lb_ != nullptr && thread_aware_lb_->factory()->recreateOnHostChange()) {
866
22
    lb_ = thread_aware_lb_->factory()->create({*this, original_local_priority_set_});
867
22
  }
868
791
}
869

            
870
791
void SubsetLoadBalancer::PriorityLbSubset::finalize(uint32_t priority) {
871
834
  while (host_sets_.size() <= priority) {
872
43
    host_sets_.push_back({HostHashSet(), HostHashSet()});
873
43
  }
874
791
  auto& [old_hosts, new_hosts] = host_sets_[priority];
875

            
876
791
  HostVector added;
877
791
  HostVector removed;
878

            
879
791
  for (const auto& host : old_hosts) {
880
351
    if (new_hosts.count(host) == 0) {
881
108
      removed.emplace_back(host);
882
108
    }
883
351
  }
884

            
885
1145
  for (const auto& host : new_hosts) {
886
1144
    if (old_hosts.count(host) == 0) {
887
901
      added.emplace_back(host);
888
901
    }
889
1144
  }
890

            
891
791
  subset_.update(priority, new_hosts, added, removed);
892

            
893
791
  old_hosts.swap(new_hosts);
894
791
  new_hosts.clear();
895
791
}
896

            
897
// Wraps `wrapped` with metadata match criteria restricted to the given names.
// Requires the wrapped context to carry match criteria.
SubsetLoadBalancer::LoadBalancerContextWrapper::LoadBalancerContextWrapper(
    LoadBalancerContext* wrapped,
    const std::set<std::string>& filtered_metadata_match_criteria_names)
    : wrapped_(wrapped) {
  ASSERT(wrapped->metadataMatchCriteria());

  // Keep only the criteria whose names appear in the filter set.
  const auto* original_criteria = wrapped->metadataMatchCriteria();
  metadata_match_ = original_criteria->filterMatchCriteria(filtered_metadata_match_criteria_names);
}
906

            
907
// Wraps `wrapped` with its metadata match criteria merged with the override
// struct. Requires the wrapped context to carry match criteria.
SubsetLoadBalancer::LoadBalancerContextWrapper::LoadBalancerContextWrapper(
    LoadBalancerContext* wrapped, const Protobuf::Struct& metadata_match_criteria_override)
    : wrapped_(wrapped) {
  ASSERT(wrapped->metadataMatchCriteria());

  // Overlay the override values on top of the wrapped context's criteria.
  const auto* original_criteria = wrapped->metadataMatchCriteria();
  metadata_match_ = original_criteria->mergeMatchCriteria(metadata_match_criteria_override);
}
914
} // namespace Upstream
915
} // namespace Envoy