Line data Source code
1 : #include "source/common/upstream/upstream_impl.h"
2 :
3 : #include <chrono>
4 : #include <cstdint>
5 : #include <limits>
6 : #include <list>
7 : #include <memory>
8 : #include <string>
9 : #include <vector>
10 :
11 : #include "envoy/config/cluster/v3/circuit_breaker.pb.h"
12 : #include "envoy/config/cluster/v3/cluster.pb.h"
13 : #include "envoy/config/core/v3/address.pb.h"
14 : #include "envoy/config/core/v3/base.pb.h"
15 : #include "envoy/config/core/v3/health_check.pb.h"
16 : #include "envoy/config/core/v3/protocol.pb.h"
17 : #include "envoy/config/endpoint/v3/endpoint_components.pb.h"
18 : #include "envoy/config/upstream/local_address_selector/v3/default_local_address_selector.pb.h"
19 : #include "envoy/event/dispatcher.h"
20 : #include "envoy/event/timer.h"
21 : #include "envoy/extensions/filters/http/upstream_codec/v3/upstream_codec.pb.h"
22 : #include "envoy/extensions/transport_sockets/http_11_proxy/v3/upstream_http_11_connect.pb.h"
23 : #include "envoy/extensions/transport_sockets/raw_buffer/v3/raw_buffer.pb.h"
24 : #include "envoy/init/manager.h"
25 : #include "envoy/network/dns.h"
26 : #include "envoy/network/transport_socket.h"
27 : #include "envoy/registry/registry.h"
28 : #include "envoy/secret/secret_manager.h"
29 : #include "envoy/server/filter_config.h"
30 : #include "envoy/server/transport_socket_config.h"
31 : #include "envoy/ssl/context_manager.h"
32 : #include "envoy/stats/scope.h"
33 : #include "envoy/upstream/health_checker.h"
34 : #include "envoy/upstream/upstream.h"
35 :
36 : #include "source/common/common/dns_utils.h"
37 : #include "source/common/common/enum_to_int.h"
38 : #include "source/common/common/fmt.h"
39 : #include "source/common/common/utility.h"
40 : #include "source/common/config/utility.h"
41 : #include "source/common/http/http1/codec_stats.h"
42 : #include "source/common/http/http2/codec_stats.h"
43 : #include "source/common/http/utility.h"
44 : #include "source/common/network/address_impl.h"
45 : #include "source/common/network/happy_eyeballs_connection_impl.h"
46 : #include "source/common/network/resolver_impl.h"
47 : #include "source/common/network/socket_option_factory.h"
48 : #include "source/common/network/socket_option_impl.h"
49 : #include "source/common/protobuf/protobuf.h"
50 : #include "source/common/protobuf/utility.h"
51 : #include "source/common/router/config_utility.h"
52 : #include "source/common/runtime/runtime_features.h"
53 : #include "source/common/runtime/runtime_impl.h"
54 : #include "source/common/stats/deferred_creation.h"
55 : #include "source/common/upstream/cluster_factory_impl.h"
56 : #include "source/common/upstream/health_checker_impl.h"
57 : #include "source/extensions/filters/network/http_connection_manager/config.h"
58 : #include "source/server/transport_socket_config_impl.h"
59 :
60 : #include "absl/container/node_hash_set.h"
61 : #include "absl/strings/str_cat.h"
62 :
63 : namespace Envoy {
64 : namespace Upstream {
65 : namespace {
66 215 : std::string addressToString(Network::Address::InstanceConstSharedPtr address) {
67 215 : if (!address) {
68 0 : return "";
69 0 : }
70 215 : return address->asString();
71 215 : }
72 :
73 : Network::TcpKeepaliveConfig
74 0 : parseTcpKeepaliveConfig(const envoy::config::cluster::v3::Cluster& config) {
75 0 : const envoy::config::core::v3::TcpKeepalive& options =
76 0 : config.upstream_connection_options().tcp_keepalive();
77 0 : return Network::TcpKeepaliveConfig{
78 0 : PROTOBUF_GET_WRAPPED_OR_DEFAULT(options, keepalive_probes, absl::optional<uint32_t>()),
79 0 : PROTOBUF_GET_WRAPPED_OR_DEFAULT(options, keepalive_time, absl::optional<uint32_t>()),
80 0 : PROTOBUF_GET_WRAPPED_OR_DEFAULT(options, keepalive_interval, absl::optional<uint32_t>())};
81 0 : }
82 :
83 : ProtocolOptionsConfigConstSharedPtr
84 : createProtocolOptionsConfig(const std::string& name, const ProtobufWkt::Any& typed_config,
85 119 : Server::Configuration::ProtocolOptionsFactoryContext& factory_context) {
86 119 : Server::Configuration::ProtocolOptionsFactory* factory =
87 119 : Registry::FactoryRegistry<Server::Configuration::NamedNetworkFilterConfigFactory>::getFactory(
88 119 : name);
89 119 : if (factory == nullptr) {
90 119 : factory =
91 119 : Registry::FactoryRegistry<Server::Configuration::NamedHttpFilterConfigFactory>::getFactory(
92 119 : name);
93 119 : }
94 119 : if (factory == nullptr) {
95 119 : factory =
96 119 : Registry::FactoryRegistry<Server::Configuration::ProtocolOptionsFactory>::getFactory(name);
97 119 : }
98 :
99 119 : if (factory == nullptr) {
100 0 : throwEnvoyExceptionOrPanic(
101 0 : fmt::format("Didn't find a registered network or http filter or protocol "
102 0 : "options implementation for name: '{}'",
103 0 : name));
104 0 : }
105 :
106 119 : ProtobufTypes::MessagePtr proto_config = factory->createEmptyProtocolOptionsProto();
107 :
108 119 : if (proto_config == nullptr) {
109 0 : throwEnvoyExceptionOrPanic(fmt::format("filter {} does not support protocol options", name));
110 0 : }
111 :
112 119 : Envoy::Config::Utility::translateOpaqueConfig(
113 119 : typed_config, factory_context.messageValidationVisitor(), *proto_config);
114 119 : return factory->createProtocolOptionsConfig(*proto_config, factory_context);
115 119 : }
116 :
117 : absl::flat_hash_map<std::string, ProtocolOptionsConfigConstSharedPtr> parseExtensionProtocolOptions(
118 : const envoy::config::cluster::v3::Cluster& config,
119 159 : Server::Configuration::ProtocolOptionsFactoryContext& factory_context) {
120 159 : absl::flat_hash_map<std::string, ProtocolOptionsConfigConstSharedPtr> options;
121 :
122 159 : for (const auto& it : config.typed_extension_protocol_options()) {
123 119 : auto& name = it.first;
124 119 : auto object = createProtocolOptionsConfig(name, it.second, factory_context);
125 119 : if (object != nullptr) {
126 119 : options[name] = std::move(object);
127 119 : }
128 119 : }
129 :
130 159 : return options;
131 159 : }
132 :
133 : // Updates the EDS health flags for an existing host to match the new host.
134 : // @param updated_host the new host to read health flag values from.
135 : // @param existing_host the host to update.
136 : // @return bool whether the flag update caused the host health to change.
137 0 : bool updateEdsHealthFlag(const Host& updated_host, Host& existing_host) {
138 0 : auto updated_eds_health_status = updated_host.edsHealthStatus();
139 :
140 : // Check if the health flag has changed.
141 0 : if (updated_eds_health_status == existing_host.edsHealthStatus()) {
142 0 : return false;
143 0 : }
144 :
145 0 : const auto previous_health = existing_host.coarseHealth();
146 0 : existing_host.setEdsHealthStatus(updated_eds_health_status);
147 :
148 : // Rebuild if changing the flag affected the host health.
149 0 : return previous_health != existing_host.coarseHealth();
150 0 : }
151 :
152 : // Converts a set of hosts into a HostVector, excluding certain hosts.
153 : // @param hosts hosts to convert
154 : // @param excluded_hosts hosts to exclude from the resulting vector.
155 : HostVector filterHosts(const absl::node_hash_set<HostSharedPtr>& hosts,
156 56 : const absl::node_hash_set<HostSharedPtr>& excluded_hosts) {
157 56 : HostVector net_hosts;
158 56 : net_hosts.reserve(hosts.size());
159 :
160 56 : for (const auto& host : hosts) {
161 28 : if (excluded_hosts.find(host) == excluded_hosts.end()) {
162 28 : net_hosts.emplace_back(host);
163 28 : }
164 28 : }
165 :
166 56 : return net_hosts;
167 56 : }
168 :
169 : Stats::ScopeSharedPtr generateStatsScope(const envoy::config::cluster::v3::Cluster& config,
170 159 : Stats::Store& stats) {
171 159 : return stats.createScope(fmt::format(
172 159 : "cluster.{}.", config.alt_stat_name().empty() ? config.name() : config.alt_stat_name()));
173 159 : }
174 :
175 : Network::ConnectionSocket::OptionsSharedPtr
176 : buildBaseSocketOptions(const envoy::config::cluster::v3::Cluster& cluster_config,
177 159 : const envoy::config::core::v3::BindConfig& bootstrap_bind_config) {
178 159 : Network::ConnectionSocket::OptionsSharedPtr base_options =
179 159 : std::make_shared<Network::ConnectionSocket::Options>();
180 :
181 : // The process-wide `signal()` handling may fail to handle SIGPIPE if overridden
182 : // in the process (i.e., on a mobile client). Some OSes support handling it at the socket layer:
183 159 : if (ENVOY_SOCKET_SO_NOSIGPIPE.hasValue()) {
184 0 : Network::Socket::appendOptions(base_options,
185 0 : Network::SocketOptionFactory::buildSocketNoSigpipeOptions());
186 0 : }
187 : // Cluster IP_FREEBIND settings, when set, will override the cluster manager wide settings.
188 159 : if ((bootstrap_bind_config.freebind().value() &&
189 159 : !cluster_config.upstream_bind_config().has_freebind()) ||
190 159 : cluster_config.upstream_bind_config().freebind().value()) {
191 0 : Network::Socket::appendOptions(base_options,
192 0 : Network::SocketOptionFactory::buildIpFreebindOptions());
193 0 : }
194 159 : if (cluster_config.upstream_connection_options().has_tcp_keepalive()) {
195 0 : Network::Socket::appendOptions(base_options,
196 0 : Network::SocketOptionFactory::buildTcpKeepaliveOptions(
197 0 : parseTcpKeepaliveConfig(cluster_config)));
198 0 : }
199 :
200 159 : return base_options;
201 159 : }
202 :
203 : Network::ConnectionSocket::OptionsSharedPtr
204 : buildClusterSocketOptions(const envoy::config::cluster::v3::Cluster& cluster_config,
205 159 : const envoy::config::core::v3::BindConfig& bootstrap_bind_config) {
206 159 : Network::ConnectionSocket::OptionsSharedPtr cluster_options =
207 159 : std::make_shared<Network::ConnectionSocket::Options>();
208 : // Cluster socket_options trump cluster manager wide.
209 159 : if (bootstrap_bind_config.socket_options().size() +
210 159 : cluster_config.upstream_bind_config().socket_options().size() >
211 159 : 0) {
212 0 : auto socket_options = !cluster_config.upstream_bind_config().socket_options().empty()
213 0 : ? cluster_config.upstream_bind_config().socket_options()
214 0 : : bootstrap_bind_config.socket_options();
215 0 : Network::Socket::appendOptions(
216 0 : cluster_options, Network::SocketOptionFactory::buildLiteralOptions(socket_options));
217 0 : }
218 159 : return cluster_options;
219 159 : }
220 :
221 : std::vector<::Envoy::Upstream::UpstreamLocalAddress>
222 : parseBindConfig(::Envoy::OptRef<const envoy::config::core::v3::BindConfig> bind_config,
223 : const absl::optional<std::string>& cluster_name,
224 : Network::ConnectionSocket::OptionsSharedPtr base_socket_options,
225 159 : Network::ConnectionSocket::OptionsSharedPtr cluster_socket_options) {
226 :
227 159 : std::vector<::Envoy::Upstream::UpstreamLocalAddress> upstream_local_addresses;
228 159 : if (bind_config.has_value()) {
229 0 : UpstreamLocalAddress upstream_local_address;
230 0 : upstream_local_address.address_ =
231 0 : bind_config->has_source_address()
232 0 : ? ::Envoy::Network::Address::resolveProtoSocketAddress(bind_config->source_address())
233 0 : : nullptr;
234 0 : upstream_local_address.socket_options_ = std::make_shared<Network::ConnectionSocket::Options>();
235 :
236 0 : ::Envoy::Network::Socket::appendOptions(upstream_local_address.socket_options_,
237 0 : base_socket_options);
238 0 : ::Envoy::Network::Socket::appendOptions(upstream_local_address.socket_options_,
239 0 : cluster_socket_options);
240 :
241 0 : upstream_local_addresses.push_back(upstream_local_address);
242 :
243 0 : for (const auto& extra_source_address : bind_config->extra_source_addresses()) {
244 0 : UpstreamLocalAddress extra_upstream_local_address;
245 0 : extra_upstream_local_address.address_ =
246 0 : ::Envoy::Network::Address::resolveProtoSocketAddress(extra_source_address.address());
247 :
248 0 : extra_upstream_local_address.socket_options_ =
249 0 : std::make_shared<::Envoy::Network::ConnectionSocket::Options>();
250 0 : ::Envoy::Network::Socket::appendOptions(extra_upstream_local_address.socket_options_,
251 0 : base_socket_options);
252 :
253 0 : if (extra_source_address.has_socket_options()) {
254 0 : ::Envoy::Network::Socket::appendOptions(
255 0 : extra_upstream_local_address.socket_options_,
256 0 : ::Envoy::Network::SocketOptionFactory::buildLiteralOptions(
257 0 : extra_source_address.socket_options().socket_options()));
258 0 : } else {
259 0 : ::Envoy::Network::Socket::appendOptions(extra_upstream_local_address.socket_options_,
260 0 : cluster_socket_options);
261 0 : }
262 0 : upstream_local_addresses.push_back(extra_upstream_local_address);
263 0 : }
264 :
265 0 : for (const auto& additional_source_address : bind_config->additional_source_addresses()) {
266 0 : UpstreamLocalAddress additional_upstream_local_address;
267 0 : additional_upstream_local_address.address_ =
268 0 : ::Envoy::Network::Address::resolveProtoSocketAddress(additional_source_address);
269 0 : additional_upstream_local_address.socket_options_ =
270 0 : std::make_shared<::Envoy::Network::ConnectionSocket::Options>();
271 0 : ::Envoy::Network::Socket::appendOptions(additional_upstream_local_address.socket_options_,
272 0 : base_socket_options);
273 0 : ::Envoy::Network::Socket::appendOptions(additional_upstream_local_address.socket_options_,
274 0 : cluster_socket_options);
275 0 : upstream_local_addresses.push_back(additional_upstream_local_address);
276 0 : }
277 159 : } else {
278 : // If there is no bind config specified, then return a nullptr for the address.
279 159 : UpstreamLocalAddress local_address;
280 159 : local_address.address_ = nullptr;
281 159 : local_address.socket_options_ = std::make_shared<::Envoy::Network::ConnectionSocket::Options>();
282 159 : ::Envoy::Network::Socket::appendOptions(local_address.socket_options_, base_socket_options);
283 159 : Network::Socket::appendOptions(local_address.socket_options_, cluster_socket_options);
284 159 : upstream_local_addresses.push_back(local_address);
285 159 : }
286 :
287 : // Verify that we have valid addresses if size is greater than 1.
288 159 : if (upstream_local_addresses.size() > 1) {
289 0 : for (auto const& upstream_local_address : upstream_local_addresses) {
290 0 : if (upstream_local_address.address_ == nullptr) {
291 0 : throwEnvoyExceptionOrPanic(fmt::format(
292 0 : "{}'s upstream binding config has invalid IP addresses.",
293 0 : !(cluster_name.has_value()) ? "Bootstrap"
294 0 : : fmt::format("Cluster {}", cluster_name.value())));
295 0 : }
296 0 : }
297 0 : }
298 :
299 159 : return upstream_local_addresses;
300 159 : }
301 :
302 : Envoy::Upstream::UpstreamLocalAddressSelectorConstSharedPtr createUpstreamLocalAddressSelector(
303 : const envoy::config::cluster::v3::Cluster& cluster_config,
304 159 : const absl::optional<envoy::config::core::v3::BindConfig>& bootstrap_bind_config) {
305 :
306 : // Use the cluster bind config if specified. This completely overrides the
307 : // bootstrap bind config when present.
308 159 : OptRef<const envoy::config::core::v3::BindConfig> bind_config;
309 159 : absl::optional<std::string> cluster_name;
310 159 : if (cluster_config.has_upstream_bind_config()) {
311 0 : bind_config.emplace(cluster_config.upstream_bind_config());
312 0 : cluster_name.emplace(cluster_config.name());
313 159 : } else if (bootstrap_bind_config.has_value()) {
314 0 : bind_config.emplace(*bootstrap_bind_config);
315 0 : }
316 :
317 : // Verify that bind config is valid.
318 159 : if (bind_config.has_value()) {
319 0 : if (bind_config->additional_source_addresses_size() > 0 &&
320 0 : bind_config->extra_source_addresses_size() > 0) {
321 0 : throwEnvoyExceptionOrPanic(fmt::format(
322 0 : "Can't specify both `extra_source_addresses` and `additional_source_addresses` "
323 0 : "in the {}'s upstream binding config",
324 0 : !(cluster_name.has_value()) ? "Bootstrap"
325 0 : : fmt::format("Cluster {}", cluster_name.value())));
326 0 : }
327 :
328 0 : if (!bind_config->has_source_address() &&
329 0 : (bind_config->extra_source_addresses_size() > 0 ||
330 0 : bind_config->additional_source_addresses_size() > 0)) {
331 0 : throwEnvoyExceptionOrPanic(fmt::format(
332 0 : "{}'s upstream binding config has extra/additional source addresses but no "
333 0 : "source_address. Extra/additional addresses cannot be specified if "
334 0 : "source_address is not set.",
335 0 : !(cluster_name.has_value()) ? "Bootstrap"
336 0 : : fmt::format("Cluster {}", cluster_name.value())));
337 0 : }
338 0 : }
339 159 : UpstreamLocalAddressSelectorFactory* local_address_selector_factory;
340 159 : const envoy::config::core::v3::TypedExtensionConfig* const local_address_selector_config =
341 159 : bind_config.has_value() && bind_config->has_local_address_selector()
342 159 : ? &bind_config->local_address_selector()
343 159 : : nullptr;
344 159 : if (local_address_selector_config) {
345 0 : local_address_selector_factory =
346 0 : Config::Utility::getAndCheckFactory<UpstreamLocalAddressSelectorFactory>(
347 0 : *local_address_selector_config, false);
348 159 : } else {
349 : // Create the default local address selector if one was not specified.
350 159 : envoy::config::upstream::local_address_selector::v3::DefaultLocalAddressSelector default_config;
351 159 : envoy::config::core::v3::TypedExtensionConfig typed_extension;
352 159 : typed_extension.mutable_typed_config()->PackFrom(default_config);
353 159 : local_address_selector_factory =
354 159 : Config::Utility::getAndCheckFactory<UpstreamLocalAddressSelectorFactory>(typed_extension,
355 159 : false);
356 159 : }
357 159 : auto selector_or_error = local_address_selector_factory->createLocalAddressSelector(
358 159 : parseBindConfig(
359 159 : bind_config, cluster_name,
360 159 : buildBaseSocketOptions(cluster_config, bootstrap_bind_config.value_or(
361 159 : envoy::config::core::v3::BindConfig{})),
362 159 : buildClusterSocketOptions(cluster_config, bootstrap_bind_config.value_or(
363 159 : envoy::config::core::v3::BindConfig{}))),
364 159 : cluster_name);
365 159 : THROW_IF_STATUS_NOT_OK(selector_or_error, throw);
366 159 : return selector_or_error.value();
367 159 : }
368 :
369 : } // namespace
370 :
371 : // Allow disabling ALPN checks for transport sockets. See
372 : // https://github.com/envoyproxy/envoy/issues/22876
373 : const absl::string_view ClusterImplBase::DoNotValidateAlpnRuntimeKey =
374 : "config.do_not_validate_alpn_support";
375 :
376 : // TODO(pianiststickman): this implementation takes a lock on the hot path and puts a copy of the
377 : // stat name into every host that receives a copy of that metric. This can be improved by putting
378 : // a single copy of the stat name into a thread-local key->index map so that the lock can be avoided
379 : // and using the index as the key to the stat map instead.
380 0 : void LoadMetricStatsImpl::add(const absl::string_view key, double value) {
381 0 : absl::MutexLock lock(&mu_);
382 0 : if (map_ == nullptr) {
383 0 : map_ = std::make_unique<StatMap>();
384 0 : }
385 0 : Stat& stat = (*map_)[key];
386 0 : ++stat.num_requests_with_metric;
387 0 : stat.total_metric_value += value;
388 0 : }
389 :
390 0 : LoadMetricStats::StatMapPtr LoadMetricStatsImpl::latch() {
391 0 : absl::MutexLock lock(&mu_);
392 0 : StatMapPtr latched = std::move(map_);
393 0 : map_ = nullptr;
394 0 : return latched;
395 0 : }
396 :
397 : HostDescriptionImpl::HostDescriptionImpl(
398 : ClusterInfoConstSharedPtr cluster, const std::string& hostname,
399 : Network::Address::InstanceConstSharedPtr dest_address, MetadataConstSharedPtr metadata,
400 : const envoy::config::core::v3::Locality& locality,
401 : const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config,
402 : uint32_t priority, TimeSource& time_source)
403 : : cluster_(cluster), hostname_(hostname),
404 : health_checks_hostname_(health_check_config.hostname()), address_(dest_address),
405 : canary_(Config::Metadata::metadataValue(metadata.get(),
406 : Config::MetadataFilters::get().ENVOY_LB,
407 : Config::MetadataEnvoyLbKeys::get().CANARY)
408 : .bool_value()),
409 : metadata_(metadata), locality_(locality),
410 : locality_zone_stat_name_(locality.zone(), cluster->statsScope().symbolTable()),
411 : priority_(priority),
412 : socket_factory_(resolveTransportSocketFactory(dest_address, metadata_.get())),
413 180397 : creation_time_(time_source.monotonicTime()) {
414 180397 : if (health_check_config.port_value() != 0 && dest_address->type() != Network::Address::Type::Ip) {
415 : // Setting the health check port to non-0 only works for IP-type addresses. Setting the port
416 : // for a pipe address is a misconfiguration. Throw an exception.
417 0 : throwEnvoyExceptionOrPanic(
418 0 : fmt::format("Invalid host configuration: non-zero port for non-IP address"));
419 0 : }
420 180397 : health_check_address_ = resolveHealthCheckAddress(health_check_config, dest_address);
421 180397 : }
422 :
423 : Network::UpstreamTransportSocketFactory& HostDescriptionImpl::resolveTransportSocketFactory(
424 : const Network::Address::InstanceConstSharedPtr& dest_address,
425 180400 : const envoy::config::core::v3::Metadata* metadata) const {
426 180400 : auto match = cluster_->transportSocketMatcher().resolve(metadata);
427 180400 : match.stats_.total_match_count_.inc();
428 180400 : ENVOY_LOG(debug, "transport socket match, socket {} selected for host with address {}",
429 180400 : match.name_, dest_address ? dest_address->asString() : "empty");
430 :
431 180400 : return match.factory_;
432 180400 : }
433 :
434 : Host::CreateConnectionData HostImpl::createConnection(
435 : Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options,
436 173 : Network::TransportSocketOptionsConstSharedPtr transport_socket_options) const {
437 173 : return createConnection(dispatcher, cluster(), address(), addressList(), transportSocketFactory(),
438 173 : options, transport_socket_options, shared_from_this());
439 173 : }
440 :
441 180259 : void HostImpl::setEdsHealthFlag(envoy::config::core::v3::HealthStatus health_status) {
442 : // Clear all old EDS health flags first.
443 180259 : HostImpl::healthFlagClear(Host::HealthFlag::FAILED_EDS_HEALTH);
444 180259 : HostImpl::healthFlagClear(Host::HealthFlag::DEGRADED_EDS_HEALTH);
445 :
446 : // Set the appropriate EDS health flag.
447 180259 : switch (health_status) {
448 0 : case envoy::config::core::v3::UNHEALTHY:
449 0 : FALLTHRU;
450 0 : case envoy::config::core::v3::DRAINING:
451 0 : FALLTHRU;
452 0 : case envoy::config::core::v3::TIMEOUT:
453 0 : HostImpl::healthFlagSet(Host::HealthFlag::FAILED_EDS_HEALTH);
454 0 : break;
455 0 : case envoy::config::core::v3::DEGRADED:
456 0 : HostImpl::healthFlagSet(Host::HealthFlag::DEGRADED_EDS_HEALTH);
457 0 : break;
458 180259 : default:
459 180259 : break;
460 : // No health flags should be set.
461 180259 : }
462 180259 : }
463 :
464 : Host::CreateConnectionData HostImpl::createHealthCheckConnection(
465 : Event::Dispatcher& dispatcher,
466 : Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
467 69 : const envoy::config::core::v3::Metadata* metadata) const {
468 :
469 69 : Network::UpstreamTransportSocketFactory& factory =
470 69 : (metadata != nullptr) ? resolveTransportSocketFactory(healthCheckAddress(), metadata)
471 69 : : transportSocketFactory();
472 69 : return createConnection(dispatcher, cluster(), healthCheckAddress(), {}, factory, nullptr,
473 69 : transport_socket_options, shared_from_this());
474 69 : }
475 :
476 : Host::CreateConnectionData HostImpl::createConnection(
477 : Event::Dispatcher& dispatcher, const ClusterInfo& cluster,
478 : const Network::Address::InstanceConstSharedPtr& address,
479 : const std::vector<Network::Address::InstanceConstSharedPtr>& address_list,
480 : Network::UpstreamTransportSocketFactory& socket_factory,
481 : const Network::ConnectionSocket::OptionsSharedPtr& options,
482 : Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
483 242 : HostDescriptionConstSharedPtr host) {
484 242 : auto source_address_selector = cluster.getUpstreamLocalAddressSelector();
485 :
486 242 : Network::ClientConnectionPtr connection;
487 : // If the transport socket options indicate the connection should be
488 : // redirected to a proxy, create the TCP connection to the proxy's address not
489 : // the host's address.
490 242 : if (transport_socket_options && transport_socket_options->http11ProxyInfo().has_value()) {
491 0 : auto upstream_local_address =
492 0 : source_address_selector->getUpstreamLocalAddress(address, options);
493 0 : ENVOY_LOG(debug, "Connecting to configured HTTP/1.1 proxy at {}",
494 0 : transport_socket_options->http11ProxyInfo()->proxy_address->asString());
495 0 : connection = dispatcher.createClientConnection(
496 0 : transport_socket_options->http11ProxyInfo()->proxy_address, upstream_local_address.address_,
497 0 : socket_factory.createTransportSocket(transport_socket_options, host),
498 0 : upstream_local_address.socket_options_, transport_socket_options);
499 242 : } else if (address_list.size() > 1) {
500 0 : connection = std::make_unique<Network::HappyEyeballsConnectionImpl>(
501 0 : dispatcher, address_list, source_address_selector, socket_factory, transport_socket_options,
502 0 : host, options);
503 242 : } else {
504 242 : auto upstream_local_address =
505 242 : source_address_selector->getUpstreamLocalAddress(address, options);
506 242 : connection = dispatcher.createClientConnection(
507 242 : address, upstream_local_address.address_,
508 242 : socket_factory.createTransportSocket(transport_socket_options, host),
509 242 : upstream_local_address.socket_options_, transport_socket_options);
510 242 : }
511 :
512 242 : connection->connectionInfoSetter().enableSettingInterfaceName(
513 242 : cluster.setLocalInterfaceNameOnUpstreamConnections());
514 242 : connection->setBufferLimits(cluster.perConnectionBufferLimitBytes());
515 242 : cluster.createNetworkFilterChain(*connection);
516 242 : return {std::move(connection), std::move(host)};
517 242 : }
518 :
519 3633883 : void HostImpl::weight(uint32_t new_weight) { weight_ = std::max(1U, new_weight); }
520 :
521 : std::vector<HostsPerLocalityConstSharedPtr> HostsPerLocalityImpl::filter(
522 159 : const std::vector<std::function<bool(const Host&)>>& predicates) const {
523 : // We keep two lists: one for being able to mutate the clone and one for returning to the
524 : // caller. Creating them both at the start avoids iterating over the mutable values at the end
525 : // to convert them to a const pointer.
526 159 : std::vector<std::shared_ptr<HostsPerLocalityImpl>> mutable_clones;
527 159 : std::vector<HostsPerLocalityConstSharedPtr> filtered_clones;
528 :
529 159 : mutable_clones.reserve(predicates.size());
530 159 : filtered_clones.reserve(predicates.size());
531 636 : for (size_t i = 0; i < predicates.size(); ++i) {
532 477 : mutable_clones.emplace_back(std::make_shared<HostsPerLocalityImpl>());
533 477 : filtered_clones.emplace_back(mutable_clones.back());
534 477 : mutable_clones.back()->local_ = local_;
535 477 : }
536 :
537 159 : for (const auto& hosts_locality : hosts_per_locality_) {
538 159 : std::vector<HostVector> current_locality_hosts;
539 159 : current_locality_hosts.resize(predicates.size());
540 :
541 : // Since # of hosts >> # of predicates, we iterate over the hosts in the outer loop.
542 159 : for (const auto& host : hosts_locality) {
543 636 : for (size_t i = 0; i < predicates.size(); ++i) {
544 477 : if (predicates[i](*host)) {
545 159 : current_locality_hosts[i].emplace_back(host);
546 159 : }
547 477 : }
548 159 : }
549 :
550 636 : for (size_t i = 0; i < predicates.size(); ++i) {
551 477 : mutable_clones[i]->hosts_per_locality_.push_back(std::move(current_locality_hosts[i]));
552 477 : }
553 159 : }
554 :
555 159 : return filtered_clones;
556 159 : }
557 :
558 : void HostSetImpl::updateHosts(PrioritySet::UpdateHostsParams&& update_hosts_params,
559 : LocalityWeightsConstSharedPtr locality_weights,
560 : const HostVector& hosts_added, const HostVector& hosts_removed,
561 : absl::optional<bool> weighted_priority_health,
562 663 : absl::optional<uint32_t> overprovisioning_factor) {
563 663 : if (weighted_priority_health.has_value()) {
564 465 : weighted_priority_health_ = weighted_priority_health.value();
565 465 : }
566 663 : if (overprovisioning_factor.has_value()) {
567 465 : ASSERT(overprovisioning_factor.value() > 0);
568 465 : overprovisioning_factor_ = overprovisioning_factor.value();
569 465 : }
570 663 : hosts_ = std::move(update_hosts_params.hosts);
571 663 : healthy_hosts_ = std::move(update_hosts_params.healthy_hosts);
572 663 : degraded_hosts_ = std::move(update_hosts_params.degraded_hosts);
573 663 : excluded_hosts_ = std::move(update_hosts_params.excluded_hosts);
574 663 : hosts_per_locality_ = std::move(update_hosts_params.hosts_per_locality);
575 663 : healthy_hosts_per_locality_ = std::move(update_hosts_params.healthy_hosts_per_locality);
576 663 : degraded_hosts_per_locality_ = std::move(update_hosts_params.degraded_hosts_per_locality);
577 663 : excluded_hosts_per_locality_ = std::move(update_hosts_params.excluded_hosts_per_locality);
578 663 : locality_weights_ = std::move(locality_weights);
579 :
580 : // TODO(ggreenway): implement `weighted_priority_health` support in `rebuildLocalityScheduler`.
581 663 : rebuildLocalityScheduler(healthy_locality_scheduler_, healthy_locality_entries_,
582 663 : *healthy_hosts_per_locality_, healthy_hosts_->get(), hosts_per_locality_,
583 663 : excluded_hosts_per_locality_, locality_weights_,
584 663 : overprovisioning_factor_);
585 663 : rebuildLocalityScheduler(degraded_locality_scheduler_, degraded_locality_entries_,
586 663 : *degraded_hosts_per_locality_, degraded_hosts_->get(),
587 663 : hosts_per_locality_, excluded_hosts_per_locality_, locality_weights_,
588 663 : overprovisioning_factor_);
589 :
590 663 : runUpdateCallbacks(hosts_added, hosts_removed);
591 663 : }
592 :
593 : void HostSetImpl::rebuildLocalityScheduler(
594 : std::unique_ptr<EdfScheduler<LocalityEntry>>& locality_scheduler,
595 : std::vector<std::shared_ptr<LocalityEntry>>& locality_entries,
596 : const HostsPerLocality& eligible_hosts_per_locality, const HostVector& eligible_hosts,
597 : HostsPerLocalityConstSharedPtr all_hosts_per_locality,
598 : HostsPerLocalityConstSharedPtr excluded_hosts_per_locality,
599 1326 : LocalityWeightsConstSharedPtr locality_weights, uint32_t overprovisioning_factor) {
600 : // Rebuild the locality scheduler by computing the effective weight of each
601 : // locality in this priority. The scheduler is reset by default, and is rebuilt only if we have
602 : // locality weights (i.e. using EDS) and there is at least one eligible host in this priority.
603 : //
604 : // We omit building a scheduler when there are zero eligible hosts in the priority as
605 : // all the localities will have zero effective weight. At selection time, we'll either select
606 : // from a different scheduler or there will be no available hosts in the priority. At that point
607 : // we'll rely on other mechanisms such as panic mode to select a host, none of which rely on the
608 : // scheduler.
609 : //
610 : // TODO(htuch): if the underlying locality index ->
611 : // envoy::config::core::v3::Locality hasn't changed in hosts_/healthy_hosts_/degraded_hosts_, we
612 : // could just update locality_weight_ without rebuilding. Similar to how host
613 : // level WRR works, we would age out the existing entries via picks and lazily
614 : // apply the new weights.
615 1326 : locality_scheduler = nullptr;
616 1326 : if (all_hosts_per_locality != nullptr && locality_weights != nullptr &&
617 1326 : !locality_weights->empty() && !eligible_hosts.empty()) {
618 465 : locality_scheduler = std::make_unique<EdfScheduler<LocalityEntry>>();
619 465 : locality_entries.clear();
620 930 : for (uint32_t i = 0; i < all_hosts_per_locality->get().size(); ++i) {
621 465 : const double effective_weight = effectiveLocalityWeight(
622 465 : i, eligible_hosts_per_locality, *excluded_hosts_per_locality, *all_hosts_per_locality,
623 465 : *locality_weights, overprovisioning_factor);
624 465 : if (effective_weight > 0) {
625 0 : locality_entries.emplace_back(std::make_shared<LocalityEntry>(i, effective_weight));
626 0 : locality_scheduler->add(effective_weight, locality_entries.back());
627 0 : }
628 465 : }
629 : // If all effective weights were zero, reset the scheduler.
630 465 : if (locality_scheduler->empty()) {
631 465 : locality_scheduler = nullptr;
632 465 : }
633 465 : }
634 1326 : }
635 :
636 0 : absl::optional<uint32_t> HostSetImpl::chooseHealthyLocality() {
637 0 : return chooseLocality(healthy_locality_scheduler_.get());
638 0 : }
639 :
640 0 : absl::optional<uint32_t> HostSetImpl::chooseDegradedLocality() {
641 0 : return chooseLocality(degraded_locality_scheduler_.get());
642 0 : }
643 :
644 : absl::optional<uint32_t>
645 0 : HostSetImpl::chooseLocality(EdfScheduler<LocalityEntry>* locality_scheduler) {
646 0 : if (locality_scheduler == nullptr) {
647 0 : return {};
648 0 : }
649 0 : const std::shared_ptr<LocalityEntry> locality = locality_scheduler->pickAndAdd(
650 0 : [](const LocalityEntry& locality) { return locality.effective_weight_; });
651 : // We don't build a schedule if there are no weighted localities, so we should always succeed.
652 0 : ASSERT(locality != nullptr);
653 : // If we picked it before, its weight must have been positive.
654 0 : ASSERT(locality->effective_weight_ > 0);
655 0 : return locality->index_;
656 0 : }
657 :
658 : PrioritySet::UpdateHostsParams
659 : HostSetImpl::updateHostsParams(HostVectorConstSharedPtr hosts,
660 : HostsPerLocalityConstSharedPtr hosts_per_locality,
661 : HealthyHostVectorConstSharedPtr healthy_hosts,
662 : HostsPerLocalityConstSharedPtr healthy_hosts_per_locality,
663 : DegradedHostVectorConstSharedPtr degraded_hosts,
664 : HostsPerLocalityConstSharedPtr degraded_hosts_per_locality,
665 : ExcludedHostVectorConstSharedPtr excluded_hosts,
666 516 : HostsPerLocalityConstSharedPtr excluded_hosts_per_locality) {
667 516 : return PrioritySet::UpdateHostsParams{std::move(hosts),
668 516 : std::move(healthy_hosts),
669 516 : std::move(degraded_hosts),
670 516 : std::move(excluded_hosts),
671 516 : std::move(hosts_per_locality),
672 516 : std::move(healthy_hosts_per_locality),
673 516 : std::move(degraded_hosts_per_locality),
674 516 : std::move(excluded_hosts_per_locality)};
675 516 : }
676 :
677 357 : PrioritySet::UpdateHostsParams HostSetImpl::updateHostsParams(const HostSet& host_set) {
678 357 : return updateHostsParams(host_set.hostsPtr(), host_set.hostsPerLocalityPtr(),
679 357 : host_set.healthyHostsPtr(), host_set.healthyHostsPerLocalityPtr(),
680 357 : host_set.degradedHostsPtr(), host_set.degradedHostsPerLocalityPtr(),
681 357 : host_set.excludedHostsPtr(), host_set.excludedHostsPerLocalityPtr());
682 357 : }
683 : PrioritySet::UpdateHostsParams
684 : HostSetImpl::partitionHosts(HostVectorConstSharedPtr hosts,
685 159 : HostsPerLocalityConstSharedPtr hosts_per_locality) {
686 159 : auto partitioned_hosts = ClusterImplBase::partitionHostList(*hosts);
687 159 : auto healthy_degraded_excluded_hosts_per_locality =
688 159 : ClusterImplBase::partitionHostsPerLocality(*hosts_per_locality);
689 :
690 159 : return updateHostsParams(std::move(hosts), std::move(hosts_per_locality),
691 159 : std::move(std::get<0>(partitioned_hosts)),
692 159 : std::move(std::get<0>(healthy_degraded_excluded_hosts_per_locality)),
693 159 : std::move(std::get<1>(partitioned_hosts)),
694 159 : std::move(std::get<1>(healthy_degraded_excluded_hosts_per_locality)),
695 159 : std::move(std::get<2>(partitioned_hosts)),
696 159 : std::move(std::get<2>(healthy_degraded_excluded_hosts_per_locality)));
697 159 : }
698 :
699 : double HostSetImpl::effectiveLocalityWeight(uint32_t index,
700 : const HostsPerLocality& eligible_hosts_per_locality,
701 : const HostsPerLocality& excluded_hosts_per_locality,
702 : const HostsPerLocality& all_hosts_per_locality,
703 : const LocalityWeights& locality_weights,
704 465 : uint32_t overprovisioning_factor) {
705 465 : const auto& locality_eligible_hosts = eligible_hosts_per_locality.get()[index];
706 465 : const uint32_t excluded_count = excluded_hosts_per_locality.get().size() > index
707 465 : ? excluded_hosts_per_locality.get()[index].size()
708 465 : : 0;
709 465 : const auto host_count = all_hosts_per_locality.get()[index].size() - excluded_count;
710 465 : if (host_count == 0) {
711 0 : return 0.0;
712 0 : }
713 465 : const double locality_availability_ratio = 1.0 * locality_eligible_hosts.size() / host_count;
714 465 : const uint32_t weight = locality_weights[index];
715 : // Availability ranges from 0-1.0, and is the ratio of eligible hosts to total hosts, modified
716 : // by the overprovisioning factor.
717 465 : const double effective_locality_availability_ratio =
718 465 : std::min(1.0, (overprovisioning_factor / 100.0) * locality_availability_ratio);
719 465 : return weight * effective_locality_availability_ratio;
720 465 : }
721 :
722 : const HostSet&
723 : PrioritySetImpl::getOrCreateHostSet(uint32_t priority,
724 : absl::optional<bool> weighted_priority_health,
725 1272 : absl::optional<uint32_t> overprovisioning_factor) {
726 1272 : if (host_sets_.size() < priority + 1) {
727 1162 : for (size_t i = host_sets_.size(); i <= priority; ++i) {
728 581 : HostSetImplPtr host_set = createHostSet(i, weighted_priority_health, overprovisioning_factor);
729 581 : host_sets_priority_update_cbs_.push_back(
730 581 : host_set->addPriorityUpdateCb([this](uint32_t priority, const HostVector& hosts_added,
731 663 : const HostVector& hosts_removed) {
732 663 : runReferenceUpdateCallbacks(priority, hosts_added, hosts_removed);
733 663 : }));
734 581 : host_sets_.push_back(std::move(host_set));
735 581 : }
736 581 : }
737 1272 : return *host_sets_[priority];
738 1272 : }
739 :
740 : void PrioritySetImpl::updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params,
741 : LocalityWeightsConstSharedPtr locality_weights,
742 : const HostVector& hosts_added, const HostVector& hosts_removed,
743 : absl::optional<bool> weighted_priority_health,
744 : absl::optional<uint32_t> overprovisioning_factor,
745 663 : HostMapConstSharedPtr cross_priority_host_map) {
746 : // Update cross priority host map first. In this way, when the update callbacks of the priority
747 : // set are executed, the latest host map can always be obtained.
748 663 : if (cross_priority_host_map != nullptr) {
749 306 : const_cross_priority_host_map_ = std::move(cross_priority_host_map);
750 306 : }
751 :
752 : // Ensure that we have a HostSet for the given priority.
753 663 : getOrCreateHostSet(priority, weighted_priority_health, overprovisioning_factor);
754 663 : static_cast<HostSetImpl*>(host_sets_[priority].get())
755 663 : ->updateHosts(std::move(update_hosts_params), std::move(locality_weights), hosts_added,
756 663 : hosts_removed, weighted_priority_health, overprovisioning_factor);
757 :
758 663 : if (!batch_update_) {
759 635 : runUpdateCallbacks(hosts_added, hosts_removed);
760 635 : }
761 663 : }
762 :
763 28 : void PrioritySetImpl::batchHostUpdate(BatchUpdateCb& callback) {
764 28 : BatchUpdateScope scope(*this);
765 :
766 : // We wrap the update call with a lambda that tracks all the hosts that have been added/removed.
767 28 : callback.batchUpdate(scope);
768 :
769 : // Now that all the updates have been complete, we can compute the diff.
770 28 : HostVector net_hosts_added = filterHosts(scope.all_hosts_added_, scope.all_hosts_removed_);
771 28 : HostVector net_hosts_removed = filterHosts(scope.all_hosts_removed_, scope.all_hosts_added_);
772 :
773 28 : runUpdateCallbacks(net_hosts_added, net_hosts_removed);
774 28 : }
775 :
776 : void PrioritySetImpl::BatchUpdateScope::updateHosts(
777 : uint32_t priority, PrioritySet::UpdateHostsParams&& update_hosts_params,
778 : LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added,
779 : const HostVector& hosts_removed, absl::optional<bool> weighted_priority_health,
780 28 : absl::optional<uint32_t> overprovisioning_factor) {
781 : // We assume that each call updates a different priority.
782 28 : ASSERT(priorities_.find(priority) == priorities_.end());
783 28 : priorities_.insert(priority);
784 :
785 28 : for (const auto& host : hosts_added) {
786 28 : all_hosts_added_.insert(host);
787 28 : }
788 :
789 28 : for (const auto& host : hosts_removed) {
790 0 : all_hosts_removed_.insert(host);
791 0 : }
792 :
793 28 : parent_.updateHosts(priority, std::move(update_hosts_params), locality_weights, hosts_added,
794 28 : hosts_removed, weighted_priority_health, overprovisioning_factor);
795 28 : }
796 :
797 : void MainPrioritySetImpl::updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params,
798 : LocalityWeightsConstSharedPtr locality_weights,
799 : const HostVector& hosts_added,
800 : const HostVector& hosts_removed,
801 : absl::optional<bool> weighted_priority_health,
802 : absl::optional<uint32_t> overprovisioning_factor,
803 159 : HostMapConstSharedPtr cross_priority_host_map) {
804 159 : ASSERT(cross_priority_host_map == nullptr,
805 159 : "External cross-priority host map is meaningless to MainPrioritySetImpl");
806 159 : updateCrossPriorityHostMap(hosts_added, hosts_removed);
807 :
808 159 : PrioritySetImpl::updateHosts(priority, std::move(update_hosts_params), locality_weights,
809 159 : hosts_added, hosts_removed, weighted_priority_health,
810 159 : overprovisioning_factor);
811 159 : }
812 :
813 187 : HostMapConstSharedPtr MainPrioritySetImpl::crossPriorityHostMap() const {
814 : // Check if the host set in the main thread PrioritySet has been updated.
815 187 : if (mutable_cross_priority_host_map_ != nullptr) {
816 159 : const_cross_priority_host_map_ = std::move(mutable_cross_priority_host_map_);
817 159 : ASSERT(mutable_cross_priority_host_map_ == nullptr);
818 159 : }
819 187 : return const_cross_priority_host_map_;
820 187 : }
821 :
822 : void MainPrioritySetImpl::updateCrossPriorityHostMap(const HostVector& hosts_added,
823 159 : const HostVector& hosts_removed) {
824 159 : if (hosts_added.empty() && hosts_removed.empty()) {
825 : // No new hosts have been added and no old hosts have been removed.
826 0 : return;
827 0 : }
828 :
829 : // Since read_only_all_host_map_ may be shared by multiple threads, when the host set changes,
830 : // we cannot directly modify read_only_all_host_map_.
831 159 : if (mutable_cross_priority_host_map_ == nullptr) {
832 : // Copy old read only host map to mutable host map.
833 159 : mutable_cross_priority_host_map_ = std::make_shared<HostMap>(*const_cross_priority_host_map_);
834 159 : }
835 :
836 159 : for (const auto& host : hosts_removed) {
837 0 : mutable_cross_priority_host_map_->erase(addressToString(host->address()));
838 0 : }
839 :
840 159 : for (const auto& host : hosts_added) {
841 159 : mutable_cross_priority_host_map_->insert({addressToString(host->address()), host});
842 159 : }
843 159 : }
844 :
845 : DeferredCreationCompatibleClusterTrafficStats
846 : ClusterInfoImpl::generateStats(Stats::ScopeSharedPtr scope,
847 2376 : const ClusterTrafficStatNames& stat_names, bool defer_creation) {
848 2376 : return Stats::createDeferredCompatibleStats<ClusterTrafficStats>(scope, stat_names,
849 2376 : defer_creation);
850 2376 : }
851 :
852 : ClusterRequestResponseSizeStats ClusterInfoImpl::generateRequestResponseSizeStats(
853 2217 : Stats::Scope& scope, const ClusterRequestResponseSizeStatNames& stat_names) {
854 2217 : return {stat_names, scope};
855 2217 : }
856 :
857 : ClusterLoadReportStats
858 : ClusterInfoImpl::generateLoadReportStats(Stats::Scope& scope,
859 2376 : const ClusterLoadReportStatNames& stat_names) {
860 2376 : return {stat_names, scope};
861 2376 : }
862 :
863 : ClusterTimeoutBudgetStats
864 : ClusterInfoImpl::generateTimeoutBudgetStats(Stats::Scope& scope,
865 2217 : const ClusterTimeoutBudgetStatNames& stat_names) {
866 2217 : return {stat_names, scope};
867 2217 : }
868 :
869 : std::shared_ptr<const ClusterInfoImpl::HttpProtocolOptionsConfigImpl>
870 : createOptions(const envoy::config::cluster::v3::Cluster& config,
871 : std::shared_ptr<const ClusterInfoImpl::HttpProtocolOptionsConfigImpl>&& options,
872 159 : ProtobufMessage::ValidationVisitor& validation_visitor) {
873 159 : if (options) {
874 119 : return std::move(options);
875 119 : }
876 :
877 40 : if (config.protocol_selection() == envoy::config::cluster::v3::Cluster::USE_CONFIGURED_PROTOCOL) {
878 : // Make sure multiple protocol configurations are not present
879 40 : if (config.has_http_protocol_options() && config.has_http2_protocol_options()) {
880 0 : throwEnvoyExceptionOrPanic(
881 0 : fmt::format("cluster: Both HTTP1 and HTTP2 options may only be "
882 0 : "configured with non-default 'protocol_selection' values"));
883 0 : }
884 40 : }
885 :
886 40 : return std::make_shared<ClusterInfoImpl::HttpProtocolOptionsConfigImpl>(
887 40 : config.http_protocol_options(), config.http2_protocol_options(),
888 40 : config.common_http_protocol_options(),
889 40 : (config.has_upstream_http_protocol_options()
890 40 : ? absl::make_optional<envoy::config::core::v3::UpstreamHttpProtocolOptions>(
891 0 : config.upstream_http_protocol_options())
892 40 : : absl::nullopt),
893 40 : config.protocol_selection() == envoy::config::cluster::v3::Cluster::USE_DOWNSTREAM_PROTOCOL,
894 40 : config.has_http2_protocol_options(), validation_visitor);
895 40 : }
896 :
897 : absl::StatusOr<LegacyLbPolicyConfigHelper::Result>
898 : LegacyLbPolicyConfigHelper::getTypedLbConfigFromLegacyProtoWithoutSubset(
899 159 : const ClusterProto& cluster, ProtobufMessage::ValidationVisitor& visitor) {
900 :
901 159 : LoadBalancerConfigPtr lb_config;
902 159 : TypedLoadBalancerFactory* lb_factory = nullptr;
903 :
904 159 : switch (cluster.lb_policy()) {
905 0 : PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
906 159 : case ClusterProto::ROUND_ROBIN:
907 159 : lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>(
908 159 : "envoy.load_balancing_policies.round_robin");
909 159 : break;
910 0 : case ClusterProto::LEAST_REQUEST:
911 0 : lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>(
912 0 : "envoy.load_balancing_policies.least_request");
913 0 : break;
914 0 : case ClusterProto::RANDOM:
915 0 : lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>(
916 0 : "envoy.load_balancing_policies.random");
917 0 : break;
918 0 : case ClusterProto::RING_HASH:
919 0 : lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>(
920 0 : "envoy.load_balancing_policies.ring_hash");
921 0 : break;
922 0 : case ClusterProto::MAGLEV:
923 0 : lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>(
924 0 : "envoy.load_balancing_policies.maglev");
925 0 : break;
926 0 : case ClusterProto::CLUSTER_PROVIDED:
927 0 : lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>(
928 0 : "envoy.load_balancing_policies.cluster_provided");
929 0 : break;
930 0 : case ClusterProto::LOAD_BALANCING_POLICY_CONFIG:
931 : // 'LOAD_BALANCING_POLICY_CONFIG' should be handled by the 'configureLbPolicies'
932 : // function and should not reach here.
933 0 : PANIC("getTypedLbConfigFromLegacyProtoWithoutSubset: should not reach here");
934 0 : break;
935 159 : }
936 :
937 159 : if (lb_factory == nullptr) {
938 0 : return absl::InvalidArgumentError(
939 0 : fmt::format("No load balancer factory found for LB type: {}",
940 0 : ClusterProto::LbPolicy_Name(cluster.lb_policy())));
941 0 : }
942 :
943 159 : ASSERT(lb_factory != nullptr);
944 159 : return Result{lb_factory, lb_factory->loadConfig(cluster, visitor)};
945 159 : }
946 :
947 : absl::StatusOr<LegacyLbPolicyConfigHelper::Result>
948 : LegacyLbPolicyConfigHelper::getTypedLbConfigFromLegacyProto(
949 159 : const ClusterProto& cluster, ProtobufMessage::ValidationVisitor& visitor) {
950 :
951 : // Handle the lb subset config case first.
952 : // Note it is possible to have a lb_subset_config without actually having any subset selectors.
953 : // In this case the subset load balancer should not be used.
954 159 : if (cluster.has_lb_subset_config() && !cluster.lb_subset_config().subset_selectors().empty()) {
955 0 : auto* lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>(
956 0 : "envoy.load_balancing_policies.subset");
957 0 : if (lb_factory != nullptr) {
958 0 : return Result{lb_factory, lb_factory->loadConfig(cluster, visitor)};
959 0 : }
960 0 : return absl::InvalidArgumentError("No subset load balancer factory found");
961 0 : }
962 :
963 159 : return getTypedLbConfigFromLegacyProtoWithoutSubset(cluster, visitor);
964 159 : }
965 :
966 159 : LBPolicyConfig::LBPolicyConfig(const envoy::config::cluster::v3::Cluster& config) {
967 159 : switch (config.lb_config_case()) {
968 0 : case envoy::config::cluster::v3::Cluster::kRoundRobinLbConfig:
969 0 : lb_policy_ = std::make_unique<const envoy::config::cluster::v3::Cluster::RoundRobinLbConfig>(
970 0 : config.round_robin_lb_config());
971 0 : break;
972 0 : case envoy::config::cluster::v3::Cluster::kLeastRequestLbConfig:
973 0 : lb_policy_ = std::make_unique<envoy::config::cluster::v3::Cluster::LeastRequestLbConfig>(
974 0 : config.least_request_lb_config());
975 0 : break;
976 0 : case envoy::config::cluster::v3::Cluster::kRingHashLbConfig:
977 0 : lb_policy_ = std::make_unique<envoy::config::cluster::v3::Cluster::RingHashLbConfig>(
978 0 : config.ring_hash_lb_config());
979 0 : break;
980 0 : case envoy::config::cluster::v3::Cluster::kMaglevLbConfig:
981 0 : lb_policy_ = std::make_unique<envoy::config::cluster::v3::Cluster::MaglevLbConfig>(
982 0 : config.maglev_lb_config());
983 0 : break;
984 0 : case envoy::config::cluster::v3::Cluster::kOriginalDstLbConfig:
985 0 : lb_policy_ = std::make_unique<envoy::config::cluster::v3::Cluster::OriginalDstLbConfig>(
986 0 : config.original_dst_lb_config());
987 0 : break;
988 159 : case envoy::config::cluster::v3::Cluster::LB_CONFIG_NOT_SET:
989 : // The default value of the variant if there is no config would be nullptr
990 : // Which is set when the class is initialized. No action needed if config isn't set
991 159 : break;
992 159 : }
993 159 : }
994 :
995 : ClusterInfoImpl::ClusterInfoImpl(
996 : Init::Manager& init_manager, Server::Configuration::ServerFactoryContext& server_context,
997 : const envoy::config::cluster::v3::Cluster& config,
998 : const absl::optional<envoy::config::core::v3::BindConfig>& bind_config,
999 : Runtime::Loader& runtime, TransportSocketMatcherPtr&& socket_matcher,
1000 : Stats::ScopeSharedPtr&& stats_scope, bool added_via_api,
1001 : Server::Configuration::TransportSocketFactoryContext& factory_context)
1002 : : runtime_(runtime), name_(config.name()),
1003 : observability_name_(!config.alt_stat_name().empty()
1004 : ? std::make_unique<std::string>(config.alt_stat_name())
1005 : : nullptr),
1006 : eds_service_name_(
1007 : config.has_eds_cluster_config()
1008 : ? std::make_unique<std::string>(config.eds_cluster_config().service_name())
1009 : : nullptr),
1010 : extension_protocol_options_(parseExtensionProtocolOptions(config, factory_context)),
1011 : http_protocol_options_(
1012 : createOptions(config,
1013 : extensionProtocolOptionsTyped<HttpProtocolOptionsConfigImpl>(
1014 : "envoy.extensions.upstreams.http.v3.HttpProtocolOptions"),
1015 : factory_context.messageValidationVisitor())),
1016 : tcp_protocol_options_(extensionProtocolOptionsTyped<TcpProtocolOptionsConfigImpl>(
1017 : "envoy.extensions.upstreams.tcp.v3.TcpProtocolOptions")),
1018 : max_requests_per_connection_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
1019 : http_protocol_options_->common_http_protocol_options_, max_requests_per_connection,
1020 : config.max_requests_per_connection().value())),
1021 : connect_timeout_(
1022 : std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, connect_timeout, 5000))),
1023 : per_upstream_preconnect_ratio_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
1024 : config.preconnect_policy(), per_upstream_preconnect_ratio, 1.0)),
1025 : peekahead_ratio_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config.preconnect_policy(),
1026 : predictive_preconnect_ratio, 0)),
1027 : socket_matcher_(std::move(socket_matcher)), stats_scope_(std::move(stats_scope)),
1028 : traffic_stats_(generateStats(stats_scope_,
1029 : factory_context.clusterManager().clusterStatNames(),
1030 : server_context.statsConfig().enableDeferredCreationStats())),
1031 : config_update_stats_(factory_context.clusterManager().clusterConfigUpdateStatNames(),
1032 : *stats_scope_),
1033 : lb_stats_(factory_context.clusterManager().clusterLbStatNames(), *stats_scope_),
1034 : endpoint_stats_(factory_context.clusterManager().clusterEndpointStatNames(), *stats_scope_),
1035 : load_report_stats_store_(stats_scope_->symbolTable()),
1036 : load_report_stats_(
1037 : generateLoadReportStats(*load_report_stats_store_.rootScope(),
1038 : factory_context.clusterManager().clusterLoadReportStatNames())),
1039 : optional_cluster_stats_((config.has_track_cluster_stats() || config.track_timeout_budgets())
1040 : ? std::make_unique<OptionalClusterStats>(
1041 : config, *stats_scope_, factory_context.clusterManager())
1042 : : nullptr),
1043 : features_(ClusterInfoImpl::HttpProtocolOptionsConfigImpl::parseFeatures(
1044 : config, *http_protocol_options_)),
1045 : resource_managers_(config, runtime, name_, *stats_scope_,
1046 : factory_context.clusterManager().clusterCircuitBreakersStatNames()),
1047 : maintenance_mode_runtime_key_(absl::StrCat("upstream.maintenance_mode.", name_)),
1048 : upstream_local_address_selector_(createUpstreamLocalAddressSelector(config, bind_config)),
1049 : lb_policy_config_(std::make_unique<const LBPolicyConfig>(config)),
1050 : upstream_config_(config.has_upstream_config()
1051 : ? std::make_unique<envoy::config::core::v3::TypedExtensionConfig>(
1052 : config.upstream_config())
1053 : : nullptr),
1054 : lb_subset_(config.has_lb_subset_config()
1055 : ? std::make_unique<LoadBalancerSubsetInfoImpl>(config.lb_subset_config())
1056 : : nullptr),
1057 : metadata_(config.has_metadata()
1058 : ? std::make_unique<envoy::config::core::v3::Metadata>(config.metadata())
1059 : : nullptr),
1060 : typed_metadata_(config.has_metadata()
1061 : ? std::make_unique<ClusterTypedMetadata>(config.metadata())
1062 : : nullptr),
1063 : common_lb_config_(
1064 : factory_context.clusterManager().getCommonLbConfigPtr(config.common_lb_config())),
1065 : cluster_type_(config.has_cluster_type()
1066 : ? std::make_unique<envoy::config::cluster::v3::Cluster::CustomClusterType>(
1067 : config.cluster_type())
1068 : : nullptr),
1069 : http_filter_config_provider_manager_(
1070 : Http::FilterChainUtility::createSingletonUpstreamFilterConfigProviderManager(
1071 : server_context)),
1072 : network_filter_config_provider_manager_(
1073 : createSingletonUpstreamNetworkFilterConfigProviderManager(server_context)),
1074 : upstream_context_(server_context, init_manager, *stats_scope_),
1075 : per_connection_buffer_limit_bytes_(
1076 : PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, per_connection_buffer_limit_bytes, 1024 * 1024)),
1077 : max_response_headers_count_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
1078 : http_protocol_options_->common_http_protocol_options_, max_headers_count,
1079 : runtime_.snapshot().getInteger(Http::MaxResponseHeadersCountOverrideKey,
1080 : Http::DEFAULT_MAX_HEADERS_COUNT))),
1081 : type_(config.type()),
1082 : drain_connections_on_host_removal_(config.ignore_health_on_host_removal()),
1083 : connection_pool_per_downstream_connection_(
1084 : config.connection_pool_per_downstream_connection()),
1085 : warm_hosts_(!config.health_checks().empty() &&
1086 : common_lb_config_->ignore_new_hosts_until_first_hc()),
1087 : set_local_interface_name_on_upstream_connections_(
1088 : config.upstream_connection_options().set_local_interface_name_on_upstream_connections()),
1089 : added_via_api_(added_via_api), has_configured_http_filters_(false),
1090 : per_endpoint_stats_(config.has_track_cluster_stats() &&
1091 159 : config.track_cluster_stats().per_endpoint_stats()) {
1092 : #ifdef WIN32
1093 : if (set_local_interface_name_on_upstream_connections_) {
1094 : throwEnvoyExceptionOrPanic(
1095 : "set_local_interface_name_on_upstream_connections_ cannot be set to true "
1096 : "on Windows platforms");
1097 : }
1098 : #endif
1099 :
1100 : // Both LoadStatsReporter and per_endpoint_stats need to `latch()` the counters, so if both are
1101 : // configured they will interfere with each other and both get incorrect values.
1102 159 : if (perEndpointStatsEnabled() &&
1103 159 : server_context.bootstrap().cluster_manager().has_load_stats_config()) {
1104 0 : throwEnvoyExceptionOrPanic("Only one of cluster per_endpoint_stats and cluster manager "
1105 0 : "load_stats_config can be specified");
1106 0 : }
1107 :
1108 159 : if (config.has_max_requests_per_connection() &&
1109 159 : http_protocol_options_->common_http_protocol_options_.has_max_requests_per_connection()) {
1110 0 : throwEnvoyExceptionOrPanic("Only one of max_requests_per_connection from Cluster or "
1111 0 : "HttpProtocolOptions can be specified");
1112 0 : }
1113 :
1114 : // If load_balancing_policy is set we will use it directly, ignoring lb_policy.
1115 159 : if (config.has_load_balancing_policy() ||
1116 159 : config.lb_policy() == envoy::config::cluster::v3::Cluster::LOAD_BALANCING_POLICY_CONFIG) {
1117 0 : configureLbPolicies(config, server_context);
1118 159 : } else if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.convert_legacy_lb_config")) {
1119 159 : auto lb_pair = LegacyLbPolicyConfigHelper::getTypedLbConfigFromLegacyProto(
1120 159 : config, server_context.messageValidationVisitor());
1121 :
1122 159 : if (!lb_pair.ok()) {
1123 0 : throwEnvoyExceptionOrPanic(std::string(lb_pair.status().message()));
1124 0 : }
1125 :
1126 159 : load_balancer_config_ = std::move(lb_pair->config);
1127 159 : load_balancer_factory_ = lb_pair->factory;
1128 159 : lb_type_ = LoadBalancerType::LoadBalancingPolicyConfig;
1129 :
1130 159 : RELEASE_ASSERT(
1131 159 : load_balancer_factory_,
1132 159 : fmt::format(
1133 159 : "No load balancer factory found from legacy LB configuration (type: {}, subset: {}).",
1134 159 : envoy::config::cluster::v3::Cluster::LbPolicy_Name(config.lb_policy()),
1135 159 : config.has_lb_subset_config()));
1136 :
1137 : // Clear unnecessary legacy config because all legacy config is wrapped in load_balancer_config_
1138 : // except the original_dst_lb_config.
1139 159 : lb_subset_ = nullptr;
1140 159 : if (config.lb_policy() != envoy::config::cluster::v3::Cluster::CLUSTER_PROVIDED) {
1141 159 : lb_policy_config_ = nullptr;
1142 159 : }
1143 159 : } else {
1144 0 : switch (config.lb_policy()) {
1145 0 : PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
1146 0 : case envoy::config::cluster::v3::Cluster::ROUND_ROBIN:
1147 0 : lb_type_ = LoadBalancerType::RoundRobin;
1148 0 : break;
1149 0 : case envoy::config::cluster::v3::Cluster::LEAST_REQUEST:
1150 0 : lb_type_ = LoadBalancerType::LeastRequest;
1151 0 : break;
1152 0 : case envoy::config::cluster::v3::Cluster::RANDOM:
1153 0 : lb_type_ = LoadBalancerType::Random;
1154 0 : break;
1155 0 : case envoy::config::cluster::v3::Cluster::RING_HASH:
1156 0 : lb_type_ = LoadBalancerType::RingHash;
1157 0 : break;
1158 0 : case envoy::config::cluster::v3::Cluster::MAGLEV:
1159 0 : lb_type_ = LoadBalancerType::Maglev;
1160 0 : break;
1161 0 : case envoy::config::cluster::v3::Cluster::CLUSTER_PROVIDED:
1162 0 : if (config.has_lb_subset_config()) {
1163 0 : throwEnvoyExceptionOrPanic(
1164 0 : fmt::format("cluster: LB policy {} cannot be combined with lb_subset_config",
1165 0 : envoy::config::cluster::v3::Cluster::LbPolicy_Name(config.lb_policy())));
1166 0 : }
1167 :
1168 0 : lb_type_ = LoadBalancerType::ClusterProvided;
1169 0 : break;
1170 0 : case envoy::config::cluster::v3::Cluster::LOAD_BALANCING_POLICY_CONFIG: {
1171 : // 'LOAD_BALANCING_POLICY_CONFIG' should be handled by the 'configureLbPolicies'
1172 : // function in previous branch and should not reach here.
1173 0 : PANIC("Should not reach here");
1174 0 : break;
1175 0 : }
1176 0 : }
1177 0 : }
1178 :
1179 159 : if (config.lb_subset_config().locality_weight_aware() &&
1180 159 : !config.common_lb_config().has_locality_weighted_lb_config()) {
1181 0 : throwEnvoyExceptionOrPanic(fmt::format("Locality weight aware subset LB requires that a "
1182 0 : "locality_weighted_lb_config be set in {}",
1183 0 : name_));
1184 0 : }
1185 :
1186 : // Use default (1h) or configured `idle_timeout`, unless it's set to 0, indicating that no
1187 : // timeout should be used.
1188 159 : absl::optional<std::chrono::milliseconds> idle_timeout(std::chrono::hours(1));
1189 159 : if (http_protocol_options_->common_http_protocol_options_.has_idle_timeout()) {
1190 0 : idle_timeout = std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
1191 0 : http_protocol_options_->common_http_protocol_options_.idle_timeout()));
1192 0 : if (idle_timeout.value().count() == 0) {
1193 0 : idle_timeout = absl::nullopt;
1194 0 : }
1195 0 : }
1196 159 : if (idle_timeout.has_value()) {
1197 159 : optional_timeouts_.set<OptionalTimeoutNames::IdleTimeout>(*idle_timeout);
1198 159 : }
1199 :
1200 : // Use default (10m) or configured `tcp_pool_idle_timeout`, unless it's set to 0, indicating that
1201 : // no timeout should be used.
1202 159 : absl::optional<std::chrono::milliseconds> tcp_pool_idle_timeout(std::chrono::minutes(10));
1203 159 : if (tcp_protocol_options_ && tcp_protocol_options_->idleTimeout().has_value()) {
1204 0 : tcp_pool_idle_timeout = tcp_protocol_options_->idleTimeout();
1205 0 : if (tcp_pool_idle_timeout.value().count() == 0) {
1206 0 : tcp_pool_idle_timeout = absl::nullopt;
1207 0 : }
1208 0 : }
1209 159 : if (tcp_pool_idle_timeout.has_value()) {
1210 159 : optional_timeouts_.set<OptionalTimeoutNames::TcpPoolIdleTimeout>(*tcp_pool_idle_timeout);
1211 159 : }
1212 :
1213 : // Use configured `max_connection_duration`, unless it's set to 0, indicating that
1214 : // no timeout should be used. No timeout by default either.
1215 159 : absl::optional<std::chrono::milliseconds> max_connection_duration;
1216 159 : if (http_protocol_options_->common_http_protocol_options_.has_max_connection_duration()) {
1217 0 : max_connection_duration = std::chrono::milliseconds(DurationUtil::durationToMilliseconds(
1218 0 : http_protocol_options_->common_http_protocol_options_.max_connection_duration()));
1219 0 : if (max_connection_duration.value().count() == 0) {
1220 0 : max_connection_duration = absl::nullopt;
1221 0 : }
1222 0 : }
1223 159 : if (max_connection_duration.has_value()) {
1224 0 : optional_timeouts_.set<OptionalTimeoutNames::MaxConnectionDuration>(*max_connection_duration);
1225 0 : }
1226 :
1227 159 : if (config.has_eds_cluster_config()) {
1228 28 : if (config.type() != envoy::config::cluster::v3::Cluster::EDS) {
1229 0 : throwEnvoyExceptionOrPanic("eds_cluster_config set in a non-EDS cluster");
1230 0 : }
1231 28 : }
1232 :
1233 : // TODO(htuch): Remove this temporary workaround when we have
1234 : // https://github.com/bufbuild/protoc-gen-validate/issues/97 resolved. This just provides
1235 : // early validation of sanity of fields that we should catch at config ingestion.
1236 159 : DurationUtil::durationToMilliseconds(common_lb_config_->update_merge_window());
1237 :
1238 : // Create upstream network filter factories
1239 159 : const auto& filters = config.filters();
1240 159 : ASSERT(filter_factories_.empty());
1241 159 : filter_factories_.reserve(filters.size());
1242 159 : for (ssize_t i = 0; i < filters.size(); i++) {
1243 0 : const auto& proto_config = filters[i];
1244 0 : const bool is_terminal = i == filters.size() - 1;
1245 0 : ENVOY_LOG(debug, " upstream network filter #{}:", i);
1246 :
1247 0 : if (proto_config.has_config_discovery()) {
1248 0 : if (proto_config.has_typed_config()) {
1249 0 : throwEnvoyExceptionOrPanic("Only one of typed_config or config_discovery can be used");
1250 0 : }
1251 :
1252 0 : ENVOY_LOG(debug, " dynamic filter name: {}", proto_config.name());
1253 0 : filter_factories_.push_back(
1254 0 : network_filter_config_provider_manager_->createDynamicFilterConfigProvider(
1255 0 : proto_config.config_discovery(), proto_config.name(), server_context,
1256 0 : upstream_context_, factory_context.clusterManager(), is_terminal, "network",
1257 0 : nullptr));
1258 0 : continue;
1259 0 : }
1260 :
1261 0 : ENVOY_LOG(debug, " name: {}", proto_config.name());
1262 0 : auto& factory = Config::Utility::getAndCheckFactory<
1263 0 : Server::Configuration::NamedUpstreamNetworkFilterConfigFactory>(proto_config);
1264 0 : auto message = factory.createEmptyConfigProto();
1265 0 : Config::Utility::translateOpaqueConfig(proto_config.typed_config(),
1266 0 : factory_context.messageValidationVisitor(), *message);
1267 0 : Network::FilterFactoryCb callback =
1268 0 : factory.createFilterFactoryFromProto(*message, upstream_context_);
1269 0 : filter_factories_.push_back(
1270 0 : network_filter_config_provider_manager_->createStaticFilterConfigProvider(
1271 0 : callback, proto_config.name()));
1272 0 : }
1273 :
1274 159 : if (http_protocol_options_) {
1275 159 : Http::FilterChainUtility::FiltersList http_filters = http_protocol_options_->http_filters_;
1276 159 : has_configured_http_filters_ = !http_filters.empty();
1277 159 : if (http_filters.empty()) {
1278 159 : auto* codec_filter = http_filters.Add();
1279 159 : codec_filter->set_name("envoy.filters.http.upstream_codec");
1280 159 : codec_filter->mutable_typed_config()->PackFrom(
1281 159 : envoy::extensions::filters::http::upstream_codec::v3::UpstreamCodec::default_instance());
1282 159 : }
1283 159 : if (http_filters[http_filters.size() - 1].name() != "envoy.filters.http.upstream_codec") {
1284 0 : throwEnvoyExceptionOrPanic(
1285 0 : fmt::format("The codec filter is the only valid terminal upstream HTTP filter"));
1286 0 : }
1287 :
1288 159 : std::string prefix = stats_scope_->symbolTable().toString(stats_scope_->prefix());
1289 159 : Http::FilterChainHelper<Server::Configuration::UpstreamFactoryContext,
1290 159 : Server::Configuration::UpstreamHttpFilterConfigFactory>
1291 159 : helper(*http_filter_config_provider_manager_, upstream_context_.serverFactoryContext(),
1292 159 : factory_context.clusterManager(), upstream_context_, prefix);
1293 159 : THROW_IF_NOT_OK(helper.processFilters(http_filters, "upstream http", "upstream http",
1294 159 : http_filter_factories_));
1295 159 : }
1296 159 : }
1297 :
1298 : // Configures the load balancer based on config.load_balancing_policy
1299 : void ClusterInfoImpl::configureLbPolicies(const envoy::config::cluster::v3::Cluster& config,
1300 0 : Server::Configuration::ServerFactoryContext& context) {
1301 : // Check if load_balancing_policy is set first.
1302 0 : if (!config.has_load_balancing_policy()) {
1303 0 : throwEnvoyExceptionOrPanic("cluster: field load_balancing_policy need to be set");
1304 0 : }
1305 :
1306 0 : if (config.has_lb_subset_config()) {
1307 0 : throwEnvoyExceptionOrPanic(
1308 0 : "cluster: load_balancing_policy cannot be combined with lb_subset_config");
1309 0 : }
1310 :
1311 0 : if (config.has_common_lb_config()) {
1312 0 : const auto& lb_config = config.common_lb_config();
1313 0 : if (lb_config.has_zone_aware_lb_config() || lb_config.has_locality_weighted_lb_config() ||
1314 0 : lb_config.has_consistent_hashing_lb_config()) {
1315 0 : throwEnvoyExceptionOrPanic(
1316 0 : "cluster: load_balancing_policy cannot be combined with partial fields "
1317 0 : "(zone_aware_lb_config, "
1318 0 : "locality_weighted_lb_config, consistent_hashing_lb_config) of common_lb_config");
1319 0 : }
1320 0 : }
1321 :
1322 0 : absl::InlinedVector<absl::string_view, 4> missing_policies;
1323 0 : for (const auto& policy : config.load_balancing_policy().policies()) {
1324 0 : TypedLoadBalancerFactory* factory =
1325 0 : Config::Utility::getAndCheckFactory<TypedLoadBalancerFactory>(
1326 0 : policy.typed_extension_config(), /*is_optional=*/true);
1327 0 : if (factory != nullptr) {
1328 : // Load and validate the configuration.
1329 0 : auto proto_message = factory->createEmptyConfigProto();
1330 0 : Config::Utility::translateOpaqueConfig(policy.typed_extension_config().typed_config(),
1331 0 : context.messageValidationVisitor(), *proto_message);
1332 :
1333 0 : load_balancer_config_ =
1334 0 : factory->loadConfig(*proto_message, context.messageValidationVisitor());
1335 :
1336 0 : load_balancer_factory_ = factory;
1337 0 : break;
1338 0 : }
1339 0 : missing_policies.push_back(policy.typed_extension_config().name());
1340 0 : }
1341 :
1342 0 : if (load_balancer_factory_ == nullptr) {
1343 0 : throwEnvoyExceptionOrPanic(
1344 0 : fmt::format("cluster: didn't find a registered load balancer factory "
1345 0 : "implementation for cluster: '{}' with names from [{}]",
1346 0 : name_, absl::StrJoin(missing_policies, ", ")));
1347 0 : }
1348 :
1349 0 : lb_type_ = LoadBalancerType::LoadBalancingPolicyConfig;
1350 0 : }
1351 :
1352 : ProtocolOptionsConfigConstSharedPtr
1353 318 : ClusterInfoImpl::extensionProtocolOptions(const std::string& name) const {
1354 318 : auto i = extension_protocol_options_.find(name);
1355 318 : if (i != extension_protocol_options_.end()) {
1356 119 : return i->second;
1357 119 : }
1358 199 : return nullptr;
1359 318 : }
1360 :
1361 : Network::UpstreamTransportSocketFactoryPtr createTransportSocketFactory(
1362 : const envoy::config::cluster::v3::Cluster& config,
1363 159 : Server::Configuration::TransportSocketFactoryContext& factory_context) {
1364 : // If the cluster config doesn't have a transport socket configured, override with the default
1365 : // transport socket implementation based on the tls_context. We copy by value first then
1366 : // override if necessary.
1367 159 : auto transport_socket = config.transport_socket();
1368 159 : if (!config.has_transport_socket()) {
1369 159 : envoy::extensions::transport_sockets::raw_buffer::v3::RawBuffer raw_buffer;
1370 159 : transport_socket.mutable_typed_config()->PackFrom(raw_buffer);
1371 159 : transport_socket.set_name("envoy.transport_sockets.raw_buffer");
1372 159 : }
1373 :
1374 159 : auto& config_factory = Config::Utility::getAndCheckFactory<
1375 159 : Server::Configuration::UpstreamTransportSocketConfigFactory>(transport_socket);
1376 159 : ProtobufTypes::MessagePtr message = Config::Utility::translateToFactoryConfig(
1377 159 : transport_socket, factory_context.messageValidationVisitor(), config_factory);
1378 159 : return config_factory.createTransportSocketFactory(*message, factory_context);
1379 159 : }
1380 :
1381 173 : void ClusterInfoImpl::createNetworkFilterChain(Network::Connection& connection) const {
1382 173 : for (const auto& filter_config_provider : filter_factories_) {
1383 0 : auto config = filter_config_provider->config();
1384 0 : if (config.has_value()) {
1385 0 : Network::FilterFactoryCb& factory = config.value();
1386 0 : factory(connection);
1387 0 : }
1388 0 : }
1389 173 : }
1390 :
1391 : std::vector<Http::Protocol>
1392 251 : ClusterInfoImpl::upstreamHttpProtocol(absl::optional<Http::Protocol> downstream_protocol) const {
1393 251 : if (downstream_protocol.has_value() &&
1394 251 : features_ & Upstream::ClusterInfo::Features::USE_DOWNSTREAM_PROTOCOL) {
1395 0 : if (downstream_protocol.value() == Http::Protocol::Http3 &&
1396 0 : !(features_ & Upstream::ClusterInfo::Features::HTTP3)) {
1397 0 : return {Http::Protocol::Http2};
1398 0 : }
1399 : // use HTTP11 since HTTP10 upstream is not supported yet.
1400 0 : if (downstream_protocol.value() == Http::Protocol::Http10) {
1401 0 : return {Http::Protocol::Http11};
1402 0 : }
1403 0 : return {downstream_protocol.value()};
1404 0 : }
1405 :
1406 251 : if (features_ & Upstream::ClusterInfo::Features::USE_ALPN) {
1407 0 : if (!(features_ & Upstream::ClusterInfo::Features::HTTP3)) {
1408 0 : return {Http::Protocol::Http2, Http::Protocol::Http11};
1409 0 : }
1410 0 : return {Http::Protocol::Http3, Http::Protocol::Http2, Http::Protocol::Http11};
1411 0 : }
1412 :
1413 251 : if (features_ & Upstream::ClusterInfo::Features::HTTP3) {
1414 0 : return {Http::Protocol::Http3};
1415 0 : }
1416 :
1417 251 : return {(features_ & Upstream::ClusterInfo::Features::HTTP2) ? Http::Protocol::Http2
1418 251 : : Http::Protocol::Http11};
1419 251 : }
1420 :
1421 : bool validateTransportSocketSupportsQuic(
1422 0 : const envoy::config::core::v3::TransportSocket& transport_socket) {
1423 : // The transport socket is valid for QUIC if it is either a QUIC transport socket,
1424 : // or if it is a QUIC transport socket wrapped in an HTTP/1.1 proxy socket.
1425 0 : if (transport_socket.name() == "envoy.transport_sockets.quic") {
1426 0 : return true;
1427 0 : }
1428 0 : if (transport_socket.name() != "envoy.transport_sockets.http_11_proxy") {
1429 0 : return false;
1430 0 : }
1431 0 : envoy::extensions::transport_sockets::http_11_proxy::v3::Http11ProxyUpstreamTransport
1432 0 : http11_socket;
1433 0 : MessageUtil::unpackTo(transport_socket.typed_config(), http11_socket);
1434 0 : return http11_socket.transport_socket().name() == "envoy.transport_sockets.quic";
1435 0 : }
1436 :
1437 : ClusterImplBase::ClusterImplBase(const envoy::config::cluster::v3::Cluster& cluster,
1438 : ClusterFactoryContext& cluster_context)
1439 : : init_manager_(fmt::format("Cluster {}", cluster.name())),
1440 159 : init_watcher_("ClusterImplBase", [this]() { onInitDone(); }),
1441 : runtime_(cluster_context.serverFactoryContext().runtime()),
1442 : wait_for_warm_on_init_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(cluster, wait_for_warm_on_init, true)),
1443 : time_source_(cluster_context.serverFactoryContext().timeSource()),
1444 : local_cluster_(cluster_context.clusterManager().localClusterName().value_or("") ==
1445 : cluster.name()),
1446 : const_metadata_shared_pool_(Config::Metadata::getConstMetadataSharedPool(
1447 : cluster_context.serverFactoryContext().singletonManager(),
1448 159 : cluster_context.serverFactoryContext().mainThreadDispatcher())) {
1449 :
1450 159 : auto& server_context = cluster_context.serverFactoryContext();
1451 :
1452 159 : auto stats_scope = generateStatsScope(cluster, server_context.serverScope().store());
1453 159 : transport_factory_context_ =
1454 159 : std::make_unique<Server::Configuration::TransportSocketFactoryContextImpl>(
1455 159 : server_context, cluster_context.sslContextManager(), *stats_scope,
1456 159 : cluster_context.clusterManager(), cluster_context.messageValidationVisitor());
1457 159 : transport_factory_context_->setInitManager(init_manager_);
1458 :
1459 159 : auto socket_factory = createTransportSocketFactory(cluster, *transport_factory_context_);
1460 159 : auto* raw_factory_pointer = socket_factory.get();
1461 :
1462 159 : auto socket_matcher = std::make_unique<TransportSocketMatcherImpl>(
1463 159 : cluster.transport_socket_matches(), *transport_factory_context_, socket_factory,
1464 159 : *stats_scope);
1465 159 : const bool matcher_supports_alpn = socket_matcher->allMatchesSupportAlpn();
1466 159 : auto& dispatcher = server_context.mainThreadDispatcher();
1467 159 : info_ = std::shared_ptr<const ClusterInfoImpl>(
1468 159 : new ClusterInfoImpl(init_manager_, server_context, cluster,
1469 159 : cluster_context.clusterManager().bindConfig(), runtime_,
1470 159 : std::move(socket_matcher), std::move(stats_scope),
1471 159 : cluster_context.addedViaApi(), *transport_factory_context_),
1472 159 : [&dispatcher](const ClusterInfoImpl* self) {
1473 159 : ENVOY_LOG(trace, "Schedule destroy cluster info {}", self->name());
1474 159 : dispatcher.deleteInDispatcherThread(
1475 159 : std::unique_ptr<const Event::DispatcherThreadDeletable>(self));
1476 159 : });
1477 :
1478 159 : if ((info_->features() & ClusterInfoImpl::Features::USE_ALPN)) {
1479 0 : if (!raw_factory_pointer->supportsAlpn()) {
1480 0 : throwEnvoyExceptionOrPanic(
1481 0 : fmt::format("ALPN configured for cluster {} which has a non-ALPN transport socket: {}",
1482 0 : cluster.name(), cluster.DebugString()));
1483 0 : }
1484 0 : if (!matcher_supports_alpn &&
1485 0 : !runtime_.snapshot().featureEnabled(ClusterImplBase::DoNotValidateAlpnRuntimeKey, 0)) {
1486 0 : throwEnvoyExceptionOrPanic(fmt::format(
1487 0 : "ALPN configured for cluster {} which has a non-ALPN transport socket matcher: {}",
1488 0 : cluster.name(), cluster.DebugString()));
1489 0 : }
1490 0 : }
1491 :
1492 159 : if (info_->features() & ClusterInfoImpl::Features::HTTP3) {
1493 0 : #if defined(ENVOY_ENABLE_QUIC)
1494 0 : if (!validateTransportSocketSupportsQuic(cluster.transport_socket())) {
1495 0 : throwEnvoyExceptionOrPanic(
1496 0 : fmt::format("HTTP3 requires a QuicUpstreamTransport transport socket: {} {}",
1497 0 : cluster.name(), cluster.transport_socket().DebugString()));
1498 0 : }
1499 : #else
1500 : throwEnvoyExceptionOrPanic("HTTP3 configured but not enabled in the build.");
1501 : #endif
1502 0 : }
1503 :
1504 : // Create the default (empty) priority set before registering callbacks to
1505 : // avoid getting an update the first time it is accessed.
1506 159 : priority_set_.getOrCreateHostSet(0);
1507 159 : priority_update_cb_ = priority_set_.addPriorityUpdateCb(
1508 159 : [this](uint32_t, const HostVector& hosts_added, const HostVector& hosts_removed) {
1509 159 : if (!hosts_added.empty() || !hosts_removed.empty()) {
1510 159 : info_->endpointStats().membership_change_.inc();
1511 159 : }
1512 :
1513 159 : uint32_t healthy_hosts = 0;
1514 159 : uint32_t degraded_hosts = 0;
1515 159 : uint32_t excluded_hosts = 0;
1516 159 : uint32_t hosts = 0;
1517 159 : for (const auto& host_set : prioritySet().hostSetsPerPriority()) {
1518 159 : hosts += host_set->hosts().size();
1519 159 : healthy_hosts += host_set->healthyHosts().size();
1520 159 : degraded_hosts += host_set->degradedHosts().size();
1521 159 : excluded_hosts += host_set->excludedHosts().size();
1522 159 : }
1523 159 : info_->endpointStats().membership_total_.set(hosts);
1524 159 : info_->endpointStats().membership_healthy_.set(healthy_hosts);
1525 159 : info_->endpointStats().membership_degraded_.set(degraded_hosts);
1526 159 : info_->endpointStats().membership_excluded_.set(excluded_hosts);
1527 159 : });
1528 : // Drop overload configuration parsing.
1529 159 : absl::Status status = parseDropOverloadConfig(cluster.load_assignment());
1530 159 : if (!status.ok()) {
1531 0 : throwEnvoyExceptionOrPanic(std::string(status.message()));
1532 0 : }
1533 159 : }
1534 :
1535 : namespace {
1536 :
1537 318 : bool excludeBasedOnHealthFlag(const Host& host) {
1538 318 : return host.healthFlagGet(Host::HealthFlag::PENDING_ACTIVE_HC) ||
1539 318 : host.healthFlagGet(Host::HealthFlag::EXCLUDED_VIA_IMMEDIATE_HC_FAIL);
1540 318 : }
1541 :
1542 : } // namespace
1543 :
1544 : std::tuple<HealthyHostVectorConstSharedPtr, DegradedHostVectorConstSharedPtr,
1545 : ExcludedHostVectorConstSharedPtr>
1546 159 : ClusterImplBase::partitionHostList(const HostVector& hosts) {
1547 159 : auto healthy_list = std::make_shared<HealthyHostVector>();
1548 159 : auto degraded_list = std::make_shared<DegradedHostVector>();
1549 159 : auto excluded_list = std::make_shared<ExcludedHostVector>();
1550 :
1551 159 : for (const auto& host : hosts) {
1552 159 : const Host::Health health_status = host->coarseHealth();
1553 159 : if (health_status == Host::Health::Healthy) {
1554 159 : healthy_list->get().emplace_back(host);
1555 159 : } else if (health_status == Host::Health::Degraded) {
1556 0 : degraded_list->get().emplace_back(host);
1557 0 : }
1558 159 : if (excludeBasedOnHealthFlag(*host)) {
1559 0 : excluded_list->get().emplace_back(host);
1560 0 : }
1561 159 : }
1562 :
1563 159 : return std::make_tuple(healthy_list, degraded_list, excluded_list);
1564 159 : }
1565 :
1566 : std::tuple<HostsPerLocalityConstSharedPtr, HostsPerLocalityConstSharedPtr,
1567 : HostsPerLocalityConstSharedPtr>
1568 159 : ClusterImplBase::partitionHostsPerLocality(const HostsPerLocality& hosts) {
1569 159 : auto filtered_clones =
1570 159 : hosts.filter({[](const Host& host) { return host.coarseHealth() == Host::Health::Healthy; },
1571 159 : [](const Host& host) { return host.coarseHealth() == Host::Health::Degraded; },
1572 159 : [](const Host& host) { return excludeBasedOnHealthFlag(host); }});
1573 :
1574 159 : return std::make_tuple(std::move(filtered_clones[0]), std::move(filtered_clones[1]),
1575 159 : std::move(filtered_clones[2]));
1576 159 : }
1577 :
1578 251 : bool ClusterInfoImpl::maintenanceMode() const {
1579 251 : return runtime_.snapshot().featureEnabled(maintenance_mode_runtime_key_, 0);
1580 251 : }
1581 :
1582 2509 : ResourceManager& ClusterInfoImpl::resourceManager(ResourcePriority priority) const {
1583 2509 : ASSERT(enumToInt(priority) < resource_managers_.managers_.size());
1584 2509 : return *resource_managers_.managers_[enumToInt(priority)];
1585 2509 : }
1586 :
1587 159 : void ClusterImplBase::initialize(std::function<void()> callback) {
1588 159 : ASSERT(!initialization_started_);
1589 159 : ASSERT(initialization_complete_callback_ == nullptr);
1590 159 : initialization_complete_callback_ = callback;
1591 159 : startPreInit();
1592 159 : }
1593 :
1594 159 : void ClusterImplBase::onPreInitComplete() {
1595 : // Protect against multiple calls.
1596 159 : if (initialization_started_) {
1597 0 : return;
1598 0 : }
1599 159 : initialization_started_ = true;
1600 :
1601 159 : ENVOY_LOG(debug, "initializing {} cluster {} completed",
1602 159 : initializePhase() == InitializePhase::Primary ? "Primary" : "Secondary",
1603 159 : info()->name());
1604 159 : init_manager_.initialize(init_watcher_);
1605 159 : }
1606 :
1607 159 : void ClusterImplBase::onInitDone() {
1608 159 : info()->configUpdateStats().warming_state_.set(0);
1609 159 : if (health_checker_ && pending_initialize_health_checks_ == 0) {
1610 0 : for (auto& host_set : prioritySet().hostSetsPerPriority()) {
1611 0 : for (auto& host : host_set->hosts()) {
1612 0 : if (host->disableActiveHealthCheck()) {
1613 0 : continue;
1614 0 : }
1615 0 : ++pending_initialize_health_checks_;
1616 0 : }
1617 0 : }
1618 0 : ENVOY_LOG(debug, "Cluster onInitDone pending initialize health check count {}",
1619 0 : pending_initialize_health_checks_);
1620 :
1621 : // TODO(mattklein123): Remove this callback when done.
1622 0 : health_checker_->addHostCheckCompleteCb([this](HostSharedPtr, HealthTransition) -> void {
1623 0 : if (pending_initialize_health_checks_ > 0 && --pending_initialize_health_checks_ == 0) {
1624 0 : finishInitialization();
1625 0 : }
1626 0 : });
1627 0 : }
1628 :
1629 159 : if (pending_initialize_health_checks_ == 0) {
1630 159 : finishInitialization();
1631 159 : }
1632 159 : }
1633 :
1634 159 : void ClusterImplBase::finishInitialization() {
1635 159 : ASSERT(initialization_complete_callback_ != nullptr);
1636 159 : ASSERT(initialization_started_);
1637 :
1638 : // Snap a copy of the completion callback so that we can set it to nullptr to unblock
1639 : // reloadHealthyHosts(). See that function for more info on why we do this.
1640 159 : auto snapped_callback = initialization_complete_callback_;
1641 159 : initialization_complete_callback_ = nullptr;
1642 :
1643 159 : if (health_checker_ != nullptr) {
1644 0 : reloadHealthyHosts(nullptr);
1645 0 : }
1646 :
1647 159 : if (snapped_callback != nullptr) {
1648 159 : snapped_callback();
1649 159 : }
1650 159 : }
1651 :
1652 : absl::Status ClusterImplBase::parseDropOverloadConfig(
1653 187 : const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment) {
1654 : // Default drop_overload_ to zero.
1655 187 : drop_overload_ = UnitFloat(0);
1656 :
1657 187 : if (!cluster_load_assignment.has_policy()) {
1658 187 : return absl::OkStatus();
1659 187 : }
1660 0 : auto policy = cluster_load_assignment.policy();
1661 0 : if (policy.drop_overloads().size() == 0) {
1662 0 : return absl::OkStatus();
1663 0 : }
1664 0 : if (policy.drop_overloads().size() > kDropOverloadSize) {
1665 0 : return absl::InvalidArgumentError(
1666 0 : fmt::format("Cluster drop_overloads config has {} categories. Envoy only support one.",
1667 0 : policy.drop_overloads().size()));
1668 0 : }
1669 :
1670 0 : const auto drop_percentage = policy.drop_overloads(0).drop_percentage();
1671 0 : float denominator = 100;
1672 0 : switch (drop_percentage.denominator()) {
1673 0 : case envoy::type::v3::FractionalPercent::HUNDRED:
1674 0 : denominator = 100;
1675 0 : break;
1676 0 : case envoy::type::v3::FractionalPercent::TEN_THOUSAND:
1677 0 : denominator = 10000;
1678 0 : break;
1679 0 : case envoy::type::v3::FractionalPercent::MILLION:
1680 0 : denominator = 1000000;
1681 0 : break;
1682 0 : default:
1683 0 : return absl::InvalidArgumentError(fmt::format(
1684 0 : "Cluster drop_overloads config denominator setting is invalid : {}. Valid range 0~2.",
1685 0 : drop_percentage.denominator()));
1686 0 : }
1687 0 : drop_overload_ = UnitFloat(float(drop_percentage.numerator()) / (denominator));
1688 0 : return absl::OkStatus();
1689 0 : }
1690 :
1691 0 : void ClusterImplBase::setHealthChecker(const HealthCheckerSharedPtr& health_checker) {
1692 0 : ASSERT(!health_checker_);
1693 0 : health_checker_ = health_checker;
1694 0 : health_checker_->start();
1695 0 : health_checker_->addHostCheckCompleteCb(
1696 0 : [this](const HostSharedPtr& host, HealthTransition changed_state) -> void {
1697 : // If we get a health check completion that resulted in a state change, signal to
1698 : // update the host sets on all threads.
1699 0 : if (changed_state == HealthTransition::Changed) {
1700 0 : reloadHealthyHosts(host);
1701 0 : }
1702 0 : });
1703 0 : }
1704 :
1705 159 : void ClusterImplBase::setOutlierDetector(const Outlier::DetectorSharedPtr& outlier_detector) {
1706 159 : if (!outlier_detector) {
1707 159 : return;
1708 159 : }
1709 :
1710 0 : outlier_detector_ = outlier_detector;
1711 0 : outlier_detector_->addChangedStateCb(
1712 0 : [this](const HostSharedPtr& host) -> void { reloadHealthyHosts(host); });
1713 0 : }
1714 :
1715 0 : void ClusterImplBase::reloadHealthyHosts(const HostSharedPtr& host) {
1716 : // Every time a host changes Health Check state we cause a full healthy host recalculation which
1717 : // for expensive LBs (ring, subset, etc.) can be quite time consuming. During startup, this
1718 : // can also block worker threads by doing this repeatedly. There is no reason to do this
1719 : // as we will not start taking traffic until we are initialized. By blocking Health Check
1720 : // updates while initializing we can avoid this.
1721 0 : if (initialization_complete_callback_ != nullptr) {
1722 0 : return;
1723 0 : }
1724 :
1725 0 : reloadHealthyHostsHelper(host);
1726 0 : }
1727 :
1728 0 : void ClusterImplBase::reloadHealthyHostsHelper(const HostSharedPtr&) {
1729 0 : const auto& host_sets = prioritySet().hostSetsPerPriority();
1730 0 : for (size_t priority = 0; priority < host_sets.size(); ++priority) {
1731 0 : const auto& host_set = host_sets[priority];
1732 : // TODO(htuch): Can we skip these copies by exporting out const shared_ptr from HostSet?
1733 0 : HostVectorConstSharedPtr hosts_copy = std::make_shared<HostVector>(host_set->hosts());
1734 :
1735 0 : HostsPerLocalityConstSharedPtr hosts_per_locality_copy = host_set->hostsPerLocality().clone();
1736 0 : prioritySet().updateHosts(priority,
1737 0 : HostSetImpl::partitionHosts(hosts_copy, hosts_per_locality_copy),
1738 0 : host_set->localityWeights(), {}, {}, absl::nullopt, absl::nullopt);
1739 0 : }
1740 0 : }
1741 :
1742 : const Network::Address::InstanceConstSharedPtr
1743 159 : ClusterImplBase::resolveProtoAddress(const envoy::config::core::v3::Address& address) {
1744 159 : TRY_ASSERT_MAIN_THREAD { return Network::Address::resolveProtoAddress(address); }
1745 159 : END_TRY
1746 159 : CATCH(EnvoyException & e, {
1747 159 : if (info_->type() == envoy::config::cluster::v3::Cluster::STATIC ||
1748 159 : info_->type() == envoy::config::cluster::v3::Cluster::EDS) {
1749 159 : throwEnvoyExceptionOrPanic(
1750 159 : fmt::format("{}. Consider setting resolver_name or setting cluster type "
1751 159 : "to 'STRICT_DNS' or 'LOGICAL_DNS'",
1752 159 : e.what()));
1753 159 : }
1754 159 : throw e;
1755 159 : });
1756 0 : }
1757 :
1758 : void ClusterImplBase::validateEndpointsForZoneAwareRouting(
1759 159 : const envoy::config::endpoint::v3::LocalityLbEndpoints& endpoints) const {
1760 159 : if (local_cluster_ && endpoints.priority() > 0) {
1761 0 : throwEnvoyExceptionOrPanic(
1762 0 : fmt::format("Unexpected non-zero priority for local cluster '{}'.", info()->name()));
1763 0 : }
1764 159 : }
1765 :
1766 : ClusterInfoImpl::OptionalClusterStats::OptionalClusterStats(
1767 : const envoy::config::cluster::v3::Cluster& config, Stats::Scope& stats_scope,
1768 : const ClusterManager& manager)
1769 : : timeout_budget_stats_(
1770 : (config.track_cluster_stats().timeout_budgets() || config.track_timeout_budgets())
1771 : ? std::make_unique<ClusterTimeoutBudgetStats>(generateTimeoutBudgetStats(
1772 : stats_scope, manager.clusterTimeoutBudgetStatNames()))
1773 : : nullptr),
1774 : request_response_size_stats_(
1775 : (config.track_cluster_stats().request_response_sizes()
1776 : ? std::make_unique<ClusterRequestResponseSizeStats>(generateRequestResponseSizeStats(
1777 : stats_scope, manager.clusterRequestResponseSizeStatNames()))
1778 0 : : nullptr)) {}
1779 :
1780 : ClusterInfoImpl::ResourceManagers::ResourceManagers(
1781 : const envoy::config::cluster::v3::Cluster& config, Runtime::Loader& runtime,
1782 : const std::string& cluster_name, Stats::Scope& stats_scope,
1783 : const ClusterCircuitBreakersStatNames& circuit_breakers_stat_names)
1784 159 : : circuit_breakers_stat_names_(circuit_breakers_stat_names) {
1785 159 : managers_[enumToInt(ResourcePriority::Default)] =
1786 159 : load(config, runtime, cluster_name, stats_scope, envoy::config::core::v3::DEFAULT);
1787 159 : managers_[enumToInt(ResourcePriority::High)] =
1788 159 : load(config, runtime, cluster_name, stats_scope, envoy::config::core::v3::HIGH);
1789 159 : }
1790 :
1791 : ClusterCircuitBreakersStats
1792 : ClusterInfoImpl::generateCircuitBreakersStats(Stats::Scope& scope, Stats::StatName prefix,
1793 : bool track_remaining,
1794 2535 : const ClusterCircuitBreakersStatNames& stat_names) {
1795 23760 : auto make_gauge = [&stat_names, &scope, prefix](Stats::StatName stat_name) -> Stats::Gauge& {
1796 23760 : return Stats::Utility::gaugeFromElements(scope,
1797 23760 : {stat_names.circuit_breakers_, prefix, stat_name},
1798 23760 : Stats::Gauge::ImportMode::Accumulate);
1799 23760 : };
1800 :
1801 2535 : #define REMAINING_GAUGE(stat_name) \
1802 12675 : track_remaining ? make_gauge(stat_name) : scope.store().nullGauge()
1803 :
1804 2535 : return {
1805 2535 : make_gauge(stat_names.cx_open_),
1806 2535 : make_gauge(stat_names.cx_pool_open_),
1807 2535 : make_gauge(stat_names.rq_open_),
1808 2535 : make_gauge(stat_names.rq_pending_open_),
1809 2535 : make_gauge(stat_names.rq_retry_open_),
1810 2535 : REMAINING_GAUGE(stat_names.remaining_cx_),
1811 2535 : REMAINING_GAUGE(stat_names.remaining_cx_pools_),
1812 2535 : REMAINING_GAUGE(stat_names.remaining_pending_),
1813 2535 : REMAINING_GAUGE(stat_names.remaining_retries_),
1814 2535 : REMAINING_GAUGE(stat_names.remaining_rq_),
1815 2535 : };
1816 :
1817 2535 : #undef REMAINING_GAUGE
1818 2535 : }
1819 :
1820 68 : Http::Http1::CodecStats& ClusterInfoImpl::http1CodecStats() const {
1821 68 : return Http::Http1::CodecStats::atomicGet(http1_codec_stats_, *stats_scope_);
1822 68 : }
1823 :
1824 105 : Http::Http2::CodecStats& ClusterInfoImpl::http2CodecStats() const {
1825 105 : return Http::Http2::CodecStats::atomicGet(http2_codec_stats_, *stats_scope_);
1826 105 : }
1827 :
1828 0 : Http::Http3::CodecStats& ClusterInfoImpl::http3CodecStats() const {
1829 0 : return Http::Http3::CodecStats::atomicGet(http3_codec_stats_, *stats_scope_);
1830 0 : }
1831 :
#ifdef ENVOY_ENABLE_UHV
// Maps an HTTP protocol to the codec stats used as header validator stats.
// HTTP/1.0 and HTTP/1.1 share the HTTP/1 codec stats. Any other enum value is treated as
// corruption and panics.
::Envoy::Http::HeaderValidatorStats&
ClusterInfoImpl::getHeaderValidatorStats(Http::Protocol protocol) const {
  switch (protocol) {
  case Http::Protocol::Http3:
    return http3CodecStats();
  case Http::Protocol::Http2:
    return http2CodecStats();
  case Http::Protocol::Http11:
  case Http::Protocol::Http10:
    return http1CodecStats();
  }
  PANIC_DUE_TO_CORRUPT_ENUM;
}
#endif
1847 :
1848 : Http::ClientHeaderValidatorPtr
1849 202 : ClusterInfoImpl::makeHeaderValidator([[maybe_unused]] Http::Protocol protocol) const {
1850 : #ifdef ENVOY_ENABLE_UHV
1851 : return http_protocol_options_->header_validator_factory_
1852 : ? http_protocol_options_->header_validator_factory_->createClientHeaderValidator(
1853 : protocol, getHeaderValidatorStats(protocol))
1854 : : nullptr;
1855 : #else
1856 202 : return nullptr;
1857 202 : #endif
1858 202 : }
1859 :
1860 : std::pair<absl::optional<double>, absl::optional<uint32_t>> ClusterInfoImpl::getRetryBudgetParams(
1861 0 : const envoy::config::cluster::v3::CircuitBreakers::Thresholds& thresholds) {
1862 0 : constexpr double default_budget_percent = 20.0;
1863 0 : constexpr uint32_t default_retry_concurrency = 3;
1864 :
1865 0 : absl::optional<double> budget_percent;
1866 0 : absl::optional<uint32_t> min_retry_concurrency;
1867 0 : if (thresholds.has_retry_budget()) {
1868 : // The budget_percent and min_retry_concurrency values are only set if there is a retry budget
1869 : // message set in the cluster config.
1870 0 : budget_percent = PROTOBUF_GET_WRAPPED_OR_DEFAULT(thresholds.retry_budget(), budget_percent,
1871 0 : default_budget_percent);
1872 0 : min_retry_concurrency = PROTOBUF_GET_WRAPPED_OR_DEFAULT(
1873 0 : thresholds.retry_budget(), min_retry_concurrency, default_retry_concurrency);
1874 0 : }
1875 0 : return std::make_pair(budget_percent, min_retry_concurrency);
1876 0 : }
1877 :
1878 : SINGLETON_MANAGER_REGISTRATION(upstream_network_filter_config_provider_manager);
1879 :
1880 : std::shared_ptr<UpstreamNetworkFilterConfigProviderManager>
1881 : ClusterInfoImpl::createSingletonUpstreamNetworkFilterConfigProviderManager(
1882 159 : Server::Configuration::ServerFactoryContext& context) {
1883 159 : return context.singletonManager().getTyped<UpstreamNetworkFilterConfigProviderManager>(
1884 159 : SINGLETON_MANAGER_REGISTERED_NAME(upstream_network_filter_config_provider_manager),
1885 159 : [] { return std::make_shared<Filter::UpstreamNetworkFilterConfigProviderManagerImpl>(); });
1886 159 : }
1887 :
1888 : ResourceManagerImplPtr
1889 : ClusterInfoImpl::ResourceManagers::load(const envoy::config::cluster::v3::Cluster& config,
1890 : Runtime::Loader& runtime, const std::string& cluster_name,
1891 : Stats::Scope& stats_scope,
1892 318 : const envoy::config::core::v3::RoutingPriority& priority) {
1893 318 : uint64_t max_connections = 1024;
1894 318 : uint64_t max_pending_requests = 1024;
1895 318 : uint64_t max_requests = 1024;
1896 318 : uint64_t max_retries = 3;
1897 318 : uint64_t max_connection_pools = std::numeric_limits<uint64_t>::max();
1898 318 : uint64_t max_connections_per_host = std::numeric_limits<uint64_t>::max();
1899 :
1900 318 : bool track_remaining = false;
1901 :
1902 318 : Stats::StatName priority_stat_name;
1903 318 : std::string priority_name;
1904 318 : switch (priority) {
1905 0 : PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
1906 159 : case envoy::config::core::v3::DEFAULT:
1907 159 : priority_stat_name = circuit_breakers_stat_names_.default_;
1908 159 : priority_name = "default";
1909 159 : break;
1910 159 : case envoy::config::core::v3::HIGH:
1911 159 : priority_stat_name = circuit_breakers_stat_names_.high_;
1912 159 : priority_name = "high";
1913 159 : break;
1914 318 : }
1915 :
1916 318 : const std::string runtime_prefix =
1917 318 : fmt::format("circuit_breakers.{}.{}.", cluster_name, priority_name);
1918 :
1919 318 : const auto& thresholds = config.circuit_breakers().thresholds();
1920 318 : const auto it = std::find_if(
1921 318 : thresholds.cbegin(), thresholds.cend(),
1922 318 : [priority](const envoy::config::cluster::v3::CircuitBreakers::Thresholds& threshold) {
1923 0 : return threshold.priority() == priority;
1924 0 : });
1925 318 : const auto& per_host_thresholds = config.circuit_breakers().per_host_thresholds();
1926 318 : const auto per_host_it = std::find_if(
1927 318 : per_host_thresholds.cbegin(), per_host_thresholds.cend(),
1928 318 : [priority](const envoy::config::cluster::v3::CircuitBreakers::Thresholds& threshold) {
1929 0 : return threshold.priority() == priority;
1930 0 : });
1931 :
1932 318 : absl::optional<double> budget_percent;
1933 318 : absl::optional<uint32_t> min_retry_concurrency;
1934 318 : if (it != thresholds.cend()) {
1935 0 : max_connections = PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_connections, max_connections);
1936 0 : max_pending_requests =
1937 0 : PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_pending_requests, max_pending_requests);
1938 0 : max_requests = PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_requests, max_requests);
1939 0 : max_retries = PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_retries, max_retries);
1940 0 : track_remaining = it->track_remaining();
1941 0 : max_connection_pools =
1942 0 : PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_connection_pools, max_connection_pools);
1943 0 : std::tie(budget_percent, min_retry_concurrency) = ClusterInfoImpl::getRetryBudgetParams(*it);
1944 0 : }
1945 318 : if (per_host_it != per_host_thresholds.cend()) {
1946 0 : if (per_host_it->has_max_pending_requests() || per_host_it->has_max_requests() ||
1947 0 : per_host_it->has_max_retries() || per_host_it->has_max_connection_pools() ||
1948 0 : per_host_it->has_retry_budget()) {
1949 0 : throwEnvoyExceptionOrPanic("Unsupported field in per_host_thresholds");
1950 0 : }
1951 0 : if (per_host_it->has_max_connections()) {
1952 0 : max_connections_per_host = per_host_it->max_connections().value();
1953 0 : }
1954 0 : }
1955 318 : return std::make_unique<ResourceManagerImpl>(
1956 318 : runtime, runtime_prefix, max_connections, max_pending_requests, max_requests, max_retries,
1957 318 : max_connection_pools, max_connections_per_host,
1958 318 : ClusterInfoImpl::generateCircuitBreakersStats(stats_scope, priority_stat_name,
1959 318 : track_remaining, circuit_breakers_stat_names_),
1960 318 : budget_percent, min_retry_concurrency);
1961 318 : }
1962 :
1963 : PriorityStateManager::PriorityStateManager(ClusterImplBase& cluster,
1964 : const LocalInfo::LocalInfo& local_info,
1965 : PrioritySet::HostUpdateCb* update_cb)
1966 159 : : parent_(cluster), local_info_node_(local_info.node()), update_cb_(update_cb) {}
1967 :
1968 : void PriorityStateManager::initializePriorityFor(
1969 159 : const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint) {
1970 159 : const uint32_t priority = locality_lb_endpoint.priority();
1971 159 : if (priority_state_.size() <= priority) {
1972 159 : priority_state_.resize(priority + 1);
1973 159 : }
1974 159 : if (priority_state_[priority].first == nullptr) {
1975 159 : priority_state_[priority].first = std::make_unique<HostVector>();
1976 159 : }
1977 159 : if (locality_lb_endpoint.has_locality() && locality_lb_endpoint.has_load_balancing_weight()) {
1978 0 : priority_state_[priority].second[locality_lb_endpoint.locality()] =
1979 0 : locality_lb_endpoint.load_balancing_weight().value();
1980 0 : }
1981 159 : }
1982 :
1983 : void PriorityStateManager::registerHostForPriority(
1984 : const std::string& hostname, Network::Address::InstanceConstSharedPtr address,
1985 : const std::vector<Network::Address::InstanceConstSharedPtr>& address_list,
1986 : const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint,
1987 159 : const envoy::config::endpoint::v3::LbEndpoint& lb_endpoint, TimeSource& time_source) {
1988 159 : auto metadata = lb_endpoint.has_metadata()
1989 159 : ? parent_.constMetadataSharedPool()->getObject(lb_endpoint.metadata())
1990 159 : : nullptr;
1991 159 : const auto host = std::make_shared<HostImpl>(
1992 159 : parent_.info(), hostname, address, metadata, lb_endpoint.load_balancing_weight().value(),
1993 159 : locality_lb_endpoint.locality(), lb_endpoint.endpoint().health_check_config(),
1994 159 : locality_lb_endpoint.priority(), lb_endpoint.health_status(), time_source);
1995 159 : if (!address_list.empty()) {
1996 0 : host->setAddressList(address_list);
1997 0 : }
1998 159 : registerHostForPriority(host, locality_lb_endpoint);
1999 159 : }
2000 :
2001 : void PriorityStateManager::registerHostForPriority(
2002 : const HostSharedPtr& host,
2003 159 : const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint) {
2004 159 : const uint32_t priority = locality_lb_endpoint.priority();
2005 : // Should be called after initializePriorityFor.
2006 159 : ASSERT(priority_state_[priority].first);
2007 159 : priority_state_[priority].first->emplace_back(host);
2008 159 : }
2009 :
2010 : void PriorityStateManager::updateClusterPrioritySet(
2011 : const uint32_t priority, HostVectorSharedPtr&& current_hosts,
2012 : const absl::optional<HostVector>& hosts_added, const absl::optional<HostVector>& hosts_removed,
2013 : const absl::optional<Upstream::Host::HealthFlag> health_checker_flag,
2014 : absl::optional<bool> weighted_priority_health,
2015 159 : absl::optional<uint32_t> overprovisioning_factor) {
2016 : // If local locality is not defined then skip populating per locality hosts.
2017 159 : const auto& local_locality = local_info_node_.locality();
2018 159 : ENVOY_LOG(trace, "Local locality: {}", local_locality.DebugString());
2019 :
2020 : // For non-EDS, most likely the current hosts are from priority_state_[priority].first.
2021 159 : HostVectorSharedPtr hosts(std::move(current_hosts));
2022 159 : LocalityWeightsMap empty_locality_map;
2023 159 : LocalityWeightsMap& locality_weights_map =
2024 159 : priority_state_.size() > priority ? priority_state_[priority].second : empty_locality_map;
2025 159 : ASSERT(priority_state_.size() > priority || locality_weights_map.empty());
2026 159 : LocalityWeightsSharedPtr locality_weights;
2027 159 : std::vector<HostVector> per_locality;
2028 :
2029 : // If we are configured for locality weighted LB we populate the locality weights. We also
2030 : // populate locality weights if the cluster uses load balancing extensions, since the extension
2031 : // may want to make use of locality weights and we cannot tell by inspecting the config whether
2032 : // this is the case.
2033 : //
2034 : // TODO: have the load balancing extension indicate, programmatically, whether it needs locality
2035 : // weights, as an optimization in cases where it doesn't.
2036 159 : const bool locality_weighted_lb =
2037 159 : parent_.info()->lbConfig().has_locality_weighted_lb_config() ||
2038 159 : parent_.info()->lbType() == LoadBalancerType::LoadBalancingPolicyConfig;
2039 159 : if (locality_weighted_lb) {
2040 159 : locality_weights = std::make_shared<LocalityWeights>();
2041 159 : }
2042 :
2043 : // We use std::map to guarantee a stable ordering for zone aware routing.
2044 159 : std::map<envoy::config::core::v3::Locality, HostVector, LocalityLess> hosts_per_locality;
2045 :
2046 159 : for (const HostSharedPtr& host : *hosts) {
2047 : // Take into consideration when a non-EDS cluster has active health checking, i.e. to mark all
2048 : // the hosts unhealthy (host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC)) and then fire
2049 : // update callbacks to start the health checking process. The endpoint with disabled active
2050 : // health check should not be set FAILED_ACTIVE_HC here.
2051 159 : if (health_checker_flag.has_value() && !host->disableActiveHealthCheck()) {
2052 0 : host->healthFlagSet(health_checker_flag.value());
2053 0 : }
2054 159 : hosts_per_locality[host->locality()].push_back(host);
2055 159 : }
2056 :
2057 : // Do we have hosts for the local locality?
2058 159 : const bool non_empty_local_locality =
2059 159 : local_info_node_.has_locality() &&
2060 159 : hosts_per_locality.find(local_locality) != hosts_per_locality.end();
2061 :
2062 : // As per HostsPerLocality::get(), the per_locality vector must have the local locality hosts
2063 : // first if non_empty_local_locality.
2064 159 : if (non_empty_local_locality) {
2065 0 : per_locality.emplace_back(hosts_per_locality[local_locality]);
2066 0 : if (locality_weighted_lb) {
2067 0 : locality_weights->emplace_back(locality_weights_map[local_locality]);
2068 0 : }
2069 0 : }
2070 :
2071 : // After the local locality hosts (if any), we place the remaining locality host groups in
2072 : // lexicographic order. This provides a stable ordering for zone aware routing.
2073 159 : for (auto& entry : hosts_per_locality) {
2074 159 : if (!non_empty_local_locality || !LocalityEqualTo()(local_locality, entry.first)) {
2075 159 : per_locality.emplace_back(entry.second);
2076 159 : if (locality_weighted_lb) {
2077 159 : locality_weights->emplace_back(locality_weights_map[entry.first]);
2078 159 : }
2079 159 : }
2080 159 : }
2081 :
2082 159 : auto per_locality_shared =
2083 159 : std::make_shared<HostsPerLocalityImpl>(std::move(per_locality), non_empty_local_locality);
2084 :
2085 : // If a batch update callback was provided, use that. Otherwise directly update
2086 : // the PrioritySet.
2087 159 : if (update_cb_ != nullptr) {
2088 28 : update_cb_->updateHosts(priority, HostSetImpl::partitionHosts(hosts, per_locality_shared),
2089 28 : std::move(locality_weights), hosts_added.value_or(*hosts),
2090 28 : hosts_removed.value_or<HostVector>({}), weighted_priority_health,
2091 28 : overprovisioning_factor);
2092 131 : } else {
2093 131 : parent_.prioritySet().updateHosts(
2094 131 : priority, HostSetImpl::partitionHosts(hosts, per_locality_shared),
2095 131 : std::move(locality_weights), hosts_added.value_or(*hosts),
2096 131 : hosts_removed.value_or<HostVector>({}), weighted_priority_health, overprovisioning_factor);
2097 131 : }
2098 159 : }
2099 :
/**
 * Reconciles the hosts of one priority with a freshly received host list.
 *
 * Each host in `new_hosts` is matched by address string against `all_hosts`. A matched host is
 * updated in place (weight, EDS health flag, metadata, canary, priority) unless its health check
 * address, locality, or "disable active health check" setting changed; in that case, and for
 * unmatched hosts, the new host object is used and reported as added.
 *
 * @param new_hosts hosts received for this priority.
 * @param current_priority_hosts in/out: on entry the hosts currently in this priority, on return
 *        the hosts that remain in this priority after the update.
 * @param hosts_added_to_current_priority output: hosts appended as added to this priority.
 * @param hosts_removed_from_current_priority output: assigned the hosts dropped from this
 *        priority (only when something was added or something was left over).
 * @param all_hosts existing hosts keyed by address string, used for matching.
 * @param all_new_hosts address strings of the new hosts; consulted to detect hosts that are
 *        referenced from another priority.
 * @return true when the host set of this priority changed and updateHosts() should fire.
 */
2100 : bool BaseDynamicClusterImpl::updateDynamicHostList(
2101 : const HostVector& new_hosts, HostVector& current_priority_hosts,
2102 : HostVector& hosts_added_to_current_priority, HostVector& hosts_removed_from_current_priority,
2103 28 : const HostMap& all_hosts, const absl::flat_hash_set<std::string>& all_new_hosts) {
// Largest weight seen among the hosts kept in this priority; published as a gauge below.
2104 28 : uint64_t max_host_weight = 1;
2105 :
2106 : // Did hosts change?
2107 : //
2108 : // Have host attributes changed the health of any endpoint? If so, we
2109 : // rebuild the hosts vectors. We only do this if the health status of an
2110 : // endpoint has materially changed (e.g. if previously failing active health
2111 : // checks, we just note it's now failing EDS health status but don't rebuild).
2112 : //
2113 : // TODO(htuch): We can be smarter about this potentially, and not force a full
2114 : // host set update on health status change. The way this would work is to
2115 : // implement a HealthChecker subclass that provides thread local health
2116 : // updates to the Cluster object. This will probably make sense to do in
2117 : // conjunction with https://github.com/envoyproxy/envoy/issues/2874.
2118 28 : bool hosts_changed = false;
2119 :
2120 : // Go through and see if the list we have is different from what we just got. If it is, we make
2121 : // a new host list and raise a change notification. We also check for duplicates here. It's
2122 : // possible for DNS to return the same address multiple times, and a bad EDS implementation
2123 : // could do the same thing.
2124 :
2125 : // Keep track of hosts we see in new_hosts that we are able to match up with an existing host.
2126 28 : absl::flat_hash_set<std::string> existing_hosts_for_current_priority(
2127 28 : current_priority_hosts.size());
2128 : // Keep track of hosts we're adding (or replacing)
2129 28 : absl::flat_hash_set<std::string> new_hosts_for_current_priority(new_hosts.size());
2130 : // Keep track of hosts for which locality is changed.
2131 28 : absl::flat_hash_set<std::string> hosts_with_updated_locality_for_current_priority(
2132 28 : current_priority_hosts.size());
2133 : // Keep track of hosts for which active health check flag is changed.
2134 28 : absl::flat_hash_set<std::string> hosts_with_active_health_check_flag_changed(
2135 28 : current_priority_hosts.size());
// Hosts that will make up this priority once the update is done.
2136 28 : HostVector final_hosts;
2137 28 : for (const HostSharedPtr& host : new_hosts) {
2138 : // To match a new host with an existing host means comparing their addresses.
2139 28 : auto existing_host = all_hosts.find(addressToString(host->address()));
2140 28 : const bool existing_host_found = existing_host != all_hosts.end();
2141 :
2142 : // Clear any pending deletion flag on an existing host in case it came back while it was
2143 : // being stabilized. We will set it again below if needed.
2144 28 : if (existing_host_found) {
2145 0 : existing_host->second->healthFlagClear(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL);
2146 0 : }
2147 :
2148 : // Check if in-place host update should be skipped, i.e. when the following criteria are met
2149 : // (currently there is only one criterion, but we might add more in the future):
2150 : // - The cluster health checker is activated and a new host is matched with the existing one,
2151 : // but the health check address is different.
2152 28 : const bool health_check_address_changed =
2153 28 : (health_checker_ != nullptr && existing_host_found &&
2154 28 : *existing_host->second->healthCheckAddress() != *host->healthCheckAddress());
2155 28 : bool locality_changed = false;
2156 28 : locality_changed = (existing_host_found &&
2157 28 : (!LocalityEqualTo()(host->locality(), existing_host->second->locality())));
2158 28 : if (locality_changed) {
2159 0 : hosts_with_updated_locality_for_current_priority.emplace(existing_host->first);
2160 0 : }
2161 :
2162 28 : const bool active_health_check_flag_changed =
2163 28 : (health_checker_ != nullptr && existing_host_found &&
2164 28 : existing_host->second->disableActiveHealthCheck() != host->disableActiveHealthCheck());
2165 28 : if (active_health_check_flag_changed) {
2166 0 : hosts_with_active_health_check_flag_changed.emplace(existing_host->first);
2167 0 : }
// Any of the three changes above forces the new host object to replace the existing one.
2168 28 : const bool skip_inplace_host_update =
2169 28 : health_check_address_changed || locality_changed || active_health_check_flag_changed;
2170 :
2171 : // When there is a match and we decided to do in-place update, we potentially update the
2172 : // host's health check flag and metadata. Afterwards, the host is pushed back into the
2173 : // final_hosts, i.e. hosts that should be preserved in the current priority.
2174 28 : if (existing_host_found && !skip_inplace_host_update) {
2175 0 : existing_hosts_for_current_priority.emplace(existing_host->first);
2176 : // If we find a host matched based on address, we keep it. However we do change weight
2177 : // inline so do that here.
2178 0 : if (host->weight() > max_host_weight) {
2179 0 : max_host_weight = host->weight();
2180 0 : }
2181 0 : if (existing_host->second->weight() != host->weight()) {
2182 0 : existing_host->second->weight(host->weight());
2183 : // We do full host set rebuilds so that load balancers can do pre-computation of data
2184 : // structures based on host weight. This may become a performance problem in certain
2185 : // deployments so it is runtime feature guarded and may also need to be configurable
2186 : // and/or dynamic in the future.
2187 0 : hosts_changed = true;
2188 0 : }
2189 :
2190 0 : hosts_changed |= updateEdsHealthFlag(*host, *existing_host->second);
2191 :
2192 : // Did metadata change?
// Metadata counts as changed unless both sides are null or both are set and equivalent.
2193 0 : bool metadata_changed = true;
2194 0 : if (host->metadata() && existing_host->second->metadata()) {
2195 0 : metadata_changed = !Protobuf::util::MessageDifferencer::Equivalent(
2196 0 : *host->metadata(), *existing_host->second->metadata());
2197 0 : } else if (!host->metadata() && !existing_host->second->metadata()) {
2198 0 : metadata_changed = false;
2199 0 : }
2200 :
2201 0 : if (metadata_changed) {
2202 : // First, update the entire metadata for the endpoint.
2203 0 : existing_host->second->metadata(host->metadata());
2204 :
2205 : // Also, given that the canary attribute of an endpoint is derived from its metadata
2206 : // (e.g.: from envoy.lb/canary), we do a blind update here since it's cheaper than testing
2207 : // to see if it actually changed. We must update this besides just updating the metadata,
2208 : // because it'll be used by the router filter to compute upstream stats.
2209 0 : existing_host->second->canary(host->canary());
2210 :
2211 : // If metadata changed, we need to rebuild. See github issue #3810.
2212 0 : hosts_changed = true;
2213 0 : }
2214 :
2215 : // Did the priority change?
2216 0 : if (host->priority() != existing_host->second->priority()) {
2217 0 : existing_host->second->priority(host->priority());
2218 0 : hosts_added_to_current_priority.emplace_back(existing_host->second);
2219 0 : }
2220 :
2221 0 : final_hosts.push_back(existing_host->second);
2222 28 : } else {
2223 28 : new_hosts_for_current_priority.emplace(addressToString(host->address()));
2224 28 : if (host->weight() > max_host_weight) {
2225 0 : max_host_weight = host->weight();
2226 0 : }
2227 :
2228 : // If we are depending on a health checker, we initialize to unhealthy.
2229 : // If there's an existing host with the same health checker, the
2230 : // active health-status is kept.
2231 28 : if (health_checker_ != nullptr && !host->disableActiveHealthCheck()) {
2232 0 : if (Runtime::runtimeFeatureEnabled(
2233 0 : "envoy.reloadable_features.keep_endpoint_active_hc_status_on_locality_update")) {
2234 0 : if (existing_host_found && !health_check_address_changed &&
2235 0 : !active_health_check_flag_changed) {
2236 : // If there's an existing host, use the same active health-status.
2237 : // The existing host can be marked PENDING_ACTIVE_HC or
2238 : // ACTIVE_HC_TIMEOUT if it is also marked with FAILED_ACTIVE_HC.
2239 0 : ASSERT(!existing_host->second->healthFlagGet(Host::HealthFlag::PENDING_ACTIVE_HC) ||
2240 0 : existing_host->second->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
2241 0 : ASSERT(!existing_host->second->healthFlagGet(Host::HealthFlag::ACTIVE_HC_TIMEOUT) ||
2242 0 : existing_host->second->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC));
2243 :
// Only the active health check related flags are carried over to the replacement host.
2244 0 : constexpr uint32_t active_hc_statuses_mask =
2245 0 : enumToInt(Host::HealthFlag::FAILED_ACTIVE_HC) |
2246 0 : enumToInt(Host::HealthFlag::DEGRADED_ACTIVE_HC) |
2247 0 : enumToInt(Host::HealthFlag::PENDING_ACTIVE_HC) |
2248 0 : enumToInt(Host::HealthFlag::ACTIVE_HC_TIMEOUT);
2249 :
2250 0 : const uint32_t existing_host_statuses = existing_host->second->healthFlagsGetAll();
2251 0 : host->healthFlagsSetAll(existing_host_statuses & active_hc_statuses_mask);
2252 0 : } else {
2253 : // No previous known host, mark it as failed active HC.
2254 0 : host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC);
2255 :
2256 : // If we want to exclude hosts until they have been health checked, mark them with
2257 : // a flag to indicate that they have not been health checked yet.
2258 0 : if (info_->warmHosts()) {
2259 0 : host->healthFlagSet(Host::HealthFlag::PENDING_ACTIVE_HC);
2260 0 : }
2261 0 : }
2262 0 : } else {
// Runtime feature disabled: every new host object starts as failed, same as the
// "no previous known host" branch above.
2263 0 : host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC);
2264 :
2265 : // If we want to exclude hosts until they have been health checked, mark them with
2266 : // a flag to indicate that they have not been health checked yet.
2267 0 : if (info_->warmHosts()) {
2268 0 : host->healthFlagSet(Host::HealthFlag::PENDING_ACTIVE_HC);
2269 0 : }
2270 0 : }
2271 0 : }
2272 :
2273 28 : final_hosts.push_back(host);
2274 28 : hosts_added_to_current_priority.push_back(host);
2275 28 : }
2276 28 : }
2277 :
2278 : // Remove hosts from current_priority_hosts that were matched to an existing host in the
2279 : // previous loop.
// NOTE: the predicate has a side effect: each matched address is erased from
// existing_hosts_for_current_priority, so whatever is left in that set afterwards was matched
// against a host that is not in current_priority_hosts.
2280 28 : auto erase_from =
2281 28 : std::remove_if(current_priority_hosts.begin(), current_priority_hosts.end(),
2282 28 : [&existing_hosts_for_current_priority](const HostSharedPtr& p) {
2283 0 : auto existing_itr =
2284 0 : existing_hosts_for_current_priority.find(p->address()->asString());
2285 :
2286 0 : if (existing_itr != existing_hosts_for_current_priority.end()) {
2287 0 : existing_hosts_for_current_priority.erase(existing_itr);
2288 0 : return true;
2289 0 : }
2290 :
2291 0 : return false;
2292 0 : });
2293 28 : current_priority_hosts.erase(erase_from, current_priority_hosts.end());
2294 :
2295 : // If we saw existing hosts during this iteration from a different priority, then we've moved
2296 : // a host from another priority into this one, so we should mark the priority as having changed.
2297 28 : if (!existing_hosts_for_current_priority.empty()) {
2298 0 : hosts_changed = true;
2299 0 : }
2300 :
2301 : // The remaining hosts are hosts that are not referenced in the config update. We remove them
2302 : // from the priority if any of the following is true:
2303 : // - Active health checking is not enabled.
2304 : // - The removed hosts are failing active health checking OR have been explicitly marked as
2305 : // unhealthy by a previous EDS update. We do not count outlier as a reason to remove a host
2306 : // or any other future health condition that may be added so we do not use the coarseHealth()
2307 : // API.
2308 : // - We have explicitly configured the cluster to remove hosts regardless of active health
2309 : // status.
2310 28 : const bool dont_remove_healthy_hosts =
2311 28 : health_checker_ != nullptr && !info()->drainConnectionsOnHostRemoval();
2312 28 : if (!current_priority_hosts.empty() && dont_remove_healthy_hosts) {
// NOTE: in this predicate, returning true means "keep the host in this priority": the host is
// pushed into final_hosts, flagged PENDING_DYNAMIC_REMOVAL and taken out of
// current_priority_hosts, so it is not reported as removed. Returning false leaves it in
// current_priority_hosts, i.e. it will be removed.
2313 0 : erase_from = std::remove_if(
2314 0 : current_priority_hosts.begin(), current_priority_hosts.end(),
2315 0 : [&all_new_hosts, &new_hosts_for_current_priority,
2316 0 : &hosts_with_updated_locality_for_current_priority,
2317 0 : &hosts_with_active_health_check_flag_changed, &final_hosts,
2318 0 : &max_host_weight](const HostSharedPtr& p) {
2319 : // This host has already been added as a new host in the
2320 : // new_hosts_for_current_priority. Return false here to make sure that host
2321 : // reference with older locality gets cleaned up from the priority.
2322 0 : if (hosts_with_updated_locality_for_current_priority.contains(p->address()->asString())) {
2323 0 : return false;
2324 0 : }
2325 :
2326 0 : if (hosts_with_active_health_check_flag_changed.contains(p->address()->asString())) {
2327 0 : return false;
2328 0 : }
2329 :
2330 0 : if (all_new_hosts.contains(p->address()->asString()) &&
2331 0 : !new_hosts_for_current_priority.contains(p->address()->asString())) {
2332 : // If the address is being completely deleted from this priority, but is
2333 : // referenced from another priority, then we assume that the other
2334 : // priority will perform an in-place update to re-use the existing Host.
2335 : // We should therefore not mark it as PENDING_DYNAMIC_REMOVAL, but
2336 : // instead remove it immediately from this priority.
2337 : // Example: health check address changed and priority also changed
2338 0 : return false;
2339 0 : }
2340 :
2341 : // PENDING_DYNAMIC_REMOVAL doesn't apply for the host with disabled active
2342 : // health check, the host is removed immediately from this priority.
2343 0 : if ((!(p->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC) ||
2344 0 : p->healthFlagGet(Host::HealthFlag::FAILED_EDS_HEALTH))) &&
2345 0 : !p->disableActiveHealthCheck()) {
2346 0 : if (p->weight() > max_host_weight) {
2347 0 : max_host_weight = p->weight();
2348 0 : }
2349 :
2350 0 : final_hosts.push_back(p);
2351 0 : p->healthFlagSet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL);
2352 0 : return true;
2353 0 : }
2354 0 : return false;
2355 0 : });
2356 0 : current_priority_hosts.erase(erase_from, current_priority_hosts.end());
2357 0 : }
2358 :
2359 : // At this point we've accounted for all the new hosts as well as the hosts that previously
2360 : // existed in this priority.
2361 28 : info_->endpointStats().max_host_weight_.set(max_host_weight);
2362 :
2363 : // Whatever remains in current_priority_hosts should be removed.
2364 28 : if (!hosts_added_to_current_priority.empty() || !current_priority_hosts.empty()) {
2365 28 : hosts_removed_from_current_priority = std::move(current_priority_hosts);
2366 28 : hosts_changed = true;
2367 28 : }
2368 :
2369 : // During the update we populated final_hosts with all the hosts that should remain
2370 : // in the current priority, so move them back into current_priority_hosts.
2371 28 : current_priority_hosts = std::move(final_hosts);
2372 : // We return false here in the absence of EDS health status or metadata changes, because we
2373 : // have no changes to host vector status (modulo weights). When we have EDS
2374 : // health status or metadata changed, we return true, causing updateHosts() to fire in the
2375 : // caller.
2376 28 : return hosts_changed;
2377 28 : }
2378 :
2379 : Network::DnsLookupFamily
2380 0 : getDnsLookupFamilyFromCluster(const envoy::config::cluster::v3::Cluster& cluster) {
2381 0 : return DnsUtils::getDnsLookupFamilyFromEnum(cluster.dns_lookup_family());
2382 0 : }
2383 :
2384 : void reportUpstreamCxDestroy(const Upstream::HostDescriptionConstSharedPtr& host,
2385 173 : Network::ConnectionEvent event) {
2386 173 : Upstream::ClusterTrafficStats& stats = *host->cluster().trafficStats();
2387 173 : stats.upstream_cx_destroy_.inc();
2388 173 : if (event == Network::ConnectionEvent::RemoteClose) {
2389 101 : stats.upstream_cx_destroy_remote_.inc();
2390 118 : } else {
2391 72 : stats.upstream_cx_destroy_local_.inc();
2392 72 : }
2393 173 : }
2394 :
2395 : void reportUpstreamCxDestroyActiveRequest(const Upstream::HostDescriptionConstSharedPtr& host,
2396 61 : Network::ConnectionEvent event) {
2397 61 : Upstream::ClusterTrafficStats& stats = *host->cluster().trafficStats();
2398 61 : stats.upstream_cx_destroy_with_active_rq_.inc();
2399 61 : if (event == Network::ConnectionEvent::RemoteClose) {
2400 43 : stats.upstream_cx_destroy_remote_with_active_rq_.inc();
2401 58 : } else {
2402 18 : stats.upstream_cx_destroy_local_with_active_rq_.inc();
2403 18 : }
2404 61 : }
2405 :
2406 : Network::Address::InstanceConstSharedPtr resolveHealthCheckAddress(
2407 : const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config,
2408 180397 : Network::Address::InstanceConstSharedPtr host_address) {
2409 180397 : Network::Address::InstanceConstSharedPtr health_check_address;
2410 180397 : const auto& port_value = health_check_config.port_value();
2411 180397 : if (health_check_config.has_address()) {
2412 0 : auto address = Network::Address::resolveProtoAddress(health_check_config.address());
2413 0 : health_check_address =
2414 0 : port_value == 0 ? address : Network::Utility::getAddressWithPort(*address, port_value);
2415 180397 : } else {
2416 180397 : health_check_address = port_value == 0
2417 180397 : ? host_address
2418 180397 : : Network::Utility::getAddressWithPort(*host_address, port_value);
2419 180397 : }
2420 180397 : return health_check_address;
2421 180397 : }
2422 :
2423 : } // namespace Upstream
2424 : } // namespace Envoy
|