Line data Source code
1 : #pragma once
2 :
3 : #include <array>
4 : #include <cstdint>
5 : #include <functional>
6 : #include <list>
7 : #include <map>
8 : #include <memory>
9 : #include <string>
10 : #include <vector>
11 :
12 : #include "envoy/api/api.h"
13 : #include "envoy/common/callback.h"
14 : #include "envoy/common/random_generator.h"
15 : #include "envoy/config/bootstrap/v3/bootstrap.pb.h"
16 : #include "envoy/config/cluster/v3/cluster.pb.h"
17 : #include "envoy/config/core/v3/address.pb.h"
18 : #include "envoy/config/core/v3/config_source.pb.h"
19 : #include "envoy/config/xds_resources_delegate.h"
20 : #include "envoy/http/codes.h"
21 : #include "envoy/local_info/local_info.h"
22 : #include "envoy/router/context.h"
23 : #include "envoy/runtime/runtime.h"
24 : #include "envoy/secret/secret_manager.h"
25 : #include "envoy/ssl/context_manager.h"
26 : #include "envoy/stats/scope.h"
27 : #include "envoy/tcp/async_tcp_client.h"
28 : #include "envoy/thread_local/thread_local.h"
29 : #include "envoy/upstream/cluster_manager.h"
30 :
31 : #include "source/common/common/cleanup.h"
32 : #include "source/common/config/subscription_factory_impl.h"
33 : #include "source/common/http/async_client_impl.h"
34 : #include "source/common/http/http_server_properties_cache_impl.h"
35 : #include "source/common/http/http_server_properties_cache_manager_impl.h"
36 : #include "source/common/quic/quic_stat_names.h"
37 : #include "source/common/tcp/async_tcp_client_impl.h"
38 : #include "source/common/upstream/cluster_discovery_manager.h"
39 : #include "source/common/upstream/host_utility.h"
40 : #include "source/common/upstream/load_stats_reporter.h"
41 : #include "source/common/upstream/od_cds_api_impl.h"
42 : #include "source/common/upstream/priority_conn_pool_map.h"
43 : #include "source/common/upstream/upstream_impl.h"
44 :
45 : namespace Envoy {
46 : namespace Upstream {
47 :
48 : /**
49 : * Production implementation of ClusterManagerFactory.
50 : */
51 : class ProdClusterManagerFactory : public ClusterManagerFactory {
52 : public:
53 : using LazyCreateDnsResolver = std::function<Network::DnsResolverSharedPtr()>;
54 :
55 : ProdClusterManagerFactory(Server::Configuration::ServerFactoryContext& context,
56 : Stats::Store& stats, ThreadLocal::Instance& tls,
57 : Http::Context& http_context, LazyCreateDnsResolver dns_resolver_fn,
58 : Ssl::ContextManager& ssl_context_manager,
59 : Secret::SecretManager& secret_manager,
60 : Quic::QuicStatNames& quic_stat_names, const Server::Instance& server)
61 : : context_(context), stats_(stats), tls_(tls), http_context_(http_context),
62 : dns_resolver_fn_(dns_resolver_fn), ssl_context_manager_(ssl_context_manager),
63 : secret_manager_(secret_manager), quic_stat_names_(quic_stat_names),
64 : alternate_protocols_cache_manager_factory_(context.singletonManager(), tls,
65 : {context, context.messageValidationVisitor()}),
66 : alternate_protocols_cache_manager_(alternate_protocols_cache_manager_factory_.get()),
67 131 : server_(server) {}
68 :
69 : // Upstream::ClusterManagerFactory
70 : ClusterManagerPtr
71 : clusterManagerFromProto(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) override;
72 : Http::ConnectionPool::InstancePtr
73 : allocateConnPool(Event::Dispatcher& dispatcher, HostConstSharedPtr host,
74 : ResourcePriority priority, std::vector<Http::Protocol>& protocol,
75 : const absl::optional<envoy::config::core::v3::AlternateProtocolsCacheOptions>&
76 : alternate_protocol_options,
77 : const Network::ConnectionSocket::OptionsSharedPtr& options,
78 : const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
79 : TimeSource& time_source, ClusterConnectivityState& state,
80 : Http::PersistentQuicInfoPtr& quic_info) override;
81 : Tcp::ConnectionPool::InstancePtr
82 : allocateTcpConnPool(Event::Dispatcher& dispatcher, HostConstSharedPtr host,
83 : ResourcePriority priority,
84 : const Network::ConnectionSocket::OptionsSharedPtr& options,
85 : Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
86 : ClusterConnectivityState& state,
87 : absl::optional<std::chrono::milliseconds> tcp_pool_idle_timeout) override;
88 : absl::StatusOr<std::pair<ClusterSharedPtr, ThreadAwareLoadBalancerPtr>>
89 : clusterFromProto(const envoy::config::cluster::v3::Cluster& cluster, ClusterManager& cm,
90 : Outlier::EventLoggerSharedPtr outlier_event_logger, bool added_via_api) override;
91 : CdsApiPtr createCds(const envoy::config::core::v3::ConfigSource& cds_config,
92 : const xds::core::v3::ResourceLocator* cds_resources_locator,
93 : ClusterManager& cm) override;
94 0 : Secret::SecretManager& secretManager() override { return secret_manager_; }
95 33 : Singleton::Manager& singletonManager() override { return context_.singletonManager(); }
96 :
97 : protected:
98 : Server::Configuration::ServerFactoryContext& context_;
99 : Stats::Store& stats_;
100 : ThreadLocal::Instance& tls_;
101 : Http::Context& http_context_;
102 :
103 : LazyCreateDnsResolver dns_resolver_fn_;
104 : Ssl::ContextManager& ssl_context_manager_;
105 : Secret::SecretManager& secret_manager_;
106 : Quic::QuicStatNames& quic_stat_names_;
107 : Http::HttpServerPropertiesCacheManagerFactoryImpl alternate_protocols_cache_manager_factory_;
108 : Http::HttpServerPropertiesCacheManagerSharedPtr alternate_protocols_cache_manager_;
109 : const Server::Instance& server_;
110 : };
111 :
112 : // For friend declaration in ClusterManagerInitHelper.
113 : class ClusterManagerImpl;
114 :
115 : /**
116 : * Wrapper for a cluster owned by the cluster manager. Used by both the cluster manager and the
117 : * cluster manager init helper which needs to pass clusters back to the cluster manager.
118 : */
119 : class ClusterManagerCluster {
120 : public:
121 159 : virtual ~ClusterManagerCluster() = default;
122 :
123 : // Return the underlying cluster.
124 : virtual Cluster& cluster() PURE;
125 :
126 : // Return a new load balancer factory if the cluster has one.
127 : virtual LoadBalancerFactorySharedPtr loadBalancerFactory() PURE;
128 :
129 : // Return true if a cluster has already been added or updated.
130 : virtual bool addedOrUpdated() PURE;
131 :
132 : // Set when a cluster has been added or updated. This is only called a single time for a cluster.
133 : virtual void setAddedOrUpdated() PURE;
134 :
135 : // Return true if the cluster must be ready-for-use before ADS (Aggregated Discovery Service) can
136 : // be initialized; will only occur if ADS is configured to use the cluster via EnvoyGrpc.
137 : virtual bool requiredForAds() const PURE;
138 : };
139 :
140 : /**
141 : * This is a helper class used during cluster management initialization. Dealing with primary
142 : * clusters, secondary clusters, and CDS, is quite complicated, so this makes it easier to test.
143 : */
144 : class ClusterManagerInitHelper : Logger::Loggable<Logger::Id::upstream> {
145 : public:
146 : /**
147 : * @param per_cluster_init_callback supplies the callback to call when a cluster has itself
148 : * initialized. The cluster manager can use this for post-init processing.
149 : */
150 : ClusterManagerInitHelper(
151 : ClusterManager& cm,
152 : const std::function<void(ClusterManagerCluster&)>& per_cluster_init_callback)
153 131 : : cm_(cm), per_cluster_init_callback_(per_cluster_init_callback) {}
154 :
155 : enum class State {
156 : // Initial state. During this state all static clusters are loaded. Any primary clusters
157 : // immediately begin initialization.
158 : Loading,
159 : // In this state cluster manager waits for all primary clusters to finish initialization.
160 : // This state may immediately transition to the next state iff all clusters are STATIC and
161 : // without health checks enabled or health checks have failed immediately, since their
162 : // initialization completes immediately.
163 : WaitingForPrimaryInitializationToComplete,
164 : // During this state cluster manager waits to start initializing secondary clusters. In this
165 : // state all primary clusters have completed initialization. Initialization of the
166 : // secondary clusters is started by the `initializeSecondaryClusters` method.
167 : WaitingToStartSecondaryInitialization,
168 : // In this state cluster manager waits for all secondary clusters (if configured) to finish
169 : // initialization. Then, if CDS is configured, this state tracks waiting for the first CDS
170 : // response to populate dynamically configured clusters.
171 : WaitingToStartCdsInitialization,
172 : // During this state, all CDS populated clusters are undergoing either phase 1 or phase 2
173 : // initialization.
174 : CdsInitialized,
175 : // All clusters are fully initialized.
176 : AllClustersInitialized
177 : };
178 :
179 : void addCluster(ClusterManagerCluster& cluster);
180 : void onStaticLoadComplete();
181 : void removeCluster(ClusterManagerCluster& cluster);
182 : void setCds(CdsApi* cds);
183 : void setPrimaryClustersInitializedCb(ClusterManager::PrimaryClustersReadyCallback callback);
184 : void setInitializedCb(ClusterManager::InitializationCompleteCallback callback);
185 473 : State state() const { return state_; }
186 :
187 : void startInitializingSecondaryClusters();
188 :
189 : private:
190 : // To enable invariant assertions on the cluster lists.
191 : friend ClusterManagerImpl;
192 :
193 : void initializeSecondaryClusters();
194 : void maybeFinishInitialize();
195 : void onClusterInit(ClusterManagerCluster& cluster);
196 :
197 : ClusterManager& cm_;
198 : std::function<void(ClusterManagerCluster& cluster)> per_cluster_init_callback_;
199 : CdsApi* cds_{};
200 : ClusterManager::PrimaryClustersReadyCallback primary_clusters_initialized_callback_;
201 : ClusterManager::InitializationCompleteCallback initialized_callback_;
202 : absl::flat_hash_map<std::string, ClusterManagerCluster*> primary_init_clusters_;
203 : absl::flat_hash_map<std::string, ClusterManagerCluster*> secondary_init_clusters_;
204 : State state_{State::Loading};
205 : bool started_secondary_initialize_{};
206 : };
207 :
208 : /**
209 : * All cluster manager stats. @see stats_macros.h
210 : */
211 : #define ALL_CLUSTER_MANAGER_STATS(COUNTER, GAUGE) \
212 131 : COUNTER(cluster_added) \
213 131 : COUNTER(cluster_modified) \
214 131 : COUNTER(cluster_removed) \
215 131 : COUNTER(cluster_updated) \
216 131 : COUNTER(cluster_updated_via_merge) \
217 131 : COUNTER(update_merge_cancelled) \
218 131 : COUNTER(update_out_of_merge_window) \
219 131 : GAUGE(active_clusters, NeverImport) \
220 131 : GAUGE(warming_clusters, NeverImport)
221 :
222 : /**
223 : * Struct definition for all cluster manager stats. @see stats_macros.h
224 : */
225 : struct ClusterManagerStats {
226 : ALL_CLUSTER_MANAGER_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT)
227 : };
228 :
229 : /**
230 : * All thread local cluster manager stats. @see stats_macros.h
231 : */
232 223 : #define ALL_THREAD_LOCAL_CLUSTER_MANAGER_STATS(GAUGE) GAUGE(clusters_inflated, NeverImport)
233 :
234 : /**
235 : * Struct definition for all thread local cluster manager stats. @see stats_macros.h
236 : */
237 : struct ThreadLocalClusterManagerStats {
238 : ALL_THREAD_LOCAL_CLUSTER_MANAGER_STATS(GENERATE_GAUGE_STRUCT)
239 : };
240 :
241 : /**
242 : * Implementation of ClusterManager that reads from a proto configuration, maintains a central
243 : * cluster list, as well as thread local caches of each cluster and associated connection pools.
244 : */
245 : class ClusterManagerImpl : public ClusterManager,
246 : public MissingClusterNotifier,
247 : Logger::Loggable<Logger::Id::upstream> {
248 : public:
249 : // Initializes the ClusterManagerImpl instance based on the given Bootstrap config.
250 : //
251 : // This method *must* be called prior to invoking any other methods on the class and *must* only
252 : // be called once. This method should be called immediately after ClusterManagerImpl construction
253 : // and from the same thread in which the ClusterManagerImpl was constructed.
254 : //
255 : // The initialization is separated from the constructor because lots of work, including ADS
256 : // initialization, is done in this method. If the contents of this method are invoked during
257 : // construction, a derived class cannot override any of the virtual methods and have them invoked
258 : // instead, since the base class's methods are used when in a base class constructor.
259 : absl::Status init(const envoy::config::bootstrap::v3::Bootstrap& bootstrap);
260 :
261 0 : std::size_t warmingClusterCount() const { return warming_clusters_.size(); }
262 :
263 : // Upstream::ClusterManager
264 : bool addOrUpdateCluster(const envoy::config::cluster::v3::Cluster& cluster,
265 : const std::string& version_info) override;
266 :
267 111 : void setPrimaryClustersInitializedCb(PrimaryClustersReadyCallback callback) override {
268 111 : init_helper_.setPrimaryClustersInitializedCb(callback);
269 111 : }
270 :
271 98 : void setInitializedCb(InitializationCompleteCallback callback) override {
272 98 : init_helper_.setInitializedCb(callback);
273 98 : }
274 :
275 202 : ClusterInfoMaps clusters() const override {
276 202 : ClusterInfoMaps clusters_maps;
277 288 : for (const auto& cluster : active_clusters_) {
278 274 : clusters_maps.active_clusters_.emplace(cluster.first, *cluster.second->cluster_);
279 274 : if (cluster.second->cluster_->info()->addedViaApi()) {
280 28 : ++clusters_maps.added_via_api_clusters_num_;
281 28 : }
282 274 : }
283 202 : for (const auto& cluster : warming_clusters_) {
284 0 : clusters_maps.warming_clusters_.emplace(cluster.first, *cluster.second->cluster_);
285 0 : if (cluster.second->cluster_->info()->addedViaApi()) {
286 0 : ++clusters_maps.added_via_api_clusters_num_;
287 0 : }
288 0 : }
289 : // The number of clusters that were added via API must be at most the number
290 : // of active clusters + number of warming clusters.
291 202 : ASSERT(clusters_maps.added_via_api_clusters_num_ <=
292 202 : clusters_maps.active_clusters_.size() + clusters_maps.warming_clusters_.size());
293 202 : return clusters_maps;
294 202 : }
295 :
296 0 : const ClusterSet& primaryClusters() override { return primary_clusters_; }
297 : ThreadLocalCluster* getThreadLocalCluster(absl::string_view cluster) override;
298 :
299 : bool removeCluster(const std::string& cluster) override;
300 129 : void shutdown() override {
301 129 : shutdown_ = true;
302 129 : if (resume_cds_ != nullptr) {
303 0 : resume_cds_->cancel();
304 0 : }
305 : // Make sure we destroy all potential outgoing connections before this returns.
306 129 : cds_api_.reset();
307 129 : ads_mux_.reset();
308 129 : active_clusters_.clear();
309 129 : warming_clusters_.clear();
310 129 : updateClusterCounts();
311 129 : }
312 :
313 0 : bool isShutdown() override { return shutdown_; }
314 :
315 159 : const absl::optional<envoy::config::core::v3::BindConfig>& bindConfig() const override {
316 159 : return bind_config_;
317 159 : }
318 :
319 815 : Config::GrpcMuxSharedPtr adsMux() override { return ads_mux_; }
320 40 : Grpc::AsyncClientManager& grpcAsyncClientManager() override { return *async_client_manager_; }
321 :
322 159 : const absl::optional<std::string>& localClusterName() const override {
323 159 : return local_cluster_name_;
324 159 : }
325 :
326 : ClusterUpdateCallbacksHandlePtr
327 : addThreadLocalClusterUpdateCallbacks(ClusterUpdateCallbacks&) override;
328 :
329 : OdCdsApiHandlePtr
330 : allocateOdCdsApi(const envoy::config::core::v3::ConfigSource& odcds_config,
331 : OptRef<xds::core::v3::ResourceLocator> odcds_resources_locator,
332 : ProtobufMessage::ValidationVisitor& validation_visitor) override;
333 :
334 33 : ClusterManagerFactory& clusterManagerFactory() override { return factory_; }
335 :
336 205 : Config::SubscriptionFactory& subscriptionFactory() override { return *subscription_factory_; }
337 :
338 : absl::Status
339 : initializeSecondaryClusters(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) override;
340 :
341 159 : const ClusterTrafficStatNames& clusterStatNames() const override { return cluster_stat_names_; }
342 159 : const ClusterConfigUpdateStatNames& clusterConfigUpdateStatNames() const override {
343 159 : return cluster_config_update_stat_names_;
344 159 : }
345 159 : const ClusterLbStatNames& clusterLbStatNames() const override { return cluster_lb_stat_names_; }
346 159 : const ClusterEndpointStatNames& clusterEndpointStatNames() const override {
347 159 : return cluster_endpoint_stat_names_;
348 159 : }
349 159 : const ClusterLoadReportStatNames& clusterLoadReportStatNames() const override {
350 159 : return cluster_load_report_stat_names_;
351 159 : }
352 159 : const ClusterCircuitBreakersStatNames& clusterCircuitBreakersStatNames() const override {
353 159 : return cluster_circuit_breakers_stat_names_;
354 159 : }
355 0 : const ClusterRequestResponseSizeStatNames& clusterRequestResponseSizeStatNames() const override {
356 0 : return cluster_request_response_size_stat_names_;
357 0 : }
358 0 : const ClusterTimeoutBudgetStatNames& clusterTimeoutBudgetStatNames() const override {
359 0 : return cluster_timeout_budget_stat_names_;
360 0 : }
361 :
362 : void drainConnections(const std::string& cluster,
363 : DrainConnectionsHostPredicate predicate) override;
364 :
365 : void drainConnections(DrainConnectionsHostPredicate predicate) override;
366 :
367 : absl::Status checkActiveStaticCluster(const std::string& cluster) override;
368 :
369 : // Upstream::MissingClusterNotifier
370 : void notifyMissingCluster(absl::string_view name) override;
371 :
372 : /*
373 : * Return shared_ptr for common_lb_config which is stored in an ObjectSharedPool
374 : *
375 : * @param common_lb_config The config field to be stored in ObjectSharedPool
376 : * @return shared_ptr to the CommonLbConfig in ObjectSharedPool
377 : */
378 : std::shared_ptr<const envoy::config::cluster::v3::Cluster::CommonLbConfig> getCommonLbConfigPtr(
379 159 : const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_lb_config) override {
380 159 : return common_lb_config_pool_->getObject(common_lb_config);
381 159 : }
382 :
383 : Config::EdsResourcesCacheOptRef edsResourcesCache() override;
384 :
385 : protected:
386 : // ClusterManagerImpl's constructor should not be invoked directly; create instances from the
387 : // factory's clusterManagerFromProto() method. The init() method must be called after construction.
388 : ClusterManagerImpl(const envoy::config::bootstrap::v3::Bootstrap& bootstrap,
389 : ClusterManagerFactory& factory, Stats::Store& stats,
390 : ThreadLocal::Instance& tls, Runtime::Loader& runtime,
391 : const LocalInfo::LocalInfo& local_info,
392 : AccessLog::AccessLogManager& log_manager,
393 : Event::Dispatcher& main_thread_dispatcher, OptRef<Server::Admin> admin,
394 : ProtobufMessage::ValidationContext& validation_context, Api::Api& api,
395 : Http::Context& http_context, Grpc::Context& grpc_context,
396 : Router::Context& router_context, const Server::Instance& server);
397 :
398 : virtual void postThreadLocalRemoveHosts(const Cluster& cluster, const HostVector& hosts_removed);
399 :
400 : // Parameters for calling postThreadLocalClusterUpdate()
401 : struct ThreadLocalClusterUpdateParams {
402 : struct PerPriority {
403 : PerPriority(uint32_t priority, const HostVector& hosts_added, const HostVector& hosts_removed)
404 159 : : hosts_added_(hosts_added), hosts_removed_(hosts_removed), priority_(priority) {}
405 : // TODO(kbaichoo): make the hosts_added_ vector const and have the
406 : // cluster initialization object have a stripped down version of this
407 : // struct.
408 : HostVector hosts_added_;
409 : const HostVector hosts_removed_;
410 : PrioritySet::UpdateHostsParams update_hosts_params_;
411 : LocalityWeightsConstSharedPtr locality_weights_;
412 : // Keep small members (bools and enums) at the end of class, to reduce alignment overhead.
413 : const uint32_t priority_;
414 : bool weighted_priority_health_;
415 : uint32_t overprovisioning_factor_;
416 : };
417 :
418 159 : ThreadLocalClusterUpdateParams() = default;
419 : ThreadLocalClusterUpdateParams(uint32_t priority, const HostVector& hosts_added,
420 : const HostVector& hosts_removed)
421 0 : : per_priority_update_params_{{priority, hosts_added, hosts_removed}} {}
422 :
423 : std::vector<PerPriority> per_priority_update_params_;
424 : };
425 :
426 : /**
427 : * A cluster initialization object (CIO) encapsulates the relevant information
428 : * to create a cluster inline when there is traffic to it. We can thus use the
429 : * CIO to defer instantiating clusters on workers until they are used.
430 : */
431 : struct ClusterInitializationObject {
432 : ClusterInitializationObject(const ThreadLocalClusterUpdateParams& params,
433 : ClusterInfoConstSharedPtr cluster_info,
434 : LoadBalancerFactorySharedPtr load_balancer_factory,
435 : HostMapConstSharedPtr map, UnitFloat drop_overload);
436 :
437 : ClusterInitializationObject(
438 : const absl::flat_hash_map<int, ThreadLocalClusterUpdateParams::PerPriority>&
439 : per_priority_state,
440 : const ThreadLocalClusterUpdateParams& update_params, ClusterInfoConstSharedPtr cluster_info,
441 : LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
442 : UnitFloat drop_overload);
443 :
444 : absl::flat_hash_map<int, ThreadLocalClusterUpdateParams::PerPriority> per_priority_state_;
445 : const ClusterInfoConstSharedPtr cluster_info_;
446 : const LoadBalancerFactorySharedPtr load_balancer_factory_;
447 : const HostMapConstSharedPtr cross_priority_host_map_;
448 : UnitFloat drop_overload_{0};
449 : };
450 :
451 : using ClusterInitializationObjectConstSharedPtr =
452 : std::shared_ptr<const ClusterInitializationObject>;
453 : using ClusterInitializationMap =
454 : absl::flat_hash_map<std::string, ClusterInitializationObjectConstSharedPtr>;
455 :
456 : /**
457 : * An implementation of an on-demand CDS handle. It forwards the discovery request to the cluster
458 : * manager that created the handle.
459 : *
460 : * It's a protected type, so unit tests can use it.
461 : */
462 : class OdCdsApiHandleImpl : public OdCdsApiHandle {
463 : public:
464 0 : static OdCdsApiHandlePtr create(ClusterManagerImpl& parent, OdCdsApiSharedPtr odcds) {
465 0 : return std::make_unique<OdCdsApiHandleImpl>(parent, std::move(odcds));
466 0 : }
467 :
468 : OdCdsApiHandleImpl(ClusterManagerImpl& parent, OdCdsApiSharedPtr odcds)
469 0 : : parent_(parent), odcds_(std::move(odcds)) {
470 0 : ASSERT(odcds_ != nullptr);
471 0 : }
472 :
473 : ClusterDiscoveryCallbackHandlePtr
474 : requestOnDemandClusterDiscovery(absl::string_view name, ClusterDiscoveryCallbackPtr callback,
475 0 : std::chrono::milliseconds timeout) override {
476 0 : return parent_.requestOnDemandClusterDiscovery(odcds_, std::string(name), std::move(callback),
477 0 : timeout);
478 0 : }
479 :
480 : private:
481 : ClusterManagerImpl& parent_;
482 : OdCdsApiSharedPtr odcds_;
483 : };
484 :
485 : virtual void postThreadLocalClusterUpdate(ClusterManagerCluster& cm_cluster,
486 : ThreadLocalClusterUpdateParams&& params);
487 :
488 : /**
489 : * Notifies cluster discovery managers in each worker thread that the discovery process for the
490 : * cluster with a passed name has timed out.
491 : *
492 : * It's protected, so the tests can use it.
493 : */
494 : void notifyExpiredDiscovery(absl::string_view name);
495 :
496 : /**
497 : * Creates a new discovery manager in current thread and swaps it with the one in thread local
498 : * cluster manager. This could be used to simulate requesting a cluster from a different
499 : * thread. Used for tests only.
500 : *
501 : * Protected, so tests can use it.
502 : *
503 : * @return the previous cluster discovery manager.
504 : */
505 : ClusterDiscoveryManager createAndSwapClusterDiscoveryManager(std::string thread_name);
506 :
507 : private:
508 : // To enable access to the protected constructor.
509 : friend ProdClusterManagerFactory;
510 :
511 : /**
512 : * Thread local cached cluster data. Each thread local cluster gets updates from the parent
513 : * central dynamic cluster (if applicable). It maintains load balancer state and any created
514 : * connection pools.
515 : */
516 : struct ThreadLocalClusterManagerImpl : public ThreadLocal::ThreadLocalObject,
517 : public ClusterLifecycleCallbackHandler {
518 : struct ConnPoolsContainer {
519 : ConnPoolsContainer(Event::Dispatcher& dispatcher, const HostConstSharedPtr& host)
520 : : host_handle_(host->acquireHandle()), pools_{std::make_shared<ConnPools>(dispatcher,
521 173 : host)} {}
522 :
523 : using ConnPools = PriorityConnPoolMap<std::vector<uint8_t>, Http::ConnectionPool::Instance>;
524 :
525 : // Destroyed after pools.
526 : const HostHandlePtr host_handle_;
527 : // This is a shared_ptr so we can keep it alive while cleaning up.
528 : std::shared_ptr<ConnPools> pools_;
529 :
530 : // Protect from deletion while iterating through pools_. See comments and usage
531 : // in `ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainConnPools()`.
532 : bool do_not_delete_{false};
533 : };
534 :
535 : struct TcpConnPoolsContainer {
536 0 : TcpConnPoolsContainer(HostHandlePtr&& host_handle) : host_handle_(std::move(host_handle)) {}
537 :
538 : using ConnPools = std::map<std::vector<uint8_t>, Tcp::ConnectionPool::InstancePtr>;
539 :
540 : // Destroyed after pools.
541 : const HostHandlePtr host_handle_;
542 : ConnPools pools_;
543 : };
544 :
545 : // Holds an unowned reference to a connection, and watches for Closed events. If the connection
546 : // is closed, this container removes itself from the container that owns it.
547 : struct TcpConnContainer : public Network::ConnectionCallbacks, public Event::DeferredDeletable {
548 : public:
549 : TcpConnContainer(ThreadLocalClusterManagerImpl& parent, const HostConstSharedPtr& host,
550 : Network::ClientConnection& connection)
551 0 : : parent_(parent), host_(host), connection_(connection) {
552 0 : connection_.addConnectionCallbacks(*this);
553 0 : }
554 :
555 : // Network::ConnectionCallbacks
556 0 : void onEvent(Network::ConnectionEvent event) override {
557 0 : if (event == Network::ConnectionEvent::LocalClose ||
558 0 : event == Network::ConnectionEvent::RemoteClose) {
559 0 : parent_.removeTcpConn(host_, connection_);
560 0 : }
561 0 : }
562 0 : void onAboveWriteBufferHighWatermark() override {}
563 0 : void onBelowWriteBufferLowWatermark() override {}
564 :
565 : ThreadLocalClusterManagerImpl& parent_;
566 : HostConstSharedPtr host_;
567 : Network::ClientConnection& connection_;
568 : };
569 : struct TcpConnectionsMap {
570 0 : TcpConnectionsMap(HostHandlePtr&& host_handle) : host_handle_(std::move(host_handle)) {}
571 :
572 : // Destroyed after connections.
573 : const HostHandlePtr host_handle_;
574 : absl::node_hash_map<Network::ClientConnection*, std::unique_ptr<TcpConnContainer>>
575 : connections_;
576 : };
577 :
578 : class ClusterEntry : public ThreadLocalCluster {
579 : public:
580 : ClusterEntry(ThreadLocalClusterManagerImpl& parent, ClusterInfoConstSharedPtr cluster,
581 : const LoadBalancerFactorySharedPtr& lb_factory);
582 : ~ClusterEntry() override;
583 :
584 : // Upstream::ThreadLocalCluster
585 306 : const PrioritySet& prioritySet() override { return priority_set_; }
586 687 : ClusterInfoConstSharedPtr info() override { return cluster_info_; }
587 0 : LoadBalancer& loadBalancer() override { return *lb_; }
588 : absl::optional<HttpPoolData> httpConnPool(ResourcePriority priority,
589 : absl::optional<Http::Protocol> downstream_protocol,
590 : LoadBalancerContext* context) override;
591 : absl::optional<TcpPoolData> tcpConnPool(ResourcePriority priority,
592 : LoadBalancerContext* context) override;
593 : Host::CreateConnectionData tcpConn(LoadBalancerContext* context) override;
594 : Http::AsyncClient& httpAsyncClient() override;
595 : Tcp::AsyncTcpClientPtr
596 : tcpAsyncClient(LoadBalancerContext* context,
597 : Tcp::AsyncTcpClientOptionsConstSharedPtr options) override;
598 :
599 : // Updates the hosts in the priority set.
600 : void updateHosts(const std::string& name, uint32_t priority,
601 : PrioritySet::UpdateHostsParams&& update_hosts_params,
602 : LocalityWeightsConstSharedPtr locality_weights,
603 : const HostVector& hosts_added, const HostVector& hosts_removed,
604 : absl::optional<bool> weighted_priority_health,
605 : absl::optional<uint32_t> overprovisioning_factor,
606 : HostMapConstSharedPtr cross_priority_host_map);
607 :
608 : // Drains any connection pools associated with the removed hosts. All connections will be
609 : // closed gracefully and no new connections will be created.
610 : void drainConnPools(const HostVector& hosts_removed);
611 : // Drains any connection pools associated with all hosts. All connections will be
612 : // closed gracefully and no new connections will be created.
613 : void drainConnPools();
614 : // Drain any connection pools associated with the hosts filtered by the predicate.
615 : void drainConnPools(DrainConnectionsHostPredicate predicate,
616 : ConnectionPool::DrainBehavior behavior);
617 251 : UnitFloat dropOverload() const override { return drop_overload_; }
618 306 : void setDropOverload(UnitFloat drop_overload) override { drop_overload_ = drop_overload; }
619 :
620 : private:
621 : Http::ConnectionPool::Instance*
622 : httpConnPoolImpl(ResourcePriority priority,
623 : absl::optional<Http::Protocol> downstream_protocol,
624 : LoadBalancerContext* context, bool peek);
625 :
626 : Tcp::ConnectionPool::Instance* tcpConnPoolImpl(ResourcePriority priority,
627 : LoadBalancerContext* context, bool peek);
628 :
629 : HostConstSharedPtr chooseHost(LoadBalancerContext* context);
630 : HostConstSharedPtr peekAnotherHost(LoadBalancerContext* context);
631 :
632 : ThreadLocalClusterManagerImpl& parent_;
633 : PrioritySetImpl priority_set_;
634 : UnitFloat drop_overload_{0};
635 :
636 : // Don't change the order of cluster_info_ and lb_factory_/lb_ as the lb_factory_/lb_
637 : // may keep a reference to the cluster_info_.
638 : ClusterInfoConstSharedPtr cluster_info_;
639 : // LB factory if applicable. Not all load balancer types have a factory. LB types that have
640 : // a factory will create a new LB on every membership update. LB types that don't have a
641 : // factory will create an LB on construction and use it forever.
642 : LoadBalancerFactorySharedPtr lb_factory_;
643 : // Current active LB.
644 : LoadBalancerPtr lb_;
645 : Http::AsyncClientPtr lazy_http_async_client_;
646 : // Stores QUICHE specific objects which live throughout the lifetime of the cluster and can
647 : // be shared across its hosts.
648 : Http::PersistentQuicInfoPtr quic_info_;
649 :
650 : // Expected override host statuses. Every bit in the HostStatusSet represents an enum value
651 : // of envoy::config::core::v3::HealthStatus. The specific correspondence is shown below:
652 : //
653 : // * 0b000001: envoy::config::core::v3::HealthStatus::UNKNOWN
654 : // * 0b000010: envoy::config::core::v3::HealthStatus::HEALTHY
655 : // * 0b000100: envoy::config::core::v3::HealthStatus::UNHEALTHY
656 : // * 0b001000: envoy::config::core::v3::HealthStatus::DRAINING
657 : // * 0b010000: envoy::config::core::v3::HealthStatus::TIMEOUT
658 : // * 0b100000: envoy::config::core::v3::HealthStatus::DEGRADED
659 : //
660 : // If multiple bit fields are set, it is acceptable as long as the status of override host is
661 : // in any of these statuses.
662 : const HostUtility::HostStatusSet override_host_statuses_{};
663 : };
664 :
665 : using ClusterEntryPtr = std::unique_ptr<ClusterEntry>;
666 :
667 : struct LocalClusterParams {
668 : LoadBalancerFactorySharedPtr load_balancer_factory_;
669 : ClusterInfoConstSharedPtr info_;
670 : };
671 :
672 : ThreadLocalClusterManagerImpl(ClusterManagerImpl& parent, Event::Dispatcher& dispatcher,
673 : const absl::optional<LocalClusterParams>& local_cluster_params);
674 : ~ThreadLocalClusterManagerImpl() override;
675 :
676 : // Drain or close connections of host. If no drain behavior is provided then closing will
677 : // be immediate.
678 : void drainOrCloseConnPools(const HostSharedPtr& host,
679 : absl::optional<ConnectionPool::DrainBehavior> drain_behavior);
680 :
681 : void httpConnPoolIsIdle(HostConstSharedPtr host, ResourcePriority priority,
682 : const std::vector<uint8_t>& hash_key);
683 : void tcpConnPoolIsIdle(HostConstSharedPtr host, const std::vector<uint8_t>& hash_key);
684 : void removeTcpConn(const HostConstSharedPtr& host, Network::ClientConnection& connection);
685 : void removeHosts(const std::string& name, const HostVector& hosts_removed);
686 : void updateClusterMembership(const std::string& name, uint32_t priority,
687 : PrioritySet::UpdateHostsParams update_hosts_params,
688 : LocalityWeightsConstSharedPtr locality_weights,
689 : const HostVector& hosts_added, const HostVector& hosts_removed,
690 : bool weighted_priority_health, uint64_t overprovisioning_factor,
691 : HostMapConstSharedPtr cross_priority_host_map);
692 : void onHostHealthFailure(const HostSharedPtr& host);
693 :
694 : ConnPoolsContainer* getHttpConnPoolsContainer(const HostConstSharedPtr& host,
695 : bool allocate = false);
696 :
697 : // Upstream::ClusterLifecycleCallbackHandler
698 : ClusterUpdateCallbacksHandlePtr addClusterUpdateCallbacks(ClusterUpdateCallbacks& cb) override;
699 :
700 : /**
701 : * Transparently initialize the given thread local cluster if possible using
702 : * the Cluster Initialization object.
703 : *
704 : * @return The ClusterEntry of the newly initialized cluster or nullptr if there
705 : * is no deferred cluster with that name.
706 : */
707 : ClusterEntry* initializeClusterInlineIfExists(absl::string_view cluster);
708 :
709 : ClusterManagerImpl& parent_;
710 : Event::Dispatcher& thread_local_dispatcher_;
711 : // Known clusters will exclusively exist in either `thread_local_clusters_`
712 : // or `thread_local_deferred_clusters_`.
713 : absl::flat_hash_map<std::string, ClusterEntryPtr> thread_local_clusters_;
714 : // Maps from a given cluster name to the CIO for that cluster.
715 : ClusterInitializationMap thread_local_deferred_clusters_;
716 :
717 : ClusterConnectivityState cluster_manager_state_;
718 :
719 : // These maps are owned by the ThreadLocalClusterManagerImpl instead of the ClusterEntry
720 : // to prevent lifetime/ownership issues when a cluster is dynamically removed.
721 : absl::node_hash_map<HostConstSharedPtr, ConnPoolsContainer> host_http_conn_pool_map_;
722 : absl::node_hash_map<HostConstSharedPtr, TcpConnPoolsContainer> host_tcp_conn_pool_map_;
723 : absl::node_hash_map<HostConstSharedPtr, TcpConnectionsMap> host_tcp_conn_map_;
724 :
725 : std::list<Envoy::Upstream::ClusterUpdateCallbacks*> update_callbacks_;
726 : const PrioritySet* local_priority_set_{};
727 : bool destroying_{};
728 : ClusterDiscoveryManager cdm_;
729 : ThreadLocalClusterManagerStats local_stats_;
730 :
731 : private:
732 : static ThreadLocalClusterManagerStats generateStats(Stats::Scope& scope,
733 : const std::string& thread_name);
734 : };
735 :
736 : struct ClusterData : public ClusterManagerCluster {
737 : ClusterData(const envoy::config::cluster::v3::Cluster& cluster_config,
738 : const uint64_t cluster_config_hash, const std::string& version_info,
739 : bool added_via_api, bool required_for_ads, ClusterSharedPtr&& cluster,
740 : TimeSource& time_source)
741 : : cluster_config_(cluster_config), config_hash_(cluster_config_hash),
742 : version_info_(version_info), cluster_(std::move(cluster)),
743 : last_updated_(time_source.systemTime()),
744 159 : added_via_api_(added_via_api), added_or_updated_{}, required_for_ads_(required_for_ads) {}
745 :
746 0 : bool blockUpdate(uint64_t hash) { return !added_via_api_ || config_hash_ == hash; }
747 :
748 : // ClusterManagerCluster
749 1908 : Cluster& cluster() override { return *cluster_; }
750 159 : LoadBalancerFactorySharedPtr loadBalancerFactory() override {
751 159 : if (thread_aware_lb_ != nullptr) {
752 159 : return thread_aware_lb_->factory();
753 159 : } else {
754 0 : return nullptr;
755 0 : }
756 159 : }
757 159 : bool addedOrUpdated() override { return added_or_updated_; }
758 159 : void setAddedOrUpdated() override {
759 159 : ASSERT(!added_or_updated_);
760 159 : added_or_updated_ = true;
761 159 : }
762 159 : bool requiredForAds() const override { return required_for_ads_; }
763 :
764 : const envoy::config::cluster::v3::Cluster cluster_config_;
765 : const uint64_t config_hash_;
766 : const std::string version_info_;
767 : // Don't change the order of cluster_ and thread_aware_lb_ as the thread_aware_lb_ may
768 : // keep a reference to the cluster_.
769 : ClusterSharedPtr cluster_;
770 : // Optional thread aware LB depending on the LB type. Not all clusters have one.
771 : ThreadAwareLoadBalancerPtr thread_aware_lb_;
772 : SystemTime last_updated_;
773 : Common::CallbackHandlePtr member_update_cb_;
774 : Common::CallbackHandlePtr priority_update_cb_;
775 : // Keep smaller fields near the end to reduce padding
776 : const bool added_via_api_ : 1;
777 : bool added_or_updated_ : 1;
778 : const bool required_for_ads_ : 1;
779 : };
780 :
781 : struct ClusterUpdateCallbacksHandleImpl : public ClusterUpdateCallbacksHandle, // Handle implementation tied to a list of ClusterUpdateCallbacks pointers.
782 : RaiiListElement<ClusterUpdateCallbacks*> {
783 : ClusterUpdateCallbacksHandleImpl(ClusterUpdateCallbacks& cb, // NOTE(review): presumably the base registers &cb in `parent` for this handle's lifetime - confirm in RaiiListElement.
784 : std::list<ClusterUpdateCallbacks*>& parent)
785 223 : : RaiiListElement<ClusterUpdateCallbacks*>(parent, &cb) {} // Forwards the list and the callbacks' address to the RaiiListElement base.
786 : };
787 :
788 : using ClusterDataPtr = std::unique_ptr<ClusterData>;
789 : // This map is ordered so that config dumping is consistent.
790 : using ClusterMap = std::map<std::string, ClusterDataPtr>;
791 :
792 : struct PendingUpdates {
793 0 : ~PendingUpdates() { disableTimer(); }
794 0 : void enableTimer(const uint64_t timeout) {
795 0 : if (timer_ != nullptr) {
796 0 : ASSERT(!timer_->enabled());
797 0 : timer_->enableTimer(std::chrono::milliseconds(timeout));
798 0 : }
799 0 : }
800 0 : bool disableTimer() {
801 0 : if (timer_ == nullptr) {
802 0 : return false;
803 0 : }
804 :
805 0 : const bool was_enabled = timer_->enabled();
806 0 : timer_->disableTimer();
807 0 : return was_enabled;
808 0 : }
809 :
810 : Event::TimerPtr timer_;
811 : // This is default constructed to the clock's epoch:
812 : // https://en.cppreference.com/w/cpp/chrono/time_point/time_point
813 : //
814 : // Depending on your execution environment this value can be different.
815 : // When running as host process: This will usually be the computer's boot time, which means that
816 : // given a not very large `Cluster.CommonLbConfig.update_merge_window`, the first update will
817 : // trigger immediately (the expected behavior). When running in some sandboxed environment this
818 : // value can be set to the start time of the sandbox, which means that the delta calculated
819 : // between now and the start time may fall within the
820 : // `Cluster.CommonLbConfig.update_merge_window`, with the side effect to delay the first update.
821 : MonotonicTime last_updated_;
822 : };
823 :
824 : using PendingUpdatesPtr = std::unique_ptr<PendingUpdates>;
825 : using PendingUpdatesByPriorityMap = absl::node_hash_map<uint32_t, PendingUpdatesPtr>;
826 : using PendingUpdatesByPriorityMapPtr = std::unique_ptr<PendingUpdatesByPriorityMap>;
827 : using ClusterUpdatesMap = absl::node_hash_map<std::string, PendingUpdatesByPriorityMapPtr>;
828 :
829 : /**
830 : * Holds a reference to an on-demand CDS to keep it alive for the duration of a cluster discovery,
831 : * and an expiration timer notifying worker threads about discovery timing out.
832 : */
833 : struct ClusterCreation { // Per-discovery state: the on-demand CDS reference plus its expiration timer.
834 : OdCdsApiSharedPtr odcds_; // Shared reference that keeps the on-demand CDS alive during the discovery.
835 : Event::TimerPtr expiration_timer_; // Timer used to signal that the discovery timed out.
836 : };
837 :
838 : using ClusterCreationsMap = absl::flat_hash_map<std::string, ClusterCreation>;
839 :
840 : void applyUpdates(ClusterManagerCluster& cluster, uint32_t priority, PendingUpdates& updates);
841 : bool scheduleUpdate(ClusterManagerCluster& cluster, uint32_t priority, bool mergeable,
842 : const uint64_t timeout);
843 : ProtobufTypes::MessagePtr dumpClusterConfigs(const Matchers::StringMatcher& name_matcher);
844 : static ClusterManagerStats generateStats(Stats::Scope& scope);
845 :
846 : /**
847 : * @return a ClusterDataPtr containing the previous cluster in the cluster_map, or
848 : * nullptr if cluster_map did not contain the same cluster, or an error if the
849 : * cluster load fails.
850 : */
851 : absl::StatusOr<ClusterDataPtr> loadCluster(const envoy::config::cluster::v3::Cluster& cluster,
852 : const uint64_t cluster_hash,
853 : const std::string& version_info, bool added_via_api,
854 : bool required_for_ads, ClusterMap& cluster_map);
855 : void onClusterInit(ClusterManagerCluster& cluster);
856 : void postThreadLocalHealthFailure(const HostSharedPtr& host);
857 : void updateClusterCounts();
858 : void clusterWarmingToActive(const std::string& cluster_name);
859 : static void maybePreconnect(ThreadLocalClusterManagerImpl::ClusterEntry& cluster_entry,
860 : const ClusterConnectivityState& cluster_manager_state,
861 : std::function<ConnectionPool::Instance*()> preconnect_pool);
862 :
863 : ClusterDiscoveryCallbackHandlePtr
864 : requestOnDemandClusterDiscovery(OdCdsApiSharedPtr odcds, std::string name,
865 : ClusterDiscoveryCallbackPtr callback,
866 : std::chrono::milliseconds timeout);
867 :
868 : void notifyClusterDiscoveryStatus(absl::string_view name, ClusterDiscoveryStatus status);
869 :
870 : protected:
871 : ClusterMap active_clusters_;
872 : ClusterInitializationMap cluster_initialization_map_;
873 :
874 : private:
875 : /**
876 : * Builds the cluster initialization object for the given cluster.
877 : * @return a ClusterInitializationObjectSharedPtr that can be used to create
878 : * this cluster or nullptr if deferred cluster creation is off or the cluster
879 : * type is not supported.
880 : */
881 : ClusterInitializationObjectConstSharedPtr addOrUpdateClusterInitializationObjectIfSupported(
882 : const ThreadLocalClusterUpdateParams& params, ClusterInfoConstSharedPtr cluster_info,
883 : LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
884 : UnitFloat drop_overload);
885 :
886 : bool deferralIsSupportedForCluster(const ClusterInfoConstSharedPtr& info) const;
887 :
888 : const Server::Instance& server_;
889 : ClusterManagerFactory& factory_;
890 : Runtime::Loader& runtime_;
891 : Stats::Store& stats_;
892 : ThreadLocal::TypedSlot<ThreadLocalClusterManagerImpl> tls_;
893 : // Contains information about ongoing on-demand cluster discoveries.
894 : ClusterCreationsMap pending_cluster_creations_;
895 : Random::RandomGenerator& random_;
896 : ClusterMap warming_clusters_;
897 : const bool deferred_cluster_creation_;
898 : absl::optional<envoy::config::core::v3::BindConfig> bind_config_;
899 : Outlier::EventLoggerSharedPtr outlier_event_logger_;
900 : const LocalInfo::LocalInfo& local_info_;
901 : CdsApiPtr cds_api_;
902 : ClusterManagerStats cm_stats_;
903 : ClusterManagerInitHelper init_helper_;
904 : Config::GrpcMuxSharedPtr ads_mux_;
905 : // Temporarily saved resume cds callback from updateClusterCounts invocation.
906 : Config::ScopedResume resume_cds_;
907 : LoadStatsReporterPtr load_stats_reporter_;
908 : // The name of the local cluster of this Envoy instance if defined.
909 : absl::optional<std::string> local_cluster_name_;
910 : Grpc::AsyncClientManagerPtr async_client_manager_;
911 : Server::ConfigTracker::EntryOwnerPtr config_tracker_entry_;
912 : TimeSource& time_source_;
913 : ClusterUpdatesMap updates_map_;
914 : Event::Dispatcher& dispatcher_;
915 : Http::Context& http_context_;
916 : ProtobufMessage::ValidationContext& validation_context_;
917 : Router::Context& router_context_;
918 : ClusterTrafficStatNames cluster_stat_names_;
919 : ClusterConfigUpdateStatNames cluster_config_update_stat_names_;
920 : ClusterLbStatNames cluster_lb_stat_names_;
921 : ClusterEndpointStatNames cluster_endpoint_stat_names_;
922 : ClusterLoadReportStatNames cluster_load_report_stat_names_;
923 : ClusterCircuitBreakersStatNames cluster_circuit_breakers_stat_names_;
924 : ClusterRequestResponseSizeStatNames cluster_request_response_size_stat_names_;
925 : ClusterTimeoutBudgetStatNames cluster_timeout_budget_stat_names_;
926 : std::shared_ptr<SharedPool::ObjectSharedPool<
927 : const envoy::config::cluster::v3::Cluster::CommonLbConfig, MessageUtil, MessageUtil>>
928 : common_lb_config_pool_;
929 :
930 : std::unique_ptr<Config::SubscriptionFactoryImpl> subscription_factory_;
931 : ClusterSet primary_clusters_;
932 :
933 : std::unique_ptr<Config::XdsResourcesDelegate> xds_resources_delegate_;
934 : std::unique_ptr<Config::XdsConfigTracker> xds_config_tracker_;
935 :
936 : bool initialized_{};
937 : bool ads_mux_initialized_{};
938 : std::atomic<bool> shutdown_{};
939 :
940 : // Records the last `warming_clusters_` map size from updateClusterCounts(). This variable is
941 : // used for bookkeeping to run the `resume_cds_` cleanup that decrements the pause count and
942 : // enables the resumption of DiscoveryRequests for the Cluster type url.
943 : //
944 : // The `warming_clusters` gauge is not suitable for this purpose, because different environments
945 : // (e.g. mobile) may have different stats enabled, leading to the gauge not having a reliable
946 : // previous warming clusters size value.
947 : std::size_t last_recorded_warming_clusters_count_{0};
948 : };
949 :
950 : } // namespace Upstream
951 : } // namespace Envoy
|