/proc/self/cwd/source/common/upstream/upstream_impl.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "source/common/upstream/upstream_impl.h" |
2 | | |
3 | | #include <chrono> |
4 | | #include <cstdint> |
5 | | #include <limits> |
6 | | #include <list> |
7 | | #include <memory> |
8 | | #include <string> |
9 | | #include <vector> |
10 | | |
11 | | #include "envoy/config/cluster/v3/circuit_breaker.pb.h" |
12 | | #include "envoy/config/cluster/v3/cluster.pb.h" |
13 | | #include "envoy/config/core/v3/address.pb.h" |
14 | | #include "envoy/config/core/v3/base.pb.h" |
15 | | #include "envoy/config/core/v3/health_check.pb.h" |
16 | | #include "envoy/config/core/v3/protocol.pb.h" |
17 | | #include "envoy/config/endpoint/v3/endpoint_components.pb.h" |
18 | | #include "envoy/config/upstream/local_address_selector/v3/default_local_address_selector.pb.h" |
19 | | #include "envoy/event/dispatcher.h" |
20 | | #include "envoy/event/timer.h" |
21 | | #include "envoy/extensions/filters/http/upstream_codec/v3/upstream_codec.pb.h" |
22 | | #include "envoy/extensions/transport_sockets/http_11_proxy/v3/upstream_http_11_connect.pb.h" |
23 | | #include "envoy/extensions/transport_sockets/raw_buffer/v3/raw_buffer.pb.h" |
24 | | #include "envoy/init/manager.h" |
25 | | #include "envoy/network/dns.h" |
26 | | #include "envoy/network/transport_socket.h" |
27 | | #include "envoy/registry/registry.h" |
28 | | #include "envoy/secret/secret_manager.h" |
29 | | #include "envoy/server/filter_config.h" |
30 | | #include "envoy/server/transport_socket_config.h" |
31 | | #include "envoy/ssl/context_manager.h" |
32 | | #include "envoy/stats/scope.h" |
33 | | #include "envoy/upstream/health_checker.h" |
34 | | #include "envoy/upstream/upstream.h" |
35 | | |
36 | | #include "source/common/common/dns_utils.h" |
37 | | #include "source/common/common/enum_to_int.h" |
38 | | #include "source/common/common/fmt.h" |
39 | | #include "source/common/common/utility.h" |
40 | | #include "source/common/config/utility.h" |
41 | | #include "source/common/http/http1/codec_stats.h" |
42 | | #include "source/common/http/http2/codec_stats.h" |
43 | | #include "source/common/http/utility.h" |
44 | | #include "source/common/network/address_impl.h" |
45 | | #include "source/common/network/filter_state_proxy_info.h" |
46 | | #include "source/common/network/happy_eyeballs_connection_impl.h" |
47 | | #include "source/common/network/resolver_impl.h" |
48 | | #include "source/common/network/socket_option_factory.h" |
49 | | #include "source/common/network/socket_option_impl.h" |
50 | | #include "source/common/protobuf/protobuf.h" |
51 | | #include "source/common/protobuf/utility.h" |
52 | | #include "source/common/router/config_utility.h" |
53 | | #include "source/common/runtime/runtime_features.h" |
54 | | #include "source/common/runtime/runtime_impl.h" |
55 | | #include "source/common/stats/deferred_creation.h" |
56 | | #include "source/common/upstream/cluster_factory_impl.h" |
57 | | #include "source/common/upstream/health_checker_impl.h" |
58 | | #include "source/extensions/filters/network/http_connection_manager/config.h" |
59 | | #include "source/server/transport_socket_config_impl.h" |
60 | | |
61 | | #include "absl/container/node_hash_set.h" |
62 | | #include "absl/strings/str_cat.h" |
63 | | |
64 | | namespace Envoy { |
65 | | namespace Upstream { |
66 | | namespace { |
67 | 2.43k | std::string addressToString(Network::Address::InstanceConstSharedPtr address) { |
68 | 2.43k | if (!address) { |
69 | 0 | return ""; |
70 | 0 | } |
71 | 2.43k | return address->asString(); |
72 | 2.43k | } |
73 | | |
74 | | Network::TcpKeepaliveConfig |
75 | 0 | parseTcpKeepaliveConfig(const envoy::config::cluster::v3::Cluster& config) { |
76 | 0 | const envoy::config::core::v3::TcpKeepalive& options = |
77 | 0 | config.upstream_connection_options().tcp_keepalive(); |
78 | 0 | return Network::TcpKeepaliveConfig{ |
79 | 0 | PROTOBUF_GET_WRAPPED_OR_DEFAULT(options, keepalive_probes, absl::optional<uint32_t>()), |
80 | 0 | PROTOBUF_GET_WRAPPED_OR_DEFAULT(options, keepalive_time, absl::optional<uint32_t>()), |
81 | 0 | PROTOBUF_GET_WRAPPED_OR_DEFAULT(options, keepalive_interval, absl::optional<uint32_t>())}; |
82 | 0 | } |
83 | | |
84 | | absl::StatusOr<ProtocolOptionsConfigConstSharedPtr> |
85 | | createProtocolOptionsConfig(const std::string& name, const ProtobufWkt::Any& typed_config, |
86 | 1.54k | Server::Configuration::ProtocolOptionsFactoryContext& factory_context) { |
87 | 1.54k | Server::Configuration::ProtocolOptionsFactory* factory = |
88 | 1.54k | Registry::FactoryRegistry<Server::Configuration::NamedNetworkFilterConfigFactory>::getFactory( |
89 | 1.54k | name); |
90 | 1.54k | if (factory == nullptr) { |
91 | 1.54k | factory = |
92 | 1.54k | Registry::FactoryRegistry<Server::Configuration::NamedHttpFilterConfigFactory>::getFactory( |
93 | 1.54k | name); |
94 | 1.54k | } |
95 | 1.54k | if (factory == nullptr) { |
96 | 1.54k | factory = |
97 | 1.54k | Registry::FactoryRegistry<Server::Configuration::ProtocolOptionsFactory>::getFactory(name); |
98 | 1.54k | } |
99 | | |
100 | 1.54k | if (factory == nullptr) { |
101 | 0 | return absl::InvalidArgumentError( |
102 | 0 | fmt::format("Didn't find a registered network or http filter or protocol " |
103 | 0 | "options implementation for name: '{}'", |
104 | 0 | name)); |
105 | 0 | } |
106 | | |
107 | 1.54k | ProtobufTypes::MessagePtr proto_config = factory->createEmptyProtocolOptionsProto(); |
108 | | |
109 | 1.54k | if (proto_config == nullptr) { |
110 | 0 | return absl::InvalidArgumentError( |
111 | 0 | fmt::format("filter {} does not support protocol options", name)); |
112 | 0 | } |
113 | | |
114 | 1.54k | Envoy::Config::Utility::translateOpaqueConfig( |
115 | 1.54k | typed_config, factory_context.messageValidationVisitor(), *proto_config); |
116 | 1.54k | return factory->createProtocolOptionsConfig(*proto_config, factory_context); |
117 | 1.54k | } |
118 | | |
119 | | absl::StatusOr<absl::flat_hash_map<std::string, ProtocolOptionsConfigConstSharedPtr>> |
120 | | parseExtensionProtocolOptions( |
121 | | const envoy::config::cluster::v3::Cluster& config, |
122 | 2.40k | Server::Configuration::ProtocolOptionsFactoryContext& factory_context) { |
123 | 2.40k | absl::flat_hash_map<std::string, ProtocolOptionsConfigConstSharedPtr> options; |
124 | | |
125 | 2.40k | for (const auto& it : config.typed_extension_protocol_options()) { |
126 | 1.54k | auto& name = it.first; |
127 | 1.54k | auto object_or_error = createProtocolOptionsConfig(name, it.second, factory_context); |
128 | 1.54k | RETURN_IF_NOT_OK_REF(object_or_error.status()); |
129 | 1.54k | if (object_or_error.value() != nullptr) { |
130 | 1.54k | options[name] = std::move(object_or_error.value()); |
131 | 1.54k | } |
132 | 1.54k | } |
133 | | |
134 | 2.40k | return options; |
135 | 2.40k | } |
136 | | |
137 | | // Updates the EDS health flags for an existing host to match the new host. |
138 | | // @param updated_host the new host to read health flag values from. |
139 | | // @param existing_host the host to update. |
140 | | // @return bool whether the flag update caused the host health to change. |
141 | 0 | bool updateEdsHealthFlag(const Host& updated_host, Host& existing_host) { |
142 | 0 | auto updated_eds_health_status = updated_host.edsHealthStatus(); |
143 | | |
144 | | // Check if the health flag has changed. |
145 | 0 | if (updated_eds_health_status == existing_host.edsHealthStatus()) { |
146 | 0 | return false; |
147 | 0 | } |
148 | | |
149 | 0 | const auto previous_health = existing_host.coarseHealth(); |
150 | 0 | existing_host.setEdsHealthStatus(updated_eds_health_status); |
151 | | |
152 | | // Rebuild if changing the flag affected the host health. |
153 | 0 | return previous_health != existing_host.coarseHealth(); |
154 | 0 | } |
155 | | |
156 | | // Converts a set of hosts into a HostVector, excluding certain hosts. |
157 | | // @param hosts hosts to convert |
158 | | // @param excluded_hosts hosts to exclude from the resulting vector. |
159 | | HostVector filterHosts(const absl::node_hash_set<HostSharedPtr>& hosts, |
160 | 28 | const absl::node_hash_set<HostSharedPtr>& excluded_hosts) { |
161 | 28 | HostVector net_hosts; |
162 | 28 | net_hosts.reserve(hosts.size()); |
163 | | |
164 | 28 | for (const auto& host : hosts) { |
165 | 14 | if (excluded_hosts.find(host) == excluded_hosts.end()) { |
166 | 14 | net_hosts.emplace_back(host); |
167 | 14 | } |
168 | 14 | } |
169 | | |
170 | 28 | return net_hosts; |
171 | 28 | } |
172 | | |
173 | | Stats::ScopeSharedPtr generateStatsScope(const envoy::config::cluster::v3::Cluster& config, |
174 | 2.40k | Stats::Store& stats) { |
175 | 2.40k | return stats.createScope(fmt::format( |
176 | 2.40k | "cluster.{}.", config.alt_stat_name().empty() ? config.name() : config.alt_stat_name())); |
177 | 2.40k | } |
178 | | |
179 | | Network::ConnectionSocket::OptionsSharedPtr |
180 | | buildBaseSocketOptions(const envoy::config::cluster::v3::Cluster& cluster_config, |
181 | 2.40k | const envoy::config::core::v3::BindConfig& bootstrap_bind_config) { |
182 | 2.40k | Network::ConnectionSocket::OptionsSharedPtr base_options = |
183 | 2.40k | std::make_shared<Network::ConnectionSocket::Options>(); |
184 | | |
185 | | // The process-wide `signal()` handling may fail to handle SIGPIPE if overridden |
186 | | // in the process (i.e., on a mobile client). Some OSes support handling it at the socket layer: |
187 | 2.40k | if (ENVOY_SOCKET_SO_NOSIGPIPE.hasValue()) { |
188 | 0 | Network::Socket::appendOptions(base_options, |
189 | 0 | Network::SocketOptionFactory::buildSocketNoSigpipeOptions()); |
190 | 0 | } |
191 | | // Cluster IP_FREEBIND settings, when set, will override the cluster manager wide settings. |
192 | 2.40k | if ((bootstrap_bind_config.freebind().value() && |
193 | 2.40k | !cluster_config.upstream_bind_config().has_freebind()) || |
194 | 2.40k | cluster_config.upstream_bind_config().freebind().value()) { |
195 | 0 | Network::Socket::appendOptions(base_options, |
196 | 0 | Network::SocketOptionFactory::buildIpFreebindOptions()); |
197 | 0 | } |
198 | 2.40k | if (cluster_config.upstream_connection_options().has_tcp_keepalive()) { |
199 | 0 | Network::Socket::appendOptions(base_options, |
200 | 0 | Network::SocketOptionFactory::buildTcpKeepaliveOptions( |
201 | 0 | parseTcpKeepaliveConfig(cluster_config))); |
202 | 0 | } |
203 | | |
204 | 2.40k | return base_options; |
205 | 2.40k | } |
206 | | |
207 | | Network::ConnectionSocket::OptionsSharedPtr |
208 | | buildClusterSocketOptions(const envoy::config::cluster::v3::Cluster& cluster_config, |
209 | 2.40k | const envoy::config::core::v3::BindConfig& bootstrap_bind_config) { |
210 | 2.40k | Network::ConnectionSocket::OptionsSharedPtr cluster_options = |
211 | 2.40k | std::make_shared<Network::ConnectionSocket::Options>(); |
212 | | // Cluster socket_options trump cluster manager wide. |
213 | 2.40k | if (bootstrap_bind_config.socket_options().size() + |
214 | 2.40k | cluster_config.upstream_bind_config().socket_options().size() > |
215 | 2.40k | 0) { |
216 | 0 | auto socket_options = !cluster_config.upstream_bind_config().socket_options().empty() |
217 | 0 | ? cluster_config.upstream_bind_config().socket_options() |
218 | 0 | : bootstrap_bind_config.socket_options(); |
219 | 0 | Network::Socket::appendOptions( |
220 | 0 | cluster_options, Network::SocketOptionFactory::buildLiteralOptions(socket_options)); |
221 | 0 | } |
222 | 2.40k | return cluster_options; |
223 | 2.40k | } |
224 | | |
225 | | absl::StatusOr<std::vector<::Envoy::Upstream::UpstreamLocalAddress>> |
226 | | parseBindConfig(::Envoy::OptRef<const envoy::config::core::v3::BindConfig> bind_config, |
227 | | const absl::optional<std::string>& cluster_name, |
228 | | Network::ConnectionSocket::OptionsSharedPtr base_socket_options, |
229 | 2.40k | Network::ConnectionSocket::OptionsSharedPtr cluster_socket_options) { |
230 | | |
231 | 2.40k | std::vector<::Envoy::Upstream::UpstreamLocalAddress> upstream_local_addresses; |
232 | 2.40k | if (bind_config.has_value()) { |
233 | 0 | UpstreamLocalAddress upstream_local_address; |
234 | 0 | upstream_local_address.address_ = nullptr; |
235 | 0 | if (bind_config->has_source_address()) { |
236 | |
|
237 | 0 | auto address_or_error = |
238 | 0 | ::Envoy::Network::Address::resolveProtoSocketAddress(bind_config->source_address()); |
239 | 0 | RETURN_IF_NOT_OK_REF(address_or_error.status()); |
240 | 0 | upstream_local_address.address_ = address_or_error.value(); |
241 | 0 | } |
242 | 0 | upstream_local_address.socket_options_ = std::make_shared<Network::ConnectionSocket::Options>(); |
243 | |
|
244 | 0 | ::Envoy::Network::Socket::appendOptions(upstream_local_address.socket_options_, |
245 | 0 | base_socket_options); |
246 | 0 | ::Envoy::Network::Socket::appendOptions(upstream_local_address.socket_options_, |
247 | 0 | cluster_socket_options); |
248 | |
|
249 | 0 | upstream_local_addresses.push_back(upstream_local_address); |
250 | |
|
251 | 0 | for (const auto& extra_source_address : bind_config->extra_source_addresses()) { |
252 | 0 | UpstreamLocalAddress extra_upstream_local_address; |
253 | 0 | auto address_or_error = |
254 | 0 | ::Envoy::Network::Address::resolveProtoSocketAddress(extra_source_address.address()); |
255 | 0 | RETURN_IF_NOT_OK_REF(address_or_error.status()); |
256 | 0 | extra_upstream_local_address.address_ = address_or_error.value(); |
257 | |
|
258 | 0 | extra_upstream_local_address.socket_options_ = |
259 | 0 | std::make_shared<::Envoy::Network::ConnectionSocket::Options>(); |
260 | 0 | ::Envoy::Network::Socket::appendOptions(extra_upstream_local_address.socket_options_, |
261 | 0 | base_socket_options); |
262 | |
|
263 | 0 | if (extra_source_address.has_socket_options()) { |
264 | 0 | ::Envoy::Network::Socket::appendOptions( |
265 | 0 | extra_upstream_local_address.socket_options_, |
266 | 0 | ::Envoy::Network::SocketOptionFactory::buildLiteralOptions( |
267 | 0 | extra_source_address.socket_options().socket_options())); |
268 | 0 | } else { |
269 | 0 | ::Envoy::Network::Socket::appendOptions(extra_upstream_local_address.socket_options_, |
270 | 0 | cluster_socket_options); |
271 | 0 | } |
272 | 0 | upstream_local_addresses.push_back(extra_upstream_local_address); |
273 | 0 | } |
274 | | |
275 | 0 | for (const auto& additional_source_address : bind_config->additional_source_addresses()) { |
276 | 0 | UpstreamLocalAddress additional_upstream_local_address; |
277 | 0 | auto address_or_error = |
278 | 0 | ::Envoy::Network::Address::resolveProtoSocketAddress(additional_source_address); |
279 | 0 | RETURN_IF_NOT_OK_REF(address_or_error.status()); |
280 | 0 | additional_upstream_local_address.address_ = address_or_error.value(); |
281 | 0 | additional_upstream_local_address.socket_options_ = |
282 | 0 | std::make_shared<::Envoy::Network::ConnectionSocket::Options>(); |
283 | 0 | ::Envoy::Network::Socket::appendOptions(additional_upstream_local_address.socket_options_, |
284 | 0 | base_socket_options); |
285 | 0 | ::Envoy::Network::Socket::appendOptions(additional_upstream_local_address.socket_options_, |
286 | 0 | cluster_socket_options); |
287 | 0 | upstream_local_addresses.push_back(additional_upstream_local_address); |
288 | 0 | } |
289 | 2.40k | } else { |
290 | | // If there is no bind config specified, then return a nullptr for the address. |
291 | 2.40k | UpstreamLocalAddress local_address; |
292 | 2.40k | local_address.address_ = nullptr; |
293 | 2.40k | local_address.socket_options_ = std::make_shared<::Envoy::Network::ConnectionSocket::Options>(); |
294 | 2.40k | ::Envoy::Network::Socket::appendOptions(local_address.socket_options_, base_socket_options); |
295 | 2.40k | Network::Socket::appendOptions(local_address.socket_options_, cluster_socket_options); |
296 | 2.40k | upstream_local_addresses.push_back(local_address); |
297 | 2.40k | } |
298 | | |
299 | | // Verify that we have valid addresses if size is greater than 1. |
300 | 2.40k | if (upstream_local_addresses.size() > 1) { |
301 | 0 | for (auto const& upstream_local_address : upstream_local_addresses) { |
302 | 0 | if (upstream_local_address.address_ == nullptr) { |
303 | 0 | return absl::InvalidArgumentError(fmt::format( |
304 | 0 | "{}'s upstream binding config has invalid IP addresses.", |
305 | 0 | !(cluster_name.has_value()) ? "Bootstrap" |
306 | 0 | : fmt::format("Cluster {}", cluster_name.value()))); |
307 | 0 | } |
308 | 0 | } |
309 | 0 | } |
310 | | |
311 | 2.40k | return upstream_local_addresses; |
312 | 2.40k | } |
313 | | |
314 | | absl::StatusOr<Envoy::Upstream::UpstreamLocalAddressSelectorConstSharedPtr> |
315 | | createUpstreamLocalAddressSelector( |
316 | | const envoy::config::cluster::v3::Cluster& cluster_config, |
317 | 2.40k | const absl::optional<envoy::config::core::v3::BindConfig>& bootstrap_bind_config) { |
318 | | |
319 | | // Use the cluster bind config if specified. This completely overrides the |
320 | | // bootstrap bind config when present. |
321 | 2.40k | OptRef<const envoy::config::core::v3::BindConfig> bind_config; |
322 | 2.40k | absl::optional<std::string> cluster_name; |
323 | 2.40k | if (cluster_config.has_upstream_bind_config()) { |
324 | 0 | bind_config.emplace(cluster_config.upstream_bind_config()); |
325 | 0 | cluster_name.emplace(cluster_config.name()); |
326 | 2.40k | } else if (bootstrap_bind_config.has_value()) { |
327 | 0 | bind_config.emplace(*bootstrap_bind_config); |
328 | 0 | } |
329 | | |
330 | | // Verify that bind config is valid. |
331 | 2.40k | if (bind_config.has_value()) { |
332 | 0 | if (bind_config->additional_source_addresses_size() > 0 && |
333 | 0 | bind_config->extra_source_addresses_size() > 0) { |
334 | 0 | return absl::InvalidArgumentError(fmt::format( |
335 | 0 | "Can't specify both `extra_source_addresses` and `additional_source_addresses` " |
336 | 0 | "in the {}'s upstream binding config", |
337 | 0 | !(cluster_name.has_value()) ? "Bootstrap" |
338 | 0 | : fmt::format("Cluster {}", cluster_name.value()))); |
339 | 0 | } |
340 | | |
341 | 0 | if (!bind_config->has_source_address() && |
342 | 0 | (bind_config->extra_source_addresses_size() > 0 || |
343 | 0 | bind_config->additional_source_addresses_size() > 0)) { |
344 | 0 | return absl::InvalidArgumentError(fmt::format( |
345 | 0 | "{}'s upstream binding config has extra/additional source addresses but no " |
346 | 0 | "source_address. Extra/additional addresses cannot be specified if " |
347 | 0 | "source_address is not set.", |
348 | 0 | !(cluster_name.has_value()) ? "Bootstrap" |
349 | 0 | : fmt::format("Cluster {}", cluster_name.value()))); |
350 | 0 | } |
351 | 0 | } |
352 | 2.40k | UpstreamLocalAddressSelectorFactory* local_address_selector_factory; |
353 | 2.40k | const envoy::config::core::v3::TypedExtensionConfig* const local_address_selector_config = |
354 | 2.40k | bind_config.has_value() && bind_config->has_local_address_selector() |
355 | 2.40k | ? &bind_config->local_address_selector() |
356 | 2.40k | : nullptr; |
357 | 2.40k | if (local_address_selector_config) { |
358 | 0 | local_address_selector_factory = |
359 | 0 | Config::Utility::getAndCheckFactory<UpstreamLocalAddressSelectorFactory>( |
360 | 0 | *local_address_selector_config, false); |
361 | 2.40k | } else { |
362 | | // Create the default local address selector if one was not specified. |
363 | 2.40k | envoy::config::upstream::local_address_selector::v3::DefaultLocalAddressSelector default_config; |
364 | 2.40k | envoy::config::core::v3::TypedExtensionConfig typed_extension; |
365 | 2.40k | typed_extension.mutable_typed_config()->PackFrom(default_config); |
366 | 2.40k | local_address_selector_factory = |
367 | 2.40k | Config::Utility::getAndCheckFactory<UpstreamLocalAddressSelectorFactory>(typed_extension, |
368 | 2.40k | false); |
369 | 2.40k | } |
370 | 2.40k | absl::StatusOr<std::vector<::Envoy::Upstream::UpstreamLocalAddress>> config_or_error = |
371 | 2.40k | parseBindConfig( |
372 | 2.40k | bind_config, cluster_name, |
373 | 2.40k | buildBaseSocketOptions(cluster_config, bootstrap_bind_config.value_or( |
374 | 2.40k | envoy::config::core::v3::BindConfig{})), |
375 | 2.40k | buildClusterSocketOptions(cluster_config, bootstrap_bind_config.value_or( |
376 | 2.40k | envoy::config::core::v3::BindConfig{}))); |
377 | 2.40k | RETURN_IF_NOT_OK_REF(config_or_error.status()); |
378 | 2.40k | auto selector_or_error = local_address_selector_factory->createLocalAddressSelector( |
379 | 2.40k | config_or_error.value(), cluster_name); |
380 | 2.40k | RETURN_IF_NOT_OK_REF(selector_or_error.status()); |
381 | 2.40k | return selector_or_error.value(); |
382 | 2.40k | } |
383 | | |
384 | | class LoadBalancerFactoryContextImpl : public Upstream::LoadBalancerFactoryContext { |
385 | | public: |
386 | | explicit LoadBalancerFactoryContextImpl( |
387 | | Server::Configuration::ServerFactoryContext& server_context) |
388 | 2.40k | : server_context_(server_context) {} |
389 | | |
390 | 0 | Event::Dispatcher& mainThreadDispatcher() override { |
391 | 0 | return server_context_.mainThreadDispatcher(); |
392 | 0 | } |
393 | | |
394 | | private: |
395 | | Server::Configuration::ServerFactoryContext& server_context_; |
396 | | }; |
397 | | |
398 | | } // namespace |
399 | | |
400 | | // Allow disabling ALPN checks for transport sockets. See |
401 | | // https://github.com/envoyproxy/envoy/issues/22876 |
402 | | const absl::string_view ClusterImplBase::DoNotValidateAlpnRuntimeKey = |
403 | | "config.do_not_validate_alpn_support"; |
404 | | |
405 | | // Overriding drop_overload ratio settings from EDS. |
406 | | const absl::string_view ClusterImplBase::DropOverloadRuntimeKey = |
407 | | "load_balancing_policy.drop_overload_limit"; |
408 | | |
409 | | // TODO(pianiststickman): this implementation takes a lock on the hot path and puts a copy of the |
410 | | // stat name into every host that receives a copy of that metric. This can be improved by putting |
411 | | // a single copy of the stat name into a thread-local key->index map so that the lock can be avoided |
412 | | // and using the index as the key to the stat map instead. |
413 | 0 | void LoadMetricStatsImpl::add(const absl::string_view key, double value) { |
414 | 0 | absl::MutexLock lock(&mu_); |
415 | 0 | if (map_ == nullptr) { |
416 | 0 | map_ = std::make_unique<StatMap>(); |
417 | 0 | } |
418 | 0 | Stat& stat = (*map_)[key]; |
419 | 0 | ++stat.num_requests_with_metric; |
420 | 0 | stat.total_metric_value += value; |
421 | 0 | } |
422 | | |
423 | 0 | LoadMetricStats::StatMapPtr LoadMetricStatsImpl::latch() { |
424 | 0 | absl::MutexLock lock(&mu_); |
425 | 0 | StatMapPtr latched = std::move(map_); |
426 | 0 | map_ = nullptr; |
427 | 0 | return latched; |
428 | 0 | } |
429 | | |
430 | | HostDescriptionImpl::HostDescriptionImpl( |
431 | | ClusterInfoConstSharedPtr cluster, const std::string& hostname, |
432 | | Network::Address::InstanceConstSharedPtr dest_address, MetadataConstSharedPtr endpoint_metadata, |
433 | | MetadataConstSharedPtr locality_metadata, const envoy::config::core::v3::Locality& locality, |
434 | | const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config, |
435 | | uint32_t priority, TimeSource& time_source, const AddressVector& address_list) |
436 | | : HostDescriptionImplBase(cluster, hostname, dest_address, endpoint_metadata, locality_metadata, |
437 | | locality, health_check_config, priority, time_source), |
438 | | address_(dest_address), |
439 | | address_list_or_null_(makeAddressListOrNull(dest_address, address_list)), |
440 | 206k | health_check_address_(resolveHealthCheckAddress(health_check_config, dest_address)) {} Envoy::Upstream::HostDescriptionImpl::HostDescriptionImpl(std::__1::shared_ptr<Envoy::Upstream::ClusterInfo const>, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<Envoy::Network::Address::Instance const>, std::__1::shared_ptr<envoy::config::core::v3::Metadata const>, std::__1::shared_ptr<envoy::config::core::v3::Metadata const>, envoy::config::core::v3::Locality const&, envoy::config::endpoint::v3::Endpoint_HealthCheckConfig const&, unsigned int, Envoy::TimeSource&, std::__1::vector<std::__1::shared_ptr<Envoy::Network::Address::Instance const>, std::__1::allocator<std::__1::shared_ptr<Envoy::Network::Address::Instance const> > > const&) Line | Count | Source | 440 | 205k | health_check_address_(resolveHealthCheckAddress(health_check_config, dest_address)) {} |
Envoy::Upstream::HostDescriptionImpl::HostDescriptionImpl(std::__1::shared_ptr<Envoy::Upstream::ClusterInfo const>, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<Envoy::Network::Address::Instance const>, std::__1::shared_ptr<envoy::config::core::v3::Metadata const>, std::__1::shared_ptr<envoy::config::core::v3::Metadata const>, envoy::config::core::v3::Locality const&, envoy::config::endpoint::v3::Endpoint_HealthCheckConfig const&, unsigned int, Envoy::TimeSource&, std::__1::vector<std::__1::shared_ptr<Envoy::Network::Address::Instance const>, std::__1::allocator<std::__1::shared_ptr<Envoy::Network::Address::Instance const> > > const&) Line | Count | Source | 440 | 815 | health_check_address_(resolveHealthCheckAddress(health_check_config, dest_address)) {} |
|
441 | | |
442 | | HostDescriptionImplBase::HostDescriptionImplBase( |
443 | | ClusterInfoConstSharedPtr cluster, const std::string& hostname, |
444 | | Network::Address::InstanceConstSharedPtr dest_address, MetadataConstSharedPtr endpoint_metadata, |
445 | | MetadataConstSharedPtr locality_metadata, const envoy::config::core::v3::Locality& locality, |
446 | | const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config, |
447 | | uint32_t priority, TimeSource& time_source) |
448 | | : cluster_(cluster), hostname_(hostname), |
449 | | health_checks_hostname_(health_check_config.hostname()), |
450 | | canary_(Config::Metadata::metadataValue(endpoint_metadata.get(), |
451 | | Config::MetadataFilters::get().ENVOY_LB, |
452 | | Config::MetadataEnvoyLbKeys::get().CANARY) |
453 | | .bool_value()), |
454 | | endpoint_metadata_(endpoint_metadata), locality_metadata_(locality_metadata), |
455 | | locality_(locality), |
456 | | locality_zone_stat_name_(locality.zone(), cluster->statsScope().symbolTable()), |
457 | | priority_(priority), |
458 | | socket_factory_(resolveTransportSocketFactory(dest_address, endpoint_metadata_.get())), |
459 | 206k | creation_time_(time_source.monotonicTime()) { |
460 | 206k | if (health_check_config.port_value() != 0 && dest_address->type() != Network::Address::Type::Ip) { |
461 | | // Setting the health check port to non-0 only works for IP-type addresses. Setting the port |
462 | | // for a pipe address is a misconfiguration. Throw an exception. |
463 | 0 | throwEnvoyExceptionOrPanic( |
464 | 0 | fmt::format("Invalid host configuration: non-zero port for non-IP address")); |
465 | 0 | } |
466 | 206k | } |
467 | | |
468 | | HostDescription::SharedConstAddressVector HostDescriptionImplBase::makeAddressListOrNull( |
469 | 206k | const Network::Address::InstanceConstSharedPtr& address, const AddressVector& address_list) { |
470 | 206k | if (address_list.empty()) { |
471 | 206k | return {}; |
472 | 206k | } |
473 | 0 | ASSERT(*address_list.front() == *address); |
474 | 0 | return std::make_shared<AddressVector>(address_list); |
475 | 0 | } |
476 | | |
477 | | Network::UpstreamTransportSocketFactory& HostDescriptionImplBase::resolveTransportSocketFactory( |
478 | | const Network::Address::InstanceConstSharedPtr& dest_address, |
479 | 207k | const envoy::config::core::v3::Metadata* endpoint_metadata) const { |
480 | 207k | auto match = |
481 | 207k | cluster_->transportSocketMatcher().resolve(endpoint_metadata, locality_metadata_.get()); |
482 | 207k | match.stats_.total_match_count_.inc(); |
483 | 207k | ENVOY_LOG(debug, "transport socket match, socket {} selected for host with address {}", |
484 | 207k | match.name_, dest_address ? dest_address->asString() : "empty"); |
485 | | |
486 | 207k | return match.factory_; |
487 | 207k | } |
488 | | |
489 | | Host::CreateConnectionData HostImplBase::createConnection( |
490 | | Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options, |
491 | 911 | Network::TransportSocketOptionsConstSharedPtr transport_socket_options) const { |
492 | 911 | return createConnection(dispatcher, cluster(), address(), addressListOrNull(), |
493 | 911 | transportSocketFactory(), options, transport_socket_options, |
494 | 911 | shared_from_this()); |
495 | 911 | } |
496 | | |
497 | 205k | void HostImplBase::setEdsHealthFlag(envoy::config::core::v3::HealthStatus health_status) { |
498 | | // Clear all old EDS health flags first. |
499 | 205k | HostImplBase::healthFlagClear(Host::HealthFlag::FAILED_EDS_HEALTH); |
500 | 205k | HostImplBase::healthFlagClear(Host::HealthFlag::DEGRADED_EDS_HEALTH); |
501 | 205k | if (Runtime::runtimeFeatureEnabled( |
502 | 205k | "envoy.reloadable_features.exclude_host_in_eds_status_draining")) { |
503 | 205k | HostImplBase::healthFlagClear(Host::HealthFlag::EDS_STATUS_DRAINING); |
504 | 205k | } |
505 | | |
506 | | // Set the appropriate EDS health flag. |
507 | 205k | switch (health_status) { |
508 | 0 | case envoy::config::core::v3::UNHEALTHY: |
509 | 0 | FALLTHRU; |
510 | 0 | case envoy::config::core::v3::TIMEOUT: |
511 | 0 | HostImplBase::healthFlagSet(Host::HealthFlag::FAILED_EDS_HEALTH); |
512 | 0 | break; |
513 | 0 | case envoy::config::core::v3::DRAINING: |
514 | 0 | if (Runtime::runtimeFeatureEnabled( |
515 | 0 | "envoy.reloadable_features.exclude_host_in_eds_status_draining")) { |
516 | 0 | HostImplBase::healthFlagSet(Host::HealthFlag::EDS_STATUS_DRAINING); |
517 | 0 | } |
518 | 0 | break; |
519 | 0 | case envoy::config::core::v3::DEGRADED: |
520 | 0 | HostImplBase::healthFlagSet(Host::HealthFlag::DEGRADED_EDS_HEALTH); |
521 | 0 | break; |
522 | 205k | default: |
523 | 205k | break; |
524 | | // No health flags should be set. |
525 | 205k | } |
526 | 205k | } |
527 | | |
528 | | Host::CreateConnectionData HostImplBase::createHealthCheckConnection( |
529 | | Event::Dispatcher& dispatcher, |
530 | | Network::TransportSocketOptionsConstSharedPtr transport_socket_options, |
531 | 22.4k | const envoy::config::core::v3::Metadata* metadata) const { |
532 | 22.4k | Network::UpstreamTransportSocketFactory& factory = |
533 | 22.4k | (metadata != nullptr) ? resolveTransportSocketFactory(healthCheckAddress(), metadata) |
534 | 22.4k | : transportSocketFactory(); |
535 | 22.4k | return createConnection(dispatcher, cluster(), healthCheckAddress(), {}, factory, nullptr, |
536 | 22.4k | transport_socket_options, shared_from_this()); |
537 | 22.4k | } |
538 | | |
539 | | absl::optional<Network::Address::InstanceConstSharedPtr> HostImplBase::maybeGetProxyRedirectAddress( |
540 | | const Network::TransportSocketOptionsConstSharedPtr transport_socket_options, |
541 | 23.3k | HostDescriptionConstSharedPtr host) { |
542 | 23.3k | if (transport_socket_options && transport_socket_options->http11ProxyInfo().has_value()) { |
543 | 0 | return transport_socket_options->http11ProxyInfo()->proxy_address; |
544 | 0 | } |
545 | | |
546 | | // See if host metadata contains a proxy address and only check locality metadata if host |
547 | | // metadata did not have the relevant key. |
548 | 46.6k | for (const auto& metadata : {host->metadata(), host->localityMetadata()}) { |
549 | 46.6k | if (metadata == nullptr) { |
550 | 46.6k | continue; |
551 | 46.6k | } |
552 | | |
553 | 0 | auto addr_it = metadata->typed_filter_metadata().find( |
554 | 0 | Config::MetadataFilters::get().ENVOY_HTTP11_PROXY_TRANSPORT_SOCKET_ADDR); |
555 | 0 | if (addr_it == metadata->typed_filter_metadata().end()) { |
556 | 0 | continue; |
557 | 0 | } |
558 | | |
559 | | // Parse an address from the metadata. |
560 | 0 | envoy::config::core::v3::Address proxy_addr; |
561 | 0 | auto status = MessageUtil::unpackTo(addr_it->second, proxy_addr); |
562 | 0 | if (!status.ok()) { |
563 | 0 | ENVOY_LOG_EVERY_POW_2( |
564 | 0 | error, "failed to parse proto from endpoint/locality metadata field {}, host={}", |
565 | 0 | Config::MetadataFilters::get().ENVOY_HTTP11_PROXY_TRANSPORT_SOCKET_ADDR, |
566 | 0 | host->hostname()); |
567 | 0 | return absl::nullopt; |
568 | 0 | } |
569 | | |
570 | | // Resolve the parsed address proto. |
571 | 0 | auto resolve_status = Network::Address::resolveProtoAddress(proxy_addr); |
572 | 0 | if (!resolve_status.ok()) { |
573 | 0 | ENVOY_LOG_EVERY_POW_2( |
574 | 0 | error, "failed to resolve address from endpoint/locality metadata field {}, host={}", |
575 | 0 | Config::MetadataFilters::get().ENVOY_HTTP11_PROXY_TRANSPORT_SOCKET_ADDR, |
576 | 0 | host->hostname()); |
577 | 0 | return absl::nullopt; |
578 | 0 | } |
579 | | |
580 | | // We successfully resolved, so return the instance ptr. |
581 | 0 | return resolve_status.value(); |
582 | 0 | } |
583 | | |
584 | 23.3k | return absl::nullopt; |
585 | 23.3k | } |
586 | | |
587 | | Host::CreateConnectionData HostImplBase::createConnection( |
588 | | Event::Dispatcher& dispatcher, const ClusterInfo& cluster, |
589 | | const Network::Address::InstanceConstSharedPtr& address, |
590 | | const SharedConstAddressVector& address_list_or_null, |
591 | | Network::UpstreamTransportSocketFactory& socket_factory, |
592 | | const Network::ConnectionSocket::OptionsSharedPtr& options, |
593 | | Network::TransportSocketOptionsConstSharedPtr transport_socket_options, |
594 | 23.3k | HostDescriptionConstSharedPtr host) { |
595 | 23.3k | auto source_address_selector = cluster.getUpstreamLocalAddressSelector(); |
596 | | |
597 | 23.3k | absl::optional<Network::Address::InstanceConstSharedPtr> proxy_address = |
598 | 23.3k | maybeGetProxyRedirectAddress(transport_socket_options, host); |
599 | | |
600 | 23.3k | Network::ClientConnectionPtr connection; |
601 | | // If the transport socket options or endpoint/locality metadata indicate the connection should |
602 | | // be redirected to a proxy, create the TCP connection to the proxy's address not the host's |
603 | | // address. |
604 | 23.3k | if (proxy_address.has_value()) { |
605 | 0 | auto upstream_local_address = |
606 | 0 | source_address_selector->getUpstreamLocalAddress(address, options); |
607 | 0 | ENVOY_LOG(debug, "Connecting to configured HTTP/1.1 proxy at {}", |
608 | 0 | proxy_address.value()->asString()); |
609 | 0 | connection = dispatcher.createClientConnection( |
610 | 0 | proxy_address.value(), upstream_local_address.address_, |
611 | 0 | socket_factory.createTransportSocket(transport_socket_options, host), |
612 | 0 | upstream_local_address.socket_options_, transport_socket_options); |
613 | 23.3k | } else if (address_list_or_null != nullptr && address_list_or_null->size() > 1) { |
614 | 0 | absl::optional<envoy::config::cluster::v3::UpstreamConnectionOptions::HappyEyeballsConfig> |
615 | 0 | happy_eyeballs_config; |
616 | 0 | if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.use_config_in_happy_eyeballs")) { |
617 | 0 | ENVOY_LOG(debug, "Upstream using happy eyeballs config."); |
618 | 0 | if (cluster.happyEyeballsConfig().has_value()) { |
619 | 0 | happy_eyeballs_config = *cluster.happyEyeballsConfig(); |
620 | 0 | } else { |
621 | 0 | envoy::config::cluster::v3::UpstreamConnectionOptions::HappyEyeballsConfig default_config; |
622 | 0 | default_config.set_first_address_family_version( |
623 | 0 | envoy::config::cluster::v3::UpstreamConnectionOptions::DEFAULT); |
624 | 0 | default_config.mutable_first_address_family_count()->set_value(1); |
625 | 0 | happy_eyeballs_config = absl::make_optional(default_config); |
626 | 0 | } |
627 | 0 | } |
628 | 0 | connection = std::make_unique<Network::HappyEyeballsConnectionImpl>( |
629 | 0 | dispatcher, *address_list_or_null, source_address_selector, socket_factory, |
630 | 0 | transport_socket_options, host, options, happy_eyeballs_config); |
631 | 23.3k | } else { |
632 | 23.3k | auto upstream_local_address = |
633 | 23.3k | source_address_selector->getUpstreamLocalAddress(address, options); |
634 | 23.3k | connection = dispatcher.createClientConnection( |
635 | 23.3k | address, upstream_local_address.address_, |
636 | 23.3k | socket_factory.createTransportSocket(transport_socket_options, host), |
637 | 23.3k | upstream_local_address.socket_options_, transport_socket_options); |
638 | 23.3k | } |
639 | | |
640 | 23.3k | connection->connectionInfoSetter().enableSettingInterfaceName( |
641 | 23.3k | cluster.setLocalInterfaceNameOnUpstreamConnections()); |
642 | 23.3k | connection->setBufferLimits(cluster.perConnectionBufferLimitBytes()); |
643 | 23.3k | if (auto upstream_info = connection->streamInfo().upstreamInfo(); upstream_info) { |
644 | 23.3k | upstream_info->setUpstreamHost(host); |
645 | 23.3k | } |
646 | 23.3k | cluster.createNetworkFilterChain(*connection); |
647 | 23.3k | return {std::move(connection), std::move(host)}; |
648 | 23.3k | } |
649 | | |
650 | 10.3M | void HostImplBase::weight(uint32_t new_weight) { weight_ = std::max(1U, new_weight); } |
651 | | |
652 | | std::vector<HostsPerLocalityConstSharedPtr> HostsPerLocalityImpl::filter( |
653 | 2.40k | const std::vector<std::function<bool(const Host&)>>& predicates) const { |
654 | | // We keep two lists: one for being able to mutate the clone and one for returning to the |
655 | | // caller. Creating them both at the start avoids iterating over the mutable values at the end |
656 | | // to convert them to a const pointer. |
657 | 2.40k | std::vector<std::shared_ptr<HostsPerLocalityImpl>> mutable_clones; |
658 | 2.40k | std::vector<HostsPerLocalityConstSharedPtr> filtered_clones; |
659 | | |
660 | 2.40k | mutable_clones.reserve(predicates.size()); |
661 | 2.40k | filtered_clones.reserve(predicates.size()); |
662 | 9.62k | for (size_t i = 0; i < predicates.size(); ++i) { |
663 | 7.22k | mutable_clones.emplace_back(std::make_shared<HostsPerLocalityImpl>()); |
664 | 7.22k | filtered_clones.emplace_back(mutable_clones.back()); |
665 | 7.22k | mutable_clones.back()->local_ = local_; |
666 | 7.22k | } |
667 | | |
668 | 2.40k | for (const auto& hosts_locality : hosts_per_locality_) { |
669 | 2.40k | std::vector<HostVector> current_locality_hosts; |
670 | 2.40k | current_locality_hosts.resize(predicates.size()); |
671 | | |
672 | | // Since # of hosts >> # of predicates, we iterate over the hosts in the outer loop. |
673 | 2.40k | for (const auto& host : hosts_locality) { |
674 | 9.62k | for (size_t i = 0; i < predicates.size(); ++i) { |
675 | 7.22k | if (predicates[i](*host)) { |
676 | 2.40k | current_locality_hosts[i].emplace_back(host); |
677 | 2.40k | } |
678 | 7.22k | } |
679 | 2.40k | } |
680 | | |
681 | 9.62k | for (size_t i = 0; i < predicates.size(); ++i) { |
682 | 7.22k | mutable_clones[i]->hosts_per_locality_.push_back(std::move(current_locality_hosts[i])); |
683 | 7.22k | } |
684 | 2.40k | } |
685 | | |
686 | 2.40k | return filtered_clones; |
687 | 2.40k | } |
688 | | |
689 | | void HostSetImpl::updateHosts(PrioritySet::UpdateHostsParams&& update_hosts_params, |
690 | | LocalityWeightsConstSharedPtr locality_weights, |
691 | | const HostVector& hosts_added, const HostVector& hosts_removed, |
692 | | uint64_t seed, absl::optional<bool> weighted_priority_health, |
693 | 8.76k | absl::optional<uint32_t> overprovisioning_factor) { |
694 | 8.76k | if (weighted_priority_health.has_value()) { |
695 | 7.22k | weighted_priority_health_ = weighted_priority_health.value(); |
696 | 7.22k | } |
697 | 8.76k | if (overprovisioning_factor.has_value()) { |
698 | 7.22k | ASSERT(overprovisioning_factor.value() > 0); |
699 | 7.22k | overprovisioning_factor_ = overprovisioning_factor.value(); |
700 | 7.22k | } |
701 | 8.76k | hosts_ = std::move(update_hosts_params.hosts); |
702 | 8.76k | healthy_hosts_ = std::move(update_hosts_params.healthy_hosts); |
703 | 8.76k | degraded_hosts_ = std::move(update_hosts_params.degraded_hosts); |
704 | 8.76k | excluded_hosts_ = std::move(update_hosts_params.excluded_hosts); |
705 | 8.76k | hosts_per_locality_ = std::move(update_hosts_params.hosts_per_locality); |
706 | 8.76k | healthy_hosts_per_locality_ = std::move(update_hosts_params.healthy_hosts_per_locality); |
707 | 8.76k | degraded_hosts_per_locality_ = std::move(update_hosts_params.degraded_hosts_per_locality); |
708 | 8.76k | excluded_hosts_per_locality_ = std::move(update_hosts_params.excluded_hosts_per_locality); |
709 | 8.76k | locality_weights_ = std::move(locality_weights); |
710 | | |
711 | | // TODO(ggreenway): implement `weighted_priority_health` support in `rebuildLocalityScheduler`. |
712 | 8.76k | rebuildLocalityScheduler(healthy_locality_scheduler_, healthy_locality_entries_, |
713 | 8.76k | *healthy_hosts_per_locality_, healthy_hosts_->get(), hosts_per_locality_, |
714 | 8.76k | excluded_hosts_per_locality_, locality_weights_, |
715 | 8.76k | overprovisioning_factor_, seed); |
716 | 8.76k | rebuildLocalityScheduler(degraded_locality_scheduler_, degraded_locality_entries_, |
717 | 8.76k | *degraded_hosts_per_locality_, degraded_hosts_->get(), |
718 | 8.76k | hosts_per_locality_, excluded_hosts_per_locality_, locality_weights_, |
719 | 8.76k | overprovisioning_factor_, seed); |
720 | | |
721 | 8.76k | runUpdateCallbacks(hosts_added, hosts_removed); |
722 | 8.76k | } |
723 | | |
724 | | void HostSetImpl::rebuildLocalityScheduler( |
725 | | std::unique_ptr<EdfScheduler<LocalityEntry>>& locality_scheduler, |
726 | | std::vector<std::shared_ptr<LocalityEntry>>& locality_entries, |
727 | | const HostsPerLocality& eligible_hosts_per_locality, const HostVector& eligible_hosts, |
728 | | HostsPerLocalityConstSharedPtr all_hosts_per_locality, |
729 | | HostsPerLocalityConstSharedPtr excluded_hosts_per_locality, |
730 | | LocalityWeightsConstSharedPtr locality_weights, uint32_t overprovisioning_factor, |
731 | 17.5k | uint64_t seed) { |
732 | | // Rebuild the locality scheduler by computing the effective weight of each |
733 | | // locality in this priority. The scheduler is reset by default, and is rebuilt only if we have |
734 | | // locality weights (i.e. using EDS) and there is at least one eligible host in this priority. |
735 | | // |
736 | | // We omit building a scheduler when there are zero eligible hosts in the priority as |
737 | | // all the localities will have zero effective weight. At selection time, we'll either select |
738 | | // from a different scheduler or there will be no available hosts in the priority. At that point |
739 | | // we'll rely on other mechanisms such as panic mode to select a host, none of which rely on the |
740 | | // scheduler. |
741 | | // |
742 | | // TODO(htuch): if the underlying locality index -> |
743 | | // envoy::config::core::v3::Locality hasn't changed in hosts_/healthy_hosts_/degraded_hosts_, we |
744 | | // could just update locality_weight_ without rebuilding. Similar to how host |
745 | | // level WRR works, we would age out the existing entries via picks and lazily |
746 | | // apply the new weights. |
747 | 17.5k | locality_scheduler = nullptr; |
748 | 17.5k | if (all_hosts_per_locality != nullptr && locality_weights != nullptr && |
749 | 17.5k | !locality_weights->empty() && !eligible_hosts.empty()) { |
750 | 7.22k | if (Runtime::runtimeFeatureEnabled( |
751 | 7.22k | "envoy.reloadable_features.edf_lb_locality_scheduler_init_fix")) { |
752 | 7.22k | locality_entries.clear(); |
753 | 14.4k | for (uint32_t i = 0; i < all_hosts_per_locality->get().size(); ++i) { |
754 | 7.22k | const double effective_weight = effectiveLocalityWeight( |
755 | 7.22k | i, eligible_hosts_per_locality, *excluded_hosts_per_locality, *all_hosts_per_locality, |
756 | 7.22k | *locality_weights, overprovisioning_factor); |
757 | 7.22k | if (effective_weight > 0) { |
758 | 0 | locality_entries.emplace_back(std::make_shared<LocalityEntry>(i, effective_weight)); |
759 | 0 | } |
760 | 7.22k | } |
761 | | // If not all effective weights were zero, create the scheduler. |
762 | 7.22k | if (!locality_entries.empty()) { |
763 | 0 | locality_scheduler = std::make_unique<EdfScheduler<LocalityEntry>>( |
764 | 0 | EdfScheduler<LocalityEntry>::createWithPicks( |
765 | 0 | locality_entries, |
766 | 0 | [](const LocalityEntry& entry) { return entry.effective_weight_; }, seed)); |
767 | 0 | } |
768 | 7.22k | } else { |
769 | 0 | locality_scheduler = std::make_unique<EdfScheduler<LocalityEntry>>(); |
770 | 0 | locality_entries.clear(); |
771 | 0 | for (uint32_t i = 0; i < all_hosts_per_locality->get().size(); ++i) { |
772 | 0 | const double effective_weight = effectiveLocalityWeight( |
773 | 0 | i, eligible_hosts_per_locality, *excluded_hosts_per_locality, *all_hosts_per_locality, |
774 | 0 | *locality_weights, overprovisioning_factor); |
775 | 0 | if (effective_weight > 0) { |
776 | 0 | locality_entries.emplace_back(std::make_shared<LocalityEntry>(i, effective_weight)); |
777 | 0 | locality_scheduler->add(effective_weight, locality_entries.back()); |
778 | 0 | } |
779 | 0 | } |
780 | | // If all effective weights were zero, reset the scheduler. |
781 | 0 | if (locality_scheduler->empty()) { |
782 | 0 | locality_scheduler = nullptr; |
783 | 0 | } |
784 | 0 | } |
785 | 7.22k | } |
786 | 17.5k | } |
787 | | |
788 | 0 | absl::optional<uint32_t> HostSetImpl::chooseHealthyLocality() { |
789 | 0 | return chooseLocality(healthy_locality_scheduler_.get()); |
790 | 0 | } |
791 | | |
792 | 0 | absl::optional<uint32_t> HostSetImpl::chooseDegradedLocality() { |
793 | 0 | return chooseLocality(degraded_locality_scheduler_.get()); |
794 | 0 | } |
795 | | |
796 | | absl::optional<uint32_t> |
797 | 0 | HostSetImpl::chooseLocality(EdfScheduler<LocalityEntry>* locality_scheduler) { |
798 | 0 | if (locality_scheduler == nullptr) { |
799 | 0 | return {}; |
800 | 0 | } |
801 | 0 | const std::shared_ptr<LocalityEntry> locality = locality_scheduler->pickAndAdd( |
802 | 0 | [](const LocalityEntry& locality) { return locality.effective_weight_; }); |
803 | | // We don't build a schedule if there are no weighted localities, so we should always succeed. |
804 | 0 | ASSERT(locality != nullptr); |
805 | | // If we picked it before, its weight must have been positive. |
806 | 0 | ASSERT(locality->effective_weight_ > 0); |
807 | 0 | return locality->index_; |
808 | 0 | } |
809 | | |
810 | | PrioritySet::UpdateHostsParams |
811 | | HostSetImpl::updateHostsParams(HostVectorConstSharedPtr hosts, |
812 | | HostsPerLocalityConstSharedPtr hosts_per_locality, |
813 | | HealthyHostVectorConstSharedPtr healthy_hosts, |
814 | | HostsPerLocalityConstSharedPtr healthy_hosts_per_locality, |
815 | | DegradedHostVectorConstSharedPtr degraded_hosts, |
816 | | HostsPerLocalityConstSharedPtr degraded_hosts_per_locality, |
817 | | ExcludedHostVectorConstSharedPtr excluded_hosts, |
818 | 6.35k | HostsPerLocalityConstSharedPtr excluded_hosts_per_locality) { |
819 | 6.35k | return PrioritySet::UpdateHostsParams{std::move(hosts), |
820 | 6.35k | std::move(healthy_hosts), |
821 | 6.35k | std::move(degraded_hosts), |
822 | 6.35k | std::move(excluded_hosts), |
823 | 6.35k | std::move(hosts_per_locality), |
824 | 6.35k | std::move(healthy_hosts_per_locality), |
825 | 6.35k | std::move(degraded_hosts_per_locality), |
826 | 6.35k | std::move(excluded_hosts_per_locality)}; |
827 | 6.35k | } |
828 | | |
829 | 3.94k | PrioritySet::UpdateHostsParams HostSetImpl::updateHostsParams(const HostSet& host_set) { |
830 | 3.94k | return updateHostsParams(host_set.hostsPtr(), host_set.hostsPerLocalityPtr(), |
831 | 3.94k | host_set.healthyHostsPtr(), host_set.healthyHostsPerLocalityPtr(), |
832 | 3.94k | host_set.degradedHostsPtr(), host_set.degradedHostsPerLocalityPtr(), |
833 | 3.94k | host_set.excludedHostsPtr(), host_set.excludedHostsPerLocalityPtr()); |
834 | 3.94k | } |
835 | | PrioritySet::UpdateHostsParams |
836 | | HostSetImpl::partitionHosts(HostVectorConstSharedPtr hosts, |
837 | 2.40k | HostsPerLocalityConstSharedPtr hosts_per_locality) { |
838 | 2.40k | auto partitioned_hosts = ClusterImplBase::partitionHostList(*hosts); |
839 | 2.40k | auto healthy_degraded_excluded_hosts_per_locality = |
840 | 2.40k | ClusterImplBase::partitionHostsPerLocality(*hosts_per_locality); |
841 | | |
842 | 2.40k | return updateHostsParams(std::move(hosts), std::move(hosts_per_locality), |
843 | 2.40k | std::move(std::get<0>(partitioned_hosts)), |
844 | 2.40k | std::move(std::get<0>(healthy_degraded_excluded_hosts_per_locality)), |
845 | 2.40k | std::move(std::get<1>(partitioned_hosts)), |
846 | 2.40k | std::move(std::get<1>(healthy_degraded_excluded_hosts_per_locality)), |
847 | 2.40k | std::move(std::get<2>(partitioned_hosts)), |
848 | 2.40k | std::move(std::get<2>(healthy_degraded_excluded_hosts_per_locality))); |
849 | 2.40k | } |
850 | | |
851 | | double HostSetImpl::effectiveLocalityWeight(uint32_t index, |
852 | | const HostsPerLocality& eligible_hosts_per_locality, |
853 | | const HostsPerLocality& excluded_hosts_per_locality, |
854 | | const HostsPerLocality& all_hosts_per_locality, |
855 | | const LocalityWeights& locality_weights, |
856 | 7.22k | uint32_t overprovisioning_factor) { |
857 | 7.22k | const auto& locality_eligible_hosts = eligible_hosts_per_locality.get()[index]; |
858 | 7.22k | const uint32_t excluded_count = excluded_hosts_per_locality.get().size() > index |
859 | 7.22k | ? excluded_hosts_per_locality.get()[index].size() |
860 | 7.22k | : 0; |
861 | 7.22k | const auto host_count = all_hosts_per_locality.get()[index].size() - excluded_count; |
862 | 7.22k | if (host_count == 0) { |
863 | 0 | return 0.0; |
864 | 0 | } |
865 | 7.22k | const double locality_availability_ratio = 1.0 * locality_eligible_hosts.size() / host_count; |
866 | 7.22k | const uint32_t weight = locality_weights[index]; |
867 | | // Availability ranges from 0-1.0, and is the ratio of eligible hosts to total hosts, modified |
868 | | // by the overprovisioning factor. |
869 | 7.22k | const double effective_locality_availability_ratio = |
870 | 7.22k | std::min(1.0, (overprovisioning_factor / 100.0) * locality_availability_ratio); |
871 | 7.22k | return weight * effective_locality_availability_ratio; |
872 | 7.22k | } |
873 | | |
874 | | const HostSet& |
875 | | PrioritySetImpl::getOrCreateHostSet(uint32_t priority, |
876 | | absl::optional<bool> weighted_priority_health, |
877 | 16.0k | absl::optional<uint32_t> overprovisioning_factor) { |
878 | 16.0k | if (host_sets_.size() < priority + 1) { |
879 | 14.6k | for (size_t i = host_sets_.size(); i <= priority; ++i) { |
880 | 7.31k | HostSetImplPtr host_set = createHostSet(i, weighted_priority_health, overprovisioning_factor); |
881 | 7.31k | host_sets_priority_update_cbs_.push_back( |
882 | 7.31k | host_set->addPriorityUpdateCb([this](uint32_t priority, const HostVector& hosts_added, |
883 | 8.76k | const HostVector& hosts_removed) { |
884 | 8.76k | runReferenceUpdateCallbacks(priority, hosts_added, hosts_removed); |
885 | 8.76k | return absl::OkStatus(); |
886 | 8.76k | })); |
887 | 7.31k | host_sets_.push_back(std::move(host_set)); |
888 | 7.31k | } |
889 | 7.31k | } |
890 | 16.0k | return *host_sets_[priority]; |
891 | 16.0k | } |
892 | | |
893 | | void PrioritySetImpl::updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params, |
894 | | LocalityWeightsConstSharedPtr locality_weights, |
895 | | const HostVector& hosts_added, const HostVector& hosts_removed, |
896 | | uint64_t seed, absl::optional<bool> weighted_priority_health, |
897 | | absl::optional<uint32_t> overprovisioning_factor, |
898 | 8.76k | HostMapConstSharedPtr cross_priority_host_map) { |
899 | | // Update cross priority host map first. In this way, when the update callbacks of the priority |
900 | | // set are executed, the latest host map can always be obtained. |
901 | 8.76k | if (cross_priority_host_map != nullptr) { |
902 | 4.81k | const_cross_priority_host_map_ = std::move(cross_priority_host_map); |
903 | 4.81k | } |
904 | | |
905 | | // Ensure that we have a HostSet for the given priority. |
906 | 8.76k | getOrCreateHostSet(priority, weighted_priority_health, overprovisioning_factor); |
907 | 8.76k | static_cast<HostSetImpl*>(host_sets_[priority].get()) |
908 | 8.76k | ->updateHosts(std::move(update_hosts_params), std::move(locality_weights), hosts_added, |
909 | 8.76k | hosts_removed, seed, weighted_priority_health, overprovisioning_factor); |
910 | | |
911 | 8.76k | if (!batch_update_) { |
912 | 8.74k | runUpdateCallbacks(hosts_added, hosts_removed); |
913 | 8.74k | } |
914 | 8.76k | } |
915 | | |
916 | 14 | void PrioritySetImpl::batchHostUpdate(BatchUpdateCb& callback) { |
917 | 14 | BatchUpdateScope scope(*this); |
918 | | |
919 | | // We wrap the update call with a lambda that tracks all the hosts that have been added/removed. |
920 | 14 | callback.batchUpdate(scope); |
921 | | |
922 | | // Now that all the updates have been complete, we can compute the diff. |
923 | 14 | HostVector net_hosts_added = filterHosts(scope.all_hosts_added_, scope.all_hosts_removed_); |
924 | 14 | HostVector net_hosts_removed = filterHosts(scope.all_hosts_removed_, scope.all_hosts_added_); |
925 | | |
926 | 14 | runUpdateCallbacks(net_hosts_added, net_hosts_removed); |
927 | 14 | } |
928 | | |
929 | | void PrioritySetImpl::BatchUpdateScope::updateHosts( |
930 | | uint32_t priority, PrioritySet::UpdateHostsParams&& update_hosts_params, |
931 | | LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added, |
932 | | const HostVector& hosts_removed, uint64_t seed, absl::optional<bool> weighted_priority_health, |
933 | 14 | absl::optional<uint32_t> overprovisioning_factor) { |
934 | | // We assume that each call updates a different priority. |
935 | 14 | ASSERT(priorities_.find(priority) == priorities_.end()); |
936 | 14 | priorities_.insert(priority); |
937 | | |
938 | 14 | for (const auto& host : hosts_added) { |
939 | 14 | all_hosts_added_.insert(host); |
940 | 14 | } |
941 | | |
942 | 14 | for (const auto& host : hosts_removed) { |
943 | 0 | all_hosts_removed_.insert(host); |
944 | 0 | } |
945 | | |
946 | 14 | parent_.updateHosts(priority, std::move(update_hosts_params), locality_weights, hosts_added, |
947 | 14 | hosts_removed, seed, weighted_priority_health, overprovisioning_factor); |
948 | 14 | } |
949 | | |
950 | | void MainPrioritySetImpl::updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params, |
951 | | LocalityWeightsConstSharedPtr locality_weights, |
952 | | const HostVector& hosts_added, |
953 | | const HostVector& hosts_removed, uint64_t seed, |
954 | | absl::optional<bool> weighted_priority_health, |
955 | | absl::optional<uint32_t> overprovisioning_factor, |
956 | 2.40k | HostMapConstSharedPtr cross_priority_host_map) { |
957 | 2.40k | ASSERT(cross_priority_host_map == nullptr, |
958 | 2.40k | "External cross-priority host map is meaningless to MainPrioritySetImpl"); |
959 | 2.40k | updateCrossPriorityHostMap(priority, hosts_added, hosts_removed); |
960 | | |
961 | 2.40k | PrioritySetImpl::updateHosts(priority, std::move(update_hosts_params), locality_weights, |
962 | 2.40k | hosts_added, hosts_removed, seed, weighted_priority_health, |
963 | 2.40k | overprovisioning_factor); |
964 | 2.40k | } |
965 | | |
966 | 2.42k | HostMapConstSharedPtr MainPrioritySetImpl::crossPriorityHostMap() const { |
967 | | // Check if the host set in the main thread PrioritySet has been updated. |
968 | 2.42k | if (mutable_cross_priority_host_map_ != nullptr) { |
969 | 2.40k | const_cross_priority_host_map_ = std::move(mutable_cross_priority_host_map_); |
970 | 2.40k | ASSERT(mutable_cross_priority_host_map_ == nullptr); |
971 | 2.40k | } |
972 | 2.42k | return const_cross_priority_host_map_; |
973 | 2.42k | } |
974 | | |
975 | | void MainPrioritySetImpl::updateCrossPriorityHostMap(uint32_t priority, |
976 | | const HostVector& hosts_added, |
977 | 2.40k | const HostVector& hosts_removed) { |
978 | 2.40k | if (hosts_added.empty() && hosts_removed.empty()) { |
979 | | // No new hosts have been added and no old hosts have been removed. |
980 | 0 | return; |
981 | 0 | } |
982 | | |
983 | | // Since read_only_all_host_map_ may be shared by multiple threads, when the host set changes, |
984 | | // we cannot directly modify read_only_all_host_map_. |
985 | 2.40k | if (mutable_cross_priority_host_map_ == nullptr) { |
986 | | // Copy old read only host map to mutable host map. |
987 | 2.40k | mutable_cross_priority_host_map_ = std::make_shared<HostMap>(*const_cross_priority_host_map_); |
988 | 2.40k | } |
989 | | |
990 | 2.40k | for (const auto& host : hosts_removed) { |
991 | 0 | const auto host_address = addressToString(host->address()); |
992 | 0 | const auto existing_host = mutable_cross_priority_host_map_->find(host_address); |
993 | 0 | if (existing_host != mutable_cross_priority_host_map_->end()) { |
994 | | // Only delete from the current priority to protect from situations where |
995 | | // the add operation was already executed and has already moved the metadata of the host |
996 | | // from a higher priority value to a lower priority value. |
997 | 0 | if (existing_host->second->priority() == priority) { |
998 | 0 | mutable_cross_priority_host_map_->erase(host_address); |
999 | 0 | } |
1000 | 0 | } |
1001 | 0 | } |
1002 | | |
1003 | 2.40k | for (const auto& host : hosts_added) { |
1004 | 2.40k | mutable_cross_priority_host_map_->insert({addressToString(host->address()), host}); |
1005 | 2.40k | } |
1006 | 2.40k | } |
1007 | | |
1008 | | DeferredCreationCompatibleClusterTrafficStats |
1009 | | ClusterInfoImpl::generateStats(Stats::ScopeSharedPtr scope, |
1010 | 403k | const ClusterTrafficStatNames& stat_names, bool defer_creation) { |
1011 | 403k | return Stats::createDeferredCompatibleStats<ClusterTrafficStats>(scope, stat_names, |
1012 | 403k | defer_creation); |
1013 | 403k | } |
1014 | | |
1015 | | ClusterRequestResponseSizeStats ClusterInfoImpl::generateRequestResponseSizeStats( |
1016 | 401k | Stats::Scope& scope, const ClusterRequestResponseSizeStatNames& stat_names) { |
1017 | 401k | return {stat_names, scope}; |
1018 | 401k | } |
1019 | | |
1020 | | ClusterLoadReportStats |
1021 | | ClusterInfoImpl::generateLoadReportStats(Stats::Scope& scope, |
1022 | 403k | const ClusterLoadReportStatNames& stat_names) { |
1023 | 403k | return {stat_names, scope}; |
1024 | 403k | } |
1025 | | |
1026 | | ClusterTimeoutBudgetStats |
1027 | | ClusterInfoImpl::generateTimeoutBudgetStats(Stats::Scope& scope, |
1028 | 401k | const ClusterTimeoutBudgetStatNames& stat_names) { |
1029 | 401k | return {stat_names, scope}; |
1030 | 401k | } |
1031 | | |
1032 | | absl::StatusOr<std::shared_ptr<const ClusterInfoImpl::HttpProtocolOptionsConfigImpl>> |
1033 | | createOptions(const envoy::config::cluster::v3::Cluster& config, |
1034 | | std::shared_ptr<const ClusterInfoImpl::HttpProtocolOptionsConfigImpl>&& options, |
1035 | 2.40k | ProtobufMessage::ValidationVisitor& validation_visitor) { |
1036 | 2.40k | if (options) { |
1037 | 1.54k | return std::move(options); |
1038 | 1.54k | } |
1039 | | |
1040 | 862 | if (config.protocol_selection() == envoy::config::cluster::v3::Cluster::USE_CONFIGURED_PROTOCOL) { |
1041 | | // Make sure multiple protocol configurations are not present |
1042 | 862 | if (config.has_http_protocol_options() && config.has_http2_protocol_options()) { |
1043 | 0 | return absl::InvalidArgumentError( |
1044 | 0 | fmt::format("cluster: Both HTTP1 and HTTP2 options may only be " |
1045 | 0 | "configured with non-default 'protocol_selection' values")); |
1046 | 0 | } |
1047 | 862 | } |
1048 | | |
1049 | 862 | auto options_or_error = |
1050 | 862 | ClusterInfoImpl::HttpProtocolOptionsConfigImpl::createProtocolOptionsConfig( |
1051 | 862 | config.http_protocol_options(), config.http2_protocol_options(), |
1052 | 862 | config.common_http_protocol_options(), |
1053 | 862 | (config.has_upstream_http_protocol_options() |
1054 | 862 | ? absl::make_optional<envoy::config::core::v3::UpstreamHttpProtocolOptions>( |
1055 | 0 | config.upstream_http_protocol_options()) |
1056 | 862 | : absl::nullopt), |
1057 | 862 | config.protocol_selection() == |
1058 | 862 | envoy::config::cluster::v3::Cluster::USE_DOWNSTREAM_PROTOCOL, |
1059 | 862 | config.has_http2_protocol_options(), validation_visitor); |
1060 | 862 | RETURN_IF_NOT_OK_REF(options_or_error.status()); |
1061 | 862 | return options_or_error.value(); |
1062 | 862 | } |
1063 | | |
1064 | | absl::StatusOr<LegacyLbPolicyConfigHelper::Result> |
1065 | | LegacyLbPolicyConfigHelper::getTypedLbConfigFromLegacyProtoWithoutSubset( |
1066 | | LoadBalancerFactoryContext& lb_factory_context, const ClusterProto& cluster, |
1067 | 2.40k | ProtobufMessage::ValidationVisitor& visitor) { |
1068 | 2.40k | LoadBalancerConfigPtr lb_config; |
1069 | 2.40k | TypedLoadBalancerFactory* lb_factory = nullptr; |
1070 | | |
1071 | 2.40k | switch (cluster.lb_policy()) { |
1072 | 0 | PANIC_ON_PROTO_ENUM_SENTINEL_VALUES; |
1073 | 2.40k | case ClusterProto::ROUND_ROBIN: |
1074 | 2.40k | lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>( |
1075 | 2.40k | "envoy.load_balancing_policies.round_robin"); |
1076 | 2.40k | break; |
1077 | 0 | case ClusterProto::LEAST_REQUEST: |
1078 | 0 | lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>( |
1079 | 0 | "envoy.load_balancing_policies.least_request"); |
1080 | 0 | break; |
1081 | 0 | case ClusterProto::RANDOM: |
1082 | 0 | lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>( |
1083 | 0 | "envoy.load_balancing_policies.random"); |
1084 | 0 | break; |
1085 | 0 | case ClusterProto::RING_HASH: |
1086 | 0 | lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>( |
1087 | 0 | "envoy.load_balancing_policies.ring_hash"); |
1088 | 0 | break; |
1089 | 0 | case ClusterProto::MAGLEV: |
1090 | 0 | lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>( |
1091 | 0 | "envoy.load_balancing_policies.maglev"); |
1092 | 0 | break; |
1093 | 0 | case ClusterProto::CLUSTER_PROVIDED: |
1094 | 0 | lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>( |
1095 | 0 | "envoy.load_balancing_policies.cluster_provided"); |
1096 | 0 | break; |
1097 | 0 | case ClusterProto::LOAD_BALANCING_POLICY_CONFIG: |
1098 | | // 'LOAD_BALANCING_POLICY_CONFIG' should be handled by the 'configureLbPolicies' |
1099 | | // function and should not reach here. |
1100 | 0 | PANIC("getTypedLbConfigFromLegacyProtoWithoutSubset: should not reach here"); |
1101 | 0 | break; |
1102 | 2.40k | } |
1103 | | |
1104 | 2.40k | if (lb_factory == nullptr) { |
1105 | 0 | return absl::InvalidArgumentError( |
1106 | 0 | fmt::format("No load balancer factory found for LB type: {}", |
1107 | 0 | ClusterProto::LbPolicy_Name(cluster.lb_policy()))); |
1108 | 0 | } |
1109 | | |
1110 | 2.40k | return Result{lb_factory, lb_factory->loadConfig(lb_factory_context, cluster, visitor)}; |
1111 | 2.40k | } |
1112 | | |
1113 | | absl::StatusOr<LegacyLbPolicyConfigHelper::Result> |
1114 | | LegacyLbPolicyConfigHelper::getTypedLbConfigFromLegacyProto( |
1115 | | LoadBalancerFactoryContext& lb_factory_context, const ClusterProto& cluster, |
1116 | 2.40k | ProtobufMessage::ValidationVisitor& visitor) { |
1117 | | // Handle the lb subset config case first. |
1118 | | // Note it is possible to have a lb_subset_config without actually having any subset selectors. |
1119 | | // In this case the subset load balancer should not be used. |
1120 | 2.40k | if (cluster.has_lb_subset_config() && !cluster.lb_subset_config().subset_selectors().empty()) { |
1121 | 0 | auto* lb_factory = Config::Utility::getFactoryByName<TypedLoadBalancerFactory>( |
1122 | 0 | "envoy.load_balancing_policies.subset"); |
1123 | 0 | if (lb_factory != nullptr) { |
1124 | 0 | return Result{lb_factory, lb_factory->loadConfig(lb_factory_context, cluster, visitor)}; |
1125 | 0 | } |
1126 | 0 | return absl::InvalidArgumentError("No subset load balancer factory found"); |
1127 | 0 | } |
1128 | | |
1129 | 2.40k | return getTypedLbConfigFromLegacyProtoWithoutSubset(lb_factory_context, cluster, visitor); |
1130 | 2.40k | } |
1131 | | |
1132 | | using ProtocolOptionsHashMap = |
1133 | | absl::flat_hash_map<std::string, ProtocolOptionsConfigConstSharedPtr>; |
1134 | | |
1135 | | ClusterInfoImpl::ClusterInfoImpl( |
1136 | | Init::Manager& init_manager, Server::Configuration::ServerFactoryContext& server_context, |
1137 | | const envoy::config::cluster::v3::Cluster& config, |
1138 | | const absl::optional<envoy::config::core::v3::BindConfig>& bind_config, |
1139 | | Runtime::Loader& runtime, TransportSocketMatcherPtr&& socket_matcher, |
1140 | | Stats::ScopeSharedPtr&& stats_scope, bool added_via_api, |
1141 | | Server::Configuration::TransportSocketFactoryContext& factory_context) |
1142 | | : runtime_(runtime), name_(config.name()), |
1143 | | observability_name_(!config.alt_stat_name().empty() |
1144 | | ? std::make_unique<std::string>(config.alt_stat_name()) |
1145 | | : nullptr), |
1146 | | eds_service_name_( |
1147 | | config.has_eds_cluster_config() |
1148 | | ? std::make_unique<std::string>(config.eds_cluster_config().service_name()) |
1149 | | : nullptr), |
1150 | | extension_protocol_options_(THROW_OR_RETURN_VALUE( |
1151 | | parseExtensionProtocolOptions(config, factory_context), ProtocolOptionsHashMap)), |
1152 | | http_protocol_options_(THROW_OR_RETURN_VALUE( |
1153 | | createOptions(config, |
1154 | | extensionProtocolOptionsTyped<HttpProtocolOptionsConfigImpl>( |
1155 | | "envoy.extensions.upstreams.http.v3.HttpProtocolOptions"), |
1156 | | factory_context.messageValidationVisitor()), |
1157 | | std::shared_ptr<const ClusterInfoImpl::HttpProtocolOptionsConfigImpl>)), |
1158 | | tcp_protocol_options_(extensionProtocolOptionsTyped<TcpProtocolOptionsConfigImpl>( |
1159 | | "envoy.extensions.upstreams.tcp.v3.TcpProtocolOptions")), |
1160 | | max_requests_per_connection_(PROTOBUF_GET_WRAPPED_OR_DEFAULT( |
1161 | | http_protocol_options_->common_http_protocol_options_, max_requests_per_connection, |
1162 | | config.max_requests_per_connection().value())), |
1163 | | connect_timeout_( |
1164 | | std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, connect_timeout, 5000))), |
1165 | | per_upstream_preconnect_ratio_(PROTOBUF_GET_WRAPPED_OR_DEFAULT( |
1166 | | config.preconnect_policy(), per_upstream_preconnect_ratio, 1.0)), |
1167 | | peekahead_ratio_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(config.preconnect_policy(), |
1168 | | predictive_preconnect_ratio, 0)), |
1169 | | socket_matcher_(std::move(socket_matcher)), stats_scope_(std::move(stats_scope)), |
1170 | | traffic_stats_(generateStats(stats_scope_, |
1171 | | factory_context.clusterManager().clusterStatNames(), |
1172 | | server_context.statsConfig().enableDeferredCreationStats())), |
1173 | | config_update_stats_(factory_context.clusterManager().clusterConfigUpdateStatNames(), |
1174 | | *stats_scope_), |
1175 | | lb_stats_(factory_context.clusterManager().clusterLbStatNames(), *stats_scope_), |
1176 | | endpoint_stats_(factory_context.clusterManager().clusterEndpointStatNames(), *stats_scope_), |
1177 | | load_report_stats_store_(stats_scope_->symbolTable()), |
1178 | | load_report_stats_( |
1179 | | generateLoadReportStats(*load_report_stats_store_.rootScope(), |
1180 | | factory_context.clusterManager().clusterLoadReportStatNames())), |
1181 | | optional_cluster_stats_((config.has_track_cluster_stats() || config.track_timeout_budgets()) |
1182 | | ? std::make_unique<OptionalClusterStats>( |
1183 | | config, *stats_scope_, factory_context.clusterManager()) |
1184 | | : nullptr), |
1185 | | features_(ClusterInfoImpl::HttpProtocolOptionsConfigImpl::parseFeatures( |
1186 | | config, *http_protocol_options_)), |
1187 | | resource_managers_(config, runtime, name_, *stats_scope_, |
1188 | | factory_context.clusterManager().clusterCircuitBreakersStatNames()), |
1189 | | maintenance_mode_runtime_key_(absl::StrCat("upstream.maintenance_mode.", name_)), |
1190 | | upstream_local_address_selector_( |
1191 | | THROW_OR_RETURN_VALUE(createUpstreamLocalAddressSelector(config, bind_config), |
1192 | | Envoy::Upstream::UpstreamLocalAddressSelectorConstSharedPtr)), |
1193 | | upstream_config_(config.has_upstream_config() |
1194 | | ? std::make_unique<envoy::config::core::v3::TypedExtensionConfig>( |
1195 | | config.upstream_config()) |
1196 | | : nullptr), |
1197 | | metadata_(config.has_metadata() |
1198 | | ? std::make_unique<envoy::config::core::v3::Metadata>(config.metadata()) |
1199 | | : nullptr), |
1200 | | typed_metadata_(config.has_metadata() |
1201 | | ? std::make_unique<ClusterTypedMetadata>(config.metadata()) |
1202 | | : nullptr), |
1203 | | common_lb_config_( |
1204 | | factory_context.clusterManager().getCommonLbConfigPtr(config.common_lb_config())), |
1205 | | cluster_type_(config.has_cluster_type() |
1206 | | ? std::make_unique<envoy::config::cluster::v3::Cluster::CustomClusterType>( |
1207 | | config.cluster_type()) |
1208 | | : nullptr), |
1209 | | http_filter_config_provider_manager_( |
1210 | | Http::FilterChainUtility::createSingletonUpstreamFilterConfigProviderManager( |
1211 | | server_context)), |
1212 | | network_filter_config_provider_manager_( |
1213 | | createSingletonUpstreamNetworkFilterConfigProviderManager(server_context)), |
1214 | | upstream_context_(server_context, init_manager, *stats_scope_), |
1215 | | happy_eyeballs_config_( |
1216 | | config.upstream_connection_options().has_happy_eyeballs_config() |
1217 | | ? std::make_unique< |
1218 | | envoy::config::cluster::v3::UpstreamConnectionOptions::HappyEyeballsConfig>( |
1219 | | config.upstream_connection_options().happy_eyeballs_config()) |
1220 | | : nullptr), |
1221 | | lrs_report_metric_names_(!config.lrs_report_endpoint_metrics().empty() |
1222 | | ? std::make_unique<Envoy::Orca::LrsReportMetricNames>( |
1223 | | config.lrs_report_endpoint_metrics().begin(), |
1224 | | config.lrs_report_endpoint_metrics().end()) |
1225 | | : nullptr), |
1226 | | per_connection_buffer_limit_bytes_( |
1227 | | PROTOBUF_GET_WRAPPED_OR_DEFAULT(config, per_connection_buffer_limit_bytes, 1024 * 1024)), |
1228 | | max_response_headers_count_(PROTOBUF_GET_WRAPPED_OR_DEFAULT( |
1229 | | http_protocol_options_->common_http_protocol_options_, max_headers_count, |
1230 | | runtime_.snapshot().getInteger(Http::MaxResponseHeadersCountOverrideKey, |
1231 | | Http::DEFAULT_MAX_HEADERS_COUNT))), |
1232 | | type_(config.type()), |
1233 | | drain_connections_on_host_removal_(config.ignore_health_on_host_removal()), |
1234 | | connection_pool_per_downstream_connection_( |
1235 | | config.connection_pool_per_downstream_connection()), |
1236 | | warm_hosts_(!config.health_checks().empty() && |
1237 | | common_lb_config_->ignore_new_hosts_until_first_hc()), |
1238 | | set_local_interface_name_on_upstream_connections_( |
1239 | | config.upstream_connection_options().set_local_interface_name_on_upstream_connections()), |
1240 | | added_via_api_(added_via_api), has_configured_http_filters_(false), |
1241 | | per_endpoint_stats_(config.has_track_cluster_stats() && |
1242 | 2.40k | config.track_cluster_stats().per_endpoint_stats()) { |
1243 | | #ifdef WIN32 |
1244 | | if (set_local_interface_name_on_upstream_connections_) { |
1245 | | throwEnvoyExceptionOrPanic( |
1246 | | "set_local_interface_name_on_upstream_connections_ cannot be set to true " |
1247 | | "on Windows platforms"); |
1248 | | } |
1249 | | #endif |
1250 | | |
1251 | | // Both LoadStatsReporter and per_endpoint_stats need to `latch()` the counters, so if both are |
1252 | | // configured they will interfere with each other and both get incorrect values. |
1253 | | // TODO(ggreenway): Verify that bypassing virtual dispatch here was intentional |
1254 | 2.40k | if (ClusterInfoImpl::perEndpointStatsEnabled() && |
1255 | 2.40k | server_context.bootstrap().cluster_manager().has_load_stats_config()) { |
1256 | 0 | throwEnvoyExceptionOrPanic("Only one of cluster per_endpoint_stats and cluster manager " |
1257 | 0 | "load_stats_config can be specified"); |
1258 | 0 | } |
1259 | | |
1260 | 2.40k | if (config.has_max_requests_per_connection() && |
1261 | 2.40k | http_protocol_options_->common_http_protocol_options_.has_max_requests_per_connection()) { |
1262 | 0 | throwEnvoyExceptionOrPanic("Only one of max_requests_per_connection from Cluster or " |
1263 | 0 | "HttpProtocolOptions can be specified"); |
1264 | 0 | } |
1265 | | |
1266 | 2.40k | if (config.has_load_balancing_policy() || |
1267 | 2.40k | config.lb_policy() == envoy::config::cluster::v3::Cluster::LOAD_BALANCING_POLICY_CONFIG) { |
1268 | | // If load_balancing_policy is set we will use it directly, ignoring lb_policy. |
1269 | |
|
1270 | 0 | THROW_IF_NOT_OK(configureLbPolicies(config, server_context)); |
1271 | 2.40k | } else { |
1272 | | // If load_balancing_policy is not set, we will try to convert legacy lb_policy |
1273 | | // to load_balancing_policy and use it. |
1274 | 2.40k | LoadBalancerFactoryContextImpl lb_factory_context(server_context); |
1275 | | |
1276 | 2.40k | auto lb_pair = LegacyLbPolicyConfigHelper::getTypedLbConfigFromLegacyProto( |
1277 | 2.40k | lb_factory_context, config, server_context.messageValidationVisitor()); |
1278 | | |
1279 | 2.40k | if (!lb_pair.ok()) { |
1280 | 0 | throwEnvoyExceptionOrPanic(std::string(lb_pair.status().message())); |
1281 | 0 | } |
1282 | 2.40k | load_balancer_factory_ = lb_pair->factory; |
1283 | 2.40k | ASSERT(load_balancer_factory_ != nullptr, "null load balancer factory"); |
1284 | 2.40k | load_balancer_config_ = std::move(lb_pair->config); |
1285 | 2.40k | } |
1286 | | |
1287 | 2.40k | if (config.lb_subset_config().locality_weight_aware() && |
1288 | 2.40k | !config.common_lb_config().has_locality_weighted_lb_config()) { |
1289 | 0 | throwEnvoyExceptionOrPanic(fmt::format("Locality weight aware subset LB requires that a " |
1290 | 0 | "locality_weighted_lb_config be set in {}", |
1291 | 0 | name_)); |
1292 | 0 | } |
1293 | | |
1294 | | // Use default (1h) or configured `idle_timeout`, unless it's set to 0, indicating that no |
1295 | | // timeout should be used. |
1296 | 2.40k | absl::optional<std::chrono::milliseconds> idle_timeout(std::chrono::hours(1)); |
1297 | 2.40k | if (http_protocol_options_->common_http_protocol_options_.has_idle_timeout()) { |
1298 | 0 | idle_timeout = std::chrono::milliseconds(DurationUtil::durationToMilliseconds( |
1299 | 0 | http_protocol_options_->common_http_protocol_options_.idle_timeout())); |
1300 | 0 | if (idle_timeout.value().count() == 0) { |
1301 | 0 | idle_timeout = absl::nullopt; |
1302 | 0 | } |
1303 | 0 | } |
1304 | 2.40k | if (idle_timeout.has_value()) { |
1305 | 2.40k | optional_timeouts_.set<OptionalTimeoutNames::IdleTimeout>(*idle_timeout); |
1306 | 2.40k | } |
1307 | | |
1308 | | // Use default (10m) or configured `tcp_pool_idle_timeout`, unless it's set to 0, indicating |
1309 | | // that no timeout should be used. |
1310 | 2.40k | absl::optional<std::chrono::milliseconds> tcp_pool_idle_timeout(std::chrono::minutes(10)); |
1311 | 2.40k | if (tcp_protocol_options_ && tcp_protocol_options_->idleTimeout().has_value()) { |
1312 | 0 | tcp_pool_idle_timeout = tcp_protocol_options_->idleTimeout(); |
1313 | 0 | if (tcp_pool_idle_timeout.value().count() == 0) { |
1314 | 0 | tcp_pool_idle_timeout = absl::nullopt; |
1315 | 0 | } |
1316 | 0 | } |
1317 | 2.40k | if (tcp_pool_idle_timeout.has_value()) { |
1318 | 2.40k | optional_timeouts_.set<OptionalTimeoutNames::TcpPoolIdleTimeout>(*tcp_pool_idle_timeout); |
1319 | 2.40k | } |
1320 | | |
1321 | | // Use configured `max_connection_duration`, unless it's set to 0, indicating that |
1322 | | // no timeout should be used. No timeout by default either. |
1323 | 2.40k | absl::optional<std::chrono::milliseconds> max_connection_duration; |
1324 | 2.40k | if (http_protocol_options_->common_http_protocol_options_.has_max_connection_duration()) { |
1325 | 0 | max_connection_duration = std::chrono::milliseconds(DurationUtil::durationToMilliseconds( |
1326 | 0 | http_protocol_options_->common_http_protocol_options_.max_connection_duration())); |
1327 | 0 | if (max_connection_duration.value().count() == 0) { |
1328 | 0 | max_connection_duration = absl::nullopt; |
1329 | 0 | } |
1330 | 0 | } |
1331 | 2.40k | if (max_connection_duration.has_value()) { |
1332 | 0 | optional_timeouts_.set<OptionalTimeoutNames::MaxConnectionDuration>(*max_connection_duration); |
1333 | 0 | } |
1334 | | |
1335 | 2.40k | if (config.has_eds_cluster_config()) { |
1336 | 14 | if (config.type() != envoy::config::cluster::v3::Cluster::EDS) { |
1337 | 0 | throwEnvoyExceptionOrPanic("eds_cluster_config set in a non-EDS cluster"); |
1338 | 0 | } |
1339 | 14 | } |
1340 | | |
1341 | | // TODO(htuch): Remove this temporary workaround when we have |
1342 | | // https://github.com/bufbuild/protoc-gen-validate/issues/97 resolved. This just provides |
1343 | | // early validation of sanity of fields that we should catch at config ingestion. |
1344 | 2.40k | DurationUtil::durationToMilliseconds(common_lb_config_->update_merge_window()); |
1345 | | |
1346 | | // Create upstream network filter factories |
1347 | 2.40k | const auto& filters = config.filters(); |
1348 | 2.40k | ASSERT(filter_factories_.empty()); |
1349 | 2.40k | filter_factories_.reserve(filters.size()); |
1350 | 2.40k | for (ssize_t i = 0; i < filters.size(); i++) { |
1351 | 0 | const auto& proto_config = filters[i]; |
1352 | 0 | const bool is_terminal = i == filters.size() - 1; |
1353 | 0 | ENVOY_LOG(debug, " upstream network filter #{}:", i); |
1354 | |
|
1355 | 0 | if (proto_config.has_config_discovery()) { |
1356 | 0 | if (proto_config.has_typed_config()) { |
1357 | 0 | throwEnvoyExceptionOrPanic("Only one of typed_config or config_discovery can be used"); |
1358 | 0 | } |
1359 | | |
1360 | 0 | ENVOY_LOG(debug, " dynamic filter name: {}", proto_config.name()); |
1361 | 0 | filter_factories_.push_back( |
1362 | 0 | network_filter_config_provider_manager_->createDynamicFilterConfigProvider( |
1363 | 0 | proto_config.config_discovery(), proto_config.name(), server_context, |
1364 | 0 | upstream_context_, factory_context.clusterManager(), is_terminal, "network", |
1365 | 0 | nullptr)); |
1366 | 0 | continue; |
1367 | 0 | } |
1368 | | |
1369 | 0 | ENVOY_LOG(debug, " name: {}", proto_config.name()); |
1370 | 0 | auto& factory = Config::Utility::getAndCheckFactory< |
1371 | 0 | Server::Configuration::NamedUpstreamNetworkFilterConfigFactory>(proto_config); |
1372 | 0 | auto message = factory.createEmptyConfigProto(); |
1373 | 0 | Config::Utility::translateOpaqueConfig(proto_config.typed_config(), |
1374 | 0 | factory_context.messageValidationVisitor(), *message); |
1375 | 0 | Network::FilterFactoryCb callback = |
1376 | 0 | factory.createFilterFactoryFromProto(*message, upstream_context_); |
1377 | 0 | filter_factories_.push_back( |
1378 | 0 | network_filter_config_provider_manager_->createStaticFilterConfigProvider( |
1379 | 0 | callback, proto_config.name())); |
1380 | 0 | } |
1381 | | |
1382 | 2.40k | if (http_protocol_options_) { |
1383 | 2.40k | Http::FilterChainUtility::FiltersList http_filters = http_protocol_options_->http_filters_; |
1384 | 2.40k | has_configured_http_filters_ = !http_filters.empty(); |
1385 | 2.40k | static const std::string upstream_codec_type_url = |
1386 | 2.40k | envoy::extensions::filters::http::upstream_codec::v3::UpstreamCodec::default_instance() |
1387 | 2.40k | .GetTypeName(); |
1388 | 2.40k | if (http_filters.empty()) { |
1389 | 2.40k | auto* codec_filter = http_filters.Add(); |
1390 | 2.40k | codec_filter->set_name("envoy.filters.http.upstream_codec"); |
1391 | 2.40k | codec_filter->mutable_typed_config()->set_type_url( |
1392 | 2.40k | absl::StrCat("type.googleapis.com/", upstream_codec_type_url)); |
1393 | 2.40k | } else { |
1394 | 0 | const auto last_type_url = |
1395 | 0 | Config::Utility::getFactoryType(http_filters[http_filters.size() - 1].typed_config()); |
1396 | 0 | if (last_type_url != upstream_codec_type_url) { |
1397 | 0 | throwEnvoyExceptionOrPanic(fmt::format( |
1398 | 0 | "The codec filter is the only valid terminal upstream HTTP filter, use '{}'", |
1399 | 0 | upstream_codec_type_url)); |
1400 | 0 | } |
1401 | 0 | } |
1402 | | |
1403 | 2.40k | std::string prefix = stats_scope_->symbolTable().toString(stats_scope_->prefix()); |
1404 | 2.40k | Http::FilterChainHelper<Server::Configuration::UpstreamFactoryContext, |
1405 | 2.40k | Server::Configuration::UpstreamHttpFilterConfigFactory> |
1406 | 2.40k | helper(*http_filter_config_provider_manager_, upstream_context_.serverFactoryContext(), |
1407 | 2.40k | factory_context.clusterManager(), upstream_context_, prefix); |
1408 | 2.40k | THROW_IF_NOT_OK(helper.processFilters(http_filters, "upstream http", "upstream http", |
1409 | 2.40k | http_filter_factories_)); |
1410 | 2.40k | } |
1411 | 2.40k | } |
1412 | | |
1413 | | // Configures the load balancer based on config.load_balancing_policy |
1414 | | absl::Status |
1415 | | ClusterInfoImpl::configureLbPolicies(const envoy::config::cluster::v3::Cluster& config, |
1416 | 0 | Server::Configuration::ServerFactoryContext& context) { |
1417 | | // Check if load_balancing_policy is set first. |
1418 | 0 | if (!config.has_load_balancing_policy()) { |
1419 | 0 | return absl::InvalidArgumentError("cluster: field load_balancing_policy need to be set"); |
1420 | 0 | } |
1421 | | |
1422 | 0 | if (config.has_lb_subset_config()) { |
1423 | 0 | return absl::InvalidArgumentError( |
1424 | 0 | "cluster: load_balancing_policy cannot be combined with lb_subset_config"); |
1425 | 0 | } |
1426 | | |
1427 | 0 | if (config.has_common_lb_config()) { |
1428 | 0 | const auto& lb_config = config.common_lb_config(); |
1429 | 0 | if (lb_config.has_zone_aware_lb_config() || lb_config.has_locality_weighted_lb_config() || |
1430 | 0 | lb_config.has_consistent_hashing_lb_config()) { |
1431 | 0 | return absl::InvalidArgumentError( |
1432 | 0 | "cluster: load_balancing_policy cannot be combined with partial fields " |
1433 | 0 | "(zone_aware_lb_config, " |
1434 | 0 | "locality_weighted_lb_config, consistent_hashing_lb_config) of common_lb_config"); |
1435 | 0 | } |
1436 | 0 | } |
1437 | | |
1438 | 0 | absl::InlinedVector<absl::string_view, 4> missing_policies; |
1439 | 0 | for (const auto& policy : config.load_balancing_policy().policies()) { |
1440 | 0 | TypedLoadBalancerFactory* factory = |
1441 | 0 | Config::Utility::getAndCheckFactory<TypedLoadBalancerFactory>( |
1442 | 0 | policy.typed_extension_config(), /*is_optional=*/true); |
1443 | 0 | if (factory != nullptr) { |
1444 | | // Load and validate the configuration. |
1445 | 0 | LoadBalancerFactoryContextImpl lb_factory_context(context); |
1446 | 0 | auto proto_message = factory->createEmptyConfigProto(); |
1447 | 0 | Config::Utility::translateOpaqueConfig(policy.typed_extension_config().typed_config(), |
1448 | 0 | context.messageValidationVisitor(), *proto_message); |
1449 | |
|
1450 | 0 | load_balancer_factory_ = factory; |
1451 | 0 | load_balancer_config_ = factory->loadConfig(lb_factory_context, *proto_message, |
1452 | 0 | context.messageValidationVisitor()); |
1453 | |
|
1454 | 0 | break; |
1455 | 0 | } |
1456 | 0 | missing_policies.push_back(policy.typed_extension_config().name()); |
1457 | 0 | } |
1458 | |
|
1459 | 0 | if (load_balancer_factory_ == nullptr) { |
1460 | 0 | return absl::InvalidArgumentError( |
1461 | 0 | fmt::format("cluster: didn't find a registered load balancer factory " |
1462 | 0 | "implementation for cluster: '{}' with names from [{}]", |
1463 | 0 | name_, absl::StrJoin(missing_policies, ", "))); |
1464 | 0 | } |
1465 | 0 | return absl::OkStatus(); |
1466 | 0 | } |
1467 | | |
1468 | | ProtocolOptionsConfigConstSharedPtr |
1469 | 4.81k | ClusterInfoImpl::extensionProtocolOptions(const std::string& name) const { |
1470 | 4.81k | auto i = extension_protocol_options_.find(name); |
1471 | 4.81k | if (i != extension_protocol_options_.end()) { |
1472 | 1.54k | return i->second; |
1473 | 1.54k | } |
1474 | 3.26k | return nullptr; |
1475 | 4.81k | } |
1476 | | |
1477 | | absl::StatusOr<Network::UpstreamTransportSocketFactoryPtr> createTransportSocketFactory( |
1478 | | const envoy::config::cluster::v3::Cluster& config, |
1479 | 2.40k | Server::Configuration::TransportSocketFactoryContext& factory_context) { |
1480 | | // If the cluster config doesn't have a transport socket configured, override with the default |
1481 | | // transport socket implementation based on the tls_context. We copy by value first then |
1482 | | // override if necessary. |
1483 | 2.40k | auto transport_socket = config.transport_socket(); |
1484 | 2.40k | if (!config.has_transport_socket()) { |
1485 | 2.40k | envoy::extensions::transport_sockets::raw_buffer::v3::RawBuffer raw_buffer; |
1486 | 2.40k | transport_socket.mutable_typed_config()->PackFrom(raw_buffer); |
1487 | 2.40k | transport_socket.set_name("envoy.transport_sockets.raw_buffer"); |
1488 | 2.40k | } |
1489 | | |
1490 | 2.40k | auto& config_factory = Config::Utility::getAndCheckFactory< |
1491 | 2.40k | Server::Configuration::UpstreamTransportSocketConfigFactory>(transport_socket); |
1492 | 2.40k | ProtobufTypes::MessagePtr message = Config::Utility::translateToFactoryConfig( |
1493 | 2.40k | transport_socket, factory_context.messageValidationVisitor(), config_factory); |
1494 | 2.40k | return config_factory.createTransportSocketFactory(*message, factory_context); |
1495 | 2.40k | } |
1496 | | |
1497 | 911 | void ClusterInfoImpl::createNetworkFilterChain(Network::Connection& connection) const { |
1498 | 911 | for (const auto& filter_config_provider : filter_factories_) { |
1499 | 0 | auto config = filter_config_provider->config(); |
1500 | 0 | if (config.has_value()) { |
1501 | 0 | Network::FilterFactoryCb& factory = config.value(); |
1502 | 0 | factory(connection); |
1503 | 0 | } |
1504 | 0 | } |
1505 | 911 | } |
1506 | | |
1507 | | std::vector<Http::Protocol> |
1508 | 1.81k | ClusterInfoImpl::upstreamHttpProtocol(absl::optional<Http::Protocol> downstream_protocol) const { |
1509 | 1.81k | if (downstream_protocol.has_value() && |
1510 | 1.81k | features_ & Upstream::ClusterInfo::Features::USE_DOWNSTREAM_PROTOCOL) { |
1511 | 0 | if (downstream_protocol.value() == Http::Protocol::Http3 && |
1512 | 0 | !(features_ & Upstream::ClusterInfo::Features::HTTP3)) { |
1513 | 0 | return {Http::Protocol::Http2}; |
1514 | 0 | } |
1515 | | // use HTTP11 since HTTP10 upstream is not supported yet. |
1516 | 0 | if (downstream_protocol.value() == Http::Protocol::Http10) { |
1517 | 0 | return {Http::Protocol::Http11}; |
1518 | 0 | } |
1519 | 0 | return {downstream_protocol.value()}; |
1520 | 0 | } |
1521 | | |
1522 | 1.81k | if (features_ & Upstream::ClusterInfo::Features::USE_ALPN) { |
1523 | 0 | if (!(features_ & Upstream::ClusterInfo::Features::HTTP3)) { |
1524 | 0 | return {Http::Protocol::Http2, Http::Protocol::Http11}; |
1525 | 0 | } |
1526 | 0 | return {Http::Protocol::Http3, Http::Protocol::Http2, Http::Protocol::Http11}; |
1527 | 0 | } |
1528 | | |
1529 | 1.81k | if (features_ & Upstream::ClusterInfo::Features::HTTP3) { |
1530 | 0 | return {Http::Protocol::Http3}; |
1531 | 0 | } |
1532 | | |
1533 | 1.81k | return {(features_ & Upstream::ClusterInfo::Features::HTTP2) ? Http::Protocol::Http2 |
1534 | 1.81k | : Http::Protocol::Http11}; |
1535 | 1.81k | } |
1536 | | |
1537 | | absl::StatusOr<bool> validateTransportSocketSupportsQuic( |
1538 | 0 | const envoy::config::core::v3::TransportSocket& transport_socket) { |
1539 | | // The transport socket is valid for QUIC if it is either a QUIC transport socket, |
1540 | | // or if it is a QUIC transport socket wrapped in an HTTP/1.1 proxy socket. |
1541 | 0 | if (transport_socket.name() == "envoy.transport_sockets.quic") { |
1542 | 0 | return true; |
1543 | 0 | } |
1544 | 0 | if (transport_socket.name() != "envoy.transport_sockets.http_11_proxy") { |
1545 | 0 | return false; |
1546 | 0 | } |
1547 | 0 | envoy::extensions::transport_sockets::http_11_proxy::v3::Http11ProxyUpstreamTransport |
1548 | 0 | http11_socket; |
1549 | 0 | RETURN_IF_NOT_OK(MessageUtil::unpackTo(transport_socket.typed_config(), http11_socket)); |
1550 | 0 | return http11_socket.transport_socket().name() == "envoy.transport_sockets.quic"; |
1551 | 0 | } |
1552 | | |
1553 | | ClusterImplBase::ClusterImplBase(const envoy::config::cluster::v3::Cluster& cluster, |
1554 | | ClusterFactoryContext& cluster_context, |
1555 | | absl::Status& creation_status) |
1556 | | : init_manager_(fmt::format("Cluster {}", cluster.name())), |
1557 | 2.40k | init_watcher_("ClusterImplBase", [this]() { onInitDone(); }), |
1558 | | runtime_(cluster_context.serverFactoryContext().runtime()), |
1559 | | wait_for_warm_on_init_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(cluster, wait_for_warm_on_init, true)), |
1560 | | random_(cluster_context.serverFactoryContext().api().randomGenerator()), |
1561 | | time_source_(cluster_context.serverFactoryContext().timeSource()), |
1562 | | local_cluster_(cluster_context.clusterManager().localClusterName().value_or("") == |
1563 | | cluster.name()), |
1564 | | const_metadata_shared_pool_(Config::Metadata::getConstMetadataSharedPool( |
1565 | | cluster_context.serverFactoryContext().singletonManager(), |
1566 | 2.40k | cluster_context.serverFactoryContext().mainThreadDispatcher())) { |
1567 | 2.40k | auto& server_context = cluster_context.serverFactoryContext(); |
1568 | | |
1569 | 2.40k | auto stats_scope = generateStatsScope(cluster, server_context.serverScope().store()); |
1570 | 2.40k | transport_factory_context_ = |
1571 | 2.40k | std::make_unique<Server::Configuration::TransportSocketFactoryContextImpl>( |
1572 | 2.40k | server_context, cluster_context.sslContextManager(), *stats_scope, |
1573 | 2.40k | cluster_context.clusterManager(), cluster_context.messageValidationVisitor()); |
1574 | 2.40k | transport_factory_context_->setInitManager(init_manager_); |
1575 | | |
1576 | 2.40k | auto socket_factory_or_error = createTransportSocketFactory(cluster, *transport_factory_context_); |
1577 | 2.40k | SET_AND_RETURN_IF_NOT_OK(socket_factory_or_error.status(), creation_status); |
1578 | 2.40k | auto* raw_factory_pointer = socket_factory_or_error.value().get(); |
1579 | | |
1580 | 2.40k | auto socket_matcher_or_error = TransportSocketMatcherImpl::create( |
1581 | 2.40k | cluster.transport_socket_matches(), *transport_factory_context_, |
1582 | 2.40k | socket_factory_or_error.value(), *stats_scope); |
1583 | 2.40k | SET_AND_RETURN_IF_NOT_OK(socket_matcher_or_error.status(), creation_status); |
1584 | 2.40k | auto socket_matcher = std::move(*socket_matcher_or_error); |
1585 | 2.40k | const bool matcher_supports_alpn = socket_matcher->allMatchesSupportAlpn(); |
1586 | 2.40k | auto& dispatcher = server_context.mainThreadDispatcher(); |
1587 | 2.40k | info_ = std::shared_ptr<const ClusterInfoImpl>( |
1588 | 2.40k | new ClusterInfoImpl(init_manager_, server_context, cluster, |
1589 | 2.40k | cluster_context.clusterManager().bindConfig(), runtime_, |
1590 | 2.40k | std::move(socket_matcher), std::move(stats_scope), |
1591 | 2.40k | cluster_context.addedViaApi(), *transport_factory_context_), |
1592 | 2.40k | [&dispatcher](const ClusterInfoImpl* self) { |
1593 | 2.40k | ENVOY_LOG(trace, "Schedule destroy cluster info {}", self->name()); |
1594 | 2.40k | dispatcher.deleteInDispatcherThread( |
1595 | 2.40k | std::unique_ptr<const Event::DispatcherThreadDeletable>(self)); |
1596 | 2.40k | }); |
1597 | | |
1598 | 2.40k | if ((info_->features() & ClusterInfoImpl::Features::USE_ALPN)) { |
1599 | 0 | if (!raw_factory_pointer->supportsAlpn()) { |
1600 | 0 | creation_status = absl::InvalidArgumentError( |
1601 | 0 | fmt::format("ALPN configured for cluster {} which has a non-ALPN transport socket: {}", |
1602 | 0 | cluster.name(), cluster.DebugString())); |
1603 | 0 | return; |
1604 | 0 | } |
1605 | 0 | if (!matcher_supports_alpn && |
1606 | 0 | !runtime_.snapshot().featureEnabled(ClusterImplBase::DoNotValidateAlpnRuntimeKey, 0)) { |
1607 | 0 | creation_status = absl::InvalidArgumentError(fmt::format( |
1608 | 0 | "ALPN configured for cluster {} which has a non-ALPN transport socket matcher: {}", |
1609 | 0 | cluster.name(), cluster.DebugString())); |
1610 | 0 | return; |
1611 | 0 | } |
1612 | 0 | } |
1613 | | |
1614 | 2.40k | if (info_->features() & ClusterInfoImpl::Features::HTTP3) { |
1615 | 0 | #if defined(ENVOY_ENABLE_QUIC) |
1616 | 0 | absl::StatusOr<bool> supports_quic = |
1617 | 0 | validateTransportSocketSupportsQuic(cluster.transport_socket()); |
1618 | 0 | SET_AND_RETURN_IF_NOT_OK(supports_quic.status(), creation_status); |
1619 | 0 | if (!*supports_quic) { |
1620 | 0 | creation_status = absl::InvalidArgumentError( |
1621 | 0 | fmt::format("HTTP3 requires a QuicUpstreamTransport transport socket: {} {}", |
1622 | 0 | cluster.name(), cluster.transport_socket().DebugString())); |
1623 | 0 | return; |
1624 | 0 | } |
1625 | | #else |
1626 | | creation_status = absl::InvalidArgumentError("HTTP3 configured but not enabled in the build."); |
1627 | | return; |
1628 | | #endif |
1629 | 0 | } |
1630 | | |
1631 | | // Create the default (empty) priority set before registering callbacks to |
1632 | | // avoid getting an update the first time it is accessed. |
1633 | 2.40k | priority_set_.getOrCreateHostSet(0); |
1634 | 2.40k | priority_update_cb_ = priority_set_.addPriorityUpdateCb( |
1635 | 2.40k | [this](uint32_t, const HostVector& hosts_added, const HostVector& hosts_removed) { |
1636 | 2.40k | if (!hosts_added.empty() || !hosts_removed.empty()) { |
1637 | 2.40k | info_->endpointStats().membership_change_.inc(); |
1638 | 2.40k | } |
1639 | | |
1640 | 2.40k | uint32_t healthy_hosts = 0; |
1641 | 2.40k | uint32_t degraded_hosts = 0; |
1642 | 2.40k | uint32_t excluded_hosts = 0; |
1643 | 2.40k | uint32_t hosts = 0; |
1644 | 2.40k | for (const auto& host_set : prioritySet().hostSetsPerPriority()) { |
1645 | 2.40k | hosts += host_set->hosts().size(); |
1646 | 2.40k | healthy_hosts += host_set->healthyHosts().size(); |
1647 | 2.40k | degraded_hosts += host_set->degradedHosts().size(); |
1648 | 2.40k | excluded_hosts += host_set->excludedHosts().size(); |
1649 | 2.40k | } |
1650 | 2.40k | info_->endpointStats().membership_total_.set(hosts); |
1651 | 2.40k | info_->endpointStats().membership_healthy_.set(healthy_hosts); |
1652 | 2.40k | info_->endpointStats().membership_degraded_.set(degraded_hosts); |
1653 | 2.40k | info_->endpointStats().membership_excluded_.set(excluded_hosts); |
1654 | 2.40k | return absl::OkStatus(); |
1655 | 2.40k | }); |
1656 | | // Drop overload configuration parsing. |
1657 | 2.40k | SET_AND_RETURN_IF_NOT_OK(parseDropOverloadConfig(cluster.load_assignment()), creation_status); |
1658 | 2.40k | } |
1659 | | |
1660 | | namespace { |
1661 | 4.81k | bool excludeBasedOnHealthFlag(const Host& host) { |
1662 | 4.81k | return host.healthFlagGet(Host::HealthFlag::PENDING_ACTIVE_HC) || |
1663 | 4.81k | host.healthFlagGet(Host::HealthFlag::EXCLUDED_VIA_IMMEDIATE_HC_FAIL) || |
1664 | 4.81k | host.healthFlagGet(Host::HealthFlag::EDS_STATUS_DRAINING); |
1665 | 4.81k | } |
1666 | | |
1667 | | } // namespace |
1668 | | |
1669 | | std::tuple<HealthyHostVectorConstSharedPtr, DegradedHostVectorConstSharedPtr, |
1670 | | ExcludedHostVectorConstSharedPtr> |
1671 | 2.40k | ClusterImplBase::partitionHostList(const HostVector& hosts) { |
1672 | 2.40k | auto healthy_list = std::make_shared<HealthyHostVector>(); |
1673 | 2.40k | auto degraded_list = std::make_shared<DegradedHostVector>(); |
1674 | 2.40k | auto excluded_list = std::make_shared<ExcludedHostVector>(); |
1675 | | |
1676 | 2.40k | for (const auto& host : hosts) { |
1677 | 2.40k | const Host::Health health_status = host->coarseHealth(); |
1678 | 2.40k | if (health_status == Host::Health::Healthy) { |
1679 | 2.40k | healthy_list->get().emplace_back(host); |
1680 | 2.40k | } else if (health_status == Host::Health::Degraded) { |
1681 | 0 | degraded_list->get().emplace_back(host); |
1682 | 0 | } |
1683 | 2.40k | if (excludeBasedOnHealthFlag(*host)) { |
1684 | 0 | excluded_list->get().emplace_back(host); |
1685 | 0 | } |
1686 | 2.40k | } |
1687 | | |
1688 | 2.40k | return std::make_tuple(healthy_list, degraded_list, excluded_list); |
1689 | 2.40k | } |
1690 | | |
1691 | | std::tuple<HostsPerLocalityConstSharedPtr, HostsPerLocalityConstSharedPtr, |
1692 | | HostsPerLocalityConstSharedPtr> |
1693 | 2.40k | ClusterImplBase::partitionHostsPerLocality(const HostsPerLocality& hosts) { |
1694 | 2.40k | auto filtered_clones = |
1695 | 2.40k | hosts.filter({[](const Host& host) { return host.coarseHealth() == Host::Health::Healthy; }, |
1696 | 2.40k | [](const Host& host) { return host.coarseHealth() == Host::Health::Degraded; }, |
1697 | 2.40k | [](const Host& host) { return excludeBasedOnHealthFlag(host); }}); |
1698 | | |
1699 | 2.40k | return std::make_tuple(std::move(filtered_clones[0]), std::move(filtered_clones[1]), |
1700 | 2.40k | std::move(filtered_clones[2])); |
1701 | 2.40k | } |
1702 | | |
1703 | 1.81k | bool ClusterInfoImpl::maintenanceMode() const { |
1704 | 1.81k | return runtime_.snapshot().featureEnabled(maintenance_mode_runtime_key_, 0); |
1705 | 1.81k | } |
1706 | | |
1707 | 15.4k | ResourceManager& ClusterInfoImpl::resourceManager(ResourcePriority priority) const { |
1708 | 15.4k | ASSERT(enumToInt(priority) < resource_managers_.managers_.size()); |
1709 | 15.4k | return *resource_managers_.managers_[enumToInt(priority)]; |
1710 | 15.4k | } |
1711 | | |
1712 | 2.40k | void ClusterImplBase::initialize(std::function<void()> callback) { |
1713 | 2.40k | ASSERT(!initialization_started_); |
1714 | 2.40k | ASSERT(initialization_complete_callback_ == nullptr); |
1715 | 2.40k | initialization_complete_callback_ = callback; |
1716 | 2.40k | startPreInit(); |
1717 | 2.40k | } |
1718 | | |
1719 | 2.40k | void ClusterImplBase::onPreInitComplete() { |
1720 | | // Protect against multiple calls. |
1721 | 2.40k | if (initialization_started_) { |
1722 | 0 | return; |
1723 | 0 | } |
1724 | 2.40k | initialization_started_ = true; |
1725 | | |
1726 | 2.40k | ENVOY_LOG(debug, "initializing {} cluster {} completed", |
1727 | 2.40k | initializePhase() == InitializePhase::Primary ? "Primary" : "Secondary", |
1728 | 2.40k | info()->name()); |
1729 | 2.40k | init_manager_.initialize(init_watcher_); |
1730 | 2.40k | } |
1731 | | |
1732 | 2.40k | void ClusterImplBase::onInitDone() { |
1733 | 2.40k | info()->configUpdateStats().warming_state_.set(0); |
1734 | 2.40k | if (health_checker_ && pending_initialize_health_checks_ == 0) { |
1735 | 0 | for (auto& host_set : prioritySet().hostSetsPerPriority()) { |
1736 | 0 | for (auto& host : host_set->hosts()) { |
1737 | 0 | if (host->disableActiveHealthCheck()) { |
1738 | 0 | continue; |
1739 | 0 | } |
1740 | 0 | ++pending_initialize_health_checks_; |
1741 | 0 | } |
1742 | 0 | } |
1743 | 0 | ENVOY_LOG(debug, "Cluster onInitDone pending initialize health check count {}", |
1744 | 0 | pending_initialize_health_checks_); |
1745 | | |
1746 | | // TODO(mattklein123): Remove this callback when done. |
1747 | 0 | health_checker_->addHostCheckCompleteCb( |
1748 | 0 | [this](HostSharedPtr, HealthTransition, HealthState) -> void { |
1749 | 0 | if (pending_initialize_health_checks_ > 0 && --pending_initialize_health_checks_ == 0) { |
1750 | 0 | finishInitialization(); |
1751 | 0 | } |
1752 | 0 | }); |
1753 | 0 | } |
1754 | | |
1755 | 2.40k | if (pending_initialize_health_checks_ == 0) { |
1756 | 2.40k | finishInitialization(); |
1757 | 2.40k | } |
1758 | 2.40k | } |
1759 | | |
1760 | 2.40k | void ClusterImplBase::finishInitialization() { |
1761 | 2.40k | ASSERT(initialization_complete_callback_ != nullptr); |
1762 | 2.40k | ASSERT(initialization_started_); |
1763 | | |
1764 | | // Snap a copy of the completion callback so that we can set it to nullptr to unblock |
1765 | | // reloadHealthyHosts(). See that function for more info on why we do this. |
1766 | 2.40k | auto snapped_callback = initialization_complete_callback_; |
1767 | 2.40k | initialization_complete_callback_ = nullptr; |
1768 | | |
1769 | 2.40k | if (health_checker_ != nullptr) { |
1770 | 0 | reloadHealthyHosts(nullptr); |
1771 | 0 | } |
1772 | | |
1773 | 2.40k | if (snapped_callback != nullptr) { |
1774 | 2.40k | snapped_callback(); |
1775 | 2.40k | } |
1776 | 2.40k | } |
1777 | | |
1778 | | absl::Status ClusterImplBase::parseDropOverloadConfig( |
1779 | 2.42k | const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment) { |
1780 | | // Default drop_overload_ to zero. |
1781 | 2.42k | drop_overload_ = UnitFloat(0); |
1782 | | |
1783 | 2.42k | if (!cluster_load_assignment.has_policy()) { |
1784 | 2.42k | return absl::OkStatus(); |
1785 | 2.42k | } |
1786 | 0 | const auto& policy = cluster_load_assignment.policy(); |
1787 | 0 | if (policy.drop_overloads().empty()) { |
1788 | 0 | return absl::OkStatus(); |
1789 | 0 | } |
1790 | 0 | if (policy.drop_overloads().size() > kDropOverloadSize) { |
1791 | 0 | return absl::InvalidArgumentError( |
1792 | 0 | fmt::format("Cluster drop_overloads config has {} categories. Envoy only support one.", |
1793 | 0 | policy.drop_overloads().size())); |
1794 | 0 | } |
1795 | | |
1796 | 0 | const auto& drop_percentage = policy.drop_overloads(0).drop_percentage(); |
1797 | 0 | float denominator = 100; |
1798 | 0 | switch (drop_percentage.denominator()) { |
1799 | 0 | case envoy::type::v3::FractionalPercent::HUNDRED: |
1800 | 0 | denominator = 100; |
1801 | 0 | break; |
1802 | 0 | case envoy::type::v3::FractionalPercent::TEN_THOUSAND: |
1803 | 0 | denominator = 10000; |
1804 | 0 | break; |
1805 | 0 | case envoy::type::v3::FractionalPercent::MILLION: |
1806 | 0 | denominator = 1000000; |
1807 | 0 | break; |
1808 | 0 | default: |
1809 | 0 | return absl::InvalidArgumentError(fmt::format( |
1810 | 0 | "Cluster drop_overloads config denominator setting is invalid : {}. Valid range 0~2.", |
1811 | 0 | static_cast<int>(drop_percentage.denominator()))); |
1812 | 0 | } |
1813 | | |
1814 | | // If DropOverloadRuntimeKey is not enabled, honor the EDS drop_overload config. |
1815 | | // If it is enabled, choose the smaller one between it and the EDS config. |
1816 | 0 | float drop_ratio = float(drop_percentage.numerator()) / (denominator); |
1817 | 0 | if (drop_ratio > 1) { |
1818 | 0 | return absl::InvalidArgumentError( |
1819 | 0 | fmt::format("Cluster drop_overloads config is invalid. drop_ratio={}(Numerator {} / " |
1820 | 0 | "Denominator {}). The valid range is 0~1.", |
1821 | 0 | drop_ratio, drop_percentage.numerator(), denominator)); |
1822 | 0 | } |
1823 | 0 | const uint64_t MAX_DROP_OVERLOAD_RUNTIME = 100; |
1824 | 0 | uint64_t drop_ratio_runtime = runtime_.snapshot().getInteger( |
1825 | 0 | ClusterImplBase::DropOverloadRuntimeKey, MAX_DROP_OVERLOAD_RUNTIME); |
1826 | 0 | if (drop_ratio_runtime > MAX_DROP_OVERLOAD_RUNTIME) { |
1827 | 0 | return absl::InvalidArgumentError( |
1828 | 0 | fmt::format("load_balancing_policy.drop_overload_limit runtime key config {} is invalid. " |
1829 | 0 | "The valid range is 0~100", |
1830 | 0 | drop_ratio_runtime)); |
1831 | 0 | } |
1832 | | |
1833 | 0 | drop_ratio = std::min(drop_ratio, float(drop_ratio_runtime) / float(MAX_DROP_OVERLOAD_RUNTIME)); |
1834 | 0 | drop_overload_ = UnitFloat(drop_ratio); |
1835 | 0 | drop_category_ = policy.drop_overloads(0).category(); |
1836 | 0 | return absl::OkStatus(); |
1837 | 0 | } |
1838 | | |
1839 | 0 | void ClusterImplBase::setHealthChecker(const HealthCheckerSharedPtr& health_checker) { |
1840 | 0 | ASSERT(!health_checker_); |
1841 | 0 | health_checker_ = health_checker; |
1842 | 0 | health_checker_->start(); |
1843 | 0 | health_checker_->addHostCheckCompleteCb( |
1844 | 0 | [this](const HostSharedPtr& host, HealthTransition changed_state, HealthState) -> void { |
1845 | | // If we get a health check completion that resulted in a state change, signal to |
1846 | | // update the host sets on all threads. |
1847 | 0 | if (changed_state == HealthTransition::Changed) { |
1848 | 0 | reloadHealthyHosts(host); |
1849 | 0 | } |
1850 | 0 | }); |
1851 | 0 | } |
1852 | | |
1853 | 2.40k | void ClusterImplBase::setOutlierDetector(const Outlier::DetectorSharedPtr& outlier_detector) { |
1854 | 2.40k | if (!outlier_detector) { |
1855 | 2.40k | return; |
1856 | 2.40k | } |
1857 | | |
1858 | 0 | outlier_detector_ = outlier_detector; |
1859 | 0 | outlier_detector_->addChangedStateCb( |
1860 | 0 | [this](const HostSharedPtr& host) -> void { reloadHealthyHosts(host); }); |
1861 | 0 | } |
1862 | | |
1863 | 0 | void ClusterImplBase::reloadHealthyHosts(const HostSharedPtr& host) { |
1864 | | // Every time a host changes Health Check state we cause a full healthy host recalculation which |
1865 | | // for expensive LBs (ring, subset, etc.) can be quite time consuming. During startup, this |
1866 | | // can also block worker threads by doing this repeatedly. There is no reason to do this |
1867 | | // as we will not start taking traffic until we are initialized. By blocking Health Check |
1868 | | // updates while initializing we can avoid this. |
1869 | 0 | if (initialization_complete_callback_ != nullptr) { |
1870 | 0 | return; |
1871 | 0 | } |
1872 | | |
1873 | 0 | reloadHealthyHostsHelper(host); |
1874 | 0 | } |
1875 | | |
1876 | 0 | void ClusterImplBase::reloadHealthyHostsHelper(const HostSharedPtr&) { |
1877 | 0 | const auto& host_sets = prioritySet().hostSetsPerPriority(); |
1878 | 0 | for (size_t priority = 0; priority < host_sets.size(); ++priority) { |
1879 | 0 | const auto& host_set = host_sets[priority]; |
1880 | | // TODO(htuch): Can we skip these copies by exporting out const shared_ptr from HostSet? |
1881 | 0 | HostVectorConstSharedPtr hosts_copy = std::make_shared<HostVector>(host_set->hosts()); |
1882 | |
|
1883 | 0 | HostsPerLocalityConstSharedPtr hosts_per_locality_copy = host_set->hostsPerLocality().clone(); |
1884 | 0 | prioritySet().updateHosts( |
1885 | 0 | priority, HostSetImpl::partitionHosts(hosts_copy, hosts_per_locality_copy), |
1886 | 0 | host_set->localityWeights(), {}, {}, random_.random(), absl::nullopt, absl::nullopt); |
1887 | 0 | } |
1888 | 0 | } |
1889 | | |
1890 | | absl::StatusOr<const Network::Address::InstanceConstSharedPtr> |
1891 | 2.40k | ClusterImplBase::resolveProtoAddress(const envoy::config::core::v3::Address& address) { |
1892 | 2.40k | absl::Status resolve_status; |
1893 | 2.40k | TRY_ASSERT_MAIN_THREAD { |
1894 | 2.40k | auto address_or_error = Network::Address::resolveProtoAddress(address); |
1895 | 2.40k | if (address_or_error.status().ok()) { |
1896 | 2.40k | return address_or_error.value(); |
1897 | 2.40k | } |
1898 | 0 | resolve_status = address_or_error.status(); |
1899 | 0 | } |
1900 | 0 | END_TRY |
1901 | 2.40k | CATCH(EnvoyException & e, { resolve_status = absl::InvalidArgumentError(e.what()); }); |
1902 | 0 | if (info_->type() == envoy::config::cluster::v3::Cluster::STATIC || |
1903 | 0 | info_->type() == envoy::config::cluster::v3::Cluster::EDS) { |
1904 | 0 | return absl::InvalidArgumentError( |
1905 | 0 | fmt::format("{}. Consider setting resolver_name or setting cluster type " |
1906 | 0 | "to 'STRICT_DNS' or 'LOGICAL_DNS'", |
1907 | 0 | resolve_status.message())); |
1908 | 0 | } |
1909 | 0 | return resolve_status; |
1910 | 0 | } |
1911 | | |
1912 | | absl::Status ClusterImplBase::validateEndpointsForZoneAwareRouting( |
1913 | 2.40k | const envoy::config::endpoint::v3::LocalityLbEndpoints& endpoints) const { |
1914 | 2.40k | if (local_cluster_ && endpoints.priority() > 0) { |
1915 | 0 | return absl::InvalidArgumentError( |
1916 | 0 | fmt::format("Unexpected non-zero priority for local cluster '{}'.", info()->name())); |
1917 | 0 | } |
1918 | 2.40k | return absl::OkStatus(); |
1919 | 2.40k | } |
1920 | | |
1921 | | ClusterInfoImpl::OptionalClusterStats::OptionalClusterStats( |
1922 | | const envoy::config::cluster::v3::Cluster& config, Stats::Scope& stats_scope, |
1923 | | const ClusterManager& manager) |
1924 | | : timeout_budget_stats_( |
1925 | | (config.track_cluster_stats().timeout_budgets() || config.track_timeout_budgets()) |
1926 | | ? std::make_unique<ClusterTimeoutBudgetStats>(generateTimeoutBudgetStats( |
1927 | | stats_scope, manager.clusterTimeoutBudgetStatNames())) |
1928 | | : nullptr), |
1929 | | request_response_size_stats_( |
1930 | | (config.track_cluster_stats().request_response_sizes() |
1931 | | ? std::make_unique<ClusterRequestResponseSizeStats>(generateRequestResponseSizeStats( |
1932 | | stats_scope, manager.clusterRequestResponseSizeStatNames())) |
1933 | 0 | : nullptr)) {} |
1934 | | |
1935 | | ClusterInfoImpl::ResourceManagers::ResourceManagers( |
1936 | | const envoy::config::cluster::v3::Cluster& config, Runtime::Loader& runtime, |
1937 | | const std::string& cluster_name, Stats::Scope& stats_scope, |
1938 | | const ClusterCircuitBreakersStatNames& circuit_breakers_stat_names) |
1939 | 2.40k | : circuit_breakers_stat_names_(circuit_breakers_stat_names) { |
1940 | 2.40k | managers_[enumToInt(ResourcePriority::Default)] = THROW_OR_RETURN_VALUE( |
1941 | 2.40k | load(config, runtime, cluster_name, stats_scope, envoy::config::core::v3::DEFAULT), |
1942 | 2.40k | ResourceManagerImplPtr); |
1943 | 2.40k | managers_[enumToInt(ResourcePriority::High)] = THROW_OR_RETURN_VALUE( |
1944 | 2.40k | load(config, runtime, cluster_name, stats_scope, envoy::config::core::v3::HIGH), |
1945 | 2.40k | ResourceManagerImplPtr); |
1946 | 2.40k | } |
1947 | | |
1948 | | ClusterCircuitBreakersStats |
1949 | | ClusterInfoImpl::generateCircuitBreakersStats(Stats::Scope& scope, Stats::StatName prefix, |
1950 | | bool track_remaining, |
1951 | 406k | const ClusterCircuitBreakersStatNames& stat_names) { |
1952 | 4.03M | auto make_gauge = [&stat_names, &scope, prefix](Stats::StatName stat_name) -> Stats::Gauge& { |
1953 | 4.03M | return Stats::Utility::gaugeFromElements(scope, |
1954 | 4.03M | {stat_names.circuit_breakers_, prefix, stat_name}, |
1955 | 4.03M | Stats::Gauge::ImportMode::Accumulate); |
1956 | 4.03M | }; |
1957 | | |
1958 | 406k | #define REMAINING_GAUGE(stat_name) \ |
1959 | 2.03M | track_remaining ? make_gauge(stat_name) : scope.store().nullGauge() |
1960 | | |
1961 | 406k | return { |
1962 | 406k | make_gauge(stat_names.cx_open_), |
1963 | 406k | make_gauge(stat_names.cx_pool_open_), |
1964 | 406k | make_gauge(stat_names.rq_open_), |
1965 | 406k | make_gauge(stat_names.rq_pending_open_), |
1966 | 406k | make_gauge(stat_names.rq_retry_open_), |
1967 | 406k | REMAINING_GAUGE(stat_names.remaining_cx_), |
1968 | 406k | REMAINING_GAUGE(stat_names.remaining_cx_pools_), |
1969 | 406k | REMAINING_GAUGE(stat_names.remaining_pending_), |
1970 | 406k | REMAINING_GAUGE(stat_names.remaining_retries_), |
1971 | 406k | REMAINING_GAUGE(stat_names.remaining_rq_), |
1972 | 406k | }; |
1973 | | |
1974 | 406k | #undef REMAINING_GAUGE |
1975 | 406k | } |
1976 | | |
1977 | 30 | Http::Http1::CodecStats& ClusterInfoImpl::http1CodecStats() const { |
1978 | 30 | return Http::Http1::CodecStats::atomicGet(http1_codec_stats_, *stats_scope_); |
1979 | 30 | } |
1980 | | |
1981 | 881 | Http::Http2::CodecStats& ClusterInfoImpl::http2CodecStats() const { |
1982 | 881 | return Http::Http2::CodecStats::atomicGet(http2_codec_stats_, *stats_scope_); |
1983 | 881 | } |
1984 | | |
1985 | 0 | Http::Http3::CodecStats& ClusterInfoImpl::http3CodecStats() const { |
1986 | 0 | return Http::Http3::CodecStats::atomicGet(http3_codec_stats_, *stats_scope_); |
1987 | 0 | } |
1988 | | |
1989 | | #ifdef ENVOY_ENABLE_UHV |
1990 | | ::Envoy::Http::HeaderValidatorStats& |
1991 | | ClusterInfoImpl::getHeaderValidatorStats(Http::Protocol protocol) const { |
1992 | | switch (protocol) { |
1993 | | case Http::Protocol::Http10: |
1994 | | case Http::Protocol::Http11: |
1995 | | return http1CodecStats(); |
1996 | | case Http::Protocol::Http2: |
1997 | | return http2CodecStats(); |
1998 | | case Http::Protocol::Http3: |
1999 | | return http3CodecStats(); |
2000 | | } |
2001 | | PANIC_DUE_TO_CORRUPT_ENUM; |
2002 | | } |
2003 | | #endif |
2004 | | |
2005 | | Http::ClientHeaderValidatorPtr |
2006 | 1.76k | ClusterInfoImpl::makeHeaderValidator([[maybe_unused]] Http::Protocol protocol) const { |
2007 | | #ifdef ENVOY_ENABLE_UHV |
2008 | | return http_protocol_options_->header_validator_factory_ |
2009 | | ? http_protocol_options_->header_validator_factory_->createClientHeaderValidator( |
2010 | | protocol, getHeaderValidatorStats(protocol)) |
2011 | | : nullptr; |
2012 | | #else |
2013 | 1.76k | return nullptr; |
2014 | 1.76k | #endif |
2015 | 1.76k | } |
2016 | | |
2017 | | std::pair<absl::optional<double>, absl::optional<uint32_t>> ClusterInfoImpl::getRetryBudgetParams( |
2018 | 0 | const envoy::config::cluster::v3::CircuitBreakers::Thresholds& thresholds) { |
2019 | 0 | constexpr double default_budget_percent = 20.0; |
2020 | 0 | constexpr uint32_t default_retry_concurrency = 3; |
2021 | |
|
2022 | 0 | absl::optional<double> budget_percent; |
2023 | 0 | absl::optional<uint32_t> min_retry_concurrency; |
2024 | 0 | if (thresholds.has_retry_budget()) { |
2025 | | // The budget_percent and min_retry_concurrency values are only set if there is a retry budget |
2026 | | // message set in the cluster config. |
2027 | 0 | budget_percent = PROTOBUF_GET_WRAPPED_OR_DEFAULT(thresholds.retry_budget(), budget_percent, |
2028 | 0 | default_budget_percent); |
2029 | 0 | min_retry_concurrency = PROTOBUF_GET_WRAPPED_OR_DEFAULT( |
2030 | 0 | thresholds.retry_budget(), min_retry_concurrency, default_retry_concurrency); |
2031 | 0 | } |
2032 | 0 | return std::make_pair(budget_percent, min_retry_concurrency); |
2033 | 0 | } |
2034 | | |
2035 | | SINGLETON_MANAGER_REGISTRATION(upstream_network_filter_config_provider_manager); |
2036 | | |
2037 | | std::shared_ptr<UpstreamNetworkFilterConfigProviderManager> |
2038 | | ClusterInfoImpl::createSingletonUpstreamNetworkFilterConfigProviderManager( |
2039 | 2.40k | Server::Configuration::ServerFactoryContext& context) { |
2040 | 2.40k | return context.singletonManager().getTyped<UpstreamNetworkFilterConfigProviderManager>( |
2041 | 2.40k | SINGLETON_MANAGER_REGISTERED_NAME(upstream_network_filter_config_provider_manager), |
2042 | 2.40k | [] { return std::make_shared<Filter::UpstreamNetworkFilterConfigProviderManagerImpl>(); }); |
2043 | 2.40k | } |
2044 | | |
2045 | | absl::StatusOr<ResourceManagerImplPtr> |
2046 | | ClusterInfoImpl::ResourceManagers::load(const envoy::config::cluster::v3::Cluster& config, |
2047 | | Runtime::Loader& runtime, const std::string& cluster_name, |
2048 | | Stats::Scope& stats_scope, |
2049 | 4.81k | const envoy::config::core::v3::RoutingPriority& priority) { |
2050 | 4.81k | uint64_t max_connections = 1024; |
2051 | 4.81k | uint64_t max_pending_requests = 1024; |
2052 | 4.81k | uint64_t max_requests = 1024; |
2053 | 4.81k | uint64_t max_retries = 3; |
2054 | 4.81k | uint64_t max_connection_pools = std::numeric_limits<uint64_t>::max(); |
2055 | 4.81k | uint64_t max_connections_per_host = std::numeric_limits<uint64_t>::max(); |
2056 | | |
2057 | 4.81k | bool track_remaining = false; |
2058 | | |
2059 | 4.81k | Stats::StatName priority_stat_name; |
2060 | 4.81k | std::string priority_name; |
2061 | 4.81k | switch (priority) { |
2062 | 0 | PANIC_ON_PROTO_ENUM_SENTINEL_VALUES; |
2063 | 2.40k | case envoy::config::core::v3::DEFAULT: |
2064 | 2.40k | priority_stat_name = circuit_breakers_stat_names_.default_; |
2065 | 2.40k | priority_name = "default"; |
2066 | 2.40k | break; |
2067 | 2.40k | case envoy::config::core::v3::HIGH: |
2068 | 2.40k | priority_stat_name = circuit_breakers_stat_names_.high_; |
2069 | 2.40k | priority_name = "high"; |
2070 | 2.40k | break; |
2071 | 4.81k | } |
2072 | | |
2073 | 4.81k | const std::string runtime_prefix = |
2074 | 4.81k | fmt::format("circuit_breakers.{}.{}.", cluster_name, priority_name); |
2075 | | |
2076 | 4.81k | const auto& thresholds = config.circuit_breakers().thresholds(); |
2077 | 4.81k | const auto it = std::find_if( |
2078 | 4.81k | thresholds.cbegin(), thresholds.cend(), |
2079 | 4.81k | [priority](const envoy::config::cluster::v3::CircuitBreakers::Thresholds& threshold) { |
2080 | 0 | return threshold.priority() == priority; |
2081 | 0 | }); |
2082 | 4.81k | const auto& per_host_thresholds = config.circuit_breakers().per_host_thresholds(); |
2083 | 4.81k | const auto per_host_it = std::find_if( |
2084 | 4.81k | per_host_thresholds.cbegin(), per_host_thresholds.cend(), |
2085 | 4.81k | [priority](const envoy::config::cluster::v3::CircuitBreakers::Thresholds& threshold) { |
2086 | 0 | return threshold.priority() == priority; |
2087 | 0 | }); |
2088 | | |
2089 | 4.81k | absl::optional<double> budget_percent; |
2090 | 4.81k | absl::optional<uint32_t> min_retry_concurrency; |
2091 | 4.81k | if (it != thresholds.cend()) { |
2092 | 0 | max_connections = PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_connections, max_connections); |
2093 | 0 | max_pending_requests = |
2094 | 0 | PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_pending_requests, max_pending_requests); |
2095 | 0 | max_requests = PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_requests, max_requests); |
2096 | 0 | max_retries = PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_retries, max_retries); |
2097 | 0 | track_remaining = it->track_remaining(); |
2098 | 0 | max_connection_pools = |
2099 | 0 | PROTOBUF_GET_WRAPPED_OR_DEFAULT(*it, max_connection_pools, max_connection_pools); |
2100 | 0 | std::tie(budget_percent, min_retry_concurrency) = ClusterInfoImpl::getRetryBudgetParams(*it); |
2101 | 0 | } |
2102 | 4.81k | if (per_host_it != per_host_thresholds.cend()) { |
2103 | 0 | if (per_host_it->has_max_pending_requests() || per_host_it->has_max_requests() || |
2104 | 0 | per_host_it->has_max_retries() || per_host_it->has_max_connection_pools() || |
2105 | 0 | per_host_it->has_retry_budget()) { |
2106 | 0 | return absl::InvalidArgumentError("Unsupported field in per_host_thresholds"); |
2107 | 0 | } |
2108 | 0 | if (per_host_it->has_max_connections()) { |
2109 | 0 | max_connections_per_host = per_host_it->max_connections().value(); |
2110 | 0 | } |
2111 | 0 | } |
2112 | 4.81k | return std::make_unique<ResourceManagerImpl>( |
2113 | 4.81k | runtime, runtime_prefix, max_connections, max_pending_requests, max_requests, max_retries, |
2114 | 4.81k | max_connection_pools, max_connections_per_host, |
2115 | 4.81k | ClusterInfoImpl::generateCircuitBreakersStats(stats_scope, priority_stat_name, |
2116 | 4.81k | track_remaining, circuit_breakers_stat_names_), |
2117 | 4.81k | budget_percent, min_retry_concurrency); |
2118 | 4.81k | } |
2119 | | |
2120 | | PriorityStateManager::PriorityStateManager(ClusterImplBase& cluster, |
2121 | | const LocalInfo::LocalInfo& local_info, |
2122 | | PrioritySet::HostUpdateCb* update_cb, |
2123 | | Random::RandomGenerator& random) |
2124 | | : parent_(cluster), local_info_node_(local_info.node()), update_cb_(update_cb), |
2125 | 2.40k | random_(random) {} |
2126 | | |
2127 | | void PriorityStateManager::initializePriorityFor( |
2128 | 2.40k | const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint) { |
2129 | 2.40k | const uint32_t priority = locality_lb_endpoint.priority(); |
2130 | 2.40k | if (priority_state_.size() <= priority) { |
2131 | 2.40k | priority_state_.resize(priority + 1); |
2132 | 2.40k | } |
2133 | 2.40k | if (priority_state_[priority].first == nullptr) { |
2134 | 2.40k | priority_state_[priority].first = std::make_unique<HostVector>(); |
2135 | 2.40k | } |
2136 | 2.40k | if (locality_lb_endpoint.has_locality() && locality_lb_endpoint.has_load_balancing_weight()) { |
2137 | 0 | priority_state_[priority].second[locality_lb_endpoint.locality()] = |
2138 | 0 | locality_lb_endpoint.load_balancing_weight().value(); |
2139 | 0 | } |
2140 | 2.40k | } |
2141 | | |
2142 | | void PriorityStateManager::registerHostForPriority( |
2143 | | const std::string& hostname, Network::Address::InstanceConstSharedPtr address, |
2144 | | const std::vector<Network::Address::InstanceConstSharedPtr>& address_list, |
2145 | | const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint, |
2146 | 2.40k | const envoy::config::endpoint::v3::LbEndpoint& lb_endpoint, TimeSource& time_source) { |
2147 | 2.40k | auto endpoint_metadata = |
2148 | 2.40k | lb_endpoint.has_metadata() |
2149 | 2.40k | ? parent_.constMetadataSharedPool()->getObject(lb_endpoint.metadata()) |
2150 | 2.40k | : nullptr; |
2151 | 2.40k | auto locality_metadata = |
2152 | 2.40k | locality_lb_endpoint.has_metadata() |
2153 | 2.40k | ? parent_.constMetadataSharedPool()->getObject(locality_lb_endpoint.metadata()) |
2154 | 2.40k | : nullptr; |
2155 | 2.40k | const auto host = std::make_shared<HostImpl>( |
2156 | 2.40k | parent_.info(), hostname, address, endpoint_metadata, locality_metadata, |
2157 | 2.40k | lb_endpoint.load_balancing_weight().value(), locality_lb_endpoint.locality(), |
2158 | 2.40k | lb_endpoint.endpoint().health_check_config(), locality_lb_endpoint.priority(), |
2159 | 2.40k | lb_endpoint.health_status(), time_source, address_list); |
2160 | 2.40k | registerHostForPriority(host, locality_lb_endpoint); |
2161 | 2.40k | } |
2162 | | |
2163 | | void PriorityStateManager::registerHostForPriority( |
2164 | | const HostSharedPtr& host, |
2165 | 2.40k | const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint) { |
2166 | 2.40k | const uint32_t priority = locality_lb_endpoint.priority(); |
2167 | | // Should be called after initializePriorityFor. |
2168 | 2.40k | ASSERT(priority_state_[priority].first); |
2169 | 2.40k | priority_state_[priority].first->emplace_back(host); |
2170 | 2.40k | } |
2171 | | |
2172 | | void PriorityStateManager::updateClusterPrioritySet( |
2173 | | const uint32_t priority, HostVectorSharedPtr&& current_hosts, |
2174 | | const absl::optional<HostVector>& hosts_added, const absl::optional<HostVector>& hosts_removed, |
2175 | | const absl::optional<Upstream::Host::HealthFlag> health_checker_flag, |
2176 | | absl::optional<bool> weighted_priority_health, |
2177 | 2.40k | absl::optional<uint32_t> overprovisioning_factor) { |
2178 | | // If local locality is not defined then skip populating per locality hosts. |
2179 | 2.40k | const auto& local_locality = local_info_node_.locality(); |
2180 | 2.40k | ENVOY_LOG(trace, "Local locality: {}", local_locality.DebugString()); |
2181 | | |
2182 | | // For non-EDS, most likely the current hosts are from priority_state_[priority].first. |
2183 | 2.40k | HostVectorSharedPtr hosts(std::move(current_hosts)); |
2184 | 2.40k | LocalityWeightsMap empty_locality_map; |
2185 | 2.40k | LocalityWeightsMap& locality_weights_map = |
2186 | 2.40k | priority_state_.size() > priority ? priority_state_[priority].second : empty_locality_map; |
2187 | 2.40k | ASSERT(priority_state_.size() > priority || locality_weights_map.empty()); |
2188 | 2.40k | LocalityWeightsSharedPtr locality_weights; |
2189 | 2.40k | std::vector<HostVector> per_locality; |
2190 | | |
2191 | | // TODO: have the load balancing extension indicate, programmatically, whether it needs locality |
2192 | | // weights, as an optimization in cases where it doesn't. |
2193 | 2.40k | locality_weights = std::make_shared<LocalityWeights>(); |
2194 | | |
2195 | | // We use std::map to guarantee a stable ordering for zone aware routing. |
2196 | 2.40k | std::map<envoy::config::core::v3::Locality, HostVector, LocalityLess> hosts_per_locality; |
2197 | | |
2198 | 2.40k | for (const HostSharedPtr& host : *hosts) { |
2199 | | // Take into consideration when a non-EDS cluster has active health checking, i.e. to mark all |
2200 | | // the hosts unhealthy (host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC)) and then fire |
2201 | | // update callbacks to start the health checking process. The endpoint with disabled active |
2202 | | // health check should not be set FAILED_ACTIVE_HC here. |
2203 | 2.40k | if (health_checker_flag.has_value() && !host->disableActiveHealthCheck()) { |
2204 | 0 | host->healthFlagSet(health_checker_flag.value()); |
2205 | 0 | } |
2206 | 2.40k | hosts_per_locality[host->locality()].push_back(host); |
2207 | 2.40k | } |
2208 | | |
2209 | | // Do we have hosts for the local locality? |
2210 | 2.40k | const bool non_empty_local_locality = |
2211 | 2.40k | local_info_node_.has_locality() && |
2212 | 2.40k | hosts_per_locality.find(local_locality) != hosts_per_locality.end(); |
2213 | | |
2214 | | // As per HostsPerLocality::get(), the per_locality vector must have the local locality hosts |
2215 | | // first if non_empty_local_locality. |
2216 | 2.40k | if (non_empty_local_locality) { |
2217 | 0 | per_locality.emplace_back(hosts_per_locality[local_locality]); |
2218 | 0 | locality_weights->emplace_back(locality_weights_map[local_locality]); |
2219 | 0 | } |
2220 | | |
2221 | | // After the local locality hosts (if any), we place the remaining locality host groups in |
2222 | | // lexicographic order. This provides a stable ordering for zone aware routing. |
2223 | 2.40k | for (auto& entry : hosts_per_locality) { |
2224 | 2.40k | if (!non_empty_local_locality || !LocalityEqualTo()(local_locality, entry.first)) { |
2225 | 2.40k | per_locality.emplace_back(entry.second); |
2226 | 2.40k | locality_weights->emplace_back(locality_weights_map[entry.first]); |
2227 | 2.40k | } |
2228 | 2.40k | } |
2229 | | |
2230 | 2.40k | auto per_locality_shared = |
2231 | 2.40k | std::make_shared<HostsPerLocalityImpl>(std::move(per_locality), non_empty_local_locality); |
2232 | | |
2233 | | // If a batch update callback was provided, use that. Otherwise directly update |
2234 | | // the PrioritySet. |
2235 | 2.40k | if (update_cb_ != nullptr) { |
2236 | 14 | update_cb_->updateHosts(priority, HostSetImpl::partitionHosts(hosts, per_locality_shared), |
2237 | 14 | std::move(locality_weights), hosts_added.value_or(*hosts), |
2238 | 14 | hosts_removed.value_or<HostVector>({}), random_.random(), |
2239 | 14 | weighted_priority_health, overprovisioning_factor); |
2240 | 2.39k | } else { |
2241 | 2.39k | parent_.prioritySet().updateHosts(priority, |
2242 | 2.39k | HostSetImpl::partitionHosts(hosts, per_locality_shared), |
2243 | 2.39k | std::move(locality_weights), hosts_added.value_or(*hosts), |
2244 | 2.39k | hosts_removed.value_or<HostVector>({}), random_.random(), |
2245 | 2.39k | weighted_priority_health, overprovisioning_factor); |
2246 | 2.39k | } |
2247 | 2.40k | } |
2248 | | |
2249 | | bool BaseDynamicClusterImpl::updateDynamicHostList( |
2250 | | const HostVector& new_hosts, HostVector& current_priority_hosts, |
2251 | | HostVector& hosts_added_to_current_priority, HostVector& hosts_removed_from_current_priority, |
2252 | 14 | const HostMap& all_hosts, const absl::flat_hash_set<std::string>& all_new_hosts) { |
2253 | 14 | uint64_t max_host_weight = 1; |
2254 | | |
2255 | | // Did hosts change? |
2256 | | // |
2257 | | // Have host attributes changed the health of any endpoint? If so, we |
2258 | | // rebuild the hosts vectors. We only do this if the health status of an |
2259 | | // endpoint has materially changed (e.g. if previously failing active health |
2260 | | // checks, we just note it's now failing EDS health status but don't rebuild). |
2261 | | // |
2262 | | // TODO(htuch): We can be smarter about this potentially, and not force a full |
2263 | | // host set update on health status change. The way this would work is to |
2264 | | // implement a HealthChecker subclass that provides thread local health |
2265 | | // updates to the Cluster object. This will probably make sense to do in |
2266 | | // conjunction with https://github.com/envoyproxy/envoy/issues/2874. |
2267 | 14 | bool hosts_changed = false; |
2268 | | |
2269 | | // Go through and see if the list we have is different from what we just got. If it is, we make |
2270 | | // a new host list and raise a change notification. We also check for duplicates here. It's |
2271 | | // possible for DNS to return the same address multiple times, and a bad EDS implementation |
2272 | | // could do the same thing. |
2273 | | |
2274 | | // Keep track of hosts we see in new_hosts that we are able to match up with an existing host. |
2275 | 14 | absl::flat_hash_set<std::string> existing_hosts_for_current_priority( |
2276 | 14 | current_priority_hosts.size()); |
2277 | | // Keep track of hosts we're adding (or replacing) |
2278 | 14 | absl::flat_hash_set<std::string> new_hosts_for_current_priority(new_hosts.size()); |
2279 | | // Keep track of hosts for which locality is changed. |
2280 | 14 | absl::flat_hash_set<std::string> hosts_with_updated_locality_for_current_priority( |
2281 | 14 | current_priority_hosts.size()); |
2282 | | // Keep track of hosts for which active health check flag is changed. |
2283 | 14 | absl::flat_hash_set<std::string> hosts_with_active_health_check_flag_changed( |
2284 | 14 | current_priority_hosts.size()); |
2285 | 14 | HostVector final_hosts; |
2286 | 14 | for (const HostSharedPtr& host : new_hosts) { |
2287 | | // To match a new host with an existing host means comparing their addresses. |
2288 | 14 | auto existing_host = all_hosts.find(addressToString(host->address())); |
2289 | 14 | const bool existing_host_found = existing_host != all_hosts.end(); |
2290 | | |
2291 | | // Clear any pending deletion flag on an existing host in case it came back while it was |
2292 | | // being stabilized. We will set it again below if needed. |
2293 | 14 | if (existing_host_found) { |
2294 | 0 | existing_host->second->healthFlagClear(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL); |
2295 | 0 | } |
2296 | | |
2297 | | // Check if in-place host update should be skipped, i.e. when the following criteria are met |
2298 | | // (currently there is only one criterion, but we might add more in the future): |
2299 | | // - The cluster health checker is activated and a new host is matched with the existing one, |
2300 | | // but the health check address is different. |
2301 | 14 | const bool health_check_address_changed = |
2302 | 14 | (health_checker_ != nullptr && existing_host_found && |
2303 | 14 | *existing_host->second->healthCheckAddress() != *host->healthCheckAddress()); |
2304 | 14 | bool locality_changed = false; |
2305 | 14 | locality_changed = (existing_host_found && |
2306 | 14 | (!LocalityEqualTo()(host->locality(), existing_host->second->locality()))); |
2307 | 14 | if (locality_changed) { |
2308 | 0 | hosts_with_updated_locality_for_current_priority.emplace(existing_host->first); |
2309 | 0 | } |
2310 | | |
2311 | 14 | const bool active_health_check_flag_changed = |
2312 | 14 | (health_checker_ != nullptr && existing_host_found && |
2313 | 14 | existing_host->second->disableActiveHealthCheck() != host->disableActiveHealthCheck()); |
2314 | 14 | if (active_health_check_flag_changed) { |
2315 | 0 | hosts_with_active_health_check_flag_changed.emplace(existing_host->first); |
2316 | 0 | } |
2317 | 14 | const bool skip_inplace_host_update = |
2318 | 14 | health_check_address_changed || locality_changed || active_health_check_flag_changed; |
2319 | | |
2320 | | // When there is a match and we decided to do in-place update, we potentially update the |
2321 | | // host's health check flag and metadata. Afterwards, the host is pushed back into the |
2322 | | // final_hosts, i.e. hosts that should be preserved in the current priority. |
2323 | 14 | if (existing_host_found && !skip_inplace_host_update) { |
2324 | 0 | existing_hosts_for_current_priority.emplace(existing_host->first); |
2325 | | // If we find a host matched based on address, we keep it. However we do change weight |
2326 | | // inline so do that here. |
2327 | 0 | if (host->weight() > max_host_weight) { |
2328 | 0 | max_host_weight = host->weight(); |
2329 | 0 | } |
2330 | 0 | if (existing_host->second->weight() != host->weight()) { |
2331 | 0 | existing_host->second->weight(host->weight()); |
2332 | | // We do full host set rebuilds so that load balancers can do pre-computation of data |
2333 | | // structures based on host weight. This may become a performance problem in certain |
2334 | | // deployments so it is runtime feature guarded and may also need to be configurable |
2335 | | // and/or dynamic in the future. |
2336 | 0 | hosts_changed = true; |
2337 | 0 | } |
2338 | |
|
2339 | 0 | hosts_changed |= updateEdsHealthFlag(*host, *existing_host->second); |
2340 | | |
2341 | | // Did metadata change? |
2342 | 0 | bool metadata_changed = true; |
2343 | 0 | if (host->metadata() && existing_host->second->metadata()) { |
2344 | 0 | metadata_changed = !Protobuf::util::MessageDifferencer::Equivalent( |
2345 | 0 | *host->metadata(), *existing_host->second->metadata()); |
2346 | 0 | } else if (!host->metadata() && !existing_host->second->metadata()) { |
2347 | 0 | metadata_changed = false; |
2348 | 0 | } |
2349 | |
|
2350 | 0 | if (metadata_changed) { |
2351 | | // First, update the entire metadata for the endpoint. |
2352 | 0 | existing_host->second->metadata(host->metadata()); |
2353 | | |
2354 | | // Also, given that the canary attribute of an endpoint is derived from its metadata |
2355 | | // (e.g.: from envoy.lb/canary), we do a blind update here since it's cheaper than testing |
2356 | | // to see if it actually changed. We must update this besides just updating the metadata, |
2357 | | // because it'll be used by the router filter to compute upstream stats. |
2358 | 0 | existing_host->second->canary(host->canary()); |
2359 | | |
2360 | | // If metadata changed, we need to rebuild. See github issue #3810. |
2361 | 0 | hosts_changed = true; |
2362 | 0 | } |
2363 | | |
2364 | | // Did the priority change? |
2365 | 0 | if (host->priority() != existing_host->second->priority()) { |
2366 | 0 | existing_host->second->priority(host->priority()); |
2367 | 0 | hosts_added_to_current_priority.emplace_back(existing_host->second); |
2368 | 0 | } |
2369 | |
|
2370 | 0 | final_hosts.push_back(existing_host->second); |
2371 | 14 | } else { |
2372 | 14 | new_hosts_for_current_priority.emplace(addressToString(host->address())); |
2373 | 14 | if (host->weight() > max_host_weight) { |
2374 | 0 | max_host_weight = host->weight(); |
2375 | 0 | } |
2376 | | |
2377 | | // If we are depending on a health checker, we initialize to unhealthy. |
2378 | | // If there's an existing host with the same health checker, the |
2379 | | // active health-status is kept. |
2380 | 14 | if (health_checker_ != nullptr && !host->disableActiveHealthCheck()) { |
2381 | 0 | if (existing_host_found && !health_check_address_changed && |
2382 | 0 | !active_health_check_flag_changed) { |
2383 | | // If there's an existing host, use the same active health-status. |
2384 | | // The existing host can be marked PENDING_ACTIVE_HC or |
2385 | | // ACTIVE_HC_TIMEOUT if it is also marked with FAILED_ACTIVE_HC. |
2386 | 0 | ASSERT(!existing_host->second->healthFlagGet(Host::HealthFlag::PENDING_ACTIVE_HC) || |
2387 | 0 | existing_host->second->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)); |
2388 | 0 | ASSERT(!existing_host->second->healthFlagGet(Host::HealthFlag::ACTIVE_HC_TIMEOUT) || |
2389 | 0 | existing_host->second->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC)); |
2390 | | |
2391 | 0 | constexpr uint32_t active_hc_statuses_mask = |
2392 | 0 | enumToInt(Host::HealthFlag::FAILED_ACTIVE_HC) | |
2393 | 0 | enumToInt(Host::HealthFlag::DEGRADED_ACTIVE_HC) | |
2394 | 0 | enumToInt(Host::HealthFlag::PENDING_ACTIVE_HC) | |
2395 | 0 | enumToInt(Host::HealthFlag::ACTIVE_HC_TIMEOUT); |
2396 | |
|
2397 | 0 | const uint32_t existing_host_statuses = existing_host->second->healthFlagsGetAll(); |
2398 | 0 | host->healthFlagsSetAll(existing_host_statuses & active_hc_statuses_mask); |
2399 | 0 | } else { |
2400 | | // No previous known host, mark it as failed active HC. |
2401 | 0 | host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC); |
2402 | | |
2403 | | // If we want to exclude hosts until they have been health checked, mark them with |
2404 | | // a flag to indicate that they have not been health checked yet. |
2405 | 0 | if (info_->warmHosts()) { |
2406 | 0 | host->healthFlagSet(Host::HealthFlag::PENDING_ACTIVE_HC); |
2407 | 0 | } |
2408 | 0 | } |
2409 | 0 | } |
2410 | | |
2411 | 14 | final_hosts.push_back(host); |
2412 | 14 | hosts_added_to_current_priority.push_back(host); |
2413 | 14 | } |
2414 | 14 | } |
2415 | | |
2416 | | // Remove hosts from current_priority_hosts that were matched to an existing host in the |
2417 | | // previous loop. |
2418 | 14 | auto erase_from = |
2419 | 14 | std::remove_if(current_priority_hosts.begin(), current_priority_hosts.end(), |
2420 | 14 | [&existing_hosts_for_current_priority](const HostSharedPtr& p) { |
2421 | 0 | auto existing_itr = |
2422 | 0 | existing_hosts_for_current_priority.find(p->address()->asString()); |
2423 | |
|
2424 | 0 | if (existing_itr != existing_hosts_for_current_priority.end()) { |
2425 | 0 | existing_hosts_for_current_priority.erase(existing_itr); |
2426 | 0 | return true; |
2427 | 0 | } |
2428 | | |
2429 | 0 | return false; |
2430 | 0 | }); |
2431 | 14 | current_priority_hosts.erase(erase_from, current_priority_hosts.end()); |
2432 | | |
2433 | | // If we saw existing hosts during this iteration from a different priority, then we've moved |
2434 | | // a host from another priority into this one, so we should mark the priority as having changed. |
2435 | 14 | if (!existing_hosts_for_current_priority.empty()) { |
2436 | 0 | hosts_changed = true; |
2437 | 0 | } |
2438 | | |
2439 | | // The remaining hosts are hosts that are not referenced in the config update. We remove them |
2440 | | // from the priority if any of the following is true: |
2441 | | // - Active health checking is not enabled. |
2442 | | // - The removed hosts are failing active health checking OR have been explicitly marked as |
2443 | | // unhealthy by a previous EDS update. We do not count outlier as a reason to remove a host |
2444 | | // or any other future health condition that may be added so we do not use the coarseHealth() |
2445 | | // API. |
2446 | | // - We have explicitly configured the cluster to remove hosts regardless of active health |
2447 | | // status. |
2448 | 14 | const bool dont_remove_healthy_hosts = |
2449 | 14 | health_checker_ != nullptr && !info()->drainConnectionsOnHostRemoval(); |
2450 | 14 | if (!current_priority_hosts.empty() && dont_remove_healthy_hosts) { |
2451 | 0 | erase_from = std::remove_if( |
2452 | 0 | current_priority_hosts.begin(), current_priority_hosts.end(), |
2453 | 0 | [&all_new_hosts, &new_hosts_for_current_priority, |
2454 | 0 | &hosts_with_updated_locality_for_current_priority, |
2455 | 0 | &hosts_with_active_health_check_flag_changed, &final_hosts, |
2456 | 0 | &max_host_weight](const HostSharedPtr& p) { |
2457 | | // This host has already been added as a new host in the |
2458 | | // new_hosts_for_current_priority. Return false here to make sure that host |
2459 | | // reference with older locality gets cleaned up from the priority. |
2460 | 0 | if (hosts_with_updated_locality_for_current_priority.contains(p->address()->asString())) { |
2461 | 0 | return false; |
2462 | 0 | } |
2463 | | |
2464 | 0 | if (hosts_with_active_health_check_flag_changed.contains(p->address()->asString())) { |
2465 | 0 | return false; |
2466 | 0 | } |
2467 | | |
2468 | 0 | if (all_new_hosts.contains(p->address()->asString()) && |
2469 | 0 | !new_hosts_for_current_priority.contains(p->address()->asString())) { |
2470 | | // If the address is being completely deleted from this priority, but is |
2471 | | // referenced from another priority, then we assume that the other |
2472 | | // priority will perform an in-place update to re-use the existing Host. |
2473 | | // We should therefore not mark it as PENDING_DYNAMIC_REMOVAL, but |
2474 | | // instead remove it immediately from this priority. |
2475 | | // Example: health check address changed and priority also changed |
2476 | 0 | return false; |
2477 | 0 | } |
2478 | | |
2479 | | // PENDING_DYNAMIC_REMOVAL doesn't apply for the host with disabled active |
2480 | | // health check, the host is removed immediately from this priority. |
2481 | 0 | if ((!(p->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC) || |
2482 | 0 | p->healthFlagGet(Host::HealthFlag::FAILED_EDS_HEALTH))) && |
2483 | 0 | !p->disableActiveHealthCheck()) { |
2484 | 0 | if (p->weight() > max_host_weight) { |
2485 | 0 | max_host_weight = p->weight(); |
2486 | 0 | } |
2487 | |
|
2488 | 0 | final_hosts.push_back(p); |
2489 | 0 | p->healthFlagSet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL); |
2490 | 0 | return true; |
2491 | 0 | } |
2492 | 0 | return false; |
2493 | 0 | }); |
2494 | 0 | current_priority_hosts.erase(erase_from, current_priority_hosts.end()); |
2495 | 0 | } |
2496 | | |
2497 | | // At this point we've accounted for all the new hosts as well the hosts that previously |
2498 | | // existed in this priority. |
2499 | 14 | info_->endpointStats().max_host_weight_.set(max_host_weight); |
2500 | | |
2501 | | // Whatever remains in current_priority_hosts should be removed. |
2502 | 14 | if (!hosts_added_to_current_priority.empty() || !current_priority_hosts.empty()) { |
2503 | 14 | hosts_removed_from_current_priority = std::move(current_priority_hosts); |
2504 | 14 | hosts_changed = true; |
2505 | 14 | } |
2506 | | |
2507 | | // During the update we populated final_hosts with all the hosts that should remain |
2508 | | // in the current priority, so move them back into current_priority_hosts. |
2509 | 14 | current_priority_hosts = std::move(final_hosts); |
2510 | | // We return false here in the absence of EDS health status or metadata changes, because we |
2511 | | // have no changes to host vector status (modulo weights). When we have EDS |
2512 | | // health status or metadata changed, we return true, causing updateHosts() to fire in the |
2513 | | // caller. |
2514 | 14 | return hosts_changed; |
2515 | 14 | } |
2516 | | |
2517 | | Network::DnsLookupFamily |
2518 | 0 | getDnsLookupFamilyFromCluster(const envoy::config::cluster::v3::Cluster& cluster) { |
2519 | 0 | return DnsUtils::getDnsLookupFamilyFromEnum(cluster.dns_lookup_family()); |
2520 | 0 | } |
2521 | | |
2522 | | void reportUpstreamCxDestroy(const Upstream::HostDescriptionConstSharedPtr& host, |
2523 | 911 | Network::ConnectionEvent event) { |
2524 | 911 | Upstream::ClusterTrafficStats& stats = *host->cluster().trafficStats(); |
2525 | 911 | stats.upstream_cx_destroy_.inc(); |
2526 | 911 | if (event == Network::ConnectionEvent::RemoteClose) { |
2527 | 433 | stats.upstream_cx_destroy_remote_.inc(); |
2528 | 478 | } else { |
2529 | 478 | stats.upstream_cx_destroy_local_.inc(); |
2530 | 478 | } |
2531 | 911 | } |
2532 | | |
2533 | | void reportUpstreamCxDestroyActiveRequest(const Upstream::HostDescriptionConstSharedPtr& host, |
2534 | 41 | Network::ConnectionEvent event) { |
2535 | 41 | Upstream::ClusterTrafficStats& stats = *host->cluster().trafficStats(); |
2536 | 41 | stats.upstream_cx_destroy_with_active_rq_.inc(); |
2537 | 41 | if (event == Network::ConnectionEvent::RemoteClose) { |
2538 | 20 | stats.upstream_cx_destroy_remote_with_active_rq_.inc(); |
2539 | 21 | } else { |
2540 | 21 | stats.upstream_cx_destroy_local_with_active_rq_.inc(); |
2541 | 21 | } |
2542 | 41 | } |
2543 | | |
2544 | | Network::Address::InstanceConstSharedPtr resolveHealthCheckAddress( |
2545 | | const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config, |
2546 | 206k | Network::Address::InstanceConstSharedPtr host_address) { |
2547 | 206k | Network::Address::InstanceConstSharedPtr health_check_address; |
2548 | 206k | const auto& port_value = health_check_config.port_value(); |
2549 | 206k | if (health_check_config.has_address()) { |
2550 | 0 | auto address_or_error = Network::Address::resolveProtoAddress(health_check_config.address()); |
2551 | 0 | THROW_IF_NOT_OK_REF(address_or_error.status()); |
2552 | 0 | auto address = address_or_error.value(); |
2553 | 0 | health_check_address = |
2554 | 0 | port_value == 0 ? address : Network::Utility::getAddressWithPort(*address, port_value); |
2555 | 206k | } else { |
2556 | 206k | health_check_address = port_value == 0 |
2557 | 206k | ? host_address |
2558 | 206k | : Network::Utility::getAddressWithPort(*host_address, port_value); |
2559 | 206k | } |
2560 | 206k | return health_check_address; |
2561 | 206k | } |
2562 | | |
2563 | | } // namespace Upstream |
2564 | | } // namespace Envoy |