1
#pragma once
2

            
3
#include <array>
4
#include <atomic>
5
#include <chrono>
6
#include <cstdint>
7
#include <functional>
8
#include <list>
9
#include <memory>
10
#include <string>
11
#include <utility>
12
#include <variant>
13
#include <vector>
14

            
15
#include "envoy/common/callback.h"
16
#include "envoy/common/optref.h"
17
#include "envoy/common/time.h"
18
#include "envoy/config/cluster/v3/cluster.pb.h"
19
#include "envoy/config/core/v3/address.pb.h"
20
#include "envoy/config/core/v3/base.pb.h"
21
#include "envoy/config/core/v3/health_check.pb.h"
22
#include "envoy/config/core/v3/protocol.pb.h"
23
#include "envoy/config/endpoint/v3/endpoint_components.pb.h"
24
#include "envoy/config/typed_metadata.h"
25
#include "envoy/event/timer.h"
26
#include "envoy/local_info/local_info.h"
27
#include "envoy/network/dns.h"
28
#include "envoy/network/filter.h"
29
#include "envoy/runtime/runtime.h"
30
#include "envoy/secret/secret_manager.h"
31
#include "envoy/server/filter_config.h"
32
#include "envoy/server/transport_socket_config.h"
33
#include "envoy/ssl/context_manager.h"
34
#include "envoy/stats/scope.h"
35
#include "envoy/thread_local/thread_local.h"
36
#include "envoy/upstream/cluster_factory.h"
37
#include "envoy/upstream/cluster_manager.h"
38
#include "envoy/upstream/health_checker.h"
39
#include "envoy/upstream/load_balancer.h"
40
#include "envoy/upstream/locality.h"
41
#include "envoy/upstream/outlier_detection.h"
42
#include "envoy/upstream/upstream.h"
43

            
44
#include "source/common/common/callback_impl.h"
45
#include "source/common/common/empty_string.h"
46
#include "source/common/common/enum_to_int.h"
47
#include "source/common/common/logger.h"
48
#include "source/common/common/packed_struct.h"
49
#include "source/common/common/thread.h"
50
#include "source/common/config/metadata.h"
51
#include "source/common/config/well_known_names.h"
52
#include "source/common/http/filter_chain_helper.h"
53
#include "source/common/http/http1/codec_stats.h"
54
#include "source/common/http/http2/codec_stats.h"
55
#include "source/common/http/http3/codec_stats.h"
56
#include "source/common/init/manager_impl.h"
57
#include "source/common/network/utility.h"
58
#include "source/common/orca/orca_load_metrics.h"
59
#include "source/common/shared_pool/shared_pool.h"
60
#include "source/common/stats/isolated_store_impl.h"
61
#include "source/common/upstream/load_balancer_context_base.h"
62
#include "source/common/upstream/locality_pool.h"
63
#include "source/common/upstream/resource_manager_impl.h"
64
#include "source/common/upstream/transport_socket_match_impl.h"
65
#include "source/common/upstream/upstream_factory_context_impl.h"
66
#include "source/extensions/upstreams/http/config.h"
67
#include "source/extensions/upstreams/tcp/config.h"
68
#include "source/server/transport_socket_config_impl.h"
69

            
70
#include "absl/container/node_hash_set.h"
71
#include "absl/synchronization/mutex.h"
72

            
73
namespace Envoy {
74
namespace Upstream {
75

            
76
// Priority levels and localities are considered overprovisioned with this factor.
// Declared `inline` so that every translation unit including this header refers to one shared
// definition of the constant instead of a per-translation-unit copy.
inline constexpr uint32_t kDefaultOverProvisioningFactor = 140;
78

            
79
// Shorthand for the v3 Cluster configuration protobuf message type.
using ClusterProto = envoy::config::cluster::v3::Cluster;
80

            
81
using UpstreamNetworkFilterConfigProviderManager =
82
    Filter::FilterConfigProviderManager<Network::FilterFactoryCb,
83
                                        Server::Configuration::UpstreamFactoryContext>;
84

            
85
class LegacyLbPolicyConfigHelper {
86
public:
87
  struct Result {
88
    TypedLoadBalancerFactory* factory;
89
    LoadBalancerConfigPtr config;
90
  };
91

            
92
  static absl::StatusOr<Result> getTypedLbConfigFromLegacyProtoWithoutSubset(
93
      Server::Configuration::ServerFactoryContext& factory_context, const ClusterProto& cluster);
94

            
95
  static absl::StatusOr<Result>
96
  getTypedLbConfigFromLegacyProto(Server::Configuration::ServerFactoryContext& factory_context,
97
                                  const ClusterProto& cluster);
98
};
99

            
100
/**
101
 * Null implementation of HealthCheckHostMonitor.
102
 */
103
class HealthCheckHostMonitorNullImpl : public HealthCheckHostMonitor {
104
public:
105
  // Upstream::HealthCheckHostMonitor
106
13
  void setUnhealthy(UnhealthyType) override {}
107
};
108

            
109
/**
110
 * Implementation of LoadMetricStats.
111
 */
112
class LoadMetricStatsImpl : public LoadMetricStats {
113
public:
114
  void add(const absl::string_view key, double value) override;
115
  StatMapPtr latch() override;
116

            
117
private:
118
  absl::Mutex mu_;
119
  StatMapPtr map_ ABSL_GUARDED_BY(mu_);
120
};
121

            
122
/**
123
 * Null host monitor implementation.
124
 */
125
class DetectorHostMonitorNullImpl : public Outlier::DetectorHostMonitor {
126
public:
127
  // Upstream::Outlier::DetectorHostMonitor
128
1
  uint32_t numEjections() override { return 0; }
129
91467
  void putResult(Outlier::Result, absl::optional<uint64_t>) override {}
130
35738
  void putResponseTime(std::chrono::milliseconds) override {}
131
1
  const absl::optional<MonotonicTime>& lastEjectionTime() override { return time_; }
132
1
  const absl::optional<MonotonicTime>& lastUnejectionTime() override { return time_; }
133
12
  double successRate(SuccessRateMonitorType) const override { return -1; }
134

            
135
private:
136
  const absl::optional<MonotonicTime> time_{};
137
};
138

            
139
/**
140
 * Base implementation of most of Upstream::HostDescription, shared between
141
 * HostDescriptionImpl and LogicalHost, which is in
142
 * source/extensions/clusters/common/logical_host.h. These differ in threading.
143
 *
144
 * HostDescriptionImpl and HostImpl are intended to be initialized in the main
145
 * thread, and are thereafter read-only, and thus do not require locking.
146
 *
147
 * LogicalHostImpl is intended to be dynamically changed due to DNS resolution
148
 * and Happy Eyeballs from multiple threads, and thus requires an address_lock
149
 * and lock annotations to enforce this.
150
 *
151
 * The two level implementation inheritance allows most of the implementation
152
 * to be shared, but sinks the ones requiring different lock semantics into
153
 * the leaf subclasses.
154
 */
155
class HostDescriptionImplBase : virtual public HostDescription,
156
                                protected Logger::Loggable<Logger::Id::upstream> {
157
public:
158
139359
  Network::UpstreamTransportSocketFactory& transportSocketFactory() const override {
159
139359
    absl::ReaderMutexLock lock(&metadata_mutex_);
160
139359
    return socket_factory_;
161
139359
  }
162

            
163
80024
  bool canary() const override { return canary_; }
164
1
  void canary(bool is_canary) override { canary_ = is_canary; }
165

            
166
  // Metadata getter/setter.
167
  //
168
  // It's possible that the lock that guards the metadata will become highly contended (e.g.:
169
  // endpoints churning during a deploy of a large cluster). A possible improvement
170
  // would be to use TLS and post metadata updates from the main thread. This model would
171
  // possibly benefit other related and expensive computations too (e.g.: updating subsets).
172
40300
  MetadataConstSharedPtr metadata() const override {
173
40300
    absl::ReaderMutexLock lock(&metadata_mutex_);
174
40300
    return endpoint_metadata_;
175
40300
  }
176
984
  std::size_t metadataHash() const override {
177
984
    absl::ReaderMutexLock lock(&metadata_mutex_);
178
984
    return endpoint_metadata_hash_;
179
984
  }
180
54
  void metadata(MetadataConstSharedPtr new_metadata) override {
181
54
    auto& new_socket_factory = resolveTransportSocketFactory(address(), new_metadata.get());
182
54
    {
183
54
      absl::WriterMutexLock lock(&metadata_mutex_);
184
54
      endpoint_metadata_ = new_metadata;
185
54
      endpoint_metadata_hash_ = new_metadata ? MessageUtil::hash(*new_metadata) : 0;
186
      // Update data members dependent on metadata.
187
54
      socket_factory_ = new_socket_factory;
188
54
    }
189
54
  }
190

            
191
1687859
  const ClusterInfo& cluster() const override { return *cluster_; }
192
17
  HealthCheckHostMonitor& healthChecker() const override {
193
17
    if (health_checker_) {
194
4
      return *health_checker_;
195
4
    }
196

            
197
13
    static HealthCheckHostMonitorNullImpl* null_health_checker =
198
13
        new HealthCheckHostMonitorNullImpl();
199
13
    return *null_health_checker;
200
17
  }
201

            
202
31041
  bool canCreateConnection(Upstream::ResourcePriority priority) const override {
203
31041
    if (stats().cx_active_.value() >= cluster().resourceManager(priority).maxConnectionsPerHost()) {
204
4
      return false;
205
4
    }
206
31037
    return cluster().resourceManager(priority).connections().canCreate();
207
31041
  }
208

            
209
134555
  Outlier::DetectorHostMonitor& outlierDetector() const override {
210
134555
    if (outlier_detector_) {
211
7338
      return *outlier_detector_;
212
7338
    }
213

            
214
127217
    static DetectorHostMonitorNullImpl* null_outlier_detector = new DetectorHostMonitorNullImpl();
215
127217
    return *null_outlier_detector;
216
134555
  }
217
18714083
  HostStats& stats() const override { return stats_; }
218
112
  LoadMetricStats& loadMetricStats() const override { return load_metric_stats_; }
219
355
  const std::string& hostnameForHealthChecks() const override { return health_checks_hostname_; }
220
272
  const std::string& hostname() const override { return hostname_; }
221
20527
  const envoy::config::core::v3::Locality& locality() const override { return *locality_; }
222
30606
  const MetadataConstSharedPtr localityMetadata() const override { return locality_metadata_; }
223
77034
  Stats::StatName localityZoneStatName() const override {
224
77034
    return locality_zone_stat_name_.statName();
225
77034
  }
226
1338
  uint32_t priority() const override { return priority_; }
227
54
  void priority(uint32_t priority) override { priority_ = priority; }
228
  Network::UpstreamTransportSocketFactory& resolveTransportSocketFactory(
229
      const Network::Address::InstanceConstSharedPtr& dest_address,
230
      const envoy::config::core::v3::Metadata* metadata,
231
      Network::TransportSocketOptionsConstSharedPtr transport_socket_options =
232
          nullptr) const override;
233
2198
  absl::optional<MonotonicTime> lastHcPassTime() const override { return last_hc_pass_time_; }
234

            
235
352
  void setHealthChecker(HealthCheckHostMonitorPtr&& health_checker) override {
236
352
    health_checker_ = std::move(health_checker);
237
352
  }
238

            
239
114
  void setOutlierDetector(Outlier::DetectorHostMonitorPtr&& outlier_detector) override {
240
114
    outlier_detector_ = std::move(outlier_detector);
241
114
  }
242

            
243
162
  void setLastHcPassTime(MonotonicTime last_hc_pass_time) override {
244
162
    last_hc_pass_time_.emplace(std::move(last_hc_pass_time));
245
162
  }
246

            
247
77
  void setLbPolicyData(HostLbPolicyDataPtr lb_policy_data) override {
248
77
    lb_policy_data_ = std::move(lb_policy_data);
249
77
  }
250
40759
  OptRef<HostLbPolicyData> lbPolicyData() const override {
251
40759
    return makeOptRefFromPtr(lb_policy_data_.get());
252
40759
  }
253

            
254
protected:
255
  HostDescriptionImplBase(
256
      ClusterInfoConstSharedPtr cluster, const std::string& hostname,
257
      Network::Address::InstanceConstSharedPtr dest_address,
258
      MetadataConstSharedPtr endpoint_metadata, MetadataConstSharedPtr locality_metadata,
259
      std::shared_ptr<const envoy::config::core::v3::Locality> locality,
260
      const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config,
261
      uint32_t priority, absl::Status& creation_status);
262

            
263
  /**
264
   * @return nullptr if address_list is empty, otherwise a shared_ptr copy of address_list.
265
   */
266
  static SharedConstAddressVector
267
  makeAddressListOrNull(const Network::Address::InstanceConstSharedPtr& address,
268
                        const AddressVector& address_list);
269

            
270
private:
271
  ClusterInfoConstSharedPtr cluster_;
272
  const std::string hostname_;
273
  const std::string health_checks_hostname_;
274
  std::atomic<bool> canary_;
275
  mutable absl::Mutex metadata_mutex_;
276
  MetadataConstSharedPtr endpoint_metadata_ ABSL_GUARDED_BY(metadata_mutex_);
277
  std::size_t endpoint_metadata_hash_ ABSL_GUARDED_BY(metadata_mutex_){0};
278
  const MetadataConstSharedPtr locality_metadata_;
279
  const std::shared_ptr<const envoy::config::core::v3::Locality> locality_;
280
  Stats::StatNameDynamicStorage locality_zone_stat_name_;
281
  mutable HostStats stats_;
282
  mutable LoadMetricStatsImpl load_metric_stats_;
283
  Outlier::DetectorHostMonitorPtr outlier_detector_;
284
  HealthCheckHostMonitorPtr health_checker_;
285
  std::atomic<uint32_t> priority_;
286
  std::reference_wrapper<Network::UpstreamTransportSocketFactory>
287
      socket_factory_ ABSL_GUARDED_BY(metadata_mutex_);
288
  absl::optional<MonotonicTime> last_hc_pass_time_;
289
  HostLbPolicyDataPtr lb_policy_data_;
290
};
291

            
292
/**
293
 * Final implementation of most of Upstream::HostDescription, providing const
294
 * of the address-related member variables.
295
 *
296
 * See also LogicalHostDescriptionImpl in
297
 * source/extensions/clusters/common/logical_host.h for a variant that allows
298
 * safe dynamic update to addresses.
299
 */
300
class HostDescriptionImpl : public HostDescriptionImplBase {
301
public:
302
  static absl::StatusOr<std::unique_ptr<HostDescriptionImpl>>
303
  create(ClusterInfoConstSharedPtr cluster, const std::string& hostname,
304
         Network::Address::InstanceConstSharedPtr dest_address,
305
         MetadataConstSharedPtr endpoint_metadata, MetadataConstSharedPtr locality_metadata,
306
         std::shared_ptr<const envoy::config::core::v3::Locality> locality,
307
         const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config,
308
         uint32_t priority, const AddressVector& address_list = {});
309

            
310
  // HostDescription
311
269830
  Network::Address::InstanceConstSharedPtr address() const override { return address_; }
312
626
  Network::Address::InstanceConstSharedPtr healthCheckAddress() const override {
313
626
    return health_check_address_;
314
626
  }
315
30140
  SharedConstAddressVector addressListOrNull() const override { return address_list_or_null_; }
316

            
317
protected:
318
  HostDescriptionImpl(
319
      absl::Status& creation_status, ClusterInfoConstSharedPtr cluster, const std::string& hostname,
320
      Network::Address::InstanceConstSharedPtr dest_address,
321
      MetadataConstSharedPtr endpoint_metadata, MetadataConstSharedPtr locality_metadata,
322
      std::shared_ptr<const envoy::config::core::v3::Locality> locality,
323
      const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config,
324
      uint32_t priority, const AddressVector& address_list = {});
325

            
326
private:
327
  // No locks are required in this implementation: all address-related member
328
  // variables are set at construction and never change. See
329
  // LogicalHostDescription in source/extensions/clusters/common/logical_host.h
330
  // for an alternative that supports dynamic update.
331
  const Network::Address::InstanceConstSharedPtr address_;
332
  const SharedConstAddressVector address_list_or_null_;
333
  const Network::Address::InstanceConstSharedPtr health_check_address_;
334
};
335

            
336
/**
337
 * Implementation of Upstream::Host.
338
 */
339
class HostImplBase : public Host,
340
                     protected Logger::Loggable<Logger::Id::upstream>,
341
                     public std::enable_shared_from_this<HostImplBase> {
342
public:
343
  HostImplBase(uint32_t initial_weight,
344
               const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config,
345
               const envoy::config::core::v3::HealthStatus health_status)
346
26831
      : disable_active_health_check_(health_check_config.disable_active_health_check()) {
347
    // This EDS flags setting is still necessary for stats, configuration dump, canonical
348
    // coarseHealth() etc.
349
26831
    HostImplBase::setEdsHealthStatus(health_status);
350
26831
    HostImplBase::weight(initial_weight);
351
26831
  }
352

            
353
867
  bool disableActiveHealthCheck() const override { return disable_active_health_check_; }
354
1
  void setDisableActiveHealthCheck(bool disable_active_health_check) override {
355
1
    disable_active_health_check_ = disable_active_health_check;
356
1
  }
357

            
358
  // Upstream::Host
359
  std::vector<std::pair<absl::string_view, Stats::PrimitiveCounterReference>>
360
6
  counters() const override {
361
6
    return stats().counters();
362
6
  }
363
  CreateConnectionData createConnection(
364
      Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options,
365
      Network::TransportSocketOptionsConstSharedPtr transport_socket_options) const override;
366
  CreateConnectionData createHealthCheckConnection(
367
      Event::Dispatcher& dispatcher,
368
      Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
369
      const envoy::config::core::v3::Metadata* metadata) const override;
370

            
371
  std::vector<std::pair<absl::string_view, Stats::PrimitiveGaugeReference>>
372
6
  gauges() const override {
373
6
    return stats().gauges();
374
6
  }
375
81854
  void healthFlagClear(HealthFlag flag) override { health_flags_ &= ~enumToInt(flag); }
376
529398
  bool healthFlagGet(HealthFlag flag) const override { return health_flags_ & enumToInt(flag); }
377
933
  void healthFlagSet(HealthFlag flag) final { health_flags_ |= enumToInt(flag); }
378
7
  uint32_t healthFlagsGetAll() const override { return health_flags_; }
379
7
  void healthFlagsSetAll(uint32_t bits) override { health_flags_ |= bits; }
380

            
381
40
  Host::HealthStatus healthStatus() const override {
382
    // Evaluate active health status first.
383

            
384
    // Active unhealthy.
385
40
    if (healthFlagsGet(enumToInt(HealthFlag::FAILED_ACTIVE_HC) |
386
40
                       enumToInt(HealthFlag::FAILED_OUTLIER_CHECK))) {
387
3
      return HealthStatus::UNHEALTHY;
388
3
    }
389

            
390
    // Eds unhealthy.
391
37
    if (eds_health_status_ == envoy::config::core::v3::UNHEALTHY ||
392
37
        eds_health_status_ == envoy::config::core::v3::DRAINING ||
393
37
        eds_health_status_ == envoy::config::core::v3::TIMEOUT) {
394
2
      return eds_health_status_;
395
2
    }
396

            
397
    // Active degraded.
398
35
    if (healthFlagGet(HealthFlag::DEGRADED_ACTIVE_HC)) {
399
1
      return HealthStatus::DEGRADED;
400
1
    }
401

            
402
    // Eds degraded or healthy.
403
34
    return eds_health_status_;
404
35
  }
405

            
406
140592
  Host::Health coarseHealth() const override {
407
    // If any of the unhealthy flags are set, host is unhealthy.
408
140592
    if (healthFlagsGet(enumToInt(HealthFlag::FAILED_ACTIVE_HC) |
409
140592
                       enumToInt(HealthFlag::FAILED_OUTLIER_CHECK) |
410
140592
                       enumToInt(HealthFlag::FAILED_EDS_HEALTH) |
411
140592
                       enumToInt(HealthFlag::EDS_STATUS_DRAINING))) {
412
1709
      return Host::Health::Unhealthy;
413
1709
    }
414

            
415
    // If any of the degraded flags are set, host is degraded.
416
138883
    if (healthFlagsGet(enumToInt(HealthFlag::DEGRADED_ACTIVE_HC) |
417
138883
                       enumToInt(HealthFlag::DEGRADED_OUTLIER_DETECTION) |
418
138883
                       enumToInt(HealthFlag::DEGRADED_EDS_HEALTH))) {
419
159
      return Host::Health::Degraded;
420
159
    }
421

            
422
    // The host must have no flags or be pending removal.
423
138724
    ASSERT(health_flags_ == 0 || healthFlagGet(HealthFlag::PENDING_DYNAMIC_REMOVAL));
424
138724
    return Host::Health::Healthy;
425
138883
  }
426

            
427
26866
  void setEdsHealthStatus(envoy::config::core::v3::HealthStatus eds_health_status) override {
428
26866
    eds_health_status_ = eds_health_status;
429
26866
    setEdsHealthFlag(eds_health_status);
430
26866
  }
431
1010
  Host::HealthStatus edsHealthStatus() const override {
432
1010
    return Host::HealthStatus(eds_health_status_.load());
433
1010
  }
434

            
435
1322305
  uint32_t weight() const override { return weight_; }
436
  void weight(uint32_t new_weight) override;
437
21
  bool used() const override { return handle_count_ > 0; }
438
12553
  HostHandlePtr acquireHandle() const override {
439
12553
    return std::make_unique<HostHandleImpl>(shared_from_this());
440
12553
  }
441

            
442
protected:
443
  static CreateConnectionData
444
  createConnection(Event::Dispatcher& dispatcher, const ClusterInfo& cluster,
445
                   const Network::Address::InstanceConstSharedPtr& address,
446
                   const SharedConstAddressVector& address_list,
447
                   Network::UpstreamTransportSocketFactory& socket_factory,
448
                   const Network::ConnectionSocket::OptionsSharedPtr& options,
449
                   Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
450
                   HostDescriptionConstSharedPtr host);
451
  static absl::optional<Network::Address::InstanceConstSharedPtr> maybeGetProxyRedirectAddress(
452
      const Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
453
      HostDescriptionConstSharedPtr host,
454
      const Network::UpstreamTransportSocketFactory& socket_factory);
455

            
456
private:
457
  // Helper function to check multiple health flags at once.
458
279515
  bool healthFlagsGet(uint32_t flags) const { return health_flags_ & flags; }
459

            
460
  void setEdsHealthFlag(envoy::config::core::v3::HealthStatus health_status);
461

            
462
  std::atomic<uint32_t> health_flags_{};
463
  std::atomic<uint32_t> weight_;
464
  bool disable_active_health_check_;
465
  // TODO(wbpcode): should we store the EDS health status to health_flags_ to get unified status or
466
  // flag access? May be we could refactor HealthFlag to contain all these statuses and flags in the
467
  // future.
468
  std::atomic<Host::HealthStatus> eds_health_status_{};
469

            
470
  struct HostHandleImpl : HostHandle {
471
12553
    HostHandleImpl(const std::shared_ptr<const HostImplBase>& parent) : parent_(parent) {
472
12553
      parent->handle_count_++;
473
12553
    }
474
12553
    ~HostHandleImpl() override {
475
12553
      if (const auto host = parent_.lock()) {
476
12553
        ASSERT(host->handle_count_ > 0);
477
12553
        host->handle_count_--;
478
12553
      }
479
12553
    }
480
    const std::weak_ptr<const HostImplBase> parent_;
481
  };
482
  mutable std::atomic<uint32_t> handle_count_{};
483
};
484

            
485
class HostImpl : public HostImplBase, public HostDescriptionImpl {
486
public:
487
  static absl::StatusOr<std::unique_ptr<HostImpl>>
488
  create(ClusterInfoConstSharedPtr cluster, const std::string& hostname,
489
         Network::Address::InstanceConstSharedPtr address, MetadataConstSharedPtr endpoint_metadata,
490
         MetadataConstSharedPtr locality_metadata, uint32_t initial_weight,
491
         std::shared_ptr<const envoy::config::core::v3::Locality> locality,
492
         const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config,
493
         uint32_t priority, const envoy::config::core::v3::HealthStatus health_status,
494
         const AddressVector& address_list = {});
495

            
496
protected:
497
  HostImpl(absl::Status& creation_status, ClusterInfoConstSharedPtr cluster,
498
           const std::string& hostname, Network::Address::InstanceConstSharedPtr address,
499
           MetadataConstSharedPtr endpoint_metadata, MetadataConstSharedPtr locality_metadata,
500
           uint32_t initial_weight,
501
           std::shared_ptr<const envoy::config::core::v3::Locality> locality,
502
           const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config,
503
           uint32_t priority, const envoy::config::core::v3::HealthStatus health_status,
504
           const AddressVector& address_list = {})
505
26652
      : HostImplBase(initial_weight, health_check_config, health_status),
506
26652
        HostDescriptionImpl(creation_status, cluster, hostname, address, endpoint_metadata,
507
26652
                            locality_metadata, locality, health_check_config, priority,
508
26652
                            address_list) {}
509
};
510

            
511
class HostsPerLocalityImpl : public HostsPerLocality {
512
public:
513
235815
  HostsPerLocalityImpl() : HostsPerLocalityImpl(std::vector<HostVector>(), false) {}
514

            
515
  // Single locality constructor
516
  //
517
  // Parameter requirements:
518
  // 1. All entries in hosts must have the same locality.
519
  // 2. If has_local_locality is true, then the locality of all entries in hosts
520
  //    must be equal to the current envoy's locality.
521
  HostsPerLocalityImpl(const HostVector& hosts, bool has_local_locality = false)
522
705
      : HostsPerLocalityImpl(std::vector<HostVector>({hosts}), has_local_locality) {}
523

            
524
  // Multiple localities constructor
525
  //
526
  // locality_hosts must adhere to the following ordering constraints:
527
  // 1. All hosts within a single HostVector bucket must have the same locality
528
  // 2. No hosts in different HostVector buckets can have the same locality
529
  // 3. If has_local_locality is true, then the locality of all hosts in the first HostVector bucket
530
  //    must be equal to the current envoy's locality.
531
  // 4. All non-local HostVector buckets must be sorted in ascending order by the LocalityLess
532
  // comparator
533
  HostsPerLocalityImpl(std::vector<HostVector>&& locality_hosts, bool has_local_locality)
534
254532
      : local_(has_local_locality), hosts_per_locality_(std::move(locality_hosts)) {
535
254532
    ASSERT(!has_local_locality || !hosts_per_locality_.empty());
536
254532
  }
537

            
538
1020
  bool hasLocalLocality() const override { return local_; }
539
276123
  const std::vector<HostVector>& get() const override { return hosts_per_locality_; }
540
  std::vector<HostsPerLocalityConstSharedPtr>
541
  filter(const std::vector<std::function<bool(const Host&)>>& predicate) const override;
542

            
543
  // The const shared pointer for the empty HostsPerLocalityImpl.
544
213706
  static HostsPerLocalityConstSharedPtr empty() {
545
213706
    static HostsPerLocalityConstSharedPtr empty = std::make_shared<HostsPerLocalityImpl>();
546
213706
    return empty;
547
213706
  }
548

            
549
private:
550
  // Does an entry exist for the local locality?
551
  bool local_{};
552
  // The first entry is for local hosts in the local locality.
553
  std::vector<HostVector> hosts_per_locality_;
554
};
555

            
556
/**
557
 * A class for management of the set of hosts for a given priority level.
558
 */
559
class HostSetImpl : public HostSet {
560
public:
561
  HostSetImpl(uint32_t priority, absl::optional<bool> weighted_priority_health,
562
              absl::optional<uint32_t> overprovisioning_factor)
563
53312
      : priority_(priority), overprovisioning_factor_(overprovisioning_factor.has_value()
564
53312
                                                          ? overprovisioning_factor.value()
565
53312
                                                          : kDefaultOverProvisioningFactor),
566
53312
        weighted_priority_health_(weighted_priority_health.value_or(false)),
567
53312
        hosts_(new HostVector()), healthy_hosts_(new HealthyHostVector()),
568
53312
        degraded_hosts_(new DegradedHostVector()), excluded_hosts_(new ExcludedHostVector()) {}
569

            
570
  /**
571
   * Install a callback that will be invoked when the host set membership changes.
572
   * @param callback supplies the callback to invoke.
573
   * @return Common::CallbackHandlePtr the callback handle.
574
   */
575
  ABSL_MUST_USE_RESULT Common::CallbackHandlePtr
576
53229
  addPriorityUpdateCb(PrioritySet::PriorityUpdateCb callback) const {
577
53229
    return member_update_cb_helper_.add(callback);
578
53229
  }
579

            
580
  // Upstream::HostSet
581
297963
  const HostVector& hosts() const override { return *hosts_; }
582
17546
  HostVectorConstSharedPtr hostsPtr() const override { return hosts_; }
583
821753
  const HostVector& healthyHosts() const override { return healthy_hosts_->get(); }
584
17538
  HealthyHostVectorConstSharedPtr healthyHostsPtr() const override { return healthy_hosts_; }
585
119955
  const HostVector& degradedHosts() const override { return degraded_hosts_->get(); }
586
17538
  DegradedHostVectorConstSharedPtr degradedHostsPtr() const override { return degraded_hosts_; }
587
120862
  const HostVector& excludedHosts() const override { return excluded_hosts_->get(); }
588
17538
  ExcludedHostVectorConstSharedPtr excludedHostsPtr() const override { return excluded_hosts_; }
589
3144
  const HostsPerLocality& hostsPerLocality() const override { return *hosts_per_locality_; }
590
17698
  HostsPerLocalityConstSharedPtr hostsPerLocalityPtr() const override {
591
17698
    return hosts_per_locality_;
592
17698
  }
593
137693
  const HostsPerLocality& healthyHostsPerLocality() const override {
594
137693
    return *healthy_hosts_per_locality_;
595
137693
  }
596
17538
  HostsPerLocalityConstSharedPtr healthyHostsPerLocalityPtr() const override {
597
17538
    return healthy_hosts_per_locality_;
598
17538
  }
599
132649
  const HostsPerLocality& degradedHostsPerLocality() const override {
600
132649
    return *degraded_hosts_per_locality_;
601
132649
  }
602
17538
  HostsPerLocalityConstSharedPtr degradedHostsPerLocalityPtr() const override {
603
17538
    return degraded_hosts_per_locality_;
604
17538
  }
605
192
  const HostsPerLocality& excludedHostsPerLocality() const override {
606
192
    return *excluded_hosts_per_locality_;
607
192
  }
608
17698
  HostsPerLocalityConstSharedPtr excludedHostsPerLocalityPtr() const override {
609
17698
    return excluded_hosts_per_locality_;
610
17698
  }
611
17882
  LocalityWeightsConstSharedPtr localityWeights() const override { return locality_weights_; }
612
2584431
  uint32_t priority() const override { return priority_; }
613
86592
  uint32_t overprovisioningFactor() const override { return overprovisioning_factor_; }
614
52196
  bool weightedPriorityHealth() const override { return weighted_priority_health_; }
615

            
616
  static PrioritySet::UpdateHostsParams
617
  updateHostsParams(HostVectorConstSharedPtr hosts,
618
                    HostsPerLocalityConstSharedPtr hosts_per_locality,
619
                    HealthyHostVectorConstSharedPtr healthy_hosts,
620
                    HostsPerLocalityConstSharedPtr healthy_hosts_per_locality,
621
                    DegradedHostVectorConstSharedPtr degraded_hosts,
622
                    HostsPerLocalityConstSharedPtr degraded_hosts_per_locality,
623
                    ExcludedHostVectorConstSharedPtr excluded_hosts,
624
                    HostsPerLocalityConstSharedPtr excluded_hosts_per_locality);
625
  static PrioritySet::UpdateHostsParams updateHostsParams(const HostSet& host_set);
626
  static PrioritySet::UpdateHostsParams
627
  partitionHosts(HostVectorConstSharedPtr hosts, HostsPerLocalityConstSharedPtr hosts_per_locality);
628

            
629
  void updateHosts(PrioritySet::UpdateHostsParams&& update_hosts_params,
630
                   LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added,
631
                   const HostVector& hosts_removed,
632
                   absl::optional<bool> weighted_priority_health = absl::nullopt,
633
                   absl::optional<uint32_t> overprovisioning_factor = absl::nullopt);
634

            
635
protected:
636
53095
  virtual void runUpdateCallbacks(const HostVector& hosts_added, const HostVector& hosts_removed) {
637
53095
    member_update_cb_helper_.runCallbacks(priority_, hosts_added, hosts_removed);
638
53095
  }
639

            
640
private:
641
  const uint32_t priority_;
642
  uint32_t overprovisioning_factor_;
643
  bool weighted_priority_health_;
644
  HostVectorConstSharedPtr hosts_;
645
  HealthyHostVectorConstSharedPtr healthy_hosts_;
646
  DegradedHostVectorConstSharedPtr degraded_hosts_;
647
  ExcludedHostVectorConstSharedPtr excluded_hosts_;
648
  HostsPerLocalityConstSharedPtr hosts_per_locality_{HostsPerLocalityImpl::empty()};
649
  HostsPerLocalityConstSharedPtr healthy_hosts_per_locality_{HostsPerLocalityImpl::empty()};
650
  HostsPerLocalityConstSharedPtr degraded_hosts_per_locality_{HostsPerLocalityImpl::empty()};
651
  HostsPerLocalityConstSharedPtr excluded_hosts_per_locality_{HostsPerLocalityImpl::empty()};
652
  // TODO(mattklein123): Remove mutable.
653
  mutable Common::CallbackManager<void, uint32_t, const HostVector&, const HostVector&>
654
      member_update_cb_helper_;
655
  // Locality weights.
656
  LocalityWeightsConstSharedPtr locality_weights_;
657
};
658

            
659
using HostSetImplPtr = std::unique_ptr<HostSetImpl>;
660

            
661
/**
662
 * A class for management of the set of hosts in a given cluster.
663
 */
664
class PrioritySetImpl : public PrioritySet {
665
public:
666
52918
  PrioritySetImpl() : batch_update_(false) {}
667
  // From PrioritySet
668
  ABSL_MUST_USE_RESULT Common::CallbackHandlePtr
669
118638
  addMemberUpdateCb(MemberUpdateCb callback) const override {
670
118638
    return member_update_cb_helper_.add(callback);
671
118638
  }
672
  ABSL_MUST_USE_RESULT Common::CallbackHandlePtr
673
135944
  addPriorityUpdateCb(PriorityUpdateCb callback) const override {
674
135944
    return priority_update_cb_helper_.add(callback);
675
135944
  }
676
2912886
  const std::vector<std::unique_ptr<HostSet>>& hostSetsPerPriority() const override {
677
2912886
    return host_sets_;
678
2912886
  }
679
  // Get the host set for this priority level, creating it if necessary.
680
  const HostSet&
681
  getOrCreateHostSet(uint32_t priority,
682
                     absl::optional<bool> weighted_priority_health = absl::nullopt,
683
                     absl::optional<uint32_t> overprovisioning_factor = absl::nullopt);
684

            
685
  void updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params,
686
                   LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added,
687
                   const HostVector& hosts_removed,
688
                   absl::optional<bool> weighted_priority_health = absl::nullopt,
689
                   absl::optional<uint32_t> overprovisioning_factor = absl::nullopt,
690
                   HostMapConstSharedPtr cross_priority_host_map = nullptr) override;
691

            
692
  void batchHostUpdate(BatchUpdateCb& callback) override;
693

            
694
50236
  HostMapConstSharedPtr crossPriorityHostMap() const override {
695
50236
    return const_cross_priority_host_map_;
696
50236
  }
697

            
698
protected:
699
  // Allows subclasses of PrioritySetImpl to create their own type of HostSetImpl.
700
  virtual HostSetImplPtr createHostSet(uint32_t priority,
701
                                       absl::optional<bool> weighted_priority_health,
702
52712
                                       absl::optional<uint32_t> overprovisioning_factor) {
703
52712
    return std::make_unique<HostSetImpl>(priority, weighted_priority_health,
704
52712
                                         overprovisioning_factor);
705
52712
  }
706

            
707
52978
  virtual void runUpdateCallbacks(const HostVector& hosts_added, const HostVector& hosts_removed) {
708
52978
    member_update_cb_helper_.runCallbacks(hosts_added, hosts_removed);
709
52978
  }
710
  virtual void runReferenceUpdateCallbacks(uint32_t priority, const HostVector& hosts_added,
711
53483
                                           const HostVector& hosts_removed) {
712
53483
    priority_update_cb_helper_.runCallbacks(priority, hosts_added, hosts_removed);
713
53483
  }
714
  // This vector will generally have at least one member, for priority level 0.
715
  // It will expand as host sets are added but currently does not shrink to
716
  // avoid any potential lifetime issues.
717
  std::vector<std::unique_ptr<HostSet>> host_sets_;
718

            
719
  // Read only all host map for fast host searching. This will never be null.
720
  mutable HostMapConstSharedPtr const_cross_priority_host_map_{std::make_shared<HostMap>()};
721

            
722
private:
723
  // This is a matching vector to store the callback handles for host_sets_. It is kept separately
724
  // because host_sets_ is directly returned so we avoid translation.
725
  std::vector<Common::CallbackHandlePtr> host_sets_priority_update_cbs_;
726
  // TODO(mattklein123): Remove mutable.
727
  mutable Common::CallbackManager<void, const HostVector&, const HostVector&>
728
      member_update_cb_helper_;
729
  mutable Common::CallbackManager<void, uint32_t, const HostVector&, const HostVector&>
730
      priority_update_cb_helper_;
731
  bool batch_update_ : 1;
732

            
733
  // Helper class to maintain state as we perform multiple host updates. Keeps track of all hosts
734
  // that have been added/removed throughout the batch update, and ensures that we properly manage
735
  // the batch_update_ flag.
736
  class BatchUpdateScope : public HostUpdateCb {
737
  public:
738
803
    explicit BatchUpdateScope(PrioritySetImpl& parent) : parent_(parent) {
739
803
      ASSERT(!parent_.batch_update_);
740
803
      parent_.batch_update_ = true;
741
803
    }
742
803
    ~BatchUpdateScope() override { parent_.batch_update_ = false; }
743

            
744
    void updateHosts(uint32_t priority, PrioritySet::UpdateHostsParams&& update_hosts_params,
745
                     LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added,
746
                     const HostVector& hosts_removed, absl::optional<bool> weighted_priority_health,
747
                     absl::optional<uint32_t> overprovisioning_factor) override;
748

            
749
    absl::node_hash_set<HostSharedPtr> all_hosts_added_;
750
    absl::node_hash_set<HostSharedPtr> all_hosts_removed_;
751

            
752
  private:
753
    PrioritySetImpl& parent_;
754
    absl::node_hash_set<uint32_t> priorities_;
755
  };
756
};
757

            
758
/**
759
 * Specialized PrioritySetImpl designed for the main thread. It will update and maintain the read
760
 * only cross priority host map when the host set changes.
761
 */
762
class MainPrioritySetImpl : public PrioritySetImpl, public Logger::Loggable<Logger::Id::upstream> {
763
public:
764
  // PrioritySet
765
  void updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params,
766
                   LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added,
767
                   const HostVector& hosts_removed,
768
                   absl::optional<bool> weighted_priority_health = absl::nullopt,
769
                   absl::optional<uint32_t> overprovisioning_factor = absl::nullopt,
770
                   HostMapConstSharedPtr cross_priority_host_map = nullptr) override;
771
  HostMapConstSharedPtr crossPriorityHostMap() const override;
772

            
773
protected:
774
  void updateCrossPriorityHostMap(uint32_t priority, const HostVector& hosts_added,
775
                                  const HostVector& hosts_removed);
776

            
777
  mutable HostMapSharedPtr mutable_cross_priority_host_map_;
778
};
779

            
780
/**
781
 * Implementation of ClusterInfo that reads from JSON.
782
 */
783
class ClusterInfoImpl : public ClusterInfo,
784
                        public Event::DispatcherThreadDeletable,
785
                        protected Logger::Loggable<Logger::Id::upstream> {
786
public:
787
  using HttpProtocolOptionsConfigImpl =
788
      Envoy::Extensions::Upstreams::Http::ProtocolOptionsConfigImpl;
789
  using TcpProtocolOptionsConfigImpl = Envoy::Extensions::Upstreams::Tcp::ProtocolOptionsConfigImpl;
790
  static absl::StatusOr<std::unique_ptr<ClusterInfoImpl>>
791
  create(Init::Manager& info, Server::Configuration::ServerFactoryContext& server_context,
792
         const envoy::config::cluster::v3::Cluster& config,
793
         const absl::optional<envoy::config::core::v3::BindConfig>& bind_config,
794
         Runtime::Loader& runtime, TransportSocketMatcherPtr&& socket_matcher,
795
         Stats::ScopeSharedPtr&& stats_scope, bool added_via_api,
796
         Server::Configuration::TransportSocketFactoryContext&);
797

            
798
  static DeferredCreationCompatibleClusterTrafficStats
799
  generateStats(Stats::ScopeSharedPtr scope, const ClusterTrafficStatNames& cluster_stat_names,
800
                bool defer_creation);
801
  static ClusterLoadReportStats
802
  generateLoadReportStats(Stats::Scope& scope, const ClusterLoadReportStatNames& stat_names);
803
  static ClusterCircuitBreakersStats
804
  generateCircuitBreakersStats(Stats::Scope& scope, Stats::StatName prefix, bool track_remaining,
805
                               const ClusterCircuitBreakersStatNames& stat_names);
806
  static ClusterRequestResponseSizeStats
807
  generateRequestResponseSizeStats(Stats::Scope&,
808
                                   const ClusterRequestResponseSizeStatNames& stat_names);
809
  static ClusterTimeoutBudgetStats
810
  generateTimeoutBudgetStats(Stats::Scope&, const ClusterTimeoutBudgetStatNames& stat_names);
811

            
812
  // Upstream::ClusterInfo
813
21415
  bool addedViaApi() const override { return added_via_api_; }
814
34753
  OptRef<const LoadBalancerConfig> loadBalancerConfig() const override {
815
34753
    return makeOptRefFromPtr<const LoadBalancerConfig>(load_balancer_config_.get());
816
34753
  }
817
17523
  TypedLoadBalancerFactory& loadBalancerFactory() const override {
818
17523
    ASSERT(load_balancer_factory_ != nullptr, "null load balancer factory");
819
17523
    return *load_balancer_factory_;
820
17523
  }
821
68757
  const envoy::config::cluster::v3::Cluster::CommonLbConfig& lbConfig() const override {
822
68757
    return *common_lb_config_;
823
68757
  }
824
31861
  std::chrono::milliseconds connectTimeout() const override { return connect_timeout_; }
825

            
826
  // `OptionalTimeouts` manages various `optional` values. We pack them in a separate data
827
  // structure for memory efficiency -- avoiding overhead of `absl::optional` per variable, and
828
  // avoiding overhead of storing unset timeouts.
829
  enum class OptionalTimeoutNames { IdleTimeout = 0, TcpPoolIdleTimeout, MaxConnectionDuration };
830
  using OptionalTimeouts = PackedStruct<std::chrono::milliseconds, 3, OptionalTimeoutNames>;
831

            
832
28907
  const absl::optional<std::chrono::milliseconds> idleTimeout() const override {
833
28907
    auto timeout = optional_timeouts_.get<OptionalTimeoutNames::IdleTimeout>();
834
28907
    if (timeout.has_value()) {
835
28903
      return *timeout;
836
28903
    }
837
4
    return absl::nullopt;
838
28907
  }
839
832
  const absl::optional<std::chrono::milliseconds> tcpPoolIdleTimeout() const override {
840
832
    auto timeout = optional_timeouts_.get<OptionalTimeoutNames::TcpPoolIdleTimeout>();
841
832
    if (timeout.has_value()) {
842
830
      return *timeout;
843
830
    }
844
2
    return absl::nullopt;
845
832
  }
846
30585
  const absl::optional<std::chrono::milliseconds> maxConnectionDuration() const override {
847
30585
    auto timeout = optional_timeouts_.get<OptionalTimeoutNames::MaxConnectionDuration>();
848
30585
    if (timeout.has_value()) {
849
13
      return *timeout;
850
13
    }
851
30572
    return absl::nullopt;
852
30585
  }
853

            
854
81816
  float perUpstreamPreconnectRatio() const override { return per_upstream_preconnect_ratio_; }
855
49348
  float peekaheadRatio() const override { return peekahead_ratio_; }
856
31091
  uint32_t perConnectionBufferLimitBytes() const override {
857
31091
    return per_connection_buffer_limit_bytes_;
858
31091
  }
859
30204
  std::chrono::milliseconds perConnectionBufferHighWatermarkTimeout() const override {
860
30204
    return buffer_high_watermark_timeout_;
861
30204
  }
862
84902
  uint64_t features() const override { return features_; }
863
278518
  const HttpProtocolOptionsConfig& httpProtocolOptions() const override {
864
278518
    return *http_protocol_options_;
865
278518
  }
866
  absl::Status configureLbPolicies(const envoy::config::cluster::v3::Cluster& config,
867
                                   Server::Configuration::ServerFactoryContext& context);
868
  ProtocolOptionsConfigConstSharedPtr
869
  extensionProtocolOptions(const std::string& name) const override;
870
510
  envoy::config::cluster::v3::Cluster::DiscoveryType type() const override { return type_; }
871

            
872
  OptRef<const envoy::config::cluster::v3::Cluster::CustomClusterType>
873
692
  clusterType() const override {
874
692
    if (cluster_type_ == nullptr) {
875
633
      return absl::nullopt;
876
633
    }
877
59
    return *cluster_type_;
878
692
  }
879
49458
  OptRef<const envoy::config::core::v3::TypedExtensionConfig> upstreamConfig() const override {
880
49458
    if (upstream_config_ == nullptr) {
881
49435
      return absl::nullopt;
882
49435
    }
883
23
    return *upstream_config_;
884
49458
  }
885
  bool maintenanceMode() const override;
886
36871
  uint32_t maxRequestsPerConnection() const override { return max_requests_per_connection_; }
887
28894
  uint32_t maxResponseHeadersCount() const override { return max_response_headers_count_; }
888
28894
  absl::optional<uint16_t> maxResponseHeadersKb() const override {
889
28894
    return max_response_headers_kb_;
890
28894
  }
891
392400
  const std::string& name() const override { return name_; }
892
453
  const std::string& observabilityName() const override {
893
453
    if (observability_name_ != nullptr) {
894
      return *observability_name_;
895
    }
896
453
    return name_;
897
453
  }
898
  ResourceManager& resourceManager(ResourcePriority priority) const override;
899
49206
  TransportSocketMatcher& transportSocketMatcher() const override { return *socket_matcher_; }
900
956960
  DeferredCreationCompatibleClusterTrafficStats& trafficStats() const override {
901
956960
    return traffic_stats_;
902
956960
  }
903
61986
  ClusterConfigUpdateStats& configUpdateStats() const override { return config_update_stats_; }
904
33524
  ClusterLbStats& lbStats() const override { return lb_stats_; }
905
90394
  ClusterEndpointStats& endpointStats() const override { return endpoint_stats_; }
906
99589
  Stats::Scope& statsScope() const override { return *stats_scope_; }
907

            
908
47101
  ClusterRequestResponseSizeStatsOptRef requestResponseSizeStats() const override {
909
47101
    if (optional_cluster_stats_ == nullptr ||
910
47101
        optional_cluster_stats_->request_response_size_stats_ == nullptr) {
911
47061
      return absl::nullopt;
912
47061
    }
913

            
914
40
    return std::ref(*(optional_cluster_stats_->request_response_size_stats_));
915
47101
  }
916

            
917
228
  ClusterLoadReportStats& loadReportStats() const override { return load_report_stats_; }
918

            
919
84402
  ClusterTimeoutBudgetStatsOptRef timeoutBudgetStats() const override {
920
84402
    if (optional_cluster_stats_ == nullptr ||
921
84402
        optional_cluster_stats_->timeout_budget_stats_ == nullptr) {
922
84394
      return absl::nullopt;
923
84394
    }
924

            
925
8
    return std::ref(*(optional_cluster_stats_->timeout_budget_stats_));
926
84402
  }
927

            
928
36568
  bool perEndpointStatsEnabled() const override { return per_endpoint_stats_; }
929

            
930
31185
  UpstreamLocalAddressSelectorConstSharedPtr getUpstreamLocalAddressSelector() const override {
931
31185
    return upstream_local_address_selector_;
932
31185
  }
933
  using DefaultMetadata = ConstSingleton<envoy::config::core::v3::Metadata>;
934
253887
  const envoy::config::core::v3::Metadata& metadata() const override {
935
253887
    if (metadata_ != nullptr) {
936
29
      return *metadata_;
937
29
    }
938
253858
    return DefaultMetadata::get();
939
253887
  }
940
  using ClusterTypedMetadata = Envoy::Config::TypedMetadataImpl<ClusterTypedMetadataFactory>;
941
4
  const Envoy::Config::TypedMetadata& typedMetadata() const override {
942
4
    if (typed_metadata_ != nullptr) {
943
4
      return *typed_metadata_;
944
4
    }
945
    CONSTRUCT_ON_FIRST_USE(ClusterTypedMetadata, DefaultMetadata::get());
946
  }
947

            
948
181
  bool drainConnectionsOnHostRemoval() const override { return drain_connections_on_host_removal_; }
949
49584
  bool connectionPoolPerDownstreamConnection() const override {
950
49584
    return connection_pool_per_downstream_connection_;
951
49584
  }
952
92
  bool warmHosts() const override { return warm_hosts_; }
953
30202
  bool setLocalInterfaceNameOnUpstreamConnections() const override {
954
30202
    return set_local_interface_name_on_upstream_connections_;
955
30202
  }
956
1499
  const std::string& edsServiceName() const override {
957
1499
    return eds_service_name_ != nullptr ? *eds_service_name_ : EMPTY_STRING;
958
1499
  }
959

            
960
  void createNetworkFilterChain(Network::Connection&) const override;
961
  std::vector<Http::Protocol>
962
  upstreamHttpProtocol(absl::optional<Http::Protocol> downstream_protocol) const override;
963

            
964
  // Http::FilterChainFactory
965
47093
  bool createFilterChain(Http::FilterChainFactoryCallbacks& callbacks) const override {
966
47093
    if (http_filter_factories_.empty()) {
967
45706
      return false;
968
45706
    }
969

            
970
1387
    Http::FilterChainUtility::createFilterChainForFactories(callbacks, http_filter_factories_);
971
1387
    return true;
972
47093
  }
973
  bool createUpgradeFilterChain(absl::string_view, const UpgradeMap*,
974
2
                                Http::FilterChainFactoryCallbacks&) const override {
975
    // Upgrade filter chains not yet supported for upstream HTTP filters.
976
2
    return false;
977
2
  }
978

            
979
  Http::Http1::CodecStats& http1CodecStats() const override;
980
  Http::Http2::CodecStats& http2CodecStats() const override;
981
  Http::Http3::CodecStats& http3CodecStats() const override;
982
  Http::ClientHeaderValidatorPtr makeHeaderValidator(Http::Protocol protocol) const override;
983

            
984
  absl::optional<bool> processHttpForOutlierDetection(Http::ResponseHeaderMap&) const override;
985

            
986
  OptRef<const envoy::config::cluster::v3::UpstreamConnectionOptions::HappyEyeballsConfig>
987
38
  happyEyeballsConfig() const override {
988
38
    if (happy_eyeballs_config_ == nullptr) {
989
35
      return absl::nullopt;
990
35
    }
991
3
    return *happy_eyeballs_config_;
992
38
  }
993

            
994
42974
  OptRef<const Envoy::Orca::LrsReportMetricNames> lrsReportMetricNames() const override {
995
42974
    if (lrs_report_metric_names_ == nullptr) {
996
42854
      return absl::nullopt;
997
42854
    }
998
120
    return *lrs_report_metric_names_;
999
42974
  }
protected:
  ClusterInfoImpl(Init::Manager& info, Server::Configuration::ServerFactoryContext& server_context,
                  const envoy::config::cluster::v3::Cluster& config,
                  const absl::optional<envoy::config::core::v3::BindConfig>& bind_config,
                  Runtime::Loader& runtime, TransportSocketMatcherPtr&& socket_matcher,
                  Stats::ScopeSharedPtr&& stats_scope, bool added_via_api,
                  Server::Configuration::TransportSocketFactoryContext& context,
                  absl::Status& creation_status);
  // Gets the retry budget percent/concurrency from the circuit breaker thresholds. If the retry
  // budget message is specified, defaults will be filled in if either params are unspecified.
  static std::pair<absl::optional<double>, absl::optional<uint32_t>>
  getRetryBudgetParams(const envoy::config::cluster::v3::CircuitBreakers::Thresholds& thresholds);
private:
  std::shared_ptr<UpstreamNetworkFilterConfigProviderManager>
  createSingletonUpstreamNetworkFilterConfigProviderManager(
      Server::Configuration::ServerFactoryContext& context);
  struct ResourceManagers {
    ResourceManagers(const envoy::config::cluster::v3::Cluster& config, Runtime::Loader& runtime,
                     const std::string& cluster_name, Stats::Scope& stats_scope,
                     const ClusterCircuitBreakersStatNames& circuit_breakers_stat_names);
    absl::StatusOr<ResourceManagerImplPtr>
    load(const envoy::config::cluster::v3::Cluster& config, Runtime::Loader& runtime,
         const std::string& cluster_name, Stats::Scope& stats_scope,
         const envoy::config::core::v3::RoutingPriority& priority);
    using Managers = std::array<ResourceManagerImplPtr, NumResourcePriorities>;
    Managers managers_;
    const ClusterCircuitBreakersStatNames& circuit_breakers_stat_names_;
  };
  struct OptionalClusterStats {
    OptionalClusterStats(const envoy::config::cluster::v3::Cluster& config,
                         Stats::Scope& stats_scope, const ClusterManager& manager);
    const ClusterTimeoutBudgetStatsPtr timeout_budget_stats_;
    const ClusterRequestResponseSizeStatsPtr request_response_size_stats_;
  };
#ifdef ENVOY_ENABLE_UHV
  ::Envoy::Http::HeaderValidatorStats& getHeaderValidatorStats(Http::Protocol protocol) const;
#endif
  const Runtime::Loader& runtime_;
  const std::string name_;
  const std::unique_ptr<const std::string> observability_name_;
  const std::unique_ptr<const std::string> eds_service_name_;
  const absl::flat_hash_map<std::string, ProtocolOptionsConfigConstSharedPtr>
      extension_protocol_options_;
  const std::shared_ptr<const HttpProtocolOptionsConfigImpl> http_protocol_options_;
  const std::shared_ptr<const TcpProtocolOptionsConfigImpl> tcp_protocol_options_;
  const uint32_t max_requests_per_connection_;
  const std::chrono::milliseconds connect_timeout_;
  OptionalTimeouts optional_timeouts_;
  const float per_upstream_preconnect_ratio_;
  const float peekahead_ratio_;
  TransportSocketMatcherPtr socket_matcher_;
  Stats::ScopeSharedPtr stats_scope_;
  mutable DeferredCreationCompatibleClusterTrafficStats traffic_stats_;
  mutable ClusterConfigUpdateStats config_update_stats_;
  mutable ClusterLbStats lb_stats_;
  mutable ClusterEndpointStats endpoint_stats_;
  Stats::IsolatedStoreImpl load_report_stats_store_;
  mutable ClusterLoadReportStats load_report_stats_;
  const std::unique_ptr<OptionalClusterStats> optional_cluster_stats_;
  const uint64_t features_;
  mutable ResourceManagers resource_managers_;
  const std::string maintenance_mode_runtime_key_;
  const UpstreamLocalAddressSelectorConstSharedPtr upstream_local_address_selector_;
  const std::unique_ptr<const envoy::config::core::v3::TypedExtensionConfig> upstream_config_;
  const std::unique_ptr<const envoy::config::core::v3::Metadata> metadata_;
  const std::unique_ptr<ClusterTypedMetadata> typed_metadata_;
  LoadBalancerConfigPtr load_balancer_config_;
  TypedLoadBalancerFactory* load_balancer_factory_ = nullptr;
  const std::shared_ptr<const envoy::config::cluster::v3::Cluster::CommonLbConfig>
      common_lb_config_;
  const std::unique_ptr<const envoy::config::cluster::v3::Cluster::CustomClusterType> cluster_type_;
  // TODO(ohadvano): http_filter_config_provider_manager_ and
  // network_filter_config_provider_manager_ should be maintained in the ClusterManager object as
  // a singleton. This is currently not possible due to circular dependency (filter config
  // provider manager depends on the ClusterManager object). The circular dependency can be
  // resolved when the following issue is resolved:
  // https://github.com/envoyproxy/envoy/issues/26653.
  std::shared_ptr<Http::UpstreamFilterConfigProviderManager> http_filter_config_provider_manager_;
  std::shared_ptr<UpstreamNetworkFilterConfigProviderManager>
      network_filter_config_provider_manager_;
  Filter::NetworkFilterFactoriesList filter_factories_;
  Http::FilterChainUtility::FilterFactoriesList http_filter_factories_;
  mutable Http::Http1::CodecStats::AtomicPtr http1_codec_stats_;
  mutable Http::Http2::CodecStats::AtomicPtr http2_codec_stats_;
  mutable Http::Http3::CodecStats::AtomicPtr http3_codec_stats_;
  UpstreamFactoryContextImpl upstream_context_;
  const std::unique_ptr<
      const envoy::config::cluster::v3::UpstreamConnectionOptions::HappyEyeballsConfig>
      happy_eyeballs_config_;
  const std::unique_ptr<const Envoy::Orca::LrsReportMetricNames> lrs_report_metric_names_;
  const std::vector<Router::ShadowPolicyPtr> shadow_policies_;
  // Keep small values like bools and enums at the end of the class to reduce
  // overhead via alignment
  const uint32_t per_connection_buffer_limit_bytes_;
  const std::chrono::milliseconds buffer_high_watermark_timeout_;
  const uint32_t max_response_headers_count_;
  const absl::optional<uint16_t> max_response_headers_kb_;
  const envoy::config::cluster::v3::Cluster::DiscoveryType type_;
  const bool drain_connections_on_host_removal_ : 1;
  const bool connection_pool_per_downstream_connection_ : 1;
  const bool warm_hosts_ : 1;
  const bool set_local_interface_name_on_upstream_connections_ : 1;
  const bool added_via_api_ : 1;
  const bool per_endpoint_stats_ : 1;
};
/**
 * Function that creates a Network::UpstreamTransportSocketFactoryPtr
 * given a cluster configuration and transport socket factory
 * context.
 * @param config supplies the cluster configuration.
 * @param factory_context supplies the transport socket factory context.
 * @return the created factory, or an error status when creation fails.
 */
absl::StatusOr<Network::UpstreamTransportSocketFactoryPtr>
createTransportSocketFactory(const envoy::config::cluster::v3::Cluster& config,
                             Server::Configuration::TransportSocketFactoryContext& factory_context);
/**
 * Base class for all primary clusters.
 */
class ClusterImplBase : public Cluster, protected Logger::Loggable<Logger::Id::upstream> {
public:
  // Upstream::Cluster
  PrioritySet& prioritySet() override { return priority_set_; }
  const PrioritySet& prioritySet() const override { return priority_set_; }

  /**
   * Optionally set the health checker for the primary cluster. This is done after cluster
   * creation since the health checker assumes that the cluster has already been fully initialized
   * so there is a cyclic dependency. However we want the cluster to own the health checker.
   */
  void setHealthChecker(const HealthCheckerSharedPtr& health_checker);

  /**
   * Optionally set the outlier detector for the primary cluster. Done for the same reason as
   * documented in setHealthChecker().
   */
  void setOutlierDetector(const Outlier::DetectorSharedPtr& outlier_detector);

  /**
   * Wrapper around Network::Address::resolveProtoAddress() that provides improved error message
   * based on the cluster's type.
   * @param address supplies the address proto to resolve.
   * @return the resolved address, or an error status when resolution fails.
   */
  absl::StatusOr<const Network::Address::InstanceConstSharedPtr>
  resolveProtoAddress(const envoy::config::core::v3::Address& address);

  // Partitions the provided list of hosts into three new lists containing the healthy, degraded
  // and excluded hosts respectively.
  static std::tuple<HealthyHostVectorConstSharedPtr, DegradedHostVectorConstSharedPtr,
                    ExcludedHostVectorConstSharedPtr>
  partitionHostList(const HostVector& hosts);

  // Partitions the provided list of hosts per locality into three new lists containing the
  // healthy, degraded and excluded hosts respectively.
  static std::tuple<HostsPerLocalityConstSharedPtr, HostsPerLocalityConstSharedPtr,
                    HostsPerLocalityConstSharedPtr>
  partitionHostsPerLocality(const HostsPerLocality& hosts);

  Config::ConstMetadataSharedPoolSharedPtr constMetadataSharedPool() {
    return const_metadata_shared_pool_;
  }
  ConstLocalitySharedPoolSharedPtr constLocalitySharedPool() const {
    return const_locality_shared_pool_;
  }

  // Upstream::Cluster
  // The checker and detector accessors return raw, non-owning pointers into the shared pointers
  // held by this class; they are null when nothing has been set.
  HealthChecker* healthChecker() override { return health_checker_.get(); }
  ClusterInfoConstSharedPtr info() const override { return info_; }
  Outlier::Detector* outlierDetector() override { return outlier_detector_.get(); }
  const Outlier::Detector* outlierDetector() const override { return outlier_detector_.get(); }
  void initialize(std::function<absl::Status()> callback) override;
  UnitFloat dropOverload() const override { return drop_overload_; }
  const std::string& dropCategory() const override { return drop_category_; }
  void setDropOverload(UnitFloat drop_overload) override { drop_overload_ = drop_overload; }
  void setDropCategory(absl::string_view drop_category) override { drop_category_ = drop_category; }

protected:
  // Failures are reported through the `creation_status` out-parameter.
  ClusterImplBase(const envoy::config::cluster::v3::Cluster& cluster,
                  ClusterFactoryContext& cluster_context, absl::Status& creation_status);

  /**
   * Overridden by every concrete cluster. The cluster should do whatever pre-init is needed.
   * E.g., query DNS, contact EDS, etc.
   */
  virtual void startPreInit() PURE;

  /**
   * Called by every concrete cluster when pre-init is complete. At this point,
   * shared init starts init_manager_ initialization and determines if there
   * is an initial health check pass needed, etc.
   */
  void onPreInitComplete();

  /**
   * Called by every concrete cluster after all targets registered at init manager are
   * initialized. At this point, shared init takes over and determines if there is an initial
   * health check pass needed, etc.
   */
  void onInitDone();

  virtual void reloadHealthyHostsHelper(const HostSharedPtr& host);

  absl::Status parseDropOverloadConfig(
      const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment);

  // This init manager is shared via TransportSocketFactoryContext. The initialization targets
  // that register with this init manager are expected to be for implementations of SdsApi (see
  // SdsApi::init_target_).
  Init::ManagerImpl init_manager_;

  // Once all targets are initialized (i.e. once all dynamic secrets are loaded), this watcher
  // calls onInitDone() above.
  Init::WatcherImpl init_watcher_;

  Runtime::Loader& runtime_;
  ClusterInfoConstSharedPtr info_; // This cluster info stores the stats scope so it must be
                                   // initialized first and destroyed last.
  HealthCheckerSharedPtr health_checker_;
  Outlier::DetectorSharedPtr outlier_detector_;
  const bool wait_for_warm_on_init_;
  Server::Configuration::TransportSocketFactoryContextImplPtr transport_factory_context_{};

  Random::RandomGenerator& random_;
  MainPrioritySetImpl priority_set_;

  absl::Status validateEndpoints(
      absl::Span<const envoy::config::endpoint::v3::LocalityLbEndpoints* const> endpoints,
      OptRef<const PriorityState> priorities) const;

private:
  static const absl::string_view DoNotValidateAlpnRuntimeKey;
  static const absl::string_view DropOverloadRuntimeKey;

  void finishInitialization();
  void reloadHealthyHosts(const HostSharedPtr& host);

  bool initialization_started_{};
  std::function<absl::Status()> initialization_complete_callback_;
  uint64_t pending_initialize_health_checks_{};
  const bool local_cluster_;
  Config::ConstMetadataSharedPoolSharedPtr const_metadata_shared_pool_;
  ConstLocalitySharedPoolSharedPtr const_locality_shared_pool_;
  Common::CallbackHandlePtr priority_update_cb_;
  UnitFloat drop_overload_{0};
  std::string drop_category_;
  static constexpr int kDropOverloadSize = 1;
};
using ClusterImplBaseSharedPtr = std::shared_ptr<ClusterImplBase>;
/**
 * Manages PriorityState of a cluster. PriorityState is a per-priority binding of a set of hosts
 * with its corresponding locality weight map. This is useful to store priorities/hosts/localities
 * before updating the cluster priority set.
 */
class PriorityStateManager : protected Logger::Loggable<Logger::Id::upstream> {
public:
  /**
   * @param cluster the cluster whose priority state is managed.
   * @param local_info local info of this Envoy instance.
   * @param update_cb host update callback. NOTE(review): taken by pointer, so presumably it may
   * be null -- confirm against the constructor definition.
   */
  PriorityStateManager(ClusterImplBase& cluster, const LocalInfo::LocalInfo& local_info,
                       PrioritySet::HostUpdateCb* update_cb);

  // Initializes the PriorityState vector based on the priority specified in locality_lb_endpoint.
  void initializePriorityFor(
      const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint);

  // Registers a host based on its address to the PriorityState based on the specified priority
  // (the priority is specified by locality_lb_endpoint.priority()).
  void registerHostForPriority(
      const std::string& hostname, Network::Address::InstanceConstSharedPtr address,
      const HostDescription::AddressVector& address_list,
      const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint,
      const envoy::config::endpoint::v3::LbEndpoint& lb_endpoint);

  // Overload of the above that registers an already constructed host instead of building one
  // from a hostname/address pair.
  void registerHostForPriority(
      const HostSharedPtr& host,
      const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint);

  /**
   * Updates the cluster priority set for a single priority.
   *
   * @param priority the priority to update.
   * @param current_hosts the hosts for the priority; passed as an rvalue, so callers hand over
   * their shared pointer.
   * @param hosts_added optional list of hosts added to the priority.
   * @param hosts_removed optional list of hosts removed from the priority.
   * @param health_checker_flag optional host health flag. NOTE(review): this was previously
   * described on registerHostForPriority() as being used to set a registered host's health flag
   * when the lb_endpoint health status is unhealthy, draining or timeout -- confirm against the
   * implementation.
   * @param weighted_priority_health optional override; defaults to absl::nullopt.
   * @param overprovisioning_factor optional override; defaults to absl::nullopt.
   */
  void
  updateClusterPrioritySet(const uint32_t priority, HostVectorSharedPtr&& current_hosts,
                           const absl::optional<HostVector>& hosts_added,
                           const absl::optional<HostVector>& hosts_removed,
                           const absl::optional<Upstream::Host::HealthFlag> health_checker_flag,
                           absl::optional<bool> weighted_priority_health = absl::nullopt,
                           absl::optional<uint32_t> overprovisioning_factor = absl::nullopt);

  // Returns the saved priority state. The reference is mutable, so callers may modify it.
  PriorityState& priorityState() { return priority_state_; }

private:
  ClusterImplBase& parent_;
  PriorityState priority_state_;
  const envoy::config::core::v3::Node& local_info_node_;
  PrioritySet::HostUpdateCb* update_cb_;
};

// Unique-ownership (std::unique_ptr) handle to a PriorityStateManager.
using PriorityStateManagerPtr = std::unique_ptr<PriorityStateManager>;
/**
 * Base for all dynamic cluster types. Extends ClusterImplBase with a helper, available to
 * subclasses, for reconciling a priority's host list against a new configuration.
 */
class BaseDynamicClusterImpl : public ClusterImplBase {
protected:
  // Inherit the constructors of ClusterImplBase rather than redeclaring them.
  using ClusterImplBase::ClusterImplBase;
  /**
   * Updates the host list of a single priority by reconciling the list of new hosts
   * with existing hosts.
   *
   * @param new_hosts the full list of hosts in the new configuration.
   * @param current_priority_hosts the full list of hosts for the priority to be updated. The
   * list will be modified in place to contain the updated list of hosts.
   * @param hosts_added_to_current_priority output parameter; will be populated with hosts added
   * to the priority.
   * @param hosts_removed_from_current_priority output parameter; will be populated with hosts
   * removed from the priority.
   * @param all_hosts all known hosts prior to this host update across all priorities.
   * @param all_new_hosts addresses of all hosts in the new configuration across all priorities.
   * @return whether the hosts for the priority changed.
   */
  bool updateDynamicHostList(const HostVector& new_hosts, HostVector& current_priority_hosts,
                             HostVector& hosts_added_to_current_priority,
                             HostVector& hosts_removed_from_current_priority,
                             const HostMap& all_hosts,
                             const absl::flat_hash_set<std::string>& all_new_hosts);
};
/**
 * Utility function to get the DNS lookup family from a cluster configuration.
 *
 * @param cluster the cluster configuration to read from.
 * @return the corresponding Network::DnsLookupFamily value.
 */
Network::DnsLookupFamily
getDnsLookupFamilyFromCluster(const envoy::config::cluster::v3::Cluster& cluster);
/**
 * Utility function to report upstream cx destroy metrics.
 *
 * @param host description of the upstream host to report for.
 * @param event the connection event being reported. NOTE(review): presumably the close event
 * that destroyed the connection -- confirm against the definition.
 */
void reportUpstreamCxDestroy(const Upstream::HostDescriptionConstSharedPtr& host,
                             Network::ConnectionEvent event);
/**
 * Utility function to report upstream cx destroy active request metrics.
 *
 * @param host description of the upstream host to report for.
 * @param event the connection event being reported. NOTE(review): presumably the close event
 * that destroyed the connection while a request was active -- confirm against the definition.
 */
void reportUpstreamCxDestroyActiveRequest(const Upstream::HostDescriptionConstSharedPtr& host,
                                          Network::ConnectionEvent event);
/**
 * Utility function to resolve health check address.
 *
 * @param health_check_config the endpoint's health check configuration.
 * @param host_address the address of the host itself; taken by value as a shared pointer.
 * @return the address to use for health checking. NOTE(review): presumably host_address is the
 * fallback when health_check_config does not specify an override -- confirm against the
 * definition.
 */
Network::Address::InstanceConstSharedPtr resolveHealthCheckAddress(
    const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config,
    Network::Address::InstanceConstSharedPtr host_address);
} // namespace Upstream
} // namespace Envoy