Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/common/upstream/upstream_impl.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/common/upstream/upstream_impl.h"
2
3
#include <chrono>
4
#include <cstdint>
5
#include <limits>
6
#include <list>
7
#include <memory>
8
#include <string>
9
#include <vector>
10
11
#include "envoy/config/cluster/v3/circuit_breaker.pb.h"
12
#include "envoy/config/cluster/v3/cluster.pb.h"
13
#include "envoy/config/core/v3/address.pb.h"
14
#include "envoy/config/core/v3/base.pb.h"
15
#include "envoy/config/core/v3/health_check.pb.h"
16
#include "envoy/config/core/v3/protocol.pb.h"
17
#include "envoy/config/endpoint/v3/endpoint_components.pb.h"
18
#include "envoy/config/upstream/local_address_selector/v3/default_local_address_selector.pb.h"
19
#include "envoy/event/dispatcher.h"
20
#include "envoy/event/timer.h"
21
#include "envoy/extensions/filters/http/upstream_codec/v3/upstream_codec.pb.h"
22
#include "envoy/extensions/transport_sockets/http_11_proxy/v3/upstream_http_11_connect.pb.h"
23
#include "envoy/extensions/transport_sockets/raw_buffer/v3/raw_buffer.pb.h"
24
#include "envoy/init/manager.h"
25
#include "envoy/network/dns.h"
26
#include "envoy/network/transport_socket.h"
27
#include "envoy/registry/registry.h"
28
#include "envoy/secret/secret_manager.h"
29
#include "envoy/server/filter_config.h"
30
#include "envoy/server/transport_socket_config.h"
31
#include "envoy/ssl/context_manager.h"
32
#include "envoy/stats/scope.h"
33
#include "envoy/upstream/health_checker.h"
34
#include "envoy/upstream/upstream.h"
35
36
#include "source/common/common/dns_utils.h"
37
#include "source/common/common/enum_to_int.h"
38
#include "source/common/common/fmt.h"
39
#include "source/common/common/utility.h"
40
#include "source/common/config/utility.h"
41
#include "source/common/http/http1/codec_stats.h"
42
#include "source/common/http/http2/codec_stats.h"
43
#include "source/common/http/utility.h"
44
#include "source/common/network/address_impl.h"
45
#include "source/common/network/happy_eyeballs_connection_impl.h"
46
#include "source/common/network/resolver_impl.h"
47
#include "source/common/network/socket_option_factory.h"
48
#include "source/common/network/socket_option_impl.h"
49
#include "source/common/protobuf/protobuf.h"
50
#include "source/common/protobuf/utility.h"
51
#include "source/common/router/config_utility.h"
52
#include "source/common/runtime/runtime_features.h"
53
#include "source/common/runtime/runtime_impl.h"
54
#include "source/common/stats/deferred_creation.h"
55
#include "source/common/upstream/cluster_factory_impl.h"
56
#include "source/common/upstream/health_checker_impl.h"
57
#include "source/extensions/filters/network/http_connection_manager/config.h"
58
#include "source/server/transport_socket_config_impl.h"
59
60
#include "absl/container/node_hash_set.h"
61
#include "absl/strings/str_cat.h"
62
63
namespace Envoy {
64
namespace Upstream {
65
namespace {
66
3.29k
std::string addressToString(Network::Address::InstanceConstSharedPtr address) {
67
3.29k
  if (!address) {
68
0
    return "";
69
0
  }
70
3.29k
  return address->asString();
71
3.29k
}
72
73
Network::TcpKeepaliveConfig
74
0
parseTcpKeepaliveConfig(const envoy::config::cluster::v3::Cluster& config) {
75
0
  const envoy::config::core::v3::TcpKeepalive& options =
76
0
      config.upstream_connection_options().tcp_keepalive();
77
0
  return Network::TcpKeepaliveConfig{
78
0
      PROTOBUF_GET_WRAPPED_OR_DEFAULT(options, keepalive_probes, absl::optional<uint32_t>()),
79
0
      PROTOBUF_GET_WRAPPED_OR_DEFAULT(options, keepalive_time, absl::optional<uint32_t>()),
80
0
      PROTOBUF_GET_WRAPPED_OR_DEFAULT(options, keepalive_interval, absl::optional<uint32_t>())};
81
0
}
82
83
ProtocolOptionsConfigConstSharedPtr
84
createProtocolOptionsConfig(const std::string& name, const ProtobufWkt::Any& typed_config,
85
2.24k
                            Server::Configuration::ProtocolOptionsFactoryContext& factory_context) {
86
2.24k
  Server::Configuration::ProtocolOptionsFactory* factory =
87
2.24k
      Registry::FactoryRegistry<Server::Configuration::NamedNetworkFilterConfigFactory>::getFactory(
88
2.24k
          name);
89
2.24k
  if (factory == nullptr) {
90
2.24k
    factory =
91
2.24k
        Registry::FactoryRegistry<Server::Configuration::NamedHttpFilterConfigFactory>::getFactory(
92
2.24k
            name);
93
2.24k
  }
94
2.24k
  if (factory == nullptr) {
95
2.24k
    factory =
96
2.24k
        Registry::FactoryRegistry<Server::Configuration::ProtocolOptionsFactory>::getFactory(name);
97
2.24k
  }
98
99
2.24k
  if (factory == nullptr) {
100
0
    throwEnvoyExceptionOrPanic(
101
0
        fmt::format("Didn't find a registered network or http filter or protocol "
102
0
                    "options implementation for name: '{}'",
103
0
                    name));
104
0
  }
105
106
2.24k
  ProtobufTypes::MessagePtr proto_config = factory->createEmptyProtocolOptionsProto();
107
108
2.24k
  if (proto_config == nullptr) {
109
0
    throwEnvoyExceptionOrPanic(fmt::format("filter {} does not support protocol options", name));
110
0
  }
111
112
2.24k
  Envoy::Config::Utility::translateOpaqueConfig(
113
2.24k
      typed_config, factory_context.messageValidationVisitor(), *proto_config);
114
2.24k
  return factory->createProtocolOptionsConfig(*proto_config, factory_context);
115
2.24k
}
116
117
absl::flat_hash_map<std::string, ProtocolOptionsConfigConstSharedPtr> parseExtensionProtocolOptions(
118
    const envoy::config::cluster::v3::Cluster& config,
119
3.26k
    Server::Configuration::ProtocolOptionsFactoryContext& factory_context) {
120
3.26k
  absl::flat_hash_map<std::string, ProtocolOptionsConfigConstSharedPtr> options;
121
122
3.26k
  for (const auto& it : config.typed_extension_protocol_options()) {
123
2.24k
    auto& name = it.first;
124
2.24k
    auto object = createProtocolOptionsConfig(name, it.second, factory_context);
125
2.24k
    if (object != nullptr) {
126
2.24k
      options[name] = std::move(object);
127
2.24k
    }
128
2.24k
  }
129
130
3.26k
  return options;
131
3.26k
}
132
133
// Updates the EDS health flags for an existing host to match the new host.
134
// @param updated_host the new host to read health flag values from.
135
// @param existing_host the host to update.
136
// @return bool whether the flag update caused the host health to change.
137
0
bool updateEdsHealthFlag(const Host& updated_host, Host& existing_host) {
138
0
  auto updated_eds_health_status = updated_host.edsHealthStatus();
139
140
  // Check if the health flag has changed.
141
0
  if (updated_eds_health_status == existing_host.edsHealthStatus()) {
142
0
    return false;
143
0
  }
144
145
0
  const auto previous_health = existing_host.coarseHealth();
146
0
  existing_host.setEdsHealthStatus(updated_eds_health_status);
147
148
  // Rebuild if changing the flag affected the host health.
149
0
  return previous_health != existing_host.coarseHealth();
150
0
}
151
152
// Converts a set of hosts into a HostVector, excluding certain hosts.
153
// @param hosts hosts to convert
154
// @param excluded_hosts hosts to exclude from the resulting vector.
155
HostVector filterHosts(const absl::node_hash_set<HostSharedPtr>& hosts,
156
28
                       const absl::node_hash_set<HostSharedPtr>& excluded_hosts) {
157
28
  HostVector net_hosts;
158
28
  net_hosts.reserve(hosts.size());
159
160
28
  for (const auto& host : hosts) {
161
14
    if (excluded_hosts.find(host) == excluded_hosts.end()) {
162
14
      net_hosts.emplace_back(host);
163
14
    }
164
14
  }
165
166
28
  return net_hosts;
167
28
}
168
169
Stats::ScopeSharedPtr generateStatsScope(const envoy::config::cluster::v3::Cluster& config,
170
3.26k
                                         Stats::Store& stats) {
171
3.26k
  return stats.createScope(fmt::format(
172
3.26k
      "cluster.{}.", config.alt_stat_name().empty() ? config.name() : config.alt_stat_name()));
173
3.26k
}
174
175
Network::ConnectionSocket::OptionsSharedPtr
176
buildBaseSocketOptions(const envoy::config::cluster::v3::Cluster& cluster_config,
177
3.26k
                       const envoy::config::core::v3::BindConfig& bootstrap_bind_config) {
178
3.26k
  Network::ConnectionSocket::OptionsSharedPtr base_options =
179
3.26k
      std::make_shared<Network::ConnectionSocket::Options>();
180
181
  // The process-wide `signal()` handling may fail to handle SIGPIPE if overridden
182
  // in the process (i.e., on a mobile client). Some OSes support handling it at the socket layer:
183
3.26k
  if (ENVOY_SOCKET_SO_NOSIGPIPE.hasValue()) {
184
0
    Network::Socket::appendOptions(base_options,
185
0
                                   Network::SocketOptionFactory::buildSocketNoSigpipeOptions());
186
0
  }
187
  // Cluster IP_FREEBIND settings, when set, will override the cluster manager wide settings.
188
3.26k
  if ((bootstrap_bind_config.freebind().value() &&
189
3.26k
       !cluster_config.upstream_bind_config().has_freebind()) ||
190
3.26k
      cluster_config.upstream_bind_config().freebind().value()) {
191
0
    Network::Socket::appendOptions(base_options,
192
0
                                   Network::SocketOptionFactory::buildIpFreebindOptions());
193
0
  }
194
3.26k
  if (cluster_config.upstream_connection_options().has_tcp_keepalive()) {
195
0
    Network::Socket::appendOptions(base_options,
196
0
                                   Network::SocketOptionFactory::buildTcpKeepaliveOptions(
197
0
                                       parseTcpKeepaliveConfig(cluster_config)));
198
0
  }
199
200
3.26k
  return base_options;
201
3.26k
}
202
203
Network::ConnectionSocket::OptionsSharedPtr
204
buildClusterSocketOptions(const envoy::config::cluster::v3::Cluster& cluster_config,
205
3.26k
                          const envoy::config::core::v3::BindConfig& bootstrap_bind_config) {
206
3.26k
  Network::ConnectionSocket::OptionsSharedPtr cluster_options =
207
3.26k
      std::make_shared<Network::ConnectionSocket::Options>();
208
  // Cluster socket_options trump cluster manager wide.
209
3.26k
  if (bootstrap_bind_config.socket_options().size() +
210
3.26k
          cluster_config.upstream_bind_config().socket_options().size() >
211
3.26k
      0) {
212
0
    auto socket_options = !cluster_config.upstream_bind_config().socket_options().empty()
213
0
                              ? cluster_config.upstream_bind_config().socket_options()
214
0
                              : bootstrap_bind_config.socket_options();
215
0
    Network::Socket::appendOptions(
216
0
        cluster_options, Network::SocketOptionFactory::buildLiteralOptions(socket_options));
217
0
  }
218
3.26k
  return cluster_options;
219
3.26k
}
220
221
std::vector<::Envoy::Upstream::UpstreamLocalAddress>
222
parseBindConfig(::Envoy::OptRef<const envoy::config::core::v3::BindConfig> bind_config,
223
                const absl::optional<std::string>& cluster_name,
224
                Network::ConnectionSocket::OptionsSharedPtr base_socket_options,
225
3.26k
                Network::ConnectionSocket::OptionsSharedPtr cluster_socket_options) {
226
227
3.26k
  std::vector<::Envoy::Upstream::UpstreamLocalAddress> upstream_local_addresses;
228
3.26k
  if (bind_config.has_value()) {
229
0
    UpstreamLocalAddress upstream_local_address;
230
0
    upstream_local_address.address_ =
231
0
        bind_config->has_source_address()
232
0
            ? ::Envoy::Network::Address::resolveProtoSocketAddress(bind_config->source_address())
233
0
            : nullptr;
234
0
    upstream_local_address.socket_options_ = std::make_shared<Network::ConnectionSocket::Options>();
235
236
0
    ::Envoy::Network::Socket::appendOptions(upstream_local_address.socket_options_,
237
0
                                            base_socket_options);
238
0
    ::Envoy::Network::Socket::appendOptions(upstream_local_address.socket_options_,
239
0
                                            cluster_socket_options);
240
241
0
    upstream_local_addresses.push_back(upstream_local_address);
242
243
0
    for (const auto& extra_source_address : bind_config->extra_source_addresses()) {
244
0
      UpstreamLocalAddress extra_upstream_local_address;
245
0
      extra_upstream_local_address.address_ =
246
0
          ::Envoy::Network::Address::resolveProtoSocketAddress(extra_source_address.address());
247
248
0
      extra_upstream_local_address.socket_options_ =
249
0
          std::make_shared<::Envoy::Network::ConnectionSocket::Options>();
250
0
      ::Envoy::Network::Socket::appendOptions(extra_upstream_local_address.socket_options_,
251
0
                                              base_socket_options);
252
253
0
      if (extra_source_address.has_socket_options()) {
254
0
        ::Envoy::Network::Socket::appendOptions(
255
0
            extra_upstream_local_address.socket_options_,
256
0
            ::Envoy::Network::SocketOptionFactory::buildLiteralOptions(
257
0
                extra_source_address.socket_options().socket_options()));
258
0
      } else {
259
0
        ::Envoy::Network::Socket::appendOptions(extra_upstream_local_address.socket_options_,
260
0
                                                cluster_socket_options);
261
0
      }
262
0
      upstream_local_addresses.push_back(extra_upstream_local_address);
263
0
    }
264
265
0
    for (const auto& additional_source_address : bind_config->additional_source_addresses()) {
266
0
      UpstreamLocalAddress additional_upstream_local_address;
267
0
      additional_upstream_local_address.address_ =
268
0
          ::Envoy::Network::Address::resolveProtoSocketAddress(additional_source_address);
269
0
      additional_upstream_local_address.socket_options_ =
270
0
          std::make_shared<::Envoy::Network::ConnectionSocket::Options>();
271
0
      ::Envoy::Network::Socket::appendOptions(additional_upstream_local_address.socket_options_,
272
0
                                              base_socket_options);
273
0
      ::Envoy::Network::Socket::appendOptions(additional_upstream_local_address.socket_options_,
274
0
                                              cluster_socket_options);
275
0
      upstream_local_addresses.push_back(additional_upstream_local_address);
276
0
    }
277
3.26k
  } else {
278
    // If there is no bind config specified, then return a nullptr for the address.
279
3.26k
    UpstreamLocalAddress local_address;
280
3.26k
    local_address.address_ = nullptr;
281
3.26k
    local_address.socket_options_ = std::make_shared<::Envoy::Network::ConnectionSocket::Options>();
282
3.26k
    ::Envoy::Network::Socket::appendOptions(local_address.socket_options_, base_socket_options);
283
3.26k
    Network::Socket::appendOptions(local_address.socket_options_, cluster_socket_options);
284
3.26k
    upstream_local_addresses.push_back(local_address);
285
3.26k
  }
286
287
  // Verify that we have valid addresses if size is greater than 1.
288
3.26k
  if (upstream_local_addresses.size() > 1) {
289
0
    for (auto const& upstream_local_address : upstream_local_addresses) {
290
0
      if (upstream_local_address.address_ == nullptr) {
291
0
        throwEnvoyExceptionOrPanic(fmt::format(
292
0
            "{}'s upstream binding config has invalid IP addresses.",
293
0
            !(cluster_name.has_value()) ? "Bootstrap"
294
0
                                        : fmt::format("Cluster {}", cluster_name.value())));
295
0
      }
296
0
    }
297
0
  }
298
299
3.26k
  return upstream_local_addresses;
300
3.26k
}
301
302
Envoy::Upstream::UpstreamLocalAddressSelectorConstSharedPtr createUpstreamLocalAddressSelector(
303
    const envoy::config::cluster::v3::Cluster& cluster_config,
304
3.26k
    const absl::optional<envoy::config::core::v3::BindConfig>& bootstrap_bind_config) {
305
306
  // Use the cluster bind config if specified. This completely overrides the
307
  // bootstrap bind config when present.
308
3.26k
  OptRef<const envoy::config::core::v3::BindConfig> bind_config;
309
3.26k
  absl::optional<std::string> cluster_name;
310
3.26k
  if (cluster_config.has_upstream_bind_config()) {
311
0
    bind_config.emplace(cluster_config.upstream_bind_config());
312
0
    cluster_name.emplace(cluster_config.name());
313
3.26k
  } else if (bootstrap_bind_config.has_value()) {
314
0
    bind_config.emplace(*bootstrap_bind_config);
315
0
  }
316
317
  // Verify that bind config is valid.
318
3.26k
  if (bind_config.has_value()) {
319
0
    if (bind_config->additional_source_addresses_size() > 0 &&
320
0
        bind_config->extra_source_addresses_size() > 0) {
321
0
      throwEnvoyExceptionOrPanic(fmt::format(
322
0
          "Can't specify both `extra_source_addresses` and `additional_source_addresses` "
323
0
          "in the {}'s upstream binding config",
324
0
          !(cluster_name.has_value()) ? "Bootstrap"
325
0
                                      : fmt::format("Cluster {}", cluster_name.value())));
326
0
    }
327
328
0
    if (!bind_config->has_source_address() &&
329
0
        (bind_config->extra_source_addresses_size() > 0 ||
330
0
         bind_config->additional_source_addresses_size() > 0)) {
331
0
      throwEnvoyExceptionOrPanic(fmt::format(
332
0
          "{}'s upstream binding config has extra/additional source addresses but no "
333
0
          "source_address. Extra/additional addresses cannot be specified if "
334
0
          "source_address is not set.",
335
0
          !(cluster_name.has_value()) ? "Bootstrap"
336
0
                                      : fmt::format("Cluster {}", cluster_name.value())));
337
0
    }
338
0
  }
339
3.26k
  UpstreamLocalAddressSelectorFactory* local_address_selector_factory;
340
3.26k
  const envoy::config::core::v3::TypedExtensionConfig* const local_address_selector_config =
341
3.26k
      bind_config.has_value() && bind_config->has_local_address_selector()
342
3.26k
          ? &bind_config->local_address_selector()
343
3.26k
          : nullptr;
344
3.26k
  if (local_address_selector_config) {
345
0
    local_address_selector_factory =
346
0
        Config::Utility::getAndCheckFactory<UpstreamLocalAddressSelectorFactory>(
347
0
            *local_address_selector_config, false);
348
3.26k
  } else {
349
    // Create the default local address selector if one was not specified.
350
3.26k
    envoy::config::upstream::local_address_selector::v3::DefaultLocalAddressSelector default_config;
351
3.26k
    envoy::config::core::v3::TypedExtensionConfig typed_extension;
352
3.26k
    typed_extension.mutable_typed_config()->PackFrom(default_config);
353
3.26k
    local_address_selector_factory =
354
3.26k
        Config::Utility::getAndCheckFactory<UpstreamLocalAddressSelectorFactory>(typed_extension,
355
3.26k
                                                                                 false);
356
3.26k
  }
357
3.26k
  return local_address_selector_factory->createLocalAddressSelector(
358
3.26k
      parseBindConfig(
359
3.26k
          bind_config, cluster_name,
360
3.26k
          buildBaseSocketOptions(cluster_config, bootstrap_bind_config.value_or(
361
3.26k
                                                     envoy::config::core::v3::BindConfig{})),
362
3.26k
          buildClusterSocketOptions(cluster_config, bootstrap_bind_config.value_or(
363
3.26k
                                                        envoy::config::core::v3::BindConfig{}))),
364
3.26k
      cluster_name);
365
3.26k
}
366
367
} // namespace
368
369
// Allow disabling ALPN checks for transport sockets. See
370
// https://github.com/envoyproxy/envoy/issues/22876
371
const absl::string_view ClusterImplBase::DoNotValidateAlpnRuntimeKey =
372
    "config.do_not_validate_alpn_support";
373
374
// TODO(pianiststickman): this implementation takes a lock on the hot path and puts a copy of the
375
// stat name into every host that receives a copy of that metric. This can be improved by putting
376
// a single copy of the stat name into a thread-local key->index map so that the lock can be avoided
377
// and using the index as the key to the stat map instead.
378
0
void LoadMetricStatsImpl::add(const absl::string_view key, double value) {
379
0
  absl::MutexLock lock(&mu_);
380
0
  if (map_ == nullptr) {
381
0
    map_ = std::make_unique<StatMap>();
382
0
  }
383
0
  Stat& stat = (*map_)[key];
384
0
  ++stat.num_requests_with_metric;
385
0
  stat.total_metric_value += value;
386
0
}
387
388
0
LoadMetricStats::StatMapPtr LoadMetricStatsImpl::latch() {
389
0
  absl::MutexLock lock(&mu_);
390
0
  StatMapPtr latched = std::move(map_);
391
0
  map_ = nullptr;
392
0
  return latched;
393
0
}
394
395
HostDescriptionImpl::HostDescriptionImpl(
396
    ClusterInfoConstSharedPtr cluster, const std::string& hostname,
397
    Network::Address::InstanceConstSharedPtr dest_address, MetadataConstSharedPtr metadata,
398
    const envoy::config::core::v3::Locality& locality,
399
    const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config,
400
    uint32_t priority, TimeSource& time_source)
401
    : cluster_(cluster), hostname_(hostname),
402
      health_checks_hostname_(health_check_config.hostname()), address_(dest_address),
403
      canary_(Config::Metadata::metadataValue(metadata.get(),
404
                                              Config::MetadataFilters::get().ENVOY_LB,
405
                                              Config::MetadataEnvoyLbKeys::get().CANARY)
406
                  .bool_value()),
407
      metadata_(metadata), locality_(locality),
408
      locality_zone_stat_name_(locality.zone(), cluster->statsScope().symbolTable()),
409
      priority_(priority),
410
      socket_factory_(resolveTransportSocketFactory(dest_address, metadata_.get())),
411
211k
      creation_time_(time_source.monotonicTime()) {
412
211k
  if (health_check_config.port_value() != 0 && dest_address->type() != Network::Address::Type::Ip) {
413
    // Setting the health check port to non-0 only works for IP-type addresses. Setting the port
414
    // for a pipe address is a misconfiguration. Throw an exception.
415
0
    throwEnvoyExceptionOrPanic(
416
0
        fmt::format("Invalid host configuration: non-zero port for non-IP address"));
417
0
  }
418
211k
  health_check_address_ = resolveHealthCheckAddress(health_check_config, dest_address);
419
211k
}
Envoy::Upstream::HostDescriptionImpl::HostDescriptionImpl(std::__1::shared_ptr<Envoy::Upstream::ClusterInfo const>, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<Envoy::Network::Address::Instance const>, std::__1::shared_ptr<envoy::config::core::v3::Metadata const>, envoy::config::core::v3::Locality const&, envoy::config::endpoint::v3::Endpoint_HealthCheckConfig const&, unsigned int, Envoy::TimeSource&)
Line
Count
Source
411
207k
      creation_time_(time_source.monotonicTime()) {
412
207k
  if (health_check_config.port_value() != 0 && dest_address->type() != Network::Address::Type::Ip) {
413
    // Setting the health check port to non-0 only works for IP-type addresses. Setting the port
414
    // for a pipe address is a misconfiguration. Throw an exception.
415
0
    throwEnvoyExceptionOrPanic(
416
0
        fmt::format("Invalid host configuration: non-zero port for non-IP address"));
417
0
  }
418
207k
  health_check_address_ = resolveHealthCheckAddress(health_check_config, dest_address);
419
207k
}
Envoy::Upstream::HostDescriptionImpl::HostDescriptionImpl(std::__1::shared_ptr<Envoy::Upstream::ClusterInfo const>, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<Envoy::Network::Address::Instance const>, std::__1::shared_ptr<envoy::config::core::v3::Metadata const>, envoy::config::core::v3::Locality const&, envoy::config::endpoint::v3::Endpoint_HealthCheckConfig const&, unsigned int, Envoy::TimeSource&)
Line
Count
Source
411
3.75k
      creation_time_(time_source.monotonicTime()) {
412
3.75k
  if (health_check_config.port_value() != 0 && dest_address->type() != Network::Address::Type::Ip) {
413
    // Setting the health check port to non-0 only works for IP-type addresses. Setting the port
414
    // for a pipe address is a misconfiguration. Throw an exception.
415
0
    throwEnvoyExceptionOrPanic(
416
0
        fmt::format("Invalid host configuration: non-zero port for non-IP address"));
417
0
  }
418
3.75k
  health_check_address_ = resolveHealthCheckAddress(health_check_config, dest_address);
419
3.75k
}
420
421
Network::UpstreamTransportSocketFactory& HostDescriptionImpl::resolveTransportSocketFactory(
422
    const Network::Address::InstanceConstSharedPtr& dest_address,
423
212k
    const envoy::config::core::v3::Metadata* metadata) const {
424
212k
  auto match = cluster_->transportSocketMatcher().resolve(metadata);
425
212k
  match.stats_.total_match_count_.inc();
426
212k
  ENVOY_LOG(debug, "transport socket match, socket {} selected for host with address {}",
427
212k
            match.name_, dest_address ? dest_address->asString() : "empty");
428
429
212k
  return match.factory_;
430
212k
}
431
432
Host::CreateConnectionData HostImpl::createConnection(
433
    Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options,
434
1.32k
    Network::TransportSocketOptionsConstSharedPtr transport_socket_options) const {
435
1.32k
  return createConnection(dispatcher, cluster(), address(), addressList(), transportSocketFactory(),
436
1.32k
                          options, transport_socket_options, shared_from_this());
437
1.32k
}
438
439
207k
void HostImpl::setEdsHealthFlag(envoy::config::core::v3::HealthStatus health_status) {
440
  // Clear all old EDS health flags first.
441
207k
  HostImpl::healthFlagClear(Host::HealthFlag::FAILED_EDS_HEALTH);
442
207k
  HostImpl::healthFlagClear(Host::HealthFlag::DEGRADED_EDS_HEALTH);
443
444
  // Set the appropriate EDS health flag.
445
207k
  switch (health_status) {
446
0
  case envoy::config::core::v3::UNHEALTHY:
447
0
    FALLTHRU;
448
0
  case envoy::config::core::v3::DRAINING:
449
0
    FALLTHRU;
450
0
  case envoy::config::core::v3::TIMEOUT:
451
0
    HostImpl::healthFlagSet(Host::HealthFlag::FAILED_EDS_HEALTH);
452
0
    break;
453
0
  case envoy::config::core::v3::DEGRADED:
454
0
    HostImpl::healthFlagSet(Host::HealthFlag::DEGRADED_EDS_HEALTH);
455
0
    break;
456
207k
  default:
457
207k
    break;
458
    // No health flags should be set.
459
207k
  }
460
207k
}
461
462
Host::CreateConnectionData HostImpl::createHealthCheckConnection(
463
    Event::Dispatcher& dispatcher,
464
    Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
465
22.3k
    const envoy::config::core::v3::Metadata* metadata) const {
466
467
22.3k
  Network::UpstreamTransportSocketFactory& factory =
468
22.3k
      (metadata != nullptr) ? resolveTransportSocketFactory(healthCheckAddress(), metadata)
469
22.3k
                            : transportSocketFactory();
470
22.3k
  return createConnection(dispatcher, cluster(), healthCheckAddress(), {}, factory, nullptr,
471
22.3k
                          transport_socket_options, shared_from_this());
472
22.3k
}
473
474
Host::CreateConnectionData HostImpl::createConnection(
475
    Event::Dispatcher& dispatcher, const ClusterInfo& cluster,
476
    const Network::Address::InstanceConstSharedPtr& address,
477
    const std::vector<Network::Address::InstanceConstSharedPtr>& address_list,
478
    Network::UpstreamTransportSocketFactory& socket_factory,
479
    const Network::ConnectionSocket::OptionsSharedPtr& options,
480
    Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
481
23.7k
    HostDescriptionConstSharedPtr host) {
482
23.7k
  auto source_address_selector = cluster.getUpstreamLocalAddressSelector();
483
484
23.7k
  Network::ClientConnectionPtr connection;
485
  // If the transport socket options indicate the connection should be
486
  // redirected to a proxy, create the TCP connection to the proxy's address not
487
  // the host's address.
488
23.7k
  if (transport_socket_options && transport_socket_options->http11ProxyInfo().has_value()) {
489
0
    auto upstream_local_address =
490
0
        source_address_selector->getUpstreamLocalAddress(address, options);
491
0
    ENVOY_LOG(debug, "Connecting to configured HTTP/1.1 proxy at {}",
492
0
              transport_socket_options->http11ProxyInfo()->proxy_address->asString());
493
0
    connection = dispatcher.createClientConnection(
494
0
        transport_socket_options->http11ProxyInfo()->proxy_address, upstream_local_address.address_,
495
0
        socket_factory.createTransportSocket(transport_socket_options, host),
496
0
        upstream_local_address.socket_options_, transport_socket_options);
497
23.7k
  } else if (address_list.size() > 1) {
498
0
    connection = std::make_unique<Network::HappyEyeballsConnectionImpl>(
499
0
        dispatcher, address_list, source_address_selector, socket_factory, transport_socket_options,
500
0
        host, options);
501
23.7k
  } else {
502
23.7k
    auto upstream_local_address =
503
23.7k
        source_address_selector->getUpstreamLocalAddress(address, options);
504
23.7k
    connection = dispatcher.createClientConnection(
505
23.7k
        address, upstream_local_address.address_,
506
23.7k
        socket_factory.createTransportSocket(transport_socket_options, host),
507
23.7k
        upstream_local_address.socket_options_, transport_socket_options);
508
23.7k
  }
509
510
23.7k
  connection->connectionInfoSetter().enableSettingInterfaceName(
511
23.7k
      cluster.setLocalInterfaceNameOnUpstreamConnections());
512
23.7k
  connection->setBufferLimits(cluster.perConnectionBufferLimitBytes());
513
23.7k
  cluster.createNetworkFilterChain(*connection);
514
23.7k
  return {std::move(connection), std::move(host)};
515
23.7k
}
516
517
8.68M
void HostImpl::weight(uint32_t new_weight) { weight_ = std::max(1U, new_weight); }
518
519
std::vector<HostsPerLocalityConstSharedPtr> HostsPerLocalityImpl::filter(
520
3.26k
    const std::vector<std::function<bool(const Host&)>>& predicates) const {
521
  // We keep two lists: one for being able to mutate the clone and one for returning to the
522
  // caller. Creating them both at the start avoids iterating over the mutable values at the end
523
  // to convert them to a const pointer.
524
3.26k
  std::vector<std::shared_ptr<HostsPerLocalityImpl>> mutable_clones;
525
3.26k
  std::vector<HostsPerLocalityConstSharedPtr> filtered_clones;
526
527
3.26k
  mutable_clones.reserve(predicates.size());
528
3.26k
  filtered_clones.reserve(predicates.size());
529
13.0k
  for (size_t i = 0; i < predicates.size(); ++i) {
530
9.80k
    mutable_clones.emplace_back(std::make_shared<HostsPerLocalityImpl>());
531
9.80k
    filtered_clones.emplace_back(mutable_clones.back());
532
9.80k
    mutable_clones.back()->local_ = local_;
533
9.80k
  }
534
535
3.26k
  for (const auto& hosts_locality : hosts_per_locality_) {
536
3.26k
    std::vector<HostVector> current_locality_hosts;
537
3.26k
    current_locality_hosts.resize(predicates.size());
538
539
    // Since # of hosts >> # of predicates, we iterate over the hosts in the outer loop.
540
3.26k
    for (const auto& host : hosts_locality) {
541
13.0k
      for (size_t i = 0; i < predicates.size(); ++i) {
542
9.80k
        if (predicates[i](*host)) {
543
3.26k
          current_locality_hosts[i].emplace_back(host);
544
3.26k
        }
545
9.80k
      }
546
3.26k
    }
547
548
13.0k
    for (size_t i = 0; i < predicates.size(); ++i) {
549
9.80k
      mutable_clones[i]->hosts_per_locality_.push_back(std::move(current_locality_hosts[i]));
550
9.80k
    }
551
3.26k
  }
552
553
3.26k
  return filtered_clones;
554
3.26k
}
555
556
void HostSetImpl::updateHosts(PrioritySet::UpdateHostsParams&& update_hosts_params,
557
                              LocalityWeightsConstSharedPtr locality_weights,
558
                              const HostVector& hosts_added, const HostVector& hosts_removed,
559
                              absl::optional<bool> weighted_priority_health,
560
10.9k
                              absl::optional<uint32_t> overprovisioning_factor) {
561
10.9k
  if (weighted_priority_health.has_value()) {
562
9.80k
    weighted_priority_health_ = weighted_priority_health.value();
563
9.80k
  }
564
10.9k
  if (overprovisioning_factor.has_value()) {
565
9.80k
    ASSERT(overprovisioning_factor.value() > 0);
566
9.80k
    overprovisioning_factor_ = overprovisioning_factor.value();
567
9.80k
  }
568
10.9k
  hosts_ = std::move(update_hosts_params.hosts);
569
10.9k
  healthy_hosts_ = std::move(update_hosts_params.healthy_hosts);
570
10.9k
  degraded_hosts_ = std::move(update_hosts_params.degraded_hosts);
571
10.9k
  excluded_hosts_ = std::move(update_hosts_params.excluded_hosts);
572
10.9k
  hosts_per_locality_ = std::move(update_hosts_params.hosts_per_locality);
573
10.9k
  healthy_hosts_per_locality_ = std::move(update_hosts_params.healthy_hosts_per_locality);
574
10.9k
  degraded_hosts_per_locality_ = std::move(update_hosts_params.degraded_hosts_per_locality);
575
10.9k
  excluded_hosts_per_locality_ = std::move(update_hosts_params.excluded_hosts_per_locality);
576
10.9k
  locality_weights_ = std::move(locality_weights);
577
578
  // TODO(ggreenway): implement `weighted_priority_health` support in `rebuildLocalityScheduler`.
579
10.9k
  rebuildLocalityScheduler(healthy_locality_scheduler_, healthy_locality_entries_,
580
10.9k
                           *healthy_hosts_per_locality_, healthy_hosts_->get(), hosts_per_locality_,
581
10.9k
                           excluded_hosts_per_locality_, locality_weights_,
582
10.9k
                           overprovisioning_factor_);
583
10.9k
  rebuildLocalityScheduler(degraded_locality_scheduler_, degraded_locality_entries_,
584
10.9k
                           *degraded_hosts_per_locality_, degraded_hosts_->get(),
585
10.9k
                           hosts_per_locality_, excluded_hosts_per_locality_, locality_weights_,
586
10.9k
                           overprovisioning_factor_);
587
588
10.9k
  runUpdateCallbacks(hosts_added, hosts_removed);
589
10.9k
}
590
591
void HostSetImpl::rebuildLocalityScheduler(
592
    std::unique_ptr<EdfScheduler<LocalityEntry>>& locality_scheduler,
593
    std::vector<std::shared_ptr<LocalityEntry>>& locality_entries,
594
    const HostsPerLocality& eligible_hosts_per_locality, const HostVector& eligible_hosts,
595
    HostsPerLocalityConstSharedPtr all_hosts_per_locality,
596
    HostsPerLocalityConstSharedPtr excluded_hosts_per_locality,
597
21.8k
    LocalityWeightsConstSharedPtr locality_weights, uint32_t overprovisioning_factor) {
598
  // Rebuild the locality scheduler by computing the effective weight of each
599
  // locality in this priority. The scheduler is reset by default, and is rebuilt only if we have
600
  // locality weights (i.e. using EDS) and there is at least one eligible host in this priority.
601
  //
602
  // We omit building a scheduler when there are zero eligible hosts in the priority as
603
  // all the localities will have zero effective weight. At selection time, we'll either select
604
  // from a different scheduler or there will be no available hosts in the priority. At that point
605
  // we'll rely on other mechanisms such as panic mode to select a host, none of which rely on the
606
  // scheduler.
607
  //
608
  // TODO(htuch): if the underlying locality index ->
609
  // envoy::config::core::v3::Locality hasn't changed in hosts_/healthy_hosts_/degraded_hosts_, we
610
  // could just update locality_weight_ without rebuilding. Similar to how host
611
  // level WRR works, we would age out the existing entries via picks and lazily
612
  // apply the new weights.
613
21.8k
  locality_scheduler = nullptr;
614
21.8k
  if (all_hosts_per_locality != nullptr && locality_weights != nullptr &&
615
21.8k
      !locality_weights->empty() && !eligible_hosts.empty()) {
616
9.80k
    locality_scheduler = std::make_unique<EdfScheduler<LocalityEntry>>();
617
9.80k
    locality_entries.clear();
618
19.6k
    for (uint32_t i = 0; i < all_hosts_per_locality->get().size(); ++i) {
619
9.80k
      const double effective_weight = effectiveLocalityWeight(
620
9.80k
          i, eligible_hosts_per_locality, *excluded_hosts_per_locality, *all_hosts_per_locality,
621
9.80k
          *locality_weights, overprovisioning_factor);
622
9.80k
      if (effective_weight > 0) {
623
0
        locality_entries.emplace_back(std::make_shared<LocalityEntry>(i, effective_weight));
624
0
        locality_scheduler->add(effective_weight, locality_entries.back());
625
0
      }
626
9.80k
    }
627
    // If all effective weights were zero, reset the scheduler.
628
9.80k
    if (locality_scheduler->empty()) {
629
9.80k
      locality_scheduler = nullptr;
630
9.80k
    }
631
9.80k
  }
632
21.8k
}
633
634
0
absl::optional<uint32_t> HostSetImpl::chooseHealthyLocality() {
635
0
  return chooseLocality(healthy_locality_scheduler_.get());
636
0
}
637
638
0
absl::optional<uint32_t> HostSetImpl::chooseDegradedLocality() {
639
0
  return chooseLocality(degraded_locality_scheduler_.get());
640
0
}
641
642
absl::optional<uint32_t>
643
0
HostSetImpl::chooseLocality(EdfScheduler<LocalityEntry>* locality_scheduler) {
644
0
  if (locality_scheduler == nullptr) {
645
0
    return {};
646
0
  }
647
0
  const std::shared_ptr<LocalityEntry> locality = locality_scheduler->pickAndAdd(
648
0
      [](const LocalityEntry& locality) { return locality.effective_weight_; });
649
  // We don't build a schedule if there are no weighted localities, so we should always succeed.
650
0
  ASSERT(locality != nullptr);
651
  // If we picked it before, its weight must have been positive.
652
0
  ASSERT(locality->effective_weight_ > 0);
653
0
  return locality->index_;
654
0
}
655
656
PrioritySet::UpdateHostsParams
657
HostSetImpl::updateHostsParams(HostVectorConstSharedPtr hosts,
658
                               HostsPerLocalityConstSharedPtr hosts_per_locality,
659
                               HealthyHostVectorConstSharedPtr healthy_hosts,
660
                               HostsPerLocalityConstSharedPtr healthy_hosts_per_locality,
661
                               DegradedHostVectorConstSharedPtr degraded_hosts,
662
                               HostsPerLocalityConstSharedPtr degraded_hosts_per_locality,
663
                               ExcludedHostVectorConstSharedPtr excluded_hosts,
664
7.64k
                               HostsPerLocalityConstSharedPtr excluded_hosts_per_locality) {
665
7.64k
  return PrioritySet::UpdateHostsParams{std::move(hosts),
666
7.64k
                                        std::move(healthy_hosts),
667
7.64k
                                        std::move(degraded_hosts),
668
7.64k
                                        std::move(excluded_hosts),
669
7.64k
                                        std::move(hosts_per_locality),
670
7.64k
                                        std::move(healthy_hosts_per_locality),
671
7.64k
                                        std::move(degraded_hosts_per_locality),
672
7.64k
                                        std::move(excluded_hosts_per_locality)};
673
7.64k
}
674
675
4.37k
PrioritySet::UpdateHostsParams HostSetImpl::updateHostsParams(const HostSet& host_set) {
676
4.37k
  return updateHostsParams(host_set.hostsPtr(), host_set.hostsPerLocalityPtr(),
677
4.37k
                           host_set.healthyHostsPtr(), host_set.healthyHostsPerLocalityPtr(),
678
4.37k
                           host_set.degradedHostsPtr(), host_set.degradedHostsPerLocalityPtr(),
679
4.37k
                           host_set.excludedHostsPtr(), host_set.excludedHostsPerLocalityPtr());
680
4.37k
}
681
PrioritySet::UpdateHostsParams
682
HostSetImpl::partitionHosts(HostVectorConstSharedPtr hosts,
683
3.26k
                            HostsPerLocalityConstSharedPtr hosts_per_locality) {
684
3.26k
  auto partitioned_hosts = ClusterImplBase::partitionHostList(*hosts);
685
3.26k
  auto healthy_degraded_excluded_hosts_per_locality =
686
3.26k
      ClusterImplBase::partitionHostsPerLocality(*hosts_per_locality);
687
688
3.26k
  return updateHostsParams(std::move(hosts), std::move(hosts_per_locality),
689
3.26k
                           std::move(std::get<0>(partitioned_hosts)),
690
3.26k
                           std::move(std::get<0>(healthy_degraded_excluded_hosts_per_locality)),
691
3.26k
                           std::move(std::get<1>(partitioned_hosts)),
692
3.26k
                           std::move(std::get<1>(healthy_degraded_excluded_hosts_per_locality)),
693
3.26k
                           std::move(std::get<2>(partitioned_hosts)),
694
3.26k
                           std::move(std::get<2>(healthy_degraded_excluded_hosts_per_locality)));
695
3.26k
}
696
697
double HostSetImpl::effectiveLocalityWeight(uint32_t index,
698
                                            const HostsPerLocality& eligible_hosts_per_locality,
699
                                            const HostsPerLocality& excluded_hosts_per_locality,
700
                                            const HostsPerLocality& all_hosts_per_locality,
701
                                            const LocalityWeights& locality_weights,
702
9.80k
                                            uint32_t overprovisioning_factor) {
703
9.80k
  const auto& locality_eligible_hosts = eligible_hosts_per_locality.get()[index];
704
9.80k
  const uint32_t excluded_count = excluded_hosts_per_locality.get().size() > index
705
9.80k
                                      ? excluded_hosts_per_locality.get()[index].size()
706
9.80k
                                      : 0;
707
9.80k
  const auto host_count = all_hosts_per_locality.get()[index].size() - excluded_count;
708
9.80k
  if (host_count == 0) {
709
0
    return 0.0;
710
0
  }
711
9.80k
  const double locality_availability_ratio = 1.0 * locality_eligible_hosts.size() / host_count;
712
9.80k
  const uint32_t weight = locality_weights[index];
713
  // Availability ranges from 0-1.0, and is the ratio of eligible hosts to total hosts, modified
714
  // by the overprovisioning factor.
715
9.80k
  const double effective_locality_availability_ratio =
716
9.80k
      std::min(1.0, (overprovisioning_factor / 100.0) * locality_availability_ratio);
717
9.80k
  return weight * effective_locality_availability_ratio;
718
9.80k
}
719
720
const HostSet&
721
PrioritySetImpl::getOrCreateHostSet(uint32_t priority,
722
                                    absl::optional<bool> weighted_priority_health,
723
20.8k
                                    absl::optional<uint32_t> overprovisioning_factor) {
724
20.8k
  if (host_sets_.size() < priority + 1) {
725
19.7k
    for (size_t i = host_sets_.size(); i <= priority; ++i) {
726
9.89k
      HostSetImplPtr host_set = createHostSet(i, weighted_priority_health, overprovisioning_factor);
727
9.89k
      host_sets_priority_update_cbs_.push_back(
728
9.89k
          host_set->addPriorityUpdateCb([this](uint32_t priority, const HostVector& hosts_added,
729
10.9k
                                               const HostVector& hosts_removed) {
730
10.9k
            runReferenceUpdateCallbacks(priority, hosts_added, hosts_removed);
731
10.9k
          }));
732
9.89k
      host_sets_.push_back(std::move(host_set));
733
9.89k
    }
734
9.89k
  }
735
20.8k
  return *host_sets_[priority];
736
20.8k
}
737
738
void PrioritySetImpl::updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params,
739
                                  LocalityWeightsConstSharedPtr locality_weights,
740
                                  const HostVector& hosts_added, const HostVector& hosts_removed,
741
                                  absl::optional<bool> weighted_priority_health,
742
                                  absl::optional<uint32_t> overprovisioning_factor,
743
10.9k
                                  HostMapConstSharedPtr cross_priority_host_map) {
744
  // Update cross priority host map first. In this way, when the update callbacks of the priority
745
  // set are executed, the latest host map can always be obtained.
746
10.9k
  if (cross_priority_host_map != nullptr) {
747
6.53k
    const_cross_priority_host_map_ = std::move(cross_priority_host_map);
748
6.53k
  }
749
750
  // Ensure that we have a HostSet for the given priority.
751
10.9k
  getOrCreateHostSet(priority, weighted_priority_health, overprovisioning_factor);
752
10.9k
  static_cast<HostSetImpl*>(host_sets_[priority].get())
753
10.9k
      ->updateHosts(std::move(update_hosts_params), std::move(locality_weights), hosts_added,
754
10.9k
                    hosts_removed, weighted_priority_health, overprovisioning_factor);
755
756
10.9k
  if (!batch_update_) {
757
10.8k
    runUpdateCallbacks(hosts_added, hosts_removed);
758
10.8k
  }
759
10.9k
}
760
761
14
void PrioritySetImpl::batchHostUpdate(BatchUpdateCb& callback) {
762
14
  BatchUpdateScope scope(*this);
763
764
  // We wrap the update call with a lambda that tracks all the hosts that have been added/removed.
765
14
  callback.batchUpdate(scope);
766
767
  // Now that all the updates have been complete, we can compute the diff.
768
14
  HostVector net_hosts_added = filterHosts(scope.all_hosts_added_, scope.all_hosts_removed_);
769
14
  HostVector net_hosts_removed = filterHosts(scope.all_hosts_removed_, scope.all_hosts_added_);
770
771
14
  runUpdateCallbacks(net_hosts_added, net_hosts_removed);
772
14
}
773
774
void PrioritySetImpl::BatchUpdateScope::updateHosts(
775
    uint32_t priority, PrioritySet::UpdateHostsParams&& update_hosts_params,
776
    LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added,
777
    const HostVector& hosts_removed, absl::optional<bool> weighted_priority_health,
778
14
    absl::optional<uint32_t> overprovisioning_factor) {
779
  // We assume that each call updates a different priority.
780
14
  ASSERT(priorities_.find(priority) == priorities_.end());
781
14
  priorities_.insert(priority);
782
783
14
  for (const auto& host : hosts_added) {
784
14
    all_hosts_added_.insert(host);
785
14
  }
786
787
14
  for (const auto& host : hosts_removed) {
788
0
    all_hosts_removed_.insert(host);
789
0
  }
790
791
14
  parent_.updateHosts(priority, std::move(update_hosts_params), locality_weights, hosts_added,
792
14
                      hosts_removed, weighted_priority_health, overprovisioning_factor);
793
14
}
794
795
void MainPrioritySetImpl::updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params,
796
                                      LocalityWeightsConstSharedPtr locality_weights,
797
                                      const HostVector& hosts_added,
798
                                      const HostVector& hosts_removed,
799
                                      absl::optional<bool> weighted_priority_health,
800
                                      absl::optional<uint32_t> overprovisioning_factor,
801
3.26k
                                      HostMapConstSharedPtr cross_priority_host_map) {
802
3.26k
  ASSERT(cross_priority_host_map == nullptr,
803
3.26k
         "External cross-priority host map is meaningless to MainPrioritySetImpl");
804
3.26k
  updateCrossPriorityHostMap(hosts_added, hosts_removed);
805
806
3.26k
  PrioritySetImpl::updateHosts(priority, std::move(update_hosts_params), locality_weights,
807
3.26k
                               hosts_added, hosts_removed, weighted_priority_health,
808
3.26k
                               overprovisioning_factor);
809
3.26k
}
810
811
3.28k
HostMapConstSharedPtr MainPrioritySetImpl::crossPriorityHostMap() const {
812
  // Check if the host set in the main thread PrioritySet has been updated.
813
3.28k
  if (mutable_cross_priority_host_map_ != nullptr) {
814
3.26k
    const_cross_priority_host_map_ = std::move(mutable_cross_priority_host_map_);
815
3.26k
    ASSERT(mutable_cross_priority_host_map_ == nullptr);
816
3.26k
  }
817
3.28k
  return const_cross_priority_host_map_;
818
3.28k
}
819
820
void MainPrioritySetImpl::updateCrossPriorityHostMap(const HostVector& hosts_added,
821
3.26k
                                                     const HostVector& hosts_removed) {
822
3.26k
  if (hosts_added.empty() && hosts_removed.empty()) {
823
    // No new hosts have been added and no old hosts have been removed.
824
0
    return;
825
0
  }
826
827
  // Since read_only_all_host_map_ may be shared by multiple threads, when the host set changes,
828
  // we cannot directly modify read_only_all_host_map_.
829
3.26k
  if (mutable_cross_priority_host_map_ == nullptr) {
830
    // Copy old read only host map to mutable host map.
831
3.26k
    mutable_cross_priority_host_map_ = std::make_shared<HostMap>(*const_cross_priority_host_map_);
832
3.26k
  }
833
834
3.26k
  for (const auto& host : hosts_removed) {
835
0
    mutable_cross_priority_host_map_->erase(addressToString(host->address()));
836
0
  }
837
838
3.26k
  for (const auto& host : hosts_added) {
839
3.26k
    mutable_cross_priority_host_map_->insert({addressToString(host->address()), host});
840
3.26k
  }
841
3.26k
}
842
843
DeferredCreationCompatibleClusterTrafficStats
844
ClusterInfoImpl::generateStats(Stats::ScopeSharedPtr scope,
845
407k
                               const ClusterTrafficStatNames& stat_names, bool defer_creation) {
846
407k
  return Stats::createDeferredCompatibleStats<ClusterTrafficStats>(scope, stat_names,
847
407k
                                                                   defer_creation);
848
407k
}
849
850
ClusterRequestResponseSizeStats ClusterInfoImpl::generateRequestResponseSizeStats(
851
403k
    Stats::Scope& scope, const ClusterRequestResponseSizeStatNames& stat_names) {
852
403k
  return {stat_names, scope};
853
403k
}
854
855
ClusterLoadReportStats
856
ClusterInfoImpl::generateLoadReportStats(Stats::Scope& scope,
857
407k
                                         const ClusterLoadReportStatNames& stat_names) {
858
407k
  return {stat_names, scope};
859
407k
}
860
861
ClusterTimeoutBudgetStats
862
ClusterInfoImpl::generateTimeoutBudgetStats(Stats::Scope& scope,
863
403k
                                            const ClusterTimeoutBudgetStatNames& stat_names) {
864
403k
  return {stat_names, scope};
865
403k
}
866
867
std::shared_ptr<const ClusterInfoImpl::HttpProtocolOptionsConfigImpl>
868
createOptions(const envoy::config::cluster::v3::Cluster& config,
869
              std::shared_ptr<const ClusterInfoImpl::HttpProtocolOptionsConfigImpl>&& options,
870
3.26k
              ProtobufMessage::ValidationVisitor& validation_visitor) {
871
3.26k
  if (options) {
872
2.24k
    return std::move(options);
873
2.24k
  }
874
875
1.02k
  if (config.protocol_selection() == envoy::config::cluster::v3::Cluster::USE_CONFIGURED_PROTOCOL) {
876
    // Make sure multiple protocol configurations are not present
877
1.02k
    if (config.has_http_protocol_options() && config.has_http2_protocol_options()) {
878
0
      throwEnvoyExceptionOrPanic(
879
0
          fmt::format("cluster: Both HTTP1 and HTTP2 options may only be "
880
0
                      "configured with non-default 'protocol_selection' values"));
881
0
    }
882
1.02k
  }
883
884
1.02k
  return std::make_shared<ClusterInfoImpl::HttpProtocolOptionsConfigImpl>(
885
1.02k
      config.http_protocol_options(), config.http2_protocol_options(),
886
1.02k
      config.common_http_protocol_options(),
887
1.02k
      (config.has_upstream_http_protocol_options()
888
1.02k
           ? absl::make_optional<envoy::config::core::v3::UpstreamHttpProtocolOptions>(
889
0
                 config.upstream_http_protocol_options())
890
1.02k
           : absl::nullopt),
891
1.02k
      config.protocol_selection() == envoy::config::cluster::v3::Cluster::USE_DOWNSTREAM_PROTOCOL,
892
1.02k
      config.has_http2_protocol_options(), validation_visitor);
893
1.02k
}
894
895
absl::StatusOr<LegacyLbPolicyConfigHelper::Result>
896
LegacyLbPolicyConfigHelper::getTypedLbConfigFromLegacyProtoWithoutSubset(
897
3.26k
    const ClusterProto& cluster, ProtobufMessage::ValidationVisitor& visitor) {
898
899
3.26k
  LoadBalancerConfigPtr lb_config;
900
3.26k
  TypedLoadBalancerFactory* lb_factory = nullptr;
901
902
3.26k
  switch (cluster.lb_policy()) {
903
0
    PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
904
3.26k
  case ClusterProto::ROUND_ROBIN:
905
3.26k
    lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>(
906
3.26k
        "envoy.load_balancing_policies.round_robin");
907
3.26k
    break;
908
0
  case ClusterProto::LEAST_REQUEST:
909
0
    lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>(
910
0
        "envoy.load_balancing_policies.least_request");
911
0
    break;
912
0
  case ClusterProto::RANDOM:
913
0
    lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>(
914
0
        "envoy.load_balancing_policies.random");
915
0
    break;
916
0
  case ClusterProto::RING_HASH:
917
0
    lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>(
918
0
        "envoy.load_balancing_policies.ring_hash");
919
0
    break;
920
0
  case ClusterProto::MAGLEV:
921
0
    lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>(
922
0
        "envoy.load_balancing_policies.maglev");
923
0
    break;
924
0
  case ClusterProto::CLUSTER_PROVIDED:
925
0
    lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>(
926
0
        "envoy.load_balancing_policies.cluster_provided");
927
0
    break;
928
0
  case ClusterProto::LOAD_BALANCING_POLICY_CONFIG:
929
    // 'LOAD_BALANCING_POLICY_CONFIG' should be handled by the 'configureLbPolicies'
930
    // function and should not reach here.
931
0
    PANIC("getTypedLbConfigFromLegacyProtoWithoutSubset: should not reach here");
932
0
    break;
933
3.26k
  }
934
935
3.26k
  if (lb_factory == nullptr) {
936
0
    return absl::InvalidArgumentError(
937
0
        fmt::format("No load balancer factory found for LB type: {}",
938
0
                    ClusterProto::LbPolicy_Name(cluster.lb_policy())));
939
0
  }
940
941
3.26k
  ASSERT(lb_factory != nullptr);
942
3.26k
  return Result{lb_factory, lb_factory->loadConfig(cluster, visitor)};
943
3.26k
}
944
945
absl::StatusOr<LegacyLbPolicyConfigHelper::Result>
946
LegacyLbPolicyConfigHelper::getTypedLbConfigFromLegacyProto(
947
3.26k
    const ClusterProto& cluster, ProtobufMessage::ValidationVisitor& visitor) {
948
949
  // Handle the lb subset config case first.
950
3.26k
  if (cluster.has_lb_subset_config()) {
951
0
    auto* lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>(
952
0
        "envoy.load_balancing_policies.subset");
953
0
    if (lb_factory != nullptr) {
954
0
      return Result{lb_factory, lb_factory->loadConfig(cluster, visitor)};
955
0
    }
956
0
    return absl::InvalidArgumentError("No subset load balancer factory found");
957
0
  }
958
959
3.26k
  return getTypedLbConfigFromLegacyProtoWithoutSubset(cluster, visitor);
960
3.26k
}
961
962
3.26k
LBPolicyConfig::LBPolicyConfig(const envoy::config::cluster::v3::Cluster& config) {
963
3.26k
  switch (config.lb_config_case()) {
964
0
  case envoy::config::cluster::v3::Cluster::kRoundRobinLbConfig:
965
0
    lb_policy_ = std::make_unique<const envoy::config::cluster::v3::Cluster::RoundRobinLbConfig>(
966
0
        config.round_robin_lb_config());
967
0
    break;
968
0
  case envoy::config::cluster::v3::Cluster::kLeastRequestLbConfig:
969
0
    lb_policy_ = std::make_unique<envoy::config::cluster::v3::Cluster::LeastRequestLbConfig>(
970
0
        config.least_request_lb_config());
971
0
    break;
972
0
  case envoy::config::cluster::v3::Cluster::kRingHashLbConfig:
973
0
    lb_policy_ = std::make_unique<envoy::config::cluster::v3::Cluster::RingHashLbConfig>(
974
0
        config.ring_hash_lb_config());
975
0
    break;
976
0
  case envoy::config::cluster::v3::Cluster::kMaglevLbConfig:
977
0
    lb_policy_ = std::make_unique<envoy::config::cluster::v3::Cluster::MaglevLbConfig>(
978
0
        config.maglev_lb_config());
979
0
    break;
980
0
  case envoy::config::cluster::v3::Cluster::kOriginalDstLbConfig:
981
0
    lb_policy_ = std::make_unique<envoy::config::cluster::v3::Cluster::OriginalDstLbConfig>(
982
0
        config.original_dst_lb_config());
983
0
    break;
984
3.26k
  case envoy::config::cluster::v3::Cluster::LB_CONFIG_NOT_SET:
985
    // The default value of the variant if there is no config would be nullptr
986
    // Which is set when the class is initialized. No action needed if config isn't set
987
3.26k
    break;
988
3.26k
  }
989
3.26k
}
990
991
ClusterInfoImpl::ClusterInfoImpl(
992
    Init::Manager& init_manager, Server::Configuration::ServerFactoryContext& server_context,
993
    const envoy::config::cluster::v3::Cluster& config,
994
    const absl::optional<envoy::config::core::v3::BindConfig>& bind_config,
995
    Runtime::Loader& runtime, TransportSocketMatcherPtr&& socket_matcher,
996
    Stats::ScopeSharedPtr&& stats_scope, bool added_via_api,
997
    Server::Configuration::TransportSocketFactoryContext& factory_context)
998
    : runtime_(runtime), name_(config.name()),
999
      observability_name_(!config.alt_stat_name().empty()
1000
                              ? std::make_unique<std::string>(config.alt_stat_name())
1001
                              : nullptr),
1002
      eds_service_name_(
1003
          config.has_eds_cluster_config()
1004
              ? std::make_unique<std::string>(config.eds_cluster_config().service_name())
1005
              : nullptr),
1006
      extension_protocol_options_(parseExtensionProtocolOptions(config, factory_context)),
1007
      http_protocol_options_(
1008
          createOptions(config,
1009
                        extensionProtocolOptionsTyped<HttpProtocolOptionsConfigImpl>(
1010
                            "envoy.extensions.upstreams.http.v3.HttpProtocolOptions"),
1011
                        factory_context.messageValidationVisitor())),
1012
      tcp_protocol_options_(extensionProtocolOptionsTyped<TcpProtocolOptionsConfigImpl>(
1013
          "envoy.extensions.upstreams.tcp.v3.TcpProtocolOptions")),
1014
      max_requests_per_connection_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
1015
          http_protocol_options_->common_http_protocol_options_, max_requests_per_connection,
1016
          config.max_requests_per_connection().value())),
1017
      connect_timeout_(
1018
          std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, connect_timeout, 5000))),
1019
      per_upstream_preconnect_ratio_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
1020
          config.preconnect_policy(), per_upstream_preconnect_ratio, 1.0)),
1021
      peekahead_ratio_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config.preconnect_policy(),
1022
                                                       predictive_preconnect_ratio, 0)),
1023
      socket_matcher_(std::move(socket_matcher)), stats_scope_(std::move(stats_scope)),
1024
      traffic_stats_(generateStats(stats_scope_,
1025
                                   factory_context.clusterManager().clusterStatNames(),
1026
                                   server_context.statsConfig().enableDeferredCreationStats())),
1027
      config_update_stats_(factory_context.clusterManager().clusterConfigUpdateStatNames(),
1028
                           *stats_scope_),
1029
      lb_stats_(factory_context.clusterManager().clusterLbStatNames(), *stats_scope_),
1030
      endpoint_stats_(factory_context.clusterManager().clusterEndpointStatNames(), *stats_scope_),
1031
      load_report_stats_store_(stats_scope_->symbolTable()),
1032
      load_report_stats_(
1033
          generateLoadReportStats(*load_report_stats_store_.rootScope(),
1034
                                  factory_context.clusterManager().clusterLoadReportStatNames())),
1035
      optional_cluster_stats_((config.has_track_cluster_stats() || config.track_timeout_budgets())
1036
                                  ? std::make_unique<OptionalClusterStats>(
1037
                                        config, *stats_scope_, factory_context.clusterManager())
1038
                                  : nullptr),
1039
      features_(ClusterInfoImpl::HttpProtocolOptionsConfigImpl::parseFeatures(
1040
          config, *http_protocol_options_)),
1041
      resource_managers_(config, runtime, name_, *stats_scope_,
1042
                         factory_context.clusterManager().clusterCircuitBreakersStatNames()),
1043
      maintenance_mode_runtime_key_(absl::StrCat("upstream.maintenance_mode.", name_)),
1044
      upstream_local_address_selector_(createUpstreamLocalAddressSelector(config, bind_config)),
1045
      lb_policy_config_(std::make_unique<const LBPolicyConfig>(config)),
1046
      upstream_config_(config.has_upstream_config()
1047
                           ? std::make_unique<envoy::config::core::v3::TypedExtensionConfig>(
1048
                                 config.upstream_config())
1049
                           : nullptr),
1050
      lb_subset_(config.has_lb_subset_config()
1051
                     ? std::make_unique<LoadBalancerSubsetInfoImpl>(config.lb_subset_config())
1052
                     : nullptr),
1053
      metadata_(config.has_metadata()
1054
                    ? std::make_unique<envoy::config::core::v3::Metadata>(config.metadata())
1055
                    : nullptr),
1056
      typed_metadata_(config.has_metadata()
1057
                          ? std::make_unique<ClusterTypedMetadata>(config.metadata())
1058
                          : nullptr),
1059
      common_lb_config_(
1060
          factory_context.clusterManager().getCommonLbConfigPtr(config.common_lb_config())),
1061
      cluster_type_(config.has_cluster_type()
1062
                        ? std::make_unique<envoy::config::cluster::v3::Cluster::CustomClusterType>(
1063
                              config.cluster_type())
1064
                        : nullptr),
1065
      http_filter_config_provider_manager_(
1066
          Http::FilterChainUtility::createSingletonUpstreamFilterConfigProviderManager(
1067
              server_context)),
1068
      network_filter_config_provider_manager_(
1069
          createSingletonUpstreamNetworkFilterConfigProviderManager(server_context)),
1070
      upstream_context_(server_context, init_manager, *stats_scope_),
1071
      per_connection_buffer_limit_bytes_(
1072
          PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, per_connection_buffer_limit_bytes, 1024 * 1024)),
1073
      max_response_headers_count_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
1074
          http_protocol_options_->common_http_protocol_options_, max_headers_count,
1075
          runtime_.snapshot().getInteger(Http::MaxResponseHeadersCountOverrideKey,
1076
                                         Http::DEFAULT_MAX_HEADERS_COUNT))),
1077
      type_(config.type()),
1078
      drain_connections_on_host_removal_(config.ignore_health_on_host_removal()),
1079
      connection_pool_per_downstream_connection_(
1080
          config.connection_pool_per_downstream_connection()),
1081
      warm_hosts_(!config.health_checks().empty() &&
1082
                  common_lb_config_->ignore_new_hosts_until_first_hc()),
1083
      set_local_interface_name_on_upstream_connections_(
1084
          config.upstream_connection_options().set_local_interface_name_on_upstream_connections()),
1085
      added_via_api_(added_via_api), has_configured_http_filters_(false),
1086
      per_endpoint_stats_(config.has_track_cluster_stats() &&
1087
3.26k
                          config.track_cluster_stats().per_endpoint_stats()) {
1088
#ifdef WIN32
1089
  if (set_local_interface_name_on_upstream_connections_) {
1090
    throwEnvoyExceptionOrPanic(
1091
        "set_local_interface_name_on_upstream_connections_ cannot be set to true "
1092
        "on Windows platforms");
1093
  }
1094
#endif
1095
1096
  // Both LoadStatsReporter and per_endpoint_stats need to `latch()` the counters, so if both are
1097
  // configured they will interfere with each other and both get incorrect values.
1098
3.26k
  if (perEndpointStatsEnabled() &&
1099
3.26k
      server_context.bootstrap().cluster_manager().has_load_stats_config()) {
1100
0
    throwEnvoyExceptionOrPanic("Only one of cluster per_endpoint_stats and cluster manager "
1101
0
                               "load_stats_config can be specified");
1102
0
  }
1103
1104
3.26k
  if (config.has_max_requests_per_connection() &&
1105
3.26k
      http_protocol_options_->common_http_protocol_options_.has_max_requests_per_connection()) {
1106
0
    throwEnvoyExceptionOrPanic("Only one of max_requests_per_connection from Cluster or "
1107
0
                               "HttpProtocolOptions can be specified");
1108
0
  }
1109
1110
  // If load_balancing_policy is set we will use it directly, ignoring lb_policy.
1111
3.26k
  if (config.has_load_balancing_policy() ||
1112
3.26k
      config.lb_policy() == envoy::config::cluster::v3::Cluster::LOAD_BALANCING_POLICY_CONFIG) {
1113
0
    configureLbPolicies(config, server_context);
1114
3.26k
  } else if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.convert_legacy_lb_config")) {
1115
3.26k
    auto lb_pair = LegacyLbPolicyConfigHelper::getTypedLbConfigFromLegacyProto(
1116
3.26k
        config, server_context.messageValidationVisitor());
1117
1118
3.26k
    if (!lb_pair.ok()) {
1119
0
      throwEnvoyExceptionOrPanic(std::string(lb_pair.status().message()));
1120
0
    }
1121
1122
3.26k
    load_balancer_config_ = std::move(lb_pair->config);
1123
3.26k
    load_balancer_factory_ = lb_pair->factory;
1124
3.26k
    lb_type_ = LoadBalancerType::LoadBalancingPolicyConfig;
1125
1126
3.26k
    RELEASE_ASSERT(
1127
3.26k
        load_balancer_factory_,
1128
3.26k
        fmt::format(
1129
3.26k
            "No load balancer factory found from legacy LB configuration (type: {}, subset: {}).",
1130
3.26k
            envoy::config::cluster::v3::Cluster::LbPolicy_Name(config.lb_policy()),
1131
3.26k
            config.has_lb_subset_config()));
1132
1133
    // Clear unnecessary legacy config because all legacy config is wrapped in load_balancer_config_
1134
    // except the original_dst_lb_config.
1135
3.26k
    lb_subset_ = nullptr;
1136
3.26k
    if (config.lb_policy() != envoy::config::cluster::v3::Cluster::CLUSTER_PROVIDED) {
1137
3.26k
      lb_policy_config_ = nullptr;
1138
3.26k
    }
1139
3.26k
  } else {
1140
0
    switch (config.lb_policy()) {
1141
0
      PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
1142
0
    case envoy::config::cluster::v3::Cluster::ROUND_ROBIN:
1143
0
      lb_type_ = LoadBalancerType::RoundRobin;
1144
0
      break;
1145
0
    case envoy::config::cluster::v3::Cluster::LEAST_REQUEST:
1146
0
      lb_type_ = LoadBalancerType::LeastRequest;
1147
0
      break;
1148
0
    case envoy::config::cluster::v3::Cluster::RANDOM:
1149
0
      lb_type_ = LoadBalancerType::Random;
1150
0
      break;
1151
0
    case envoy::config::cluster::v3::Cluster::RING_HASH:
1152
0
      lb_type_ = LoadBalancerType::RingHash;
1153
0
      break;
1154
0
    case envoy::config::cluster::v3::Cluster::MAGLEV:
1155
0
      lb_type_ = LoadBalancerType::Maglev;
1156
0
      break;
1157
0
    case envoy::config::cluster::v3::Cluster::CLUSTER_PROVIDED:
1158
0
      if (config.has_lb_subset_config()) {
1159
0
        throwEnvoyExceptionOrPanic(
1160
0
            fmt::format("cluster: LB policy {} cannot be combined with lb_subset_config",
1161
0
                        envoy::config::cluster::v3::Cluster::LbPolicy_Name(config.lb_policy())));
1162
0
      }
1163
1164
0
      lb_type_ = LoadBalancerType::ClusterProvided;
1165
0
      break;
1166
0
    case envoy::config::cluster::v3::Cluster::LOAD_BALANCING_POLICY_CONFIG: {
1167
      // 'LOAD_BALANCING_POLICY_CONFIG' should be handled by the 'configureLbPolicies'
1168
      // function in previous branch and should not reach here.
1169
0
      PANIC("Should not reach here");
1170
0
      break;
1171
0
    }
1172
0
    }
1173
0
  }
1174
1175
3.26k
  if (config.lb_subset_config().locality_weight_aware() &&
1176
3.26k
      !config.common_lb_config().has_locality_weighted_lb_config()) {
1177
0
    throwEnvoyExceptionOrPanic(fmt::format("Locality weight aware subset LB requires that a "
1178
0
                                           "locality_weighted_lb_config be set in {}",
1179
0
                                           name_));
1180
0
  }
1181
1182
  // Use default (1h) or configured `idle_timeout`, unless it's set to 0, indicating that no
1183
  // timeout should be used.
1184
3.26k
  absl::optional<std::chrono::milliseconds> idle_timeout(std::chrono::hours(1));
1185
3.26k
  if (http_protocol_options_->common_http_protocol_options_.has_idle_timeout()) {
1186
0
    idle_timeout = std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
1187
0
        http_protocol_options_->common_http_protocol_options_.idle_timeout()));
1188
0
    if (idle_timeout.value().count() == 0) {
1189
0
      idle_timeout = absl::nullopt;
1190
0
    }
1191
0
  }
1192
3.26k
  if (idle_timeout.has_value()) {
1193
3.26k
    optional_timeouts_.set<OptionalTimeoutNames::IdleTimeout>(*idle_timeout);
1194
3.26k
  }
1195
1196
  // Use default (10m) or configured `tcp_pool_idle_timeout`, unless it's set to 0, indicating that
1197
  // no timeout should be used.
1198
3.26k
  absl::optional<std::chrono::milliseconds> tcp_pool_idle_timeout(std::chrono::minutes(10));
1199
3.26k
  if (tcp_protocol_options_ && tcp_protocol_options_->idleTimeout().has_value()) {
1200
0
    tcp_pool_idle_timeout = tcp_protocol_options_->idleTimeout();
1201
0
    if (tcp_pool_idle_timeout.value().count() == 0) {
1202
0
      tcp_pool_idle_timeout = absl::nullopt;
1203
0
    }
1204
0
  }
1205
3.26k
  if (tcp_pool_idle_timeout.has_value()) {
1206
3.26k
    optional_timeouts_.set<OptionalTimeoutNames::TcpPoolIdleTimeout>(*tcp_pool_idle_timeout);
1207
3.26k
  }
1208
1209
  // Use configured `max_connection_duration`, unless it's set to 0, indicating that
1210
  // no timeout should be used. No timeout by default either.
1211
3.26k
  absl::optional<std::chrono::milliseconds> max_connection_duration;
1212
3.26k
  if (http_protocol_options_->common_http_protocol_options_.has_max_connection_duration()) {
1213
0
    max_connection_duration = std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
1214
0
        http_protocol_options_->common_http_protocol_options_.max_connection_duration()));
1215
0
    if (max_connection_duration.value().count() == 0) {
1216
0
      max_connection_duration = absl::nullopt;
1217
0
    }
1218
0
  }
1219
3.26k
  if (max_connection_duration.has_value()) {
1220
0
    optional_timeouts_.set<OptionalTimeoutNames::MaxConnectionDuration>(*max_connection_duration);
1221
0
  }
1222
1223
3.26k
  if (config.has_eds_cluster_config()) {
1224
14
    if (config.type() != envoy::config::cluster::v3::Cluster::EDS) {
1225
0
      throwEnvoyExceptionOrPanic("eds_cluster_config set in a non-EDS cluster");
1226
0
    }
1227
14
  }
1228
1229
  // TODO(htuch): Remove this temporary workaround when we have
1230
  // https://github.com/bufbuild/protoc-gen-validate/issues/97 resolved. This just provides
1231
  // early validation of sanity of fields that we should catch at config ingestion.
1232
3.26k
  DurationUtil::durationToMilliseconds(common_lb_config_->update_merge_window());
1233
1234
  // Create upstream network filter factories
1235
3.26k
  const auto& filters = config.filters();
1236
3.26k
  ASSERT(filter_factories_.empty());
1237
3.26k
  filter_factories_.reserve(filters.size());
1238
3.26k
  for (ssize_t i = 0; i < filters.size(); i++) {
1239
0
    const auto& proto_config = filters[i];
1240
0
    const bool is_terminal = i == filters.size() - 1;
1241
0
    ENVOY_LOG(debug, "  upstream network filter #{}:", i);
1242
1243
0
    if (proto_config.has_config_discovery()) {
1244
0
      if (proto_config.has_typed_config()) {
1245
0
        throwEnvoyExceptionOrPanic("Only one of typed_config or config_discovery can be used");
1246
0
      }
1247
1248
0
      ENVOY_LOG(debug, "      dynamic filter name: {}", proto_config.name());
1249
0
      filter_factories_.push_back(
1250
0
          network_filter_config_provider_manager_->createDynamicFilterConfigProvider(
1251
0
              proto_config.config_discovery(), proto_config.name(), server_context,
1252
0
              upstream_context_, factory_context.clusterManager(), is_terminal, "network",
1253
0
              nullptr));
1254
0
      continue;
1255
0
    }
1256
1257
0
    ENVOY_LOG(debug, "    name: {}", proto_config.name());
1258
0
    auto& factory = Config::Utility::getAndCheckFactory<
1259
0
        Server::Configuration::NamedUpstreamNetworkFilterConfigFactory>(proto_config);
1260
0
    auto message = factory.createEmptyConfigProto();
1261
0
    Config::Utility::translateOpaqueConfig(proto_config.typed_config(),
1262
0
                                           factory_context.messageValidationVisitor(), *message);
1263
0
    Network::FilterFactoryCb callback =
1264
0
        factory.createFilterFactoryFromProto(*message, upstream_context_);
1265
0
    filter_factories_.push_back(
1266
0
        network_filter_config_provider_manager_->createStaticFilterConfigProvider(
1267
0
            callback, proto_config.name()));
1268
0
  }
1269
1270
3.26k
  if (http_protocol_options_) {
1271
3.26k
    Http::FilterChainUtility::FiltersList http_filters = http_protocol_options_->http_filters_;
1272
3.26k
    has_configured_http_filters_ = !http_filters.empty();
1273
3.26k
    if (http_filters.empty()) {
1274
3.26k
      auto* codec_filter = http_filters.Add();
1275
3.26k
      codec_filter->set_name("envoy.filters.http.upstream_codec");
1276
3.26k
      codec_filter->mutable_typed_config()->PackFrom(
1277
3.26k
          envoy::extensions::filters::http::upstream_codec::v3::UpstreamCodec::default_instance());
1278
3.26k
    }
1279
3.26k
    if (http_filters[http_filters.size() - 1].name() != "envoy.filters.http.upstream_codec") {
1280
0
      throwEnvoyExceptionOrPanic(
1281
0
          fmt::format("The codec filter is the only valid terminal upstream HTTP filter"));
1282
0
    }
1283
1284
3.26k
    std::string prefix = stats_scope_->symbolTable().toString(stats_scope_->prefix());
1285
3.26k
    Http::FilterChainHelper<Server::Configuration::UpstreamFactoryContext,
1286
3.26k
                            Server::Configuration::UpstreamHttpFilterConfigFactory>
1287
3.26k
        helper(*http_filter_config_provider_manager_, upstream_context_.getServerFactoryContext(),
1288
3.26k
               factory_context.clusterManager(), upstream_context_, prefix);
1289
3.26k
    THROW_IF_NOT_OK(helper.processFilters(http_filters, "upstream http", "upstream http",
1290
3.26k
                                          http_filter_factories_));
1291
3.26k
  }
1292
3.26k
}
1293
1294
// Configures the load balancer based on config.load_balancing_policy
1295
void ClusterInfoImpl::configureLbPolicies(const envoy::config::cluster::v3::Cluster& config,
1296
0
                                          Server::Configuration::ServerFactoryContext& context) {
1297
  // Check if load_balancing_policy is set first.
1298
0
  if (!config.has_load_balancing_policy()) {
1299
0
    throwEnvoyExceptionOrPanic("cluster: field load_balancing_policy need to be set");
1300
0
  }
1301
1302
0
  if (config.has_lb_subset_config()) {
1303
0
    throwEnvoyExceptionOrPanic(
1304
0
        "cluster: load_balancing_policy cannot be combined with lb_subset_config");
1305
0
  }
1306
1307
0
  if (config.has_common_lb_config()) {
1308
0
    const auto& lb_config = config.common_lb_config();
1309
0
    if (lb_config.has_zone_aware_lb_config() || lb_config.has_locality_weighted_lb_config() ||
1310
0
        lb_config.has_consistent_hashing_lb_config()) {
1311
0
      throwEnvoyExceptionOrPanic(
1312
0
          "cluster: load_balancing_policy cannot be combined with partial fields "
1313
0
          "(zone_aware_lb_config, "
1314
0
          "locality_weighted_lb_config, consistent_hashing_lb_config) of common_lb_config");
1315
0
    }
1316
0
  }
1317
1318
0
  absl::InlinedVector<absl::string_view, 4> missing_policies;
1319
0
  for (const auto& policy : config.load_balancing_policy().policies()) {
1320
0
    TypedLoadBalancerFactory* factory =
1321
0
        Config::Utility::getAndCheckFactory<TypedLoadBalancerFactory>(
1322
0
            policy.typed_extension_config(), /*is_optional=*/true);
1323
0
    if (factory != nullptr) {
1324
      // Load and validate the configuration.
1325
0
      auto proto_message = factory->createEmptyConfigProto();
1326
0
      Config::Utility::translateOpaqueConfig(policy.typed_extension_config().typed_config(),
1327
0
                                             context.messageValidationVisitor(), *proto_message);
1328
1329
0
      load_balancer_config_ =
1330
0
          factory->loadConfig(*proto_message, context.messageValidationVisitor());
1331
1332
0
      load_balancer_factory_ = factory;
1333
0
      break;
1334
0
    }
1335
0
    missing_policies.push_back(policy.typed_extension_config().name());
1336
0
  }
1337
1338
0
  if (load_balancer_factory_ == nullptr) {
1339
0
    throwEnvoyExceptionOrPanic(
1340
0
        fmt::format("cluster: didn't find a registered load balancer factory "
1341
0
                    "implementation for cluster: '{}' with names from [{}]",
1342
0
                    name_, absl::StrJoin(missing_policies, ", ")));
1343
0
  }
1344
1345
0
  lb_type_ = LoadBalancerType::LoadBalancingPolicyConfig;
1346
0
}
1347
1348
ProtocolOptionsConfigConstSharedPtr
1349
6.53k
ClusterInfoImpl::extensionProtocolOptions(const std::string& name) const {
1350
6.53k
  auto i = extension_protocol_options_.find(name);
1351
6.53k
  if (i != extension_protocol_options_.end()) {
1352
2.24k
    return i->second;
1353
2.24k
  }
1354
4.29k
  return nullptr;
1355
6.53k
}
1356
1357
Network::UpstreamTransportSocketFactoryPtr createTransportSocketFactory(
1358
    const envoy::config::cluster::v3::Cluster& config,
1359
3.26k
    Server::Configuration::TransportSocketFactoryContext& factory_context) {
1360
  // If the cluster config doesn't have a transport socket configured, override with the default
1361
  // transport socket implementation based on the tls_context. We copy by value first then
1362
  // override if necessary.
1363
3.26k
  auto transport_socket = config.transport_socket();
1364
3.26k
  if (!config.has_transport_socket()) {
1365
3.26k
    envoy::extensions::transport_sockets::raw_buffer::v3::RawBuffer raw_buffer;
1366
3.26k
    transport_socket.mutable_typed_config()->PackFrom(raw_buffer);
1367
3.26k
    transport_socket.set_name("envoy.transport_sockets.raw_buffer");
1368
3.26k
  }
1369
1370
3.26k
  auto& config_factory = Config::Utility::getAndCheckFactory<
1371
3.26k
      Server::Configuration::UpstreamTransportSocketConfigFactory>(transport_socket);
1372
3.26k
  ProtobufTypes::MessagePtr message = Config::Utility::translateToFactoryConfig(
1373
3.26k
      transport_socket, factory_context.messageValidationVisitor(), config_factory);
1374
3.26k
  return config_factory.createTransportSocketFactory(*message, factory_context);
1375
3.26k
}
1376
1377
1.32k
void ClusterInfoImpl::createNetworkFilterChain(Network::Connection& connection) const {
1378
1.32k
  for (const auto& filter_config_provider : filter_factories_) {
1379
0
    auto config = filter_config_provider->config();
1380
0
    if (config.has_value()) {
1381
0
      Network::FilterFactoryCb& factory = config.value();
1382
0
      factory(connection);
1383
0
    }
1384
0
  }
1385
1.32k
}
1386
1387
std::vector<Http::Protocol>
1388
2.39k
ClusterInfoImpl::upstreamHttpProtocol(absl::optional<Http::Protocol> downstream_protocol) const {
1389
2.39k
  if (downstream_protocol.has_value() &&
1390
2.39k
      features_ & Upstream::ClusterInfo::Features::USE_DOWNSTREAM_PROTOCOL) {
1391
0
    if (downstream_protocol.value() == Http::Protocol::Http3 &&
1392
0
        !(features_ & Upstream::ClusterInfo::Features::HTTP3)) {
1393
0
      return {Http::Protocol::Http2};
1394
0
    }
1395
    // use HTTP11 since HTTP10 upstream is not supported yet.
1396
0
    if (downstream_protocol.value() == Http::Protocol::Http10) {
1397
0
      return {Http::Protocol::Http11};
1398
0
    }
1399
0
    return {downstream_protocol.value()};
1400
0
  }
1401
1402
2.39k
  if (features_ & Upstream::ClusterInfo::Features::USE_ALPN) {
1403
0
    if (!(features_ & Upstream::ClusterInfo::Features::HTTP3)) {
1404
0
      return {Http::Protocol::Http2, Http::Protocol::Http11};
1405
0
    }
1406
0
    return {Http::Protocol::Http3, Http::Protocol::Http2, Http::Protocol::Http11};
1407
0
  }
1408
1409
2.39k
  if (features_ & Upstream::ClusterInfo::Features::HTTP3) {
1410
0
    return {Http::Protocol::Http3};
1411
0
  }
1412
1413
2.39k
  return {(features_ & Upstream::ClusterInfo::Features::HTTP2) ? Http::Protocol::Http2
1414
2.39k
                                                               : Http::Protocol::Http11};
1415
2.39k
}
1416
1417
bool validateTransportSocketSupportsQuic(
1418
0
    const envoy::config::core::v3::TransportSocket& transport_socket) {
1419
  // The transport socket is valid for QUIC if it is either a QUIC transport socket,
1420
  // or if it is a QUIC transport socket wrapped in an HTTP/1.1 proxy socket.
1421
0
  if (transport_socket.name() == "envoy.transport_sockets.quic") {
1422
0
    return true;
1423
0
  }
1424
0
  if (transport_socket.name() != "envoy.transport_sockets.http_11_proxy") {
1425
0
    return false;
1426
0
  }
1427
0
  envoy::extensions::transport_sockets::http_11_proxy::v3::Http11ProxyUpstreamTransport
1428
0
      http11_socket;
1429
0
  MessageUtil::unpackTo(transport_socket.typed_config(), http11_socket);
1430
0
  return http11_socket.transport_socket().name() == "envoy.transport_sockets.quic";
1431
0
}
1432
1433
ClusterImplBase::ClusterImplBase(const envoy::config::cluster::v3::Cluster& cluster,
1434
                                 ClusterFactoryContext& cluster_context)
1435
    : init_manager_(fmt::format("Cluster {}", cluster.name())),
1436
3.26k
      init_watcher_("ClusterImplBase", [this]() { onInitDone(); }),
1437
      runtime_(cluster_context.serverFactoryContext().runtime()),
1438
      wait_for_warm_on_init_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(cluster, wait_for_warm_on_init, true)),
1439
      time_source_(cluster_context.serverFactoryContext().timeSource()),
1440
      local_cluster_(cluster_context.clusterManager().localClusterName().value_or("") ==
1441
                     cluster.name()),
1442
      const_metadata_shared_pool_(Config::Metadata::getConstMetadataSharedPool(
1443
          cluster_context.serverFactoryContext().singletonManager(),
1444
3.26k
          cluster_context.serverFactoryContext().mainThreadDispatcher())) {
1445
1446
3.26k
  auto& server_context = cluster_context.serverFactoryContext();
1447
1448
3.26k
  auto stats_scope = generateStatsScope(cluster, server_context.serverScope().store());
1449
3.26k
  transport_factory_context_ =
1450
3.26k
      std::make_unique<Server::Configuration::TransportSocketFactoryContextImpl>(
1451
3.26k
          server_context, cluster_context.sslContextManager(), *stats_scope,
1452
3.26k
          cluster_context.clusterManager(), cluster_context.messageValidationVisitor());
1453
3.26k
  transport_factory_context_->setInitManager(init_manager_);
1454
1455
3.26k
  auto socket_factory = createTransportSocketFactory(cluster, *transport_factory_context_);
1456
3.26k
  auto* raw_factory_pointer = socket_factory.get();
1457
1458
3.26k
  auto socket_matcher = std::make_unique<TransportSocketMatcherImpl>(
1459
3.26k
      cluster.transport_socket_matches(), *transport_factory_context_, socket_factory,
1460
3.26k
      *stats_scope);
1461
3.26k
  const bool matcher_supports_alpn = socket_matcher->allMatchesSupportAlpn();
1462
3.26k
  auto& dispatcher = server_context.mainThreadDispatcher();
1463
3.26k
  info_ = std::shared_ptr<const ClusterInfoImpl>(
1464
3.26k
      new ClusterInfoImpl(init_manager_, server_context, cluster,
1465
3.26k
                          cluster_context.clusterManager().bindConfig(), runtime_,
1466
3.26k
                          std::move(socket_matcher), std::move(stats_scope),
1467
3.26k
                          cluster_context.addedViaApi(), *transport_factory_context_),
1468
3.26k
      [&dispatcher](const ClusterInfoImpl* self) {
1469
3.26k
        ENVOY_LOG(trace, "Schedule destroy cluster info {}", self->name());
1470
3.26k
        dispatcher.deleteInDispatcherThread(
1471
3.26k
            std::unique_ptr<const Event::DispatcherThreadDeletable>(self));
1472
3.26k
      });
1473
1474
3.26k
  if ((info_->features() & ClusterInfoImpl::Features::USE_ALPN)) {
1475
0
    if (!raw_factory_pointer->supportsAlpn()) {
1476
0
      throwEnvoyExceptionOrPanic(
1477
0
          fmt::format("ALPN configured for cluster {} which has a non-ALPN transport socket: {}",
1478
0
                      cluster.name(), cluster.DebugString()));
1479
0
    }
1480
0
    if (!matcher_supports_alpn &&
1481
0
        !runtime_.snapshot().featureEnabled(ClusterImplBase::DoNotValidateAlpnRuntimeKey, 0)) {
1482
0
      throwEnvoyExceptionOrPanic(fmt::format(
1483
0
          "ALPN configured for cluster {} which has a non-ALPN transport socket matcher: {}",
1484
0
          cluster.name(), cluster.DebugString()));
1485
0
    }
1486
0
  }
1487
1488
3.26k
  if (info_->features() & ClusterInfoImpl::Features::HTTP3) {
1489
0
#if defined(ENVOY_ENABLE_QUIC)
1490
0
    if (!validateTransportSocketSupportsQuic(cluster.transport_socket())) {
1491
0
      throwEnvoyExceptionOrPanic(
1492
0
          fmt::format("HTTP3 requires a QuicUpstreamTransport transport socket: {} {}",
1493
0
                      cluster.name(), cluster.transport_socket().DebugString()));
1494
0
    }
1495
#else
1496
    throwEnvoyExceptionOrPanic("HTTP3 configured but not enabled in the build.");
1497
#endif
1498
0
  }
1499
1500
  // Create the default (empty) priority set before registering callbacks to
1501
  // avoid getting an update the first time it is accessed.
1502
3.26k
  priority_set_.getOrCreateHostSet(0);
1503
3.26k
  priority_update_cb_ = priority_set_.addPriorityUpdateCb(
1504
3.26k
      [this](uint32_t, const HostVector& hosts_added, const HostVector& hosts_removed) {
1505
3.26k
        if (!hosts_added.empty() || !hosts_removed.empty()) {
1506
3.26k
          info_->endpointStats().membership_change_.inc();
1507
3.26k
        }
1508
1509
3.26k
        uint32_t healthy_hosts = 0;
1510
3.26k
        uint32_t degraded_hosts = 0;
1511
3.26k
        uint32_t excluded_hosts = 0;
1512
3.26k
        uint32_t hosts = 0;
1513
3.26k
        for (const auto& host_set : prioritySet().hostSetsPerPriority()) {
1514
3.26k
          hosts += host_set->hosts().size();
1515
3.26k
          healthy_hosts += host_set->healthyHosts().size();
1516
3.26k
          degraded_hosts += host_set->degradedHosts().size();
1517
3.26k
          excluded_hosts += host_set->excludedHosts().size();
1518
3.26k
        }
1519
3.26k
        info_->endpointStats().membership_total_.set(hosts);
1520
3.26k
        info_->endpointStats().membership_healthy_.set(healthy_hosts);
1521
3.26k
        info_->endpointStats().membership_degraded_.set(degraded_hosts);
1522
3.26k
        info_->endpointStats().membership_excluded_.set(excluded_hosts);
1523
3.26k
      });
1524
3.26k
}
1525
1526
namespace {
1527
1528
6.53k
bool excludeBasedOnHealthFlag(const Host& host) {
1529
6.53k
  return host.healthFlagGet(Host::HealthFlag::PENDING_ACTIVE_HC) ||
1530
6.53k
         host.healthFlagGet(Host::HealthFlag::EXCLUDED_VIA_IMMEDIATE_HC_FAIL);
1531
6.53k
}
1532
1533
} // namespace
1534
1535
std::tuple<HealthyHostVectorConstSharedPtr, DegradedHostVectorConstSharedPtr,
1536
           ExcludedHostVectorConstSharedPtr>
1537
3.26k
ClusterImplBase::partitionHostList(const HostVector& hosts) {
1538
3.26k
  auto healthy_list = std::make_shared<HealthyHostVector>();
1539
3.26k
  auto degraded_list = std::make_shared<DegradedHostVector>();
1540
3.26k
  auto excluded_list = std::make_shared<ExcludedHostVector>();
1541
1542
3.26k
  for (const auto& host : hosts) {
1543
3.26k
    const Host::Health health_status = host->coarseHealth();
1544
3.26k
    if (health_status == Host::Health::Healthy) {
1545
3.26k
      healthy_list->get().emplace_back(host);
1546
3.26k
    } else if (health_status == Host::Health::Degraded) {
1547
0
      degraded_list->get().emplace_back(host);
1548
0
    }
1549
3.26k
    if (excludeBasedOnHealthFlag(*host)) {
1550
0
      excluded_list->get().emplace_back(host);
1551
0
    }
1552
3.26k
  }
1553
1554
3.26k
  return std::make_tuple(healthy_list, degraded_list, excluded_list);
1555
3.26k
}
1556
1557
std::tuple<HostsPerLocalityConstSharedPtr, HostsPerLocalityConstSharedPtr,
1558
           HostsPerLocalityConstSharedPtr>
1559
3.26k
ClusterImplBase::partitionHostsPerLocality(const HostsPerLocality& hosts) {
1560
3.26k
  auto filtered_clones =
1561
3.26k
      hosts.filter({[](const Host& host) { return host.coarseHealth() == Host::Health::Healthy; },
1562
3.26k
                    [](const Host& host) { return host.coarseHealth() == Host::Health::Degraded; },
1563
3.26k
                    [](const Host& host) { return excludeBasedOnHealthFlag(host); }});
1564
1565
3.26k
  return std::make_tuple(std::move(filtered_clones[0]), std::move(filtered_clones[1]),
1566
3.26k
                         std::move(filtered_clones[2]));
1567
3.26k
}
1568
1569
2.39k
bool ClusterInfoImpl::maintenanceMode() const {
1570
2.39k
  return runtime_.snapshot().featureEnabled(maintenance_mode_runtime_key_, 0);
1571
2.39k
}
1572
1573
21.6k
ResourceManager& ClusterInfoImpl::resourceManager(ResourcePriority priority) const {
1574
21.6k
  ASSERT(enumToInt(priority) < resource_managers_.managers_.size());
1575
21.6k
  return *resource_managers_.managers_[enumToInt(priority)];
1576
21.6k
}
1577
1578
3.26k
void ClusterImplBase::initialize(std::function<void()> callback) {
1579
3.26k
  ASSERT(!initialization_started_);
1580
3.26k
  ASSERT(initialization_complete_callback_ == nullptr);
1581
3.26k
  initialization_complete_callback_ = callback;
1582
3.26k
  startPreInit();
1583
3.26k
}
1584
1585
3.26k
void ClusterImplBase::onPreInitComplete() {
1586
  // Protect against multiple calls.
1587
3.26k
  if (initialization_started_) {
1588
0
    return;
1589
0
  }
1590
3.26k
  initialization_started_ = true;
1591
1592
3.26k
  ENVOY_LOG(debug, "initializing {} cluster {} completed",
1593
3.26k
            initializePhase() == InitializePhase::Primary ? "Primary" : "Secondary",
1594
3.26k
            info()->name());
1595
3.26k
  init_manager_.initialize(init_watcher_);
1596
3.26k
}
1597
1598
3.26k
void ClusterImplBase::onInitDone() {
1599
3.26k
  info()->configUpdateStats().warming_state_.set(0);
1600
3.26k
  if (health_checker_ && pending_initialize_health_checks_ == 0) {
1601
0
    for (auto& host_set : prioritySet().hostSetsPerPriority()) {
1602
0
      for (auto& host : host_set->hosts()) {
1603
0
        if (host->disableActiveHealthCheck()) {
1604
0
          continue;
1605
0
        }
1606
0
        ++pending_initialize_health_checks_;
1607
0
      }
1608
0
    }
1609
0
    ENVOY_LOG(debug, "Cluster onInitDone pending initialize health check count {}",
1610
0
              pending_initialize_health_checks_);
1611
1612
    // TODO(mattklein123): Remove this callback when done.
1613
0
    health_checker_->addHostCheckCompleteCb([this](HostSharedPtr, HealthTransition) -> void {
1614
0
      if (pending_initialize_health_checks_ > 0 && --pending_initialize_health_checks_ == 0) {
1615
0
        finishInitialization();
1616
0
      }
1617
0
    });
1618
0
  }
1619
1620
3.26k
  if (pending_initialize_health_checks_ == 0) {
1621
3.26k
    finishInitialization();
1622
3.26k
  }
1623
3.26k
}
1624
1625
3.26k
void ClusterImplBase::finishInitialization() {
1626
3.26k
  ASSERT(initialization_complete_callback_ != nullptr);
1627
3.26k
  ASSERT(initialization_started_);
1628
1629
  // Snap a copy of the completion callback so that we can set it to nullptr to unblock
1630
  // reloadHealthyHosts(). See that function for more info on why we do this.
1631
3.26k
  auto snapped_callback = initialization_complete_callback_;
1632
3.26k
  initialization_complete_callback_ = nullptr;
1633
1634
3.26k
  if (health_checker_ != nullptr) {
1635
0
    reloadHealthyHosts(nullptr);
1636
0
  }
1637
1638
3.26k
  if (snapped_callback != nullptr) {
1639
3.26k
    snapped_callback();
1640
3.26k
  }
1641
3.26k
}
1642
1643
0
void ClusterImplBase::setHealthChecker(const HealthCheckerSharedPtr& health_checker) {
1644
0
  ASSERT(!health_checker_);
1645
0
  health_checker_ = health_checker;
1646
0
  health_checker_->start();
1647
0
  health_checker_->addHostCheckCompleteCb(
1648
0
      [this](const HostSharedPtr& host, HealthTransition changed_state) -> void {
1649
        // If we get a health check completion that resulted in a state change, signal to
1650
        // update the host sets on all threads.
1651
0
        if (changed_state == HealthTransition::Changed) {
1652
0
          reloadHealthyHosts(host);
1653
0
        }
1654
0
      });
1655
0
}
1656
1657
3.26k
void ClusterImplBase::setOutlierDetector(const Outlier::DetectorSharedPtr& outlier_detector) {
1658
3.26k
  if (!outlier_detector) {
1659
3.26k
    return;
1660
3.26k
  }
1661
1662
0
  outlier_detector_ = outlier_detector;
1663
0
  outlier_detector_->addChangedStateCb(
1664
0
      [this](const HostSharedPtr& host) -> void { reloadHealthyHosts(host); });
1665
0
}
1666
1667
0
void ClusterImplBase::reloadHealthyHosts(const HostSharedPtr& host) {
1668
  // Every time a host changes Health Check state we cause a full healthy host recalculation which
1669
  // for expensive LBs (ring, subset, etc.) can be quite time consuming. During startup, this
1670
  // can also block worker threads by doing this repeatedly. There is no reason to do this
1671
  // as we will not start taking traffic until we are initialized. By blocking Health Check
1672
  // updates while initializing we can avoid this.
1673
0
  if (initialization_complete_callback_ != nullptr) {
1674
0
    return;
1675
0
  }
1676
1677
0
  reloadHealthyHostsHelper(host);
1678
0
}
1679
1680
0
void ClusterImplBase::reloadHealthyHostsHelper(const HostSharedPtr&) {
1681
0
  const auto& host_sets = prioritySet().hostSetsPerPriority();
1682
0
  for (size_t priority = 0; priority < host_sets.size(); ++priority) {
1683
0
    const auto& host_set = host_sets[priority];
1684
    // TODO(htuch): Can we skip these copies by exporting out const shared_ptr from HostSet?
1685
0
    HostVectorConstSharedPtr hosts_copy = std::make_shared<HostVector>(host_set->hosts());
1686
1687
0
    HostsPerLocalityConstSharedPtr hosts_per_locality_copy = host_set->hostsPerLocality().clone();
1688
0
    prioritySet().updateHosts(priority,
1689
0
                              HostSetImpl::partitionHosts(hosts_copy, hosts_per_locality_copy),
1690
0
                              host_set->localityWeights(), {}, {}, absl::nullopt, absl::nullopt);
1691
0
  }
1692
0
}
1693
1694
// Resolves a proto Address into a network address instance. The TRY_ASSERT_MAIN_THREAD guard
// indicates this is expected to run on the main thread.
//
// If resolution throws an EnvoyException:
//  - for STATIC and EDS clusters, a new exception is raised whose message appends a hint about
//    resolver_name / DNS cluster types;
//  - for any other cluster type, the original exception is propagated unchanged.
const Network::Address::InstanceConstSharedPtr
ClusterImplBase::resolveProtoAddress(const envoy::config::core::v3::Address& address) {
  TRY_ASSERT_MAIN_THREAD { return Network::Address::resolveProtoAddress(address); }
  END_TRY
  CATCH(EnvoyException & e, {
    if (info_->type() == envoy::config::cluster::v3::Cluster::STATIC ||
        info_->type() == envoy::config::cluster::v3::Cluster::EDS) {
      throwEnvoyExceptionOrPanic(
          fmt::format("{}. Consider setting resolver_name or setting cluster type "
                      "to 'STRICT_DNS' or 'LOGICAL_DNS'",
                      e.what()));
    }
    // Rethrow the in-flight exception object. `throw e;` would copy it as a plain
    // EnvoyException and slice away any more-derived exception type.
    throw;
  });
}
1709
1710
void ClusterImplBase::validateEndpointsForZoneAwareRouting(
1711
3.26k
    const envoy::config::endpoint::v3::LocalityLbEndpoints& endpoints) const {
1712
3.26k
  if (local_cluster_ && endpoints.priority() > 0) {
1713
0
    throwEnvoyExceptionOrPanic(
1714
0
        fmt::format("Unexpected non-zero priority for local cluster '{}'.", info()->name()));
1715
0
  }
1716
3.26k
}
1717
1718
// Builds the opt-in cluster stats. Each stats object is allocated only when its tracking flag
// is enabled in the cluster config; otherwise the corresponding pointer stays null.
ClusterInfoImpl::OptionalClusterStats::OptionalClusterStats(
    const envoy::config::cluster::v3::Cluster& config, Stats::Scope& stats_scope,
    const ClusterManager& manager)
    : timeout_budget_stats_(
          // Either track_cluster_stats.timeout_budgets or the top-level track_timeout_budgets
          // field enables timeout budget stats.
          !(config.track_cluster_stats().timeout_budgets() || config.track_timeout_budgets())
              ? nullptr
              : std::make_unique<ClusterTimeoutBudgetStats>(generateTimeoutBudgetStats(
                    stats_scope, manager.clusterTimeoutBudgetStatNames()))),
      request_response_size_stats_(
          !config.track_cluster_stats().request_response_sizes()
              ? nullptr
              : std::make_unique<ClusterRequestResponseSizeStats>(
                    generateRequestResponseSizeStats(
                        stats_scope, manager.clusterRequestResponseSizeStatNames()))) {}
1731
1732
// Creates one resource manager per routing priority (DEFAULT, then HIGH), each configured by
// load() from the cluster's circuit breaker settings for that priority.
ClusterInfoImpl::ResourceManagers::ResourceManagers(
    const envoy::config::cluster::v3::Cluster& config, Runtime::Loader& runtime,
    const std::string& cluster_name, Stats::Scope& stats_scope,
    const ClusterCircuitBreakersStatNames& circuit_breakers_stat_names)
    : circuit_breakers_stat_names_(circuit_breakers_stat_names) {
  const auto load_for = [&](envoy::config::core::v3::RoutingPriority routing_priority) {
    return load(config, runtime, cluster_name, stats_scope, routing_priority);
  };
  managers_[enumToInt(ResourcePriority::Default)] = load_for(envoy::config::core::v3::DEFAULT);
  managers_[enumToInt(ResourcePriority::High)] = load_for(envoy::config::core::v3::HIGH);
}
1742
1743
// Creates the circuit breaker gauges for one priority. Each gauge name is built from the
// elements {stat_names.circuit_breakers_, prefix, <stat>} with Accumulate import mode.
//
// The *_open gauges are always real gauges. The remaining_* gauges are real only when
// `track_remaining` is set; otherwise the store's null gauge is returned for each of them.
ClusterCircuitBreakersStats
ClusterInfoImpl::generateCircuitBreakersStats(Stats::Scope& scope, Stats::StatName prefix,
                                              bool track_remaining,
                                              const ClusterCircuitBreakersStatNames& stat_names) {
  auto make_gauge = [&stat_names, &scope, prefix](Stats::StatName stat_name) -> Stats::Gauge& {
    return Stats::Utility::gaugeFromElements(scope,
                                             {stat_names.circuit_breakers_, prefix, stat_name},
                                             Stats::Gauge::ImportMode::Accumulate);
  };
  // A scoped lambda instead of a function-like macro: type-checked, no textual expansion, and
  // no #define/#undef pair inside the function body.
  auto make_remaining_gauge = [&make_gauge, &scope,
                               track_remaining](Stats::StatName stat_name) -> Stats::Gauge& {
    return track_remaining ? make_gauge(stat_name) : scope.store().nullGauge();
  };

  // Braced-init-list elements are evaluated left to right, so gauges are created in this order.
  return {
      make_gauge(stat_names.cx_open_),
      make_gauge(stat_names.cx_pool_open_),
      make_gauge(stat_names.rq_open_),
      make_gauge(stat_names.rq_pending_open_),
      make_gauge(stat_names.rq_retry_open_),
      make_remaining_gauge(stat_names.remaining_cx_),
      make_remaining_gauge(stat_names.remaining_cx_pools_),
      make_remaining_gauge(stat_names.remaining_pending_),
      make_remaining_gauge(stat_names.remaining_retries_),
      make_remaining_gauge(stat_names.remaining_rq_),
  };
}
1771
1772
53
// Returns this cluster's HTTP/1 codec stats, obtained through CodecStats::atomicGet on
// http1_codec_stats_ within the cluster's stats scope. The name suggests lazy, thread-safe
// creation on first use; confirm in CodecStats.
Http::Http1::CodecStats& ClusterInfoImpl::http1CodecStats() const {
  auto& scope = *stats_scope_;
  return Http::Http1::CodecStats::atomicGet(http1_codec_stats_, scope);
}
1775
1776
1.27k
// Returns this cluster's HTTP/2 codec stats, obtained through CodecStats::atomicGet on
// http2_codec_stats_ within the cluster's stats scope. The name suggests lazy, thread-safe
// creation on first use; confirm in CodecStats.
Http::Http2::CodecStats& ClusterInfoImpl::http2CodecStats() const {
  auto& scope = *stats_scope_;
  return Http::Http2::CodecStats::atomicGet(http2_codec_stats_, scope);
}
1779
1780
0
// Returns this cluster's HTTP/3 codec stats, obtained through CodecStats::atomicGet on
// http3_codec_stats_ within the cluster's stats scope. The name suggests lazy, thread-safe
// creation on first use; confirm in CodecStats.
Http::Http3::CodecStats& ClusterInfoImpl::http3CodecStats() const {
  auto& scope = *stats_scope_;
  return Http::Http3::CodecStats::atomicGet(http3_codec_stats_, scope);
}
1783
1784
#ifdef ENVOY_ENABLE_UHV
// Selects the codec stats object that receives header validator counters for `protocol`.
// HTTP/1.0 and HTTP/1.1 share the HTTP/1 codec stats.
::Envoy::Http::HeaderValidatorStats&
ClusterInfoImpl::getHeaderValidatorStats(Http::Protocol protocol) const {
  switch (protocol) {
  case Http::Protocol::Http3:
    return http3CodecStats();
  case Http::Protocol::Http2:
    return http2CodecStats();
  case Http::Protocol::Http10:
  case Http::Protocol::Http11:
    return http1CodecStats();
  }
  // Only reachable if `protocol` holds a value outside the enumerators handled above.
  PANIC_DUE_TO_CORRUPT_ENUM;
}
#endif
1799
1800
// Builds a client header validator for `protocol`. Returns nullptr when UHV support is compiled
// out (ENVOY_ENABLE_UHV undefined) or when no header validator factory is configured.
Http::ClientHeaderValidatorPtr
ClusterInfoImpl::makeHeaderValidator([[maybe_unused]] Http::Protocol protocol) const {
#ifdef ENVOY_ENABLE_UHV
  const auto& validator_factory = http_protocol_options_->header_validator_factory_;
  if (!validator_factory) {
    return nullptr;
  }
  return validator_factory->createClientHeaderValidator(protocol,
                                                        getHeaderValidatorStats(protocol));
#else
  return nullptr;
#endif
}
1811
1812
std::pair<absl::optional<double>, absl::optional<uint32_t>> ClusterInfoImpl::getRetryBudgetParams(
1813
0
    const envoy::config::cluster::v3::CircuitBreakers::Thresholds& thresholds) {
1814
0
  constexpr double default_budget_percent = 20.0;
1815
0
  constexpr uint32_t default_retry_concurrency = 3;
1816
1817
0
  absl::optional<double> budget_percent;
1818
0
  absl::optional<uint32_t> min_retry_concurrency;
1819
0
  if (thresholds.has_retry_budget()) {
1820
    // The budget_percent and min_retry_concurrency values are only set if there is a retry budget
1821
    // message set in the cluster config.
1822
0
    budget_percent = PROTOBUF_GET_WRAPPED_OR_DEFAULT(thresholds.retry_budget(), budget_percent,
1823
0
                                                     default_budget_percent);
1824
0
    min_retry_concurrency = PROTOBUF_GET_WRAPPED_OR_DEFAULT(
1825
0
        thresholds.retry_budget(), min_retry_concurrency, default_retry_concurrency);
1826
0
  }
1827
0
  return std::make_pair(budget_percent, min_retry_concurrency);
1828
0
}
1829
1830
// Registers the singleton name looked up by
// createSingletonUpstreamNetworkFilterConfigProviderManager() via
// SINGLETON_MANAGER_REGISTERED_NAME(upstream_network_filter_config_provider_manager).
SINGLETON_MANAGER_REGISTRATION(upstream_network_filter_config_provider_manager);
1831
1832
std::shared_ptr<UpstreamNetworkFilterConfigProviderManager>
1833
ClusterInfoImpl::createSingletonUpstreamNetworkFilterConfigProviderManager(
1834
3.26k
    Server::Configuration::ServerFactoryContext& context) {
1835
3.26k
  return context.singletonManager().getTyped<UpstreamNetworkFilterConfigProviderManager>(
1836
3.26k
      SINGLETON_MANAGER_REGISTERED_NAME(upstream_network_filter_config_provider_manager),
1837
3.26k
      [] { return std::make_shared<Filter::UpstreamNetworkFilterConfigProviderManagerImpl>(); });
1838
3.26k
}
1839
1840
// Builds the ResourceManagerImpl for one routing priority of a cluster.
//
// Start from the defaults:
//  - 1024 connections, pending requests and requests;
//  - 3 retries;
//  - max uint64 connection pools and per-host connections.
//
// These are overridden by the first circuit_breakers.thresholds entry whose priority matches.
// That entry can also enable track_remaining and a retry budget. From per_host_thresholds, only
// max_connections is honored. Any other per-host field causes a throw via
// throwEnvoyExceptionOrPanic.
//
// The runtime prefix handed to ResourceManagerImpl is
// "circuit_breakers.<cluster_name>.<default|high>.".
ResourceManagerImplPtr
ClusterInfoImpl::ResourceManagers::load(const envoy::config::cluster::v3::Cluster& config,
                                        Runtime::Loader& runtime, const std::string& cluster_name,
                                        Stats::Scope& stats_scope,
                                        const envoy::config::core::v3::RoutingPriority& priority) {
  Stats::StatName priority_stat_name;
  std::string priority_name;
  switch (priority) {
    PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
  case envoy::config::core::v3::DEFAULT:
    priority_stat_name = circuit_breakers_stat_names_.default_;
    priority_name = "default";
    break;
  case envoy::config::core::v3::HIGH:
    priority_stat_name = circuit_breakers_stat_names_.high_;
    priority_name = "high";
    break;
  }
  const std::string runtime_prefix =
      fmt::format("circuit_breakers.{}.{}.", cluster_name, priority_name);

  // NOTE: these locals must keep the same names as the proto fields, because
  // PROTOBUF_GET_WRAPPED_OR_DEFAULT(msg, field, default) uses `field` as the accessor name and
  // each local as its own default.
  uint64_t max_connections = 1024;
  uint64_t max_pending_requests = 1024;
  uint64_t max_requests = 1024;
  uint64_t max_retries = 3;
  uint64_t max_connection_pools = std::numeric_limits<uint64_t>::max();
  uint64_t max_connections_per_host = std::numeric_limits<uint64_t>::max();
  bool track_remaining = false;
  absl::optional<double> budget_percent;
  absl::optional<uint32_t> min_retry_concurrency;

  // Selects threshold entries that apply to the priority being loaded.
  const auto matches_priority =
      [priority](const envoy::config::cluster::v3::CircuitBreakers::Thresholds& threshold) {
        return threshold.priority() == priority;
      };

  const auto& thresholds = config.circuit_breakers().thresholds();
  const auto it = std::find_if(thresholds.cbegin(), thresholds.cend(), matches_priority);
  if (it != thresholds.cend()) {
    max_connections = PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_connections, max_connections);
    max_pending_requests =
        PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_pending_requests, max_pending_requests);
    max_requests = PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_requests, max_requests);
    max_retries = PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_retries, max_retries);
    track_remaining = it->track_remaining();
    max_connection_pools =
        PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_connection_pools, max_connection_pools);
    std::tie(budget_percent, min_retry_concurrency) = ClusterInfoImpl::getRetryBudgetParams(*it);
  }

  const auto& per_host_thresholds = config.circuit_breakers().per_host_thresholds();
  const auto per_host_it =
      std::find_if(per_host_thresholds.cbegin(), per_host_thresholds.cend(), matches_priority);
  if (per_host_it != per_host_thresholds.cend()) {
    const bool has_unsupported_field =
        per_host_it->has_max_pending_requests() || per_host_it->has_max_requests() ||
        per_host_it->has_max_retries() || per_host_it->has_max_connection_pools() ||
        per_host_it->has_retry_budget();
    if (has_unsupported_field) {
      throwEnvoyExceptionOrPanic("Unsupported field in per_host_thresholds");
    }
    if (per_host_it->has_max_connections()) {
      max_connections_per_host = per_host_it->max_connections().value();
    }
  }

  return std::make_unique<ResourceManagerImpl>(
      runtime, runtime_prefix, max_connections, max_pending_requests, max_requests, max_retries,
      max_connection_pools, max_connections_per_host,
      ClusterInfoImpl::generateCircuitBreakersStats(stats_scope, priority_stat_name,
                                                    track_remaining, circuit_breakers_stat_names_),
      budget_percent, min_retry_concurrency);
}
1914
1915
// Binds the manager to its owning cluster and the local node info. `update_cb` is an optional
// batch update callback; when it is null, updateClusterPrioritySet() writes directly to the
// cluster's PrioritySet.
PriorityStateManager::PriorityStateManager(ClusterImplBase& cluster,
                                           const LocalInfo::LocalInfo& local_info,
                                           PrioritySet::HostUpdateCb* update_cb)
    : parent_(cluster),
      local_info_node_(local_info.node()),
      update_cb_(update_cb) {}
1919
1920
void PriorityStateManager::initializePriorityFor(
1921
3.26k
    const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint) {
1922
3.26k
  const uint32_t priority = locality_lb_endpoint.priority();
1923
3.26k
  if (priority_state_.size() <= priority) {
1924
3.26k
    priority_state_.resize(priority + 1);
1925
3.26k
  }
1926
3.26k
  if (priority_state_[priority].first == nullptr) {
1927
3.26k
    priority_state_[priority].first = std::make_unique<HostVector>();
1928
3.26k
  }
1929
3.26k
  if (locality_lb_endpoint.has_locality() && locality_lb_endpoint.has_load_balancing_weight()) {
1930
0
    priority_state_[priority].second[locality_lb_endpoint.locality()] =
1931
0
        locality_lb_endpoint.load_balancing_weight().value();
1932
0
  }
1933
3.26k
}
1934
1935
void PriorityStateManager::registerHostForPriority(
1936
    const std::string& hostname, Network::Address::InstanceConstSharedPtr address,
1937
    const std::vector<Network::Address::InstanceConstSharedPtr>& address_list,
1938
    const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint,
1939
3.26k
    const envoy::config::endpoint::v3::LbEndpoint& lb_endpoint, TimeSource& time_source) {
1940
3.26k
  auto metadata = lb_endpoint.has_metadata()
1941
3.26k
                      ? parent_.constMetadataSharedPool()->getObject(lb_endpoint.metadata())
1942
3.26k
                      : nullptr;
1943
3.26k
  const auto host = std::make_shared<HostImpl>(
1944
3.26k
      parent_.info(), hostname, address, metadata, lb_endpoint.load_balancing_weight().value(),
1945
3.26k
      locality_lb_endpoint.locality(), lb_endpoint.endpoint().health_check_config(),
1946
3.26k
      locality_lb_endpoint.priority(), lb_endpoint.health_status(), time_source);
1947
3.26k
  if (!address_list.empty()) {
1948
0
    host->setAddressList(address_list);
1949
0
  }
1950
3.26k
  registerHostForPriority(host, locality_lb_endpoint);
1951
3.26k
}
1952
1953
void PriorityStateManager::registerHostForPriority(
1954
    const HostSharedPtr& host,
1955
3.26k
    const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint) {
1956
3.26k
  const uint32_t priority = locality_lb_endpoint.priority();
1957
  // Should be called after initializePriorityFor.
1958
3.26k
  ASSERT(priority_state_[priority].first);
1959
3.26k
  priority_state_[priority].first->emplace_back(host);
1960
3.26k
}
1961
1962
void PriorityStateManager::updateClusterPrioritySet(
1963
    const uint32_t priority, HostVectorSharedPtr&& current_hosts,
1964
    const absl::optional<HostVector>& hosts_added, const absl::optional<HostVector>& hosts_removed,
1965
    const absl::optional<Upstream::Host::HealthFlag> health_checker_flag,
1966
    absl::optional<bool> weighted_priority_health,
1967
3.26k
    absl::optional<uint32_t> overprovisioning_factor) {
1968
  // If local locality is not defined then skip populating per locality hosts.
1969
3.26k
  const auto& local_locality = local_info_node_.locality();
1970
3.26k
  ENVOY_LOG(trace, "Local locality: {}", local_locality.DebugString());
1971
1972
  // For non-EDS, most likely the current hosts are from priority_state_[priority].first.
1973
3.26k
  HostVectorSharedPtr hosts(std::move(current_hosts));
1974
3.26k
  LocalityWeightsMap empty_locality_map;
1975
3.26k
  LocalityWeightsMap& locality_weights_map =
1976
3.26k
      priority_state_.size() > priority ? priority_state_[priority].second : empty_locality_map;
1977
3.26k
  ASSERT(priority_state_.size() > priority || locality_weights_map.empty());
1978
3.26k
  LocalityWeightsSharedPtr locality_weights;
1979
3.26k
  std::vector<HostVector> per_locality;
1980
1981
  // If we are configured for locality weighted LB we populate the locality weights. We also
1982
  // populate locality weights if the cluster uses load balancing extensions, since the extension
1983
  // may want to make use of locality weights and we cannot tell by inspecting the config whether
1984
  // this is the case.
1985
  //
1986
  // TODO: have the load balancing extension indicate, programmatically, whether it needs locality
1987
  // weights, as an optimization in cases where it doesn't.
1988
3.26k
  const bool locality_weighted_lb =
1989
3.26k
      parent_.info()->lbConfig().has_locality_weighted_lb_config() ||
1990
3.26k
      parent_.info()->lbType() == LoadBalancerType::LoadBalancingPolicyConfig;
1991
3.26k
  if (locality_weighted_lb) {
1992
3.26k
    locality_weights = std::make_shared<LocalityWeights>();
1993
3.26k
  }
1994
1995
  // We use std::map to guarantee a stable ordering for zone aware routing.
1996
3.26k
  std::map<envoy::config::core::v3::Locality, HostVector, LocalityLess> hosts_per_locality;
1997
1998
3.26k
  for (const HostSharedPtr& host : *hosts) {
1999
    // Take into consideration when a non-EDS cluster has active health checking, i.e. to mark all
2000
    // the hosts unhealthy (host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC)) and then fire
2001
    // update callbacks to start the health checking process. The endpoint with disabled active
2002
    // health check should not be set FAILED_ACTIVE_HC here.
2003
3.26k
    if (health_checker_flag.has_value() && !host->disableActiveHealthCheck()) {
2004
0
      host->healthFlagSet(health_checker_flag.value());
2005
0
    }
2006
3.26k
    hosts_per_locality[host->locality()].push_back(host);
2007
3.26k
  }
2008
2009
  // Do we have hosts for the local locality?
2010
3.26k
  const bool non_empty_local_locality =
2011
3.26k
      local_info_node_.has_locality() &&
2012
3.26k
      hosts_per_locality.find(local_locality) != hosts_per_locality.end();
2013
2014
  // As per HostsPerLocality::get(), the per_locality vector must have the local locality hosts
2015
  // first if non_empty_local_locality.
2016
3.26k
  if (non_empty_local_locality) {
2017
0
    per_locality.emplace_back(hosts_per_locality[local_locality]);
2018
0
    if (locality_weighted_lb) {
2019
0
      locality_weights->emplace_back(locality_weights_map[local_locality]);
2020
0
    }
2021
0
  }
2022
2023
  // After the local locality hosts (if any), we place the remaining locality host groups in
2024
  // lexicographic order. This provides a stable ordering for zone aware routing.
2025
3.26k
  for (auto& entry : hosts_per_locality) {
2026
3.26k
    if (!non_empty_local_locality || !LocalityEqualTo()(local_locality, entry.first)) {
2027
3.26k
      per_locality.emplace_back(entry.second);
2028
3.26k
      if (locality_weighted_lb) {
2029
3.26k
        locality_weights->emplace_back(locality_weights_map[entry.first]);
2030
3.26k
      }
2031
3.26k
    }
2032
3.26k
  }
2033
2034
3.26k
  auto per_locality_shared =
2035
3.26k
      std::make_shared<HostsPerLocalityImpl>(std::move(per_locality), non_empty_local_locality);
2036
2037
  // If a batch update callback was provided, use that. Otherwise directly update
2038
  // the PrioritySet.
2039
3.26k
  if (update_cb_ != nullptr) {
2040
14
    update_cb_->updateHosts(priority, HostSetImpl::partitionHosts(hosts, per_locality_shared),
2041
14
                            std::move(locality_weights), hosts_added.value_or(*hosts),
2042
14
                            hosts_removed.value_or<HostVector>({}), weighted_priority_health,
2043
14
                            overprovisioning_factor);
2044
3.25k
  } else {
2045
3.25k
    parent_.prioritySet().updateHosts(
2046
3.25k
        priority, HostSetImpl::partitionHosts(hosts, per_locality_shared),
2047
3.25k
        std::move(locality_weights), hosts_added.value_or(*hosts),
2048
3.25k
        hosts_removed.value_or<HostVector>({}), weighted_priority_health, overprovisioning_factor);
2049
3.25k
  }
2050
3.26k
}
2051
2052
bool BaseDynamicClusterImpl::updateDynamicHostList(
2053
    const HostVector& new_hosts, HostVector& current_priority_hosts,
2054
    HostVector& hosts_added_to_current_priority, HostVector& hosts_removed_from_current_priority,
2055
14
    const HostMap& all_hosts, const absl::flat_hash_set<std::string>& all_new_hosts) {
2056
14
  uint64_t max_host_weight = 1;
2057
2058
  // Did hosts change?
2059
  //
2060
  // Have host attributes changed the health of any endpoint? If so, we
2061
  // rebuild the hosts vectors. We only do this if the health status of an
2062
  // endpoint has materially changed (e.g. if previously failing active health
2063
  // checks, we just note it's now failing EDS health status but don't rebuild).
2064
  //
2065
  // TODO(htuch): We can be smarter about this potentially, and not force a full
2066
  // host set update on health status change. The way this would work is to
2067
  // implement a HealthChecker subclass that provides thread local health
2068
  // updates to the Cluster object. This will probably make sense to do in
2069
  // conjunction with https://github.com/envoyproxy/envoy/issues/2874.
2070
14
  bool hosts_changed = false;
2071
2072
  // Go through and see if the list we have is different from what we just got. If it is, we make
2073
  // a new host list and raise a change notification. We also check for duplicates here. It's
2074
  // possible for DNS to return the same address multiple times, and a bad EDS implementation
2075
  // could do the same thing.
2076
2077
  // Keep track of hosts we see in new_hosts that we are able to match up with an existing host.
2078
14
  absl::flat_hash_set<std::string> existing_hosts_for_current_priority(
2079
14
      current_priority_hosts.size());
2080
  // Keep track of hosts we're adding (or replacing)
2081
14
  absl::flat_hash_set<std::string> new_hosts_for_current_priority(new_hosts.size());
2082
  // Keep track of hosts for which locality is changed.
2083
14
  absl::flat_hash_set<std::string> hosts_with_updated_locality_for_current_priority(
2084
14
      current_priority_hosts.size());
2085
  // Keep track of hosts for which active health check flag is changed.
2086
14
  absl::flat_hash_set<std::string> hosts_with_active_health_check_flag_changed(
2087
14
      current_priority_hosts.size());
2088
14
  HostVector final_hosts;
2089
14
  for (const HostSharedPtr& host : new_hosts) {
2090
    // To match a new host with an existing host means comparing their addresses.
2091
14
    auto existing_host = all_hosts.find(addressToString(host->address()));
2092
14
    const bool existing_host_found = existing_host != all_hosts.end();
2093
2094
    // Clear any pending deletion flag on an existing host in case it came back while it was
2095
    // being stabilized. We will set it again below if needed.
2096
14
    if (existing_host_found) {
2097
0
      existing_host->second->healthFlagClear(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL);
2098
0
    }
2099
2100
    // Check if in-place host update should be skipped, i.e. when the following criteria are met
2101
    // (currently there is only one criterion, but we might add more in the future):
2102
    // - The cluster health checker is activated and a new host is matched with the existing one,
2103
    //   but the health check address is different.
2104
14
    const bool health_check_address_changed =
2105
14
        (health_checker_ != nullptr && existing_host_found &&
2106
14
         *existing_host->second->healthCheckAddress() != *host->healthCheckAddress());
2107
14
    bool locality_changed = false;
2108
14
    locality_changed = (existing_host_found &&
2109
14
                        (!LocalityEqualTo()(host->locality(), existing_host->second->locality())));
2110
14
    if (locality_changed) {
2111
0
      hosts_with_updated_locality_for_current_priority.emplace(existing_host->first);
2112
0
    }
2113
2114
14
    const bool active_health_check_flag_changed =
2115
14
        (health_checker_ != nullptr && existing_host_found &&
2116
14
         existing_host->second->disableActiveHealthCheck() != host->disableActiveHealthCheck());
2117
14
    if (active_health_check_flag_changed) {
2118
0
      hosts_with_active_health_check_flag_changed.emplace(existing_host->first);
2119
0
    }
2120
14
    const bool skip_inplace_host_update =
2121
14
        health_check_address_changed || locality_changed || active_health_check_flag_changed;
2122
2123
    // When there is a match and we decided to do in-place update, we potentially update the
2124
    // host's health check flag and metadata. Afterwards, the host is pushed back into the
2125
    // final_hosts, i.e. hosts that should be preserved in the current priority.
2126
14
    if (existing_host_found && !skip_inplace_host_update) {
2127
0
      existing_hosts_for_current_priority.emplace(existing_host->first);
2128
      // If we find a host matched based on address, we keep it. However we do change weight
2129
      // inline so do that here.
2130
0
      if (host->weight() > max_host_weight) {
2131
0
        max_host_weight = host->weight();
2132
0
      }
2133
0
      if (existing_host->second->weight() != host->weight()) {
2134
0
        existing_host->second->weight(host->weight());
2135
        // We do full host set rebuilds so that load balancers can do pre-computation of data
2136
        // structures based on host weight. This may become a performance problem in certain
2137
        // deployments so it is runtime feature guarded and may also need to be configurable
2138
        // and/or dynamic in the future.
2139
0
        hosts_changed = true;
2140
0
      }
2141
2142
0
      hosts_changed |= updateEdsHealthFlag(*host, *existing_host->second);
2143
2144
      // Did metadata change?
2145
0
      bool metadata_changed = true;
2146
0
      if (host->metadata() && existing_host->second->metadata()) {
2147
0
        metadata_changed = !Protobuf::util::MessageDifferencer::Equivalent(
2148
0
            *host->metadata(), *existing_host->second->metadata());
2149
0
      } else if (!host->metadata() && !existing_host->second->metadata()) {
2150
0
        metadata_changed = false;
2151
0
      }
2152
2153
0
      if (metadata_changed) {
2154
        // First, update the entire metadata for the endpoint.
2155
0
        existing_host->second->metadata(host->metadata());
2156
2157
        // Also, given that the canary attribute of an endpoint is derived from its metadata
2158
        // (e.g.: from envoy.lb/canary), we do a blind update here since it's cheaper than testing
2159
        // to see if it actually changed. We must update this besides just updating the metadata,
2160
        // because it'll be used by the router filter to compute upstream stats.
2161
0
        existing_host->second->canary(host->canary());
2162
2163
        // If metadata changed, we need to rebuild. See github issue #3810.
2164
0
        hosts_changed = true;
2165
0
      }
2166
2167
      // Did the priority change?
2168
0
      if (host->priority() != existing_host->second->priority()) {
2169
0
        existing_host->second->priority(host->priority());
2170
0
        hosts_added_to_current_priority.emplace_back(existing_host->second);
2171
0
      }
2172
2173
0
      final_hosts.push_back(existing_host->second);
2174
14
    } else {
2175
14
      new_hosts_for_current_priority.emplace(addressToString(host->address()));
2176
14
      if (host->weight() > max_host_weight) {
2177
0
        max_host_weight = host->weight();
2178
0
      }
2179
2180
      // If we are depending on a health checker, we initialize to unhealthy.
2181
      // If there's an existing host with the same health checker, the
2182
      // active health-status is kept.
2183
14
      if (health_checker_ != nullptr && !host->disableActiveHealthCheck()) {
2184
0
        if (Runtime::runtimeFeatureEnabled(
2185
0
                "envoy.reloadable_features.keep_endpoint_active_hc_status_on_locality_update")) {
2186
0
          if (existing_host_found && !health_check_address_changed &&
2187
0
              !active_health_check_flag_changed) {
2188
            // If there's an existing host, use the same active health-status.
2189
            // The existing host can be marked PENDING_ACTIVE_HC or
2190
            // ACTIVE_HC_TIMEOUT if it is also marked with FAILED_ACTIVE_HC.
2191
0
            ASSERT(!existing_host->second->healthFlagGet(Host::HealthFlag::PENDING_ACTIVE_HC) ||
2192
0
                   existing_host->second->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
2193
0
            ASSERT(!existing_host->second->healthFlagGet(Host::HealthFlag::ACTIVE_HC_TIMEOUT) ||
2194
0
                   existing_host->second->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
2195
2196
0
            constexpr uint32_t active_hc_statuses_mask =
2197
0
                enumToInt(Host::HealthFlag::FAILED_ACTIVE_HC) |
2198
0
                enumToInt(Host::HealthFlag::DEGRADED_ACTIVE_HC) |
2199
0
                enumToInt(Host::HealthFlag::PENDING_ACTIVE_HC) |
2200
0
                enumToInt(Host::HealthFlag::ACTIVE_HC_TIMEOUT);
2201
2202
0
            const uint32_t existing_host_statuses = existing_host->second->healthFlagsGetAll();
2203
0
            host->healthFlagsSetAll(existing_host_statuses & active_hc_statuses_mask);
2204
0
          } else {
2205
            // No previous known host, mark it as failed active HC.
2206
0
            host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC);
2207
2208
            // If we want to exclude hosts until they have been health checked, mark them with
2209
            // a flag to indicate that they have not been health checked yet.
2210
0
            if (info_->warmHosts()) {
2211
0
              host->healthFlagSet(Host::HealthFlag::PENDING_ACTIVE_HC);
2212
0
            }
2213
0
          }
2214
0
        } else {
2215
0
          host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC);
2216
2217
          // If we want to exclude hosts until they have been health checked, mark them with
2218
          // a flag to indicate that they have not been health checked yet.
2219
0
          if (info_->warmHosts()) {
2220
0
            host->healthFlagSet(Host::HealthFlag::PENDING_ACTIVE_HC);
2221
0
          }
2222
0
        }
2223
0
      }
2224
2225
14
      final_hosts.push_back(host);
2226
14
      hosts_added_to_current_priority.push_back(host);
2227
14
    }
2228
14
  }
2229
2230
  // Remove hosts from current_priority_hosts that were matched to an existing host in the
2231
  // previous loop.
2232
14
  auto erase_from =
2233
14
      std::remove_if(current_priority_hosts.begin(), current_priority_hosts.end(),
2234
14
                     [&existing_hosts_for_current_priority](const HostSharedPtr& p) {
2235
0
                       auto existing_itr =
2236
0
                           existing_hosts_for_current_priority.find(p->address()->asString());
2237
2238
0
                       if (existing_itr != existing_hosts_for_current_priority.end()) {
2239
0
                         existing_hosts_for_current_priority.erase(existing_itr);
2240
0
                         return true;
2241
0
                       }
2242
2243
0
                       return false;
2244
0
                     });
2245
14
  current_priority_hosts.erase(erase_from, current_priority_hosts.end());
2246
2247
  // If we saw existing hosts during this iteration from a different priority, then we've moved
2248
  // a host from another priority into this one, so we should mark the priority as having changed.
2249
14
  if (!existing_hosts_for_current_priority.empty()) {
2250
0
    hosts_changed = true;
2251
0
  }
2252
2253
  // The remaining hosts are hosts that are not referenced in the config update. We remove them
2254
  // from the priority if any of the following is true:
2255
  // - Active health checking is not enabled.
2256
  // - The removed hosts are failing active health checking OR have been explicitly marked as
2257
  //   unhealthy by a previous EDS update. We do not count outlier as a reason to remove a host
2258
  //   or any other future health condition that may be added so we do not use the coarseHealth()
2259
  //   API.
2260
  // - We have explicitly configured the cluster to remove hosts regardless of active health
2261
  // status.
2262
14
  const bool dont_remove_healthy_hosts =
2263
14
      health_checker_ != nullptr && !info()->drainConnectionsOnHostRemoval();
2264
14
  if (!current_priority_hosts.empty() && dont_remove_healthy_hosts) {
2265
0
    erase_from = std::remove_if(
2266
0
        current_priority_hosts.begin(), current_priority_hosts.end(),
2267
0
        [&all_new_hosts, &new_hosts_for_current_priority,
2268
0
         &hosts_with_updated_locality_for_current_priority,
2269
0
         &hosts_with_active_health_check_flag_changed, &final_hosts,
2270
0
         &max_host_weight](const HostSharedPtr& p) {
2271
          // This host has already been added as a new host in the
2272
          // new_hosts_for_current_priority. Return false here to make sure that host
2273
          // reference with older locality gets cleaned up from the priority.
2274
0
          if (hosts_with_updated_locality_for_current_priority.contains(p->address()->asString())) {
2275
0
            return false;
2276
0
          }
2277
2278
0
          if (hosts_with_active_health_check_flag_changed.contains(p->address()->asString())) {
2279
0
            return false;
2280
0
          }
2281
2282
0
          if (all_new_hosts.contains(p->address()->asString()) &&
2283
0
              !new_hosts_for_current_priority.contains(p->address()->asString())) {
2284
            // If the address is being completely deleted from this priority, but is
2285
            // referenced from another priority, then we assume that the other
2286
            // priority will perform an in-place update to re-use the existing Host.
2287
            // We should therefore not mark it as PENDING_DYNAMIC_REMOVAL, but
2288
            // instead remove it immediately from this priority.
2289
            // Example: health check address changed and priority also changed
2290
0
            return false;
2291
0
          }
2292
2293
          // PENDING_DYNAMIC_REMOVAL doesn't apply for the host with disabled active
2294
          // health check, the host is removed immediately from this priority.
2295
0
          if ((!(p->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC) ||
2296
0
                 p->healthFlagGet(Host::HealthFlag::FAILED_EDS_HEALTH))) &&
2297
0
              !p->disableActiveHealthCheck()) {
2298
0
            if (p->weight() > max_host_weight) {
2299
0
              max_host_weight = p->weight();
2300
0
            }
2301
2302
0
            final_hosts.push_back(p);
2303
0
            p->healthFlagSet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL);
2304
0
            return true;
2305
0
          }
2306
0
          return false;
2307
0
        });
2308
0
    current_priority_hosts.erase(erase_from, current_priority_hosts.end());
2309
0
  }
2310
2311
  // At this point we've accounted for all the new hosts as well the hosts that previously
2312
  // existed in this priority.
2313
14
  info_->endpointStats().max_host_weight_.set(max_host_weight);
2314
2315
  // Whatever remains in current_priority_hosts should be removed.
2316
14
  if (!hosts_added_to_current_priority.empty() || !current_priority_hosts.empty()) {
2317
14
    hosts_removed_from_current_priority = std::move(current_priority_hosts);
2318
14
    hosts_changed = true;
2319
14
  }
2320
2321
  // During the update we populated final_hosts with all the hosts that should remain
2322
  // in the current priority, so move them back into current_priority_hosts.
2323
14
  current_priority_hosts = std::move(final_hosts);
2324
  // We return false here in the absence of EDS health status or metadata changes, because we
2325
  // have no changes to host vector status (modulo weights). When we have EDS
2326
  // health status or metadata changed, we return true, causing updateHosts() to fire in the
2327
  // caller.
2328
14
  return hosts_changed;
2329
14
}
2330
2331
Network::DnsLookupFamily
2332
0
getDnsLookupFamilyFromCluster(const envoy::config::cluster::v3::Cluster& cluster) {
2333
0
  return DnsUtils::getDnsLookupFamilyFromEnum(cluster.dns_lookup_family());
2334
0
}
2335
2336
void reportUpstreamCxDestroy(const Upstream::HostDescriptionConstSharedPtr& host,
2337
1.32k
                             Network::ConnectionEvent event) {
2338
1.32k
  Upstream::ClusterTrafficStats& stats = *host->cluster().trafficStats();
2339
1.32k
  stats.upstream_cx_destroy_.inc();
2340
1.32k
  if (event == Network::ConnectionEvent::RemoteClose) {
2341
619
    stats.upstream_cx_destroy_remote_.inc();
2342
706
  } else {
2343
706
    stats.upstream_cx_destroy_local_.inc();
2344
706
  }
2345
1.32k
}
2346
2347
void reportUpstreamCxDestroyActiveRequest(const Upstream::HostDescriptionConstSharedPtr& host,
2348
61
                                          Network::ConnectionEvent event) {
2349
61
  Upstream::ClusterTrafficStats& stats = *host->cluster().trafficStats();
2350
61
  stats.upstream_cx_destroy_with_active_rq_.inc();
2351
61
  if (event == Network::ConnectionEvent::RemoteClose) {
2352
27
    stats.upstream_cx_destroy_remote_with_active_rq_.inc();
2353
34
  } else {
2354
34
    stats.upstream_cx_destroy_local_with_active_rq_.inc();
2355
34
  }
2356
61
}
2357
2358
Network::Address::InstanceConstSharedPtr resolveHealthCheckAddress(
2359
    const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config,
2360
211k
    Network::Address::InstanceConstSharedPtr host_address) {
2361
211k
  Network::Address::InstanceConstSharedPtr health_check_address;
2362
211k
  const auto& port_value = health_check_config.port_value();
2363
211k
  if (health_check_config.has_address()) {
2364
0
    auto address = Network::Address::resolveProtoAddress(health_check_config.address());
2365
0
    health_check_address =
2366
0
        port_value == 0 ? address : Network::Utility::getAddressWithPort(*address, port_value);
2367
211k
  } else {
2368
211k
    health_check_address = port_value == 0
2369
211k
                               ? host_address
2370
211k
                               : Network::Utility::getAddressWithPort(*host_address, port_value);
2371
211k
  }
2372
211k
  return health_check_address;
2373
211k
}
2374
2375
} // namespace Upstream
2376
} // namespace Envoy