Line data Source code
1 : #pragma once
2 :
3 : #include <array>
4 : #include <atomic>
5 : #include <chrono>
6 : #include <cstdint>
7 : #include <functional>
8 : #include <list>
9 : #include <memory>
10 : #include <string>
11 : #include <utility>
12 : #include <variant>
13 : #include <vector>
14 :
15 : #include "envoy/common/callback.h"
16 : #include "envoy/common/optref.h"
17 : #include "envoy/common/time.h"
18 : #include "envoy/config/cluster/v3/cluster.pb.h"
19 : #include "envoy/config/core/v3/address.pb.h"
20 : #include "envoy/config/core/v3/base.pb.h"
21 : #include "envoy/config/core/v3/health_check.pb.h"
22 : #include "envoy/config/core/v3/protocol.pb.h"
23 : #include "envoy/config/endpoint/v3/endpoint_components.pb.h"
24 : #include "envoy/config/typed_metadata.h"
25 : #include "envoy/event/timer.h"
26 : #include "envoy/local_info/local_info.h"
27 : #include "envoy/network/dns.h"
28 : #include "envoy/network/filter.h"
29 : #include "envoy/runtime/runtime.h"
30 : #include "envoy/secret/secret_manager.h"
31 : #include "envoy/server/filter_config.h"
32 : #include "envoy/server/transport_socket_config.h"
33 : #include "envoy/ssl/context_manager.h"
34 : #include "envoy/stats/scope.h"
35 : #include "envoy/thread_local/thread_local.h"
36 : #include "envoy/upstream/cluster_factory.h"
37 : #include "envoy/upstream/cluster_manager.h"
38 : #include "envoy/upstream/health_checker.h"
39 : #include "envoy/upstream/load_balancer.h"
40 : #include "envoy/upstream/locality.h"
41 : #include "envoy/upstream/outlier_detection.h"
42 : #include "envoy/upstream/upstream.h"
43 :
44 : #include "source/common/common/callback_impl.h"
45 : #include "source/common/common/empty_string.h"
46 : #include "source/common/common/enum_to_int.h"
47 : #include "source/common/common/logger.h"
48 : #include "source/common/common/packed_struct.h"
49 : #include "source/common/common/thread.h"
50 : #include "source/common/config/metadata.h"
51 : #include "source/common/config/well_known_names.h"
52 : #include "source/common/http/filter_chain_helper.h"
53 : #include "source/common/http/http1/codec_stats.h"
54 : #include "source/common/http/http2/codec_stats.h"
55 : #include "source/common/http/http3/codec_stats.h"
56 : #include "source/common/init/manager_impl.h"
57 : #include "source/common/network/utility.h"
58 : #include "source/common/shared_pool/shared_pool.h"
59 : #include "source/common/stats/isolated_store_impl.h"
60 : #include "source/common/upstream/load_balancer_impl.h"
61 : #include "source/common/upstream/resource_manager_impl.h"
62 : #include "source/common/upstream/transport_socket_match_impl.h"
63 : #include "source/common/upstream/upstream_factory_context_impl.h"
64 : #include "source/extensions/upstreams/http/config.h"
65 : #include "source/extensions/upstreams/tcp/config.h"
66 : #include "source/server/transport_socket_config_impl.h"
67 :
68 : #include "absl/container/node_hash_set.h"
69 : #include "absl/synchronization/mutex.h"
70 :
71 : namespace Envoy {
72 : namespace Upstream {
73 :
// Shorthand for the v3 Cluster configuration proto, used by the declarations below.
74 : using ClusterProto = envoy::config::cluster::v3::Cluster;
75 :
// Filter config provider manager instantiated for network filter factory callbacks and the
// upstream factory context.
76 : using UpstreamNetworkFilterConfigProviderManager =
77 : Filter::FilterConfigProviderManager<Network::FilterFactoryCb,
78 : Server::Configuration::UpstreamFactoryContext>;
79 :
// Static helpers that produce a typed load balancer factory together with its config from a
// cluster proto. Only declarations are visible here; the definitions live elsewhere.
80 : class LegacyLbPolicyConfigHelper {
81 : public:
// Pair returned by the helpers below: a non-owning factory pointer plus the config built for it.
82 : struct Result {
83 : TypedLoadBalancerFactory* factory;
84 : LoadBalancerConfigPtr config;
85 : };
86 :
// @param cluster the cluster proto to read the load balancer settings from.
// @param visitor validation visitor handed to the conversion.
// @return the factory/config pair, or an error status.
// NOTE(review): judging by the name this variant ignores subset LB settings -- confirm against
// the definition.
87 : static absl::StatusOr<Result>
88 : getTypedLbConfigFromLegacyProtoWithoutSubset(const ClusterProto& cluster,
89 : ProtobufMessage::ValidationVisitor& visitor);
90 :
// Same parameters and return type as getTypedLbConfigFromLegacyProtoWithoutSubset().
91 : static absl::StatusOr<Result>
92 : getTypedLbConfigFromLegacyProto(const ClusterProto& cluster,
93 : ProtobufMessage::ValidationVisitor& visitor);
94 : };
95 :
96 : /**
97 : * Holder for the policy-specific load balancer configuration of a cluster.
98 : * Uses an absl::variant of unique_ptrs, so only one policy's config can be stored at a time.
99 : */
100 : class LBPolicyConfig {
101 : public:
// Builds the holder from the cluster proto (defined out of line).
102 : LBPolicyConfig(const envoy::config::cluster::v3::Cluster& config);
103 :
// Each accessor below returns a reference to the stored config when the variant currently holds
// that policy's alternative with a non-null pointer, and an empty OptRef otherwise.
104 0 : OptRef<const envoy::config::cluster::v3::Cluster::RoundRobinLbConfig> lbRoundRobinConfig() const {
105 0 : return getConfig<envoy::config::cluster::v3::Cluster::RoundRobinLbConfig>();
106 0 : }
107 :
108 : OptRef<const envoy::config::cluster::v3::Cluster::LeastRequestLbConfig>
109 0 : lbLeastRequestConfig() const {
110 0 : return getConfig<envoy::config::cluster::v3::Cluster::LeastRequestLbConfig>();
111 0 : }
112 :
113 0 : OptRef<const envoy::config::cluster::v3::Cluster::RingHashLbConfig> lbRingHashConfig() const {
114 0 : return getConfig<envoy::config::cluster::v3::Cluster::RingHashLbConfig>();
115 0 : }
116 :
117 0 : OptRef<const envoy::config::cluster::v3::Cluster::MaglevLbConfig> lbMaglevConfig() const {
118 0 : return getConfig<envoy::config::cluster::v3::Cluster::MaglevLbConfig>();
119 0 : }
120 :
121 : OptRef<const envoy::config::cluster::v3::Cluster::OriginalDstLbConfig>
122 0 : lbOriginalDstConfig() const {
123 0 : return getConfig<envoy::config::cluster::v3::Cluster::OriginalDstLbConfig>();
124 0 : }
125 :
126 : private:
// Shared lookup used by the public accessors.
// @tparam T the policy-specific config proto type to look for.
// @return a reference to the stored T, or an empty OptRef if another alternative is active or
//         the stored pointer is null.
127 0 : template <typename T> OptRef<const T> getConfig() const {
128 : // The condition checks both that the variant holds a unique_ptr<const T> and that the pointer
129 : // is non-null. A null Round Robin pointer is the default value of the variant.
130 0 : if (const auto lbPtr = absl::get_if<std::unique_ptr<const T>>(&lb_policy_); lbPtr && *lbPtr) {
131 0 : return *(*lbPtr);
132 0 : } else {
133 0 : return absl::nullopt;
134 0 : }
135 0 : }
136 :
// Round Robin is the first alternative, so a default-constructed variant holds a null
// RoundRobinLbConfig pointer.
137 : absl::variant<std::unique_ptr<const envoy::config::cluster::v3::Cluster::RoundRobinLbConfig>,
138 : std::unique_ptr<const envoy::config::cluster::v3::Cluster::LeastRequestLbConfig>,
139 : std::unique_ptr<const envoy::config::cluster::v3::Cluster::RingHashLbConfig>,
140 : std::unique_ptr<const envoy::config::cluster::v3::Cluster::MaglevLbConfig>,
141 : std::unique_ptr<const envoy::config::cluster::v3::Cluster::OriginalDstLbConfig>>
142 : lb_policy_;
143 : };
144 :
145 : /**
146 : * Null implementation of HealthCheckHostMonitor: setUnhealthy() is a no-op. It is what
147 : * HostDescriptionImpl::healthChecker() returns when no health checker has been installed.
148 : */
149 : class HealthCheckHostMonitorNullImpl : public HealthCheckHostMonitor {
150 : public:
151 : // Upstream::HealthCheckHostMonitor
// Intentionally empty: the argument is ignored.
152 0 : void setUnhealthy(UnhealthyType) override {}
153 : };
153 :
154 : /**
155 : * Implementation of LoadMetricStats. The stat map is guarded by a mutex.
156 : */
157 : class LoadMetricStatsImpl : public LoadMetricStats {
158 : public:
// Both methods are defined out of line.
// NOTE(review): presumably add() accumulates `value` under `key` and latch() hands back the
// accumulated map -- confirm against the definitions.
159 : void add(const absl::string_view key, double value) override;
160 : StatMapPtr latch() override;
161 :
162 : private:
// mu_ protects map_ (enforced through the ABSL_GUARDED_BY annotation).
163 : absl::Mutex mu_;
164 : StatMapPtr map_ ABSL_GUARDED_BY(mu_);
165 : };
166 :
167 : /**
168 : * Null outlier-detection host monitor: every put*() call is a no-op and every query returns a
169 : * fixed value. It is what HostDescriptionImpl::outlierDetector() returns when no detector is set.
170 : */
171 : class DetectorHostMonitorNullImpl : public Outlier::DetectorHostMonitor {
172 : public:
173 : // Upstream::Outlier::DetectorHostMonitor
// Always reports zero ejections.
174 0 : uint32_t numEjections() override { return 0; }
// The three put*() methods discard their arguments.
175 141 : void putHttpResponseCode(uint64_t) override {}
176 288 : void putResult(Outlier::Result, absl::optional<uint64_t>) override {}
177 73 : void putResponseTime(std::chrono::milliseconds) override {}
// Both time queries return the same empty optional member.
178 0 : const absl::optional<MonotonicTime>& lastEjectionTime() override { return time_; }
179 0 : const absl::optional<MonotonicTime>& lastUnejectionTime() override { return time_; }
// Always returns -1, regardless of the monitor type requested.
180 0 : double successRate(SuccessRateMonitorType) const override { return -1; }
181 :
182 : private:
// Value-initialized, so it never holds a time; backs the two reference-returning getters.
183 : const absl::optional<MonotonicTime> time_{};
184 : };
184 :
185 : /**
186 : * Implementation of Upstream::HostDescription. Stores the descriptive state of a single upstream
187 : * host (cluster, addresses, metadata, locality, stats, monitors) and exposes it via accessors.
188 : */
189 : class HostDescriptionImpl : virtual public HostDescription,
190 : protected Logger::Loggable<Logger::Id::upstream> {
191 : public:
// Defined out of line.
192 : HostDescriptionImpl(
193 : ClusterInfoConstSharedPtr cluster, const std::string& hostname,
194 : Network::Address::InstanceConstSharedPtr dest_address, MetadataConstSharedPtr metadata,
195 : const envoy::config::core::v3::Locality& locality,
196 : const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config,
197 : uint32_t priority, TimeSource& time_source);
198 :
// Returns the current transport socket factory. It is read under the metadata lock because the
// metadata() setter below replaces it together with the metadata.
199 653 : Network::UpstreamTransportSocketFactory& transportSocketFactory() const override {
200 653 : absl::ReaderMutexLock lock(&metadata_mutex_);
201 653 : return socket_factory_;
202 653 : }
203 :
// Canary flag getter/setter; canary_ is a std::atomic<bool>.
204 340 : bool canary() const override { return canary_; }
205 0 : void canary(bool is_canary) override { canary_ = is_canary; }
206 :
207 : // Metadata getter/setter.
208 : //
209 : // It's possible that the lock that guards the metadata will become highly contended (e.g.:
210 : // endpoints churning during a deploy of a large cluster). A possible improvement
211 : // would be to use TLS and post metadata updates from the main thread. This model would
212 : // possibly benefit other related and expensive computations too (e.g.: updating subsets).
213 0 : MetadataConstSharedPtr metadata() const override {
214 0 : absl::ReaderMutexLock lock(&metadata_mutex_);
215 0 : return metadata_;
216 0 : }
// Replaces the metadata and the transport socket factory derived from it. The factory is resolved
// before the writer lock is taken, so only the two assignments run while the lock is held.
217 0 : void metadata(MetadataConstSharedPtr new_metadata) override {
218 0 : auto& new_socket_factory = resolveTransportSocketFactory(address_, new_metadata.get());
219 0 : {
220 0 : absl::WriterMutexLock lock(&metadata_mutex_);
221 0 : metadata_ = new_metadata;
222 0 : // Update data members dependent on metadata.
223 0 : socket_factory_ = new_socket_factory;
224 0 : }
225 0 : }
226 :
227 8947 : const ClusterInfo& cluster() const override { return *cluster_; }
// Returns the installed health checker, or a shared null monitor when none has been set. The null
// monitor is allocated once through a function-local static and is not freed here.
228 0 : HealthCheckHostMonitor& healthChecker() const override {
229 0 : if (health_checker_) {
230 0 : return *health_checker_;
231 0 : }
232 :
233 0 : static HealthCheckHostMonitorNullImpl* null_health_checker =
234 0 : new HealthCheckHostMonitorNullImpl();
235 0 : return *null_health_checker;
236 0 : }
237 :
// Returns false once this host's active connection count has reached maxConnectionsPerHost() of
// the resource manager for `priority`; otherwise defers to that manager's
// connections().canCreate().
238 173 : bool canCreateConnection(Upstream::ResourcePriority priority) const override {
239 173 : if (stats().cx_active_.value() >= cluster().resourceManager(priority).maxConnectionsPerHost()) {
240 0 : return false;
241 0 : }
242 173 : return cluster().resourceManager(priority).connections().canCreate();
243 173 : }
244 :
// Returns the installed outlier detector, or a shared null monitor when none has been set
// (same function-local static pattern as healthChecker()).
245 502 : Outlier::DetectorHostMonitor& outlierDetector() const override {
246 502 : if (outlier_detector_) {
247 0 : return *outlier_detector_;
248 0 : }
249 :
250 502 : static DetectorHostMonitorNullImpl* null_outlier_detector = new DetectorHostMonitorNullImpl();
251 502 : return *null_outlier_detector;
252 502 : }
// stats_ and load_metric_stats_ are mutable members, which is why these const accessors can
// return non-const references.
253 2709080 : HostStats& stats() const override { return stats_; }
254 0 : LoadMetricStats& loadMetricStats() const override { return load_metric_stats_; }
255 48 : const std::string& hostnameForHealthChecks() const override { return health_checks_hostname_; }
256 0 : const std::string& hostname() const override { return hostname_; }
257 528 : Network::Address::InstanceConstSharedPtr address() const override { return address_; }
258 173 : const std::vector<Network::Address::InstanceConstSharedPtr>& addressList() const override {
259 173 : return address_list_;
260 173 : }
261 72 : Network::Address::InstanceConstSharedPtr healthCheckAddress() const override {
262 72 : return health_check_address_;
263 72 : }
264 159 : const envoy::config::core::v3::Locality& locality() const override { return locality_; }
265 272 : Stats::StatName localityZoneStatName() const override {
266 272 : return locality_zone_stat_name_.statName();
267 272 : }
// Priority getter/setter; priority_ is a std::atomic<uint32_t>.
268 0 : uint32_t priority() const override { return priority_; }
269 0 : void priority(uint32_t priority) override { priority_ = priority; }
// Selects the transport socket factory for the given destination address and metadata (defined
// out of line). Used by the metadata() setter above.
270 : Network::UpstreamTransportSocketFactory&
271 : resolveTransportSocketFactory(const Network::Address::InstanceConstSharedPtr& dest_address,
272 : const envoy::config::core::v3::Metadata* metadata) const;
// Returns a copy of the stored optional; empty until setLastHcPassTimeImpl() has been called.
273 16370 : absl::optional<MonotonicTime> lastHcPassTime() const override { return last_hc_pass_time_; }
274 :
// Replaces the address list. The ASSERT checks that a non-empty list starts with an address equal
// to address_.
275 0 : void setAddressList(const std::vector<Network::Address::InstanceConstSharedPtr>& address_list) {
276 0 : address_list_ = address_list;
277 0 : ASSERT(address_list_.empty() || *address_list_.front() == *address_);
278 0 : }
279 :
280 : protected:
// Plain setters available to subclasses; none of them takes a lock.
281 0 : void setAddress(Network::Address::InstanceConstSharedPtr address) { address_ = address; }
282 :
283 0 : void setHealthCheckAddress(Network::Address::InstanceConstSharedPtr address) {
284 0 : health_check_address_ = address;
285 0 : }
286 :
// Takes ownership of the given health checker monitor.
287 46 : void setHealthCheckerImpl(HealthCheckHostMonitorPtr&& health_checker) {
288 46 : health_checker_ = std::move(health_checker);
289 46 : }
290 :
// Takes ownership of the given outlier detector monitor.
291 0 : void setOutlierDetectorImpl(Outlier::DetectorHostMonitorPtr&& outlier_detector) {
292 0 : outlier_detector_ = std::move(outlier_detector);
293 0 : }
294 :
// Records the given time as the last health check pass time, replacing any previous value.
295 197 : void setLastHcPassTimeImpl(MonotonicTime last_hc_pass_time) {
296 197 : last_hc_pass_time_.emplace(std::move(last_hc_pass_time));
297 197 : }
298 :
299 : private:
300 : ClusterInfoConstSharedPtr cluster_;
301 : const std::string hostname_;
302 : const std::string health_checks_hostname_;
303 : Network::Address::InstanceConstSharedPtr address_;
304 : // The first entry in the address_list_ should match the value in address_.
305 : std::vector<Network::Address::InstanceConstSharedPtr> address_list_;
306 : Network::Address::InstanceConstSharedPtr health_check_address_;
307 : std::atomic<bool> canary_;
// Guards metadata_ and socket_factory_; mutable so that const getters can lock it.
308 : mutable absl::Mutex metadata_mutex_;
309 : MetadataConstSharedPtr metadata_ ABSL_GUARDED_BY(metadata_mutex_);
310 : const envoy::config::core::v3::Locality locality_;
311 : Stats::StatNameDynamicStorage locality_zone_stat_name_;
312 : mutable HostStats stats_;
313 : mutable LoadMetricStatsImpl load_metric_stats_;
// Either monitor may be null; the accessors then fall back to the null implementations.
314 : Outlier::DetectorHostMonitorPtr outlier_detector_;
315 : HealthCheckHostMonitorPtr health_checker_;
316 : std::atomic<uint32_t> priority_;
// A reference_wrapper (rather than a plain reference) so the factory can be re-seated when the
// metadata changes.
317 : std::reference_wrapper<Network::UpstreamTransportSocketFactory>
318 : socket_factory_ ABSL_GUARDED_BY(metadata_mutex_);
319 : const MonotonicTime creation_time_;
320 : absl::optional<MonotonicTime> last_hc_pass_time_;
321 : };
321 :
322 : /**
323 : * Implementation of Upstream::Host. Adds health flags, EDS health status, weight, connection
324 : * creation and usage handles on top of HostDescriptionImpl.
325 : */
326 : class HostImpl : public HostDescriptionImpl,
327 : public Host,
328 : public std::enable_shared_from_this<HostImpl> {
329 : public:
// Forwards the descriptive arguments to HostDescriptionImpl, then applies the initial EDS health
// status and weight. The HostImpl::-qualified names make those two calls non-virtual.
330 : HostImpl(ClusterInfoConstSharedPtr cluster, const std::string& hostname,
331 : Network::Address::InstanceConstSharedPtr address, MetadataConstSharedPtr metadata,
332 : uint32_t initial_weight, const envoy::config::core::v3::Locality& locality,
333 : const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config,
334 : uint32_t priority, const envoy::config::core::v3::HealthStatus health_status,
335 : TimeSource& time_source)
336 : : HostDescriptionImpl(cluster, hostname, address, metadata, locality, health_check_config,
337 : priority, time_source),
338 180259 : disable_active_health_check_(health_check_config.disable_active_health_check()) {
339 : // This EDS flags setting is still necessary for stats, configuration dump, canonical
340 : // coarseHealth() etc.
341 180259 : HostImpl::setEdsHealthStatus(health_status);
342 180259 : HostImpl::weight(initial_weight);
343 180259 : }
344 :
// Getter/setter for the flag initialised from the endpoint's health check config.
345 46 : bool disableActiveHealthCheck() const override { return disable_active_health_check_; }
346 0 : void setDisableActiveHealthCheck(bool disable_active_health_check) override {
347 0 : disable_active_health_check_ = disable_active_health_check;
348 0 : }
349 :
350 : // Upstream::Host
// Forwards to the host stats object.
351 : std::vector<std::pair<absl::string_view, Stats::PrimitiveCounterReference>>
352 0 : counters() const override {
353 0 : return stats().counters();
354 0 : }
// Connection factories, defined out of line.
355 : CreateConnectionData createConnection(
356 : Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options,
357 : Network::TransportSocketOptionsConstSharedPtr transport_socket_options) const override;
358 : CreateConnectionData createHealthCheckConnection(
359 : Event::Dispatcher& dispatcher,
360 : Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
361 : const envoy::config::core::v3::Metadata* metadata) const override;
362 :
// Forwards to the host stats object.
363 : std::vector<std::pair<absl::string_view, Stats::PrimitiveGaugeReference>>
364 0 : gauges() const override {
365 0 : return stats().gauges();
366 0 : }
// Health flags are kept as a bit mask in an atomic; each HealthFlag maps to its integer value.
367 476195 : void healthFlagClear(HealthFlag flag) override { health_flags_ &= ~enumToInt(flag); }
368 976 : bool healthFlagGet(HealthFlag flag) const override { return health_flags_ & enumToInt(flag); }
369 115658 : void healthFlagSet(HealthFlag flag) final { health_flags_ |= enumToInt(flag); }
370 0 : uint32_t healthFlagsGetAll() const override { return health_flags_; }
// Note: this ORs `bits` into the current mask; flags that are already set are not cleared.
371 0 : void healthFlagsSetAll(uint32_t bits) override { health_flags_ |= bits; }
372 :
// The three setters below delegate to the protected HostDescriptionImpl helpers.
373 46 : void setHealthChecker(HealthCheckHostMonitorPtr&& health_checker) override {
374 46 : setHealthCheckerImpl(std::move(health_checker));
375 46 : }
376 0 : void setOutlierDetector(Outlier::DetectorHostMonitorPtr&& outlier_detector) override {
377 0 : setOutlierDetectorImpl(std::move(outlier_detector));
378 0 : }
379 :
380 197 : void setLastHcPassTime(MonotonicTime last_hc_pass_time) override {
381 197 : setLastHcPassTimeImpl(std::move(last_hc_pass_time));
382 197 : }
383 :
// Combines active health flags with the EDS status. Checks run in this order: active unhealthy,
// EDS unhealthy/draining/timeout, active degraded, and finally whatever the EDS status is.
384 0 : Host::HealthStatus healthStatus() const override {
385 : // Evaluate active health status first.
386 :
387 : // Active unhealthy.
388 0 : if (healthFlagsGet(enumToInt(HealthFlag::FAILED_ACTIVE_HC) |
389 0 : enumToInt(HealthFlag::FAILED_OUTLIER_CHECK))) {
390 0 : return HealthStatus::UNHEALTHY;
391 0 : }
392 :
393 : // Eds unhealthy.
394 0 : if (eds_health_status_ == envoy::config::core::v3::UNHEALTHY ||
395 0 : eds_health_status_ == envoy::config::core::v3::DRAINING ||
396 0 : eds_health_status_ == envoy::config::core::v3::TIMEOUT) {
397 0 : return eds_health_status_;
398 0 : }
399 :
400 : // Active degraded.
401 0 : if (healthFlagGet(HealthFlag::DEGRADED_ACTIVE_HC)) {
402 0 : return HealthStatus::DEGRADED;
403 0 : }
404 :
405 : // Eds degraded or healthy.
406 0 : return eds_health_status_;
407 0 : }
408 :
// Three-state health derived purely from the flag mask: unhealthy flags win over degraded flags,
// and anything else is reported as healthy.
409 7723 : Host::Health coarseHealth() const override {
410 : // If any of the unhealthy flags are set, host is unhealthy.
411 7723 : if (healthFlagsGet(enumToInt(HealthFlag::FAILED_ACTIVE_HC) |
412 7723 : enumToInt(HealthFlag::FAILED_OUTLIER_CHECK) |
413 7723 : enumToInt(HealthFlag::FAILED_EDS_HEALTH))) {
414 0 : return Host::Health::Unhealthy;
415 0 : }
416 :
417 : // If any of the degraded flags are set, host is degraded.
418 7723 : if (healthFlagsGet(enumToInt(HealthFlag::DEGRADED_ACTIVE_HC) |
419 7723 : enumToInt(HealthFlag::DEGRADED_EDS_HEALTH))) {
420 0 : return Host::Health::Degraded;
421 0 : }
422 :
423 : // The host must have no flags or be pending removal.
424 7723 : ASSERT(health_flags_ == 0 || healthFlagGet(HealthFlag::PENDING_DYNAMIC_REMOVAL));
425 7723 : return Host::Health::Healthy;
426 7723 : }
427 :
// Stores the raw EDS status and also updates the health flags through setEdsHealthFlag().
428 180259 : void setEdsHealthStatus(envoy::config::core::v3::HealthStatus eds_health_status) override {
429 180259 : eds_health_status_ = eds_health_status;
430 180259 : setEdsHealthFlag(eds_health_status);
431 180259 : }
432 0 : Host::HealthStatus edsHealthStatus() const override {
433 0 : return Host::HealthStatus(eds_health_status_.load());
434 0 : }
435 :
436 2206151 : uint32_t weight() const override { return weight_; }
// Weight setter, defined out of line.
437 : void weight(uint32_t new_weight) override;
// True while at least one handle obtained from acquireHandle() is alive.
438 0 : bool used() const override { return handle_count_ > 0; }
// Returns a handle that increments handle_count_ for its lifetime. Relies on shared_from_this(),
// so the host must be owned by a shared_ptr.
439 173 : HostHandlePtr acquireHandle() const override {
440 173 : return std::make_unique<HostHandleImpl>(shared_from_this());
441 173 : }
442 :
443 : protected:
// Static overload taking every input explicitly (defined out of line).
444 : static CreateConnectionData
445 : createConnection(Event::Dispatcher& dispatcher, const ClusterInfo& cluster,
446 : const Network::Address::InstanceConstSharedPtr& address,
447 : const std::vector<Network::Address::InstanceConstSharedPtr>& address_list,
448 : Network::UpstreamTransportSocketFactory& socket_factory,
449 : const Network::ConnectionSocket::OptionsSharedPtr& options,
450 : Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
451 : HostDescriptionConstSharedPtr host);
452 :
453 : private:
454 : // Helper function to check multiple health flags at once.
// Returns true if any bit of `flags` is set in health_flags_.
455 15446 : bool healthFlagsGet(uint32_t flags) const { return health_flags_ & flags; }
456 :
// Defined out of line; called by setEdsHealthStatus().
457 : void setEdsHealthFlag(envoy::config::core::v3::HealthStatus health_status);
458 :
459 : std::atomic<uint32_t> health_flags_{};
460 : std::atomic<uint32_t> weight_;
461 : bool disable_active_health_check_;
462 : // TODO(wbpcode): should we store the EDS health status to health_flags_ to get unified status or
463 : // flag access? May be we could refactor HealthFlag to contain all these statuses and flags in the
464 : // future.
465 : std::atomic<Host::HealthStatus> eds_health_status_{};
466 :
// Handle that counts itself in the parent's handle_count_. It keeps only a weak_ptr to the host,
// so it does not extend the host's lifetime; the destructor decrements the count only if the
// host is still alive.
467 : struct HostHandleImpl : HostHandle {
468 173 : HostHandleImpl(const std::shared_ptr<const HostImpl>& parent) : parent_(parent) {
469 173 : parent->handle_count_++;
470 173 : }
471 173 : ~HostHandleImpl() override {
472 173 : if (const auto host = parent_.lock()) {
473 173 : ASSERT(host->handle_count_ > 0);
474 173 : host->handle_count_--;
475 173 : }
476 173 : }
477 : const std::weak_ptr<const HostImpl> parent_;
478 : };
// Number of live handles; mutable because handles are acquired through a const HostImpl.
479 : mutable std::atomic<uint32_t> handle_count_{};
480 : };
480 :
// Implementation of HostsPerLocality: a vector of host buckets, one per locality, plus a flag
// saying whether the first bucket is the local locality.
481 : class HostsPerLocalityImpl : public HostsPerLocality {
482 : public:
// Empty set with no local locality.
483 4026 : HostsPerLocalityImpl() : HostsPerLocalityImpl(std::vector<HostVector>(), false) {}
484 :
485 : // Single locality constructor
486 : //
487 : // Parameter requirements:
488 : // 1. All entries in hosts must have the same locality.
489 : // 2. If has_local_locality is true, then the locality of all entries in hosts
490 : // must be equal to the current envoy's locality.
// Copies `hosts` into a single bucket and delegates to the multi-locality constructor.
491 : HostsPerLocalityImpl(const HostVector& hosts, bool has_local_locality = false)
492 0 : : HostsPerLocalityImpl(std::vector<HostVector>({hosts}), has_local_locality) {}
493 :
494 : // Multiple localities constructor
495 : //
496 : // locality_hosts must adhere to the following ordering constraints:
497 : // 1. All hosts within a single HostVector bucket must have the same locality
498 : // 2. No hosts in different HostVector buckets can have the same locality
499 : // 3. If has_local_locality is true, then the locality of all hosts in the first HostVector bucket
500 : // must be equal to the current envoy's locality.
501 : // 4. All non-local HostVector buckets must be sorted in ascending order by the LocalityLess
502 : // comparator
// Of these constraints, only "a local locality implies at least one bucket" is checked here, via
// the ASSERT.
503 : HostsPerLocalityImpl(std::vector<HostVector>&& locality_hosts, bool has_local_locality)
504 5753 : : local_(has_local_locality), hosts_per_locality_(std::move(locality_hosts)) {
505 5753 : ASSERT(!has_local_locality || !hosts_per_locality_.empty());
506 5753 : }
507 :
508 0 : bool hasLocalLocality() const override { return local_; }
509 8386 : const std::vector<HostVector>& get() const override { return hosts_per_locality_; }
// Defined out of line. Takes one predicate per requested result and returns one
// HostsPerLocality pointer per predicate entry.
// NOTE(review): presumably each result keeps the hosts matching the corresponding predicate --
// confirm against the definition.
510 : std::vector<HostsPerLocalityConstSharedPtr>
511 : filter(const std::vector<std::function<bool(const Host&)>>& predicate) const override;
512 :
513 : // The const shared pointer for the empty HostsPerLocalityImpl.
// Backed by a function-local static, so every caller shares the same instance.
514 2324 : static HostsPerLocalityConstSharedPtr empty() {
515 2324 : static HostsPerLocalityConstSharedPtr empty = std::make_shared<HostsPerLocalityImpl>();
516 2324 : return empty;
517 2324 : }
518 :
519 : private:
520 : // Does an entry exist for the local locality?
521 : bool local_{};
522 : // The first entry is for local hosts in the local locality.
523 : std::vector<HostVector> hosts_per_locality_;
524 : };
525 :
526 : /**
527 : * A class for management of the set of hosts for a given priority level. Keeps the full host
528 : * list plus healthy/degraded/excluded views, both flat and grouped by locality.
529 : */
530 : class HostSetImpl : public HostSet {
531 : public:
// @param priority the priority level this host set represents.
// @param weighted_priority_health treated as false when not provided.
// @param overprovisioning_factor falls back to kDefaultOverProvisioningFactor when not provided.
// The host vectors start out default-constructed; the per-locality views start as the shared
// empty HostsPerLocalityImpl (see the member initializers below).
531 : HostSetImpl(uint32_t priority, absl::optional<bool> weighted_priority_health,
532 : absl::optional<uint32_t> overprovisioning_factor)
533 : : priority_(priority), overprovisioning_factor_(overprovisioning_factor.has_value()
534 : ? overprovisioning_factor.value()
535 : : kDefaultOverProvisioningFactor),
536 : weighted_priority_health_(weighted_priority_health.value_or(false)),
537 : hosts_(new HostVector()), healthy_hosts_(new HealthyHostVector()),
538 581 : degraded_hosts_(new DegradedHostVector()), excluded_hosts_(new ExcludedHostVector()) {}
539 :
540 : /**
541 : * Install a callback that will be invoked when the host set membership changes.
542 : * @param callback supplies the callback to invoke.
543 : * @return Common::CallbackHandlePtr the callback handle.
544 : */
545 : ABSL_MUST_USE_RESULT Common::CallbackHandlePtr
546 581 : addPriorityUpdateCb(PrioritySet::PriorityUpdateCb callback) const {
547 581 : return member_update_cb_helper_.add(callback);
548 581 : }
549 :
550 : // Upstream::HostSet
// Each view has two accessors: one returning a reference to the hosts and a *Ptr() variant
// returning the shared pointer that owns them.
551 2647 : const HostVector& hosts() const override { return *hosts_; }
552 159 : HostVectorConstSharedPtr hostsPtr() const override { return hosts_; }
553 1634 : const HostVector& healthyHosts() const override { return healthy_hosts_->get(); }
554 159 : HealthyHostVectorConstSharedPtr healthyHostsPtr() const override { return healthy_hosts_; }
555 1077 : const HostVector& degradedHosts() const override { return degraded_hosts_->get(); }
556 159 : DegradedHostVectorConstSharedPtr degradedHostsPtr() const override { return degraded_hosts_; }
557 1077 : const HostVector& excludedHosts() const override { return excluded_hosts_->get(); }
558 159 : ExcludedHostVectorConstSharedPtr excludedHostsPtr() const override { return excluded_hosts_; }
559 164 : const HostsPerLocality& hostsPerLocality() const override { return *hosts_per_locality_; }
560 159 : HostsPerLocalityConstSharedPtr hostsPerLocalityPtr() const override {
561 159 : return hosts_per_locality_;
562 159 : }
563 1224 : const HostsPerLocality& healthyHostsPerLocality() const override {
564 1224 : return *healthy_hosts_per_locality_;
565 1224 : }
566 159 : HostsPerLocalityConstSharedPtr healthyHostsPerLocalityPtr() const override {
567 159 : return healthy_hosts_per_locality_;
568 159 : }
569 1224 : const HostsPerLocality& degradedHostsPerLocality() const override {
570 1224 : return *degraded_hosts_per_locality_;
571 1224 : }
572 159 : HostsPerLocalityConstSharedPtr degradedHostsPerLocalityPtr() const override {
573 159 : return degraded_hosts_per_locality_;
574 159 : }
575 0 : const HostsPerLocality& excludedHostsPerLocality() const override {
576 0 : return *excluded_hosts_per_locality_;
577 0 : }
578 159 : HostsPerLocalityConstSharedPtr excludedHostsPerLocalityPtr() const override {
579 159 : return excluded_hosts_per_locality_;
580 159 : }
581 159 : LocalityWeightsConstSharedPtr localityWeights() const override { return locality_weights_; }
// Defined out of line. Both return a locality index, or an empty optional.
582 : absl::optional<uint32_t> chooseHealthyLocality() override;
583 : absl::optional<uint32_t> chooseDegradedLocality() override;
584 967 : uint32_t priority() const override { return priority_; }
585 771 : uint32_t overprovisioningFactor() const override { return overprovisioning_factor_; }
586 465 : bool weightedPriorityHealth() const override { return weighted_priority_health_; }
587 :
// Helpers that build a PrioritySet::UpdateHostsParams: from explicit views, from an existing
// HostSet, or from a host list and its per-locality grouping. All are defined out of line.
588 : static PrioritySet::UpdateHostsParams
589 : updateHostsParams(HostVectorConstSharedPtr hosts,
590 : HostsPerLocalityConstSharedPtr hosts_per_locality,
591 : HealthyHostVectorConstSharedPtr healthy_hosts,
592 : HostsPerLocalityConstSharedPtr healthy_hosts_per_locality,
593 : DegradedHostVectorConstSharedPtr degraded_hosts,
594 : HostsPerLocalityConstSharedPtr degraded_hosts_per_locality,
595 : ExcludedHostVectorConstSharedPtr excluded_hosts,
596 : HostsPerLocalityConstSharedPtr excluded_hosts_per_locality);
597 : static PrioritySet::UpdateHostsParams updateHostsParams(const HostSet& host_set);
598 : static PrioritySet::UpdateHostsParams
599 : partitionHosts(HostVectorConstSharedPtr hosts, HostsPerLocalityConstSharedPtr hosts_per_locality);
600 :
// Applies a host update to this set (defined out of line). The two optional arguments default to
// absl::nullopt.
601 : void updateHosts(PrioritySet::UpdateHostsParams&& update_hosts_params,
602 : LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added,
603 : const HostVector& hosts_removed,
604 : absl::optional<bool> weighted_priority_health = absl::nullopt,
605 : absl::optional<uint32_t> overprovisioning_factor = absl::nullopt);
606 :
607 : protected:
// Invokes the registered callbacks with this set's priority and the added/removed hosts.
// Virtual so that subclasses can override how callbacks are run.
608 663 : virtual void runUpdateCallbacks(const HostVector& hosts_added, const HostVector& hosts_removed) {
609 663 : member_update_cb_helper_.runCallbacks(priority_, hosts_added, hosts_removed);
610 663 : }
611 :
612 : private:
613 : // Weight for a locality taking into account health status using the provided eligible hosts per
614 : // locality.
615 : static double effectiveLocalityWeight(uint32_t index,
616 : const HostsPerLocality& eligible_hosts_per_locality,
617 : const HostsPerLocality& excluded_hosts_per_locality,
618 : const HostsPerLocality& all_hosts_per_locality,
619 : const LocalityWeights& locality_weights,
620 : uint32_t overprovisioning_factor);
621 :
622 : uint32_t priority_;
623 : uint32_t overprovisioning_factor_;
624 : bool weighted_priority_health_;
625 : HostVectorConstSharedPtr hosts_;
626 : HealthyHostVectorConstSharedPtr healthy_hosts_;
627 : DegradedHostVectorConstSharedPtr degraded_hosts_;
628 : ExcludedHostVectorConstSharedPtr excluded_hosts_;
// All four per-locality views initially point at the shared empty instance.
629 : HostsPerLocalityConstSharedPtr hosts_per_locality_{HostsPerLocalityImpl::empty()};
630 : HostsPerLocalityConstSharedPtr healthy_hosts_per_locality_{HostsPerLocalityImpl::empty()};
631 : HostsPerLocalityConstSharedPtr degraded_hosts_per_locality_{HostsPerLocalityImpl::empty()};
632 : HostsPerLocalityConstSharedPtr excluded_hosts_per_locality_{HostsPerLocalityImpl::empty()};
633 : // TODO(mattklein123): Remove mutable.
// Currently mutable because addPriorityUpdateCb() is const but registers a callback.
634 : mutable Common::CallbackManager<uint32_t, const HostVector&, const HostVector&>
635 : member_update_cb_helper_;
636 : // Locality weights (used to build the WRR locality schedulers).
637 : LocalityWeightsConstSharedPtr locality_weights_;
638 : // WRR locality scheduler state.
// One scheduler entry: the locality's index together with its effective weight.
639 : struct LocalityEntry {
640 : LocalityEntry(uint32_t index, double effective_weight)
641 0 : : index_(index), effective_weight_(effective_weight) {}
642 : const uint32_t index_;
643 : const double effective_weight_;
644 : };
645 :
646 : // Rebuilds the provided locality scheduler with locality entries based on the locality weights
647 : // and eligible hosts.
648 : //
649 : // @param locality_scheduler the locality scheduler to rebuild. Will be set to nullptr if no
650 : // localities are eligible.
651 : // @param locality_entries the vector that holds locality entries. Will be reset and populated
652 : // with entries corresponding to the new scheduler.
653 : // @param eligible_hosts_per_locality eligible hosts for this scheduler grouped by locality.
654 : // @param eligible_hosts all eligible hosts for this scheduler.
655 : // @param all_hosts_per_locality all hosts for this HostSet grouped by locality.
// @param excluded_hosts_per_locality excluded hosts for this HostSet grouped by locality.
656 : // @param locality_weights the weighting of each locality.
657 : // @param overprovisioning_factor the overprovisioning factor to use when computing the effective
658 : // weight of a locality.
659 : static void rebuildLocalityScheduler(
660 : std::unique_ptr<EdfScheduler<LocalityEntry>>& locality_scheduler,
661 : std::vector<std::shared_ptr<LocalityEntry>>& locality_entries,
662 : const HostsPerLocality& eligible_hosts_per_locality, const HostVector& eligible_hosts,
663 : HostsPerLocalityConstSharedPtr all_hosts_per_locality,
664 : HostsPerLocalityConstSharedPtr excluded_hosts_per_locality,
665 : LocalityWeightsConstSharedPtr locality_weights, uint32_t overprovisioning_factor);
666 :
// Shared implementation behind chooseHealthyLocality()/chooseDegradedLocality(), defined out of
// line. Takes a raw scheduler pointer, which may be null (see rebuildLocalityScheduler above).
667 : static absl::optional<uint32_t> chooseLocality(EdfScheduler<LocalityEntry>* locality_scheduler);
668 :
// Separate entry storage and scheduler for the healthy and the degraded hosts.
669 : std::vector<std::shared_ptr<LocalityEntry>> healthy_locality_entries_;
670 : std::unique_ptr<EdfScheduler<LocalityEntry>> healthy_locality_scheduler_;
671 : std::vector<std::shared_ptr<LocalityEntry>> degraded_locality_entries_;
672 : std::unique_ptr<EdfScheduler<LocalityEntry>> degraded_locality_scheduler_;
673 : };
674 :
// Owning pointer to a HostSetImpl; this is the return type of PrioritySetImpl::createHostSet().
675 : using HostSetImplPtr = std::unique_ptr<HostSetImpl>;
676 :
677 : /**
678 : * A class for management of the set of hosts in a given cluster. Hosts are held as HostSet objects in host_sets_, and member-update / priority-update callbacks are kept in the two CallbackManager members declared below.
679 : */
680 : class PrioritySetImpl : public PrioritySet {
681 : public:
682 581 : PrioritySetImpl() : batch_update_(false) {} // Starts outside of any batch update.
683 : // From PrioritySet
684 : ABSL_MUST_USE_RESULT Common::CallbackHandlePtr
685 465 : addMemberUpdateCb(MemberUpdateCb callback) const override {
686 465 : return member_update_cb_helper_.add(callback); // Allowed on a const object because the helper is declared mutable.
687 465 : }
688 : ABSL_MUST_USE_RESULT Common::CallbackHandlePtr
689 1318 : addPriorityUpdateCb(PriorityUpdateCb callback) const override {
690 1318 : return priority_update_cb_helper_.add(callback); // Same pattern as addMemberUpdateCb(), for per-priority callbacks.
691 1318 : }
692 8821 : const std::vector<std::unique_ptr<HostSet>>& hostSetsPerPriority() const override {
693 8821 : return host_sets_; // Returned by reference, no copy.
694 8821 : }
695 : // Get the host set for this priority level, creating it if necessary.
696 : const HostSet&
697 : getOrCreateHostSet(uint32_t priority,
698 : absl::optional<bool> weighted_priority_health = absl::nullopt,
699 : absl::optional<uint32_t> overprovisioning_factor = absl::nullopt);
700 :
701 : void updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params,
702 : LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added,
703 : const HostVector& hosts_removed,
704 : absl::optional<bool> weighted_priority_health = absl::nullopt,
705 : absl::optional<uint32_t> overprovisioning_factor = absl::nullopt,
706 : HostMapConstSharedPtr cross_priority_host_map = nullptr) override;
707 :
708 : void batchHostUpdate(BatchUpdateCb& callback) override;
709 :
710 251 : HostMapConstSharedPtr crossPriorityHostMap() const override {
711 251 : return const_cross_priority_host_map_; // Returns a copy of the shared pointer to the read-only map.
712 251 : }
713 :
714 : protected:
715 : // Allows subclasses of PrioritySetImpl to create their own type of HostSetImpl.
716 : virtual HostSetImplPtr createHostSet(uint32_t priority,
717 : absl::optional<bool> weighted_priority_health,
718 581 : absl::optional<uint32_t> overprovisioning_factor) {
719 581 : return std::make_unique<HostSetImpl>(priority, weighted_priority_health, // Default: a plain HostSetImpl.
720 581 : overprovisioning_factor);
721 581 : }
722 :
723 : protected: // NOTE(review): redundant -- the `protected:` section opened above is still in effect.
724 663 : virtual void runUpdateCallbacks(const HostVector& hosts_added, const HostVector& hosts_removed) { // Virtual, so subclasses can intercept member-update dispatch.
725 663 : member_update_cb_helper_.runCallbacks(hosts_added, hosts_removed);
726 663 : }
727 : virtual void runReferenceUpdateCallbacks(uint32_t priority, const HostVector& hosts_added,
728 663 : const HostVector& hosts_removed) { // Per-priority counterpart of runUpdateCallbacks().
729 663 : priority_update_cb_helper_.runCallbacks(priority, hosts_added, hosts_removed);
730 663 : }
731 : // This vector will generally have at least one member, for priority level 0.
732 : // It will expand as host sets are added but currently does not shrink to
733 : // avoid any potential lifetime issues.
734 : std::vector<std::unique_ptr<HostSet>> host_sets_;
735 :
736 : // Read only all host map for fast host searching. This will never be null.
737 : mutable HostMapConstSharedPtr const_cross_priority_host_map_{std::make_shared<HostMap>()}; // Initialized to an empty map, which is what keeps it non-null from construction.
738 :
739 : private:
740 : // This is a matching vector to store the callback handles for host_sets_. It is kept separately
741 : // because host_sets_ is directly returned so we avoid translation.
742 : std::vector<Common::CallbackHandlePtr> host_sets_priority_update_cbs_;
743 : // TODO(mattklein123): Remove mutable.
744 : mutable Common::CallbackManager<const HostVector&, const HostVector&> member_update_cb_helper_; // Mutable so the const add*Cb() methods above can register callbacks.
745 : mutable Common::CallbackManager<uint32_t, const HostVector&, const HostVector&>
746 : priority_update_cb_helper_;
747 : bool batch_update_ : 1; // Toggled by BatchUpdateScope below: true for the lifetime of a scope object.
748 :
749 : // Helper class to maintain state as we perform multiple host updates. Keeps track of all hosts
750 : // that have been added/removed throughout the batch update, and ensures that we properly manage
751 : // the batch_update_ flag.
752 : class BatchUpdateScope : public HostUpdateCb {
753 : public:
754 28 : explicit BatchUpdateScope(PrioritySetImpl& parent) : parent_(parent) {
755 28 : ASSERT(!parent_.batch_update_); // A scope must not be opened while another batch update is in progress.
756 28 : parent_.batch_update_ = true;
757 28 : }
758 28 : ~BatchUpdateScope() override { parent_.batch_update_ = false; } // Leaving the scope ends the batch update.
759 :
760 : void updateHosts(uint32_t priority, PrioritySet::UpdateHostsParams&& update_hosts_params,
761 : LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added,
762 : const HostVector& hosts_removed, absl::optional<bool> weighted_priority_health,
763 : absl::optional<uint32_t> overprovisioning_factor) override;
764 :
765 : absl::node_hash_set<HostSharedPtr> all_hosts_added_; // Accumulated across the whole batch (see the class comment).
766 : absl::node_hash_set<HostSharedPtr> all_hosts_removed_;
767 :
768 : private:
769 : PrioritySetImpl& parent_; // The set whose batch_update_ flag this scope controls.
770 : absl::node_hash_set<uint32_t> priorities_; // NOTE(review): presumably the priorities touched during the batch -- confirm against the definition of updateHosts().
771 : };
772 : };
773 :
774 : /**
775 : * Specialized PrioritySetImpl designed for the main thread. It will update and maintain the read
776 : * only cross priority host map when the host set changes. Overrides updateHosts() and crossPriorityHostMap() from the base class.
777 : */
778 : class MainPrioritySetImpl : public PrioritySetImpl, public Logger::Loggable<Logger::Id::upstream> {
779 : public:
780 : // PrioritySet
781 : void updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params,
782 : LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added,
783 : const HostVector& hosts_removed,
784 : absl::optional<bool> weighted_priority_health = absl::nullopt,
785 : absl::optional<uint32_t> overprovisioning_factor = absl::nullopt,
786 : HostMapConstSharedPtr cross_priority_host_map = nullptr) override;
787 : HostMapConstSharedPtr crossPriorityHostMap() const override; // Replaces the inline base-class getter; defined outside this header.
788 :
789 : protected:
790 : void updateCrossPriorityHostMap(const HostVector& hosts_added, const HostVector& hosts_removed);
791 :
792 : mutable HostMapSharedPtr mutable_cross_priority_host_map_; // NOTE(review): presumably the writable working copy that updateCrossPriorityHostMap() edits before it is exposed as the const map -- confirm against the definitions.
793 : };
794 :
795 : /**
796 : * Implementation of ClusterInfo built from an envoy::config::cluster::v3::Cluster config (see the constructor below). Most accessors are simple inline getters over members set at construction.
797 : */
798 : class ClusterInfoImpl : public ClusterInfo,
799 : public Event::DispatcherThreadDeletable,
800 : protected Logger::Loggable<Logger::Id::upstream> {
801 : public:
802 : using HttpProtocolOptionsConfigImpl =
803 : Envoy::Extensions::Upstreams::Http::ProtocolOptionsConfigImpl;
804 : using TcpProtocolOptionsConfigImpl = Envoy::Extensions::Upstreams::Tcp::ProtocolOptionsConfigImpl;
805 : ClusterInfoImpl(Init::Manager& info, Server::Configuration::ServerFactoryContext& server_context, // NOTE(review): the Init::Manager parameter is named `info`; possibly meant `init_manager` -- confirm against the definition.
806 : const envoy::config::cluster::v3::Cluster& config,
807 : const absl::optional<envoy::config::core::v3::BindConfig>& bind_config,
808 : Runtime::Loader& runtime, TransportSocketMatcherPtr&& socket_matcher,
809 : Stats::ScopeSharedPtr&& stats_scope, bool added_via_api,
810 : Server::Configuration::TransportSocketFactoryContext&);
811 :
812 : static DeferredCreationCompatibleClusterTrafficStats
813 : generateStats(Stats::ScopeSharedPtr scope, const ClusterTrafficStatNames& cluster_stat_names,
814 : bool defer_creation);
815 : static ClusterLoadReportStats
816 : generateLoadReportStats(Stats::Scope& scope, const ClusterLoadReportStatNames& stat_names);
817 : static ClusterCircuitBreakersStats
818 : generateCircuitBreakersStats(Stats::Scope& scope, Stats::StatName prefix, bool track_remaining,
819 : const ClusterCircuitBreakersStatNames& stat_names);
820 : static ClusterRequestResponseSizeStats
821 : generateRequestResponseSizeStats(Stats::Scope&,
822 : const ClusterRequestResponseSizeStatNames& stat_names);
823 : static ClusterTimeoutBudgetStats
824 : generateTimeoutBudgetStats(Stats::Scope&, const ClusterTimeoutBudgetStatNames& stat_names);
825 :
826 : // Upstream::ClusterInfo
827 274 : bool addedViaApi() const override { return added_via_api_; }
828 159 : OptRef<const LoadBalancerConfig> loadBalancerConfig() const override {
829 159 : return makeOptRefFromPtr<const LoadBalancerConfig>(load_balancer_config_.get());
830 159 : }
831 318 : TypedLoadBalancerFactory* loadBalancerFactory() const override { return load_balancer_factory_; } // Non-owning pointer; the member defaults to nullptr.
832 799 : const envoy::config::cluster::v3::Cluster::CommonLbConfig& lbConfig() const override {
833 799 : return *common_lb_config_; // Dereferenced unconditionally, so the pointer is expected to be non-null here.
834 799 : }
835 173 : std::chrono::milliseconds connectTimeout() const override { return connect_timeout_; }
836 :
837 : // `OptionalTimeouts` manages various `optional` values. We pack them in a separate data
838 : // structure for memory efficiency -- avoiding overhead of `absl::optional` per variable, and
839 : // avoiding overhead of storing unset timeouts.
840 : enum class OptionalTimeoutNames { IdleTimeout = 0, TcpPoolIdleTimeout, MaxConnectionDuration };
841 : using OptionalTimeouts = PackedStruct<std::chrono::milliseconds, 3, OptionalTimeoutNames>; // 3 == number of enumerators in OptionalTimeoutNames.
842 :
843 173 : const absl::optional<std::chrono::milliseconds> idleTimeout() const override { // Re-wraps the packed value as absl::optional; nullopt when unset.
844 173 : auto timeout = optional_timeouts_.get<OptionalTimeoutNames::IdleTimeout>();
845 173 : if (timeout.has_value()) {
846 173 : return *timeout;
847 173 : }
848 0 : return absl::nullopt;
849 173 : }
850 0 : const absl::optional<std::chrono::milliseconds> tcpPoolIdleTimeout() const override { // Same shape as idleTimeout(), for the TcpPoolIdleTimeout slot.
851 0 : auto timeout = optional_timeouts_.get<OptionalTimeoutNames::TcpPoolIdleTimeout>();
852 0 : if (timeout.has_value()) {
853 0 : return *timeout;
854 0 : }
855 0 : return absl::nullopt;
856 0 : }
857 165 : const absl::optional<std::chrono::milliseconds> maxConnectionDuration() const override { // Same shape as idleTimeout(), for the MaxConnectionDuration slot.
858 165 : auto timeout = optional_timeouts_.get<OptionalTimeoutNames::MaxConnectionDuration>();
859 165 : if (timeout.has_value()) {
860 0 : return *timeout;
861 0 : }
862 165 : return absl::nullopt;
863 165 : }
864 :
865 432 : float perUpstreamPreconnectRatio() const override { return per_upstream_preconnect_ratio_; }
866 251 : float peekaheadRatio() const override { return peekahead_ratio_; }
867 173 : uint32_t perConnectionBufferLimitBytes() const override {
868 173 : return per_connection_buffer_limit_bytes_;
869 173 : }
870 569 : uint64_t features() const override { return features_; }
871 68 : const Http::Http1Settings& http1Settings() const override { // This and the next three accessors read fields of http_protocol_options_.
872 68 : return http_protocol_options_->http1_settings_;
873 68 : }
874 315 : const envoy::config::core::v3::Http2ProtocolOptions& http2Options() const override {
875 315 : return http_protocol_options_->http2_options_;
876 315 : }
877 0 : const envoy::config::core::v3::Http3ProtocolOptions& http3Options() const override {
878 0 : return http_protocol_options_->http3_options_;
879 0 : }
880 202 : const envoy::config::core::v3::HttpProtocolOptions& commonHttpProtocolOptions() const override {
881 202 : return http_protocol_options_->common_http_protocol_options_;
882 202 : }
883 : void configureLbPolicies(const envoy::config::cluster::v3::Cluster& config,
884 : Server::Configuration::ServerFactoryContext& context);
885 : ProtocolOptionsConfigConstSharedPtr
886 : extensionProtocolOptions(const std::string& name) const override;
887 1260 : LoadBalancerType lbType() const override { return lb_type_; }
888 0 : envoy::config::cluster::v3::Cluster::DiscoveryType type() const override { return type_; }
889 :
890 : OptRef<const envoy::config::cluster::v3::Cluster::CustomClusterType>
891 0 : clusterType() const override {
892 0 : if (cluster_type_ == nullptr) { // Empty result when no custom cluster type is stored.
893 0 : return absl::nullopt;
894 0 : }
895 0 : return *cluster_type_;
896 0 : }
897 : OptRef<const envoy::config::cluster::v3::Cluster::RoundRobinLbConfig>
898 0 : lbRoundRobinConfig() const override { // All five lb*Config() accessors below return an empty OptRef when lb_policy_config_ is null, otherwise delegate to it.
899 0 : if (lb_policy_config_ == nullptr) {
900 0 : return {};
901 0 : }
902 :
903 0 : return lb_policy_config_->lbRoundRobinConfig();
904 0 : }
905 : OptRef<const envoy::config::cluster::v3::Cluster::LeastRequestLbConfig>
906 0 : lbLeastRequestConfig() const override {
907 0 : if (lb_policy_config_ == nullptr) {
908 0 : return {};
909 0 : }
910 :
911 0 : return lb_policy_config_->lbLeastRequestConfig();
912 0 : }
913 : OptRef<const envoy::config::cluster::v3::Cluster::RingHashLbConfig>
914 0 : lbRingHashConfig() const override {
915 0 : if (lb_policy_config_ == nullptr) {
916 0 : return {};
917 0 : }
918 :
919 0 : return lb_policy_config_->lbRingHashConfig();
920 0 : }
921 : OptRef<const envoy::config::cluster::v3::Cluster::MaglevLbConfig>
922 0 : lbMaglevConfig() const override {
923 0 : if (lb_policy_config_ == nullptr) {
924 0 : return {};
925 0 : }
926 :
927 0 : return lb_policy_config_->lbMaglevConfig();
928 0 : }
929 : OptRef<const envoy::config::cluster::v3::Cluster::OriginalDstLbConfig>
930 0 : lbOriginalDstConfig() const override {
931 0 : if (lb_policy_config_ == nullptr) {
932 0 : return {};
933 0 : }
934 :
935 0 : return lb_policy_config_->lbOriginalDstConfig();
936 0 : }
937 251 : OptRef<const envoy::config::core::v3::TypedExtensionConfig> upstreamConfig() const override {
938 251 : if (upstream_config_ == nullptr) { // Empty result when no upstream config is stored.
939 251 : return absl::nullopt;
940 251 : }
941 0 : return *upstream_config_;
942 251 : }
943 : bool maintenanceMode() const override;
944 278 : uint64_t maxRequestsPerConnection() const override { return max_requests_per_connection_; }
945 173 : uint32_t maxResponseHeadersCount() const override { return max_response_headers_count_; }
946 3337 : const std::string& name() const override { return name_; }
947 0 : const std::string& observabilityName() const override { // Falls back to name_ when no separate observability name is stored.
948 0 : if (observability_name_ != nullptr) {
949 0 : return *observability_name_;
950 0 : }
951 0 : return name_;
952 0 : }
953 : ResourceManager& resourceManager(ResourcePriority priority) const override;
954 159 : TransportSocketMatcher& transportSocketMatcher() const override { return *socket_matcher_; }
955 2465 : DeferredCreationCompatibleClusterTrafficStats& trafficStats() const override { // Stats members are mutable, so these const getters can hand out non-const references.
956 2465 : return traffic_stats_;
957 2465 : }
958 477 : ClusterConfigUpdateStats& configUpdateStats() const override { return config_update_stats_; }
959 306 : ClusterLbStats& lbStats() const override { return lb_stats_; }
960 823 : ClusterEndpointStats& endpointStats() const override { return endpoint_stats_; }
961 459 : Stats::Scope& statsScope() const override { return *stats_scope_; }
962 :
963 251 : ClusterRequestResponseSizeStatsOptRef requestResponseSizeStats() const override {
964 251 : if (optional_cluster_stats_ == nullptr || // Either the optional-stats container or this particular stat set may be absent.
965 251 : optional_cluster_stats_->request_response_size_stats_ == nullptr) {
966 251 : return absl::nullopt;
967 251 : }
968 :
969 0 : return std::ref(*(optional_cluster_stats_->request_response_size_stats_));
970 251 : }
971 :
972 0 : ClusterLoadReportStats& loadReportStats() const override { return load_report_stats_; }
973 :
974 349 : ClusterTimeoutBudgetStatsOptRef timeoutBudgetStats() const override {
975 349 : if (optional_cluster_stats_ == nullptr || // Same null handling as requestResponseSizeStats().
976 349 : optional_cluster_stats_->timeout_budget_stats_ == nullptr) {
977 349 : return absl::nullopt;
978 349 : }
979 :
980 0 : return std::ref(*(optional_cluster_stats_->timeout_budget_stats_));
981 349 : }
982 :
983 318 : bool perEndpointStatsEnabled() const override { return per_endpoint_stats_; }
984 :
985 173 : UpstreamLocalAddressSelectorConstSharedPtr getUpstreamLocalAddressSelector() const override {
986 173 : return upstream_local_address_selector_;
987 173 : }
988 306 : const LoadBalancerSubsetInfo& lbSubsetInfo() const override { // Falls back to the shared default subset info when lb_subset_ is null.
989 306 : if (lb_subset_ != nullptr) {
990 0 : return *lb_subset_;
991 0 : }
992 306 : return DefaultLoadBalancerSubsetInfoImpl::get();
993 306 : }
994 : using DefaultMetadata = ConstSingleton<envoy::config::core::v3::Metadata>;
995 0 : const envoy::config::core::v3::Metadata& metadata() const override { // Falls back to the DefaultMetadata singleton when metadata_ is null.
996 0 : if (metadata_ != nullptr) {
997 0 : return *metadata_;
998 0 : }
999 0 : return DefaultMetadata::get();
1000 0 : }
1001 : using ClusterTypedMetadata = Envoy::Config::TypedMetadataImpl<ClusterTypedMetadataFactory>;
1002 0 : const Envoy::Config::TypedMetadata& typedMetadata() const override {
1003 0 : if (typed_metadata_ != nullptr) {
1004 0 : return *typed_metadata_;
1005 0 : }
1006 0 : CONSTRUCT_ON_FIRST_USE(ClusterTypedMetadata, DefaultMetadata::get()); // Fallback: a ClusterTypedMetadata built from DefaultMetadata::get(); the macro supplies the return.
1007 0 : }
1008 :
1009 0 : bool drainConnectionsOnHostRemoval() const override { return drain_connections_on_host_removal_; }
1010 251 : bool connectionPoolPerDownstreamConnection() const override {
1011 251 : return connection_pool_per_downstream_connection_;
1012 251 : }
1013 0 : bool warmHosts() const override { return warm_hosts_; }
1014 173 : bool setLocalInterfaceNameOnUpstreamConnections() const override {
1015 173 : return set_local_interface_name_on_upstream_connections_;
1016 173 : }
1017 : const absl::optional<envoy::config::core::v3::UpstreamHttpProtocolOptions>&
1018 251 : upstreamHttpProtocolOptions() const override {
1019 251 : return http_protocol_options_->upstream_http_protocol_options_;
1020 251 : }
1021 :
1022 : const absl::optional<const envoy::config::core::v3::AlternateProtocolsCacheOptions>&
1023 251 : alternateProtocolsCacheOptions() const override {
1024 251 : return http_protocol_options_->alternate_protocol_cache_options_;
1025 251 : }
1026 :
1027 56 : const std::string& edsServiceName() const override {
1028 56 : return eds_service_name_ != nullptr ? *eds_service_name_ : EMPTY_STRING; // EMPTY_STRING when no EDS service name is stored.
1029 56 : }
1030 :
1031 : void createNetworkFilterChain(Network::Connection&) const override;
1032 : std::vector<Http::Protocol>
1033 : upstreamHttpProtocol(absl::optional<Http::Protocol> downstream_protocol) const override;
1034 :
1035 : // Http::FilterChainFactory
1036 : bool createFilterChain(Http::FilterChainManager& manager, bool only_create_if_configured,
1037 502 : const Http::FilterChainOptions&) const override { // The options argument is unnamed and unused; EmptyFilterChainOptions is passed on instead.
1038 502 : if (!has_configured_http_filters_ && only_create_if_configured) { // Nothing configured and the caller only wants configured chains: report "not created".
1039 251 : return false;
1040 251 : }
1041 251 : Http::FilterChainUtility::createFilterChainForFactories(
1042 251 : manager, Http::EmptyFilterChainOptions{}, http_filter_factories_);
1043 251 : return true;
1044 502 : }
1045 : bool createUpgradeFilterChain(absl::string_view, const UpgradeMap*,
1046 0 : Http::FilterChainManager&) const override {
1047 : // Upgrade filter chains not yet supported for upstream HTTP filters.
1048 0 : return false;
1049 0 : }
1050 :
1051 : Http::Http1::CodecStats& http1CodecStats() const override;
1052 : Http::Http2::CodecStats& http2CodecStats() const override;
1053 : Http::Http3::CodecStats& http3CodecStats() const override;
1054 : Http::ClientHeaderValidatorPtr makeHeaderValidator(Http::Protocol protocol) const override;
1055 :
1056 : protected:
1057 : // Gets the retry budget percent/concurrency from the circuit breaker thresholds. If the retry
1058 : // budget message is specified, defaults will be filled in if either params are unspecified.
1059 : static std::pair<absl::optional<double>, absl::optional<uint32_t>>
1060 : getRetryBudgetParams(const envoy::config::cluster::v3::CircuitBreakers::Thresholds& thresholds);
1061 :
1062 : private:
1063 : std::shared_ptr<UpstreamNetworkFilterConfigProviderManager>
1064 : createSingletonUpstreamNetworkFilterConfigProviderManager(
1065 : Server::Configuration::ServerFactoryContext& context);
1066 :
1067 : struct ResourceManagers { // One ResourceManagerImplPtr per resource priority (see Managers below).
1068 : ResourceManagers(const envoy::config::cluster::v3::Cluster& config, Runtime::Loader& runtime,
1069 : const std::string& cluster_name, Stats::Scope& stats_scope,
1070 : const ClusterCircuitBreakersStatNames& circuit_breakers_stat_names);
1071 : ResourceManagerImplPtr load(const envoy::config::cluster::v3::Cluster& config,
1072 : Runtime::Loader& runtime, const std::string& cluster_name,
1073 : Stats::Scope& stats_scope,
1074 : const envoy::config::core::v3::RoutingPriority& priority);
1075 :
1076 : using Managers = std::array<ResourceManagerImplPtr, NumResourcePriorities>;
1077 :
1078 : Managers managers_;
1079 : const ClusterCircuitBreakersStatNames& circuit_breakers_stat_names_; // Held by reference; the referenced object must outlive this struct.
1080 : };
1081 :
1082 : struct OptionalClusterStats { // Holds the stat sets that requestResponseSizeStats()/timeoutBudgetStats() treat as possibly null.
1083 : OptionalClusterStats(const envoy::config::cluster::v3::Cluster& config,
1084 : Stats::Scope& stats_scope, const ClusterManager& manager);
1085 : const ClusterTimeoutBudgetStatsPtr timeout_budget_stats_;
1086 : const ClusterRequestResponseSizeStatsPtr request_response_size_stats_;
1087 : };
1088 :
1089 : #ifdef ENVOY_ENABLE_UHV
1090 : ::Envoy::Http::HeaderValidatorStats& getHeaderValidatorStats(Http::Protocol protocol) const;
1091 : #endif
1092 :
1093 : Runtime::Loader& runtime_;
1094 : const std::string name_;
1095 : std::unique_ptr<const std::string> observability_name_; // May be null; see observabilityName().
1096 : std::unique_ptr<const std::string> eds_service_name_; // May be null; see edsServiceName().
1097 : const absl::flat_hash_map<std::string, ProtocolOptionsConfigConstSharedPtr>
1098 : extension_protocol_options_;
1099 : const std::shared_ptr<const HttpProtocolOptionsConfigImpl> http_protocol_options_;
1100 : const std::shared_ptr<const TcpProtocolOptionsConfigImpl> tcp_protocol_options_;
1101 : const uint64_t max_requests_per_connection_;
1102 : const std::chrono::milliseconds connect_timeout_;
1103 : OptionalTimeouts optional_timeouts_;
1104 : const float per_upstream_preconnect_ratio_;
1105 : const float peekahead_ratio_;
1106 : TransportSocketMatcherPtr socket_matcher_;
1107 : Stats::ScopeSharedPtr stats_scope_;
1108 : mutable DeferredCreationCompatibleClusterTrafficStats traffic_stats_;
1109 : mutable ClusterConfigUpdateStats config_update_stats_;
1110 : mutable ClusterLbStats lb_stats_;
1111 : mutable ClusterEndpointStats endpoint_stats_;
1112 : Stats::IsolatedStoreImpl load_report_stats_store_;
1113 : mutable ClusterLoadReportStats load_report_stats_;
1114 : const std::unique_ptr<OptionalClusterStats> optional_cluster_stats_; // May be null; the stats getters above check for that.
1115 : const uint64_t features_;
1116 : mutable ResourceManagers resource_managers_;
1117 : const std::string maintenance_mode_runtime_key_;
1118 : UpstreamLocalAddressSelectorConstSharedPtr upstream_local_address_selector_;
1119 : std::unique_ptr<const LBPolicyConfig> lb_policy_config_; // May be null; see the lb*Config() accessors.
1120 : std::unique_ptr<envoy::config::core::v3::TypedExtensionConfig> upstream_config_; // May be null; see upstreamConfig().
1121 : std::unique_ptr<LoadBalancerSubsetInfoImpl> lb_subset_; // May be null; see lbSubsetInfo().
1122 : std::unique_ptr<const envoy::config::core::v3::Metadata> metadata_; // May be null; see metadata().
1123 : std::unique_ptr<ClusterTypedMetadata> typed_metadata_; // May be null; see typedMetadata().
1124 : LoadBalancerConfigPtr load_balancer_config_;
1125 : TypedLoadBalancerFactory* load_balancer_factory_ = nullptr;
1126 : const std::shared_ptr<const envoy::config::cluster::v3::Cluster::CommonLbConfig>
1127 : common_lb_config_;
1128 : std::unique_ptr<const envoy::config::cluster::v3::Cluster::CustomClusterType> cluster_type_; // May be null; see clusterType().
1129 : // TODO(ohadvano): http_filter_config_provider_manager_ and
1130 : // network_filter_config_provider_manager_ should be maintained in the ClusterManager object as a
1131 : // singleton. This is currently not possible due to circular dependency (filter config provider
1132 : // manager depends on the ClusterManager object). The circular dependency can be resolved when the
1133 : // following issue is resolved: https://github.com/envoyproxy/envoy/issues/26653.
1134 : std::shared_ptr<Http::UpstreamFilterConfigProviderManager> http_filter_config_provider_manager_;
1135 : std::shared_ptr<UpstreamNetworkFilterConfigProviderManager>
1136 : network_filter_config_provider_manager_;
1137 : Filter::NetworkFilterFactoriesList filter_factories_;
1138 : Http::FilterChainUtility::FilterFactoriesList http_filter_factories_;
1139 : mutable Http::Http1::CodecStats::AtomicPtr http1_codec_stats_;
1140 : mutable Http::Http2::CodecStats::AtomicPtr http2_codec_stats_;
1141 : mutable Http::Http3::CodecStats::AtomicPtr http3_codec_stats_;
1142 : UpstreamFactoryContextImpl upstream_context_;
1143 :
1144 : // Keep small values like bools and enums at the end of the class to reduce
1145 : // overhead via alignment
1146 : const uint32_t per_connection_buffer_limit_bytes_;
1147 : const uint32_t max_response_headers_count_;
1148 : LoadBalancerType lb_type_;
1149 : const envoy::config::cluster::v3::Cluster::DiscoveryType type_;
1150 : const bool drain_connections_on_host_removal_ : 1;
1151 : const bool connection_pool_per_downstream_connection_ : 1;
1152 : const bool warm_hosts_ : 1;
1153 : const bool set_local_interface_name_on_upstream_connections_ : 1;
1154 : const bool added_via_api_ : 1;
1155 : // true iff the cluster proto specified upstream http filters.
1156 : bool has_configured_http_filters_ : 1; // The only non-const flag in this group; read by createFilterChain().
1157 : const bool per_endpoint_stats_ : 1;
1158 : };
1159 :
1160 : /**
1161 : * Creates a Network::UpstreamTransportSocketFactoryPtr for a cluster.
1162 : * @param config supplies the cluster configuration.
1163 : * @param factory_context supplies the transport socket factory context.
1164 : */
1165 : Network::UpstreamTransportSocketFactoryPtr
1166 : createTransportSocketFactory(const envoy::config::cluster::v3::Cluster& config,
1167 : Server::Configuration::TransportSocketFactoryContext& factory_context);
1168 :
1169 : /**
1170 : * Base class for all primary clusters. Owns the cluster's priority set, cluster info, health checker and outlier detector, and declares the shared initialization hooks (startPreInit / onPreInitComplete / onInitDone).
1171 : */
1172 : class ClusterImplBase : public Cluster, protected Logger::Loggable<Logger::Id::upstream> {
1173 :
1174 : public:
1175 : // Upstream::Cluster
1176 1272 : PrioritySet& prioritySet() override { return priority_set_; }
1177 0 : const PrioritySet& prioritySet() const override { return priority_set_; }
1178 :
1179 : /**
1180 : * Optionally set the health checker for the primary cluster. This is done after cluster
1181 : * creation since the health checker assumes that the cluster has already been fully initialized
1182 : * so there is a cyclic dependency. However we want the cluster to own the health checker.
1183 : */
1184 : void setHealthChecker(const HealthCheckerSharedPtr& health_checker);
1185 :
1186 : /**
1187 : * Optionally set the outlier detector for the primary cluster. Done for the same reason as
1188 : * documented in setHealthChecker().
1189 : */
1190 : void setOutlierDetector(const Outlier::DetectorSharedPtr& outlier_detector);
1191 :
1192 : /**
1193 : * Wrapper around Network::Address::resolveProtoAddress() that provides improved error message
1194 : * based on the cluster's type.
1195 : * @param address supplies the address proto to resolve.
1196 : * @return Network::Address::InstanceConstSharedPtr the resolved address.
1197 : */
1198 : const Network::Address::InstanceConstSharedPtr
1199 : resolveProtoAddress(const envoy::config::core::v3::Address& address);
1200 :
1201 : // Partitions the provided list of hosts into three new lists containing the healthy, degraded
1202 : // and excluded hosts respectively.
1203 : static std::tuple<HealthyHostVectorConstSharedPtr, DegradedHostVectorConstSharedPtr,
1204 : ExcludedHostVectorConstSharedPtr>
1205 : partitionHostList(const HostVector& hosts);
1206 :
1207 : // Partitions the provided list of hosts per locality into three new lists containing the healthy,
1208 : // degraded and excluded hosts respectively.
1209 : static std::tuple<HostsPerLocalityConstSharedPtr, HostsPerLocalityConstSharedPtr,
1210 : HostsPerLocalityConstSharedPtr>
1211 : partitionHostsPerLocality(const HostsPerLocality& hosts);
1212 0 : Config::ConstMetadataSharedPoolSharedPtr constMetadataSharedPool() { // Returns a copy of the shared pointer held in const_metadata_shared_pool_.
1213 0 : return const_metadata_shared_pool_;
1214 0 : }
1215 :
1216 : // Upstream::Cluster
1217 159 : HealthChecker* healthChecker() override { return health_checker_.get(); } // Non-owning; null when no health checker has been set.
1218 2715 : ClusterInfoConstSharedPtr info() const override { return info_; }
1219 159 : Outlier::Detector* outlierDetector() override { return outlier_detector_.get(); } // Non-owning; null when no outlier detector has been set.
1220 0 : const Outlier::Detector* outlierDetector() const override { return outlier_detector_.get(); }
1221 : void initialize(std::function<void()> callback) override;
1222 159 : UnitFloat dropOverload() const override { return drop_overload_; }
1223 0 : void setDropOverload(UnitFloat drop_overload) override { drop_overload_ = drop_overload; }
1224 :
1225 : protected:
1226 : ClusterImplBase(const envoy::config::cluster::v3::Cluster& cluster,
1227 : ClusterFactoryContext& cluster_context);
1228 :
1229 : /**
1230 : * Overridden by every concrete cluster. The cluster should do whatever pre-init is needed. E.g.,
1231 : * query DNS, contact EDS, etc.
1232 : */
1233 : virtual void startPreInit() PURE;
1234 :
1235 : /**
1236 : * Called by every concrete cluster when pre-init is complete. At this point,
1237 : * shared init starts init_manager_ initialization and determines if there
1238 : * is an initial health check pass needed, etc.
1239 : */
1240 : void onPreInitComplete();
1241 :
1242 : /**
1243 : * Called by every concrete cluster after all targets registered at init manager are
1244 : * initialized. At this point, shared init takes over and determines if there is an initial health
1245 : * check pass needed, etc.
1246 : */
1247 : void onInitDone();
1248 :
1249 : virtual void reloadHealthyHostsHelper(const HostSharedPtr& host);
1250 :
1251 : absl::Status parseDropOverloadConfig(
1252 : const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment);
1253 :
1254 : // This init manager is shared via TransportSocketFactoryContext. The initialization targets that
1255 : // register with this init manager are expected to be for implementations of SdsApi (see
1256 : // SdsApi::init_target_).
1257 : Init::ManagerImpl init_manager_;
1258 :
1259 : // Once all targets are initialized (i.e. once all dynamic secrets are loaded), this watcher calls
1260 : // onInitDone() above.
1261 : Init::WatcherImpl init_watcher_;
1262 :
1263 : Runtime::Loader& runtime_;
1264 : ClusterInfoConstSharedPtr info_; // This cluster info stores the stats scope so it must be
1265 : // initialized first and destroyed last. NOTE(review): init_manager_, init_watcher_ and runtime_ are declared before info_, so "first" can only hold relative to the members declared after it -- confirm intent.
1266 : HealthCheckerSharedPtr health_checker_;
1267 : Outlier::DetectorSharedPtr outlier_detector_;
1268 : const bool wait_for_warm_on_init_;
1269 :
1270 : Server::Configuration::TransportSocketFactoryContextImplPtr transport_factory_context_{};
1271 :
1272 : protected: // NOTE(review): redundant -- the `protected:` section opened above is still in effect.
1273 : TimeSource& time_source_;
1274 : MainPrioritySetImpl priority_set_; // Concrete set returned by both prioritySet() overloads.
1275 :
1276 : void validateEndpointsForZoneAwareRouting(
1277 : const envoy::config::endpoint::v3::LocalityLbEndpoints& endpoints) const;
1278 :
1279 : private:
1280 : static const absl::string_view DoNotValidateAlpnRuntimeKey;
1281 :
1282 : void finishInitialization();
1283 : void reloadHealthyHosts(const HostSharedPtr& host);
1284 :
1285 : bool initialization_started_{};
1286 : std::function<void()> initialization_complete_callback_;
1287 : uint64_t pending_initialize_health_checks_{};
1288 : const bool local_cluster_;
1289 : Config::ConstMetadataSharedPoolSharedPtr const_metadata_shared_pool_;
1290 : Common::CallbackHandlePtr priority_update_cb_;
1291 : UnitFloat drop_overload_{0}; // Defaults to 0 until setDropOverload() is called.
1292 : static constexpr int kDropOverloadSize = 1; // NOTE(review): not referenced in the visible part of this header; presumably used by parseDropOverloadConfig() -- confirm.
1293 : };
1294 :
1295 : using ClusterImplBaseSharedPtr = std::shared_ptr<ClusterImplBase>; // Shared-ownership handle to a ClusterImplBase.
1296 :
1297 : /**
1298 : * Manages PriorityState of a cluster. PriorityState is a per-priority binding of a set of hosts
1299 : * with its corresponding locality weight map. This is useful to store priorities/hosts/localities
1300 : * before updating the cluster priority set.
1301 : */
1302 : class PriorityStateManager : protected Logger::Loggable<Logger::Id::upstream> {
1303 : public:
1304 : PriorityStateManager(ClusterImplBase& cluster, const LocalInfo::LocalInfo& local_info,
1305 : PrioritySet::HostUpdateCb* update_cb); // update_cb is a raw pointer, so it may be null -- NOTE(review): confirm how a null callback is handled in the definition.
1306 :
1307 : // Initializes the PriorityState vector based on the priority specified in locality_lb_endpoint.
1308 : void initializePriorityFor(
1309 : const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint);
1310 :
1311 : // Registers a host based on its address to the PriorityState based on the specified priority (the
1312 : // priority is specified by locality_lb_endpoint.priority()).
1313 : //
1314 : // The specified health_checker_flag is used to set the registered-host's health-flag when the
1315 : // lb_endpoint health status is unhealthy, draining or timeout. NOTE(review): neither registerHostForPriority() overload below takes a health_checker_flag; that parameter appears on updateClusterPrioritySet() -- this sentence looks stale.
1316 : void registerHostForPriority(
1317 : const std::string& hostname, Network::Address::InstanceConstSharedPtr address,
1318 : const std::vector<Network::Address::InstanceConstSharedPtr>& address_list,
1319 : const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint,
1320 : const envoy::config::endpoint::v3::LbEndpoint& lb_endpoint, TimeSource& time_source);
1321 :
1322 : void registerHostForPriority( // Overload taking an already-constructed host.
1323 : const HostSharedPtr& host,
1324 : const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint);
1325 :
1326 : void
1327 : updateClusterPrioritySet(const uint32_t priority, HostVectorSharedPtr&& current_hosts,
1328 : const absl::optional<HostVector>& hosts_added,
1329 : const absl::optional<HostVector>& hosts_removed,
1330 : const absl::optional<Upstream::Host::HealthFlag> health_checker_flag,
1331 : absl::optional<bool> weighted_priority_health = absl::nullopt,
1332 : absl::optional<uint32_t> overprovisioning_factor = absl::nullopt);
1333 :
1334 : // Returns the saved priority state.
1335 159 : PriorityState& priorityState() { return priority_state_; } // Mutable reference: callers can modify the stored state.
1336 :
1337 : private:
1338 : ClusterImplBase& parent_; // Held by reference; the cluster must outlive this manager.
1339 : PriorityState priority_state_;
1340 : const envoy::config::core::v3::Node& local_info_node_; // NOTE(review): presumably bound to the node of the LocalInfo passed to the constructor -- confirm against the definition.
1341 : PrioritySet::HostUpdateCb* update_cb_;
1342 : };
1343 :
1344 : using PriorityStateManagerPtr = std::unique_ptr<PriorityStateManager>; // std::unique_ptr alias for PriorityStateManager.
1345 :
1346 : /**
1347 : * Base for all dynamic cluster types.
1348 : */
1349 : class BaseDynamicClusterImpl : public ClusterImplBase {
1350 : protected:
1351 : using ClusterImplBase::ClusterImplBase; // Inherit the ClusterImplBase constructors.
1352 :
1353 : /**
1354 : * Updates the host list of a single priority by reconciling the list of new hosts
1355 : * with existing hosts.
1356 : *
1357 : * @param new_hosts the full list of hosts in the new configuration.
1358 : * @param current_priority_hosts the full list of hosts for the priority to be updated. The list
1359 : * will be modified to contain the updated list of hosts.
1360 : * @param hosts_added_to_current_priority will be populated with hosts added to the priority.
1361 : * @param hosts_removed_from_current_priority will be populated with hosts removed from the
1362 : * priority.
1363 : * @param all_hosts all known hosts prior to this host update across all priorities.
1364 : * @param all_new_hosts addresses of all hosts in the new configuration across all priorities.
1365 : * @return whether the hosts for the priority changed.
1366 : */
1367 : bool updateDynamicHostList(const HostVector& new_hosts, HostVector& current_priority_hosts,
1368 : HostVector& hosts_added_to_current_priority,
1369 : HostVector& hosts_removed_from_current_priority,
1370 : const HostMap& all_hosts,
1371 : const absl::flat_hash_set<std::string>& all_new_hosts);
1372 : };
1373 :
1374 : /**
1375 : * Utility function to get the Network::DnsLookupFamily for a given cluster configuration proto.
1376 : */
1377 : Network::DnsLookupFamily
1378 : getDnsLookupFamilyFromCluster(const envoy::config::cluster::v3::Cluster& cluster);
1379 :
1380 : /**
1381 : * Utility function to report upstream connection (cx) destroy metrics for a host and connection event.
1382 : */
1383 : void reportUpstreamCxDestroy(const Upstream::HostDescriptionConstSharedPtr& host,
1384 : Network::ConnectionEvent event);
1385 :
1386 : /**
1387 : * Utility function to report upstream connection (cx) destroy active request metrics for a host and connection event.
1388 : */
1389 : void reportUpstreamCxDestroyActiveRequest(const Upstream::HostDescriptionConstSharedPtr& host,
1390 : Network::ConnectionEvent event);
1391 :
1392 : /**
1393 : * Utility function to resolve the health check address from an endpoint's HealthCheckConfig and the host address; returns an address instance pointer.
1394 : */
1395 : Network::Address::InstanceConstSharedPtr resolveHealthCheckAddress(
1396 : const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config,
1397 : Network::Address::InstanceConstSharedPtr host_address);
1398 :
1399 : } // namespace Upstream
1400 : } // namespace Envoy
|