/proc/self/cwd/source/common/upstream/upstream_impl.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "source/common/upstream/upstream_impl.h" |
2 | | |
3 | | #include <chrono> |
4 | | #include <cstdint> |
5 | | #include <limits> |
6 | | #include <list> |
7 | | #include <memory> |
8 | | #include <string> |
9 | | #include <vector> |
10 | | |
11 | | #include "envoy/config/cluster/v3/circuit_breaker.pb.h" |
12 | | #include "envoy/config/cluster/v3/cluster.pb.h" |
13 | | #include "envoy/config/core/v3/address.pb.h" |
14 | | #include "envoy/config/core/v3/base.pb.h" |
15 | | #include "envoy/config/core/v3/health_check.pb.h" |
16 | | #include "envoy/config/core/v3/protocol.pb.h" |
17 | | #include "envoy/config/endpoint/v3/endpoint_components.pb.h" |
18 | | #include "envoy/config/upstream/local_address_selector/v3/default_local_address_selector.pb.h" |
19 | | #include "envoy/event/dispatcher.h" |
20 | | #include "envoy/event/timer.h" |
21 | | #include "envoy/extensions/filters/http/upstream_codec/v3/upstream_codec.pb.h" |
22 | | #include "envoy/extensions/transport_sockets/http_11_proxy/v3/upstream_http_11_connect.pb.h" |
23 | | #include "envoy/extensions/transport_sockets/raw_buffer/v3/raw_buffer.pb.h" |
24 | | #include "envoy/init/manager.h" |
25 | | #include "envoy/network/dns.h" |
26 | | #include "envoy/network/transport_socket.h" |
27 | | #include "envoy/registry/registry.h" |
28 | | #include "envoy/secret/secret_manager.h" |
29 | | #include "envoy/server/filter_config.h" |
30 | | #include "envoy/server/transport_socket_config.h" |
31 | | #include "envoy/ssl/context_manager.h" |
32 | | #include "envoy/stats/scope.h" |
33 | | #include "envoy/upstream/health_checker.h" |
34 | | #include "envoy/upstream/upstream.h" |
35 | | |
36 | | #include "source/common/common/dns_utils.h" |
37 | | #include "source/common/common/enum_to_int.h" |
38 | | #include "source/common/common/fmt.h" |
39 | | #include "source/common/common/utility.h" |
40 | | #include "source/common/config/utility.h" |
41 | | #include "source/common/http/http1/codec_stats.h" |
42 | | #include "source/common/http/http2/codec_stats.h" |
43 | | #include "source/common/http/utility.h" |
44 | | #include "source/common/network/address_impl.h" |
45 | | #include "source/common/network/happy_eyeballs_connection_impl.h" |
46 | | #include "source/common/network/resolver_impl.h" |
47 | | #include "source/common/network/socket_option_factory.h" |
48 | | #include "source/common/network/socket_option_impl.h" |
49 | | #include "source/common/protobuf/protobuf.h" |
50 | | #include "source/common/protobuf/utility.h" |
51 | | #include "source/common/router/config_utility.h" |
52 | | #include "source/common/runtime/runtime_features.h" |
53 | | #include "source/common/runtime/runtime_impl.h" |
54 | | #include "source/common/stats/deferred_creation.h" |
55 | | #include "source/common/upstream/cluster_factory_impl.h" |
56 | | #include "source/common/upstream/health_checker_impl.h" |
57 | | #include "source/extensions/filters/network/http_connection_manager/config.h" |
58 | | #include "source/server/transport_socket_config_impl.h" |
59 | | |
60 | | #include "absl/container/node_hash_set.h" |
61 | | #include "absl/strings/str_cat.h" |
62 | | |
63 | | namespace Envoy { |
64 | | namespace Upstream { |
65 | | namespace { |
66 | 3.29k | std::string addressToString(Network::Address::InstanceConstSharedPtr address) { |
67 | 3.29k | if (!address) { |
68 | 0 | return ""; |
69 | 0 | } |
70 | 3.29k | return address->asString(); |
71 | 3.29k | } |
72 | | |
73 | | Network::TcpKeepaliveConfig |
74 | 0 | parseTcpKeepaliveConfig(const envoy::config::cluster::v3::Cluster& config) { |
75 | 0 | const envoy::config::core::v3::TcpKeepalive& options = |
76 | 0 | config.upstream_connection_options().tcp_keepalive(); |
77 | 0 | return Network::TcpKeepaliveConfig{ |
78 | 0 | PROTOBUF_GET_WRAPPED_OR_DEFAULT(options, keepalive_probes, absl::optional<uint32_t>()), |
79 | 0 | PROTOBUF_GET_WRAPPED_OR_DEFAULT(options, keepalive_time, absl::optional<uint32_t>()), |
80 | 0 | PROTOBUF_GET_WRAPPED_OR_DEFAULT(options, keepalive_interval, absl::optional<uint32_t>())}; |
81 | 0 | } |
82 | | |
83 | | ProtocolOptionsConfigConstSharedPtr |
84 | | createProtocolOptionsConfig(const std::string& name, const ProtobufWkt::Any& typed_config, |
85 | 2.24k | Server::Configuration::ProtocolOptionsFactoryContext& factory_context) { |
86 | 2.24k | Server::Configuration::ProtocolOptionsFactory* factory = |
87 | 2.24k | Registry::FactoryRegistry<Server::Configuration::NamedNetworkFilterConfigFactory>::getFactory( |
88 | 2.24k | name); |
89 | 2.24k | if (factory == nullptr) { |
90 | 2.24k | factory = |
91 | 2.24k | Registry::FactoryRegistry<Server::Configuration::NamedHttpFilterConfigFactory>::getFactory( |
92 | 2.24k | name); |
93 | 2.24k | } |
94 | 2.24k | if (factory == nullptr) { |
95 | 2.24k | factory = |
96 | 2.24k | Registry::FactoryRegistry<Server::Configuration::ProtocolOptionsFactory>::getFactory(name); |
97 | 2.24k | } |
98 | | |
99 | 2.24k | if (factory == nullptr) { |
100 | 0 | throwEnvoyExceptionOrPanic( |
101 | 0 | fmt::format("Didn't find a registered network or http filter or protocol " |
102 | 0 | "options implementation for name: '{}'", |
103 | 0 | name)); |
104 | 0 | } |
105 | | |
106 | 2.24k | ProtobufTypes::MessagePtr proto_config = factory->createEmptyProtocolOptionsProto(); |
107 | | |
108 | 2.24k | if (proto_config == nullptr) { |
109 | 0 | throwEnvoyExceptionOrPanic(fmt::format("filter {} does not support protocol options", name)); |
110 | 0 | } |
111 | | |
112 | 2.24k | Envoy::Config::Utility::translateOpaqueConfig( |
113 | 2.24k | typed_config, factory_context.messageValidationVisitor(), *proto_config); |
114 | 2.24k | return factory->createProtocolOptionsConfig(*proto_config, factory_context); |
115 | 2.24k | } |
116 | | |
117 | | absl::flat_hash_map<std::string, ProtocolOptionsConfigConstSharedPtr> parseExtensionProtocolOptions( |
118 | | const envoy::config::cluster::v3::Cluster& config, |
119 | 3.26k | Server::Configuration::ProtocolOptionsFactoryContext& factory_context) { |
120 | 3.26k | absl::flat_hash_map<std::string, ProtocolOptionsConfigConstSharedPtr> options; |
121 | | |
122 | 3.26k | for (const auto& it : config.typed_extension_protocol_options()) { |
123 | 2.24k | auto& name = it.first; |
124 | 2.24k | auto object = createProtocolOptionsConfig(name, it.second, factory_context); |
125 | 2.24k | if (object != nullptr) { |
126 | 2.24k | options[name] = std::move(object); |
127 | 2.24k | } |
128 | 2.24k | } |
129 | | |
130 | 3.26k | return options; |
131 | 3.26k | } |
132 | | |
133 | | // Updates the EDS health flags for an existing host to match the new host. |
134 | | // @param updated_host the new host to read health flag values from. |
135 | | // @param existing_host the host to update. |
136 | | // @return bool whether the flag update caused the host health to change. |
137 | 0 | bool updateEdsHealthFlag(const Host& updated_host, Host& existing_host) { |
138 | 0 | auto updated_eds_health_status = updated_host.edsHealthStatus(); |
139 | | |
140 | | // Check if the health flag has changed. |
141 | 0 | if (updated_eds_health_status == existing_host.edsHealthStatus()) { |
142 | 0 | return false; |
143 | 0 | } |
144 | | |
145 | 0 | const auto previous_health = existing_host.coarseHealth(); |
146 | 0 | existing_host.setEdsHealthStatus(updated_eds_health_status); |
147 | | |
148 | | // Rebuild if changing the flag affected the host health. |
149 | 0 | return previous_health != existing_host.coarseHealth(); |
150 | 0 | } |
151 | | |
152 | | // Converts a set of hosts into a HostVector, excluding certain hosts. |
153 | | // @param hosts hosts to convert |
154 | | // @param excluded_hosts hosts to exclude from the resulting vector. |
155 | | HostVector filterHosts(const absl::node_hash_set<HostSharedPtr>& hosts, |
156 | 28 | const absl::node_hash_set<HostSharedPtr>& excluded_hosts) { |
157 | 28 | HostVector net_hosts; |
158 | 28 | net_hosts.reserve(hosts.size()); |
159 | | |
160 | 28 | for (const auto& host : hosts) { |
161 | 14 | if (excluded_hosts.find(host) == excluded_hosts.end()) { |
162 | 14 | net_hosts.emplace_back(host); |
163 | 14 | } |
164 | 14 | } |
165 | | |
166 | 28 | return net_hosts; |
167 | 28 | } |
168 | | |
169 | | Stats::ScopeSharedPtr generateStatsScope(const envoy::config::cluster::v3::Cluster& config, |
170 | 3.26k | Stats::Store& stats) { |
171 | 3.26k | return stats.createScope(fmt::format( |
172 | 3.26k | "cluster.{}.", config.alt_stat_name().empty() ? config.name() : config.alt_stat_name())); |
173 | 3.26k | } |
174 | | |
175 | | Network::ConnectionSocket::OptionsSharedPtr |
176 | | buildBaseSocketOptions(const envoy::config::cluster::v3::Cluster& cluster_config, |
177 | 3.26k | const envoy::config::core::v3::BindConfig& bootstrap_bind_config) { |
178 | 3.26k | Network::ConnectionSocket::OptionsSharedPtr base_options = |
179 | 3.26k | std::make_shared<Network::ConnectionSocket::Options>(); |
180 | | |
181 | | // The process-wide `signal()` handling may fail to handle SIGPIPE if overridden |
182 | | // in the process (i.e., on a mobile client). Some OSes support handling it at the socket layer: |
183 | 3.26k | if (ENVOY_SOCKET_SO_NOSIGPIPE.hasValue()) { |
184 | 0 | Network::Socket::appendOptions(base_options, |
185 | 0 | Network::SocketOptionFactory::buildSocketNoSigpipeOptions()); |
186 | 0 | } |
187 | | // Cluster IP_FREEBIND settings, when set, will override the cluster manager wide settings. |
188 | 3.26k | if ((bootstrap_bind_config.freebind().value() && |
189 | 3.26k | !cluster_config.upstream_bind_config().has_freebind()) || |
190 | 3.26k | cluster_config.upstream_bind_config().freebind().value()) { |
191 | 0 | Network::Socket::appendOptions(base_options, |
192 | 0 | Network::SocketOptionFactory::buildIpFreebindOptions()); |
193 | 0 | } |
194 | 3.26k | if (cluster_config.upstream_connection_options().has_tcp_keepalive()) { |
195 | 0 | Network::Socket::appendOptions(base_options, |
196 | 0 | Network::SocketOptionFactory::buildTcpKeepaliveOptions( |
197 | 0 | parseTcpKeepaliveConfig(cluster_config))); |
198 | 0 | } |
199 | | |
200 | 3.26k | return base_options; |
201 | 3.26k | } |
202 | | |
203 | | Network::ConnectionSocket::OptionsSharedPtr |
204 | | buildClusterSocketOptions(const envoy::config::cluster::v3::Cluster& cluster_config, |
205 | 3.26k | const envoy::config::core::v3::BindConfig& bootstrap_bind_config) { |
206 | 3.26k | Network::ConnectionSocket::OptionsSharedPtr cluster_options = |
207 | 3.26k | std::make_shared<Network::ConnectionSocket::Options>(); |
208 | | // Cluster socket_options trump cluster manager wide. |
209 | 3.26k | if (bootstrap_bind_config.socket_options().size() + |
210 | 3.26k | cluster_config.upstream_bind_config().socket_options().size() > |
211 | 3.26k | 0) { |
212 | 0 | auto socket_options = !cluster_config.upstream_bind_config().socket_options().empty() |
213 | 0 | ? cluster_config.upstream_bind_config().socket_options() |
214 | 0 | : bootstrap_bind_config.socket_options(); |
215 | 0 | Network::Socket::appendOptions( |
216 | 0 | cluster_options, Network::SocketOptionFactory::buildLiteralOptions(socket_options)); |
217 | 0 | } |
218 | 3.26k | return cluster_options; |
219 | 3.26k | } |
220 | | |
221 | | std::vector<::Envoy::Upstream::UpstreamLocalAddress> |
222 | | parseBindConfig(::Envoy::OptRef<const envoy::config::core::v3::BindConfig> bind_config, |
223 | | const absl::optional<std::string>& cluster_name, |
224 | | Network::ConnectionSocket::OptionsSharedPtr base_socket_options, |
225 | 3.26k | Network::ConnectionSocket::OptionsSharedPtr cluster_socket_options) { |
226 | | |
227 | 3.26k | std::vector<::Envoy::Upstream::UpstreamLocalAddress> upstream_local_addresses; |
228 | 3.26k | if (bind_config.has_value()) { |
229 | 0 | UpstreamLocalAddress upstream_local_address; |
230 | 0 | upstream_local_address.address_ = |
231 | 0 | bind_config->has_source_address() |
232 | 0 | ? ::Envoy::Network::Address::resolveProtoSocketAddress(bind_config->source_address()) |
233 | 0 | : nullptr; |
234 | 0 | upstream_local_address.socket_options_ = std::make_shared<Network::ConnectionSocket::Options>(); |
235 | |
|
236 | 0 | ::Envoy::Network::Socket::appendOptions(upstream_local_address.socket_options_, |
237 | 0 | base_socket_options); |
238 | 0 | ::Envoy::Network::Socket::appendOptions(upstream_local_address.socket_options_, |
239 | 0 | cluster_socket_options); |
240 | |
|
241 | 0 | upstream_local_addresses.push_back(upstream_local_address); |
242 | |
|
243 | 0 | for (const auto& extra_source_address : bind_config->extra_source_addresses()) { |
244 | 0 | UpstreamLocalAddress extra_upstream_local_address; |
245 | 0 | extra_upstream_local_address.address_ = |
246 | 0 | ::Envoy::Network::Address::resolveProtoSocketAddress(extra_source_address.address()); |
247 | |
|
248 | 0 | extra_upstream_local_address.socket_options_ = |
249 | 0 | std::make_shared<::Envoy::Network::ConnectionSocket::Options>(); |
250 | 0 | ::Envoy::Network::Socket::appendOptions(extra_upstream_local_address.socket_options_, |
251 | 0 | base_socket_options); |
252 | |
|
253 | 0 | if (extra_source_address.has_socket_options()) { |
254 | 0 | ::Envoy::Network::Socket::appendOptions( |
255 | 0 | extra_upstream_local_address.socket_options_, |
256 | 0 | ::Envoy::Network::SocketOptionFactory::buildLiteralOptions( |
257 | 0 | extra_source_address.socket_options().socket_options())); |
258 | 0 | } else { |
259 | 0 | ::Envoy::Network::Socket::appendOptions(extra_upstream_local_address.socket_options_, |
260 | 0 | cluster_socket_options); |
261 | 0 | } |
262 | 0 | upstream_local_addresses.push_back(extra_upstream_local_address); |
263 | 0 | } |
264 | |
|
265 | 0 | for (const auto& additional_source_address : bind_config->additional_source_addresses()) { |
266 | 0 | UpstreamLocalAddress additional_upstream_local_address; |
267 | 0 | additional_upstream_local_address.address_ = |
268 | 0 | ::Envoy::Network::Address::resolveProtoSocketAddress(additional_source_address); |
269 | 0 | additional_upstream_local_address.socket_options_ = |
270 | 0 | std::make_shared<::Envoy::Network::ConnectionSocket::Options>(); |
271 | 0 | ::Envoy::Network::Socket::appendOptions(additional_upstream_local_address.socket_options_, |
272 | 0 | base_socket_options); |
273 | 0 | ::Envoy::Network::Socket::appendOptions(additional_upstream_local_address.socket_options_, |
274 | 0 | cluster_socket_options); |
275 | 0 | upstream_local_addresses.push_back(additional_upstream_local_address); |
276 | 0 | } |
277 | 3.26k | } else { |
278 | | // If there is no bind config specified, then return a nullptr for the address. |
279 | 3.26k | UpstreamLocalAddress local_address; |
280 | 3.26k | local_address.address_ = nullptr; |
281 | 3.26k | local_address.socket_options_ = std::make_shared<::Envoy::Network::ConnectionSocket::Options>(); |
282 | 3.26k | ::Envoy::Network::Socket::appendOptions(local_address.socket_options_, base_socket_options); |
283 | 3.26k | Network::Socket::appendOptions(local_address.socket_options_, cluster_socket_options); |
284 | 3.26k | upstream_local_addresses.push_back(local_address); |
285 | 3.26k | } |
286 | | |
287 | | // Verify that we have valid addresses if size is greater than 1. |
288 | 3.26k | if (upstream_local_addresses.size() > 1) { |
289 | 0 | for (auto const& upstream_local_address : upstream_local_addresses) { |
290 | 0 | if (upstream_local_address.address_ == nullptr) { |
291 | 0 | throwEnvoyExceptionOrPanic(fmt::format( |
292 | 0 | "{}'s upstream binding config has invalid IP addresses.", |
293 | 0 | !(cluster_name.has_value()) ? "Bootstrap" |
294 | 0 | : fmt::format("Cluster {}", cluster_name.value()))); |
295 | 0 | } |
296 | 0 | } |
297 | 0 | } |
298 | | |
299 | 3.26k | return upstream_local_addresses; |
300 | 3.26k | } |
301 | | |
302 | | Envoy::Upstream::UpstreamLocalAddressSelectorConstSharedPtr createUpstreamLocalAddressSelector( |
303 | | const envoy::config::cluster::v3::Cluster& cluster_config, |
304 | 3.26k | const absl::optional<envoy::config::core::v3::BindConfig>& bootstrap_bind_config) { |
305 | | |
306 | | // Use the cluster bind config if specified. This completely overrides the |
307 | | // bootstrap bind config when present. |
308 | 3.26k | OptRef<const envoy::config::core::v3::BindConfig> bind_config; |
309 | 3.26k | absl::optional<std::string> cluster_name; |
310 | 3.26k | if (cluster_config.has_upstream_bind_config()) { |
311 | 0 | bind_config.emplace(cluster_config.upstream_bind_config()); |
312 | 0 | cluster_name.emplace(cluster_config.name()); |
313 | 3.26k | } else if (bootstrap_bind_config.has_value()) { |
314 | 0 | bind_config.emplace(*bootstrap_bind_config); |
315 | 0 | } |
316 | | |
317 | | // Verify that bind config is valid. |
318 | 3.26k | if (bind_config.has_value()) { |
319 | 0 | if (bind_config->additional_source_addresses_size() > 0 && |
320 | 0 | bind_config->extra_source_addresses_size() > 0) { |
321 | 0 | throwEnvoyExceptionOrPanic(fmt::format( |
322 | 0 | "Can't specify both `extra_source_addresses` and `additional_source_addresses` " |
323 | 0 | "in the {}'s upstream binding config", |
324 | 0 | !(cluster_name.has_value()) ? "Bootstrap" |
325 | 0 | : fmt::format("Cluster {}", cluster_name.value()))); |
326 | 0 | } |
327 | | |
328 | 0 | if (!bind_config->has_source_address() && |
329 | 0 | (bind_config->extra_source_addresses_size() > 0 || |
330 | 0 | bind_config->additional_source_addresses_size() > 0)) { |
331 | 0 | throwEnvoyExceptionOrPanic(fmt::format( |
332 | 0 | "{}'s upstream binding config has extra/additional source addresses but no " |
333 | 0 | "source_address. Extra/additional addresses cannot be specified if " |
334 | 0 | "source_address is not set.", |
335 | 0 | !(cluster_name.has_value()) ? "Bootstrap" |
336 | 0 | : fmt::format("Cluster {}", cluster_name.value()))); |
337 | 0 | } |
338 | 0 | } |
339 | 3.26k | UpstreamLocalAddressSelectorFactory* local_address_selector_factory; |
340 | 3.26k | const envoy::config::core::v3::TypedExtensionConfig* const local_address_selector_config = |
341 | 3.26k | bind_config.has_value() && bind_config->has_local_address_selector() |
342 | 3.26k | ? &bind_config->local_address_selector() |
343 | 3.26k | : nullptr; |
344 | 3.26k | if (local_address_selector_config) { |
345 | 0 | local_address_selector_factory = |
346 | 0 | Config::Utility::getAndCheckFactory<UpstreamLocalAddressSelectorFactory>( |
347 | 0 | *local_address_selector_config, false); |
348 | 3.26k | } else { |
349 | | // Create the default local address selector if one was not specified. |
350 | 3.26k | envoy::config::upstream::local_address_selector::v3::DefaultLocalAddressSelector default_config; |
351 | 3.26k | envoy::config::core::v3::TypedExtensionConfig typed_extension; |
352 | 3.26k | typed_extension.mutable_typed_config()->PackFrom(default_config); |
353 | 3.26k | local_address_selector_factory = |
354 | 3.26k | Config::Utility::getAndCheckFactory<UpstreamLocalAddressSelectorFactory>(typed_extension, |
355 | 3.26k | false); |
356 | 3.26k | } |
357 | 3.26k | return local_address_selector_factory->createLocalAddressSelector( |
358 | 3.26k | parseBindConfig( |
359 | 3.26k | bind_config, cluster_name, |
360 | 3.26k | buildBaseSocketOptions(cluster_config, bootstrap_bind_config.value_or( |
361 | 3.26k | envoy::config::core::v3::BindConfig{})), |
362 | 3.26k | buildClusterSocketOptions(cluster_config, bootstrap_bind_config.value_or( |
363 | 3.26k | envoy::config::core::v3::BindConfig{}))), |
364 | 3.26k | cluster_name); |
365 | 3.26k | } |
366 | | |
367 | | } // namespace |
368 | | |
369 | | // Allow disabling ALPN checks for transport sockets. See |
370 | | // https://github.com/envoyproxy/envoy/issues/22876 |
371 | | const absl::string_view ClusterImplBase::DoNotValidateAlpnRuntimeKey = |
372 | | "config.do_not_validate_alpn_support"; |
373 | | |
374 | | // TODO(pianiststickman): this implementation takes a lock on the hot path and puts a copy of the |
375 | | // stat name into every host that receives a copy of that metric. This can be improved by putting |
376 | | // a single copy of the stat name into a thread-local key->index map so that the lock can be avoided |
377 | | // and using the index as the key to the stat map instead. |
378 | 0 | void LoadMetricStatsImpl::add(const absl::string_view key, double value) { |
379 | 0 | absl::MutexLock lock(&mu_); |
380 | 0 | if (map_ == nullptr) { |
381 | 0 | map_ = std::make_unique<StatMap>(); |
382 | 0 | } |
383 | 0 | Stat& stat = (*map_)[key]; |
384 | 0 | ++stat.num_requests_with_metric; |
385 | 0 | stat.total_metric_value += value; |
386 | 0 | } |
387 | | |
388 | 0 | LoadMetricStats::StatMapPtr LoadMetricStatsImpl::latch() { |
389 | 0 | absl::MutexLock lock(&mu_); |
390 | 0 | StatMapPtr latched = std::move(map_); |
391 | 0 | map_ = nullptr; |
392 | 0 | return latched; |
393 | 0 | } |
394 | | |
395 | | HostDescriptionImpl::HostDescriptionImpl( |
396 | | ClusterInfoConstSharedPtr cluster, const std::string& hostname, |
397 | | Network::Address::InstanceConstSharedPtr dest_address, MetadataConstSharedPtr metadata, |
398 | | const envoy::config::core::v3::Locality& locality, |
399 | | const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config, |
400 | | uint32_t priority, TimeSource& time_source) |
401 | | : cluster_(cluster), hostname_(hostname), |
402 | | health_checks_hostname_(health_check_config.hostname()), address_(dest_address), |
403 | | canary_(Config::Metadata::metadataValue(metadata.get(), |
404 | | Config::MetadataFilters::get().ENVOY_LB, |
405 | | Config::MetadataEnvoyLbKeys::get().CANARY) |
406 | | .bool_value()), |
407 | | metadata_(metadata), locality_(locality), |
408 | | locality_zone_stat_name_(locality.zone(), cluster->statsScope().symbolTable()), |
409 | | priority_(priority), |
410 | | socket_factory_(resolveTransportSocketFactory(dest_address, metadata_.get())), |
411 | 211k | creation_time_(time_source.monotonicTime()) { |
412 | 211k | if (health_check_config.port_value() != 0 && dest_address->type() != Network::Address::Type::Ip) { |
413 | | // Setting the health check port to non-0 only works for IP-type addresses. Setting the port |
414 | | // for a pipe address is a misconfiguration. Throw an exception. |
415 | 0 | throwEnvoyExceptionOrPanic( |
416 | 0 | fmt::format("Invalid host configuration: non-zero port for non-IP address")); |
417 | 0 | } |
418 | 211k | health_check_address_ = resolveHealthCheckAddress(health_check_config, dest_address); |
419 | 211k | } Envoy::Upstream::HostDescriptionImpl::HostDescriptionImpl(std::__1::shared_ptr<Envoy::Upstream::ClusterInfo const>, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<Envoy::Network::Address::Instance const>, std::__1::shared_ptr<envoy::config::core::v3::Metadata const>, envoy::config::core::v3::Locality const&, envoy::config::endpoint::v3::Endpoint_HealthCheckConfig const&, unsigned int, Envoy::TimeSource&) Line | Count | Source | 411 | 207k | creation_time_(time_source.monotonicTime()) { | 412 | 207k | if (health_check_config.port_value() != 0 && dest_address->type() != Network::Address::Type::Ip) { | 413 | | // Setting the health check port to non-0 only works for IP-type addresses. Setting the port | 414 | | // for a pipe address is a misconfiguration. Throw an exception. | 415 | 0 | throwEnvoyExceptionOrPanic( | 416 | 0 | fmt::format("Invalid host configuration: non-zero port for non-IP address")); | 417 | 0 | } | 418 | 207k | health_check_address_ = resolveHealthCheckAddress(health_check_config, dest_address); | 419 | 207k | } |
Envoy::Upstream::HostDescriptionImpl::HostDescriptionImpl(std::__1::shared_ptr<Envoy::Upstream::ClusterInfo const>, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<Envoy::Network::Address::Instance const>, std::__1::shared_ptr<envoy::config::core::v3::Metadata const>, envoy::config::core::v3::Locality const&, envoy::config::endpoint::v3::Endpoint_HealthCheckConfig const&, unsigned int, Envoy::TimeSource&) Line | Count | Source | 411 | 3.75k | creation_time_(time_source.monotonicTime()) { | 412 | 3.75k | if (health_check_config.port_value() != 0 && dest_address->type() != Network::Address::Type::Ip) { | 413 | | // Setting the health check port to non-0 only works for IP-type addresses. Setting the port | 414 | | // for a pipe address is a misconfiguration. Throw an exception. | 415 | 0 | throwEnvoyExceptionOrPanic( | 416 | 0 | fmt::format("Invalid host configuration: non-zero port for non-IP address")); | 417 | 0 | } | 418 | 3.75k | health_check_address_ = resolveHealthCheckAddress(health_check_config, dest_address); | 419 | 3.75k | } |
|
420 | | |
421 | | Network::UpstreamTransportSocketFactory& HostDescriptionImpl::resolveTransportSocketFactory( |
422 | | const Network::Address::InstanceConstSharedPtr& dest_address, |
423 | 212k | const envoy::config::core::v3::Metadata* metadata) const { |
424 | 212k | auto match = cluster_->transportSocketMatcher().resolve(metadata); |
425 | 212k | match.stats_.total_match_count_.inc(); |
426 | 212k | ENVOY_LOG(debug, "transport socket match, socket {} selected for host with address {}", |
427 | 212k | match.name_, dest_address ? dest_address->asString() : "empty"); |
428 | | |
429 | 212k | return match.factory_; |
430 | 212k | } |
431 | | |
432 | | Host::CreateConnectionData HostImpl::createConnection( |
433 | | Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options, |
434 | 1.32k | Network::TransportSocketOptionsConstSharedPtr transport_socket_options) const { |
435 | 1.32k | return createConnection(dispatcher, cluster(), address(), addressList(), transportSocketFactory(), |
436 | 1.32k | options, transport_socket_options, shared_from_this()); |
437 | 1.32k | } |
438 | | |
439 | 207k | void HostImpl::setEdsHealthFlag(envoy::config::core::v3::HealthStatus health_status) { |
440 | | // Clear all old EDS health flags first. |
441 | 207k | HostImpl::healthFlagClear(Host::HealthFlag::FAILED_EDS_HEALTH); |
442 | 207k | HostImpl::healthFlagClear(Host::HealthFlag::DEGRADED_EDS_HEALTH); |
443 | | |
444 | | // Set the appropriate EDS health flag. |
445 | 207k | switch (health_status) { |
446 | 0 | case envoy::config::core::v3::UNHEALTHY: |
447 | 0 | FALLTHRU; |
448 | 0 | case envoy::config::core::v3::DRAINING: |
449 | 0 | FALLTHRU; |
450 | 0 | case envoy::config::core::v3::TIMEOUT: |
451 | 0 | HostImpl::healthFlagSet(Host::HealthFlag::FAILED_EDS_HEALTH); |
452 | 0 | break; |
453 | 0 | case envoy::config::core::v3::DEGRADED: |
454 | 0 | HostImpl::healthFlagSet(Host::HealthFlag::DEGRADED_EDS_HEALTH); |
455 | 0 | break; |
456 | 207k | default: |
457 | 207k | break; |
458 | | // No health flags should be set. |
459 | 207k | } |
460 | 207k | } |
461 | | |
462 | | Host::CreateConnectionData HostImpl::createHealthCheckConnection( |
463 | | Event::Dispatcher& dispatcher, |
464 | | Network::TransportSocketOptionsConstSharedPtr transport_socket_options, |
465 | 22.3k | const envoy::config::core::v3::Metadata* metadata) const { |
466 | | |
467 | 22.3k | Network::UpstreamTransportSocketFactory& factory = |
468 | 22.3k | (metadata != nullptr) ? resolveTransportSocketFactory(healthCheckAddress(), metadata) |
469 | 22.3k | : transportSocketFactory(); |
470 | 22.3k | return createConnection(dispatcher, cluster(), healthCheckAddress(), {}, factory, nullptr, |
471 | 22.3k | transport_socket_options, shared_from_this()); |
472 | 22.3k | } |
473 | | |
474 | | Host::CreateConnectionData HostImpl::createConnection( |
475 | | Event::Dispatcher& dispatcher, const ClusterInfo& cluster, |
476 | | const Network::Address::InstanceConstSharedPtr& address, |
477 | | const std::vector<Network::Address::InstanceConstSharedPtr>& address_list, |
478 | | Network::UpstreamTransportSocketFactory& socket_factory, |
479 | | const Network::ConnectionSocket::OptionsSharedPtr& options, |
480 | | Network::TransportSocketOptionsConstSharedPtr transport_socket_options, |
481 | 23.7k | HostDescriptionConstSharedPtr host) { |
482 | 23.7k | auto source_address_selector = cluster.getUpstreamLocalAddressSelector(); |
483 | | |
484 | 23.7k | Network::ClientConnectionPtr connection; |
485 | | // If the transport socket options indicate the connection should be |
486 | | // redirected to a proxy, create the TCP connection to the proxy's address not |
487 | | // the host's address. |
488 | 23.7k | if (transport_socket_options && transport_socket_options->http11ProxyInfo().has_value()) { |
489 | 0 | auto upstream_local_address = |
490 | 0 | source_address_selector->getUpstreamLocalAddress(address, options); |
491 | 0 | ENVOY_LOG(debug, "Connecting to configured HTTP/1.1 proxy at {}", |
492 | 0 | transport_socket_options->http11ProxyInfo()->proxy_address->asString()); |
493 | 0 | connection = dispatcher.createClientConnection( |
494 | 0 | transport_socket_options->http11ProxyInfo()->proxy_address, upstream_local_address.address_, |
495 | 0 | socket_factory.createTransportSocket(transport_socket_options, host), |
496 | 0 | upstream_local_address.socket_options_, transport_socket_options); |
497 | 23.7k | } else if (address_list.size() > 1) { |
498 | 0 | connection = std::make_unique<Network::HappyEyeballsConnectionImpl>( |
499 | 0 | dispatcher, address_list, source_address_selector, socket_factory, transport_socket_options, |
500 | 0 | host, options); |
501 | 23.7k | } else { |
502 | 23.7k | auto upstream_local_address = |
503 | 23.7k | source_address_selector->getUpstreamLocalAddress(address, options); |
504 | 23.7k | connection = dispatcher.createClientConnection( |
505 | 23.7k | address, upstream_local_address.address_, |
506 | 23.7k | socket_factory.createTransportSocket(transport_socket_options, host), |
507 | 23.7k | upstream_local_address.socket_options_, transport_socket_options); |
508 | 23.7k | } |
509 | | |
510 | 23.7k | connection->connectionInfoSetter().enableSettingInterfaceName( |
511 | 23.7k | cluster.setLocalInterfaceNameOnUpstreamConnections()); |
512 | 23.7k | connection->setBufferLimits(cluster.perConnectionBufferLimitBytes()); |
513 | 23.7k | cluster.createNetworkFilterChain(*connection); |
514 | 23.7k | return {std::move(connection), std::move(host)}; |
515 | 23.7k | } |
516 | | |
517 | 8.68M | void HostImpl::weight(uint32_t new_weight) { weight_ = std::max(1U, new_weight); } |
518 | | |
519 | | std::vector<HostsPerLocalityConstSharedPtr> HostsPerLocalityImpl::filter( |
520 | 3.26k | const std::vector<std::function<bool(const Host&)>>& predicates) const { |
521 | | // We keep two lists: one for being able to mutate the clone and one for returning to the |
522 | | // caller. Creating them both at the start avoids iterating over the mutable values at the end |
523 | | // to convert them to a const pointer. |
524 | 3.26k | std::vector<std::shared_ptr<HostsPerLocalityImpl>> mutable_clones; |
525 | 3.26k | std::vector<HostsPerLocalityConstSharedPtr> filtered_clones; |
526 | | |
527 | 3.26k | mutable_clones.reserve(predicates.size()); |
528 | 3.26k | filtered_clones.reserve(predicates.size()); |
529 | 13.0k | for (size_t i = 0; i < predicates.size(); ++i) { |
530 | 9.80k | mutable_clones.emplace_back(std::make_shared<HostsPerLocalityImpl>()); |
531 | 9.80k | filtered_clones.emplace_back(mutable_clones.back()); |
532 | 9.80k | mutable_clones.back()->local_ = local_; |
533 | 9.80k | } |
534 | | |
535 | 3.26k | for (const auto& hosts_locality : hosts_per_locality_) { |
536 | 3.26k | std::vector<HostVector> current_locality_hosts; |
537 | 3.26k | current_locality_hosts.resize(predicates.size()); |
538 | | |
539 | | // Since # of hosts >> # of predicates, we iterate over the hosts in the outer loop. |
540 | 3.26k | for (const auto& host : hosts_locality) { |
541 | 13.0k | for (size_t i = 0; i < predicates.size(); ++i) { |
542 | 9.80k | if (predicates[i](*host)) { |
543 | 3.26k | current_locality_hosts[i].emplace_back(host); |
544 | 3.26k | } |
545 | 9.80k | } |
546 | 3.26k | } |
547 | | |
548 | 13.0k | for (size_t i = 0; i < predicates.size(); ++i) { |
549 | 9.80k | mutable_clones[i]->hosts_per_locality_.push_back(std::move(current_locality_hosts[i])); |
550 | 9.80k | } |
551 | 3.26k | } |
552 | | |
553 | 3.26k | return filtered_clones; |
554 | 3.26k | } |
555 | | |
// Replaces this host set's membership with freshly partitioned host lists and
// rebuilds the locality schedulers used for weighted locality selection.
// `weighted_priority_health` and `overprovisioning_factor` only overwrite the
// stored configuration when present; absent values keep the previous setting.
void HostSetImpl::updateHosts(PrioritySet::UpdateHostsParams&& update_hosts_params,
                              LocalityWeightsConstSharedPtr locality_weights,
                              const HostVector& hosts_added, const HostVector& hosts_removed,
                              absl::optional<bool> weighted_priority_health,
                              absl::optional<uint32_t> overprovisioning_factor) {
  if (weighted_priority_health.has_value()) {
    weighted_priority_health_ = weighted_priority_health.value();
  }
  if (overprovisioning_factor.has_value()) {
    ASSERT(overprovisioning_factor.value() > 0);
    overprovisioning_factor_ = overprovisioning_factor.value();
  }
  // Take ownership of all of the partitioned host lists in one shot.
  hosts_ = std::move(update_hosts_params.hosts);
  healthy_hosts_ = std::move(update_hosts_params.healthy_hosts);
  degraded_hosts_ = std::move(update_hosts_params.degraded_hosts);
  excluded_hosts_ = std::move(update_hosts_params.excluded_hosts);
  hosts_per_locality_ = std::move(update_hosts_params.hosts_per_locality);
  healthy_hosts_per_locality_ = std::move(update_hosts_params.healthy_hosts_per_locality);
  degraded_hosts_per_locality_ = std::move(update_hosts_params.degraded_hosts_per_locality);
  excluded_hosts_per_locality_ = std::move(update_hosts_params.excluded_hosts_per_locality);
  locality_weights_ = std::move(locality_weights);

  // TODO(ggreenway): implement `weighted_priority_health` support in `rebuildLocalityScheduler`.
  // Healthy and degraded hosts get independent locality schedulers; both are
  // rebuilt from the new membership installed above.
  rebuildLocalityScheduler(healthy_locality_scheduler_, healthy_locality_entries_,
                           *healthy_hosts_per_locality_, healthy_hosts_->get(), hosts_per_locality_,
                           excluded_hosts_per_locality_, locality_weights_,
                           overprovisioning_factor_);
  rebuildLocalityScheduler(degraded_locality_scheduler_, degraded_locality_entries_,
                           *degraded_hosts_per_locality_, degraded_hosts_->get(),
                           hosts_per_locality_, excluded_hosts_per_locality_, locality_weights_,
                           overprovisioning_factor_);

  // Notify per-host-set subscribers of the membership delta.
  runUpdateCallbacks(hosts_added, hosts_removed);
}
590 | | |
// Rebuilds `locality_scheduler` (an EDF scheduler over localities) and its
// backing `locality_entries` from the current membership. The scheduler is
// left null unless locality weights are in use and at least one eligible host
// exists, in which case callers fall back to non-locality-aware selection.
void HostSetImpl::rebuildLocalityScheduler(
    std::unique_ptr<EdfScheduler<LocalityEntry>>& locality_scheduler,
    std::vector<std::shared_ptr<LocalityEntry>>& locality_entries,
    const HostsPerLocality& eligible_hosts_per_locality, const HostVector& eligible_hosts,
    HostsPerLocalityConstSharedPtr all_hosts_per_locality,
    HostsPerLocalityConstSharedPtr excluded_hosts_per_locality,
    LocalityWeightsConstSharedPtr locality_weights, uint32_t overprovisioning_factor) {
  // Rebuild the locality scheduler by computing the effective weight of each
  // locality in this priority. The scheduler is reset by default, and is rebuilt only if we have
  // locality weights (i.e. using EDS) and there is at least one eligible host in this priority.
  //
  // We omit building a scheduler when there are zero eligible hosts in the priority as
  // all the localities will have zero effective weight. At selection time, we'll either select
  // from a different scheduler or there will be no available hosts in the priority. At that point
  // we'll rely on other mechanisms such as panic mode to select a host, none of which rely on the
  // scheduler.
  //
  // TODO(htuch): if the underlying locality index ->
  // envoy::config::core::v3::Locality hasn't changed in hosts_/healthy_hosts_/degraded_hosts_, we
  // could just update locality_weight_ without rebuilding. Similar to how host
  // level WRR works, we would age out the existing entries via picks and lazily
  // apply the new weights.
  locality_scheduler = nullptr;
  if (all_hosts_per_locality != nullptr && locality_weights != nullptr &&
      !locality_weights->empty() && !eligible_hosts.empty()) {
    locality_scheduler = std::make_unique<EdfScheduler<LocalityEntry>>();
    locality_entries.clear();
    for (uint32_t i = 0; i < all_hosts_per_locality->get().size(); ++i) {
      const double effective_weight = effectiveLocalityWeight(
          i, eligible_hosts_per_locality, *excluded_hosts_per_locality, *all_hosts_per_locality,
          *locality_weights, overprovisioning_factor);
      // Zero-weight localities are skipped entirely; they can never be picked.
      if (effective_weight > 0) {
        locality_entries.emplace_back(std::make_shared<LocalityEntry>(i, effective_weight));
        locality_scheduler->add(effective_weight, locality_entries.back());
      }
    }
    // If all effective weights were zero, reset the scheduler.
    if (locality_scheduler->empty()) {
      locality_scheduler = nullptr;
    }
  }
}
633 | | |
634 | 0 | absl::optional<uint32_t> HostSetImpl::chooseHealthyLocality() { |
635 | 0 | return chooseLocality(healthy_locality_scheduler_.get()); |
636 | 0 | } |
637 | | |
638 | 0 | absl::optional<uint32_t> HostSetImpl::chooseDegradedLocality() { |
639 | 0 | return chooseLocality(degraded_locality_scheduler_.get()); |
640 | 0 | } |
641 | | |
642 | | absl::optional<uint32_t> |
643 | 0 | HostSetImpl::chooseLocality(EdfScheduler<LocalityEntry>* locality_scheduler) { |
644 | 0 | if (locality_scheduler == nullptr) { |
645 | 0 | return {}; |
646 | 0 | } |
647 | 0 | const std::shared_ptr<LocalityEntry> locality = locality_scheduler->pickAndAdd( |
648 | 0 | [](const LocalityEntry& locality) { return locality.effective_weight_; }); |
649 | | // We don't build a schedule if there are no weighted localities, so we should always succeed. |
650 | 0 | ASSERT(locality != nullptr); |
651 | | // If we picked it before, its weight must have been positive. |
652 | 0 | ASSERT(locality->effective_weight_ > 0); |
653 | 0 | return locality->index_; |
654 | 0 | } |
655 | | |
656 | | PrioritySet::UpdateHostsParams |
657 | | HostSetImpl::updateHostsParams(HostVectorConstSharedPtr hosts, |
658 | | HostsPerLocalityConstSharedPtr hosts_per_locality, |
659 | | HealthyHostVectorConstSharedPtr healthy_hosts, |
660 | | HostsPerLocalityConstSharedPtr healthy_hosts_per_locality, |
661 | | DegradedHostVectorConstSharedPtr degraded_hosts, |
662 | | HostsPerLocalityConstSharedPtr degraded_hosts_per_locality, |
663 | | ExcludedHostVectorConstSharedPtr excluded_hosts, |
664 | 7.64k | HostsPerLocalityConstSharedPtr excluded_hosts_per_locality) { |
665 | 7.64k | return PrioritySet::UpdateHostsParams{std::move(hosts), |
666 | 7.64k | std::move(healthy_hosts), |
667 | 7.64k | std::move(degraded_hosts), |
668 | 7.64k | std::move(excluded_hosts), |
669 | 7.64k | std::move(hosts_per_locality), |
670 | 7.64k | std::move(healthy_hosts_per_locality), |
671 | 7.64k | std::move(degraded_hosts_per_locality), |
672 | 7.64k | std::move(excluded_hosts_per_locality)}; |
673 | 7.64k | } |
674 | | |
675 | 4.37k | PrioritySet::UpdateHostsParams HostSetImpl::updateHostsParams(const HostSet& host_set) { |
676 | 4.37k | return updateHostsParams(host_set.hostsPtr(), host_set.hostsPerLocalityPtr(), |
677 | 4.37k | host_set.healthyHostsPtr(), host_set.healthyHostsPerLocalityPtr(), |
678 | 4.37k | host_set.degradedHostsPtr(), host_set.degradedHostsPerLocalityPtr(), |
679 | 4.37k | host_set.excludedHostsPtr(), host_set.excludedHostsPerLocalityPtr()); |
680 | 4.37k | } |
681 | | PrioritySet::UpdateHostsParams |
682 | | HostSetImpl::partitionHosts(HostVectorConstSharedPtr hosts, |
683 | 3.26k | HostsPerLocalityConstSharedPtr hosts_per_locality) { |
684 | 3.26k | auto partitioned_hosts = ClusterImplBase::partitionHostList(*hosts); |
685 | 3.26k | auto healthy_degraded_excluded_hosts_per_locality = |
686 | 3.26k | ClusterImplBase::partitionHostsPerLocality(*hosts_per_locality); |
687 | | |
688 | 3.26k | return updateHostsParams(std::move(hosts), std::move(hosts_per_locality), |
689 | 3.26k | std::move(std::get<0>(partitioned_hosts)), |
690 | 3.26k | std::move(std::get<0>(healthy_degraded_excluded_hosts_per_locality)), |
691 | 3.26k | std::move(std::get<1>(partitioned_hosts)), |
692 | 3.26k | std::move(std::get<1>(healthy_degraded_excluded_hosts_per_locality)), |
693 | 3.26k | std::move(std::get<2>(partitioned_hosts)), |
694 | 3.26k | std::move(std::get<2>(healthy_degraded_excluded_hosts_per_locality))); |
695 | 3.26k | } |
696 | | |
697 | | double HostSetImpl::effectiveLocalityWeight(uint32_t index, |
698 | | const HostsPerLocality& eligible_hosts_per_locality, |
699 | | const HostsPerLocality& excluded_hosts_per_locality, |
700 | | const HostsPerLocality& all_hosts_per_locality, |
701 | | const LocalityWeights& locality_weights, |
702 | 9.80k | uint32_t overprovisioning_factor) { |
703 | 9.80k | const auto& locality_eligible_hosts = eligible_hosts_per_locality.get()[index]; |
704 | 9.80k | const uint32_t excluded_count = excluded_hosts_per_locality.get().size() > index |
705 | 9.80k | ? excluded_hosts_per_locality.get()[index].size() |
706 | 9.80k | : 0; |
707 | 9.80k | const auto host_count = all_hosts_per_locality.get()[index].size() - excluded_count; |
708 | 9.80k | if (host_count == 0) { |
709 | 0 | return 0.0; |
710 | 0 | } |
711 | 9.80k | const double locality_availability_ratio = 1.0 * locality_eligible_hosts.size() / host_count; |
712 | 9.80k | const uint32_t weight = locality_weights[index]; |
713 | | // Availability ranges from 0-1.0, and is the ratio of eligible hosts to total hosts, modified |
714 | | // by the overprovisioning factor. |
715 | 9.80k | const double effective_locality_availability_ratio = |
716 | 9.80k | std::min(1.0, (overprovisioning_factor / 100.0) * locality_availability_ratio); |
717 | 9.80k | return weight * effective_locality_availability_ratio; |
718 | 9.80k | } |
719 | | |
720 | | const HostSet& |
721 | | PrioritySetImpl::getOrCreateHostSet(uint32_t priority, |
722 | | absl::optional<bool> weighted_priority_health, |
723 | 20.8k | absl::optional<uint32_t> overprovisioning_factor) { |
724 | 20.8k | if (host_sets_.size() < priority + 1) { |
725 | 19.7k | for (size_t i = host_sets_.size(); i <= priority; ++i) { |
726 | 9.89k | HostSetImplPtr host_set = createHostSet(i, weighted_priority_health, overprovisioning_factor); |
727 | 9.89k | host_sets_priority_update_cbs_.push_back( |
728 | 9.89k | host_set->addPriorityUpdateCb([this](uint32_t priority, const HostVector& hosts_added, |
729 | 10.9k | const HostVector& hosts_removed) { |
730 | 10.9k | runReferenceUpdateCallbacks(priority, hosts_added, hosts_removed); |
731 | 10.9k | })); |
732 | 9.89k | host_sets_.push_back(std::move(host_set)); |
733 | 9.89k | } |
734 | 9.89k | } |
735 | 20.8k | return *host_sets_[priority]; |
736 | 20.8k | } |
737 | | |
// Updates the hosts for `priority`, creating the host set on first use, then
// notifies priority-set-level update callbacks — unless a batch update is in
// progress, in which case the batch owner fires the callbacks once at the end.
void PrioritySetImpl::updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params,
                                  LocalityWeightsConstSharedPtr locality_weights,
                                  const HostVector& hosts_added, const HostVector& hosts_removed,
                                  absl::optional<bool> weighted_priority_health,
                                  absl::optional<uint32_t> overprovisioning_factor,
                                  HostMapConstSharedPtr cross_priority_host_map) {
  // Update cross priority host map first. In this way, when the update callbacks of the priority
  // set are executed, the latest host map can always be obtained.
  if (cross_priority_host_map != nullptr) {
    const_cross_priority_host_map_ = std::move(cross_priority_host_map);
  }

  // Ensure that we have a HostSet for the given priority.
  getOrCreateHostSet(priority, weighted_priority_health, overprovisioning_factor);
  // host_sets_ is populated exclusively with HostSetImpl (via createHostSet),
  // so the downcast is safe.
  static_cast<HostSetImpl*>(host_sets_[priority].get())
      ->updateHosts(std::move(update_hosts_params), std::move(locality_weights), hosts_added,
                    hosts_removed, weighted_priority_health, overprovisioning_factor);

  // During a batch update, callbacks are deferred and run by batchHostUpdate()
  // with the net delta.
  if (!batch_update_) {
    runUpdateCallbacks(hosts_added, hosts_removed);
  }
}
760 | | |
761 | 14 | void PrioritySetImpl::batchHostUpdate(BatchUpdateCb& callback) { |
762 | 14 | BatchUpdateScope scope(*this); |
763 | | |
764 | | // We wrap the update call with a lambda that tracks all the hosts that have been added/removed. |
765 | 14 | callback.batchUpdate(scope); |
766 | | |
767 | | // Now that all the updates have been complete, we can compute the diff. |
768 | 14 | HostVector net_hosts_added = filterHosts(scope.all_hosts_added_, scope.all_hosts_removed_); |
769 | 14 | HostVector net_hosts_removed = filterHosts(scope.all_hosts_removed_, scope.all_hosts_added_); |
770 | | |
771 | 14 | runUpdateCallbacks(net_hosts_added, net_hosts_removed); |
772 | 14 | } |
773 | | |
774 | | void PrioritySetImpl::BatchUpdateScope::updateHosts( |
775 | | uint32_t priority, PrioritySet::UpdateHostsParams&& update_hosts_params, |
776 | | LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added, |
777 | | const HostVector& hosts_removed, absl::optional<bool> weighted_priority_health, |
778 | 14 | absl::optional<uint32_t> overprovisioning_factor) { |
779 | | // We assume that each call updates a different priority. |
780 | 14 | ASSERT(priorities_.find(priority) == priorities_.end()); |
781 | 14 | priorities_.insert(priority); |
782 | | |
783 | 14 | for (const auto& host : hosts_added) { |
784 | 14 | all_hosts_added_.insert(host); |
785 | 14 | } |
786 | | |
787 | 14 | for (const auto& host : hosts_removed) { |
788 | 0 | all_hosts_removed_.insert(host); |
789 | 0 | } |
790 | | |
791 | 14 | parent_.updateHosts(priority, std::move(update_hosts_params), locality_weights, hosts_added, |
792 | 14 | hosts_removed, weighted_priority_health, overprovisioning_factor); |
793 | 14 | } |
794 | | |
// Main-thread variant of updateHosts(): maintains the cross-priority host map
// itself from the added/removed deltas before delegating to the base
// implementation, so callers must not supply an external map.
void MainPrioritySetImpl::updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params,
                                      LocalityWeightsConstSharedPtr locality_weights,
                                      const HostVector& hosts_added,
                                      const HostVector& hosts_removed,
                                      absl::optional<bool> weighted_priority_health,
                                      absl::optional<uint32_t> overprovisioning_factor,
                                      HostMapConstSharedPtr cross_priority_host_map) {
  ASSERT(cross_priority_host_map == nullptr,
         "External cross-priority host map is meaningless to MainPrioritySetImpl");
  // Fold the membership delta into the mutable map before any update callbacks
  // can observe it.
  updateCrossPriorityHostMap(hosts_added, hosts_removed);

  PrioritySetImpl::updateHosts(priority, std::move(update_hosts_params), locality_weights,
                               hosts_added, hosts_removed, weighted_priority_health,
                               overprovisioning_factor);
}
810 | | |
// Returns the latest read-only cross-priority host map, publishing any pending
// mutations first. NOTE(review): this const method mutates `mutable` members;
// it appears intended to run only on the main thread — confirm before relying
// on concurrent access.
HostMapConstSharedPtr MainPrioritySetImpl::crossPriorityHostMap() const {
  // Check if the host set in the main thread PrioritySet has been updated.
  if (mutable_cross_priority_host_map_ != nullptr) {
    // Moving the pointer empties the mutable slot, so the next membership
    // change starts from a fresh copy (see updateCrossPriorityHostMap).
    const_cross_priority_host_map_ = std::move(mutable_cross_priority_host_map_);
    ASSERT(mutable_cross_priority_host_map_ == nullptr);
  }
  return const_cross_priority_host_map_;
}
819 | | |
820 | | void MainPrioritySetImpl::updateCrossPriorityHostMap(const HostVector& hosts_added, |
821 | 3.26k | const HostVector& hosts_removed) { |
822 | 3.26k | if (hosts_added.empty() && hosts_removed.empty()) { |
823 | | // No new hosts have been added and no old hosts have been removed. |
824 | 0 | return; |
825 | 0 | } |
826 | | |
827 | | // Since read_only_all_host_map_ may be shared by multiple threads, when the host set changes, |
828 | | // we cannot directly modify read_only_all_host_map_. |
829 | 3.26k | if (mutable_cross_priority_host_map_ == nullptr) { |
830 | | // Copy old read only host map to mutable host map. |
831 | 3.26k | mutable_cross_priority_host_map_ = std::make_shared<HostMap>(*const_cross_priority_host_map_); |
832 | 3.26k | } |
833 | | |
834 | 3.26k | for (const auto& host : hosts_removed) { |
835 | 0 | mutable_cross_priority_host_map_->erase(addressToString(host->address())); |
836 | 0 | } |
837 | | |
838 | 3.26k | for (const auto& host : hosts_added) { |
839 | 3.26k | mutable_cross_priority_host_map_->insert({addressToString(host->address()), host}); |
840 | 3.26k | } |
841 | 3.26k | } |
842 | | |
// Creates the cluster's traffic stats. When `defer_creation` is true the stats
// are instantiated lazily (on first use) via the deferred-compatible wrapper.
DeferredCreationCompatibleClusterTrafficStats
ClusterInfoImpl::generateStats(Stats::ScopeSharedPtr scope,
                               const ClusterTrafficStatNames& stat_names, bool defer_creation) {
  return Stats::createDeferredCompatibleStats<ClusterTrafficStats>(scope, stat_names,
                                                                   defer_creation);
}
849 | | |
// Instantiates the request/response size stats for this cluster in `scope`.
ClusterRequestResponseSizeStats ClusterInfoImpl::generateRequestResponseSizeStats(
    Stats::Scope& scope, const ClusterRequestResponseSizeStatNames& stat_names) {
  return {stat_names, scope};
}
854 | | |
// Instantiates the load-report stats for this cluster in `scope`.
ClusterLoadReportStats
ClusterInfoImpl::generateLoadReportStats(Stats::Scope& scope,
                                         const ClusterLoadReportStatNames& stat_names) {
  return {stat_names, scope};
}
860 | | |
// Instantiates the timeout-budget stats for this cluster in `scope`.
ClusterTimeoutBudgetStats
ClusterInfoImpl::generateTimeoutBudgetStats(Stats::Scope& scope,
                                            const ClusterTimeoutBudgetStatNames& stat_names) {
  return {stat_names, scope};
}
866 | | |
867 | | std::shared_ptr<const ClusterInfoImpl::HttpProtocolOptionsConfigImpl> |
868 | | createOptions(const envoy::config::cluster::v3::Cluster& config, |
869 | | std::shared_ptr<const ClusterInfoImpl::HttpProtocolOptionsConfigImpl>&& options, |
870 | 3.26k | ProtobufMessage::ValidationVisitor& validation_visitor) { |
871 | 3.26k | if (options) { |
872 | 2.24k | return std::move(options); |
873 | 2.24k | } |
874 | | |
875 | 1.02k | if (config.protocol_selection() == envoy::config::cluster::v3::Cluster::USE_CONFIGURED_PROTOCOL) { |
876 | | // Make sure multiple protocol configurations are not present |
877 | 1.02k | if (config.has_http_protocol_options() && config.has_http2_protocol_options()) { |
878 | 0 | throwEnvoyExceptionOrPanic( |
879 | 0 | fmt::format("cluster: Both HTTP1 and HTTP2 options may only be " |
880 | 0 | "configured with non-default 'protocol_selection' values")); |
881 | 0 | } |
882 | 1.02k | } |
883 | | |
884 | 1.02k | return std::make_shared<ClusterInfoImpl::HttpProtocolOptionsConfigImpl>( |
885 | 1.02k | config.http_protocol_options(), config.http2_protocol_options(), |
886 | 1.02k | config.common_http_protocol_options(), |
887 | 1.02k | (config.has_upstream_http_protocol_options() |
888 | 1.02k | ? absl::make_optional<envoy::config::core::v3::UpstreamHttpProtocolOptions>( |
889 | 0 | config.upstream_http_protocol_options()) |
890 | 1.02k | : absl::nullopt), |
891 | 1.02k | config.protocol_selection() == envoy::config::cluster::v3::Cluster::USE_DOWNSTREAM_PROTOCOL, |
892 | 1.02k | config.has_http2_protocol_options(), validation_visitor); |
893 | 1.02k | } |
894 | | |
895 | | absl::StatusOr<LegacyLbPolicyConfigHelper::Result> |
896 | | LegacyLbPolicyConfigHelper::getTypedLbConfigFromLegacyProtoWithoutSubset( |
897 | 3.26k | const ClusterProto& cluster, ProtobufMessage::ValidationVisitor& visitor) { |
898 | | |
899 | 3.26k | LoadBalancerConfigPtr lb_config; |
900 | 3.26k | TypedLoadBalancerFactory* lb_factory = nullptr; |
901 | | |
902 | 3.26k | switch (cluster.lb_policy()) { |
903 | 0 | PANIC_ON_PROTO_ENUM_SENTINEL_VALUES; |
904 | 3.26k | case ClusterProto::ROUND_ROBIN: |
905 | 3.26k | lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>( |
906 | 3.26k | "envoy.load_balancing_policies.round_robin"); |
907 | 3.26k | break; |
908 | 0 | case ClusterProto::LEAST_REQUEST: |
909 | 0 | lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>( |
910 | 0 | "envoy.load_balancing_policies.least_request"); |
911 | 0 | break; |
912 | 0 | case ClusterProto::RANDOM: |
913 | 0 | lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>( |
914 | 0 | "envoy.load_balancing_policies.random"); |
915 | 0 | break; |
916 | 0 | case ClusterProto::RING_HASH: |
917 | 0 | lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>( |
918 | 0 | "envoy.load_balancing_policies.ring_hash"); |
919 | 0 | break; |
920 | 0 | case ClusterProto::MAGLEV: |
921 | 0 | lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>( |
922 | 0 | "envoy.load_balancing_policies.maglev"); |
923 | 0 | break; |
924 | 0 | case ClusterProto::CLUSTER_PROVIDED: |
925 | 0 | lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>( |
926 | 0 | "envoy.load_balancing_policies.cluster_provided"); |
927 | 0 | break; |
928 | 0 | case ClusterProto::LOAD_BALANCING_POLICY_CONFIG: |
929 | | // 'LOAD_BALANCING_POLICY_CONFIG' should be handled by the 'configureLbPolicies' |
930 | | // function and should not reach here. |
931 | 0 | PANIC("getTypedLbConfigFromLegacyProtoWithoutSubset: should not reach here"); |
932 | 0 | break; |
933 | 3.26k | } |
934 | | |
935 | 3.26k | if (lb_factory == nullptr) { |
936 | 0 | return absl::InvalidArgumentError( |
937 | 0 | fmt::format("No load balancer factory found for LB type: {}", |
938 | 0 | ClusterProto::LbPolicy_Name(cluster.lb_policy()))); |
939 | 0 | } |
940 | | |
941 | 3.26k | ASSERT(lb_factory != nullptr); |
942 | 3.26k | return Result{lb_factory, lb_factory->loadConfig(cluster, visitor)}; |
943 | 3.26k | } |
944 | | |
945 | | absl::StatusOr<LegacyLbPolicyConfigHelper::Result> |
946 | | LegacyLbPolicyConfigHelper::getTypedLbConfigFromLegacyProto( |
947 | 3.26k | const ClusterProto& cluster, ProtobufMessage::ValidationVisitor& visitor) { |
948 | | |
949 | | // Handle the lb subset config case first. |
950 | 3.26k | if (cluster.has_lb_subset_config()) { |
951 | 0 | auto* lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>( |
952 | 0 | "envoy.load_balancing_policies.subset"); |
953 | 0 | if (lb_factory != nullptr) { |
954 | 0 | return Result{lb_factory, lb_factory->loadConfig(cluster, visitor)}; |
955 | 0 | } |
956 | 0 | return absl::InvalidArgumentError("No subset load balancer factory found"); |
957 | 0 | } |
958 | | |
959 | 3.26k | return getTypedLbConfigFromLegacyProtoWithoutSubset(cluster, visitor); |
960 | 3.26k | } |
961 | | |
// Captures the legacy per-policy LB config message (if any) from the cluster
// config's lb_config oneof. When no case is set, lb_policy_ keeps its default
// (empty) state from class initialization.
LBPolicyConfig::LBPolicyConfig(const envoy::config::cluster::v3::Cluster& config) {
  switch (config.lb_config_case()) {
  case envoy::config::cluster::v3::Cluster::kRoundRobinLbConfig:
    // NOTE(review): only this case constructs a pointer-to-const; the other
    // cases are non-const — confirm whether the asymmetry is intentional.
    lb_policy_ = std::make_unique<const envoy::config::cluster::v3::Cluster::RoundRobinLbConfig>(
        config.round_robin_lb_config());
    break;
  case envoy::config::cluster::v3::Cluster::kLeastRequestLbConfig:
    lb_policy_ = std::make_unique<envoy::config::cluster::v3::Cluster::LeastRequestLbConfig>(
        config.least_request_lb_config());
    break;
  case envoy::config::cluster::v3::Cluster::kRingHashLbConfig:
    lb_policy_ = std::make_unique<envoy::config::cluster::v3::Cluster::RingHashLbConfig>(
        config.ring_hash_lb_config());
    break;
  case envoy::config::cluster::v3::Cluster::kMaglevLbConfig:
    lb_policy_ = std::make_unique<envoy::config::cluster::v3::Cluster::MaglevLbConfig>(
        config.maglev_lb_config());
    break;
  case envoy::config::cluster::v3::Cluster::kOriginalDstLbConfig:
    lb_policy_ = std::make_unique<envoy::config::cluster::v3::Cluster::OriginalDstLbConfig>(
        config.original_dst_lb_config());
    break;
  case envoy::config::cluster::v3::Cluster::LB_CONFIG_NOT_SET:
    // The default value of the variant if there is no config would be nullptr
    // Which is set when the class is initialized. No action needed if config isn't set
    break;
  }
}
990 | | |
991 | | ClusterInfoImpl::ClusterInfoImpl( |
992 | | Init::Manager& init_manager, Server::Configuration::ServerFactoryContext& server_context, |
993 | | const envoy::config::cluster::v3::Cluster& config, |
994 | | const absl::optional<envoy::config::core::v3::BindConfig>& bind_config, |
995 | | Runtime::Loader& runtime, TransportSocketMatcherPtr&& socket_matcher, |
996 | | Stats::ScopeSharedPtr&& stats_scope, bool added_via_api, |
997 | | Server::Configuration::TransportSocketFactoryContext& factory_context) |
998 | | : runtime_(runtime), name_(config.name()), |
999 | | observability_name_(!config.alt_stat_name().empty() |
1000 | | ? std::make_unique<std::string>(config.alt_stat_name()) |
1001 | | : nullptr), |
1002 | | eds_service_name_( |
1003 | | config.has_eds_cluster_config() |
1004 | | ? std::make_unique<std::string>(config.eds_cluster_config().service_name()) |
1005 | | : nullptr), |
1006 | | extension_protocol_options_(parseExtensionProtocolOptions(config, factory_context)), |
1007 | | http_protocol_options_( |
1008 | | createOptions(config, |
1009 | | extensionProtocolOptionsTyped<HttpProtocolOptionsConfigImpl>( |
1010 | | "envoy.extensions.upstreams.http.v3.HttpProtocolOptions"), |
1011 | | factory_context.messageValidationVisitor())), |
1012 | | tcp_protocol_options_(extensionProtocolOptionsTyped<TcpProtocolOptionsConfigImpl>( |
1013 | | "envoy.extensions.upstreams.tcp.v3.TcpProtocolOptions")), |
1014 | | max_requests_per_connection_(PROTOBUF_GET_WRAPPED_OR_DEFAULT( |
1015 | | http_protocol_options_->common_http_protocol_options_, max_requests_per_connection, |
1016 | | config.max_requests_per_connection().value())), |
1017 | | connect_timeout_( |
1018 | | std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, connect_timeout, 5000))), |
1019 | | per_upstream_preconnect_ratio_(PROTOBUF_GET_WRAPPED_OR_DEFAULT( |
1020 | | config.preconnect_policy(), per_upstream_preconnect_ratio, 1.0)), |
1021 | | peekahead_ratio_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config.preconnect_policy(), |
1022 | | predictive_preconnect_ratio, 0)), |
1023 | | socket_matcher_(std::move(socket_matcher)), stats_scope_(std::move(stats_scope)), |
1024 | | traffic_stats_(generateStats(stats_scope_, |
1025 | | factory_context.clusterManager().clusterStatNames(), |
1026 | | server_context.statsConfig().enableDeferredCreationStats())), |
1027 | | config_update_stats_(factory_context.clusterManager().clusterConfigUpdateStatNames(), |
1028 | | *stats_scope_), |
1029 | | lb_stats_(factory_context.clusterManager().clusterLbStatNames(), *stats_scope_), |
1030 | | endpoint_stats_(factory_context.clusterManager().clusterEndpointStatNames(), *stats_scope_), |
1031 | | load_report_stats_store_(stats_scope_->symbolTable()), |
1032 | | load_report_stats_( |
1033 | | generateLoadReportStats(*load_report_stats_store_.rootScope(), |
1034 | | factory_context.clusterManager().clusterLoadReportStatNames())), |
1035 | | optional_cluster_stats_((config.has_track_cluster_stats() || config.track_timeout_budgets()) |
1036 | | ? std::make_unique<OptionalClusterStats>( |
1037 | | config, *stats_scope_, factory_context.clusterManager()) |
1038 | | : nullptr), |
1039 | | features_(ClusterInfoImpl::HttpProtocolOptionsConfigImpl::parseFeatures( |
1040 | | config, *http_protocol_options_)), |
1041 | | resource_managers_(config, runtime, name_, *stats_scope_, |
1042 | | factory_context.clusterManager().clusterCircuitBreakersStatNames()), |
1043 | | maintenance_mode_runtime_key_(absl::StrCat("upstream.maintenance_mode.", name_)), |
1044 | | upstream_local_address_selector_(createUpstreamLocalAddressSelector(config, bind_config)), |
1045 | | lb_policy_config_(std::make_unique<const LBPolicyConfig>(config)), |
1046 | | upstream_config_(config.has_upstream_config() |
1047 | | ? std::make_unique<envoy::config::core::v3::TypedExtensionConfig>( |
1048 | | config.upstream_config()) |
1049 | | : nullptr), |
1050 | | lb_subset_(config.has_lb_subset_config() |
1051 | | ? std::make_unique<LoadBalancerSubsetInfoImpl>(config.lb_subset_config()) |
1052 | | : nullptr), |
1053 | | metadata_(config.has_metadata() |
1054 | | ? std::make_unique<envoy::config::core::v3::Metadata>(config.metadata()) |
1055 | | : nullptr), |
1056 | | typed_metadata_(config.has_metadata() |
1057 | | ? std::make_unique<ClusterTypedMetadata>(config.metadata()) |
1058 | | : nullptr), |
1059 | | common_lb_config_( |
1060 | | factory_context.clusterManager().getCommonLbConfigPtr(config.common_lb_config())), |
1061 | | cluster_type_(config.has_cluster_type() |
1062 | | ? std::make_unique<envoy::config::cluster::v3::Cluster::CustomClusterType>( |
1063 | | config.cluster_type()) |
1064 | | : nullptr), |
1065 | | http_filter_config_provider_manager_( |
1066 | | Http::FilterChainUtility::createSingletonUpstreamFilterConfigProviderManager( |
1067 | | server_context)), |
1068 | | network_filter_config_provider_manager_( |
1069 | | createSingletonUpstreamNetworkFilterConfigProviderManager(server_context)), |
1070 | | upstream_context_(server_context, init_manager, *stats_scope_), |
1071 | | per_connection_buffer_limit_bytes_( |
1072 | | PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, per_connection_buffer_limit_bytes, 1024 * 1024)), |
1073 | | max_response_headers_count_(PROTOBUF_GET_WRAPPED_OR_DEFAULT( |
1074 | | http_protocol_options_->common_http_protocol_options_, max_headers_count, |
1075 | | runtime_.snapshot().getInteger(Http::MaxResponseHeadersCountOverrideKey, |
1076 | | Http::DEFAULT_MAX_HEADERS_COUNT))), |
1077 | | type_(config.type()), |
1078 | | drain_connections_on_host_removal_(config.ignore_health_on_host_removal()), |
1079 | | connection_pool_per_downstream_connection_( |
1080 | | config.connection_pool_per_downstream_connection()), |
1081 | | warm_hosts_(!config.health_checks().empty() && |
1082 | | common_lb_config_->ignore_new_hosts_until_first_hc()), |
1083 | | set_local_interface_name_on_upstream_connections_( |
1084 | | config.upstream_connection_options().set_local_interface_name_on_upstream_connections()), |
1085 | | added_via_api_(added_via_api), has_configured_http_filters_(false), |
1086 | | per_endpoint_stats_(config.has_track_cluster_stats() && |
1087 | 3.26k | config.track_cluster_stats().per_endpoint_stats()) { |
1088 | | #ifdef WIN32 |
1089 | | if (set_local_interface_name_on_upstream_connections_) { |
1090 | | throwEnvoyExceptionOrPanic( |
1091 | | "set_local_interface_name_on_upstream_connections_ cannot be set to true " |
1092 | | "on Windows platforms"); |
1093 | | } |
1094 | | #endif |
1095 | | |
1096 | | // Both LoadStatsReporter and per_endpoint_stats need to `latch()` the counters, so if both are |
1097 | | // configured they will interfere with each other and both get incorrect values. |
1098 | 3.26k | if (perEndpointStatsEnabled() && |
1099 | 3.26k | server_context.bootstrap().cluster_manager().has_load_stats_config()) { |
1100 | 0 | throwEnvoyExceptionOrPanic("Only one of cluster per_endpoint_stats and cluster manager " |
1101 | 0 | "load_stats_config can be specified"); |
1102 | 0 | } |
1103 | | |
1104 | 3.26k | if (config.has_max_requests_per_connection() && |
1105 | 3.26k | http_protocol_options_->common_http_protocol_options_.has_max_requests_per_connection()) { |
1106 | 0 | throwEnvoyExceptionOrPanic("Only one of max_requests_per_connection from Cluster or " |
1107 | 0 | "HttpProtocolOptions can be specified"); |
1108 | 0 | } |
1109 | | |
1110 | | // If load_balancing_policy is set we will use it directly, ignoring lb_policy. |
1111 | 3.26k | if (config.has_load_balancing_policy() || |
1112 | 3.26k | config.lb_policy() == envoy::config::cluster::v3::Cluster::LOAD_BALANCING_POLICY_CONFIG) { |
1113 | 0 | configureLbPolicies(config, server_context); |
1114 | 3.26k | } else if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.convert_legacy_lb_config")) { |
1115 | 3.26k | auto lb_pair = LegacyLbPolicyConfigHelper::getTypedLbConfigFromLegacyProto( |
1116 | 3.26k | config, server_context.messageValidationVisitor()); |
1117 | | |
1118 | 3.26k | if (!lb_pair.ok()) { |
1119 | 0 | throwEnvoyExceptionOrPanic(std::string(lb_pair.status().message())); |
1120 | 0 | } |
1121 | | |
1122 | 3.26k | load_balancer_config_ = std::move(lb_pair->config); |
1123 | 3.26k | load_balancer_factory_ = lb_pair->factory; |
1124 | 3.26k | lb_type_ = LoadBalancerType::LoadBalancingPolicyConfig; |
1125 | | |
1126 | 3.26k | RELEASE_ASSERT( |
1127 | 3.26k | load_balancer_factory_, |
1128 | 3.26k | fmt::format( |
1129 | 3.26k | "No load balancer factory found from legacy LB configuration (type: {}, subset: {}).", |
1130 | 3.26k | envoy::config::cluster::v3::Cluster::LbPolicy_Name(config.lb_policy()), |
1131 | 3.26k | config.has_lb_subset_config())); |
1132 | | |
1133 | | // Clear unnecessary legacy config because all legacy config is wrapped in load_balancer_config_ |
1134 | | // except the original_dst_lb_config. |
1135 | 3.26k | lb_subset_ = nullptr; |
1136 | 3.26k | if (config.lb_policy() != envoy::config::cluster::v3::Cluster::CLUSTER_PROVIDED) { |
1137 | 3.26k | lb_policy_config_ = nullptr; |
1138 | 3.26k | } |
1139 | 3.26k | } else { |
1140 | 0 | switch (config.lb_policy()) { |
1141 | 0 | PANIC_ON_PROTO_ENUM_SENTINEL_VALUES; |
1142 | 0 | case envoy::config::cluster::v3::Cluster::ROUND_ROBIN: |
1143 | 0 | lb_type_ = LoadBalancerType::RoundRobin; |
1144 | 0 | break; |
1145 | 0 | case envoy::config::cluster::v3::Cluster::LEAST_REQUEST: |
1146 | 0 | lb_type_ = LoadBalancerType::LeastRequest; |
1147 | 0 | break; |
1148 | 0 | case envoy::config::cluster::v3::Cluster::RANDOM: |
1149 | 0 | lb_type_ = LoadBalancerType::Random; |
1150 | 0 | break; |
1151 | 0 | case envoy::config::cluster::v3::Cluster::RING_HASH: |
1152 | 0 | lb_type_ = LoadBalancerType::RingHash; |
1153 | 0 | break; |
1154 | 0 | case envoy::config::cluster::v3::Cluster::MAGLEV: |
1155 | 0 | lb_type_ = LoadBalancerType::Maglev; |
1156 | 0 | break; |
1157 | 0 | case envoy::config::cluster::v3::Cluster::CLUSTER_PROVIDED: |
1158 | 0 | if (config.has_lb_subset_config()) { |
1159 | 0 | throwEnvoyExceptionOrPanic( |
1160 | 0 | fmt::format("cluster: LB policy {} cannot be combined with lb_subset_config", |
1161 | 0 | envoy::config::cluster::v3::Cluster::LbPolicy_Name(config.lb_policy()))); |
1162 | 0 | } |
1163 | | |
1164 | 0 | lb_type_ = LoadBalancerType::ClusterProvided; |
1165 | 0 | break; |
1166 | 0 | case envoy::config::cluster::v3::Cluster::LOAD_BALANCING_POLICY_CONFIG: { |
1167 | | // 'LOAD_BALANCING_POLICY_CONFIG' should be handled by the 'configureLbPolicies' |
1168 | | // function in previous branch and should not reach here. |
1169 | 0 | PANIC("Should not reach here"); |
1170 | 0 | break; |
1171 | 0 | } |
1172 | 0 | } |
1173 | 0 | } |
1174 | | |
1175 | 3.26k | if (config.lb_subset_config().locality_weight_aware() && |
1176 | 3.26k | !config.common_lb_config().has_locality_weighted_lb_config()) { |
1177 | 0 | throwEnvoyExceptionOrPanic(fmt::format("Locality weight aware subset LB requires that a " |
1178 | 0 | "locality_weighted_lb_config be set in {}", |
1179 | 0 | name_)); |
1180 | 0 | } |
1181 | | |
1182 | | // Use default (1h) or configured `idle_timeout`, unless it's set to 0, indicating that no |
1183 | | // timeout should be used. |
1184 | 3.26k | absl::optional<std::chrono::milliseconds> idle_timeout(std::chrono::hours(1)); |
1185 | 3.26k | if (http_protocol_options_->common_http_protocol_options_.has_idle_timeout()) { |
1186 | 0 | idle_timeout = std::chrono::milliseconds(DurationUtil::durationToMilliseconds( |
1187 | 0 | http_protocol_options_->common_http_protocol_options_.idle_timeout())); |
1188 | 0 | if (idle_timeout.value().count() == 0) { |
1189 | 0 | idle_timeout = absl::nullopt; |
1190 | 0 | } |
1191 | 0 | } |
1192 | 3.26k | if (idle_timeout.has_value()) { |
1193 | 3.26k | optional_timeouts_.set<OptionalTimeoutNames::IdleTimeout>(*idle_timeout); |
1194 | 3.26k | } |
1195 | | |
1196 | | // Use default (10m) or configured `tcp_pool_idle_timeout`, unless it's set to 0, indicating that |
1197 | | // no timeout should be used. |
1198 | 3.26k | absl::optional<std::chrono::milliseconds> tcp_pool_idle_timeout(std::chrono::minutes(10)); |
1199 | 3.26k | if (tcp_protocol_options_ && tcp_protocol_options_->idleTimeout().has_value()) { |
1200 | 0 | tcp_pool_idle_timeout = tcp_protocol_options_->idleTimeout(); |
1201 | 0 | if (tcp_pool_idle_timeout.value().count() == 0) { |
1202 | 0 | tcp_pool_idle_timeout = absl::nullopt; |
1203 | 0 | } |
1204 | 0 | } |
1205 | 3.26k | if (tcp_pool_idle_timeout.has_value()) { |
1206 | 3.26k | optional_timeouts_.set<OptionalTimeoutNames::TcpPoolIdleTimeout>(*tcp_pool_idle_timeout); |
1207 | 3.26k | } |
1208 | | |
1209 | | // Use configured `max_connection_duration`, unless it's set to 0, indicating that |
1210 | | // no timeout should be used. No timeout by default either. |
1211 | 3.26k | absl::optional<std::chrono::milliseconds> max_connection_duration; |
1212 | 3.26k | if (http_protocol_options_->common_http_protocol_options_.has_max_connection_duration()) { |
1213 | 0 | max_connection_duration = std::chrono::milliseconds(DurationUtil::durationToMilliseconds( |
1214 | 0 | http_protocol_options_->common_http_protocol_options_.max_connection_duration())); |
1215 | 0 | if (max_connection_duration.value().count() == 0) { |
1216 | 0 | max_connection_duration = absl::nullopt; |
1217 | 0 | } |
1218 | 0 | } |
1219 | 3.26k | if (max_connection_duration.has_value()) { |
1220 | 0 | optional_timeouts_.set<OptionalTimeoutNames::MaxConnectionDuration>(*max_connection_duration); |
1221 | 0 | } |
1222 | | |
1223 | 3.26k | if (config.has_eds_cluster_config()) { |
1224 | 14 | if (config.type() != envoy::config::cluster::v3::Cluster::EDS) { |
1225 | 0 | throwEnvoyExceptionOrPanic("eds_cluster_config set in a non-EDS cluster"); |
1226 | 0 | } |
1227 | 14 | } |
1228 | | |
1229 | | // TODO(htuch): Remove this temporary workaround when we have |
1230 | | // https://github.com/bufbuild/protoc-gen-validate/issues/97 resolved. This just provides |
1231 | | // early validation of sanity of fields that we should catch at config ingestion. |
1232 | 3.26k | DurationUtil::durationToMilliseconds(common_lb_config_->update_merge_window()); |
1233 | | |
1234 | | // Create upstream network filter factories |
1235 | 3.26k | const auto& filters = config.filters(); |
1236 | 3.26k | ASSERT(filter_factories_.empty()); |
1237 | 3.26k | filter_factories_.reserve(filters.size()); |
1238 | 3.26k | for (ssize_t i = 0; i < filters.size(); i++) { |
1239 | 0 | const auto& proto_config = filters[i]; |
1240 | 0 | const bool is_terminal = i == filters.size() - 1; |
1241 | 0 | ENVOY_LOG(debug, " upstream network filter #{}:", i); |
1242 | |
|
1243 | 0 | if (proto_config.has_config_discovery()) { |
1244 | 0 | if (proto_config.has_typed_config()) { |
1245 | 0 | throwEnvoyExceptionOrPanic("Only one of typed_config or config_discovery can be used"); |
1246 | 0 | } |
1247 | | |
1248 | 0 | ENVOY_LOG(debug, " dynamic filter name: {}", proto_config.name()); |
1249 | 0 | filter_factories_.push_back( |
1250 | 0 | network_filter_config_provider_manager_->createDynamicFilterConfigProvider( |
1251 | 0 | proto_config.config_discovery(), proto_config.name(), server_context, |
1252 | 0 | upstream_context_, factory_context.clusterManager(), is_terminal, "network", |
1253 | 0 | nullptr)); |
1254 | 0 | continue; |
1255 | 0 | } |
1256 | | |
1257 | 0 | ENVOY_LOG(debug, " name: {}", proto_config.name()); |
1258 | 0 | auto& factory = Config::Utility::getAndCheckFactory< |
1259 | 0 | Server::Configuration::NamedUpstreamNetworkFilterConfigFactory>(proto_config); |
1260 | 0 | auto message = factory.createEmptyConfigProto(); |
1261 | 0 | Config::Utility::translateOpaqueConfig(proto_config.typed_config(), |
1262 | 0 | factory_context.messageValidationVisitor(), *message); |
1263 | 0 | Network::FilterFactoryCb callback = |
1264 | 0 | factory.createFilterFactoryFromProto(*message, upstream_context_); |
1265 | 0 | filter_factories_.push_back( |
1266 | 0 | network_filter_config_provider_manager_->createStaticFilterConfigProvider( |
1267 | 0 | callback, proto_config.name())); |
1268 | 0 | } |
1269 | | |
1270 | 3.26k | if (http_protocol_options_) { |
1271 | 3.26k | Http::FilterChainUtility::FiltersList http_filters = http_protocol_options_->http_filters_; |
1272 | 3.26k | has_configured_http_filters_ = !http_filters.empty(); |
1273 | 3.26k | if (http_filters.empty()) { |
1274 | 3.26k | auto* codec_filter = http_filters.Add(); |
1275 | 3.26k | codec_filter->set_name("envoy.filters.http.upstream_codec"); |
1276 | 3.26k | codec_filter->mutable_typed_config()->PackFrom( |
1277 | 3.26k | envoy::extensions::filters::http::upstream_codec::v3::UpstreamCodec::default_instance()); |
1278 | 3.26k | } |
1279 | 3.26k | if (http_filters[http_filters.size() - 1].name() != "envoy.filters.http.upstream_codec") { |
1280 | 0 | throwEnvoyExceptionOrPanic( |
1281 | 0 | fmt::format("The codec filter is the only valid terminal upstream HTTP filter")); |
1282 | 0 | } |
1283 | | |
1284 | 3.26k | std::string prefix = stats_scope_->symbolTable().toString(stats_scope_->prefix()); |
1285 | 3.26k | Http::FilterChainHelper<Server::Configuration::UpstreamFactoryContext, |
1286 | 3.26k | Server::Configuration::UpstreamHttpFilterConfigFactory> |
1287 | 3.26k | helper(*http_filter_config_provider_manager_, upstream_context_.getServerFactoryContext(), |
1288 | 3.26k | factory_context.clusterManager(), upstream_context_, prefix); |
1289 | 3.26k | THROW_IF_NOT_OK(helper.processFilters(http_filters, "upstream http", "upstream http", |
1290 | 3.26k | http_filter_factories_)); |
1291 | 3.26k | } |
1292 | 3.26k | } |
1293 | | |
// Configures the load balancer based on config.load_balancing_policy
//
// Validates that the extensible `load_balancing_policy` field is usable (set,
// and not combined with legacy subset/common-LB fields), then walks the policy
// list in order and adopts the first policy with a registered factory. Throws
// (or panics, in -fno-exceptions builds) on any validation failure or when no
// listed policy has a registered factory.
void ClusterInfoImpl::configureLbPolicies(const envoy::config::cluster::v3::Cluster& config,
                                          Server::Configuration::ServerFactoryContext& context) {
  // Check if load_balancing_policy is set first.
  if (!config.has_load_balancing_policy()) {
    throwEnvoyExceptionOrPanic("cluster: field load_balancing_policy need to be set");
  }

  // The legacy subset-LB configuration is mutually exclusive with the
  // extensible load balancing policy.
  if (config.has_lb_subset_config()) {
    throwEnvoyExceptionOrPanic(
        "cluster: load_balancing_policy cannot be combined with lb_subset_config");
  }

  // Likewise, the LB-specific sub-messages of common_lb_config conflict with
  // load_balancing_policy (other common_lb_config fields are still allowed).
  if (config.has_common_lb_config()) {
    const auto& lb_config = config.common_lb_config();
    if (lb_config.has_zone_aware_lb_config() || lb_config.has_locality_weighted_lb_config() ||
        lb_config.has_consistent_hashing_lb_config()) {
      throwEnvoyExceptionOrPanic(
          "cluster: load_balancing_policy cannot be combined with partial fields "
          "(zone_aware_lb_config, "
          "locality_weighted_lb_config, consistent_hashing_lb_config) of common_lb_config");
    }
  }

  // Policies are tried in listed order; the first one whose factory is
  // registered wins. Names without a factory are collected for the error
  // message below.
  absl::InlinedVector<absl::string_view, 4> missing_policies;
  for (const auto& policy : config.load_balancing_policy().policies()) {
    TypedLoadBalancerFactory* factory =
        Config::Utility::getAndCheckFactory<TypedLoadBalancerFactory>(
            policy.typed_extension_config(), /*is_optional=*/true);
    if (factory != nullptr) {
      // Load and validate the configuration.
      auto proto_message = factory->createEmptyConfigProto();
      Config::Utility::translateOpaqueConfig(policy.typed_extension_config().typed_config(),
                                             context.messageValidationVisitor(), *proto_message);

      load_balancer_config_ =
          factory->loadConfig(*proto_message, context.messageValidationVisitor());

      load_balancer_factory_ = factory;
      break;
    }
    missing_policies.push_back(policy.typed_extension_config().name());
  }

  // No listed policy resolved to a registered factory: fail with the names we
  // tried so the operator can see what was missing.
  if (load_balancer_factory_ == nullptr) {
    throwEnvoyExceptionOrPanic(
        fmt::format("cluster: didn't find a registered load balancer factory "
                    "implementation for cluster: '{}' with names from [{}]",
                    name_, absl::StrJoin(missing_policies, ", ")));
  }

  lb_type_ = LoadBalancerType::LoadBalancingPolicyConfig;
}
1347 | | |
1348 | | ProtocolOptionsConfigConstSharedPtr |
1349 | 6.53k | ClusterInfoImpl::extensionProtocolOptions(const std::string& name) const { |
1350 | 6.53k | auto i = extension_protocol_options_.find(name); |
1351 | 6.53k | if (i != extension_protocol_options_.end()) { |
1352 | 2.24k | return i->second; |
1353 | 2.24k | } |
1354 | 4.29k | return nullptr; |
1355 | 6.53k | } |
1356 | | |
1357 | | Network::UpstreamTransportSocketFactoryPtr createTransportSocketFactory( |
1358 | | const envoy::config::cluster::v3::Cluster& config, |
1359 | 3.26k | Server::Configuration::TransportSocketFactoryContext& factory_context) { |
1360 | | // If the cluster config doesn't have a transport socket configured, override with the default |
1361 | | // transport socket implementation based on the tls_context. We copy by value first then |
1362 | | // override if necessary. |
1363 | 3.26k | auto transport_socket = config.transport_socket(); |
1364 | 3.26k | if (!config.has_transport_socket()) { |
1365 | 3.26k | envoy::extensions::transport_sockets::raw_buffer::v3::RawBuffer raw_buffer; |
1366 | 3.26k | transport_socket.mutable_typed_config()->PackFrom(raw_buffer); |
1367 | 3.26k | transport_socket.set_name("envoy.transport_sockets.raw_buffer"); |
1368 | 3.26k | } |
1369 | | |
1370 | 3.26k | auto& config_factory = Config::Utility::getAndCheckFactory< |
1371 | 3.26k | Server::Configuration::UpstreamTransportSocketConfigFactory>(transport_socket); |
1372 | 3.26k | ProtobufTypes::MessagePtr message = Config::Utility::translateToFactoryConfig( |
1373 | 3.26k | transport_socket, factory_context.messageValidationVisitor(), config_factory); |
1374 | 3.26k | return config_factory.createTransportSocketFactory(*message, factory_context); |
1375 | 3.26k | } |
1376 | | |
1377 | 1.32k | void ClusterInfoImpl::createNetworkFilterChain(Network::Connection& connection) const { |
1378 | 1.32k | for (const auto& filter_config_provider : filter_factories_) { |
1379 | 0 | auto config = filter_config_provider->config(); |
1380 | 0 | if (config.has_value()) { |
1381 | 0 | Network::FilterFactoryCb& factory = config.value(); |
1382 | 0 | factory(connection); |
1383 | 0 | } |
1384 | 0 | } |
1385 | 1.32k | } |
1386 | | |
1387 | | std::vector<Http::Protocol> |
1388 | 2.39k | ClusterInfoImpl::upstreamHttpProtocol(absl::optional<Http::Protocol> downstream_protocol) const { |
1389 | 2.39k | if (downstream_protocol.has_value() && |
1390 | 2.39k | features_ & Upstream::ClusterInfo::Features::USE_DOWNSTREAM_PROTOCOL) { |
1391 | 0 | if (downstream_protocol.value() == Http::Protocol::Http3 && |
1392 | 0 | !(features_ & Upstream::ClusterInfo::Features::HTTP3)) { |
1393 | 0 | return {Http::Protocol::Http2}; |
1394 | 0 | } |
1395 | | // use HTTP11 since HTTP10 upstream is not supported yet. |
1396 | 0 | if (downstream_protocol.value() == Http::Protocol::Http10) { |
1397 | 0 | return {Http::Protocol::Http11}; |
1398 | 0 | } |
1399 | 0 | return {downstream_protocol.value()}; |
1400 | 0 | } |
1401 | | |
1402 | 2.39k | if (features_ & Upstream::ClusterInfo::Features::USE_ALPN) { |
1403 | 0 | if (!(features_ & Upstream::ClusterInfo::Features::HTTP3)) { |
1404 | 0 | return {Http::Protocol::Http2, Http::Protocol::Http11}; |
1405 | 0 | } |
1406 | 0 | return {Http::Protocol::Http3, Http::Protocol::Http2, Http::Protocol::Http11}; |
1407 | 0 | } |
1408 | | |
1409 | 2.39k | if (features_ & Upstream::ClusterInfo::Features::HTTP3) { |
1410 | 0 | return {Http::Protocol::Http3}; |
1411 | 0 | } |
1412 | | |
1413 | 2.39k | return {(features_ & Upstream::ClusterInfo::Features::HTTP2) ? Http::Protocol::Http2 |
1414 | 2.39k | : Http::Protocol::Http11}; |
1415 | 2.39k | } |
1416 | | |
1417 | | bool validateTransportSocketSupportsQuic( |
1418 | 0 | const envoy::config::core::v3::TransportSocket& transport_socket) { |
1419 | | // The transport socket is valid for QUIC if it is either a QUIC transport socket, |
1420 | | // or if it is a QUIC transport socket wrapped in an HTTP/1.1 proxy socket. |
1421 | 0 | if (transport_socket.name() == "envoy.transport_sockets.quic") { |
1422 | 0 | return true; |
1423 | 0 | } |
1424 | 0 | if (transport_socket.name() != "envoy.transport_sockets.http_11_proxy") { |
1425 | 0 | return false; |
1426 | 0 | } |
1427 | 0 | envoy::extensions::transport_sockets::http_11_proxy::v3::Http11ProxyUpstreamTransport |
1428 | 0 | http11_socket; |
1429 | 0 | MessageUtil::unpackTo(transport_socket.typed_config(), http11_socket); |
1430 | 0 | return http11_socket.transport_socket().name() == "envoy.transport_sockets.quic"; |
1431 | 0 | } |
1432 | | |
// Shared base construction for all cluster implementations: builds the
// transport socket machinery, the immutable ClusterInfo, and the priority-set
// membership stats callback. Statement order matters: the transport factory
// context must exist before the socket factory/matcher, which must exist
// before ClusterInfoImpl is constructed from them.
ClusterImplBase::ClusterImplBase(const envoy::config::cluster::v3::Cluster& cluster,
                                 ClusterFactoryContext& cluster_context)
    : init_manager_(fmt::format("Cluster {}", cluster.name())),
      init_watcher_("ClusterImplBase", [this]() { onInitDone(); }),
      runtime_(cluster_context.serverFactoryContext().runtime()),
      wait_for_warm_on_init_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(cluster, wait_for_warm_on_init, true)),
      time_source_(cluster_context.serverFactoryContext().timeSource()),
      // This cluster is the "local" cluster iff its name matches the cluster
      // manager's configured local cluster name.
      local_cluster_(cluster_context.clusterManager().localClusterName().value_or("") ==
                     cluster.name()),
      const_metadata_shared_pool_(Config::Metadata::getConstMetadataSharedPool(
          cluster_context.serverFactoryContext().singletonManager(),
          cluster_context.serverFactoryContext().mainThreadDispatcher())) {

  auto& server_context = cluster_context.serverFactoryContext();

  // Build the transport socket factory context; its init manager is ours so
  // secrets/etc. loaded by transport sockets gate cluster initialization.
  auto stats_scope = generateStatsScope(cluster, server_context.serverScope().store());
  transport_factory_context_ =
      std::make_unique<Server::Configuration::TransportSocketFactoryContextImpl>(
          server_context, cluster_context.sslContextManager(), *stats_scope,
          cluster_context.clusterManager(), cluster_context.messageValidationVisitor());
  transport_factory_context_->setInitManager(init_manager_);

  // Keep a raw pointer to the default socket factory before ownership moves
  // into the matcher; it is needed for the ALPN validation below.
  auto socket_factory = createTransportSocketFactory(cluster, *transport_factory_context_);
  auto* raw_factory_pointer = socket_factory.get();

  auto socket_matcher = std::make_unique<TransportSocketMatcherImpl>(
      cluster.transport_socket_matches(), *transport_factory_context_, socket_factory,
      *stats_scope);
  const bool matcher_supports_alpn = socket_matcher->allMatchesSupportAlpn();
  auto& dispatcher = server_context.mainThreadDispatcher();
  // ClusterInfoImpl is deleted via a custom deleter that defers destruction to
  // the main dispatcher thread (it is DispatcherThreadDeletable).
  info_ = std::shared_ptr<const ClusterInfoImpl>(
      new ClusterInfoImpl(init_manager_, server_context, cluster,
                          cluster_context.clusterManager().bindConfig(), runtime_,
                          std::move(socket_matcher), std::move(stats_scope),
                          cluster_context.addedViaApi(), *transport_factory_context_),
      [&dispatcher](const ClusterInfoImpl* self) {
        ENVOY_LOG(trace, "Schedule destroy cluster info {}", self->name());
        dispatcher.deleteInDispatcherThread(
            std::unique_ptr<const Event::DispatcherThreadDeletable>(self));
      });

  // ALPN-based upstream protocol selection requires that the default socket
  // factory — and, unless the escape-hatch runtime key is set, every socket
  // match — supports ALPN.
  if ((info_->features() & ClusterInfoImpl::Features::USE_ALPN)) {
    if (!raw_factory_pointer->supportsAlpn()) {
      throwEnvoyExceptionOrPanic(
          fmt::format("ALPN configured for cluster {} which has a non-ALPN transport socket: {}",
                      cluster.name(), cluster.DebugString()));
    }
    if (!matcher_supports_alpn &&
        !runtime_.snapshot().featureEnabled(ClusterImplBase::DoNotValidateAlpnRuntimeKey, 0)) {
      throwEnvoyExceptionOrPanic(fmt::format(
          "ALPN configured for cluster {} which has a non-ALPN transport socket matcher: {}",
          cluster.name(), cluster.DebugString()));
    }
  }

  // HTTP/3 is only valid with a QUIC-capable transport socket, and only in
  // builds compiled with QUIC support.
  if (info_->features() & ClusterInfoImpl::Features::HTTP3) {
#if defined(ENVOY_ENABLE_QUIC)
    if (!validateTransportSocketSupportsQuic(cluster.transport_socket())) {
      throwEnvoyExceptionOrPanic(
          fmt::format("HTTP3 requires a QuicUpstreamTransport transport socket: {} {}",
                      cluster.name(), cluster.transport_socket().DebugString()));
    }
#else
    throwEnvoyExceptionOrPanic("HTTP3 configured but not enabled in the build.");
#endif
  }

  // Create the default (empty) priority set before registering callbacks to
  // avoid getting an update the first time it is accessed.
  priority_set_.getOrCreateHostSet(0);
  // On every priority update, bump the membership-change counter (when hosts
  // were actually added/removed) and recompute the membership gauges across
  // all priorities.
  priority_update_cb_ = priority_set_.addPriorityUpdateCb(
      [this](uint32_t, const HostVector& hosts_added, const HostVector& hosts_removed) {
        if (!hosts_added.empty() || !hosts_removed.empty()) {
          info_->endpointStats().membership_change_.inc();
        }

        uint32_t healthy_hosts = 0;
        uint32_t degraded_hosts = 0;
        uint32_t excluded_hosts = 0;
        uint32_t hosts = 0;
        for (const auto& host_set : prioritySet().hostSetsPerPriority()) {
          hosts += host_set->hosts().size();
          healthy_hosts += host_set->healthyHosts().size();
          degraded_hosts += host_set->degradedHosts().size();
          excluded_hosts += host_set->excludedHosts().size();
        }
        info_->endpointStats().membership_total_.set(hosts);
        info_->endpointStats().membership_healthy_.set(healthy_hosts);
        info_->endpointStats().membership_degraded_.set(degraded_hosts);
        info_->endpointStats().membership_excluded_.set(excluded_hosts);
      });
}
1525 | | |
1526 | | namespace { |
1527 | | |
1528 | 6.53k | bool excludeBasedOnHealthFlag(const Host& host) { |
1529 | 6.53k | return host.healthFlagGet(Host::HealthFlag::PENDING_ACTIVE_HC) || |
1530 | 6.53k | host.healthFlagGet(Host::HealthFlag::EXCLUDED_VIA_IMMEDIATE_HC_FAIL); |
1531 | 6.53k | } |
1532 | | |
1533 | | } // namespace |
1534 | | |
1535 | | std::tuple<HealthyHostVectorConstSharedPtr, DegradedHostVectorConstSharedPtr, |
1536 | | ExcludedHostVectorConstSharedPtr> |
1537 | 3.26k | ClusterImplBase::partitionHostList(const HostVector& hosts) { |
1538 | 3.26k | auto healthy_list = std::make_shared<HealthyHostVector>(); |
1539 | 3.26k | auto degraded_list = std::make_shared<DegradedHostVector>(); |
1540 | 3.26k | auto excluded_list = std::make_shared<ExcludedHostVector>(); |
1541 | | |
1542 | 3.26k | for (const auto& host : hosts) { |
1543 | 3.26k | const Host::Health health_status = host->coarseHealth(); |
1544 | 3.26k | if (health_status == Host::Health::Healthy) { |
1545 | 3.26k | healthy_list->get().emplace_back(host); |
1546 | 3.26k | } else if (health_status == Host::Health::Degraded) { |
1547 | 0 | degraded_list->get().emplace_back(host); |
1548 | 0 | } |
1549 | 3.26k | if (excludeBasedOnHealthFlag(*host)) { |
1550 | 0 | excluded_list->get().emplace_back(host); |
1551 | 0 | } |
1552 | 3.26k | } |
1553 | | |
1554 | 3.26k | return std::make_tuple(healthy_list, degraded_list, excluded_list); |
1555 | 3.26k | } |
1556 | | |
1557 | | std::tuple<HostsPerLocalityConstSharedPtr, HostsPerLocalityConstSharedPtr, |
1558 | | HostsPerLocalityConstSharedPtr> |
1559 | 3.26k | ClusterImplBase::partitionHostsPerLocality(const HostsPerLocality& hosts) { |
1560 | 3.26k | auto filtered_clones = |
1561 | 3.26k | hosts.filter({[](const Host& host) { return host.coarseHealth() == Host::Health::Healthy; }, |
1562 | 3.26k | [](const Host& host) { return host.coarseHealth() == Host::Health::Degraded; }, |
1563 | 3.26k | [](const Host& host) { return excludeBasedOnHealthFlag(host); }}); |
1564 | | |
1565 | 3.26k | return std::make_tuple(std::move(filtered_clones[0]), std::move(filtered_clones[1]), |
1566 | 3.26k | std::move(filtered_clones[2])); |
1567 | 3.26k | } |
1568 | | |
// Whether the cluster is currently in maintenance mode, as driven by its
// runtime key; the default of 0 means disabled unless runtime enables it.
bool ClusterInfoImpl::maintenanceMode() const {
  return runtime_.snapshot().featureEnabled(maintenance_mode_runtime_key_, 0);
}
1572 | | |
1573 | 21.6k | ResourceManager& ClusterInfoImpl::resourceManager(ResourcePriority priority) const { |
1574 | 21.6k | ASSERT(enumToInt(priority) < resource_managers_.managers_.size()); |
1575 | 21.6k | return *resource_managers_.managers_[enumToInt(priority)]; |
1576 | 21.6k | } |
1577 | | |
1578 | 3.26k | void ClusterImplBase::initialize(std::function<void()> callback) { |
1579 | 3.26k | ASSERT(!initialization_started_); |
1580 | 3.26k | ASSERT(initialization_complete_callback_ == nullptr); |
1581 | 3.26k | initialization_complete_callback_ = callback; |
1582 | 3.26k | startPreInit(); |
1583 | 3.26k | } |
1584 | | |
1585 | 3.26k | void ClusterImplBase::onPreInitComplete() { |
1586 | | // Protect against multiple calls. |
1587 | 3.26k | if (initialization_started_) { |
1588 | 0 | return; |
1589 | 0 | } |
1590 | 3.26k | initialization_started_ = true; |
1591 | | |
1592 | 3.26k | ENVOY_LOG(debug, "initializing {} cluster {} completed", |
1593 | 3.26k | initializePhase() == InitializePhase::Primary ? "Primary" : "Secondary", |
1594 | 3.26k | info()->name()); |
1595 | 3.26k | init_manager_.initialize(init_watcher_); |
1596 | 3.26k | } |
1597 | | |
// Called when the cluster's init manager reports ready. If active health
// checking is configured, initialization is additionally held until every
// health-checkable host has completed its first check; otherwise (or once the
// pending count drains) initialization finishes immediately.
void ClusterImplBase::onInitDone() {
  info()->configUpdateStats().warming_state_.set(0);
  if (health_checker_ && pending_initialize_health_checks_ == 0) {
    // Count every host that will be actively health checked; hosts that opt
    // out of active health checking do not gate initialization.
    for (auto& host_set : prioritySet().hostSetsPerPriority()) {
      for (auto& host : host_set->hosts()) {
        if (host->disableActiveHealthCheck()) {
          continue;
        }
        ++pending_initialize_health_checks_;
      }
    }
    ENVOY_LOG(debug, "Cluster onInitDone pending initialize health check count {}",
              pending_initialize_health_checks_);

    // Each first-check completion decrements the pending count; the final one
    // finishes initialization.
    // TODO(mattklein123): Remove this callback when done.
    health_checker_->addHostCheckCompleteCb([this](HostSharedPtr, HealthTransition) -> void {
      if (pending_initialize_health_checks_ > 0 && --pending_initialize_health_checks_ == 0) {
        finishInitialization();
      }
    });
  }

  // No health checker, or no hosts require an initial check: finish now.
  if (pending_initialize_health_checks_ == 0) {
    finishInitialization();
  }
}
1624 | | |
1625 | 3.26k | void ClusterImplBase::finishInitialization() { |
1626 | 3.26k | ASSERT(initialization_complete_callback_ != nullptr); |
1627 | 3.26k | ASSERT(initialization_started_); |
1628 | | |
1629 | | // Snap a copy of the completion callback so that we can set it to nullptr to unblock |
1630 | | // reloadHealthyHosts(). See that function for more info on why we do this. |
1631 | 3.26k | auto snapped_callback = initialization_complete_callback_; |
1632 | 3.26k | initialization_complete_callback_ = nullptr; |
1633 | | |
1634 | 3.26k | if (health_checker_ != nullptr) { |
1635 | 0 | reloadHealthyHosts(nullptr); |
1636 | 0 | } |
1637 | | |
1638 | 3.26k | if (snapped_callback != nullptr) { |
1639 | 3.26k | snapped_callback(); |
1640 | 3.26k | } |
1641 | 3.26k | } |
1642 | | |
1643 | 0 | void ClusterImplBase::setHealthChecker(const HealthCheckerSharedPtr& health_checker) { |
1644 | 0 | ASSERT(!health_checker_); |
1645 | 0 | health_checker_ = health_checker; |
1646 | 0 | health_checker_->start(); |
1647 | 0 | health_checker_->addHostCheckCompleteCb( |
1648 | 0 | [this](const HostSharedPtr& host, HealthTransition changed_state) -> void { |
1649 | | // If we get a health check completion that resulted in a state change, signal to |
1650 | | // update the host sets on all threads. |
1651 | 0 | if (changed_state == HealthTransition::Changed) { |
1652 | 0 | reloadHealthyHosts(host); |
1653 | 0 | } |
1654 | 0 | }); |
1655 | 0 | } |
1656 | | |
1657 | 3.26k | void ClusterImplBase::setOutlierDetector(const Outlier::DetectorSharedPtr& outlier_detector) { |
1658 | 3.26k | if (!outlier_detector) { |
1659 | 3.26k | return; |
1660 | 3.26k | } |
1661 | | |
1662 | 0 | outlier_detector_ = outlier_detector; |
1663 | 0 | outlier_detector_->addChangedStateCb( |
1664 | 0 | [this](const HostSharedPtr& host) -> void { reloadHealthyHosts(host); }); |
1665 | 0 | } |
1666 | | |
// Recomputes the healthy/degraded/excluded host partitions after a host's
// health state changes — but not while the cluster is still initializing.
void ClusterImplBase::reloadHealthyHosts(const HostSharedPtr& host) {
  // Every time a host changes Health Check state we cause a full healthy host recalculation which
  // for expensive LBs (ring, subset, etc.) can be quite time consuming. During startup, this
  // can also block worker threads by doing this repeatedly. There is no reason to do this
  // as we will not start taking traffic until we are initialized. By blocking Health Check
  // updates while initializing we can avoid this.
  if (initialization_complete_callback_ != nullptr) {
    return;
  }

  reloadHealthyHostsHelper(host);
}
1679 | | |
// Re-runs host partitioning for every priority using copies of the current
// host lists, pushing the recomputed sets to the priority set. The host
// argument is unused in this base implementation (subclasses may override).
void ClusterImplBase::reloadHealthyHostsHelper(const HostSharedPtr&) {
  const auto& host_sets = prioritySet().hostSetsPerPriority();
  for (size_t priority = 0; priority < host_sets.size(); ++priority) {
    const auto& host_set = host_sets[priority];
    // TODO(htuch): Can we skip these copies by exporting out const shared_ptr from HostSet?
    HostVectorConstSharedPtr hosts_copy = std::make_shared<HostVector>(host_set->hosts());

    HostsPerLocalityConstSharedPtr hosts_per_locality_copy = host_set->hostsPerLocality().clone();
    // Update with empty added/removed host lists; locality weights are
    // preserved and the trailing optional arguments are left unset.
    prioritySet().updateHosts(priority,
                              HostSetImpl::partitionHosts(hosts_copy, hosts_per_locality_copy),
                              host_set->localityWeights(), {}, {}, absl::nullopt, absl::nullopt);
  }
}
1693 | | |
// Resolves a proto Address to a concrete network address. Must run on the
// main thread. On resolution failure, STATIC and EDS clusters rethrow with a
// hint to switch to a DNS-based cluster type; other cluster types rethrow the
// original exception unchanged.
const Network::Address::InstanceConstSharedPtr
ClusterImplBase::resolveProtoAddress(const envoy::config::core::v3::Address& address) {
  TRY_ASSERT_MAIN_THREAD { return Network::Address::resolveProtoAddress(address); }
  END_TRY
  CATCH(EnvoyException & e, {
    if (info_->type() == envoy::config::cluster::v3::Cluster::STATIC ||
        info_->type() == envoy::config::cluster::v3::Cluster::EDS) {
      throwEnvoyExceptionOrPanic(
          fmt::format("{}. Consider setting resolver_name or setting cluster type "
                      "to 'STRICT_DNS' or 'LOGICAL_DNS'",
                      e.what()));
    }
    throw e;
  });
}
1709 | | |
1710 | | void ClusterImplBase::validateEndpointsForZoneAwareRouting( |
1711 | 3.26k | const envoy::config::endpoint::v3::LocalityLbEndpoints& endpoints) const { |
1712 | 3.26k | if (local_cluster_ && endpoints.priority() > 0) { |
1713 | 0 | throwEnvoyExceptionOrPanic( |
1714 | 0 | fmt::format("Unexpected non-zero priority for local cluster '{}'.", info()->name())); |
1715 | 0 | } |
1716 | 3.26k | } |
1717 | | |
// Constructs the optional, config-gated stat groups for a cluster.
//
// timeout_budget_stats_ is allocated when either
// track_cluster_stats.timeout_budgets or the top-level track_timeout_budgets
// field is set; request_response_size_stats_ is allocated when
// track_cluster_stats.request_response_sizes is set. Members stay null (and no
// stats are created) when the corresponding tracking is disabled.
ClusterInfoImpl::OptionalClusterStats::OptionalClusterStats(
    const envoy::config::cluster::v3::Cluster& config, Stats::Scope& stats_scope,
    const ClusterManager& manager)
    : timeout_budget_stats_(
          (config.track_cluster_stats().timeout_budgets() || config.track_timeout_budgets())
              ? std::make_unique<ClusterTimeoutBudgetStats>(generateTimeoutBudgetStats(
                    stats_scope, manager.clusterTimeoutBudgetStatNames()))
              : nullptr),
      request_response_size_stats_(
          (config.track_cluster_stats().request_response_sizes()
               ? std::make_unique<ClusterRequestResponseSizeStats>(generateRequestResponseSizeStats(
                     stats_scope, manager.clusterRequestResponseSizeStatNames()))
               : nullptr)) {}
1731 | | |
1732 | | ClusterInfoImpl::ResourceManagers::ResourceManagers( |
1733 | | const envoy::config::cluster::v3::Cluster& config, Runtime::Loader& runtime, |
1734 | | const std::string& cluster_name, Stats::Scope& stats_scope, |
1735 | | const ClusterCircuitBreakersStatNames& circuit_breakers_stat_names) |
1736 | 3.26k | : circuit_breakers_stat_names_(circuit_breakers_stat_names) { |
1737 | 3.26k | managers_[enumToInt(ResourcePriority::Default)] = |
1738 | 3.26k | load(config, runtime, cluster_name, stats_scope, envoy::config::core::v3::DEFAULT); |
1739 | 3.26k | managers_[enumToInt(ResourcePriority::High)] = |
1740 | 3.26k | load(config, runtime, cluster_name, stats_scope, envoy::config::core::v3::HIGH); |
1741 | 3.26k | } |
1742 | | |
1743 | | ClusterCircuitBreakersStats |
1744 | | ClusterInfoImpl::generateCircuitBreakersStats(Stats::Scope& scope, Stats::StatName prefix, |
1745 | | bool track_remaining, |
1746 | 410k | const ClusterCircuitBreakersStatNames& stat_names) { |
1747 | 4.07M | auto make_gauge = [&stat_names, &scope, prefix](Stats::StatName stat_name) -> Stats::Gauge& { |
1748 | 4.07M | return Stats::Utility::gaugeFromElements(scope, |
1749 | 4.07M | {stat_names.circuit_breakers_, prefix, stat_name}, |
1750 | 4.07M | Stats::Gauge::ImportMode::Accumulate); |
1751 | 4.07M | }; |
1752 | | |
1753 | 410k | #define REMAINING_GAUGE(stat_name) \ |
1754 | 2.05M | track_remaining ? make_gauge(stat_name) : scope.store().nullGauge() |
1755 | | |
1756 | 410k | return { |
1757 | 410k | make_gauge(stat_names.cx_open_), |
1758 | 410k | make_gauge(stat_names.cx_pool_open_), |
1759 | 410k | make_gauge(stat_names.rq_open_), |
1760 | 410k | make_gauge(stat_names.rq_pending_open_), |
1761 | 410k | make_gauge(stat_names.rq_retry_open_), |
1762 | 410k | REMAINING_GAUGE(stat_names.remaining_cx_), |
1763 | 410k | REMAINING_GAUGE(stat_names.remaining_cx_pools_), |
1764 | 410k | REMAINING_GAUGE(stat_names.remaining_pending_), |
1765 | 410k | REMAINING_GAUGE(stat_names.remaining_retries_), |
1766 | 410k | REMAINING_GAUGE(stat_names.remaining_rq_), |
1767 | 410k | }; |
1768 | | |
1769 | 410k | #undef REMAINING_GAUGE |
1770 | 410k | } |
1771 | | |
// Returns this cluster's HTTP/1 codec stats, lazily created on first use via
// CodecStats::atomicGet.
Http::Http1::CodecStats& ClusterInfoImpl::http1CodecStats() const {
  return Http::Http1::CodecStats::atomicGet(http1_codec_stats_, *stats_scope_);
}
1775 | | |
// Returns this cluster's HTTP/2 codec stats, lazily created on first use via
// CodecStats::atomicGet.
Http::Http2::CodecStats& ClusterInfoImpl::http2CodecStats() const {
  return Http::Http2::CodecStats::atomicGet(http2_codec_stats_, *stats_scope_);
}
1779 | | |
// Returns this cluster's HTTP/3 codec stats, lazily created on first use via
// CodecStats::atomicGet.
Http::Http3::CodecStats& ClusterInfoImpl::http3CodecStats() const {
  return Http::Http3::CodecStats::atomicGet(http3_codec_stats_, *stats_scope_);
}
1783 | | |
#ifdef ENVOY_ENABLE_UHV
// Maps an HTTP protocol to the codec stats object used for header validation.
// HTTP/1.0 and HTTP/1.1 share the HTTP/1 stats. An out-of-range enum value
// panics.
::Envoy::Http::HeaderValidatorStats&
ClusterInfoImpl::getHeaderValidatorStats(Http::Protocol protocol) const {
  switch (protocol) {
  case Http::Protocol::Http10:
  case Http::Protocol::Http11:
    return http1CodecStats();
  case Http::Protocol::Http2:
    return http2CodecStats();
  case Http::Protocol::Http3:
    return http3CodecStats();
  }
  PANIC_DUE_TO_CORRUPT_ENUM;
}
#endif
1799 | | |
// Creates a client header validator for the given protocol.
//
// Returns nullptr when universal header validation (UHV) is not compiled in,
// or when no header validator factory is configured for this cluster's HTTP
// protocol options.
Http::ClientHeaderValidatorPtr
ClusterInfoImpl::makeHeaderValidator([[maybe_unused]] Http::Protocol protocol) const {
#ifdef ENVOY_ENABLE_UHV
  return http_protocol_options_->header_validator_factory_
             ? http_protocol_options_->header_validator_factory_->createClientHeaderValidator(
                   protocol, getHeaderValidatorStats(protocol))
             : nullptr;
#else
  return nullptr;
#endif
}
1811 | | |
// Extracts retry budget parameters from a circuit breaker thresholds message.
//
// Returns {budget_percent, min_retry_concurrency}. Both are nullopt when the
// thresholds carry no retry_budget; when a retry_budget is present, unset
// wrapped fields default to a 20% budget and a minimum retry concurrency of 3.
std::pair<absl::optional<double>, absl::optional<uint32_t>> ClusterInfoImpl::getRetryBudgetParams(
    const envoy::config::cluster::v3::CircuitBreakers::Thresholds& thresholds) {
  constexpr double default_budget_percent = 20.0;
  constexpr uint32_t default_retry_concurrency = 3;

  absl::optional<double> budget_percent;
  absl::optional<uint32_t> min_retry_concurrency;
  if (thresholds.has_retry_budget()) {
    // The budget_percent and min_retry_concurrency values are only set if there is a retry budget
    // message set in the cluster config.
    budget_percent = PROTOBUF_GET_WRAPPED_OR_DEFAULT(thresholds.retry_budget(), budget_percent,
                                                     default_budget_percent);
    min_retry_concurrency = PROTOBUF_GET_WRAPPED_OR_DEFAULT(
        thresholds.retry_budget(), min_retry_concurrency, default_retry_concurrency);
  }
  return std::make_pair(budget_percent, min_retry_concurrency);
}
1829 | | |
// Registers the singleton name under which the upstream network filter config
// provider manager is shared (see createSingletonUpstreamNetworkFilterConfigProviderManager).
SINGLETON_MANAGER_REGISTRATION(upstream_network_filter_config_provider_manager);
1831 | | |
1832 | | std::shared_ptr<UpstreamNetworkFilterConfigProviderManager> |
1833 | | ClusterInfoImpl::createSingletonUpstreamNetworkFilterConfigProviderManager( |
1834 | 3.26k | Server::Configuration::ServerFactoryContext& context) { |
1835 | 3.26k | return context.singletonManager().getTyped<UpstreamNetworkFilterConfigProviderManager>( |
1836 | 3.26k | SINGLETON_MANAGER_REGISTERED_NAME(upstream_network_filter_config_provider_manager), |
1837 | 3.26k | [] { return std::make_shared<Filter::UpstreamNetworkFilterConfigProviderManagerImpl>(); }); |
1838 | 3.26k | } |
1839 | | |
// Builds the ResourceManagerImpl for a single routing priority (DEFAULT or
// HIGH) of a cluster's circuit breakers.
//
// Starts from the built-in defaults (1024 connections / pending requests /
// requests, 3 retries, unlimited connection pools and per-host connections),
// then overlays the thresholds entry matching this priority, if any. The
// computed runtime prefix ("circuit_breakers.<cluster>.<priority>.") is passed
// to the resource manager together with the runtime loader.
ResourceManagerImplPtr
ClusterInfoImpl::ResourceManagers::load(const envoy::config::cluster::v3::Cluster& config,
                                        Runtime::Loader& runtime, const std::string& cluster_name,
                                        Stats::Scope& stats_scope,
                                        const envoy::config::core::v3::RoutingPriority& priority) {
  uint64_t max_connections = 1024;
  uint64_t max_pending_requests = 1024;
  uint64_t max_requests = 1024;
  uint64_t max_retries = 3;
  uint64_t max_connection_pools = std::numeric_limits<uint64_t>::max();
  uint64_t max_connections_per_host = std::numeric_limits<uint64_t>::max();

  bool track_remaining = false;

  Stats::StatName priority_stat_name;
  std::string priority_name;
  switch (priority) {
    PANIC_ON_PROTO_ENUM_SENTINEL_VALUES;
  case envoy::config::core::v3::DEFAULT:
    priority_stat_name = circuit_breakers_stat_names_.default_;
    priority_name = "default";
    break;
  case envoy::config::core::v3::HIGH:
    priority_stat_name = circuit_breakers_stat_names_.high_;
    priority_name = "high";
    break;
  }

  const std::string runtime_prefix =
      fmt::format("circuit_breakers.{}.{}.", cluster_name, priority_name);

  // Locate the thresholds entry (and per-host thresholds entry) whose priority
  // matches the one being loaded; either may be absent.
  const auto& thresholds = config.circuit_breakers().thresholds();
  const auto it = std::find_if(
      thresholds.cbegin(), thresholds.cend(),
      [priority](const envoy::config::cluster::v3::CircuitBreakers::Thresholds& threshold) {
        return threshold.priority() == priority;
      });
  const auto& per_host_thresholds = config.circuit_breakers().per_host_thresholds();
  const auto per_host_it = std::find_if(
      per_host_thresholds.cbegin(), per_host_thresholds.cend(),
      [priority](const envoy::config::cluster::v3::CircuitBreakers::Thresholds& threshold) {
        return threshold.priority() == priority;
      });

  absl::optional<double> budget_percent;
  absl::optional<uint32_t> min_retry_concurrency;
  if (it != thresholds.cend()) {
    // Wrapped fields that are unset keep the defaults initialized above.
    max_connections = PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_connections, max_connections);
    max_pending_requests =
        PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_pending_requests, max_pending_requests);
    max_requests = PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_requests, max_requests);
    max_retries = PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_retries, max_retries);
    track_remaining = it->track_remaining();
    max_connection_pools =
        PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_connection_pools, max_connection_pools);
    std::tie(budget_percent, min_retry_concurrency) = ClusterInfoImpl::getRetryBudgetParams(*it);
  }
  if (per_host_it != per_host_thresholds.cend()) {
    // Only max_connections is honored per host; any other populated field is a
    // configuration error.
    if (per_host_it->has_max_pending_requests() || per_host_it->has_max_requests() ||
        per_host_it->has_max_retries() || per_host_it->has_max_connection_pools() ||
        per_host_it->has_retry_budget()) {
      throwEnvoyExceptionOrPanic("Unsupported field in per_host_thresholds");
    }
    if (per_host_it->has_max_connections()) {
      max_connections_per_host = per_host_it->max_connections().value();
    }
  }
  return std::make_unique<ResourceManagerImpl>(
      runtime, runtime_prefix, max_connections, max_pending_requests, max_requests, max_retries,
      max_connection_pools, max_connections_per_host,
      ClusterInfoImpl::generateCircuitBreakersStats(stats_scope, priority_stat_name,
                                                    track_remaining, circuit_breakers_stat_names_),
      budget_percent, min_retry_concurrency);
}
1914 | | |
// Snapshots the local node info (used later for locality comparisons) and
// stores the optional batch host-update callback; when update_cb is null,
// updates are applied directly to the cluster's PrioritySet instead.
PriorityStateManager::PriorityStateManager(ClusterImplBase& cluster,
                                           const LocalInfo::LocalInfo& local_info,
                                           PrioritySet::HostUpdateCb* update_cb)
    : parent_(cluster), local_info_node_(local_info.node()), update_cb_(update_cb) {}
1919 | | |
1920 | | void PriorityStateManager::initializePriorityFor( |
1921 | 3.26k | const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint) { |
1922 | 3.26k | const uint32_t priority = locality_lb_endpoint.priority(); |
1923 | 3.26k | if (priority_state_.size() <= priority) { |
1924 | 3.26k | priority_state_.resize(priority + 1); |
1925 | 3.26k | } |
1926 | 3.26k | if (priority_state_[priority].first == nullptr) { |
1927 | 3.26k | priority_state_[priority].first = std::make_unique<HostVector>(); |
1928 | 3.26k | } |
1929 | 3.26k | if (locality_lb_endpoint.has_locality() && locality_lb_endpoint.has_load_balancing_weight()) { |
1930 | 0 | priority_state_[priority].second[locality_lb_endpoint.locality()] = |
1931 | 0 | locality_lb_endpoint.load_balancing_weight().value(); |
1932 | 0 | } |
1933 | 3.26k | } |
1934 | | |
// Constructs a HostImpl for one LbEndpoint and records it under the endpoint
// group's priority. initializePriorityFor() must already have been called for
// this locality_lb_endpoint (asserted by the overload invoked at the end).
void PriorityStateManager::registerHostForPriority(
    const std::string& hostname, Network::Address::InstanceConstSharedPtr address,
    const std::vector<Network::Address::InstanceConstSharedPtr>& address_list,
    const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint,
    const envoy::config::endpoint::v3::LbEndpoint& lb_endpoint, TimeSource& time_source) {
  // Endpoint metadata is obtained through the cluster's shared const-metadata
  // pool; endpoints without metadata carry a null pointer.
  auto metadata = lb_endpoint.has_metadata()
                      ? parent_.constMetadataSharedPool()->getObject(lb_endpoint.metadata())
                      : nullptr;
  const auto host = std::make_shared<HostImpl>(
      parent_.info(), hostname, address, metadata, lb_endpoint.load_balancing_weight().value(),
      locality_lb_endpoint.locality(), lb_endpoint.endpoint().health_check_config(),
      locality_lb_endpoint.priority(), lb_endpoint.health_status(), time_source);
  // An additional address list is only attached when present.
  if (!address_list.empty()) {
    host->setAddressList(address_list);
  }
  registerHostForPriority(host, locality_lb_endpoint);
}
1952 | | |
// Appends an already-constructed host to the pending host list for the
// endpoint group's priority.
void PriorityStateManager::registerHostForPriority(
    const HostSharedPtr& host,
    const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint) {
  const uint32_t priority = locality_lb_endpoint.priority();
  // Should be called after initializePriorityFor.
  ASSERT(priority_state_[priority].first);
  priority_state_[priority].first->emplace_back(host);
}
1961 | | |
// Partitions the supplied hosts for `priority` by locality — local locality
// first (when present), remaining localities in lexicographic order for a
// stable zone aware routing order — then publishes the result either through
// the batch update callback (if one was provided at construction) or directly
// into the cluster's PrioritySet.
void PriorityStateManager::updateClusterPrioritySet(
    const uint32_t priority, HostVectorSharedPtr&& current_hosts,
    const absl::optional<HostVector>& hosts_added, const absl::optional<HostVector>& hosts_removed,
    const absl::optional<Upstream::Host::HealthFlag> health_checker_flag,
    absl::optional<bool> weighted_priority_health,
    absl::optional<uint32_t> overprovisioning_factor) {
  // If local locality is not defined then skip populating per locality hosts.
  const auto& local_locality = local_info_node_.locality();
  ENVOY_LOG(trace, "Local locality: {}", local_locality.DebugString());

  // For non-EDS, most likely the current hosts are from priority_state_[priority].first.
  HostVectorSharedPtr hosts(std::move(current_hosts));
  LocalityWeightsMap empty_locality_map;
  LocalityWeightsMap& locality_weights_map =
      priority_state_.size() > priority ? priority_state_[priority].second : empty_locality_map;
  ASSERT(priority_state_.size() > priority || locality_weights_map.empty());
  LocalityWeightsSharedPtr locality_weights;
  std::vector<HostVector> per_locality;

  // If we are configured for locality weighted LB we populate the locality weights. We also
  // populate locality weights if the cluster uses load balancing extensions, since the extension
  // may want to make use of locality weights and we cannot tell by inspecting the config whether
  // this is the case.
  //
  // TODO: have the load balancing extension indicate, programmatically, whether it needs locality
  // weights, as an optimization in cases where it doesn't.
  const bool locality_weighted_lb =
      parent_.info()->lbConfig().has_locality_weighted_lb_config() ||
      parent_.info()->lbType() == LoadBalancerType::LoadBalancingPolicyConfig;
  if (locality_weighted_lb) {
    locality_weights = std::make_shared<LocalityWeights>();
  }

  // We use std::map to guarantee a stable ordering for zone aware routing.
  std::map<envoy::config::core::v3::Locality, HostVector, LocalityLess> hosts_per_locality;

  for (const HostSharedPtr& host : *hosts) {
    // Take into consideration when a non-EDS cluster has active health checking, i.e. to mark all
    // the hosts unhealthy (host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC)) and then fire
    // update callbacks to start the health checking process. The endpoint with disabled active
    // health check should not be set FAILED_ACTIVE_HC here.
    if (health_checker_flag.has_value() && !host->disableActiveHealthCheck()) {
      host->healthFlagSet(health_checker_flag.value());
    }
    hosts_per_locality[host->locality()].push_back(host);
  }

  // Do we have hosts for the local locality?
  const bool non_empty_local_locality =
      local_info_node_.has_locality() &&
      hosts_per_locality.find(local_locality) != hosts_per_locality.end();

  // As per HostsPerLocality::get(), the per_locality vector must have the local locality hosts
  // first if non_empty_local_locality.
  if (non_empty_local_locality) {
    per_locality.emplace_back(hosts_per_locality[local_locality]);
    if (locality_weighted_lb) {
      locality_weights->emplace_back(locality_weights_map[local_locality]);
    }
  }

  // After the local locality hosts (if any), we place the remaining locality host groups in
  // lexicographic order. This provides a stable ordering for zone aware routing.
  for (auto& entry : hosts_per_locality) {
    if (!non_empty_local_locality || !LocalityEqualTo()(local_locality, entry.first)) {
      per_locality.emplace_back(entry.second);
      if (locality_weighted_lb) {
        locality_weights->emplace_back(locality_weights_map[entry.first]);
      }
    }
  }

  auto per_locality_shared =
      std::make_shared<HostsPerLocalityImpl>(std::move(per_locality), non_empty_local_locality);

  // If a batch update callback was provided, use that. Otherwise directly update
  // the PrioritySet.
  if (update_cb_ != nullptr) {
    update_cb_->updateHosts(priority, HostSetImpl::partitionHosts(hosts, per_locality_shared),
                            std::move(locality_weights), hosts_added.value_or(*hosts),
                            hosts_removed.value_or<HostVector>({}), weighted_priority_health,
                            overprovisioning_factor);
  } else {
    parent_.prioritySet().updateHosts(
        priority, HostSetImpl::partitionHosts(hosts, per_locality_shared),
        std::move(locality_weights), hosts_added.value_or(*hosts),
        hosts_removed.value_or<HostVector>({}), weighted_priority_health, overprovisioning_factor);
  }
}
2051 | | |
2052 | | bool BaseDynamicClusterImpl::updateDynamicHostList( |
2053 | | const HostVector& new_hosts, HostVector& current_priority_hosts, |
2054 | | HostVector& hosts_added_to_current_priority, HostVector& hosts_removed_from_current_priority, |
2055 | 14 | const HostMap& all_hosts, const absl::flat_hash_set<std::string>& all_new_hosts) { |
2056 | 14 | uint64_t max_host_weight = 1; |
2057 | | |
2058 | | // Did hosts change? |
2059 | | // |
2060 | | // Have host attributes changed the health of any endpoint? If so, we |
2061 | | // rebuild the hosts vectors. We only do this if the health status of an |
2062 | | // endpoint has materially changed (e.g. if previously failing active health |
2063 | | // checks, we just note it's now failing EDS health status but don't rebuild). |
2064 | | // |
2065 | | // TODO(htuch): We can be smarter about this potentially, and not force a full |
2066 | | // host set update on health status change. The way this would work is to |
2067 | | // implement a HealthChecker subclass that provides thread local health |
2068 | | // updates to the Cluster object. This will probably make sense to do in |
2069 | | // conjunction with https://github.com/envoyproxy/envoy/issues/2874. |
2070 | 14 | bool hosts_changed = false; |
2071 | | |
2072 | | // Go through and see if the list we have is different from what we just got. If it is, we make |
2073 | | // a new host list and raise a change notification. We also check for duplicates here. It's |
2074 | | // possible for DNS to return the same address multiple times, and a bad EDS implementation |
2075 | | // could do the same thing. |
2076 | | |
2077 | | // Keep track of hosts we see in new_hosts that we are able to match up with an existing host. |
2078 | 14 | absl::flat_hash_set<std::string> existing_hosts_for_current_priority( |
2079 | 14 | current_priority_hosts.size()); |
2080 | | // Keep track of hosts we're adding (or replacing) |
2081 | 14 | absl::flat_hash_set<std::string> new_hosts_for_current_priority(new_hosts.size()); |
2082 | | // Keep track of hosts for which locality is changed. |
2083 | 14 | absl::flat_hash_set<std::string> hosts_with_updated_locality_for_current_priority( |
2084 | 14 | current_priority_hosts.size()); |
2085 | | // Keep track of hosts for which active health check flag is changed. |
2086 | 14 | absl::flat_hash_set<std::string> hosts_with_active_health_check_flag_changed( |
2087 | 14 | current_priority_hosts.size()); |
2088 | 14 | HostVector final_hosts; |
2089 | 14 | for (const HostSharedPtr& host : new_hosts) { |
2090 | | // To match a new host with an existing host means comparing their addresses. |
2091 | 14 | auto existing_host = all_hosts.find(addressToString(host->address())); |
2092 | 14 | const bool existing_host_found = existing_host != all_hosts.end(); |
2093 | | |
2094 | | // Clear any pending deletion flag on an existing host in case it came back while it was |
2095 | | // being stabilized. We will set it again below if needed. |
2096 | 14 | if (existing_host_found) { |
2097 | 0 | existing_host->second->healthFlagClear(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL); |
2098 | 0 | } |
2099 | | |
2100 | | // Check if in-place host update should be skipped, i.e. when the following criteria are met |
2101 | | // (currently there is only one criterion, but we might add more in the future): |
2102 | | // - The cluster health checker is activated and a new host is matched with the existing one, |
2103 | | // but the health check address is different. |
2104 | 14 | const bool health_check_address_changed = |
2105 | 14 | (health_checker_ != nullptr && existing_host_found && |
2106 | 14 | *existing_host->second->healthCheckAddress() != *host->healthCheckAddress()); |
2107 | 14 | bool locality_changed = false; |
2108 | 14 | locality_changed = (existing_host_found && |
2109 | 14 | (!LocalityEqualTo()(host->locality(), existing_host->second->locality()))); |
2110 | 14 | if (locality_changed) { |
2111 | 0 | hosts_with_updated_locality_for_current_priority.emplace(existing_host->first); |
2112 | 0 | } |
2113 | | |
2114 | 14 | const bool active_health_check_flag_changed = |
2115 | 14 | (health_checker_ != nullptr && existing_host_found && |
2116 | 14 | existing_host->second->disableActiveHealthCheck() != host->disableActiveHealthCheck()); |
2117 | 14 | if (active_health_check_flag_changed) { |
2118 | 0 | hosts_with_active_health_check_flag_changed.emplace(existing_host->first); |
2119 | 0 | } |
2120 | 14 | const bool skip_inplace_host_update = |
2121 | 14 | health_check_address_changed || locality_changed || active_health_check_flag_changed; |
2122 | | |
2123 | | // When there is a match and we decided to do in-place update, we potentially update the |
2124 | | // host's health check flag and metadata. Afterwards, the host is pushed back into the |
2125 | | // final_hosts, i.e. hosts that should be preserved in the current priority. |
2126 | 14 | if (existing_host_found && !skip_inplace_host_update) { |
2127 | 0 | existing_hosts_for_current_priority.emplace(existing_host->first); |
2128 | | // If we find a host matched based on address, we keep it. However we do change weight |
2129 | | // inline so do that here. |
2130 | 0 | if (host->weight() > max_host_weight) { |
2131 | 0 | max_host_weight = host->weight(); |
2132 | 0 | } |
2133 | 0 | if (existing_host->second->weight() != host->weight()) { |
2134 | 0 | existing_host->second->weight(host->weight()); |
2135 | | // We do full host set rebuilds so that load balancers can do pre-computation of data |
2136 | | // structures based on host weight. This may become a performance problem in certain |
2137 | | // deployments so it is runtime feature guarded and may also need to be configurable |
2138 | | // and/or dynamic in the future. |
2139 | 0 | hosts_changed = true; |
2140 | 0 | } |
2141 | |
|
2142 | 0 | hosts_changed |= updateEdsHealthFlag(*host, *existing_host->second); |
2143 | | |
2144 | | // Did metadata change? |
2145 | 0 | bool metadata_changed = true; |
2146 | 0 | if (host->metadata() && existing_host->second->metadata()) { |
2147 | 0 | metadata_changed = !Protobuf::util::MessageDifferencer::Equivalent( |
2148 | 0 | *host->metadata(), *existing_host->second->metadata()); |
2149 | 0 | } else if (!host->metadata() && !existing_host->second->metadata()) { |
2150 | 0 | metadata_changed = false; |
2151 | 0 | } |
2152 | |
|
2153 | 0 | if (metadata_changed) { |
2154 | | // First, update the entire metadata for the endpoint. |
2155 | 0 | existing_host->second->metadata(host->metadata()); |
2156 | | |
2157 | | // Also, given that the canary attribute of an endpoint is derived from its metadata |
2158 | | // (e.g.: from envoy.lb/canary), we do a blind update here since it's cheaper than testing |
2159 | | // to see if it actually changed. We must update this besides just updating the metadata, |
2160 | | // because it'll be used by the router filter to compute upstream stats. |
2161 | 0 | existing_host->second->canary(host->canary()); |
2162 | | |
2163 | | // If metadata changed, we need to rebuild. See github issue #3810. |
2164 | 0 | hosts_changed = true; |
2165 | 0 | } |
2166 | | |
2167 | | // Did the priority change? |
2168 | 0 | if (host->priority() != existing_host->second->priority()) { |
2169 | 0 | existing_host->second->priority(host->priority()); |
2170 | 0 | hosts_added_to_current_priority.emplace_back(existing_host->second); |
2171 | 0 | } |
2172 | |
|
2173 | 0 | final_hosts.push_back(existing_host->second); |
2174 | 14 | } else { |
2175 | 14 | new_hosts_for_current_priority.emplace(addressToString(host->address())); |
2176 | 14 | if (host->weight() > max_host_weight) { |
2177 | 0 | max_host_weight = host->weight(); |
2178 | 0 | } |
2179 | | |
2180 | | // If we are depending on a health checker, we initialize to unhealthy. |
2181 | | // If there's an existing host with the same health checker, the |
2182 | | // active health-status is kept. |
2183 | 14 | if (health_checker_ != nullptr && !host->disableActiveHealthCheck()) { |
2184 | 0 | if (Runtime::runtimeFeatureEnabled( |
2185 | 0 | "envoy.reloadable_features.keep_endpoint_active_hc_status_on_locality_update")) { |
2186 | 0 | if (existing_host_found && !health_check_address_changed && |
2187 | 0 | !active_health_check_flag_changed) { |
2188 | | // If there's an existing host, use the same active health-status. |
2189 | | // The existing host can be marked PENDING_ACTIVE_HC or |
2190 | | // ACTIVE_HC_TIMEOUT if it is also marked with FAILED_ACTIVE_HC. |
2191 | 0 | ASSERT(!existing_host->second->healthFlagGet(Host::HealthFlag::PENDING_ACTIVE_HC) || |
2192 | 0 | existing_host->second->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)); |
2193 | 0 | ASSERT(!existing_host->second->healthFlagGet(Host::HealthFlag::ACTIVE_HC_TIMEOUT) || |
2194 | 0 | existing_host->second->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)); |
2195 | | |
2196 | 0 | constexpr uint32_t active_hc_statuses_mask = |
2197 | 0 | enumToInt(Host::HealthFlag::FAILED_ACTIVE_HC) | |
2198 | 0 | enumToInt(Host::HealthFlag::DEGRADED_ACTIVE_HC) | |
2199 | 0 | enumToInt(Host::HealthFlag::PENDING_ACTIVE_HC) | |
2200 | 0 | enumToInt(Host::HealthFlag::ACTIVE_HC_TIMEOUT); |
2201 | |
|
2202 | 0 | const uint32_t existing_host_statuses = existing_host->second->healthFlagsGetAll(); |
2203 | 0 | host->healthFlagsSetAll(existing_host_statuses & active_hc_statuses_mask); |
2204 | 0 | } else { |
2205 | | // No previous known host, mark it as failed active HC. |
2206 | 0 | host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC); |
2207 | | |
2208 | | // If we want to exclude hosts until they have been health checked, mark them with |
2209 | | // a flag to indicate that they have not been health checked yet. |
2210 | 0 | if (info_->warmHosts()) { |
2211 | 0 | host->healthFlagSet(Host::HealthFlag::PENDING_ACTIVE_HC); |
2212 | 0 | } |
2213 | 0 | } |
2214 | 0 | } else { |
2215 | 0 | host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC); |
2216 | | |
2217 | | // If we want to exclude hosts until they have been health checked, mark them with |
2218 | | // a flag to indicate that they have not been health checked yet. |
2219 | 0 | if (info_->warmHosts()) { |
2220 | 0 | host->healthFlagSet(Host::HealthFlag::PENDING_ACTIVE_HC); |
2221 | 0 | } |
2222 | 0 | } |
2223 | 0 | } |
2224 | | |
2225 | 14 | final_hosts.push_back(host); |
2226 | 14 | hosts_added_to_current_priority.push_back(host); |
2227 | 14 | } |
2228 | 14 | } |
2229 | | |
2230 | | // Remove hosts from current_priority_hosts that were matched to an existing host in the |
2231 | | // previous loop. |
2232 | 14 | auto erase_from = |
2233 | 14 | std::remove_if(current_priority_hosts.begin(), current_priority_hosts.end(), |
2234 | 14 | [&existing_hosts_for_current_priority](const HostSharedPtr& p) { |
2235 | 0 | auto existing_itr = |
2236 | 0 | existing_hosts_for_current_priority.find(p->address()->asString()); |
2237 | |
|
2238 | 0 | if (existing_itr != existing_hosts_for_current_priority.end()) { |
2239 | 0 | existing_hosts_for_current_priority.erase(existing_itr); |
2240 | 0 | return true; |
2241 | 0 | } |
2242 | | |
2243 | 0 | return false; |
2244 | 0 | }); |
2245 | 14 | current_priority_hosts.erase(erase_from, current_priority_hosts.end()); |
2246 | | |
2247 | | // If we saw existing hosts during this iteration from a different priority, then we've moved |
2248 | | // a host from another priority into this one, so we should mark the priority as having changed. |
2249 | 14 | if (!existing_hosts_for_current_priority.empty()) { |
2250 | 0 | hosts_changed = true; |
2251 | 0 | } |
2252 | | |
2253 | | // The remaining hosts are hosts that are not referenced in the config update. We remove them |
2254 | | // from the priority if any of the following is true: |
2255 | | // - Active health checking is not enabled. |
2256 | | // - The removed hosts are failing active health checking OR have been explicitly marked as |
2257 | | // unhealthy by a previous EDS update. We do not count outlier as a reason to remove a host |
2258 | | // or any other future health condition that may be added so we do not use the coarseHealth() |
2259 | | // API. |
2260 | | // - We have explicitly configured the cluster to remove hosts regardless of active health |
2261 | | // status. |
2262 | 14 | const bool dont_remove_healthy_hosts = |
2263 | 14 | health_checker_ != nullptr && !info()->drainConnectionsOnHostRemoval(); |
2264 | 14 | if (!current_priority_hosts.empty() && dont_remove_healthy_hosts) { |
2265 | 0 | erase_from = std::remove_if( |
2266 | 0 | current_priority_hosts.begin(), current_priority_hosts.end(), |
2267 | 0 | [&all_new_hosts, &new_hosts_for_current_priority, |
2268 | 0 | &hosts_with_updated_locality_for_current_priority, |
2269 | 0 | &hosts_with_active_health_check_flag_changed, &final_hosts, |
2270 | 0 | &max_host_weight](const HostSharedPtr& p) { |
2271 | | // This host has already been added as a new host in the |
2272 | | // new_hosts_for_current_priority. Return false here to make sure that host |
2273 | | // reference with older locality gets cleaned up from the priority. |
2274 | 0 | if (hosts_with_updated_locality_for_current_priority.contains(p->address()->asString())) { |
2275 | 0 | return false; |
2276 | 0 | } |
2277 | | |
2278 | 0 | if (hosts_with_active_health_check_flag_changed.contains(p->address()->asString())) { |
2279 | 0 | return false; |
2280 | 0 | } |
2281 | | |
2282 | 0 | if (all_new_hosts.contains(p->address()->asString()) && |
2283 | 0 | !new_hosts_for_current_priority.contains(p->address()->asString())) { |
2284 | | // If the address is being completely deleted from this priority, but is |
2285 | | // referenced from another priority, then we assume that the other |
2286 | | // priority will perform an in-place update to re-use the existing Host. |
2287 | | // We should therefore not mark it as PENDING_DYNAMIC_REMOVAL, but |
2288 | | // instead remove it immediately from this priority. |
2289 | | // Example: health check address changed and priority also changed |
2290 | 0 | return false; |
2291 | 0 | } |
2292 | | |
2293 | | // PENDING_DYNAMIC_REMOVAL doesn't apply for the host with disabled active |
2294 | | // health check, the host is removed immediately from this priority. |
2295 | 0 | if ((!(p->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC) || |
2296 | 0 | p->healthFlagGet(Host::HealthFlag::FAILED_EDS_HEALTH))) && |
2297 | 0 | !p->disableActiveHealthCheck()) { |
2298 | 0 | if (p->weight() > max_host_weight) { |
2299 | 0 | max_host_weight = p->weight(); |
2300 | 0 | } |
2301 | |
|
2302 | 0 | final_hosts.push_back(p); |
2303 | 0 | p->healthFlagSet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL); |
2304 | 0 | return true; |
2305 | 0 | } |
2306 | 0 | return false; |
2307 | 0 | }); |
2308 | 0 | current_priority_hosts.erase(erase_from, current_priority_hosts.end()); |
2309 | 0 | } |
2310 | | |
2311 | | // At this point we've accounted for all the new hosts as well the hosts that previously |
2312 | | // existed in this priority. |
2313 | 14 | info_->endpointStats().max_host_weight_.set(max_host_weight); |
2314 | | |
2315 | | // Whatever remains in current_priority_hosts should be removed. |
2316 | 14 | if (!hosts_added_to_current_priority.empty() || !current_priority_hosts.empty()) { |
2317 | 14 | hosts_removed_from_current_priority = std::move(current_priority_hosts); |
2318 | 14 | hosts_changed = true; |
2319 | 14 | } |
2320 | | |
2321 | | // During the update we populated final_hosts with all the hosts that should remain |
2322 | | // in the current priority, so move them back into current_priority_hosts. |
2323 | 14 | current_priority_hosts = std::move(final_hosts); |
2324 | | // We return false here in the absence of EDS health status or metadata changes, because we |
2325 | | // have no changes to host vector status (modulo weights). When we have EDS |
2326 | | // health status or metadata changed, we return true, causing updateHosts() to fire in the |
2327 | | // caller. |
2328 | 14 | return hosts_changed; |
2329 | 14 | } |
2330 | | |
2331 | | Network::DnsLookupFamily |
2332 | 0 | getDnsLookupFamilyFromCluster(const envoy::config::cluster::v3::Cluster& cluster) { |
2333 | 0 | return DnsUtils::getDnsLookupFamilyFromEnum(cluster.dns_lookup_family()); |
2334 | 0 | } |
2335 | | |
2336 | | void reportUpstreamCxDestroy(const Upstream::HostDescriptionConstSharedPtr& host, |
2337 | 1.32k | Network::ConnectionEvent event) { |
2338 | 1.32k | Upstream::ClusterTrafficStats& stats = *host->cluster().trafficStats(); |
2339 | 1.32k | stats.upstream_cx_destroy_.inc(); |
2340 | 1.32k | if (event == Network::ConnectionEvent::RemoteClose) { |
2341 | 619 | stats.upstream_cx_destroy_remote_.inc(); |
2342 | 706 | } else { |
2343 | 706 | stats.upstream_cx_destroy_local_.inc(); |
2344 | 706 | } |
2345 | 1.32k | } |
2346 | | |
2347 | | void reportUpstreamCxDestroyActiveRequest(const Upstream::HostDescriptionConstSharedPtr& host, |
2348 | 61 | Network::ConnectionEvent event) { |
2349 | 61 | Upstream::ClusterTrafficStats& stats = *host->cluster().trafficStats(); |
2350 | 61 | stats.upstream_cx_destroy_with_active_rq_.inc(); |
2351 | 61 | if (event == Network::ConnectionEvent::RemoteClose) { |
2352 | 27 | stats.upstream_cx_destroy_remote_with_active_rq_.inc(); |
2353 | 34 | } else { |
2354 | 34 | stats.upstream_cx_destroy_local_with_active_rq_.inc(); |
2355 | 34 | } |
2356 | 61 | } |
2357 | | |
2358 | | Network::Address::InstanceConstSharedPtr resolveHealthCheckAddress( |
2359 | | const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config, |
2360 | 211k | Network::Address::InstanceConstSharedPtr host_address) { |
2361 | 211k | Network::Address::InstanceConstSharedPtr health_check_address; |
2362 | 211k | const auto& port_value = health_check_config.port_value(); |
2363 | 211k | if (health_check_config.has_address()) { |
2364 | 0 | auto address = Network::Address::resolveProtoAddress(health_check_config.address()); |
2365 | 0 | health_check_address = |
2366 | 0 | port_value == 0 ? address : Network::Utility::getAddressWithPort(*address, port_value); |
2367 | 211k | } else { |
2368 | 211k | health_check_address = port_value == 0 |
2369 | 211k | ? host_address |
2370 | 211k | : Network::Utility::getAddressWithPort(*host_address, port_value); |
2371 | 211k | } |
2372 | 211k | return health_check_address; |
2373 | 211k | } |
2374 | | |
2375 | | } // namespace Upstream |
2376 | | } // namespace Envoy |