1
#pragma once
2

            
3
#include <array>
4
#include <cstdint>
5
#include <functional>
6
#include <list>
7
#include <map>
8
#include <memory>
9
#include <string>
10
#include <vector>
11

            
12
#include "envoy/api/api.h"
13
#include "envoy/common/callback.h"
14
#include "envoy/common/random_generator.h"
15
#include "envoy/config/bootstrap/v3/bootstrap.pb.h"
16
#include "envoy/config/cluster/v3/cluster.pb.h"
17
#include "envoy/config/core/v3/address.pb.h"
18
#include "envoy/config/core/v3/config_source.pb.h"
19
#include "envoy/config/xds_resources_delegate.h"
20
#include "envoy/http/codes.h"
21
#include "envoy/local_info/local_info.h"
22
#include "envoy/router/context.h"
23
#include "envoy/runtime/runtime.h"
24
#include "envoy/secret/secret_manager.h"
25
#include "envoy/server/instance.h"
26
#include "envoy/ssl/context_manager.h"
27
#include "envoy/stats/scope.h"
28
#include "envoy/tcp/async_tcp_client.h"
29
#include "envoy/thread_local/thread_local.h"
30
#include "envoy/upstream/cluster_manager.h"
31
#include "envoy/upstream/load_stats_reporter.h"
32

            
33
#include "source/common/common/cleanup.h"
34
#include "source/common/common/thread.h"
35
#include "source/common/http/async_client_impl.h"
36
#include "source/common/http/http_server_properties_cache_impl.h"
37
#include "source/common/http/http_server_properties_cache_manager_impl.h"
38
#include "source/common/quic/envoy_quic_network_observer_registry_factory.h"
39
#include "source/common/quic/quic_stat_names.h"
40
#include "source/common/tcp/async_tcp_client_impl.h"
41
#include "source/common/upstream/cluster_discovery_manager.h"
42
#include "source/common/upstream/host_utility.h"
43
#include "source/common/upstream/priority_conn_pool_map.h"
44
#include "source/common/upstream/upstream_impl.h"
45

            
46
namespace Envoy {
47
namespace Upstream {
48

            
49
/**
50
 * Production implementation of ClusterManagerFactory.
51
 */
52
class ProdClusterManagerFactory : public ClusterManagerFactory {
53
public:
54
  using LazyCreateDnsResolver = std::function<Network::DnsResolverSharedPtr()>;
55

            
56
  ProdClusterManagerFactory(Server::Configuration::ServerFactoryContext& context,
57
                            LazyCreateDnsResolver dns_resolver_fn,
58
                            Quic::QuicStatNames& quic_stat_names)
59
10832
      : context_(context), stats_(context.serverScope().store()), dns_resolver_fn_(dns_resolver_fn),
60
10832
        quic_stat_names_(quic_stat_names),
61
10832
        alternate_protocols_cache_manager_(context.httpServerPropertiesCacheManager()) {}
62

            
63
  // Upstream::ClusterManagerFactory
64
  absl::StatusOr<ClusterManagerPtr>
65
  clusterManagerFromProto(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) override;
66
  Http::ConnectionPool::InstancePtr allocateConnPool(
67
      Event::Dispatcher& dispatcher, HostConstSharedPtr host, ResourcePriority priority,
68
      std::vector<Http::Protocol>& protocol,
69
      const absl::optional<envoy::config::core::v3::AlternateProtocolsCacheOptions>&
70
          alternate_protocol_options,
71
      const Network::ConnectionSocket::OptionsSharedPtr& options,
72
      const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
73
      TimeSource& time_source, ClusterConnectivityState& state,
74
      Http::PersistentQuicInfoPtr& quic_info,
75
      OptRef<Quic::EnvoyQuicNetworkObserverRegistry> network_observer_registry) override;
76
  Tcp::ConnectionPool::InstancePtr
77
  allocateTcpConnPool(Event::Dispatcher& dispatcher, HostConstSharedPtr host,
78
                      ResourcePriority priority,
79
                      const Network::ConnectionSocket::OptionsSharedPtr& options,
80
                      Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
81
                      ClusterConnectivityState& state,
82
                      absl::optional<std::chrono::milliseconds> tcp_pool_idle_timeout) override;
83
  absl::StatusOr<std::pair<ClusterSharedPtr, ThreadAwareLoadBalancerPtr>>
84
  clusterFromProto(const envoy::config::cluster::v3::Cluster& cluster,
85
                   Outlier::EventLoggerSharedPtr outlier_event_logger, bool added_via_api) override;
86
  absl::StatusOr<CdsApiPtr> createCds(const envoy::config::core::v3::ConfigSource& cds_config,
87
                                      const xds::core::v3::ResourceLocator* cds_resources_locator,
88
                                      ClusterManager& cm, bool support_multi_ads_sources) override;
89

            
90
protected:
91
  Server::Configuration::ServerFactoryContext& context_;
92
  Stats::Store& stats_;
93
  LazyCreateDnsResolver dns_resolver_fn_;
94
  Quic::QuicStatNames& quic_stat_names_;
95
  Http::HttpServerPropertiesCacheManager& alternate_protocols_cache_manager_;
96
};
97

            
98
// For friend declaration in ClusterManagerInitHelper.
99
class ClusterManagerImpl;
100

            
101
/**
 * Wrapper for a cluster owned by the cluster manager. Used by both the cluster manager and the
 * cluster manager init helper which needs to pass clusters back to the cluster manager.
 *
 * Pure interface: every method is PURE, and implementations live elsewhere. Instances are handed to
 * ClusterManagerInitHelper::addCluster()/removeCluster() by reference.
 */
class ClusterManagerCluster {
public:
  virtual ~ClusterManagerCluster() = default;

  // Return the underlying cluster.
  virtual Cluster& cluster() PURE;

  // Return a new load balancer factory if the cluster has one.
  // NOTE(review): presumably an empty shared_ptr when there is none — confirm in implementations.
  virtual LoadBalancerFactorySharedPtr loadBalancerFactory() PURE;

  // Return true if a cluster has already been added or updated.
  virtual bool addedOrUpdated() PURE;

  // Set when a cluster has been added or updated. This is only called a single time for a cluster.
  virtual void setAddedOrUpdated() PURE;

  // Return true if the cluster must be ready-for-use before ADS (Aggregated Discovery Service) can
  // be initialized; will only occur if ADS is configured to use the cluster via EnvoyGrpc.
  virtual bool requiredForAds() const PURE;
};
125

            
126
/**
127
 * This is a helper class used during cluster management initialization. Dealing with primary
128
 * clusters, secondary clusters, and CDS, is quite complicated, so this makes it easier to test.
129
 */
130
class ClusterManagerInitHelper : Logger::Loggable<Logger::Id::upstream> {
131
public:
132
  /**
133
   * @param per_cluster_init_callback supplies the callback to call when a cluster has itself
134
   *        initialized. The cluster manager can use this for post-init processing.
135
   */
136
  ClusterManagerInitHelper(
137
      Config::XdsManager& xds_manager,
138
      const std::function<absl::Status(ClusterManagerCluster&)>& per_cluster_init_callback)
139
11037
      : xds_manager_(xds_manager), per_cluster_init_callback_(per_cluster_init_callback) {}
140

            
141
  enum class State {
142
    // Initial state. During this state all static clusters are loaded. Any primary clusters
143
    // immediately begin initialization.
144
    Loading,
145
    // In this state cluster manager waits for all primary clusters to finish initialization.
146
    // This state may immediately transition to the next state iff all clusters are STATIC and
147
    // without health checks enabled or health checks have failed immediately, since their
148
    // initialization completes immediately.
149
    WaitingForPrimaryInitializationToComplete,
150
    // During this state cluster manager waits to start initializing secondary clusters. In this
151
    // state all primary clusters have completed initialization. Initialization of the
152
    // secondary clusters is started by the `initializeSecondaryClusters` method.
153
    WaitingToStartSecondaryInitialization,
154
    // In this state cluster manager waits for all secondary clusters (if configured) to finish
155
    // initialization. Then, if CDS is configured, this state tracks waiting for the first CDS
156
    // response to populate dynamically configured clusters.
157
    WaitingToStartCdsInitialization,
158
    // During this state, all CDS populated clusters are undergoing either phase 1 or phase 2
159
    // initialization.
160
    CdsInitialized,
161
    // All clusters are fully initialized.
162
    AllClustersInitialized
163
  };
164

            
165
  void addCluster(ClusterManagerCluster& cluster);
166
  void onStaticLoadComplete();
167
  void removeCluster(ClusterManagerCluster& cluster);
168
  void setCds(CdsApi* cds);
169
  void setPrimaryClustersInitializedCb(ClusterManager::PrimaryClustersReadyCallback callback);
170
  void setInitializedCb(ClusterManager::InitializationCompleteCallback callback);
171
43075
  State state() const { return state_; }
172

            
173
  void startInitializingSecondaryClusters();
174

            
175
private:
176
  // To enable invariant assertions on the cluster lists.
177
  friend ClusterManagerImpl;
178

            
179
  void initializeSecondaryClusters();
180
  void maybeFinishInitialize();
181
  absl::Status onClusterInit(ClusterManagerCluster& cluster);
182

            
183
  Config::XdsManager& xds_manager_;
184
  std::function<absl::Status(ClusterManagerCluster& cluster)> per_cluster_init_callback_;
185
  CdsApi* cds_{};
186
  ClusterManager::PrimaryClustersReadyCallback primary_clusters_initialized_callback_;
187
  ClusterManager::InitializationCompleteCallback initialized_callback_;
188
  absl::flat_hash_map<std::string, ClusterManagerCluster*> primary_init_clusters_;
189
  absl::flat_hash_map<std::string, ClusterManagerCluster*> secondary_init_clusters_;
190
  State state_{State::Loading};
191
  bool started_secondary_initialize_{};
192
};
193

            
194
/**
 * All cluster manager stats. @see stats_macros.h
 *
 * X-macro: callers supply the COUNTER and GAUGE generators. Both gauges are declared with the
 * NeverImport import mode. The stat names below are emitted at runtime; do not rename them casually.
 */
#define ALL_CLUSTER_MANAGER_STATS(COUNTER, GAUGE)                                                  \
  COUNTER(cluster_added)                                                                           \
  COUNTER(cluster_modified)                                                                        \
  COUNTER(cluster_removed)                                                                         \
  COUNTER(cluster_updated)                                                                         \
  COUNTER(cluster_updated_via_merge)                                                               \
  COUNTER(update_merge_cancelled)                                                                  \
  COUNTER(update_out_of_merge_window)                                                              \
  GAUGE(active_clusters, NeverImport)                                                              \
  GAUGE(warming_clusters, NeverImport)

/**
 * Struct definition for all cluster manager stats. @see stats_macros.h
 * One member is generated per entry of ALL_CLUSTER_MANAGER_STATS.
 */
struct ClusterManagerStats {
  ALL_CLUSTER_MANAGER_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT)
};
214

            
215
/**
 * All thread local cluster manager stats. @see stats_macros.h
 * Currently a single gauge, clusters_inflated, declared with the NeverImport import mode.
 */
#define ALL_THREAD_LOCAL_CLUSTER_MANAGER_STATS(GAUGE) GAUGE(clusters_inflated, NeverImport)

/**
 * Struct definition for all thread local cluster manager stats. @see stats_macros.h
 * One member is generated per entry of ALL_THREAD_LOCAL_CLUSTER_MANAGER_STATS.
 */
struct ThreadLocalClusterManagerStats {
  ALL_THREAD_LOCAL_CLUSTER_MANAGER_STATS(GENERATE_GAUGE_STRUCT)
};
226

            
227
/**
228
 * Implementation of ClusterManager that reads from a proto configuration, maintains a central
229
 * cluster list, as well as thread local caches of each cluster and associated connection pools.
230
 */
231
class ClusterManagerImpl : public ClusterManager,
232
                           public MissingClusterNotifier,
233
                           Logger::Loggable<Logger::Id::upstream> {
234
public:
235
  absl::Status initialize(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) override;
236

            
237
  bool initialized() override { return initialized_; }
238

            
239
8
  std::size_t warmingClusterCount() const { return warming_clusters_.size(); }
240

            
241
  // Upstream::ClusterManager
242
  absl::StatusOr<bool> addOrUpdateCluster(const envoy::config::cluster::v3::Cluster& cluster,
243
                                          const std::string& version_info,
244
                                          const bool avoid_cds_removal = false) override;
245

            
246
10838
  void setPrimaryClustersInitializedCb(PrimaryClustersReadyCallback callback) override {
247
10838
    init_helper_.setPrimaryClustersInitializedCb(callback);
248
10838
  }
249

            
250
10691
  void setInitializedCb(InitializationCompleteCallback callback) override {
251
10691
    init_helper_.setInitializedCb(callback);
252
10691
  }
253

            
254
13479
  /**
   * Builds a snapshot of references to every active and warming cluster.
   * added_via_api_clusters_num_ counts the clusters in either map whose info reports
   * addedViaApi(). Active clusters are visited before warming ones.
   */
  ClusterInfoMaps clusters() const override {
    ClusterInfoMaps maps;
    // Copies each (name, cluster) pair from `source` into `dest` and tallies API-added clusters.
    const auto collect = [&maps](const auto& source, auto& dest) {
      for (const auto& [name, cluster_data] : source) {
        auto& cluster = *cluster_data->cluster_;
        dest.emplace(name, cluster);
        if (cluster.info()->addedViaApi()) {
          ++maps.added_via_api_clusters_num_;
        }
      }
    };
    collect(active_clusters_, maps.active_clusters_);
    collect(warming_clusters_, maps.warming_clusters_);
    // API-added clusters are a subset of the union of active and warming clusters.
    ASSERT(maps.added_via_api_clusters_num_ <=
           maps.active_clusters_.size() + maps.warming_clusters_.size());
    return maps;
  }
274

            
275
67
  OptRef<const Cluster> getActiveCluster(const std::string& cluster_name) const override {
276
67
    ASSERT_IS_MAIN_OR_TEST_THREAD();
277
67
    if (const auto& it = active_clusters_.find(cluster_name); it != active_clusters_.end()) {
278
56
      return *it->second->cluster_;
279
56
    }
280
11
    return absl::nullopt;
281
67
  }
282

            
283
14
  OptRef<const Cluster> getActiveOrWarmingCluster(const std::string& cluster_name) const override {
284
14
    ASSERT_IS_MAIN_OR_TEST_THREAD();
285
14
    if (const auto& it = active_clusters_.find(cluster_name); it != active_clusters_.end()) {
286
12
      return *it->second->cluster_;
287
12
    }
288
2
    if (const auto& it = warming_clusters_.find(cluster_name); it != warming_clusters_.end()) {
289
1
      return *it->second->cluster_;
290
1
    }
291
1
    return absl::nullopt;
292
2
  }
293

            
294
10487
  bool hasCluster(const std::string& cluster_name) const override {
295
10487
    ASSERT_IS_MAIN_OR_TEST_THREAD();
296
10487
    return active_clusters_.contains(cluster_name) || warming_clusters_.contains(cluster_name);
297
10487
  }
298

            
299
3
  bool hasActiveClusters() const override {
300
3
    ASSERT_IS_MAIN_OR_TEST_THREAD();
301
3
    return !active_clusters_.empty();
302
3
  }
303

            
304
1081
  const ClusterSet& primaryClusters() override { return primary_clusters_; }
305
  ThreadLocalCluster* getThreadLocalCluster(absl::string_view cluster) override;
306

            
307
  bool removeCluster(const std::string& cluster, const bool remove_ignored = false) override;
308
10717
  void shutdown() override {
309
10717
    shutdown_ = true;
310
10717
    if (resume_cds_ != nullptr) {
311
14
      resume_cds_->cancel();
312
14
    }
313
    // Make sure we destroy all potential outgoing connections before this returns.
314
10717
    cds_api_.reset();
315
10717
    xds_manager_.shutdown();
316
10717
    load_stats_reporter_.reset();
317
10717
    active_clusters_.clear();
318
10717
    warming_clusters_.clear();
319
10717
    updateClusterCounts();
320
10717
  }
321

            
322
127
  bool isShutdown() override { return shutdown_; }
323

            
324
17519
  const absl::optional<envoy::config::core::v3::BindConfig>& bindConfig() const override {
325
17519
    return bind_config_;
326
17519
  }
327

            
328
12742
  Config::GrpcMuxSharedPtr adsMux() override { return xds_manager_.adsMux(); }
329
3138
  Grpc::AsyncClientManager& grpcAsyncClientManager() override { return *async_client_manager_; }
330

            
331
17531
  const absl::optional<std::string>& localClusterName() const override {
332
17531
    return local_cluster_name_;
333
17531
  }
334

            
335
  ClusterUpdateCallbacksHandlePtr
336
  addThreadLocalClusterUpdateCallbacks(ClusterUpdateCallbacks&) override;
337

            
338
  absl::StatusOr<OdCdsApiHandlePtr>
339
  allocateOdCdsApi(OdCdsCreationFunction creation_function,
340
                   const envoy::config::core::v3::ConfigSource& odcds_config,
341
                   OptRef<xds::core::v3::ResourceLocator> odcds_resources_locator,
342
                   ProtobufMessage::ValidationVisitor& validation_visitor) override;
343

            
344
  // TODO(adisuissa): remove this method, and switch all the callers to invoke
345
  // it directly via the XdsManger.
346
12623
  Config::SubscriptionFactory& subscriptionFactory() override {
347
12623
    return xds_manager_.subscriptionFactory();
348
12623
  }
349

            
350
  absl::Status
351
  initializeSecondaryClusters(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) override;
352

            
353
17563
  const ClusterTrafficStatNames& clusterStatNames() const override { return cluster_stat_names_; }
354
17563
  const ClusterConfigUpdateStatNames& clusterConfigUpdateStatNames() const override {
355
17563
    return cluster_config_update_stat_names_;
356
17563
  }
357
17563
  const ClusterLbStatNames& clusterLbStatNames() const override { return cluster_lb_stat_names_; }
358
17563
  const ClusterEndpointStatNames& clusterEndpointStatNames() const override {
359
17563
    return cluster_endpoint_stat_names_;
360
17563
  }
361
17563
  const ClusterLoadReportStatNames& clusterLoadReportStatNames() const override {
362
17563
    return cluster_load_report_stat_names_;
363
17563
  }
364
17563
  const ClusterCircuitBreakersStatNames& clusterCircuitBreakersStatNames() const override {
365
17563
    return cluster_circuit_breakers_stat_names_;
366
17563
  }
367
24
  const ClusterRequestResponseSizeStatNames& clusterRequestResponseSizeStatNames() const override {
368
24
    return cluster_request_response_size_stat_names_;
369
24
  }
370
  const ClusterTimeoutBudgetStatNames& clusterTimeoutBudgetStatNames() const override {
371
    return cluster_timeout_budget_stat_names_;
372
  }
373

            
374
  void drainConnections(const std::string& cluster,
375
                        DrainConnectionsHostPredicate predicate) override;
376

            
377
  void drainConnections(DrainConnectionsHostPredicate predicate,
378
                        ConnectionPool::DrainBehavior drain_behavior) override;
379

            
380
  absl::Status checkActiveStaticCluster(const std::string& cluster) override;
381

            
382
  // Upstream::MissingClusterNotifier
383
  void notifyMissingCluster(absl::string_view name) override;
384

            
385
  /*
386
   * Return shared_ptr for common_lb_config which is stored in an ObjectSharedPool
387
   *
388
   * @param common_lb_config The config field to be stored in ObjectSharedPool
389
   * @return shared_ptr to the CommonLbConfig in ObjectSharedPool
390
   */
391
  std::shared_ptr<const envoy::config::cluster::v3::Cluster::CommonLbConfig> getCommonLbConfigPtr(
392
17567
      const envoy::config::cluster::v3::Cluster::CommonLbConfig& common_lb_config) override {
393
17567
    return common_lb_config_pool_->getObject(common_lb_config);
394
17567
  }
395

            
396
  Config::EdsResourcesCacheOptRef edsResourcesCache() override;
397

            
398
  void
399
  createNetworkObserverRegistries(Quic::EnvoyQuicNetworkObserverRegistryFactory& factory) override;
400

            
401
protected:
402
  // ClusterManagerImpl's constructor should not be invoked directly. Create instances via
  // ProdClusterManagerFactory::clusterManagerFromProto(); ProdClusterManagerFactory is a friend so
  // that it can reach this protected constructor. initialize() must be called after construction.
  // NOTE(review): `creation_status` presumably receives a non-OK status on construction failure —
  // confirm in the .cc.
  ClusterManagerImpl(const envoy::config::bootstrap::v3::Bootstrap& bootstrap,
                     ClusterManagerFactory& factory,
                     Server::Configuration::ServerFactoryContext& context,
                     absl::Status& creation_status);

  // Virtual (and protected), so subclasses may override it. By its name, it propagates the removal
  // of `hosts_removed` from `cluster` to thread-local state — confirm in the .cc.
  virtual void postThreadLocalRemoveHosts(const Cluster& cluster, const HostVector& hosts_removed);
410

            
411
  // Parameters for calling postThreadLocalClusterUpdate()
412
  struct ThreadLocalClusterUpdateParams {
413
    struct PerPriority {
414
      PerPriority(uint32_t priority, const HostVector& hosts_added, const HostVector& hosts_removed)
415
17394
          : hosts_added_(hosts_added), hosts_removed_(hosts_removed), priority_(priority) {}
416
      // TODO(kbaichoo): make the hosts_added_ vector const and have the
417
      // cluster initialization object have a stripped down version of this
418
      // struct.
419
      HostVector hosts_added_;
420
      const HostVector hosts_removed_;
421
      PrioritySet::UpdateHostsParams update_hosts_params_;
422
      LocalityWeightsConstSharedPtr locality_weights_;
423
      // Keep small members (bools and enums) at the end of class, to reduce alignment overhead.
424
      const uint32_t priority_;
425
      bool weighted_priority_health_;
426
      uint32_t overprovisioning_factor_;
427
    };
428

            
429
17368
    ThreadLocalClusterUpdateParams() = default;
430
    ThreadLocalClusterUpdateParams(uint32_t priority, const HostVector& hosts_added,
431
                                   const HostVector& hosts_removed)
432
443
        : per_priority_update_params_{{priority, hosts_added, hosts_removed}} {}
433

            
434
    std::vector<PerPriority> per_priority_update_params_;
435
  };
436

            
437
  /**
   * A cluster initialization object (CIO) encapsulates the relevant information
   * to create a cluster inline when there is traffic to it. We can thus use the
   * CIO to defer instantiating clusters on workers until they are used.
   */
  struct ClusterInitializationObject {
    // Builds a CIO from a single update's parameters.
    ClusterInitializationObject(const ThreadLocalClusterUpdateParams& params,
                                ClusterInfoConstSharedPtr cluster_info,
                                LoadBalancerFactorySharedPtr load_balancer_factory,
                                HostMapConstSharedPtr map, UnitFloat drop_overload,
                                absl::string_view drop_category);

    // Builds a CIO from existing per-priority state plus a new update.
    // NOTE(review): presumably merges `update_params` over `per_priority_state` — confirm in .cc.
    ClusterInitializationObject(
        const absl::flat_hash_map<int, ThreadLocalClusterUpdateParams::PerPriority>&
            per_priority_state,
        const ThreadLocalClusterUpdateParams& update_params, ClusterInfoConstSharedPtr cluster_info,
        LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
        UnitFloat drop_overload, absl::string_view drop_category);

    // Keyed by priority (int).
    absl::flat_hash_map<int, ThreadLocalClusterUpdateParams::PerPriority> per_priority_state_;
    const ClusterInfoConstSharedPtr cluster_info_;
    const LoadBalancerFactorySharedPtr load_balancer_factory_;
    const HostMapConstSharedPtr cross_priority_host_map_;
    UnitFloat drop_overload_{0};
    // Owning copy of the drop category passed to the constructor as a string_view.
    const std::string drop_category_;
  };

  // CIOs are shared immutably, one per cluster name.
  using ClusterInitializationObjectConstSharedPtr =
      std::shared_ptr<const ClusterInitializationObject>;
  using ClusterInitializationMap =
      absl::flat_hash_map<std::string, ClusterInitializationObjectConstSharedPtr>;
468

            
469
  /**
470
   * An implementation of an on-demand CDS handle. It forwards the discovery request to the cluster
471
   * manager that created the handle.
472
   *
473
   * It's a protected type, so unit tests can use it.
474
   */
475
  class OdCdsApiHandleImpl : public OdCdsApiHandle {
476
  public:
477
189
    static OdCdsApiHandlePtr create(ClusterManagerImpl& parent, uint64_t config_source_key) {
478
189
      return std::make_unique<OdCdsApiHandleImpl>(parent, config_source_key);
479
189
    }
480

            
481
    OdCdsApiHandleImpl(ClusterManagerImpl& parent, uint64_t config_source_key)
482
189
        : parent_(parent), config_source_key_(config_source_key) {}
483

            
484
    ClusterDiscoveryCallbackHandlePtr
485
    requestOnDemandClusterDiscovery(absl::string_view name, ClusterDiscoveryCallbackPtr callback,
486
268
                                    std::chrono::milliseconds timeout) override {
487
268
      return parent_.requestOnDemandClusterDiscovery(config_source_key_, std::string(name),
488
268
                                                     std::move(callback), timeout);
489
268
    }
490

            
491
  private:
492
    ClusterManagerImpl& parent_;
493
    uint64_t config_source_key_;
494
  };
495

            
496
  // Virtual (and protected), so subclasses may override it. Consumes `params` (rvalue reference);
  // its fields are described on ThreadLocalClusterUpdateParams.
  virtual void postThreadLocalClusterUpdate(ClusterManagerCluster& cm_cluster,
                                            ThreadLocalClusterUpdateParams&& params);

  /**
   * Notifies cluster discovery managers in each worker thread that the discovery process for the
   * cluster with a passed name has timed out.
   *
   * It's protected, so the tests can use it.
   */
  void notifyExpiredDiscovery(absl::string_view name);

  /**
   * Creates a new discovery manager in current thread and swaps it with the one in thread local
   * cluster manager. This could be used to simulate requesting a cluster from a different
   * thread. Used for tests only.
   *
   * Protected, so tests can use it.
   *
   * @param thread_name name for the new discovery manager's thread (taken by value).
   * @return the previous cluster discovery manager.
   */
  ClusterDiscoveryManager createAndSwapClusterDiscoveryManager(std::string thread_name);

private:
  // To enable access to the protected constructor.
  friend ProdClusterManagerFactory;
521

            
522
  /**
523
   * Thread local cached cluster data. Each thread local cluster gets updates from the parent
524
   * central dynamic cluster (if applicable). It maintains load balancer state and any created
525
   * connection pools.
526
   */
527
  struct ThreadLocalClusterManagerImpl : public ThreadLocal::ThreadLocalObject,
528
                                         public ClusterLifecycleCallbackHandler {
529
    struct ConnPoolsContainer {
530
      ConnPoolsContainer(Event::Dispatcher& dispatcher, const HostConstSharedPtr& host)
531
11735
          : host_handle_(host->acquireHandle()),
532
11735
            pools_{std::make_shared<ConnPools>(dispatcher, host)} {}
533

            
534
      using ConnPools = PriorityConnPoolMap<std::vector<uint8_t>, Http::ConnectionPool::Instance>;
535

            
536
      // Destroyed after pools.
537
      const HostHandlePtr host_handle_;
538
      // This is a shared_ptr so we can keep it alive while cleaning up.
539
      std::shared_ptr<ConnPools> pools_;
540

            
541
      // Protect from deletion while iterating through pools_. See comments and usage
542
      // in `ClusterManagerImpl::ThreadLocalClusterManagerImpl::drainConnPools()`.
543
      bool do_not_delete_{false};
544
    };
545

            
546
    struct TcpConnPoolsContainer {
547
809
      TcpConnPoolsContainer(HostHandlePtr&& host_handle) : host_handle_(std::move(host_handle)) {}
548

            
549
      using ConnPools = std::map<std::vector<uint8_t>, Tcp::ConnectionPool::InstancePtr>;
550

            
551
      // Destroyed after pools.
552
      const HostHandlePtr host_handle_;
553
      ConnPools pools_;
554
    };
555

            
556
    // Holds an unowned reference to a connection, and watches for Closed events. If the connection
557
    // is closed, this container removes itself from the container that owns it.
558
    struct TcpConnContainer : public Network::ConnectionCallbacks, public Event::DeferredDeletable {
559
    public:
560
      TcpConnContainer(ThreadLocalClusterManagerImpl& parent, const HostConstSharedPtr& host,
561
                       Network::ClientConnection& connection)
562
8
          : parent_(parent), host_(host), connection_(connection) {
563
8
        connection_.addConnectionCallbacks(*this);
564
8
      }
565

            
566
      // Network::ConnectionCallbacks
567
8
      void onEvent(Network::ConnectionEvent event) override {
568
8
        if (event == Network::ConnectionEvent::LocalClose ||
569
8
            event == Network::ConnectionEvent::RemoteClose) {
570
8
          parent_.removeTcpConn(host_, connection_);
571
8
        }
572
8
      }
573
      void onAboveWriteBufferHighWatermark() override {}
574
      void onBelowWriteBufferLowWatermark() override {}
575

            
576
      ThreadLocalClusterManagerImpl& parent_;
577
      HostConstSharedPtr host_;
578
      Network::ClientConnection& connection_;
579
    };
580
    struct TcpConnectionsMap {
581
6
      TcpConnectionsMap(HostHandlePtr&& host_handle) : host_handle_(std::move(host_handle)) {}
582

            
583
      // Destroyed after pools.
584
      const HostHandlePtr host_handle_;
585
      absl::node_hash_map<Network::ClientConnection*, std::unique_ptr<TcpConnContainer>>
586
          connections_;
587
    };
588

            
589
    class ClusterEntry : public ThreadLocalCluster {
590
    public:
591
      ClusterEntry(ThreadLocalClusterManagerImpl& parent, ClusterInfoConstSharedPtr cluster,
592
                   const LoadBalancerFactorySharedPtr& lb_factory);
593
      ~ClusterEntry() override;
594

            
595
      // Upstream::ThreadLocalCluster
596
33508
      const PrioritySet& prioritySet() override { return priority_set_; }
597
213660
      ClusterInfoConstSharedPtr info() override { return cluster_info_; }
598
18175
      LoadBalancer& loadBalancer() override { return *lb_; }
599
      HostSelectionResponse chooseHost(LoadBalancerContext* context) override;
600
      absl::optional<HttpPoolData> httpConnPool(HostConstSharedPtr host, ResourcePriority priority,
601
                                                absl::optional<Http::Protocol> downstream_protocol,
602
                                                LoadBalancerContext* context) override;
603
      absl::optional<TcpPoolData> tcpConnPool(HostConstSharedPtr host, ResourcePriority priority,
604
                                              LoadBalancerContext* context) override;
605
      absl::optional<TcpPoolData> tcpConnPool(ResourcePriority priority,
606
                                              LoadBalancerContext* context) override;
607
      Host::CreateConnectionData tcpConn(LoadBalancerContext* context) override;
608
      Http::AsyncClient& httpAsyncClient() override;
609
      Tcp::AsyncTcpClientPtr
610
      tcpAsyncClient(LoadBalancerContext* context,
611
                     Tcp::AsyncTcpClientOptionsConstSharedPtr options) override;
612

            
613
      // Updates the hosts in the priority set.
614
      void updateHosts(const std::string& name, uint32_t priority,
615
                       PrioritySet::UpdateHostsParams&& update_hosts_params,
616
                       LocalityWeightsConstSharedPtr locality_weights,
617
                       const HostVector& hosts_added, const HostVector& hosts_removed,
618
                       absl::optional<bool> weighted_priority_health,
619
                       absl::optional<uint32_t> overprovisioning_factor,
620
                       HostMapConstSharedPtr cross_priority_host_map);
621

            
622
      // Drains any connection pools associated with the removed hosts. All connections will be
623
      // closed gracefully and no new connections will be created.
624
      void drainConnPools(const HostVector& hosts_removed);
625
      // Drains any connection pools associated with the all hosts. All connections will be
626
      // closed gracefully and no new connections will be created.
627
      void drainConnPools();
628
      // Drain any connection pools associated with the hosts filtered by the predicate.
629
      void drainConnPools(DrainConnectionsHostPredicate predicate,
630
                          ConnectionPool::DrainBehavior behavior);
631
47026
      UnitFloat dropOverload() const override { return drop_overload_; }
632
      const std::string& dropCategory() const override { return drop_category_; }
633
34634
      void setDropOverload(UnitFloat drop_overload) override { drop_overload_ = drop_overload; }
634
34635
      void setDropCategory(absl::string_view drop_category) override {
635
34635
        drop_category_ = drop_category;
636
34635
      }
637

            
638
    private:
639
      Http::ConnectionPool::Instance*
640
      httpConnPoolImpl(HostConstSharedPtr host, ResourcePriority priority,
641
                       absl::optional<Http::Protocol> downstream_protocol,
642
                       LoadBalancerContext* context);
643

            
644
      Tcp::ConnectionPool::Instance* tcpConnPoolImpl(HostConstSharedPtr host,
645
                                                     ResourcePriority priority,
646
                                                     LoadBalancerContext* context);
647

            
648
      HostConstSharedPtr peekAnotherHost(LoadBalancerContext* context);
649

            
650
      ThreadLocalClusterManagerImpl& parent_;
651
      PrioritySetImpl priority_set_;
652
      UnitFloat drop_overload_{0};
653
      std::string drop_category_;
654

            
655
      // Don't change the order of cluster_info_ and lb_factory_/lb_ as the the lb_factory_/lb_
656
      // may keep a reference to the cluster_info_.
657
      ClusterInfoConstSharedPtr cluster_info_;
658

            
659
      // Factory to create active LB.
660
      LoadBalancerFactorySharedPtr lb_factory_;
661
      // Current active LB.
662
      LoadBalancerPtr lb_;
663
      Http::AsyncClientPtr lazy_http_async_client_;
664
      // Stores QUICHE specific objects which live through out the life time of the cluster and can
665
      // be shared across its hosts.
666
      Http::PersistentQuicInfoPtr quic_info_;
667

            
668
      // Expected override host statues. Every bit in the HostStatusSet represent an enum value
669
      // of envoy::config::core::v3::HealthStatus. The specific correspondence is shown below:
670
      //
671
      // * 0b000001: envoy::config::core::v3::HealthStatus::UNKNOWN
672
      // * 0b000010: envoy::config::core::v3::HealthStatus::HEALTHY
673
      // * 0b000100: envoy::config::core::v3::HealthStatus::UNHEALTHY
674
      // * 0b001000: envoy::config::core::v3::HealthStatus::DRAINING
675
      // * 0b010000: envoy::config::core::v3::HealthStatus::TIMEOUT
676
      // * 0b100000: envoy::config::core::v3::HealthStatus::DEGRADED
677
      //
678
      // If multiple bit fields are set, it is acceptable as long as the status of override host is
679
      // in any of these statuses.
680
      const HostUtility::HostStatusSet override_host_statuses_;
681
    };
682

            
683
    using ClusterEntryPtr = std::unique_ptr<ClusterEntry>;
684

            
685
    struct LocalClusterParams {
686
      LoadBalancerFactorySharedPtr load_balancer_factory_;
687
      ClusterInfoConstSharedPtr info_;
688
    };
689

            
690
    ThreadLocalClusterManagerImpl(ClusterManagerImpl& parent, Event::Dispatcher& dispatcher,
691
                                  const absl::optional<LocalClusterParams>& local_cluster_params);
692
    ~ThreadLocalClusterManagerImpl() override;
693

            
694
    // Drain or close connections of host. If no drain behavior is provided then closing will
695
    // be immediate.
696
    void drainOrCloseConnPools(const HostSharedPtr& host,
697
                               absl::optional<ConnectionPool::DrainBehavior> drain_behavior);
698

            
699
    void httpConnPoolIsIdle(HostConstSharedPtr host, ResourcePriority priority,
700
                            const std::vector<uint8_t>& hash_key);
701
    void tcpConnPoolIsIdle(HostConstSharedPtr host, const std::vector<uint8_t>& hash_key);
702
    void removeTcpConn(const HostConstSharedPtr& host, Network::ClientConnection& connection);
703
    void removeHosts(const std::string& name, const HostVector& hosts_removed);
704
    void updateClusterMembership(const std::string& name, uint32_t priority,
705
                                 PrioritySet::UpdateHostsParams update_hosts_params,
706
                                 LocalityWeightsConstSharedPtr locality_weights,
707
                                 const HostVector& hosts_added, const HostVector& hosts_removed,
708
                                 bool weighted_priority_health, uint64_t overprovisioning_factor,
709
                                 HostMapConstSharedPtr cross_priority_host_map);
710
    void onHostHealthFailure(const HostSharedPtr& host);
711

            
712
    ConnPoolsContainer* getHttpConnPoolsContainer(const HostConstSharedPtr& host,
713
                                                  bool allocate = false);
714

            
715
    // Upstream::ClusterLifecycleCallbackHandler
716
    ClusterUpdateCallbacksHandlePtr addClusterUpdateCallbacks(ClusterUpdateCallbacks& cb) override;
717

            
718
    /**
719
     * Transparently initialize the given thread local cluster if possible using
720
     * the Cluster Initialization object.
721
     *
722
     * @return The ClusterEntry of the newly initialized cluster or nullptr if there
723
     * is no cluster deferred cluster with that name.
724
     */
725
    ClusterEntry* initializeClusterInlineIfExists(absl::string_view cluster);
726

            
727
11757
    // Exposes this thread's QUIC network observer registry by wrapping the (possibly null)
    // owned pointer in an OptRef.
    OptRef<Quic::EnvoyQuicNetworkObserverRegistry> getNetworkObserverRegistry() {
      auto* const registry = network_observer_registry_.get();
      return makeOptRefFromPtr(registry);
    }
730

            
731
#ifdef ENVOY_ENABLE_QUIC
    // Asks `factory` for a QUIC network observer registry built on this thread's dispatcher and
    // stores it, replacing whatever registry was held before.
    void createThreadLocalNetworkObserverRegistry(
        Quic::EnvoyQuicNetworkObserverRegistryFactory& factory) {
      auto registry = factory.createQuicNetworkObserverRegistry(thread_local_dispatcher_);
      network_observer_registry_ = std::move(registry);
    }
#endif
738

            
739
    ClusterManagerImpl& parent_;
740
    Event::Dispatcher& thread_local_dispatcher_;
741
    // Known clusters will exclusively exist in either `thread_local_clusters_`
742
    // or `thread_local_deferred_clusters_`.
743
    absl::flat_hash_map<std::string, ClusterEntryPtr> thread_local_clusters_;
744
    // Maps from a given cluster name to the CIO for that cluster.
745
    ClusterInitializationMap thread_local_deferred_clusters_;
746

            
747
    ClusterConnectivityState cluster_manager_state_;
748

            
749
    // These maps are owned by the ThreadLocalClusterManagerImpl instead of the ClusterEntry
750
    // to prevent lifetime/ownership issues when a cluster is dynamically removed.
751
    absl::node_hash_map<HostConstSharedPtr, ConnPoolsContainer> host_http_conn_pool_map_;
752
    absl::node_hash_map<HostConstSharedPtr, TcpConnPoolsContainer> host_tcp_conn_pool_map_;
753
    absl::node_hash_map<HostConstSharedPtr, TcpConnectionsMap> host_tcp_conn_map_;
754

            
755
    std::list<Envoy::Upstream::ClusterUpdateCallbacks*> update_callbacks_;
756
    const PrioritySet* local_priority_set_{};
757
    bool destroying_{};
758
    ClusterDiscoveryManager cdm_;
759
    ThreadLocalClusterManagerStats local_stats_;
760

            
761
  private:
762
    static ThreadLocalClusterManagerStats generateStats(Stats::Scope& scope,
763
                                                        const std::string& thread_name);
764

            
765
    Quic::EnvoyQuicNetworkObserverRegistryPtr network_observer_registry_;
766
  };
767

            
768
  struct ClusterData : public ClusterManagerCluster {
769
    ClusterData(const envoy::config::cluster::v3::Cluster& cluster_config,
770
                const uint64_t cluster_config_hash, const std::string& version_info,
771
                bool added_via_api, bool required_for_ads, ClusterSharedPtr&& cluster,
772
                TimeSource& time_source, const bool avoid_cds_removal = false)
773
17578
        : cluster_config_(cluster_config), config_hash_(cluster_config_hash),
774
17578
          version_info_(version_info), cluster_(std::move(cluster)),
775
17578
          last_updated_(time_source.systemTime()), added_via_api_(added_via_api),
776
17578
          avoid_cds_removal_(avoid_cds_removal), added_or_updated_{},
777
17578
          required_for_ads_(required_for_ads) {}
778

            
779
233
    bool blockUpdate(uint64_t hash) { return !added_via_api_ || config_hash_ == hash; }
780

            
781
    // ClusterManagerCluster
782
225297
    Cluster& cluster() override { return *cluster_; }
783
17355
    LoadBalancerFactorySharedPtr loadBalancerFactory() override {
784
17355
      if (thread_aware_lb_ != nullptr) {
785
17355
        return thread_aware_lb_->factory();
786
17355
      } else {
787
        return nullptr;
788
      }
789
17355
    }
790
17783
    bool addedOrUpdated() override { return added_or_updated_; }
791
17355
    void setAddedOrUpdated() override {
792
17355
      ASSERT(!added_or_updated_);
793
17355
      added_or_updated_ = true;
794
17355
    }
795
17783
    bool requiredForAds() const override { return required_for_ads_; }
796

            
797
    const envoy::config::cluster::v3::Cluster cluster_config_;
798
    const uint64_t config_hash_;
799
    const std::string version_info_;
800
    // Don't change the order of cluster_ and thread_aware_lb_ as the thread_aware_lb_ may
801
    // keep a reference to the cluster_.
802
    ClusterSharedPtr cluster_;
803
    // Optional thread aware LB depending on the LB type. Not all clusters have one.
804
    ThreadAwareLoadBalancerPtr thread_aware_lb_;
805
    SystemTime last_updated_;
806
    Common::CallbackHandlePtr member_update_cb_;
807
    Common::CallbackHandlePtr priority_update_cb_;
808
    // Keep smaller fields near the end to reduce padding
809
    const bool added_via_api_ : 1;
810
    const bool avoid_cds_removal_ : 1;
811
    bool added_or_updated_ : 1;
812
    const bool required_for_ads_ : 1;
813
  };
814

            
815
  struct ClusterUpdateCallbacksHandleImpl : public ClusterUpdateCallbacksHandle,
816
                                            RaiiListElement<ClusterUpdateCallbacks*> {
817
    ClusterUpdateCallbacksHandleImpl(ClusterUpdateCallbacks& cb,
818
                                     std::list<ClusterUpdateCallbacks*>& parent)
819
22052
        : RaiiListElement<ClusterUpdateCallbacks*>(parent, &cb) {}
820
  };
821

            
822
  using ClusterDataPtr = std::unique_ptr<ClusterData>;
823
  // This map is ordered so that config dumping is consistent.
824
  using ClusterMap = std::map<std::string, ClusterDataPtr>;
825

            
826
  struct PendingUpdates {
827
286
    ~PendingUpdates() { disableTimer(); }
828
44
    void enableTimer(const uint64_t timeout) {
829
44
      if (timer_ != nullptr) {
830
44
        ASSERT(!timer_->enabled());
831
44
        timer_->enableTimer(std::chrono::milliseconds(timeout));
832
44
      }
833
44
    }
834
679
    bool disableTimer() {
835
679
      if (timer_ == nullptr) {
836
627
        return false;
837
627
      }
838

            
839
52
      const bool was_enabled = timer_->enabled();
840
52
      timer_->disableTimer();
841
52
      return was_enabled;
842
679
    }
843

            
844
    Event::TimerPtr timer_;
845
    // This is default constructed to the clock's epoch:
846
    // https://en.cppreference.com/w/cpp/chrono/time_point/time_point
847
    //
848
    // Depending on your execution environment this value can be different.
849
    // When running as host process: This will usually be the computer's boot time, which means that
850
    // given a not very large `Cluster.CommonLbConfig.update_merge_window`, the first update will
851
    // trigger immediately (the expected behavior). When running in some sandboxed environment this
852
    // value can be set to the start time of the sandbox, which means that the delta calculated
853
    // between now and the start time may fall within the
854
    // `Cluster.CommonLbConfig.update_merge_window`, with the side effect to delay the first update.
855
    MonotonicTime last_updated_;
856
  };
857

            
858
  using PendingUpdatesPtr = std::unique_ptr<PendingUpdates>;
859
  using PendingUpdatesByPriorityMap = absl::node_hash_map<uint32_t, PendingUpdatesPtr>;
860
  using PendingUpdatesByPriorityMapPtr = std::unique_ptr<PendingUpdatesByPriorityMap>;
861
  using ClusterUpdatesMap = absl::node_hash_map<std::string, PendingUpdatesByPriorityMapPtr>;
862

            
863
  /**
864
   * Holds a reference to an on-demand CDS to keep it alive for the duration of a cluster discovery,
865
   * and an expiration timer notifying worker threads about discovery timing out.
866
   */
867
  struct ClusterCreation {
868
    OdCdsApiSharedPtr odcds_;
869
    Event::TimerPtr expiration_timer_;
870
  };
871

            
872
  using ClusterCreationsMap = absl::flat_hash_map<std::string, ClusterCreation>;
873

            
874
  void applyUpdates(ClusterManagerCluster& cluster, uint32_t priority, PendingUpdates& updates);
875
  bool scheduleUpdate(ClusterManagerCluster& cluster, uint32_t priority, bool mergeable,
876
                      const uint64_t timeout);
877
  ProtobufTypes::MessagePtr dumpClusterConfigs(const Matchers::StringMatcher& name_matcher);
878
  static ClusterManagerStats generateStats(Stats::Scope& scope);
879

            
880
  /**
881
   * @return ClusterDataPtr contains the previous cluster in the cluster_map, or
882
   * nullptr if cluster_map did not contain the same cluster or an error if
883
   * cluster load fails.
884
   */
885
  absl::StatusOr<ClusterDataPtr> loadCluster(const envoy::config::cluster::v3::Cluster& cluster,
886
                                             const uint64_t cluster_hash,
887
                                             const std::string& version_info, bool added_via_api,
888
                                             bool required_for_ads, ClusterMap& cluster_map,
889
                                             bool avoid_cds_removal = false);
890
  absl::Status onClusterInit(ClusterManagerCluster& cluster);
891
  void postThreadLocalHealthFailure(const HostSharedPtr& host);
892
  void updateClusterCounts();
893
  void clusterWarmingToActive(const std::string& cluster_name);
894
  static void maybePreconnect(ThreadLocalClusterManagerImpl::ClusterEntry& cluster_entry,
895
                              const ClusterConnectivityState& cluster_manager_state,
896
                              std::function<ConnectionPool::Instance*()> preconnect_pool);
897

            
898
  ClusterDiscoveryCallbackHandlePtr
899
  requestOnDemandClusterDiscovery(uint64_t subscription_key, std::string name,
900
                                  ClusterDiscoveryCallbackPtr callback,
901
                                  std::chrono::milliseconds timeout);
902

            
903
  void notifyClusterDiscoveryStatus(absl::string_view name, ClusterDiscoveryStatus status);
904

            
905
protected:
906
  ClusterInitializationMap cluster_initialization_map_;
907
  // OdCDS subscriptions keyed by config source hash. Subscriptions persist for the lifetime
908
  // of ClusterManagerImpl to avoid complexity around cleanup. A future optimization could
909
  // add proper lifetime management to close unnecessary connections.
910
  absl::flat_hash_map<uint64_t, OdCdsApiSharedPtr> odcds_subscriptions_;
911

            
912
private:
913
  /**
914
   * Builds the cluster initialization object for this given cluster.
915
   * @return a ClusterInitializationObjectSharedPtr that can be used to create
916
   * this cluster or nullptr if deferred cluster creation is off or the cluster
917
   * type is not supported.
918
   */
919
  ClusterInitializationObjectConstSharedPtr addOrUpdateClusterInitializationObjectIfSupported(
920
      const ThreadLocalClusterUpdateParams& params, ClusterInfoConstSharedPtr cluster_info,
921
      LoadBalancerFactorySharedPtr load_balancer_factory, HostMapConstSharedPtr map,
922
      UnitFloat drop_overload, absl::string_view drop_category);
923

            
924
  bool deferralIsSupportedForCluster(const ClusterInfoConstSharedPtr& info) const;
925

            
926
  Server::Configuration::ServerFactoryContext& context_;
927
  ClusterManagerFactory& factory_;
928
  Runtime::Loader& runtime_;
929
  Stats::Store& stats_;
930
  ThreadLocal::TypedSlot<ThreadLocalClusterManagerImpl> tls_;
931
  // Contains information about ongoing on-demand cluster discoveries.
932
  ClusterCreationsMap pending_cluster_creations_;
933
  Config::XdsManager& xds_manager_;
934
  Random::RandomGenerator& random_;
935
  const bool deferred_cluster_creation_;
936
  absl::optional<envoy::config::core::v3::BindConfig> bind_config_;
937
  Outlier::EventLoggerSharedPtr outlier_event_logger_;
938
  const LocalInfo::LocalInfo& local_info_;
939
  CdsApiPtr cds_api_;
940
  ClusterManagerStats cm_stats_;
941
  ClusterManagerInitHelper init_helper_;
942
  // Temporarily saved resume cds callback from updateClusterCounts invocation.
943
  Config::ScopedResume resume_cds_;
944
  LoadStatsReporterPtr load_stats_reporter_;
945
  // The name of the local cluster of this Envoy instance if defined.
946
  absl::optional<std::string> local_cluster_name_;
947
  Grpc::AsyncClientManagerPtr async_client_manager_;
948
  Server::ConfigTracker::EntryOwnerPtr config_tracker_entry_;
949
  TimeSource& time_source_;
950
  ClusterUpdatesMap updates_map_;
951
  Event::Dispatcher& dispatcher_;
952
  Http::Context& http_context_;
953
  Router::Context& router_context_;
954
  ClusterTrafficStatNames cluster_stat_names_;
955
  ClusterConfigUpdateStatNames cluster_config_update_stat_names_;
956
  ClusterLbStatNames cluster_lb_stat_names_;
957
  ClusterEndpointStatNames cluster_endpoint_stat_names_;
958
  ClusterLoadReportStatNames cluster_load_report_stat_names_;
959
  ClusterCircuitBreakersStatNames cluster_circuit_breakers_stat_names_;
960
  ClusterRequestResponseSizeStatNames cluster_request_response_size_stat_names_;
961
  ClusterTimeoutBudgetStatNames cluster_timeout_budget_stat_names_;
962
  std::shared_ptr<SharedPool::ObjectSharedPool<
963
      const envoy::config::cluster::v3::Cluster::CommonLbConfig, MessageUtil, MessageUtil>>
964
      common_lb_config_pool_;
965

            
966
  ClusterSet primary_clusters_;
967

            
968
  bool initialized_{};
969
  bool ads_mux_initialized_{};
970
  std::atomic<bool> shutdown_;
971

            
972
  // Keep all the ClusterMaps at the end, so that they get destroyed first.
973
  // Clusters may keep references to the cluster manager and in destructor can call
974
  // cluster manager methods.
975
  //
976
  // This might make MSAN unhappy because it thinks that we are accessing uninitialized
977
  // memory when those methods access fields of the cluster manager class.
978
  ClusterMap warming_clusters_;
979

            
980
protected:
981
  ClusterMap active_clusters_;
982
};
983

            
984
} // namespace Upstream
985
} // namespace Envoy