Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/source/common/upstream/upstream_impl.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/common/upstream/upstream_impl.h"
2
3
#include <chrono>
4
#include <cstdint>
5
#include <limits>
6
#include <list>
7
#include <memory>
8
#include <string>
9
#include <vector>
10
11
#include "envoy/config/cluster/v3/circuit_breaker.pb.h"
12
#include "envoy/config/cluster/v3/cluster.pb.h"
13
#include "envoy/config/core/v3/address.pb.h"
14
#include "envoy/config/core/v3/base.pb.h"
15
#include "envoy/config/core/v3/health_check.pb.h"
16
#include "envoy/config/core/v3/protocol.pb.h"
17
#include "envoy/config/endpoint/v3/endpoint_components.pb.h"
18
#include "envoy/config/upstream/local_address_selector/v3/default_local_address_selector.pb.h"
19
#include "envoy/event/dispatcher.h"
20
#include "envoy/event/timer.h"
21
#include "envoy/extensions/filters/http/upstream_codec/v3/upstream_codec.pb.h"
22
#include "envoy/extensions/transport_sockets/http_11_proxy/v3/upstream_http_11_connect.pb.h"
23
#include "envoy/extensions/transport_sockets/raw_buffer/v3/raw_buffer.pb.h"
24
#include "envoy/init/manager.h"
25
#include "envoy/network/dns.h"
26
#include "envoy/network/transport_socket.h"
27
#include "envoy/registry/registry.h"
28
#include "envoy/secret/secret_manager.h"
29
#include "envoy/server/filter_config.h"
30
#include "envoy/server/transport_socket_config.h"
31
#include "envoy/ssl/context_manager.h"
32
#include "envoy/stats/scope.h"
33
#include "envoy/upstream/health_checker.h"
34
#include "envoy/upstream/upstream.h"
35
36
#include "source/common/common/dns_utils.h"
37
#include "source/common/common/enum_to_int.h"
38
#include "source/common/common/fmt.h"
39
#include "source/common/common/utility.h"
40
#include "source/common/config/utility.h"
41
#include "source/common/http/http1/codec_stats.h"
42
#include "source/common/http/http2/codec_stats.h"
43
#include "source/common/http/utility.h"
44
#include "source/common/network/address_impl.h"
45
#include "source/common/network/filter_state_proxy_info.h"
46
#include "source/common/network/happy_eyeballs_connection_impl.h"
47
#include "source/common/network/resolver_impl.h"
48
#include "source/common/network/socket_option_factory.h"
49
#include "source/common/network/socket_option_impl.h"
50
#include "source/common/protobuf/protobuf.h"
51
#include "source/common/protobuf/utility.h"
52
#include "source/common/router/config_utility.h"
53
#include "source/common/runtime/runtime_features.h"
54
#include "source/common/runtime/runtime_impl.h"
55
#include "source/common/stats/deferred_creation.h"
56
#include "source/common/upstream/cluster_factory_impl.h"
57
#include "source/common/upstream/health_checker_impl.h"
58
#include "source/extensions/filters/network/http_connection_manager/config.h"
59
#include "source/server/transport_socket_config_impl.h"
60
61
#include "absl/container/node_hash_set.h"
62
#include "absl/strings/str_cat.h"
63
64
namespace Envoy {
65
namespace Upstream {
66
namespace {
67
2.43k
std::string addressToString(Network::Address::InstanceConstSharedPtr address) {
68
2.43k
  if (!address) {
69
0
    return "";
70
0
  }
71
2.43k
  return address->asString();
72
2.43k
}
73
74
Network::TcpKeepaliveConfig
75
0
parseTcpKeepaliveConfig(const envoy::config::cluster::v3::Cluster& config) {
76
0
  const envoy::config::core::v3::TcpKeepalive& options =
77
0
      config.upstream_connection_options().tcp_keepalive();
78
0
  return Network::TcpKeepaliveConfig{
79
0
      PROTOBUF_GET_WRAPPED_OR_DEFAULT(options, keepalive_probes, absl::optional<uint32_t>()),
80
0
      PROTOBUF_GET_WRAPPED_OR_DEFAULT(options, keepalive_time, absl::optional<uint32_t>()),
81
0
      PROTOBUF_GET_WRAPPED_OR_DEFAULT(options, keepalive_interval, absl::optional<uint32_t>())};
82
0
}
83
84
absl::StatusOr<ProtocolOptionsConfigConstSharedPtr>
85
createProtocolOptionsConfig(const std::string& name, const ProtobufWkt::Any& typed_config,
86
1.54k
                            Server::Configuration::ProtocolOptionsFactoryContext& factory_context) {
87
1.54k
  Server::Configuration::ProtocolOptionsFactory* factory =
88
1.54k
      Registry::FactoryRegistry<Server::Configuration::NamedNetworkFilterConfigFactory>::getFactory(
89
1.54k
          name);
90
1.54k
  if (factory == nullptr) {
91
1.54k
    factory =
92
1.54k
        Registry::FactoryRegistry<Server::Configuration::NamedHttpFilterConfigFactory>::getFactory(
93
1.54k
            name);
94
1.54k
  }
95
1.54k
  if (factory == nullptr) {
96
1.54k
    factory =
97
1.54k
        Registry::FactoryRegistry<Server::Configuration::ProtocolOptionsFactory>::getFactory(name);
98
1.54k
  }
99
100
1.54k
  if (factory == nullptr) {
101
0
    return absl::InvalidArgumentError(
102
0
        fmt::format("Didn't find a registered network or http filter or protocol "
103
0
                    "options implementation for name: '{}'",
104
0
                    name));
105
0
  }
106
107
1.54k
  ProtobufTypes::MessagePtr proto_config = factory->createEmptyProtocolOptionsProto();
108
109
1.54k
  if (proto_config == nullptr) {
110
0
    return absl::InvalidArgumentError(
111
0
        fmt::format("filter {} does not support protocol options", name));
112
0
  }
113
114
1.54k
  Envoy::Config::Utility::translateOpaqueConfig(
115
1.54k
      typed_config, factory_context.messageValidationVisitor(), *proto_config);
116
1.54k
  return factory->createProtocolOptionsConfig(*proto_config, factory_context);
117
1.54k
}
118
119
absl::StatusOr<absl::flat_hash_map<std::string, ProtocolOptionsConfigConstSharedPtr>>
120
parseExtensionProtocolOptions(
121
    const envoy::config::cluster::v3::Cluster& config,
122
2.40k
    Server::Configuration::ProtocolOptionsFactoryContext& factory_context) {
123
2.40k
  absl::flat_hash_map<std::string, ProtocolOptionsConfigConstSharedPtr> options;
124
125
2.40k
  for (const auto& it : config.typed_extension_protocol_options()) {
126
1.54k
    auto& name = it.first;
127
1.54k
    auto object_or_error = createProtocolOptionsConfig(name, it.second, factory_context);
128
1.54k
    RETURN_IF_NOT_OK_REF(object_or_error.status());
129
1.54k
    if (object_or_error.value() != nullptr) {
130
1.54k
      options[name] = std::move(object_or_error.value());
131
1.54k
    }
132
1.54k
  }
133
134
2.40k
  return options;
135
2.40k
}
136
137
// Updates the EDS health flags for an existing host to match the new host.
138
// @param updated_host the new host to read health flag values from.
139
// @param existing_host the host to update.
140
// @return bool whether the flag update caused the host health to change.
141
0
bool updateEdsHealthFlag(const Host& updated_host, Host& existing_host) {
142
0
  auto updated_eds_health_status = updated_host.edsHealthStatus();
143
144
  // Check if the health flag has changed.
145
0
  if (updated_eds_health_status == existing_host.edsHealthStatus()) {
146
0
    return false;
147
0
  }
148
149
0
  const auto previous_health = existing_host.coarseHealth();
150
0
  existing_host.setEdsHealthStatus(updated_eds_health_status);
151
152
  // Rebuild if changing the flag affected the host health.
153
0
  return previous_health != existing_host.coarseHealth();
154
0
}
155
156
// Converts a set of hosts into a HostVector, excluding certain hosts.
157
// @param hosts hosts to convert
158
// @param excluded_hosts hosts to exclude from the resulting vector.
159
HostVector filterHosts(const absl::node_hash_set<HostSharedPtr>& hosts,
160
28
                       const absl::node_hash_set<HostSharedPtr>& excluded_hosts) {
161
28
  HostVector net_hosts;
162
28
  net_hosts.reserve(hosts.size());
163
164
28
  for (const auto& host : hosts) {
165
14
    if (excluded_hosts.find(host) == excluded_hosts.end()) {
166
14
      net_hosts.emplace_back(host);
167
14
    }
168
14
  }
169
170
28
  return net_hosts;
171
28
}
172
173
Stats::ScopeSharedPtr generateStatsScope(const envoy::config::cluster::v3::Cluster& config,
174
2.40k
                                         Stats::Store& stats) {
175
2.40k
  return stats.createScope(fmt::format(
176
2.40k
      "cluster.{}.", config.alt_stat_name().empty() ? config.name() : config.alt_stat_name()));
177
2.40k
}
178
179
Network::ConnectionSocket::OptionsSharedPtr
180
buildBaseSocketOptions(const envoy::config::cluster::v3::Cluster& cluster_config,
181
2.40k
                       const envoy::config::core::v3::BindConfig& bootstrap_bind_config) {
182
2.40k
  Network::ConnectionSocket::OptionsSharedPtr base_options =
183
2.40k
      std::make_shared<Network::ConnectionSocket::Options>();
184
185
  // The process-wide `signal()` handling may fail to handle SIGPIPE if overridden
186
  // in the process (i.e., on a mobile client). Some OSes support handling it at the socket layer:
187
2.40k
  if (ENVOY_SOCKET_SO_NOSIGPIPE.hasValue()) {
188
0
    Network::Socket::appendOptions(base_options,
189
0
                                   Network::SocketOptionFactory::buildSocketNoSigpipeOptions());
190
0
  }
191
  // Cluster IP_FREEBIND settings, when set, will override the cluster manager wide settings.
192
2.40k
  if ((bootstrap_bind_config.freebind().value() &&
193
2.40k
       !cluster_config.upstream_bind_config().has_freebind()) ||
194
2.40k
      cluster_config.upstream_bind_config().freebind().value()) {
195
0
    Network::Socket::appendOptions(base_options,
196
0
                                   Network::SocketOptionFactory::buildIpFreebindOptions());
197
0
  }
198
2.40k
  if (cluster_config.upstream_connection_options().has_tcp_keepalive()) {
199
0
    Network::Socket::appendOptions(base_options,
200
0
                                   Network::SocketOptionFactory::buildTcpKeepaliveOptions(
201
0
                                       parseTcpKeepaliveConfig(cluster_config)));
202
0
  }
203
204
2.40k
  return base_options;
205
2.40k
}
206
207
Network::ConnectionSocket::OptionsSharedPtr
208
buildClusterSocketOptions(const envoy::config::cluster::v3::Cluster& cluster_config,
209
2.40k
                          const envoy::config::core::v3::BindConfig& bootstrap_bind_config) {
210
2.40k
  Network::ConnectionSocket::OptionsSharedPtr cluster_options =
211
2.40k
      std::make_shared<Network::ConnectionSocket::Options>();
212
  // Cluster socket_options trump cluster manager wide.
213
2.40k
  if (bootstrap_bind_config.socket_options().size() +
214
2.40k
          cluster_config.upstream_bind_config().socket_options().size() >
215
2.40k
      0) {
216
0
    auto socket_options = !cluster_config.upstream_bind_config().socket_options().empty()
217
0
                              ? cluster_config.upstream_bind_config().socket_options()
218
0
                              : bootstrap_bind_config.socket_options();
219
0
    Network::Socket::appendOptions(
220
0
        cluster_options, Network::SocketOptionFactory::buildLiteralOptions(socket_options));
221
0
  }
222
2.40k
  return cluster_options;
223
2.40k
}
224
225
absl::StatusOr<std::vector<::Envoy::Upstream::UpstreamLocalAddress>>
226
parseBindConfig(::Envoy::OptRef<const envoy::config::core::v3::BindConfig> bind_config,
227
                const absl::optional<std::string>& cluster_name,
228
                Network::ConnectionSocket::OptionsSharedPtr base_socket_options,
229
2.40k
                Network::ConnectionSocket::OptionsSharedPtr cluster_socket_options) {
230
231
2.40k
  std::vector<::Envoy::Upstream::UpstreamLocalAddress> upstream_local_addresses;
232
2.40k
  if (bind_config.has_value()) {
233
0
    UpstreamLocalAddress upstream_local_address;
234
0
    upstream_local_address.address_ = nullptr;
235
0
    if (bind_config->has_source_address()) {
236
237
0
      auto address_or_error =
238
0
          ::Envoy::Network::Address::resolveProtoSocketAddress(bind_config->source_address());
239
0
      RETURN_IF_NOT_OK_REF(address_or_error.status());
240
0
      upstream_local_address.address_ = address_or_error.value();
241
0
    }
242
0
    upstream_local_address.socket_options_ = std::make_shared<Network::ConnectionSocket::Options>();
243
244
0
    ::Envoy::Network::Socket::appendOptions(upstream_local_address.socket_options_,
245
0
                                            base_socket_options);
246
0
    ::Envoy::Network::Socket::appendOptions(upstream_local_address.socket_options_,
247
0
                                            cluster_socket_options);
248
249
0
    upstream_local_addresses.push_back(upstream_local_address);
250
251
0
    for (const auto& extra_source_address : bind_config->extra_source_addresses()) {
252
0
      UpstreamLocalAddress extra_upstream_local_address;
253
0
      auto address_or_error =
254
0
          ::Envoy::Network::Address::resolveProtoSocketAddress(extra_source_address.address());
255
0
      RETURN_IF_NOT_OK_REF(address_or_error.status());
256
0
      extra_upstream_local_address.address_ = address_or_error.value();
257
258
0
      extra_upstream_local_address.socket_options_ =
259
0
          std::make_shared<::Envoy::Network::ConnectionSocket::Options>();
260
0
      ::Envoy::Network::Socket::appendOptions(extra_upstream_local_address.socket_options_,
261
0
                                              base_socket_options);
262
263
0
      if (extra_source_address.has_socket_options()) {
264
0
        ::Envoy::Network::Socket::appendOptions(
265
0
            extra_upstream_local_address.socket_options_,
266
0
            ::Envoy::Network::SocketOptionFactory::buildLiteralOptions(
267
0
                extra_source_address.socket_options().socket_options()));
268
0
      } else {
269
0
        ::Envoy::Network::Socket::appendOptions(extra_upstream_local_address.socket_options_,
270
0
                                                cluster_socket_options);
271
0
      }
272
0
      upstream_local_addresses.push_back(extra_upstream_local_address);
273
0
    }
274
275
0
    for (const auto& additional_source_address : bind_config->additional_source_addresses()) {
276
0
      UpstreamLocalAddress additional_upstream_local_address;
277
0
      auto address_or_error =
278
0
          ::Envoy::Network::Address::resolveProtoSocketAddress(additional_source_address);
279
0
      RETURN_IF_NOT_OK_REF(address_or_error.status());
280
0
      additional_upstream_local_address.address_ = address_or_error.value();
281
0
      additional_upstream_local_address.socket_options_ =
282
0
          std::make_shared<::Envoy::Network::ConnectionSocket::Options>();
283
0
      ::Envoy::Network::Socket::appendOptions(additional_upstream_local_address.socket_options_,
284
0
                                              base_socket_options);
285
0
      ::Envoy::Network::Socket::appendOptions(additional_upstream_local_address.socket_options_,
286
0
                                              cluster_socket_options);
287
0
      upstream_local_addresses.push_back(additional_upstream_local_address);
288
0
    }
289
2.40k
  } else {
290
    // If there is no bind config specified, then return a nullptr for the address.
291
2.40k
    UpstreamLocalAddress local_address;
292
2.40k
    local_address.address_ = nullptr;
293
2.40k
    local_address.socket_options_ = std::make_shared<::Envoy::Network::ConnectionSocket::Options>();
294
2.40k
    ::Envoy::Network::Socket::appendOptions(local_address.socket_options_, base_socket_options);
295
2.40k
    Network::Socket::appendOptions(local_address.socket_options_, cluster_socket_options);
296
2.40k
    upstream_local_addresses.push_back(local_address);
297
2.40k
  }
298
299
  // Verify that we have valid addresses if size is greater than 1.
300
2.40k
  if (upstream_local_addresses.size() > 1) {
301
0
    for (auto const& upstream_local_address : upstream_local_addresses) {
302
0
      if (upstream_local_address.address_ == nullptr) {
303
0
        return absl::InvalidArgumentError(fmt::format(
304
0
            "{}'s upstream binding config has invalid IP addresses.",
305
0
            !(cluster_name.has_value()) ? "Bootstrap"
306
0
                                        : fmt::format("Cluster {}", cluster_name.value())));
307
0
      }
308
0
    }
309
0
  }
310
311
2.40k
  return upstream_local_addresses;
312
2.40k
}
313
314
absl::StatusOr<Envoy::Upstream::UpstreamLocalAddressSelectorConstSharedPtr>
315
createUpstreamLocalAddressSelector(
316
    const envoy::config::cluster::v3::Cluster& cluster_config,
317
2.40k
    const absl::optional<envoy::config::core::v3::BindConfig>& bootstrap_bind_config) {
318
319
  // Use the cluster bind config if specified. This completely overrides the
320
  // bootstrap bind config when present.
321
2.40k
  OptRef<const envoy::config::core::v3::BindConfig> bind_config;
322
2.40k
  absl::optional<std::string> cluster_name;
323
2.40k
  if (cluster_config.has_upstream_bind_config()) {
324
0
    bind_config.emplace(cluster_config.upstream_bind_config());
325
0
    cluster_name.emplace(cluster_config.name());
326
2.40k
  } else if (bootstrap_bind_config.has_value()) {
327
0
    bind_config.emplace(*bootstrap_bind_config);
328
0
  }
329
330
  // Verify that bind config is valid.
331
2.40k
  if (bind_config.has_value()) {
332
0
    if (bind_config->additional_source_addresses_size() > 0 &&
333
0
        bind_config->extra_source_addresses_size() > 0) {
334
0
      return absl::InvalidArgumentError(fmt::format(
335
0
          "Can't specify both `extra_source_addresses` and `additional_source_addresses` "
336
0
          "in the {}'s upstream binding config",
337
0
          !(cluster_name.has_value()) ? "Bootstrap"
338
0
                                      : fmt::format("Cluster {}", cluster_name.value())));
339
0
    }
340
341
0
    if (!bind_config->has_source_address() &&
342
0
        (bind_config->extra_source_addresses_size() > 0 ||
343
0
         bind_config->additional_source_addresses_size() > 0)) {
344
0
      return absl::InvalidArgumentError(fmt::format(
345
0
          "{}'s upstream binding config has extra/additional source addresses but no "
346
0
          "source_address. Extra/additional addresses cannot be specified if "
347
0
          "source_address is not set.",
348
0
          !(cluster_name.has_value()) ? "Bootstrap"
349
0
                                      : fmt::format("Cluster {}", cluster_name.value())));
350
0
    }
351
0
  }
352
2.40k
  UpstreamLocalAddressSelectorFactory* local_address_selector_factory;
353
2.40k
  const envoy::config::core::v3::TypedExtensionConfig* const local_address_selector_config =
354
2.40k
      bind_config.has_value() && bind_config->has_local_address_selector()
355
2.40k
          ? &bind_config->local_address_selector()
356
2.40k
          : nullptr;
357
2.40k
  if (local_address_selector_config) {
358
0
    local_address_selector_factory =
359
0
        Config::Utility::getAndCheckFactory<UpstreamLocalAddressSelectorFactory>(
360
0
            *local_address_selector_config, false);
361
2.40k
  } else {
362
    // Create the default local address selector if one was not specified.
363
2.40k
    envoy::config::upstream::local_address_selector::v3::DefaultLocalAddressSelector default_config;
364
2.40k
    envoy::config::core::v3::TypedExtensionConfig typed_extension;
365
2.40k
    typed_extension.mutable_typed_config()->PackFrom(default_config);
366
2.40k
    local_address_selector_factory =
367
2.40k
        Config::Utility::getAndCheckFactory<UpstreamLocalAddressSelectorFactory>(typed_extension,
368
2.40k
                                                                                 false);
369
2.40k
  }
370
2.40k
  absl::StatusOr<std::vector<::Envoy::Upstream::UpstreamLocalAddress>> config_or_error =
371
2.40k
      parseBindConfig(
372
2.40k
          bind_config, cluster_name,
373
2.40k
          buildBaseSocketOptions(cluster_config, bootstrap_bind_config.value_or(
374
2.40k
                                                     envoy::config::core::v3::BindConfig{})),
375
2.40k
          buildClusterSocketOptions(cluster_config, bootstrap_bind_config.value_or(
376
2.40k
                                                        envoy::config::core::v3::BindConfig{})));
377
2.40k
  RETURN_IF_NOT_OK_REF(config_or_error.status());
378
2.40k
  auto selector_or_error = local_address_selector_factory->createLocalAddressSelector(
379
2.40k
      config_or_error.value(), cluster_name);
380
2.40k
  RETURN_IF_NOT_OK_REF(selector_or_error.status());
381
2.40k
  return selector_or_error.value();
382
2.40k
}
383
384
class LoadBalancerFactoryContextImpl : public Upstream::LoadBalancerFactoryContext {
385
public:
386
  explicit LoadBalancerFactoryContextImpl(
387
      Server::Configuration::ServerFactoryContext& server_context)
388
2.40k
      : server_context_(server_context) {}
389
390
0
  Event::Dispatcher& mainThreadDispatcher() override {
391
0
    return server_context_.mainThreadDispatcher();
392
0
  }
393
394
private:
395
  Server::Configuration::ServerFactoryContext& server_context_;
396
};
397
398
} // namespace
399
400
// Allow disabling ALPN checks for transport sockets. See
401
// https://github.com/envoyproxy/envoy/issues/22876
402
const absl::string_view ClusterImplBase::DoNotValidateAlpnRuntimeKey =
403
    "config.do_not_validate_alpn_support";
404
405
// Overriding drop_overload ratio settings from EDS.
406
const absl::string_view ClusterImplBase::DropOverloadRuntimeKey =
407
    "load_balancing_policy.drop_overload_limit";
408
409
// TODO(pianiststickman): this implementation takes a lock on the hot path and puts a copy of the
410
// stat name into every host that receives a copy of that metric. This can be improved by putting
411
// a single copy of the stat name into a thread-local key->index map so that the lock can be avoided
412
// and using the index as the key to the stat map instead.
413
0
void LoadMetricStatsImpl::add(const absl::string_view key, double value) {
414
0
  absl::MutexLock lock(&mu_);
415
0
  if (map_ == nullptr) {
416
0
    map_ = std::make_unique<StatMap>();
417
0
  }
418
0
  Stat& stat = (*map_)[key];
419
0
  ++stat.num_requests_with_metric;
420
0
  stat.total_metric_value += value;
421
0
}
422
423
0
LoadMetricStats::StatMapPtr LoadMetricStatsImpl::latch() {
424
0
  absl::MutexLock lock(&mu_);
425
0
  StatMapPtr latched = std::move(map_);
426
0
  map_ = nullptr;
427
0
  return latched;
428
0
}
429
430
HostDescriptionImpl::HostDescriptionImpl(
431
    ClusterInfoConstSharedPtr cluster, const std::string& hostname,
432
    Network::Address::InstanceConstSharedPtr dest_address, MetadataConstSharedPtr endpoint_metadata,
433
    MetadataConstSharedPtr locality_metadata, const envoy::config::core::v3::Locality& locality,
434
    const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config,
435
    uint32_t priority, TimeSource& time_source, const AddressVector& address_list)
436
    : HostDescriptionImplBase(cluster, hostname, dest_address, endpoint_metadata, locality_metadata,
437
                              locality, health_check_config, priority, time_source),
438
      address_(dest_address),
439
      address_list_or_null_(makeAddressListOrNull(dest_address, address_list)),
440
206k
      health_check_address_(resolveHealthCheckAddress(health_check_config, dest_address)) {}
Envoy::Upstream::HostDescriptionImpl::HostDescriptionImpl(std::__1::shared_ptr<Envoy::Upstream::ClusterInfo const>, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<Envoy::Network::Address::Instance const>, std::__1::shared_ptr<envoy::config::core::v3::Metadata const>, std::__1::shared_ptr<envoy::config::core::v3::Metadata const>, envoy::config::core::v3::Locality const&, envoy::config::endpoint::v3::Endpoint_HealthCheckConfig const&, unsigned int, Envoy::TimeSource&, std::__1::vector<std::__1::shared_ptr<Envoy::Network::Address::Instance const>, std::__1::allocator<std::__1::shared_ptr<Envoy::Network::Address::Instance const> > > const&)
Line
Count
Source
440
205k
      health_check_address_(resolveHealthCheckAddress(health_check_config, dest_address)) {}
Envoy::Upstream::HostDescriptionImpl::HostDescriptionImpl(std::__1::shared_ptr<Envoy::Upstream::ClusterInfo const>, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<Envoy::Network::Address::Instance const>, std::__1::shared_ptr<envoy::config::core::v3::Metadata const>, std::__1::shared_ptr<envoy::config::core::v3::Metadata const>, envoy::config::core::v3::Locality const&, envoy::config::endpoint::v3::Endpoint_HealthCheckConfig const&, unsigned int, Envoy::TimeSource&, std::__1::vector<std::__1::shared_ptr<Envoy::Network::Address::Instance const>, std::__1::allocator<std::__1::shared_ptr<Envoy::Network::Address::Instance const> > > const&)
Line
Count
Source
440
815
      health_check_address_(resolveHealthCheckAddress(health_check_config, dest_address)) {}
441
442
HostDescriptionImplBase::HostDescriptionImplBase(
443
    ClusterInfoConstSharedPtr cluster, const std::string& hostname,
444
    Network::Address::InstanceConstSharedPtr dest_address, MetadataConstSharedPtr endpoint_metadata,
445
    MetadataConstSharedPtr locality_metadata, const envoy::config::core::v3::Locality& locality,
446
    const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config,
447
    uint32_t priority, TimeSource& time_source)
448
    : cluster_(cluster), hostname_(hostname),
449
      health_checks_hostname_(health_check_config.hostname()),
450
      canary_(Config::Metadata::metadataValue(endpoint_metadata.get(),
451
                                              Config::MetadataFilters::get().ENVOY_LB,
452
                                              Config::MetadataEnvoyLbKeys::get().CANARY)
453
                  .bool_value()),
454
      endpoint_metadata_(endpoint_metadata), locality_metadata_(locality_metadata),
455
      locality_(locality),
456
      locality_zone_stat_name_(locality.zone(), cluster->statsScope().symbolTable()),
457
      priority_(priority),
458
      socket_factory_(resolveTransportSocketFactory(dest_address, endpoint_metadata_.get())),
459
206k
      creation_time_(time_source.monotonicTime()) {
460
206k
  if (health_check_config.port_value() != 0 && dest_address->type() != Network::Address::Type::Ip) {
461
    // Setting the health check port to non-0 only works for IP-type addresses. Setting the port
462
    // for a pipe address is a misconfiguration. Throw an exception.
463
0
    throwEnvoyExceptionOrPanic(
464
0
        fmt::format("Invalid host configuration: non-zero port for non-IP address"));
465
0
  }
466
206k
}
467
468
HostDescription::SharedConstAddressVector HostDescriptionImplBase::makeAddressListOrNull(
469
206k
    const Network::Address::InstanceConstSharedPtr& address, const AddressVector& address_list) {
470
206k
  if (address_list.empty()) {
471
206k
    return {};
472
206k
  }
473
0
  ASSERT(*address_list.front() == *address);
474
0
  return std::make_shared<AddressVector>(address_list);
475
0
}
476
477
Network::UpstreamTransportSocketFactory& HostDescriptionImplBase::resolveTransportSocketFactory(
478
    const Network::Address::InstanceConstSharedPtr& dest_address,
479
207k
    const envoy::config::core::v3::Metadata* endpoint_metadata) const {
480
207k
  auto match =
481
207k
      cluster_->transportSocketMatcher().resolve(endpoint_metadata, locality_metadata_.get());
482
207k
  match.stats_.total_match_count_.inc();
483
207k
  ENVOY_LOG(debug, "transport socket match, socket {} selected for host with address {}",
484
207k
            match.name_, dest_address ? dest_address->asString() : "empty");
485
486
207k
  return match.factory_;
487
207k
}
488
489
Host::CreateConnectionData HostImplBase::createConnection(
490
    Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options,
491
911
    Network::TransportSocketOptionsConstSharedPtr transport_socket_options) const {
492
911
  return createConnection(dispatcher, cluster(), address(), addressListOrNull(),
493
911
                          transportSocketFactory(), options, transport_socket_options,
494
911
                          shared_from_this());
495
911
}
496
497
205k
void HostImplBase::setEdsHealthFlag(envoy::config::core::v3::HealthStatus health_status) {
498
  // Clear all old EDS health flags first.
499
205k
  HostImplBase::healthFlagClear(Host::HealthFlag::FAILED_EDS_HEALTH);
500
205k
  HostImplBase::healthFlagClear(Host::HealthFlag::DEGRADED_EDS_HEALTH);
501
205k
  if (Runtime::runtimeFeatureEnabled(
502
205k
          "envoy.reloadable_features.exclude_host_in_eds_status_draining")) {
503
205k
    HostImplBase::healthFlagClear(Host::HealthFlag::EDS_STATUS_DRAINING);
504
205k
  }
505
506
  // Set the appropriate EDS health flag.
507
205k
  switch (health_status) {
508
0
  case envoy::config::core::v3::UNHEALTHY:
509
0
    FALLTHRU;
510
0
  case envoy::config::core::v3::TIMEOUT:
511
0
    HostImplBase::healthFlagSet(Host::HealthFlag::FAILED_EDS_HEALTH);
512
0
    break;
513
0
  case envoy::config::core::v3::DRAINING:
514
0
    if (Runtime::runtimeFeatureEnabled(
515
0
            "envoy.reloadable_features.exclude_host_in_eds_status_draining")) {
516
0
      HostImplBase::healthFlagSet(Host::HealthFlag::EDS_STATUS_DRAINING);
517
0
    }
518
0
    break;
519
0
  case envoy::config::core::v3::DEGRADED:
520
0
    HostImplBase::healthFlagSet(Host::HealthFlag::DEGRADED_EDS_HEALTH);
521
0
    break;
522
205k
  default:
523
205k
    break;
524
    // No health flags should be set.
525
205k
  }
526
205k
}
527
528
Host::CreateConnectionData HostImplBase::createHealthCheckConnection(
529
    Event::Dispatcher& dispatcher,
530
    Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
531
22.4k
    const envoy::config::core::v3::Metadata* metadata) const {
532
22.4k
  Network::UpstreamTransportSocketFactory& factory =
533
22.4k
      (metadata != nullptr) ? resolveTransportSocketFactory(healthCheckAddress(), metadata)
534
22.4k
                            : transportSocketFactory();
535
22.4k
  return createConnection(dispatcher, cluster(), healthCheckAddress(), {}, factory, nullptr,
536
22.4k
                          transport_socket_options, shared_from_this());
537
22.4k
}
538
539
absl::optional<Network::Address::InstanceConstSharedPtr> HostImplBase::maybeGetProxyRedirectAddress(
540
    const Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
541
23.3k
    HostDescriptionConstSharedPtr host) {
542
23.3k
  if (transport_socket_options && transport_socket_options->http11ProxyInfo().has_value()) {
543
0
    return transport_socket_options->http11ProxyInfo()->proxy_address;
544
0
  }
545
546
  // See if host metadata contains a proxy address and only check locality metadata if host
547
  // metadata did not have the relevant key.
548
46.6k
  for (const auto& metadata : {host->metadata(), host->localityMetadata()}) {
549
46.6k
    if (metadata == nullptr) {
550
46.6k
      continue;
551
46.6k
    }
552
553
0
    auto addr_it = metadata->typed_filter_metadata().find(
554
0
        Config::MetadataFilters::get().ENVOY_HTTP11_PROXY_TRANSPORT_SOCKET_ADDR);
555
0
    if (addr_it == metadata->typed_filter_metadata().end()) {
556
0
      continue;
557
0
    }
558
559
    // Parse an address from the metadata.
560
0
    envoy::config::core::v3::Address proxy_addr;
561
0
    auto status = MessageUtil::unpackTo(addr_it->second, proxy_addr);
562
0
    if (!status.ok()) {
563
0
      ENVOY_LOG_EVERY_POW_2(
564
0
          error, "failed to parse proto from endpoint/locality metadata field {}, host={}",
565
0
          Config::MetadataFilters::get().ENVOY_HTTP11_PROXY_TRANSPORT_SOCKET_ADDR,
566
0
          host->hostname());
567
0
      return absl::nullopt;
568
0
    }
569
570
    // Resolve the parsed address proto.
571
0
    auto resolve_status = Network::Address::resolveProtoAddress(proxy_addr);
572
0
    if (!resolve_status.ok()) {
573
0
      ENVOY_LOG_EVERY_POW_2(
574
0
          error, "failed to resolve address from endpoint/locality metadata field {}, host={}",
575
0
          Config::MetadataFilters::get().ENVOY_HTTP11_PROXY_TRANSPORT_SOCKET_ADDR,
576
0
          host->hostname());
577
0
      return absl::nullopt;
578
0
    }
579
580
    // We successfully resolved, so return the instance ptr.
581
0
    return resolve_status.value();
582
0
  }
583
584
23.3k
  return absl::nullopt;
585
23.3k
}
586
587
Host::CreateConnectionData HostImplBase::createConnection(
588
    Event::Dispatcher& dispatcher, const ClusterInfo& cluster,
589
    const Network::Address::InstanceConstSharedPtr& address,
590
    const SharedConstAddressVector& address_list_or_null,
591
    Network::UpstreamTransportSocketFactory& socket_factory,
592
    const Network::ConnectionSocket::OptionsSharedPtr& options,
593
    Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
594
23.3k
    HostDescriptionConstSharedPtr host) {
595
23.3k
  auto source_address_selector = cluster.getUpstreamLocalAddressSelector();
596
597
23.3k
  absl::optional<Network::Address::InstanceConstSharedPtr> proxy_address =
598
23.3k
      maybeGetProxyRedirectAddress(transport_socket_options, host);
599
600
23.3k
  Network::ClientConnectionPtr connection;
601
  // If the transport socket options or endpoint/locality metadata indicate the connection should
602
  // be redirected to a proxy, create the TCP connection to the proxy's address not the host's
603
  // address.
604
23.3k
  if (proxy_address.has_value()) {
605
0
    auto upstream_local_address =
606
0
        source_address_selector->getUpstreamLocalAddress(address, options);
607
0
    ENVOY_LOG(debug, "Connecting to configured HTTP/1.1 proxy at {}",
608
0
              proxy_address.value()->asString());
609
0
    connection = dispatcher.createClientConnection(
610
0
        proxy_address.value(), upstream_local_address.address_,
611
0
        socket_factory.createTransportSocket(transport_socket_options, host),
612
0
        upstream_local_address.socket_options_, transport_socket_options);
613
23.3k
  } else if (address_list_or_null != nullptr && address_list_or_null->size() > 1) {
614
0
    absl::optional<envoy::config::cluster::v3::UpstreamConnectionOptions::HappyEyeballsConfig>
615
0
        happy_eyeballs_config;
616
0
    if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.use_config_in_happy_eyeballs")) {
617
0
      ENVOY_LOG(debug, "Upstream using happy eyeballs config.");
618
0
      if (cluster.happyEyeballsConfig().has_value()) {
619
0
        happy_eyeballs_config = *cluster.happyEyeballsConfig();
620
0
      } else {
621
0
        envoy::config::cluster::v3::UpstreamConnectionOptions::HappyEyeballsConfig default_config;
622
0
        default_config.set_first_address_family_version(
623
0
            envoy::config::cluster::v3::UpstreamConnectionOptions::DEFAULT);
624
0
        default_config.mutable_first_address_family_count()->set_value(1);
625
0
        happy_eyeballs_config = absl::make_optional(default_config);
626
0
      }
627
0
    }
628
0
    connection = std::make_unique<Network::HappyEyeballsConnectionImpl>(
629
0
        dispatcher, *address_list_or_null, source_address_selector, socket_factory,
630
0
        transport_socket_options, host, options, happy_eyeballs_config);
631
23.3k
  } else {
632
23.3k
    auto upstream_local_address =
633
23.3k
        source_address_selector->getUpstreamLocalAddress(address, options);
634
23.3k
    connection = dispatcher.createClientConnection(
635
23.3k
        address, upstream_local_address.address_,
636
23.3k
        socket_factory.createTransportSocket(transport_socket_options, host),
637
23.3k
        upstream_local_address.socket_options_, transport_socket_options);
638
23.3k
  }
639
640
23.3k
  connection->connectionInfoSetter().enableSettingInterfaceName(
641
23.3k
      cluster.setLocalInterfaceNameOnUpstreamConnections());
642
23.3k
  connection->setBufferLimits(cluster.perConnectionBufferLimitBytes());
643
23.3k
  if (auto upstream_info = connection->streamInfo().upstreamInfo(); upstream_info) {
644
23.3k
    upstream_info->setUpstreamHost(host);
645
23.3k
  }
646
23.3k
  cluster.createNetworkFilterChain(*connection);
647
23.3k
  return {std::move(connection), std::move(host)};
648
23.3k
}
649
650
10.3M
void HostImplBase::weight(uint32_t new_weight) { weight_ = std::max(1U, new_weight); }
651
652
std::vector<HostsPerLocalityConstSharedPtr> HostsPerLocalityImpl::filter(
653
2.40k
    const std::vector<std::function<bool(const Host&)>>& predicates) const {
654
  // We keep two lists: one for being able to mutate the clone and one for returning to the
655
  // caller. Creating them both at the start avoids iterating over the mutable values at the end
656
  // to convert them to a const pointer.
657
2.40k
  std::vector<std::shared_ptr<HostsPerLocalityImpl>> mutable_clones;
658
2.40k
  std::vector<HostsPerLocalityConstSharedPtr> filtered_clones;
659
660
2.40k
  mutable_clones.reserve(predicates.size());
661
2.40k
  filtered_clones.reserve(predicates.size());
662
9.62k
  for (size_t i = 0; i < predicates.size(); ++i) {
663
7.22k
    mutable_clones.emplace_back(std::make_shared<HostsPerLocalityImpl>());
664
7.22k
    filtered_clones.emplace_back(mutable_clones.back());
665
7.22k
    mutable_clones.back()->local_ = local_;
666
7.22k
  }
667
668
2.40k
  for (const auto& hosts_locality : hosts_per_locality_) {
669
2.40k
    std::vector<HostVector> current_locality_hosts;
670
2.40k
    current_locality_hosts.resize(predicates.size());
671
672
    // Since # of hosts >> # of predicates, we iterate over the hosts in the outer loop.
673
2.40k
    for (const auto& host : hosts_locality) {
674
9.62k
      for (size_t i = 0; i < predicates.size(); ++i) {
675
7.22k
        if (predicates[i](*host)) {
676
2.40k
          current_locality_hosts[i].emplace_back(host);
677
2.40k
        }
678
7.22k
      }
679
2.40k
    }
680
681
9.62k
    for (size_t i = 0; i < predicates.size(); ++i) {
682
7.22k
      mutable_clones[i]->hosts_per_locality_.push_back(std::move(current_locality_hosts[i]));
683
7.22k
    }
684
2.40k
  }
685
686
2.40k
  return filtered_clones;
687
2.40k
}
688
689
void HostSetImpl::updateHosts(PrioritySet::UpdateHostsParams&& update_hosts_params,
690
                              LocalityWeightsConstSharedPtr locality_weights,
691
                              const HostVector& hosts_added, const HostVector& hosts_removed,
692
                              uint64_t seed, absl::optional<bool> weighted_priority_health,
693
8.76k
                              absl::optional<uint32_t> overprovisioning_factor) {
694
8.76k
  if (weighted_priority_health.has_value()) {
695
7.22k
    weighted_priority_health_ = weighted_priority_health.value();
696
7.22k
  }
697
8.76k
  if (overprovisioning_factor.has_value()) {
698
7.22k
    ASSERT(overprovisioning_factor.value() > 0);
699
7.22k
    overprovisioning_factor_ = overprovisioning_factor.value();
700
7.22k
  }
701
8.76k
  hosts_ = std::move(update_hosts_params.hosts);
702
8.76k
  healthy_hosts_ = std::move(update_hosts_params.healthy_hosts);
703
8.76k
  degraded_hosts_ = std::move(update_hosts_params.degraded_hosts);
704
8.76k
  excluded_hosts_ = std::move(update_hosts_params.excluded_hosts);
705
8.76k
  hosts_per_locality_ = std::move(update_hosts_params.hosts_per_locality);
706
8.76k
  healthy_hosts_per_locality_ = std::move(update_hosts_params.healthy_hosts_per_locality);
707
8.76k
  degraded_hosts_per_locality_ = std::move(update_hosts_params.degraded_hosts_per_locality);
708
8.76k
  excluded_hosts_per_locality_ = std::move(update_hosts_params.excluded_hosts_per_locality);
709
8.76k
  locality_weights_ = std::move(locality_weights);
710
711
  // TODO(ggreenway): implement `weighted_priority_health` support in `rebuildLocalityScheduler`.
712
8.76k
  rebuildLocalityScheduler(healthy_locality_scheduler_, healthy_locality_entries_,
713
8.76k
                           *healthy_hosts_per_locality_, healthy_hosts_->get(), hosts_per_locality_,
714
8.76k
                           excluded_hosts_per_locality_, locality_weights_,
715
8.76k
                           overprovisioning_factor_, seed);
716
8.76k
  rebuildLocalityScheduler(degraded_locality_scheduler_, degraded_locality_entries_,
717
8.76k
                           *degraded_hosts_per_locality_, degraded_hosts_->get(),
718
8.76k
                           hosts_per_locality_, excluded_hosts_per_locality_, locality_weights_,
719
8.76k
                           overprovisioning_factor_, seed);
720
721
8.76k
  runUpdateCallbacks(hosts_added, hosts_removed);
722
8.76k
}
723
724
void HostSetImpl::rebuildLocalityScheduler(
725
    std::unique_ptr<EdfScheduler<LocalityEntry>>& locality_scheduler,
726
    std::vector<std::shared_ptr<LocalityEntry>>& locality_entries,
727
    const HostsPerLocality& eligible_hosts_per_locality, const HostVector& eligible_hosts,
728
    HostsPerLocalityConstSharedPtr all_hosts_per_locality,
729
    HostsPerLocalityConstSharedPtr excluded_hosts_per_locality,
730
    LocalityWeightsConstSharedPtr locality_weights, uint32_t overprovisioning_factor,
731
17.5k
    uint64_t seed) {
732
  // Rebuild the locality scheduler by computing the effective weight of each
733
  // locality in this priority. The scheduler is reset by default, and is rebuilt only if we have
734
  // locality weights (i.e. using EDS) and there is at least one eligible host in this priority.
735
  //
736
  // We omit building a scheduler when there are zero eligible hosts in the priority as
737
  // all the localities will have zero effective weight. At selection time, we'll either select
738
  // from a different scheduler or there will be no available hosts in the priority. At that point
739
  // we'll rely on other mechanisms such as panic mode to select a host, none of which rely on the
740
  // scheduler.
741
  //
742
  // TODO(htuch): if the underlying locality index ->
743
  // envoy::config::core::v3::Locality hasn't changed in hosts_/healthy_hosts_/degraded_hosts_, we
744
  // could just update locality_weight_ without rebuilding. Similar to how host
745
  // level WRR works, we would age out the existing entries via picks and lazily
746
  // apply the new weights.
747
17.5k
  locality_scheduler = nullptr;
748
17.5k
  if (all_hosts_per_locality != nullptr && locality_weights != nullptr &&
749
17.5k
      !locality_weights->empty() && !eligible_hosts.empty()) {
750
7.22k
    if (Runtime::runtimeFeatureEnabled(
751
7.22k
            "envoy.reloadable_features.edf_lb_locality_scheduler_init_fix")) {
752
7.22k
      locality_entries.clear();
753
14.4k
      for (uint32_t i = 0; i < all_hosts_per_locality->get().size(); ++i) {
754
7.22k
        const double effective_weight = effectiveLocalityWeight(
755
7.22k
            i, eligible_hosts_per_locality, *excluded_hosts_per_locality, *all_hosts_per_locality,
756
7.22k
            *locality_weights, overprovisioning_factor);
757
7.22k
        if (effective_weight > 0) {
758
0
          locality_entries.emplace_back(std::make_shared<LocalityEntry>(i, effective_weight));
759
0
        }
760
7.22k
      }
761
      // If not all effective weights were zero, create the scheduler.
762
7.22k
      if (!locality_entries.empty()) {
763
0
        locality_scheduler = std::make_unique<EdfScheduler<LocalityEntry>>(
764
0
            EdfScheduler<LocalityEntry>::createWithPicks(
765
0
                locality_entries,
766
0
                [](const LocalityEntry& entry) { return entry.effective_weight_; }, seed));
767
0
      }
768
7.22k
    } else {
769
0
      locality_scheduler = std::make_unique<EdfScheduler<LocalityEntry>>();
770
0
      locality_entries.clear();
771
0
      for (uint32_t i = 0; i < all_hosts_per_locality->get().size(); ++i) {
772
0
        const double effective_weight = effectiveLocalityWeight(
773
0
            i, eligible_hosts_per_locality, *excluded_hosts_per_locality, *all_hosts_per_locality,
774
0
            *locality_weights, overprovisioning_factor);
775
0
        if (effective_weight > 0) {
776
0
          locality_entries.emplace_back(std::make_shared<LocalityEntry>(i, effective_weight));
777
0
          locality_scheduler->add(effective_weight, locality_entries.back());
778
0
        }
779
0
      }
780
      // If all effective weights were zero, reset the scheduler.
781
0
      if (locality_scheduler->empty()) {
782
0
        locality_scheduler = nullptr;
783
0
      }
784
0
    }
785
7.22k
  }
786
17.5k
}
787
788
0
absl::optional<uint32_t> HostSetImpl::chooseHealthyLocality() {
789
0
  return chooseLocality(healthy_locality_scheduler_.get());
790
0
}
791
792
0
absl::optional<uint32_t> HostSetImpl::chooseDegradedLocality() {
793
0
  return chooseLocality(degraded_locality_scheduler_.get());
794
0
}
795
796
absl::optional<uint32_t>
797
0
HostSetImpl::chooseLocality(EdfScheduler<LocalityEntry>* locality_scheduler) {
798
0
  if (locality_scheduler == nullptr) {
799
0
    return {};
800
0
  }
801
0
  const std::shared_ptr<LocalityEntry> locality = locality_scheduler->pickAndAdd(
802
0
      [](const LocalityEntry& locality) { return locality.effective_weight_; });
803
  // We don't build a schedule if there are no weighted localities, so we should always succeed.
804
0
  ASSERT(locality != nullptr);
805
  // If we picked it before, its weight must have been positive.
806
0
  ASSERT(locality->effective_weight_ > 0);
807
0
  return locality->index_;
808
0
}
809
810
PrioritySet::UpdateHostsParams
811
HostSetImpl::updateHostsParams(HostVectorConstSharedPtr hosts,
812
                               HostsPerLocalityConstSharedPtr hosts_per_locality,
813
                               HealthyHostVectorConstSharedPtr healthy_hosts,
814
                               HostsPerLocalityConstSharedPtr healthy_hosts_per_locality,
815
                               DegradedHostVectorConstSharedPtr degraded_hosts,
816
                               HostsPerLocalityConstSharedPtr degraded_hosts_per_locality,
817
                               ExcludedHostVectorConstSharedPtr excluded_hosts,
818
6.35k
                               HostsPerLocalityConstSharedPtr excluded_hosts_per_locality) {
819
6.35k
  return PrioritySet::UpdateHostsParams{std::move(hosts),
820
6.35k
                                        std::move(healthy_hosts),
821
6.35k
                                        std::move(degraded_hosts),
822
6.35k
                                        std::move(excluded_hosts),
823
6.35k
                                        std::move(hosts_per_locality),
824
6.35k
                                        std::move(healthy_hosts_per_locality),
825
6.35k
                                        std::move(degraded_hosts_per_locality),
826
6.35k
                                        std::move(excluded_hosts_per_locality)};
827
6.35k
}
828
829
3.94k
PrioritySet::UpdateHostsParams HostSetImpl::updateHostsParams(const HostSet& host_set) {
830
3.94k
  return updateHostsParams(host_set.hostsPtr(), host_set.hostsPerLocalityPtr(),
831
3.94k
                           host_set.healthyHostsPtr(), host_set.healthyHostsPerLocalityPtr(),
832
3.94k
                           host_set.degradedHostsPtr(), host_set.degradedHostsPerLocalityPtr(),
833
3.94k
                           host_set.excludedHostsPtr(), host_set.excludedHostsPerLocalityPtr());
834
3.94k
}
835
PrioritySet::UpdateHostsParams
836
HostSetImpl::partitionHosts(HostVectorConstSharedPtr hosts,
837
2.40k
                            HostsPerLocalityConstSharedPtr hosts_per_locality) {
838
2.40k
  auto partitioned_hosts = ClusterImplBase::partitionHostList(*hosts);
839
2.40k
  auto healthy_degraded_excluded_hosts_per_locality =
840
2.40k
      ClusterImplBase::partitionHostsPerLocality(*hosts_per_locality);
841
842
2.40k
  return updateHostsParams(std::move(hosts), std::move(hosts_per_locality),
843
2.40k
                           std::move(std::get<0>(partitioned_hosts)),
844
2.40k
                           std::move(std::get<0>(healthy_degraded_excluded_hosts_per_locality)),
845
2.40k
                           std::move(std::get<1>(partitioned_hosts)),
846
2.40k
                           std::move(std::get<1>(healthy_degraded_excluded_hosts_per_locality)),
847
2.40k
                           std::move(std::get<2>(partitioned_hosts)),
848
2.40k
                           std::move(std::get<2>(healthy_degraded_excluded_hosts_per_locality)));
849
2.40k
}
850
851
double HostSetImpl::effectiveLocalityWeight(uint32_t index,
852
                                            const HostsPerLocality& eligible_hosts_per_locality,
853
                                            const HostsPerLocality& excluded_hosts_per_locality,
854
                                            const HostsPerLocality& all_hosts_per_locality,
855
                                            const LocalityWeights& locality_weights,
856
7.22k
                                            uint32_t overprovisioning_factor) {
857
7.22k
  const auto& locality_eligible_hosts = eligible_hosts_per_locality.get()[index];
858
7.22k
  const uint32_t excluded_count = excluded_hosts_per_locality.get().size() > index
859
7.22k
                                      ? excluded_hosts_per_locality.get()[index].size()
860
7.22k
                                      : 0;
861
7.22k
  const auto host_count = all_hosts_per_locality.get()[index].size() - excluded_count;
862
7.22k
  if (host_count == 0) {
863
0
    return 0.0;
864
0
  }
865
7.22k
  const double locality_availability_ratio = 1.0 * locality_eligible_hosts.size() / host_count;
866
7.22k
  const uint32_t weight = locality_weights[index];
867
  // Availability ranges from 0-1.0, and is the ratio of eligible hosts to total hosts, modified
868
  // by the overprovisioning factor.
869
7.22k
  const double effective_locality_availability_ratio =
870
7.22k
      std::min(1.0, (overprovisioning_factor / 100.0) * locality_availability_ratio);
871
7.22k
  return weight * effective_locality_availability_ratio;
872
7.22k
}
873
874
const HostSet&
875
PrioritySetImpl::getOrCreateHostSet(uint32_t priority,
876
                                    absl::optional<bool> weighted_priority_health,
877
16.0k
                                    absl::optional<uint32_t> overprovisioning_factor) {
878
16.0k
  if (host_sets_.size() < priority + 1) {
879
14.6k
    for (size_t i = host_sets_.size(); i <= priority; ++i) {
880
7.31k
      HostSetImplPtr host_set = createHostSet(i, weighted_priority_health, overprovisioning_factor);
881
7.31k
      host_sets_priority_update_cbs_.push_back(
882
7.31k
          host_set->addPriorityUpdateCb([this](uint32_t priority, const HostVector& hosts_added,
883
8.76k
                                               const HostVector& hosts_removed) {
884
8.76k
            runReferenceUpdateCallbacks(priority, hosts_added, hosts_removed);
885
8.76k
            return absl::OkStatus();
886
8.76k
          }));
887
7.31k
      host_sets_.push_back(std::move(host_set));
888
7.31k
    }
889
7.31k
  }
890
16.0k
  return *host_sets_[priority];
891
16.0k
}
892
893
void PrioritySetImpl::updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params,
894
                                  LocalityWeightsConstSharedPtr locality_weights,
895
                                  const HostVector& hosts_added, const HostVector& hosts_removed,
896
                                  uint64_t seed, absl::optional<bool> weighted_priority_health,
897
                                  absl::optional<uint32_t> overprovisioning_factor,
898
8.76k
                                  HostMapConstSharedPtr cross_priority_host_map) {
899
  // Update cross priority host map first. In this way, when the update callbacks of the priority
900
  // set are executed, the latest host map can always be obtained.
901
8.76k
  if (cross_priority_host_map != nullptr) {
902
4.81k
    const_cross_priority_host_map_ = std::move(cross_priority_host_map);
903
4.81k
  }
904
905
  // Ensure that we have a HostSet for the given priority.
906
8.76k
  getOrCreateHostSet(priority, weighted_priority_health, overprovisioning_factor);
907
8.76k
  static_cast<HostSetImpl*>(host_sets_[priority].get())
908
8.76k
      ->updateHosts(std::move(update_hosts_params), std::move(locality_weights), hosts_added,
909
8.76k
                    hosts_removed, seed, weighted_priority_health, overprovisioning_factor);
910
911
8.76k
  if (!batch_update_) {
912
8.74k
    runUpdateCallbacks(hosts_added, hosts_removed);
913
8.74k
  }
914
8.76k
}
915
916
14
void PrioritySetImpl::batchHostUpdate(BatchUpdateCb& callback) {
917
14
  BatchUpdateScope scope(*this);
918
919
  // We wrap the update call with a lambda that tracks all the hosts that have been added/removed.
920
14
  callback.batchUpdate(scope);
921
922
  // Now that all the updates have been complete, we can compute the diff.
923
14
  HostVector net_hosts_added = filterHosts(scope.all_hosts_added_, scope.all_hosts_removed_);
924
14
  HostVector net_hosts_removed = filterHosts(scope.all_hosts_removed_, scope.all_hosts_added_);
925
926
14
  runUpdateCallbacks(net_hosts_added, net_hosts_removed);
927
14
}
928
929
void PrioritySetImpl::BatchUpdateScope::updateHosts(
930
    uint32_t priority, PrioritySet::UpdateHostsParams&& update_hosts_params,
931
    LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added,
932
    const HostVector& hosts_removed, uint64_t seed, absl::optional<bool> weighted_priority_health,
933
14
    absl::optional<uint32_t> overprovisioning_factor) {
934
  // We assume that each call updates a different priority.
935
14
  ASSERT(priorities_.find(priority) == priorities_.end());
936
14
  priorities_.insert(priority);
937
938
14
  for (const auto& host : hosts_added) {
939
14
    all_hosts_added_.insert(host);
940
14
  }
941
942
14
  for (const auto& host : hosts_removed) {
943
0
    all_hosts_removed_.insert(host);
944
0
  }
945
946
14
  parent_.updateHosts(priority, std::move(update_hosts_params), locality_weights, hosts_added,
947
14
                      hosts_removed, seed, weighted_priority_health, overprovisioning_factor);
948
14
}
949
950
void MainPrioritySetImpl::updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params,
951
                                      LocalityWeightsConstSharedPtr locality_weights,
952
                                      const HostVector& hosts_added,
953
                                      const HostVector& hosts_removed, uint64_t seed,
954
                                      absl::optional<bool> weighted_priority_health,
955
                                      absl::optional<uint32_t> overprovisioning_factor,
956
2.40k
                                      HostMapConstSharedPtr cross_priority_host_map) {
957
2.40k
  ASSERT(cross_priority_host_map == nullptr,
958
2.40k
         "External cross-priority host map is meaningless to MainPrioritySetImpl");
959
2.40k
  updateCrossPriorityHostMap(priority, hosts_added, hosts_removed);
960
961
2.40k
  PrioritySetImpl::updateHosts(priority, std::move(update_hosts_params), locality_weights,
962
2.40k
                               hosts_added, hosts_removed, seed, weighted_priority_health,
963
2.40k
                               overprovisioning_factor);
964
2.40k
}
965
966
2.42k
HostMapConstSharedPtr MainPrioritySetImpl::crossPriorityHostMap() const {
967
  // Check if the host set in the main thread PrioritySet has been updated.
968
2.42k
  if (mutable_cross_priority_host_map_ != nullptr) {
969
2.40k
    const_cross_priority_host_map_ = std::move(mutable_cross_priority_host_map_);
970
2.40k
    ASSERT(mutable_cross_priority_host_map_ == nullptr);
971
2.40k
  }
972
2.42k
  return const_cross_priority_host_map_;
973
2.42k
}
974
975
void MainPrioritySetImpl::updateCrossPriorityHostMap(uint32_t priority,
976
                                                     const HostVector& hosts_added,
977
2.40k
                                                     const HostVector& hosts_removed) {
978
2.40k
  if (hosts_added.empty() && hosts_removed.empty()) {
979
    // No new hosts have been added and no old hosts have been removed.
980
0
    return;
981
0
  }
982
983
  // Since read_only_all_host_map_ may be shared by multiple threads, when the host set changes,
984
  // we cannot directly modify read_only_all_host_map_.
985
2.40k
  if (mutable_cross_priority_host_map_ == nullptr) {
986
    // Copy old read only host map to mutable host map.
987
2.40k
    mutable_cross_priority_host_map_ = std::make_shared<HostMap>(*const_cross_priority_host_map_);
988
2.40k
  }
989
990
2.40k
  for (const auto& host : hosts_removed) {
991
0
    const auto host_address = addressToString(host->address());
992
0
    const auto existing_host = mutable_cross_priority_host_map_->find(host_address);
993
0
    if (existing_host != mutable_cross_priority_host_map_->end()) {
994
      // Only delete from the current priority to protect from situations where
995
      // the add operation was already executed and has already moved the metadata of the host
996
      // from a higher priority value to a lower priority value.
997
0
      if (existing_host->second->priority() == priority) {
998
0
        mutable_cross_priority_host_map_->erase(host_address);
999
0
      }
1000
0
    }
1001
0
  }
1002
1003
2.40k
  for (const auto& host : hosts_added) {
1004
2.40k
    mutable_cross_priority_host_map_->insert({addressToString(host->address()), host});
1005
2.40k
  }
1006
2.40k
}
1007
1008
DeferredCreationCompatibleClusterTrafficStats
1009
ClusterInfoImpl::generateStats(Stats::ScopeSharedPtr scope,
1010
403k
                               const ClusterTrafficStatNames& stat_names, bool defer_creation) {
1011
403k
  return Stats::createDeferredCompatibleStats<ClusterTrafficStats>(scope, stat_names,
1012
403k
                                                                   defer_creation);
1013
403k
}
1014
1015
ClusterRequestResponseSizeStats ClusterInfoImpl::generateRequestResponseSizeStats(
1016
401k
    Stats::Scope& scope, const ClusterRequestResponseSizeStatNames& stat_names) {
1017
401k
  return {stat_names, scope};
1018
401k
}
1019
1020
ClusterLoadReportStats
1021
ClusterInfoImpl::generateLoadReportStats(Stats::Scope& scope,
1022
403k
                                         const ClusterLoadReportStatNames& stat_names) {
1023
403k
  return {stat_names, scope};
1024
403k
}
1025
1026
ClusterTimeoutBudgetStats
1027
ClusterInfoImpl::generateTimeoutBudgetStats(Stats::Scope& scope,
1028
401k
                                            const ClusterTimeoutBudgetStatNames& stat_names) {
1029
401k
  return {stat_names, scope};
1030
401k
}
1031
1032
absl::StatusOr<std::shared_ptr<const ClusterInfoImpl::HttpProtocolOptionsConfigImpl>>
1033
createOptions(const envoy::config::cluster::v3::Cluster& config,
1034
              std::shared_ptr<const ClusterInfoImpl::HttpProtocolOptionsConfigImpl>&& options,
1035
2.40k
              ProtobufMessage::ValidationVisitor& validation_visitor) {
1036
2.40k
  if (options) {
1037
1.54k
    return std::move(options);
1038
1.54k
  }
1039
1040
862
  if (config.protocol_selection() == envoy::config::cluster::v3::Cluster::USE_CONFIGURED_PROTOCOL) {
1041
    // Make sure multiple protocol configurations are not present
1042
862
    if (config.has_http_protocol_options() && config.has_http2_protocol_options()) {
1043
0
      return absl::InvalidArgumentError(
1044
0
          fmt::format("cluster: Both HTTP1 and HTTP2 options may only be "
1045
0
                      "configured with non-default 'protocol_selection' values"));
1046
0
    }
1047
862
  }
1048
1049
862
  auto options_or_error =
1050
862
      ClusterInfoImpl::HttpProtocolOptionsConfigImpl::createProtocolOptionsConfig(
1051
862
          config.http_protocol_options(), config.http2_protocol_options(),
1052
862
          config.common_http_protocol_options(),
1053
862
          (config.has_upstream_http_protocol_options()
1054
862
               ? absl::make_optional<envoy::config::core::v3::UpstreamHttpProtocolOptions>(
1055
0
                     config.upstream_http_protocol_options())
1056
862
               : absl::nullopt),
1057
862
          config.protocol_selection() ==
1058
862
              envoy::config::cluster::v3::Cluster::USE_DOWNSTREAM_PROTOCOL,
1059
862
          config.has_http2_protocol_options(), validation_visitor);
1060
862
  RETURN_IF_NOT_OK_REF(options_or_error.status());
1061
862
  return options_or_error.value();
1062
862
}
1063
1064
absl::StatusOr<LegacyLbPolicyConfigHelper::Result>
1065
LegacyLbPolicyConfigHelper::getTypedLbConfigFromLegacyProtoWithoutSubset(
1066
    LoadBalancerFactoryContext& lb_factory_context, const ClusterProto& cluster,
1067
2.40k
    ProtobufMessage::ValidationVisitor& visitor) {
1068
2.40k
  LoadBalancerConfigPtr lb_config;
1069
2.40k
  TypedLoadBalancerFactory* lb_factory = nullptr;
1070
1071
2.40k
  switch (cluster.lb_policy()) {
1072
0
    PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
1073
2.40k
  case ClusterProto::ROUND_ROBIN:
1074
2.40k
    lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>(
1075
2.40k
        "envoy.load_balancing_policies.round_robin");
1076
2.40k
    break;
1077
0
  case ClusterProto::LEAST_REQUEST:
1078
0
    lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>(
1079
0
        "envoy.load_balancing_policies.least_request");
1080
0
    break;
1081
0
  case ClusterProto::RANDOM:
1082
0
    lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>(
1083
0
        "envoy.load_balancing_policies.random");
1084
0
    break;
1085
0
  case ClusterProto::RING_HASH:
1086
0
    lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>(
1087
0
        "envoy.load_balancing_policies.ring_hash");
1088
0
    break;
1089
0
  case ClusterProto::MAGLEV:
1090
0
    lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>(
1091
0
        "envoy.load_balancing_policies.maglev");
1092
0
    break;
1093
0
  case ClusterProto::CLUSTER_PROVIDED:
1094
0
    lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>(
1095
0
        "envoy.load_balancing_policies.cluster_provided");
1096
0
    break;
1097
0
  case ClusterProto::LOAD_BALANCING_POLICY_CONFIG:
1098
    // 'LOAD_BALANCING_POLICY_CONFIG' should be handled by the 'configureLbPolicies'
1099
    // function and should not reach here.
1100
0
    PANIC("getTypedLbConfigFromLegacyProtoWithoutSubset: should not reach here");
1101
0
    break;
1102
2.40k
  }
1103
1104
2.40k
  if (lb_factory == nullptr) {
1105
0
    return absl::InvalidArgumentError(
1106
0
        fmt::format("No load balancer factory found for LB type: {}",
1107
0
                    ClusterProto::LbPolicy_Name(cluster.lb_policy())));
1108
0
  }
1109
1110
2.40k
  return Result{lb_factory, lb_factory->loadConfig(lb_factory_context, cluster, visitor)};
1111
2.40k
}
1112
1113
absl::StatusOr<LegacyLbPolicyConfigHelper::Result>
1114
LegacyLbPolicyConfigHelper::getTypedLbConfigFromLegacyProto(
1115
    LoadBalancerFactoryContext& lb_factory_context, const ClusterProto& cluster,
1116
2.40k
    ProtobufMessage::ValidationVisitor& visitor) {
1117
  // Handle the lb subset config case first.
1118
  // Note it is possible to have a lb_subset_config without actually having any subset selectors.
1119
  // In this case the subset load balancer should not be used.
1120
2.40k
  if (cluster.has_lb_subset_config() && !cluster.lb_subset_config().subset_selectors().empty()) {
1121
0
    auto* lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>(
1122
0
        "envoy.load_balancing_policies.subset");
1123
0
    if (lb_factory != nullptr) {
1124
0
      return Result{lb_factory, lb_factory->loadConfig(lb_factory_context, cluster, visitor)};
1125
0
    }
1126
0
    return absl::InvalidArgumentError("No subset load balancer factory found");
1127
0
  }
1128
1129
2.40k
  return getTypedLbConfigFromLegacyProtoWithoutSubset(lb_factory_context, cluster, visitor);
1130
2.40k
}
1131
1132
using ProtocolOptionsHashMap =
1133
    absl::flat_hash_map<std::string, ProtocolOptionsConfigConstSharedPtr>;
1134
1135
ClusterInfoImpl::ClusterInfoImpl(
1136
    Init::Manager& init_manager, Server::Configuration::ServerFactoryContext& server_context,
1137
    const envoy::config::cluster::v3::Cluster& config,
1138
    const absl::optional<envoy::config::core::v3::BindConfig>& bind_config,
1139
    Runtime::Loader& runtime, TransportSocketMatcherPtr&& socket_matcher,
1140
    Stats::ScopeSharedPtr&& stats_scope, bool added_via_api,
1141
    Server::Configuration::TransportSocketFactoryContext& factory_context)
1142
    : runtime_(runtime), name_(config.name()),
1143
      observability_name_(!config.alt_stat_name().empty()
1144
                              ? std::make_unique<std::string>(config.alt_stat_name())
1145
                              : nullptr),
1146
      eds_service_name_(
1147
          config.has_eds_cluster_config()
1148
              ? std::make_unique<std::string>(config.eds_cluster_config().service_name())
1149
              : nullptr),
1150
      extension_protocol_options_(THROW_OR_RETURN_VALUE(
1151
          parseExtensionProtocolOptions(config, factory_context), ProtocolOptionsHashMap)),
1152
      http_protocol_options_(THROW_OR_RETURN_VALUE(
1153
          createOptions(config,
1154
                        extensionProtocolOptionsTyped<HttpProtocolOptionsConfigImpl>(
1155
                            "envoy.extensions.upstreams.http.v3.HttpProtocolOptions"),
1156
                        factory_context.messageValidationVisitor()),
1157
          std::shared_ptr<const ClusterInfoImpl::HttpProtocolOptionsConfigImpl>)),
1158
      tcp_protocol_options_(extensionProtocolOptionsTyped<TcpProtocolOptionsConfigImpl>(
1159
          "envoy.extensions.upstreams.tcp.v3.TcpProtocolOptions")),
1160
      max_requests_per_connection_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
1161
          http_protocol_options_->common_http_protocol_options_, max_requests_per_connection,
1162
          config.max_requests_per_connection().value())),
1163
      connect_timeout_(
1164
          std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, connect_timeout, 5000))),
1165
      per_upstream_preconnect_ratio_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
1166
          config.preconnect_policy(), per_upstream_preconnect_ratio, 1.0)),
1167
      peekahead_ratio_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config.preconnect_policy(),
1168
                                                       predictive_preconnect_ratio, 0)),
1169
      socket_matcher_(std::move(socket_matcher)), stats_scope_(std::move(stats_scope)),
1170
      traffic_stats_(generateStats(stats_scope_,
1171
                                   factory_context.clusterManager().clusterStatNames(),
1172
                                   server_context.statsConfig().enableDeferredCreationStats())),
1173
      config_update_stats_(factory_context.clusterManager().clusterConfigUpdateStatNames(),
1174
                           *stats_scope_),
1175
      lb_stats_(factory_context.clusterManager().clusterLbStatNames(), *stats_scope_),
1176
      endpoint_stats_(factory_context.clusterManager().clusterEndpointStatNames(), *stats_scope_),
1177
      load_report_stats_store_(stats_scope_->symbolTable()),
1178
      load_report_stats_(
1179
          generateLoadReportStats(*load_report_stats_store_.rootScope(),
1180
                                  factory_context.clusterManager().clusterLoadReportStatNames())),
1181
      optional_cluster_stats_((config.has_track_cluster_stats() || config.track_timeout_budgets())
1182
                                  ? std::make_unique<OptionalClusterStats>(
1183
                                        config, *stats_scope_, factory_context.clusterManager())
1184
                                  : nullptr),
1185
      features_(ClusterInfoImpl::HttpProtocolOptionsConfigImpl::parseFeatures(
1186
          config, *http_protocol_options_)),
1187
      resource_managers_(config, runtime, name_, *stats_scope_,
1188
                         factory_context.clusterManager().clusterCircuitBreakersStatNames()),
1189
      maintenance_mode_runtime_key_(absl::StrCat("upstream.maintenance_mode.", name_)),
1190
      upstream_local_address_selector_(
1191
          THROW_OR_RETURN_VALUE(createUpstreamLocalAddressSelector(config, bind_config),
1192
                                Envoy::Upstream::UpstreamLocalAddressSelectorConstSharedPtr)),
1193
      upstream_config_(config.has_upstream_config()
1194
                           ? std::make_unique<envoy::config::core::v3::TypedExtensionConfig>(
1195
                                 config.upstream_config())
1196
                           : nullptr),
1197
      metadata_(config.has_metadata()
1198
                    ? std::make_unique<envoy::config::core::v3::Metadata>(config.metadata())
1199
                    : nullptr),
1200
      typed_metadata_(config.has_metadata()
1201
                          ? std::make_unique<ClusterTypedMetadata>(config.metadata())
1202
                          : nullptr),
1203
      common_lb_config_(
1204
          factory_context.clusterManager().getCommonLbConfigPtr(config.common_lb_config())),
1205
      cluster_type_(config.has_cluster_type()
1206
                        ? std::make_unique<envoy::config::cluster::v3::Cluster::CustomClusterType>(
1207
                              config.cluster_type())
1208
                        : nullptr),
1209
      http_filter_config_provider_manager_(
1210
          Http::FilterChainUtility::createSingletonUpstreamFilterConfigProviderManager(
1211
              server_context)),
1212
      network_filter_config_provider_manager_(
1213
          createSingletonUpstreamNetworkFilterConfigProviderManager(server_context)),
1214
      upstream_context_(server_context, init_manager, *stats_scope_),
1215
      happy_eyeballs_config_(
1216
          config.upstream_connection_options().has_happy_eyeballs_config()
1217
              ? std::make_unique<
1218
                    envoy::config::cluster::v3::UpstreamConnectionOptions::HappyEyeballsConfig>(
1219
                    config.upstream_connection_options().happy_eyeballs_config())
1220
              : nullptr),
1221
      lrs_report_metric_names_(!config.lrs_report_endpoint_metrics().empty()
1222
                                   ? std::make_unique<Envoy::Orca::LrsReportMetricNames>(
1223
                                         config.lrs_report_endpoint_metrics().begin(),
1224
                                         config.lrs_report_endpoint_metrics().end())
1225
                                   : nullptr),
1226
      per_connection_buffer_limit_bytes_(
1227
          PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, per_connection_buffer_limit_bytes, 1024 * 1024)),
1228
      max_response_headers_count_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
1229
          http_protocol_options_->common_http_protocol_options_, max_headers_count,
1230
          runtime_.snapshot().getInteger(Http::MaxResponseHeadersCountOverrideKey,
1231
                                         Http::DEFAULT_MAX_HEADERS_COUNT))),
1232
      type_(config.type()),
1233
      drain_connections_on_host_removal_(config.ignore_health_on_host_removal()),
1234
      connection_pool_per_downstream_connection_(
1235
          config.connection_pool_per_downstream_connection()),
1236
      warm_hosts_(!config.health_checks().empty() &&
1237
                  common_lb_config_->ignore_new_hosts_until_first_hc()),
1238
      set_local_interface_name_on_upstream_connections_(
1239
          config.upstream_connection_options().set_local_interface_name_on_upstream_connections()),
1240
      added_via_api_(added_via_api), has_configured_http_filters_(false),
1241
      per_endpoint_stats_(config.has_track_cluster_stats() &&
1242
2.40k
                          config.track_cluster_stats().per_endpoint_stats()) {
1243
#ifdef WIN32
1244
  if (set_local_interface_name_on_upstream_connections_) {
1245
    throwEnvoyExceptionOrPanic(
1246
        "set_local_interface_name_on_upstream_connections_ cannot be set to true "
1247
        "on Windows platforms");
1248
  }
1249
#endif
1250
1251
  // Both LoadStatsReporter and per_endpoint_stats need to `latch()` the counters, so if both are
1252
  // configured they will interfere with each other and both get incorrect values.
1253
  // TODO(ggreenway): Verify that bypassing virtual dispatch here was intentional
1254
2.40k
  if (ClusterInfoImpl::perEndpointStatsEnabled() &&
1255
2.40k
      server_context.bootstrap().cluster_manager().has_load_stats_config()) {
1256
0
    throwEnvoyExceptionOrPanic("Only one of cluster per_endpoint_stats and cluster manager "
1257
0
                               "load_stats_config can be specified");
1258
0
  }
1259
1260
2.40k
  if (config.has_max_requests_per_connection() &&
1261
2.40k
      http_protocol_options_->common_http_protocol_options_.has_max_requests_per_connection()) {
1262
0
    throwEnvoyExceptionOrPanic("Only one of max_requests_per_connection from Cluster or "
1263
0
                               "HttpProtocolOptions can be specified");
1264
0
  }
1265
1266
2.40k
  if (config.has_load_balancing_policy() ||
1267
2.40k
      config.lb_policy() == envoy::config::cluster::v3::Cluster::LOAD_BALANCING_POLICY_CONFIG) {
1268
    // If load_balancing_policy is set we will use it directly, ignoring lb_policy.
1269
1270
0
    THROW_IF_NOT_OK(configureLbPolicies(config, server_context));
1271
2.40k
  } else {
1272
    // If load_balancing_policy is not set, we will try to convert legacy lb_policy
1273
    // to load_balancing_policy and use it.
1274
2.40k
    LoadBalancerFactoryContextImpl lb_factory_context(server_context);
1275
1276
2.40k
    auto lb_pair = LegacyLbPolicyConfigHelper::getTypedLbConfigFromLegacyProto(
1277
2.40k
        lb_factory_context, config, server_context.messageValidationVisitor());
1278
1279
2.40k
    if (!lb_pair.ok()) {
1280
0
      throwEnvoyExceptionOrPanic(std::string(lb_pair.status().message()));
1281
0
    }
1282
2.40k
    load_balancer_factory_ = lb_pair->factory;
1283
2.40k
    ASSERT(load_balancer_factory_ != nullptr, "null load balancer factory");
1284
2.40k
    load_balancer_config_ = std::move(lb_pair->config);
1285
2.40k
  }
1286
1287
2.40k
  if (config.lb_subset_config().locality_weight_aware() &&
1288
2.40k
      !config.common_lb_config().has_locality_weighted_lb_config()) {
1289
0
    throwEnvoyExceptionOrPanic(fmt::format("Locality weight aware subset LB requires that a "
1290
0
                                           "locality_weighted_lb_config be set in {}",
1291
0
                                           name_));
1292
0
  }
1293
1294
  // Use default (1h) or configured `idle_timeout`, unless it's set to 0, indicating that no
1295
  // timeout should be used.
1296
2.40k
  absl::optional<std::chrono::milliseconds> idle_timeout(std::chrono::hours(1));
1297
2.40k
  if (http_protocol_options_->common_http_protocol_options_.has_idle_timeout()) {
1298
0
    idle_timeout = std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
1299
0
        http_protocol_options_->common_http_protocol_options_.idle_timeout()));
1300
0
    if (idle_timeout.value().count() == 0) {
1301
0
      idle_timeout = absl::nullopt;
1302
0
    }
1303
0
  }
1304
2.40k
  if (idle_timeout.has_value()) {
1305
2.40k
    optional_timeouts_.set<OptionalTimeoutNames::IdleTimeout>(*idle_timeout);
1306
2.40k
  }
1307
1308
  // Use default (10m) or configured `tcp_pool_idle_timeout`, unless it's set to 0, indicating
1309
  // that no timeout should be used.
1310
2.40k
  absl::optional<std::chrono::milliseconds> tcp_pool_idle_timeout(std::chrono::minutes(10));
1311
2.40k
  if (tcp_protocol_options_ && tcp_protocol_options_->idleTimeout().has_value()) {
1312
0
    tcp_pool_idle_timeout = tcp_protocol_options_->idleTimeout();
1313
0
    if (tcp_pool_idle_timeout.value().count() == 0) {
1314
0
      tcp_pool_idle_timeout = absl::nullopt;
1315
0
    }
1316
0
  }
1317
2.40k
  if (tcp_pool_idle_timeout.has_value()) {
1318
2.40k
    optional_timeouts_.set<OptionalTimeoutNames::TcpPoolIdleTimeout>(*tcp_pool_idle_timeout);
1319
2.40k
  }
1320
1321
  // Use configured `max_connection_duration`, unless it's set to 0, indicating that
1322
  // no timeout should be used. No timeout by default either.
1323
2.40k
  absl::optional<std::chrono::milliseconds> max_connection_duration;
1324
2.40k
  if (http_protocol_options_->common_http_protocol_options_.has_max_connection_duration()) {
1325
0
    max_connection_duration = std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
1326
0
        http_protocol_options_->common_http_protocol_options_.max_connection_duration()));
1327
0
    if (max_connection_duration.value().count() == 0) {
1328
0
      max_connection_duration = absl::nullopt;
1329
0
    }
1330
0
  }
1331
2.40k
  if (max_connection_duration.has_value()) {
1332
0
    optional_timeouts_.set<OptionalTimeoutNames::MaxConnectionDuration>(*max_connection_duration);
1333
0
  }
1334
1335
2.40k
  if (config.has_eds_cluster_config()) {
1336
14
    if (config.type() != envoy::config::cluster::v3::Cluster::EDS) {
1337
0
      throwEnvoyExceptionOrPanic("eds_cluster_config set in a non-EDS cluster");
1338
0
    }
1339
14
  }
1340
1341
  // TODO(htuch): Remove this temporary workaround when we have
1342
  // https://github.com/bufbuild/protoc-gen-validate/issues/97 resolved. This just provides
1343
  // early validation of sanity of fields that we should catch at config ingestion.
1344
2.40k
  DurationUtil::durationToMilliseconds(common_lb_config_->update_merge_window());
1345
1346
  // Create upstream network filter factories
1347
2.40k
  const auto& filters = config.filters();
1348
2.40k
  ASSERT(filter_factories_.empty());
1349
2.40k
  filter_factories_.reserve(filters.size());
1350
2.40k
  for (ssize_t i = 0; i < filters.size(); i++) {
1351
0
    const auto& proto_config = filters[i];
1352
0
    const bool is_terminal = i == filters.size() - 1;
1353
0
    ENVOY_LOG(debug, "  upstream network filter #{}:", i);
1354
1355
0
    if (proto_config.has_config_discovery()) {
1356
0
      if (proto_config.has_typed_config()) {
1357
0
        throwEnvoyExceptionOrPanic("Only one of typed_config or config_discovery can be used");
1358
0
      }
1359
1360
0
      ENVOY_LOG(debug, "      dynamic filter name: {}", proto_config.name());
1361
0
      filter_factories_.push_back(
1362
0
          network_filter_config_provider_manager_->createDynamicFilterConfigProvider(
1363
0
              proto_config.config_discovery(), proto_config.name(), server_context,
1364
0
              upstream_context_, factory_context.clusterManager(), is_terminal, "network",
1365
0
              nullptr));
1366
0
      continue;
1367
0
    }
1368
1369
0
    ENVOY_LOG(debug, "    name: {}", proto_config.name());
1370
0
    auto& factory = Config::Utility::getAndCheckFactory<
1371
0
        Server::Configuration::NamedUpstreamNetworkFilterConfigFactory>(proto_config);
1372
0
    auto message = factory.createEmptyConfigProto();
1373
0
    Config::Utility::translateOpaqueConfig(proto_config.typed_config(),
1374
0
                                           factory_context.messageValidationVisitor(), *message);
1375
0
    Network::FilterFactoryCb callback =
1376
0
        factory.createFilterFactoryFromProto(*message, upstream_context_);
1377
0
    filter_factories_.push_back(
1378
0
        network_filter_config_provider_manager_->createStaticFilterConfigProvider(
1379
0
            callback, proto_config.name()));
1380
0
  }
1381
1382
2.40k
  if (http_protocol_options_) {
1383
2.40k
    Http::FilterChainUtility::FiltersList http_filters = http_protocol_options_->http_filters_;
1384
2.40k
    has_configured_http_filters_ = !http_filters.empty();
1385
2.40k
    static const std::string upstream_codec_type_url =
1386
2.40k
        envoy::extensions::filters::http::upstream_codec::v3::UpstreamCodec::default_instance()
1387
2.40k
            .GetTypeName();
1388
2.40k
    if (http_filters.empty()) {
1389
2.40k
      auto* codec_filter = http_filters.Add();
1390
2.40k
      codec_filter->set_name("envoy.filters.http.upstream_codec");
1391
2.40k
      codec_filter->mutable_typed_config()->set_type_url(
1392
2.40k
          absl::StrCat("type.googleapis.com/", upstream_codec_type_url));
1393
2.40k
    } else {
1394
0
      const auto last_type_url =
1395
0
          Config::Utility::getFactoryType(http_filters[http_filters.size() - 1].typed_config());
1396
0
      if (last_type_url != upstream_codec_type_url) {
1397
0
        throwEnvoyExceptionOrPanic(fmt::format(
1398
0
            "The codec filter is the only valid terminal upstream HTTP filter, use '{}'",
1399
0
            upstream_codec_type_url));
1400
0
      }
1401
0
    }
1402
1403
2.40k
    std::string prefix = stats_scope_->symbolTable().toString(stats_scope_->prefix());
1404
2.40k
    Http::FilterChainHelper<Server::Configuration::UpstreamFactoryContext,
1405
2.40k
                            Server::Configuration::UpstreamHttpFilterConfigFactory>
1406
2.40k
        helper(*http_filter_config_provider_manager_, upstream_context_.serverFactoryContext(),
1407
2.40k
               factory_context.clusterManager(), upstream_context_, prefix);
1408
2.40k
    THROW_IF_NOT_OK(helper.processFilters(http_filters, "upstream http", "upstream http",
1409
2.40k
                                          http_filter_factories_));
1410
2.40k
  }
1411
2.40k
}
1412
1413
// Configures the load balancer based on config.load_balancing_policy
1414
absl::Status
1415
ClusterInfoImpl::configureLbPolicies(const envoy::config::cluster::v3::Cluster& config,
1416
0
                                     Server::Configuration::ServerFactoryContext& context) {
1417
  // Check if load_balancing_policy is set first.
1418
0
  if (!config.has_load_balancing_policy()) {
1419
0
    return absl::InvalidArgumentError("cluster: field load_balancing_policy need to be set");
1420
0
  }
1421
1422
0
  if (config.has_lb_subset_config()) {
1423
0
    return absl::InvalidArgumentError(
1424
0
        "cluster: load_balancing_policy cannot be combined with lb_subset_config");
1425
0
  }
1426
1427
0
  if (config.has_common_lb_config()) {
1428
0
    const auto& lb_config = config.common_lb_config();
1429
0
    if (lb_config.has_zone_aware_lb_config() || lb_config.has_locality_weighted_lb_config() ||
1430
0
        lb_config.has_consistent_hashing_lb_config()) {
1431
0
      return absl::InvalidArgumentError(
1432
0
          "cluster: load_balancing_policy cannot be combined with partial fields "
1433
0
          "(zone_aware_lb_config, "
1434
0
          "locality_weighted_lb_config, consistent_hashing_lb_config) of common_lb_config");
1435
0
    }
1436
0
  }
1437
1438
0
  absl::InlinedVector<absl::string_view, 4> missing_policies;
1439
0
  for (const auto& policy : config.load_balancing_policy().policies()) {
1440
0
    TypedLoadBalancerFactory* factory =
1441
0
        Config::Utility::getAndCheckFactory<TypedLoadBalancerFactory>(
1442
0
            policy.typed_extension_config(), /*is_optional=*/true);
1443
0
    if (factory != nullptr) {
1444
      // Load and validate the configuration.
1445
0
      LoadBalancerFactoryContextImpl lb_factory_context(context);
1446
0
      auto proto_message = factory->createEmptyConfigProto();
1447
0
      Config::Utility::translateOpaqueConfig(policy.typed_extension_config().typed_config(),
1448
0
                                             context.messageValidationVisitor(), *proto_message);
1449
1450
0
      load_balancer_factory_ = factory;
1451
0
      load_balancer_config_ = factory->loadConfig(lb_factory_context, *proto_message,
1452
0
                                                  context.messageValidationVisitor());
1453
1454
0
      break;
1455
0
    }
1456
0
    missing_policies.push_back(policy.typed_extension_config().name());
1457
0
  }
1458
1459
0
  if (load_balancer_factory_ == nullptr) {
1460
0
    return absl::InvalidArgumentError(
1461
0
        fmt::format("cluster: didn't find a registered load balancer factory "
1462
0
                    "implementation for cluster: '{}' with names from [{}]",
1463
0
                    name_, absl::StrJoin(missing_policies, ", ")));
1464
0
  }
1465
0
  return absl::OkStatus();
1466
0
}
1467
1468
ProtocolOptionsConfigConstSharedPtr
1469
4.81k
ClusterInfoImpl::extensionProtocolOptions(const std::string& name) const {
1470
4.81k
  auto i = extension_protocol_options_.find(name);
1471
4.81k
  if (i != extension_protocol_options_.end()) {
1472
1.54k
    return i->second;
1473
1.54k
  }
1474
3.26k
  return nullptr;
1475
4.81k
}
1476
1477
absl::StatusOr<Network::UpstreamTransportSocketFactoryPtr> createTransportSocketFactory(
1478
    const envoy::config::cluster::v3::Cluster& config,
1479
2.40k
    Server::Configuration::TransportSocketFactoryContext& factory_context) {
1480
  // If the cluster config doesn't have a transport socket configured, override with the default
1481
  // transport socket implementation based on the tls_context. We copy by value first then
1482
  // override if necessary.
1483
2.40k
  auto transport_socket = config.transport_socket();
1484
2.40k
  if (!config.has_transport_socket()) {
1485
2.40k
    envoy::extensions::transport_sockets::raw_buffer::v3::RawBuffer raw_buffer;
1486
2.40k
    transport_socket.mutable_typed_config()->PackFrom(raw_buffer);
1487
2.40k
    transport_socket.set_name("envoy.transport_sockets.raw_buffer");
1488
2.40k
  }
1489
1490
2.40k
  auto& config_factory = Config::Utility::getAndCheckFactory<
1491
2.40k
      Server::Configuration::UpstreamTransportSocketConfigFactory>(transport_socket);
1492
2.40k
  ProtobufTypes::MessagePtr message = Config::Utility::translateToFactoryConfig(
1493
2.40k
      transport_socket, factory_context.messageValidationVisitor(), config_factory);
1494
2.40k
  return config_factory.createTransportSocketFactory(*message, factory_context);
1495
2.40k
}
1496
1497
911
void ClusterInfoImpl::createNetworkFilterChain(Network::Connection& connection) const {
1498
911
  for (const auto& filter_config_provider : filter_factories_) {
1499
0
    auto config = filter_config_provider->config();
1500
0
    if (config.has_value()) {
1501
0
      Network::FilterFactoryCb& factory = config.value();
1502
0
      factory(connection);
1503
0
    }
1504
0
  }
1505
911
}
1506
1507
std::vector<Http::Protocol>
1508
1.81k
ClusterInfoImpl::upstreamHttpProtocol(absl::optional<Http::Protocol> downstream_protocol) const {
1509
1.81k
  if (downstream_protocol.has_value() &&
1510
1.81k
      features_ & Upstream::ClusterInfo::Features::USE_DOWNSTREAM_PROTOCOL) {
1511
0
    if (downstream_protocol.value() == Http::Protocol::Http3 &&
1512
0
        !(features_ & Upstream::ClusterInfo::Features::HTTP3)) {
1513
0
      return {Http::Protocol::Http2};
1514
0
    }
1515
    // use HTTP11 since HTTP10 upstream is not supported yet.
1516
0
    if (downstream_protocol.value() == Http::Protocol::Http10) {
1517
0
      return {Http::Protocol::Http11};
1518
0
    }
1519
0
    return {downstream_protocol.value()};
1520
0
  }
1521
1522
1.81k
  if (features_ & Upstream::ClusterInfo::Features::USE_ALPN) {
1523
0
    if (!(features_ & Upstream::ClusterInfo::Features::HTTP3)) {
1524
0
      return {Http::Protocol::Http2, Http::Protocol::Http11};
1525
0
    }
1526
0
    return {Http::Protocol::Http3, Http::Protocol::Http2, Http::Protocol::Http11};
1527
0
  }
1528
1529
1.81k
  if (features_ & Upstream::ClusterInfo::Features::HTTP3) {
1530
0
    return {Http::Protocol::Http3};
1531
0
  }
1532
1533
1.81k
  return {(features_ & Upstream::ClusterInfo::Features::HTTP2) ? Http::Protocol::Http2
1534
1.81k
                                                               : Http::Protocol::Http11};
1535
1.81k
}
1536
1537
absl::StatusOr<bool> validateTransportSocketSupportsQuic(
1538
0
    const envoy::config::core::v3::TransportSocket& transport_socket) {
1539
  // The transport socket is valid for QUIC if it is either a QUIC transport socket,
1540
  // or if it is a QUIC transport socket wrapped in an HTTP/1.1 proxy socket.
1541
0
  if (transport_socket.name() == "envoy.transport_sockets.quic") {
1542
0
    return true;
1543
0
  }
1544
0
  if (transport_socket.name() != "envoy.transport_sockets.http_11_proxy") {
1545
0
    return false;
1546
0
  }
1547
0
  envoy::extensions::transport_sockets::http_11_proxy::v3::Http11ProxyUpstreamTransport
1548
0
      http11_socket;
1549
0
  RETURN_IF_NOT_OK(MessageUtil::unpackTo(transport_socket.typed_config(), http11_socket));
1550
0
  return http11_socket.transport_socket().name() == "envoy.transport_sockets.quic";
1551
0
}
1552
1553
ClusterImplBase::ClusterImplBase(const envoy::config::cluster::v3::Cluster& cluster,
1554
                                 ClusterFactoryContext& cluster_context,
1555
                                 absl::Status& creation_status)
1556
    : init_manager_(fmt::format("Cluster {}", cluster.name())),
1557
2.40k
      init_watcher_("ClusterImplBase", [this]() { onInitDone(); }),
1558
      runtime_(cluster_context.serverFactoryContext().runtime()),
1559
      wait_for_warm_on_init_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(cluster, wait_for_warm_on_init, true)),
1560
      random_(cluster_context.serverFactoryContext().api().randomGenerator()),
1561
      time_source_(cluster_context.serverFactoryContext().timeSource()),
1562
      local_cluster_(cluster_context.clusterManager().localClusterName().value_or("") ==
1563
                     cluster.name()),
1564
      const_metadata_shared_pool_(Config::Metadata::getConstMetadataSharedPool(
1565
          cluster_context.serverFactoryContext().singletonManager(),
1566
2.40k
          cluster_context.serverFactoryContext().mainThreadDispatcher())) {
1567
2.40k
  auto& server_context = cluster_context.serverFactoryContext();
1568
1569
2.40k
  auto stats_scope = generateStatsScope(cluster, server_context.serverScope().store());
1570
2.40k
  transport_factory_context_ =
1571
2.40k
      std::make_unique<Server::Configuration::TransportSocketFactoryContextImpl>(
1572
2.40k
          server_context, cluster_context.sslContextManager(), *stats_scope,
1573
2.40k
          cluster_context.clusterManager(), cluster_context.messageValidationVisitor());
1574
2.40k
  transport_factory_context_->setInitManager(init_manager_);
1575
1576
2.40k
  auto socket_factory_or_error = createTransportSocketFactory(cluster, *transport_factory_context_);
1577
2.40k
  SET_AND_RETURN_IF_NOT_OK(socket_factory_or_error.status(), creation_status);
1578
2.40k
  auto* raw_factory_pointer = socket_factory_or_error.value().get();
1579
1580
2.40k
  auto socket_matcher_or_error = TransportSocketMatcherImpl::create(
1581
2.40k
      cluster.transport_socket_matches(), *transport_factory_context_,
1582
2.40k
      socket_factory_or_error.value(), *stats_scope);
1583
2.40k
  SET_AND_RETURN_IF_NOT_OK(socket_matcher_or_error.status(), creation_status);
1584
2.40k
  auto socket_matcher = std::move(*socket_matcher_or_error);
1585
2.40k
  const bool matcher_supports_alpn = socket_matcher->allMatchesSupportAlpn();
1586
2.40k
  auto& dispatcher = server_context.mainThreadDispatcher();
1587
2.40k
  info_ = std::shared_ptr<const ClusterInfoImpl>(
1588
2.40k
      new ClusterInfoImpl(init_manager_, server_context, cluster,
1589
2.40k
                          cluster_context.clusterManager().bindConfig(), runtime_,
1590
2.40k
                          std::move(socket_matcher), std::move(stats_scope),
1591
2.40k
                          cluster_context.addedViaApi(), *transport_factory_context_),
1592
2.40k
      [&dispatcher](const ClusterInfoImpl* self) {
1593
2.40k
        ENVOY_LOG(trace, "Schedule destroy cluster info {}", self->name());
1594
2.40k
        dispatcher.deleteInDispatcherThread(
1595
2.40k
            std::unique_ptr<const Event::DispatcherThreadDeletable>(self));
1596
2.40k
      });
1597
1598
2.40k
  if ((info_->features() & ClusterInfoImpl::Features::USE_ALPN)) {
1599
0
    if (!raw_factory_pointer->supportsAlpn()) {
1600
0
      creation_status = absl::InvalidArgumentError(
1601
0
          fmt::format("ALPN configured for cluster {} which has a non-ALPN transport socket: {}",
1602
0
                      cluster.name(), cluster.DebugString()));
1603
0
      return;
1604
0
    }
1605
0
    if (!matcher_supports_alpn &&
1606
0
        !runtime_.snapshot().featureEnabled(ClusterImplBase::DoNotValidateAlpnRuntimeKey, 0)) {
1607
0
      creation_status = absl::InvalidArgumentError(fmt::format(
1608
0
          "ALPN configured for cluster {} which has a non-ALPN transport socket matcher: {}",
1609
0
          cluster.name(), cluster.DebugString()));
1610
0
      return;
1611
0
    }
1612
0
  }
1613
1614
2.40k
  if (info_->features() & ClusterInfoImpl::Features::HTTP3) {
1615
0
#if defined(ENVOY_ENABLE_QUIC)
1616
0
    absl::StatusOr<bool> supports_quic =
1617
0
        validateTransportSocketSupportsQuic(cluster.transport_socket());
1618
0
    SET_AND_RETURN_IF_NOT_OK(supports_quic.status(), creation_status);
1619
0
    if (!*supports_quic) {
1620
0
      creation_status = absl::InvalidArgumentError(
1621
0
          fmt::format("HTTP3 requires a QuicUpstreamTransport transport socket: {} {}",
1622
0
                      cluster.name(), cluster.transport_socket().DebugString()));
1623
0
      return;
1624
0
    }
1625
#else
1626
    creation_status = absl::InvalidArgumentError("HTTP3 configured but not enabled in the build.");
1627
    return;
1628
#endif
1629
0
  }
1630
1631
  // Create the default (empty) priority set before registering callbacks to
1632
  // avoid getting an update the first time it is accessed.
1633
2.40k
  priority_set_.getOrCreateHostSet(0);
1634
2.40k
  priority_update_cb_ = priority_set_.addPriorityUpdateCb(
1635
2.40k
      [this](uint32_t, const HostVector& hosts_added, const HostVector& hosts_removed) {
1636
2.40k
        if (!hosts_added.empty() || !hosts_removed.empty()) {
1637
2.40k
          info_->endpointStats().membership_change_.inc();
1638
2.40k
        }
1639
1640
2.40k
        uint32_t healthy_hosts = 0;
1641
2.40k
        uint32_t degraded_hosts = 0;
1642
2.40k
        uint32_t excluded_hosts = 0;
1643
2.40k
        uint32_t hosts = 0;
1644
2.40k
        for (const auto& host_set : prioritySet().hostSetsPerPriority()) {
1645
2.40k
          hosts += host_set->hosts().size();
1646
2.40k
          healthy_hosts += host_set->healthyHosts().size();
1647
2.40k
          degraded_hosts += host_set->degradedHosts().size();
1648
2.40k
          excluded_hosts += host_set->excludedHosts().size();
1649
2.40k
        }
1650
2.40k
        info_->endpointStats().membership_total_.set(hosts);
1651
2.40k
        info_->endpointStats().membership_healthy_.set(healthy_hosts);
1652
2.40k
        info_->endpointStats().membership_degraded_.set(degraded_hosts);
1653
2.40k
        info_->endpointStats().membership_excluded_.set(excluded_hosts);
1654
2.40k
        return absl::OkStatus();
1655
2.40k
      });
1656
  // Drop overload configuration parsing.
1657
2.40k
  SET_AND_RETURN_IF_NOT_OK(parseDropOverloadConfig(cluster.load_assignment()), creation_status);
1658
2.40k
}
1659
1660
namespace {
1661
4.81k
bool excludeBasedOnHealthFlag(const Host& host) {
1662
4.81k
  return host.healthFlagGet(Host::HealthFlag::PENDING_ACTIVE_HC) ||
1663
4.81k
         host.healthFlagGet(Host::HealthFlag::EXCLUDED_VIA_IMMEDIATE_HC_FAIL) ||
1664
4.81k
         host.healthFlagGet(Host::HealthFlag::EDS_STATUS_DRAINING);
1665
4.81k
}
1666
1667
} // namespace
1668
1669
std::tuple<HealthyHostVectorConstSharedPtr, DegradedHostVectorConstSharedPtr,
1670
           ExcludedHostVectorConstSharedPtr>
1671
2.40k
ClusterImplBase::partitionHostList(const HostVector& hosts) {
1672
2.40k
  auto healthy_list = std::make_shared<HealthyHostVector>();
1673
2.40k
  auto degraded_list = std::make_shared<DegradedHostVector>();
1674
2.40k
  auto excluded_list = std::make_shared<ExcludedHostVector>();
1675
1676
2.40k
  for (const auto& host : hosts) {
1677
2.40k
    const Host::Health health_status = host->coarseHealth();
1678
2.40k
    if (health_status == Host::Health::Healthy) {
1679
2.40k
      healthy_list->get().emplace_back(host);
1680
2.40k
    } else if (health_status == Host::Health::Degraded) {
1681
0
      degraded_list->get().emplace_back(host);
1682
0
    }
1683
2.40k
    if (excludeBasedOnHealthFlag(*host)) {
1684
0
      excluded_list->get().emplace_back(host);
1685
0
    }
1686
2.40k
  }
1687
1688
2.40k
  return std::make_tuple(healthy_list, degraded_list, excluded_list);
1689
2.40k
}
1690
1691
std::tuple<HostsPerLocalityConstSharedPtr, HostsPerLocalityConstSharedPtr,
1692
           HostsPerLocalityConstSharedPtr>
1693
2.40k
ClusterImplBase::partitionHostsPerLocality(const HostsPerLocality& hosts) {
1694
2.40k
  auto filtered_clones =
1695
2.40k
      hosts.filter({[](const Host& host) { return host.coarseHealth() == Host::Health::Healthy; },
1696
2.40k
                    [](const Host& host) { return host.coarseHealth() == Host::Health::Degraded; },
1697
2.40k
                    [](const Host& host) { return excludeBasedOnHealthFlag(host); }});
1698
1699
2.40k
  return std::make_tuple(std::move(filtered_clones[0]), std::move(filtered_clones[1]),
1700
2.40k
                         std::move(filtered_clones[2]));
1701
2.40k
}
1702
1703
1.81k
bool ClusterInfoImpl::maintenanceMode() const {
1704
1.81k
  return runtime_.snapshot().featureEnabled(maintenance_mode_runtime_key_, 0);
1705
1.81k
}
1706
1707
15.4k
ResourceManager& ClusterInfoImpl::resourceManager(ResourcePriority priority) const {
1708
15.4k
  ASSERT(enumToInt(priority) < resource_managers_.managers_.size());
1709
15.4k
  return *resource_managers_.managers_[enumToInt(priority)];
1710
15.4k
}
1711
1712
2.40k
void ClusterImplBase::initialize(std::function<void()> callback) {
1713
2.40k
  ASSERT(!initialization_started_);
1714
2.40k
  ASSERT(initialization_complete_callback_ == nullptr);
1715
2.40k
  initialization_complete_callback_ = callback;
1716
2.40k
  startPreInit();
1717
2.40k
}
1718
1719
2.40k
void ClusterImplBase::onPreInitComplete() {
1720
  // Protect against multiple calls.
1721
2.40k
  if (initialization_started_) {
1722
0
    return;
1723
0
  }
1724
2.40k
  initialization_started_ = true;
1725
1726
2.40k
  ENVOY_LOG(debug, "initializing {} cluster {} completed",
1727
2.40k
            initializePhase() == InitializePhase::Primary ? "Primary" : "Secondary",
1728
2.40k
            info()->name());
1729
2.40k
  init_manager_.initialize(init_watcher_);
1730
2.40k
}
1731
1732
2.40k
void ClusterImplBase::onInitDone() {
1733
2.40k
  info()->configUpdateStats().warming_state_.set(0);
1734
2.40k
  if (health_checker_ && pending_initialize_health_checks_ == 0) {
1735
0
    for (auto& host_set : prioritySet().hostSetsPerPriority()) {
1736
0
      for (auto& host : host_set->hosts()) {
1737
0
        if (host->disableActiveHealthCheck()) {
1738
0
          continue;
1739
0
        }
1740
0
        ++pending_initialize_health_checks_;
1741
0
      }
1742
0
    }
1743
0
    ENVOY_LOG(debug, "Cluster onInitDone pending initialize health check count {}",
1744
0
              pending_initialize_health_checks_);
1745
1746
    // TODO(mattklein123): Remove this callback when done.
1747
0
    health_checker_->addHostCheckCompleteCb(
1748
0
        [this](HostSharedPtr, HealthTransition, HealthState) -> void {
1749
0
          if (pending_initialize_health_checks_ > 0 && --pending_initialize_health_checks_ == 0) {
1750
0
            finishInitialization();
1751
0
          }
1752
0
        });
1753
0
  }
1754
1755
2.40k
  if (pending_initialize_health_checks_ == 0) {
1756
2.40k
    finishInitialization();
1757
2.40k
  }
1758
2.40k
}
1759
1760
2.40k
void ClusterImplBase::finishInitialization() {
1761
2.40k
  ASSERT(initialization_complete_callback_ != nullptr);
1762
2.40k
  ASSERT(initialization_started_);
1763
1764
  // Snap a copy of the completion callback so that we can set it to nullptr to unblock
1765
  // reloadHealthyHosts(). See that function for more info on why we do this.
1766
2.40k
  auto snapped_callback = initialization_complete_callback_;
1767
2.40k
  initialization_complete_callback_ = nullptr;
1768
1769
2.40k
  if (health_checker_ != nullptr) {
1770
0
    reloadHealthyHosts(nullptr);
1771
0
  }
1772
1773
2.40k
  if (snapped_callback != nullptr) {
1774
2.40k
    snapped_callback();
1775
2.40k
  }
1776
2.40k
}
1777
1778
absl::Status ClusterImplBase::parseDropOverloadConfig(
1779
2.42k
    const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment) {
1780
  // Default drop_overload_ to zero.
1781
2.42k
  drop_overload_ = UnitFloat(0);
1782
1783
2.42k
  if (!cluster_load_assignment.has_policy()) {
1784
2.42k
    return absl::OkStatus();
1785
2.42k
  }
1786
0
  const auto& policy = cluster_load_assignment.policy();
1787
0
  if (policy.drop_overloads().empty()) {
1788
0
    return absl::OkStatus();
1789
0
  }
1790
0
  if (policy.drop_overloads().size() > kDropOverloadSize) {
1791
0
    return absl::InvalidArgumentError(
1792
0
        fmt::format("Cluster drop_overloads config has {} categories. Envoy only support one.",
1793
0
                    policy.drop_overloads().size()));
1794
0
  }
1795
1796
0
  const auto& drop_percentage = policy.drop_overloads(0).drop_percentage();
1797
0
  float denominator = 100;
1798
0
  switch (drop_percentage.denominator()) {
1799
0
  case envoy::type::v3::FractionalPercent::HUNDRED:
1800
0
    denominator = 100;
1801
0
    break;
1802
0
  case envoy::type::v3::FractionalPercent::TEN_THOUSAND:
1803
0
    denominator = 10000;
1804
0
    break;
1805
0
  case envoy::type::v3::FractionalPercent::MILLION:
1806
0
    denominator = 1000000;
1807
0
    break;
1808
0
  default:
1809
0
    return absl::InvalidArgumentError(fmt::format(
1810
0
        "Cluster drop_overloads config denominator setting is invalid : {}. Valid range 0~2.",
1811
0
        static_cast<int>(drop_percentage.denominator())));
1812
0
  }
1813
1814
  // If DropOverloadRuntimeKey is not enabled, honor the EDS drop_overload config.
1815
  // If it is enabled, choose the smaller one between it and the EDS config.
1816
0
  float drop_ratio = float(drop_percentage.numerator()) / (denominator);
1817
0
  if (drop_ratio > 1) {
1818
0
    return absl::InvalidArgumentError(
1819
0
        fmt::format("Cluster drop_overloads config is invalid. drop_ratio={}(Numerator {} / "
1820
0
                    "Denominator {}). The valid range is 0~1.",
1821
0
                    drop_ratio, drop_percentage.numerator(), denominator));
1822
0
  }
1823
0
  const uint64_t MAX_DROP_OVERLOAD_RUNTIME = 100;
1824
0
  uint64_t drop_ratio_runtime = runtime_.snapshot().getInteger(
1825
0
      ClusterImplBase::DropOverloadRuntimeKey, MAX_DROP_OVERLOAD_RUNTIME);
1826
0
  if (drop_ratio_runtime > MAX_DROP_OVERLOAD_RUNTIME) {
1827
0
    return absl::InvalidArgumentError(
1828
0
        fmt::format("load_balancing_policy.drop_overload_limit runtime key config {} is invalid. "
1829
0
                    "The valid range is 0~100",
1830
0
                    drop_ratio_runtime));
1831
0
  }
1832
1833
0
  drop_ratio = std::min(drop_ratio, float(drop_ratio_runtime) / float(MAX_DROP_OVERLOAD_RUNTIME));
1834
0
  drop_overload_ = UnitFloat(drop_ratio);
1835
0
  drop_category_ = policy.drop_overloads(0).category();
1836
0
  return absl::OkStatus();
1837
0
}
1838
1839
0
void ClusterImplBase::setHealthChecker(const HealthCheckerSharedPtr& health_checker) {
1840
0
  ASSERT(!health_checker_);
1841
0
  health_checker_ = health_checker;
1842
0
  health_checker_->start();
1843
0
  health_checker_->addHostCheckCompleteCb(
1844
0
      [this](const HostSharedPtr& host, HealthTransition changed_state, HealthState) -> void {
1845
        // If we get a health check completion that resulted in a state change, signal to
1846
        // update the host sets on all threads.
1847
0
        if (changed_state == HealthTransition::Changed) {
1848
0
          reloadHealthyHosts(host);
1849
0
        }
1850
0
      });
1851
0
}
1852
1853
2.40k
void ClusterImplBase::setOutlierDetector(const Outlier::DetectorSharedPtr& outlier_detector) {
1854
2.40k
  if (!outlier_detector) {
1855
2.40k
    return;
1856
2.40k
  }
1857
1858
0
  outlier_detector_ = outlier_detector;
1859
0
  outlier_detector_->addChangedStateCb(
1860
0
      [this](const HostSharedPtr& host) -> void { reloadHealthyHosts(host); });
1861
0
}
1862
1863
0
void ClusterImplBase::reloadHealthyHosts(const HostSharedPtr& host) {
1864
  // Every time a host changes Health Check state we cause a full healthy host recalculation which
1865
  // for expensive LBs (ring, subset, etc.) can be quite time consuming. During startup, this
1866
  // can also block worker threads by doing this repeatedly. There is no reason to do this
1867
  // as we will not start taking traffic until we are initialized. By blocking Health Check
1868
  // updates while initializing we can avoid this.
1869
0
  if (initialization_complete_callback_ != nullptr) {
1870
0
    return;
1871
0
  }
1872
1873
0
  reloadHealthyHostsHelper(host);
1874
0
}
1875
1876
0
void ClusterImplBase::reloadHealthyHostsHelper(const HostSharedPtr&) {
1877
0
  const auto& host_sets = prioritySet().hostSetsPerPriority();
1878
0
  for (size_t priority = 0; priority < host_sets.size(); ++priority) {
1879
0
    const auto& host_set = host_sets[priority];
1880
    // TODO(htuch): Can we skip these copies by exporting out const shared_ptr from HostSet?
1881
0
    HostVectorConstSharedPtr hosts_copy = std::make_shared<HostVector>(host_set->hosts());
1882
1883
0
    HostsPerLocalityConstSharedPtr hosts_per_locality_copy = host_set->hostsPerLocality().clone();
1884
0
    prioritySet().updateHosts(
1885
0
        priority, HostSetImpl::partitionHosts(hosts_copy, hosts_per_locality_copy),
1886
0
        host_set->localityWeights(), {}, {}, random_.random(), absl::nullopt, absl::nullopt);
1887
0
  }
1888
0
}
1889
1890
absl::StatusOr<const Network::Address::InstanceConstSharedPtr>
1891
2.40k
ClusterImplBase::resolveProtoAddress(const envoy::config::core::v3::Address& address) {
1892
2.40k
  absl::Status resolve_status;
1893
2.40k
  TRY_ASSERT_MAIN_THREAD {
1894
2.40k
    auto address_or_error = Network::Address::resolveProtoAddress(address);
1895
2.40k
    if (address_or_error.status().ok()) {
1896
2.40k
      return address_or_error.value();
1897
2.40k
    }
1898
0
    resolve_status = address_or_error.status();
1899
0
  }
1900
0
  END_TRY
1901
2.40k
  CATCH(EnvoyException & e, { resolve_status = absl::InvalidArgumentError(e.what()); });
1902
0
  if (info_->type() == envoy::config::cluster::v3::Cluster::STATIC ||
1903
0
      info_->type() == envoy::config::cluster::v3::Cluster::EDS) {
1904
0
    return absl::InvalidArgumentError(
1905
0
        fmt::format("{}. Consider setting resolver_name or setting cluster type "
1906
0
                    "to 'STRICT_DNS' or 'LOGICAL_DNS'",
1907
0
                    resolve_status.message()));
1908
0
  }
1909
0
  return resolve_status;
1910
0
}
1911
1912
absl::Status ClusterImplBase::validateEndpointsForZoneAwareRouting(
1913
2.40k
    const envoy::config::endpoint::v3::LocalityLbEndpoints& endpoints) const {
1914
2.40k
  if (local_cluster_ && endpoints.priority() > 0) {
1915
0
    return absl::InvalidArgumentError(
1916
0
        fmt::format("Unexpected non-zero priority for local cluster '{}'.", info()->name()));
1917
0
  }
1918
2.40k
  return absl::OkStatus();
1919
2.40k
}
1920
1921
ClusterInfoImpl::OptionalClusterStats::OptionalClusterStats(
1922
    const envoy::config::cluster::v3::Cluster& config, Stats::Scope& stats_scope,
1923
    const ClusterManager& manager)
1924
    : timeout_budget_stats_(
1925
          (config.track_cluster_stats().timeout_budgets() || config.track_timeout_budgets())
1926
              ? std::make_unique<ClusterTimeoutBudgetStats>(generateTimeoutBudgetStats(
1927
                    stats_scope, manager.clusterTimeoutBudgetStatNames()))
1928
              : nullptr),
1929
      request_response_size_stats_(
1930
          (config.track_cluster_stats().request_response_sizes()
1931
               ? std::make_unique<ClusterRequestResponseSizeStats>(generateRequestResponseSizeStats(
1932
                     stats_scope, manager.clusterRequestResponseSizeStatNames()))
1933
0
               : nullptr)) {}
1934
1935
ClusterInfoImpl::ResourceManagers::ResourceManagers(
1936
    const envoy::config::cluster::v3::Cluster& config, Runtime::Loader& runtime,
1937
    const std::string& cluster_name, Stats::Scope& stats_scope,
1938
    const ClusterCircuitBreakersStatNames& circuit_breakers_stat_names)
1939
2.40k
    : circuit_breakers_stat_names_(circuit_breakers_stat_names) {
1940
2.40k
  managers_[enumToInt(ResourcePriority::Default)] = THROW_OR_RETURN_VALUE(
1941
2.40k
      load(config, runtime, cluster_name, stats_scope, envoy::config::core::v3::DEFAULT),
1942
2.40k
      ResourceManagerImplPtr);
1943
2.40k
  managers_[enumToInt(ResourcePriority::High)] = THROW_OR_RETURN_VALUE(
1944
2.40k
      load(config, runtime, cluster_name, stats_scope, envoy::config::core::v3::HIGH),
1945
2.40k
      ResourceManagerImplPtr);
1946
2.40k
}
1947
1948
ClusterCircuitBreakersStats
1949
ClusterInfoImpl::generateCircuitBreakersStats(Stats::Scope& scope, Stats::StatName prefix,
1950
                                              bool track_remaining,
1951
406k
                                              const ClusterCircuitBreakersStatNames& stat_names) {
1952
4.03M
  auto make_gauge = [&stat_names, &scope, prefix](Stats::StatName stat_name) -> Stats::Gauge& {
1953
4.03M
    return Stats::Utility::gaugeFromElements(scope,
1954
4.03M
                                             {stat_names.circuit_breakers_, prefix, stat_name},
1955
4.03M
                                             Stats::Gauge::ImportMode::Accumulate);
1956
4.03M
  };
1957
1958
406k
#define REMAINING_GAUGE(stat_name)                                                                 \
1959
2.03M
  track_remaining ? make_gauge(stat_name) : scope.store().nullGauge()
1960
1961
406k
  return {
1962
406k
      make_gauge(stat_names.cx_open_),
1963
406k
      make_gauge(stat_names.cx_pool_open_),
1964
406k
      make_gauge(stat_names.rq_open_),
1965
406k
      make_gauge(stat_names.rq_pending_open_),
1966
406k
      make_gauge(stat_names.rq_retry_open_),
1967
406k
      REMAINING_GAUGE(stat_names.remaining_cx_),
1968
406k
      REMAINING_GAUGE(stat_names.remaining_cx_pools_),
1969
406k
      REMAINING_GAUGE(stat_names.remaining_pending_),
1970
406k
      REMAINING_GAUGE(stat_names.remaining_retries_),
1971
406k
      REMAINING_GAUGE(stat_names.remaining_rq_),
1972
406k
  };
1973
1974
406k
#undef REMAINING_GAUGE
1975
406k
}
1976
1977
30
Http::Http1::CodecStats& ClusterInfoImpl::http1CodecStats() const {
1978
30
  return Http::Http1::CodecStats::atomicGet(http1_codec_stats_, *stats_scope_);
1979
30
}
1980
1981
881
Http::Http2::CodecStats& ClusterInfoImpl::http2CodecStats() const {
1982
881
  return Http::Http2::CodecStats::atomicGet(http2_codec_stats_, *stats_scope_);
1983
881
}
1984
1985
0
Http::Http3::CodecStats& ClusterInfoImpl::http3CodecStats() const {
1986
0
  return Http::Http3::CodecStats::atomicGet(http3_codec_stats_, *stats_scope_);
1987
0
}
1988
1989
#ifdef ENVOY_ENABLE_UHV
1990
::Envoy::Http::HeaderValidatorStats&
1991
ClusterInfoImpl::getHeaderValidatorStats(Http::Protocol protocol) const {
1992
  switch (protocol) {
1993
  case Http::Protocol::Http10:
1994
  case Http::Protocol::Http11:
1995
    return http1CodecStats();
1996
  case Http::Protocol::Http2:
1997
    return http2CodecStats();
1998
  case Http::Protocol::Http3:
1999
    return http3CodecStats();
2000
  }
2001
  PANIC_DUE_TO_CORRUPT_ENUM;
2002
}
2003
#endif
2004
2005
Http::ClientHeaderValidatorPtr
2006
1.76k
ClusterInfoImpl::makeHeaderValidator([[maybe_unused]] Http::Protocol protocol) const {
2007
#ifdef ENVOY_ENABLE_UHV
2008
  return http_protocol_options_->header_validator_factory_
2009
             ? http_protocol_options_->header_validator_factory_->createClientHeaderValidator(
2010
                   protocol, getHeaderValidatorStats(protocol))
2011
             : nullptr;
2012
#else
2013
1.76k
  return nullptr;
2014
1.76k
#endif
2015
1.76k
}
2016
2017
std::pair<absl::optional<double>, absl::optional<uint32_t>> ClusterInfoImpl::getRetryBudgetParams(
2018
0
    const envoy::config::cluster::v3::CircuitBreakers::Thresholds& thresholds) {
2019
0
  constexpr double default_budget_percent = 20.0;
2020
0
  constexpr uint32_t default_retry_concurrency = 3;
2021
2022
0
  absl::optional<double> budget_percent;
2023
0
  absl::optional<uint32_t> min_retry_concurrency;
2024
0
  if (thresholds.has_retry_budget()) {
2025
    // The budget_percent and min_retry_concurrency values are only set if there is a retry budget
2026
    // message set in the cluster config.
2027
0
    budget_percent = PROTOBUF_GET_WRAPPED_OR_DEFAULT(thresholds.retry_budget(), budget_percent,
2028
0
                                                     default_budget_percent);
2029
0
    min_retry_concurrency = PROTOBUF_GET_WRAPPED_OR_DEFAULT(
2030
0
        thresholds.retry_budget(), min_retry_concurrency, default_retry_concurrency);
2031
0
  }
2032
0
  return std::make_pair(budget_percent, min_retry_concurrency);
2033
0
}
2034
2035
SINGLETON_MANAGER_REGISTRATION(upstream_network_filter_config_provider_manager);
2036
2037
std::shared_ptr<UpstreamNetworkFilterConfigProviderManager>
2038
ClusterInfoImpl::createSingletonUpstreamNetworkFilterConfigProviderManager(
2039
2.40k
    Server::Configuration::ServerFactoryContext& context) {
2040
2.40k
  return context.singletonManager().getTyped<UpstreamNetworkFilterConfigProviderManager>(
2041
2.40k
      SINGLETON_MANAGER_REGISTERED_NAME(upstream_network_filter_config_provider_manager),
2042
2.40k
      [] { return std::make_shared<Filter::UpstreamNetworkFilterConfigProviderManagerImpl>(); });
2043
2.40k
}
2044
2045
absl::StatusOr<ResourceManagerImplPtr>
2046
ClusterInfoImpl::ResourceManagers::load(const envoy::config::cluster::v3::Cluster& config,
2047
                                        Runtime::Loader& runtime, const std::string& cluster_name,
2048
                                        Stats::Scope& stats_scope,
2049
4.81k
                                        const envoy::config::core::v3::RoutingPriority& priority) {
2050
4.81k
  uint64_t max_connections = 1024;
2051
4.81k
  uint64_t max_pending_requests = 1024;
2052
4.81k
  uint64_t max_requests = 1024;
2053
4.81k
  uint64_t max_retries = 3;
2054
4.81k
  uint64_t max_connection_pools = std::numeric_limits<uint64_t>::max();
2055
4.81k
  uint64_t max_connections_per_host = std::numeric_limits<uint64_t>::max();
2056
2057
4.81k
  bool track_remaining = false;
2058
2059
4.81k
  Stats::StatName priority_stat_name;
2060
4.81k
  std::string priority_name;
2061
4.81k
  switch (priority) {
2062
0
    PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
2063
2.40k
  case envoy::config::core::v3::DEFAULT:
2064
2.40k
    priority_stat_name = circuit_breakers_stat_names_.default_;
2065
2.40k
    priority_name = "default";
2066
2.40k
    break;
2067
2.40k
  case envoy::config::core::v3::HIGH:
2068
2.40k
    priority_stat_name = circuit_breakers_stat_names_.high_;
2069
2.40k
    priority_name = "high";
2070
2.40k
    break;
2071
4.81k
  }
2072
2073
4.81k
  const std::string runtime_prefix =
2074
4.81k
      fmt::format("circuit_breakers.{}.{}.", cluster_name, priority_name);
2075
2076
4.81k
  const auto& thresholds = config.circuit_breakers().thresholds();
2077
4.81k
  const auto it = std::find_if(
2078
4.81k
      thresholds.cbegin(), thresholds.cend(),
2079
4.81k
      [priority](const envoy::config::cluster::v3::CircuitBreakers::Thresholds& threshold) {
2080
0
        return threshold.priority() == priority;
2081
0
      });
2082
4.81k
  const auto& per_host_thresholds = config.circuit_breakers().per_host_thresholds();
2083
4.81k
  const auto per_host_it = std::find_if(
2084
4.81k
      per_host_thresholds.cbegin(), per_host_thresholds.cend(),
2085
4.81k
      [priority](const envoy::config::cluster::v3::CircuitBreakers::Thresholds& threshold) {
2086
0
        return threshold.priority() == priority;
2087
0
      });
2088
2089
4.81k
  absl::optional<double> budget_percent;
2090
4.81k
  absl::optional<uint32_t> min_retry_concurrency;
2091
4.81k
  if (it != thresholds.cend()) {
2092
0
    max_connections = PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_connections, max_connections);
2093
0
    max_pending_requests =
2094
0
        PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_pending_requests, max_pending_requests);
2095
0
    max_requests = PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_requests, max_requests);
2096
0
    max_retries = PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_retries, max_retries);
2097
0
    track_remaining = it->track_remaining();
2098
0
    max_connection_pools =
2099
0
        PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_connection_pools, max_connection_pools);
2100
0
    std::tie(budget_percent, min_retry_concurrency) = ClusterInfoImpl::getRetryBudgetParams(*it);
2101
0
  }
2102
4.81k
  if (per_host_it != per_host_thresholds.cend()) {
2103
0
    if (per_host_it->has_max_pending_requests() || per_host_it->has_max_requests() ||
2104
0
        per_host_it->has_max_retries() || per_host_it->has_max_connection_pools() ||
2105
0
        per_host_it->has_retry_budget()) {
2106
0
      return absl::InvalidArgumentError("Unsupported field in per_host_thresholds");
2107
0
    }
2108
0
    if (per_host_it->has_max_connections()) {
2109
0
      max_connections_per_host = per_host_it->max_connections().value();
2110
0
    }
2111
0
  }
2112
4.81k
  return std::make_unique<ResourceManagerImpl>(
2113
4.81k
      runtime, runtime_prefix, max_connections, max_pending_requests, max_requests, max_retries,
2114
4.81k
      max_connection_pools, max_connections_per_host,
2115
4.81k
      ClusterInfoImpl::generateCircuitBreakersStats(stats_scope, priority_stat_name,
2116
4.81k
                                                    track_remaining, circuit_breakers_stat_names_),
2117
4.81k
      budget_percent, min_retry_concurrency);
2118
4.81k
}
2119
2120
PriorityStateManager::PriorityStateManager(ClusterImplBase& cluster,
2121
                                           const LocalInfo::LocalInfo& local_info,
2122
                                           PrioritySet::HostUpdateCb* update_cb,
2123
                                           Random::RandomGenerator& random)
2124
    : parent_(cluster), local_info_node_(local_info.node()), update_cb_(update_cb),
2125
2.40k
      random_(random) {}
2126
2127
void PriorityStateManager::initializePriorityFor(
2128
2.40k
    const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint) {
2129
2.40k
  const uint32_t priority = locality_lb_endpoint.priority();
2130
2.40k
  if (priority_state_.size() <= priority) {
2131
2.40k
    priority_state_.resize(priority + 1);
2132
2.40k
  }
2133
2.40k
  if (priority_state_[priority].first == nullptr) {
2134
2.40k
    priority_state_[priority].first = std::make_unique<HostVector>();
2135
2.40k
  }
2136
2.40k
  if (locality_lb_endpoint.has_locality() && locality_lb_endpoint.has_load_balancing_weight()) {
2137
0
    priority_state_[priority].second[locality_lb_endpoint.locality()] =
2138
0
        locality_lb_endpoint.load_balancing_weight().value();
2139
0
  }
2140
2.40k
}
2141
2142
void PriorityStateManager::registerHostForPriority(
2143
    const std::string& hostname, Network::Address::InstanceConstSharedPtr address,
2144
    const std::vector<Network::Address::InstanceConstSharedPtr>& address_list,
2145
    const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint,
2146
2.40k
    const envoy::config::endpoint::v3::LbEndpoint& lb_endpoint, TimeSource& time_source) {
2147
2.40k
  auto endpoint_metadata =
2148
2.40k
      lb_endpoint.has_metadata()
2149
2.40k
          ? parent_.constMetadataSharedPool()->getObject(lb_endpoint.metadata())
2150
2.40k
          : nullptr;
2151
2.40k
  auto locality_metadata =
2152
2.40k
      locality_lb_endpoint.has_metadata()
2153
2.40k
          ? parent_.constMetadataSharedPool()->getObject(locality_lb_endpoint.metadata())
2154
2.40k
          : nullptr;
2155
2.40k
  const auto host = std::make_shared<HostImpl>(
2156
2.40k
      parent_.info(), hostname, address, endpoint_metadata, locality_metadata,
2157
2.40k
      lb_endpoint.load_balancing_weight().value(), locality_lb_endpoint.locality(),
2158
2.40k
      lb_endpoint.endpoint().health_check_config(), locality_lb_endpoint.priority(),
2159
2.40k
      lb_endpoint.health_status(), time_source, address_list);
2160
2.40k
  registerHostForPriority(host, locality_lb_endpoint);
2161
2.40k
}
2162
2163
void PriorityStateManager::registerHostForPriority(
2164
    const HostSharedPtr& host,
2165
2.40k
    const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint) {
2166
2.40k
  const uint32_t priority = locality_lb_endpoint.priority();
2167
  // Should be called after initializePriorityFor.
2168
2.40k
  ASSERT(priority_state_[priority].first);
2169
2.40k
  priority_state_[priority].first->emplace_back(host);
2170
2.40k
}
2171
2172
void PriorityStateManager::updateClusterPrioritySet(
2173
    const uint32_t priority, HostVectorSharedPtr&& current_hosts,
2174
    const absl::optional<HostVector>& hosts_added, const absl::optional<HostVector>& hosts_removed,
2175
    const absl::optional<Upstream::Host::HealthFlag> health_checker_flag,
2176
    absl::optional<bool> weighted_priority_health,
2177
2.40k
    absl::optional<uint32_t> overprovisioning_factor) {
2178
  // If local locality is not defined then skip populating per locality hosts.
2179
2.40k
  const auto& local_locality = local_info_node_.locality();
2180
2.40k
  ENVOY_LOG(trace, "Local locality: {}", local_locality.DebugString());
2181
2182
  // For non-EDS, most likely the current hosts are from priority_state_[priority].first.
2183
2.40k
  HostVectorSharedPtr hosts(std::move(current_hosts));
2184
2.40k
  LocalityWeightsMap empty_locality_map;
2185
2.40k
  LocalityWeightsMap& locality_weights_map =
2186
2.40k
      priority_state_.size() > priority ? priority_state_[priority].second : empty_locality_map;
2187
2.40k
  ASSERT(priority_state_.size() > priority || locality_weights_map.empty());
2188
2.40k
  LocalityWeightsSharedPtr locality_weights;
2189
2.40k
  std::vector<HostVector> per_locality;
2190
2191
  // TODO: have the load balancing extension indicate, programmatically, whether it needs locality
2192
  // weights, as an optimization in cases where it doesn't.
2193
2.40k
  locality_weights = std::make_shared<LocalityWeights>();
2194
2195
  // We use std::map to guarantee a stable ordering for zone aware routing.
2196
2.40k
  std::map<envoy::config::core::v3::Locality, HostVector, LocalityLess> hosts_per_locality;
2197
2198
2.40k
  for (const HostSharedPtr& host : *hosts) {
2199
    // Take into consideration when a non-EDS cluster has active health checking, i.e. to mark all
2200
    // the hosts unhealthy (host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC)) and then fire
2201
    // update callbacks to start the health checking process. The endpoint with disabled active
2202
    // health check should not be set FAILED_ACTIVE_HC here.
2203
2.40k
    if (health_checker_flag.has_value() && !host->disableActiveHealthCheck()) {
2204
0
      host->healthFlagSet(health_checker_flag.value());
2205
0
    }
2206
2.40k
    hosts_per_locality[host->locality()].push_back(host);
2207
2.40k
  }
2208
2209
  // Do we have hosts for the local locality?
2210
2.40k
  const bool non_empty_local_locality =
2211
2.40k
      local_info_node_.has_locality() &&
2212
2.40k
      hosts_per_locality.find(local_locality) != hosts_per_locality.end();
2213
2214
  // As per HostsPerLocality::get(), the per_locality vector must have the local locality hosts
2215
  // first if non_empty_local_locality.
2216
2.40k
  if (non_empty_local_locality) {
2217
0
    per_locality.emplace_back(hosts_per_locality[local_locality]);
2218
0
    locality_weights->emplace_back(locality_weights_map[local_locality]);
2219
0
  }
2220
2221
  // After the local locality hosts (if any), we place the remaining locality host groups in
2222
  // lexicographic order. This provides a stable ordering for zone aware routing.
2223
2.40k
  for (auto& entry : hosts_per_locality) {
2224
2.40k
    if (!non_empty_local_locality || !LocalityEqualTo()(local_locality, entry.first)) {
2225
2.40k
      per_locality.emplace_back(entry.second);
2226
2.40k
      locality_weights->emplace_back(locality_weights_map[entry.first]);
2227
2.40k
    }
2228
2.40k
  }
2229
2230
2.40k
  auto per_locality_shared =
2231
2.40k
      std::make_shared<HostsPerLocalityImpl>(std::move(per_locality), non_empty_local_locality);
2232
2233
  // If a batch update callback was provided, use that. Otherwise directly update
2234
  // the PrioritySet.
2235
2.40k
  if (update_cb_ != nullptr) {
2236
14
    update_cb_->updateHosts(priority, HostSetImpl::partitionHosts(hosts, per_locality_shared),
2237
14
                            std::move(locality_weights), hosts_added.value_or(*hosts),
2238
14
                            hosts_removed.value_or<HostVector>({}), random_.random(),
2239
14
                            weighted_priority_health, overprovisioning_factor);
2240
2.39k
  } else {
2241
2.39k
    parent_.prioritySet().updateHosts(priority,
2242
2.39k
                                      HostSetImpl::partitionHosts(hosts, per_locality_shared),
2243
2.39k
                                      std::move(locality_weights), hosts_added.value_or(*hosts),
2244
2.39k
                                      hosts_removed.value_or<HostVector>({}), random_.random(),
2245
2.39k
                                      weighted_priority_health, overprovisioning_factor);
2246
2.39k
  }
2247
2.40k
}
2248
2249
bool BaseDynamicClusterImpl::updateDynamicHostList(
2250
    const HostVector& new_hosts, HostVector& current_priority_hosts,
2251
    HostVector& hosts_added_to_current_priority, HostVector& hosts_removed_from_current_priority,
2252
14
    const HostMap& all_hosts, const absl::flat_hash_set<std::string>& all_new_hosts) {
2253
14
  uint64_t max_host_weight = 1;
2254
2255
  // Did hosts change?
2256
  //
2257
  // Have host attributes changed the health of any endpoint? If so, we
2258
  // rebuild the hosts vectors. We only do this if the health status of an
2259
  // endpoint has materially changed (e.g. if previously failing active health
2260
  // checks, we just note it's now failing EDS health status but don't rebuild).
2261
  //
2262
  // TODO(htuch): We can be smarter about this potentially, and not force a full
2263
  // host set update on health status change. The way this would work is to
2264
  // implement a HealthChecker subclass that provides thread local health
2265
  // updates to the Cluster object. This will probably make sense to do in
2266
  // conjunction with https://github.com/envoyproxy/envoy/issues/2874.
2267
14
  bool hosts_changed = false;
2268
2269
  // Go through and see if the list we have is different from what we just got. If it is, we make
2270
  // a new host list and raise a change notification. We also check for duplicates here. It's
2271
  // possible for DNS to return the same address multiple times, and a bad EDS implementation
2272
  // could do the same thing.
2273
2274
  // Keep track of hosts we see in new_hosts that we are able to match up with an existing host.
2275
14
  absl::flat_hash_set<std::string> existing_hosts_for_current_priority(
2276
14
      current_priority_hosts.size());
2277
  // Keep track of hosts we're adding (or replacing)
2278
14
  absl::flat_hash_set<std::string> new_hosts_for_current_priority(new_hosts.size());
2279
  // Keep track of hosts for which locality is changed.
2280
14
  absl::flat_hash_set<std::string> hosts_with_updated_locality_for_current_priority(
2281
14
      current_priority_hosts.size());
2282
  // Keep track of hosts for which active health check flag is changed.
2283
14
  absl::flat_hash_set<std::string> hosts_with_active_health_check_flag_changed(
2284
14
      current_priority_hosts.size());
2285
14
  HostVector final_hosts;
2286
14
  for (const HostSharedPtr& host : new_hosts) {
2287
    // To match a new host with an existing host means comparing their addresses.
2288
14
    auto existing_host = all_hosts.find(addressToString(host->address()));
2289
14
    const bool existing_host_found = existing_host != all_hosts.end();
2290
2291
    // Clear any pending deletion flag on an existing host in case it came back while it was
2292
    // being stabilized. We will set it again below if needed.
2293
14
    if (existing_host_found) {
2294
0
      existing_host->second->healthFlagClear(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL);
2295
0
    }
2296
2297
    // Check if in-place host update should be skipped, i.e. when the following criteria are met
2298
    // (currently there is only one criterion, but we might add more in the future):
2299
    // - The cluster health checker is activated and a new host is matched with the existing one,
2300
    //   but the health check address is different.
2301
14
    const bool health_check_address_changed =
2302
14
        (health_checker_ != nullptr && existing_host_found &&
2303
14
         *existing_host->second->healthCheckAddress() != *host->healthCheckAddress());
2304
14
    bool locality_changed = false;
2305
14
    locality_changed = (existing_host_found &&
2306
14
                        (!LocalityEqualTo()(host->locality(), existing_host->second->locality())));
2307
14
    if (locality_changed) {
2308
0
      hosts_with_updated_locality_for_current_priority.emplace(existing_host->first);
2309
0
    }
2310
2311
14
    const bool active_health_check_flag_changed =
2312
14
        (health_checker_ != nullptr && existing_host_found &&
2313
14
         existing_host->second->disableActiveHealthCheck() != host->disableActiveHealthCheck());
2314
14
    if (active_health_check_flag_changed) {
2315
0
      hosts_with_active_health_check_flag_changed.emplace(existing_host->first);
2316
0
    }
2317
14
    const bool skip_inplace_host_update =
2318
14
        health_check_address_changed || locality_changed || active_health_check_flag_changed;
2319
2320
    // When there is a match and we decided to do in-place update, we potentially update the
2321
    // host's health check flag and metadata. Afterwards, the host is pushed back into the
2322
    // final_hosts, i.e. hosts that should be preserved in the current priority.
2323
14
    if (existing_host_found && !skip_inplace_host_update) {
2324
0
      existing_hosts_for_current_priority.emplace(existing_host->first);
2325
      // If we find a host matched based on address, we keep it. However we do change weight
2326
      // inline so do that here.
2327
0
      if (host->weight() > max_host_weight) {
2328
0
        max_host_weight = host->weight();
2329
0
      }
2330
0
      if (existing_host->second->weight() != host->weight()) {
2331
0
        existing_host->second->weight(host->weight());
2332
        // We do full host set rebuilds so that load balancers can do pre-computation of data
2333
        // structures based on host weight. This may become a performance problem in certain
2334
        // deployments so it is runtime feature guarded and may also need to be configurable
2335
        // and/or dynamic in the future.
2336
0
        hosts_changed = true;
2337
0
      }
2338
2339
0
      hosts_changed |= updateEdsHealthFlag(*host, *existing_host->second);
2340
2341
      // Did metadata change?
2342
0
      bool metadata_changed = true;
2343
0
      if (host->metadata() && existing_host->second->metadata()) {
2344
0
        metadata_changed = !Protobuf::util::MessageDifferencer::Equivalent(
2345
0
            *host->metadata(), *existing_host->second->metadata());
2346
0
      } else if (!host->metadata() && !existing_host->second->metadata()) {
2347
0
        metadata_changed = false;
2348
0
      }
2349
2350
0
      if (metadata_changed) {
2351
        // First, update the entire metadata for the endpoint.
2352
0
        existing_host->second->metadata(host->metadata());
2353
2354
        // Also, given that the canary attribute of an endpoint is derived from its metadata
2355
        // (e.g.: from envoy.lb/canary), we do a blind update here since it's cheaper than testing
2356
        // to see if it actually changed. We must update this besides just updating the metadata,
2357
        // because it'll be used by the router filter to compute upstream stats.
2358
0
        existing_host->second->canary(host->canary());
2359
2360
        // If metadata changed, we need to rebuild. See github issue #3810.
2361
0
        hosts_changed = true;
2362
0
      }
2363
2364
      // Did the priority change?
2365
0
      if (host->priority() != existing_host->second->priority()) {
2366
0
        existing_host->second->priority(host->priority());
2367
0
        hosts_added_to_current_priority.emplace_back(existing_host->second);
2368
0
      }
2369
2370
0
      final_hosts.push_back(existing_host->second);
2371
14
    } else {
2372
14
      new_hosts_for_current_priority.emplace(addressToString(host->address()));
2373
14
      if (host->weight() > max_host_weight) {
2374
0
        max_host_weight = host->weight();
2375
0
      }
2376
2377
      // If we are depending on a health checker, we initialize to unhealthy.
2378
      // If there's an existing host with the same health checker, the
2379
      // active health-status is kept.
2380
14
      if (health_checker_ != nullptr && !host->disableActiveHealthCheck()) {
2381
0
        if (existing_host_found && !health_check_address_changed &&
2382
0
            !active_health_check_flag_changed) {
2383
          // If there's an existing host, use the same active health-status.
2384
          // The existing host can be marked PENDING_ACTIVE_HC or
2385
          // ACTIVE_HC_TIMEOUT if it is also marked with FAILED_ACTIVE_HC.
2386
0
          ASSERT(!existing_host->second->healthFlagGet(Host::HealthFlag::PENDING_ACTIVE_HC) ||
2387
0
                 existing_host->second->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
2388
0
          ASSERT(!existing_host->second->healthFlagGet(Host::HealthFlag::ACTIVE_HC_TIMEOUT) ||
2389
0
                 existing_host->second->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
2390
2391
0
          constexpr uint32_t active_hc_statuses_mask =
2392
0
              enumToInt(Host::HealthFlag::FAILED_ACTIVE_HC) |
2393
0
              enumToInt(Host::HealthFlag::DEGRADED_ACTIVE_HC) |
2394
0
              enumToInt(Host::HealthFlag::PENDING_ACTIVE_HC) |
2395
0
              enumToInt(Host::HealthFlag::ACTIVE_HC_TIMEOUT);
2396
2397
0
          const uint32_t existing_host_statuses = existing_host->second->healthFlagsGetAll();
2398
0
          host->healthFlagsSetAll(existing_host_statuses & active_hc_statuses_mask);
2399
0
        } else {
2400
          // No previous known host, mark it as failed active HC.
2401
0
          host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC);
2402
2403
          // If we want to exclude hosts until they have been health checked, mark them with
2404
          // a flag to indicate that they have not been health checked yet.
2405
0
          if (info_->warmHosts()) {
2406
0
            host->healthFlagSet(Host::HealthFlag::PENDING_ACTIVE_HC);
2407
0
          }
2408
0
        }
2409
0
      }
2410
2411
14
      final_hosts.push_back(host);
2412
14
      hosts_added_to_current_priority.push_back(host);
2413
14
    }
2414
14
  }
2415
2416
  // Remove hosts from current_priority_hosts that were matched to an existing host in the
2417
  // previous loop.
2418
14
  auto erase_from =
2419
14
      std::remove_if(current_priority_hosts.begin(), current_priority_hosts.end(),
2420
14
                     [&existing_hosts_for_current_priority](const HostSharedPtr& p) {
2421
0
                       auto existing_itr =
2422
0
                           existing_hosts_for_current_priority.find(p->address()->asString());
2423
2424
0
                       if (existing_itr != existing_hosts_for_current_priority.end()) {
2425
0
                         existing_hosts_for_current_priority.erase(existing_itr);
2426
0
                         return true;
2427
0
                       }
2428
2429
0
                       return false;
2430
0
                     });
2431
14
  current_priority_hosts.erase(erase_from, current_priority_hosts.end());
2432
2433
  // If we saw existing hosts during this iteration from a different priority, then we've moved
2434
  // a host from another priority into this one, so we should mark the priority as having changed.
2435
14
  if (!existing_hosts_for_current_priority.empty()) {
2436
0
    hosts_changed = true;
2437
0
  }
2438
2439
  // The remaining hosts are hosts that are not referenced in the config update. We remove them
2440
  // from the priority if any of the following is true:
2441
  // - Active health checking is not enabled.
2442
  // - The removed hosts are failing active health checking OR have been explicitly marked as
2443
  //   unhealthy by a previous EDS update. We do not count outlier as a reason to remove a host
2444
  //   or any other future health condition that may be added so we do not use the coarseHealth()
2445
  //   API.
2446
  // - We have explicitly configured the cluster to remove hosts regardless of active health
2447
  // status.
2448
14
  const bool dont_remove_healthy_hosts =
2449
14
      health_checker_ != nullptr && !info()->drainConnectionsOnHostRemoval();
2450
14
  if (!current_priority_hosts.empty() && dont_remove_healthy_hosts) {
2451
0
    erase_from = std::remove_if(
2452
0
        current_priority_hosts.begin(), current_priority_hosts.end(),
2453
0
        [&all_new_hosts, &new_hosts_for_current_priority,
2454
0
         &hosts_with_updated_locality_for_current_priority,
2455
0
         &hosts_with_active_health_check_flag_changed, &final_hosts,
2456
0
         &max_host_weight](const HostSharedPtr& p) {
2457
          // This host has already been added as a new host in the
2458
          // new_hosts_for_current_priority. Return false here to make sure that host
2459
          // reference with older locality gets cleaned up from the priority.
2460
0
          if (hosts_with_updated_locality_for_current_priority.contains(p->address()->asString())) {
2461
0
            return false;
2462
0
          }
2463
2464
0
          if (hosts_with_active_health_check_flag_changed.contains(p->address()->asString())) {
2465
0
            return false;
2466
0
          }
2467
2468
0
          if (all_new_hosts.contains(p->address()->asString()) &&
2469
0
              !new_hosts_for_current_priority.contains(p->address()->asString())) {
2470
            // If the address is being completely deleted from this priority, but is
2471
            // referenced from another priority, then we assume that the other
2472
            // priority will perform an in-place update to re-use the existing Host.
2473
            // We should therefore not mark it as PENDING_DYNAMIC_REMOVAL, but
2474
            // instead remove it immediately from this priority.
2475
            // Example: health check address changed and priority also changed
2476
0
            return false;
2477
0
          }
2478
2479
          // PENDING_DYNAMIC_REMOVAL doesn't apply for the host with disabled active
2480
          // health check, the host is removed immediately from this priority.
2481
0
          if ((!(p->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC) ||
2482
0
                 p->healthFlagGet(Host::HealthFlag::FAILED_EDS_HEALTH))) &&
2483
0
              !p->disableActiveHealthCheck()) {
2484
0
            if (p->weight() > max_host_weight) {
2485
0
              max_host_weight = p->weight();
2486
0
            }
2487
2488
0
            final_hosts.push_back(p);
2489
0
            p->healthFlagSet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL);
2490
0
            return true;
2491
0
          }
2492
0
          return false;
2493
0
        });
2494
0
    current_priority_hosts.erase(erase_from, current_priority_hosts.end());
2495
0
  }
2496
2497
  // At this point we've accounted for all the new hosts as well the hosts that previously
2498
  // existed in this priority.
2499
14
  info_->endpointStats().max_host_weight_.set(max_host_weight);
2500
2501
  // Whatever remains in current_priority_hosts should be removed.
2502
14
  if (!hosts_added_to_current_priority.empty() || !current_priority_hosts.empty()) {
2503
14
    hosts_removed_from_current_priority = std::move(current_priority_hosts);
2504
14
    hosts_changed = true;
2505
14
  }
2506
2507
  // During the update we populated final_hosts with all the hosts that should remain
2508
  // in the current priority, so move them back into current_priority_hosts.
2509
14
  current_priority_hosts = std::move(final_hosts);
2510
  // We return false here in the absence of EDS health status or metadata changes, because we
2511
  // have no changes to host vector status (modulo weights). When we have EDS
2512
  // health status or metadata changed, we return true, causing updateHosts() to fire in the
2513
  // caller.
2514
14
  return hosts_changed;
2515
14
}
2516
2517
Network::DnsLookupFamily
2518
0
getDnsLookupFamilyFromCluster(const envoy::config::cluster::v3::Cluster& cluster) {
2519
0
  return DnsUtils::getDnsLookupFamilyFromEnum(cluster.dns_lookup_family());
2520
0
}
2521
2522
void reportUpstreamCxDestroy(const Upstream::HostDescriptionConstSharedPtr& host,
2523
911
                             Network::ConnectionEvent event) {
2524
911
  Upstream::ClusterTrafficStats& stats = *host->cluster().trafficStats();
2525
911
  stats.upstream_cx_destroy_.inc();
2526
911
  if (event == Network::ConnectionEvent::RemoteClose) {
2527
433
    stats.upstream_cx_destroy_remote_.inc();
2528
478
  } else {
2529
478
    stats.upstream_cx_destroy_local_.inc();
2530
478
  }
2531
911
}
2532
2533
void reportUpstreamCxDestroyActiveRequest(const Upstream::HostDescriptionConstSharedPtr& host,
2534
41
                                          Network::ConnectionEvent event) {
2535
41
  Upstream::ClusterTrafficStats& stats = *host->cluster().trafficStats();
2536
41
  stats.upstream_cx_destroy_with_active_rq_.inc();
2537
41
  if (event == Network::ConnectionEvent::RemoteClose) {
2538
20
    stats.upstream_cx_destroy_remote_with_active_rq_.inc();
2539
21
  } else {
2540
21
    stats.upstream_cx_destroy_local_with_active_rq_.inc();
2541
21
  }
2542
41
}
2543
2544
Network::Address::InstanceConstSharedPtr resolveHealthCheckAddress(
2545
    const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config,
2546
206k
    Network::Address::InstanceConstSharedPtr host_address) {
2547
206k
  Network::Address::InstanceConstSharedPtr health_check_address;
2548
206k
  const auto& port_value = health_check_config.port_value();
2549
206k
  if (health_check_config.has_address()) {
2550
0
    auto address_or_error = Network::Address::resolveProtoAddress(health_check_config.address());
2551
0
    THROW_IF_NOT_OK_REF(address_or_error.status());
2552
0
    auto address = address_or_error.value();
2553
0
    health_check_address =
2554
0
        port_value == 0 ? address : Network::Utility::getAddressWithPort(*address, port_value);
2555
206k
  } else {
2556
206k
    health_check_address = port_value == 0
2557
206k
                               ? host_address
2558
206k
                               : Network::Utility::getAddressWithPort(*host_address, port_value);
2559
206k
  }
2560
206k
  return health_check_address;
2561
206k
}
2562
2563
} // namespace Upstream
2564
} // namespace Envoy