/proc/self/cwd/source/common/upstream/upstream_impl.h
Line | Count | Source (jump to first uncovered line) |
1 | | #pragma once |
2 | | |
3 | | #include <array> |
4 | | #include <atomic> |
5 | | #include <chrono> |
6 | | #include <cstdint> |
7 | | #include <functional> |
8 | | #include <list> |
9 | | #include <memory> |
10 | | #include <string> |
11 | | #include <utility> |
12 | | #include <variant> |
13 | | #include <vector> |
14 | | |
15 | | #include "envoy/common/callback.h" |
16 | | #include "envoy/common/optref.h" |
17 | | #include "envoy/common/time.h" |
18 | | #include "envoy/config/cluster/v3/cluster.pb.h" |
19 | | #include "envoy/config/core/v3/address.pb.h" |
20 | | #include "envoy/config/core/v3/base.pb.h" |
21 | | #include "envoy/config/core/v3/health_check.pb.h" |
22 | | #include "envoy/config/core/v3/protocol.pb.h" |
23 | | #include "envoy/config/endpoint/v3/endpoint_components.pb.h" |
24 | | #include "envoy/config/typed_metadata.h" |
25 | | #include "envoy/event/timer.h" |
26 | | #include "envoy/local_info/local_info.h" |
27 | | #include "envoy/network/dns.h" |
28 | | #include "envoy/network/filter.h" |
29 | | #include "envoy/runtime/runtime.h" |
30 | | #include "envoy/secret/secret_manager.h" |
31 | | #include "envoy/server/filter_config.h" |
32 | | #include "envoy/server/transport_socket_config.h" |
33 | | #include "envoy/ssl/context_manager.h" |
34 | | #include "envoy/stats/scope.h" |
35 | | #include "envoy/thread_local/thread_local.h" |
36 | | #include "envoy/upstream/cluster_factory.h" |
37 | | #include "envoy/upstream/cluster_manager.h" |
38 | | #include "envoy/upstream/health_checker.h" |
39 | | #include "envoy/upstream/load_balancer.h" |
40 | | #include "envoy/upstream/locality.h" |
41 | | #include "envoy/upstream/outlier_detection.h" |
42 | | #include "envoy/upstream/upstream.h" |
43 | | |
44 | | #include "source/common/common/callback_impl.h" |
45 | | #include "source/common/common/empty_string.h" |
46 | | #include "source/common/common/enum_to_int.h" |
47 | | #include "source/common/common/logger.h" |
48 | | #include "source/common/common/packed_struct.h" |
49 | | #include "source/common/common/thread.h" |
50 | | #include "source/common/config/metadata.h" |
51 | | #include "source/common/config/well_known_names.h" |
52 | | #include "source/common/http/filter_chain_helper.h" |
53 | | #include "source/common/http/http1/codec_stats.h" |
54 | | #include "source/common/http/http2/codec_stats.h" |
55 | | #include "source/common/http/http3/codec_stats.h" |
56 | | #include "source/common/init/manager_impl.h" |
57 | | #include "source/common/network/utility.h" |
58 | | #include "source/common/orca/orca_load_metrics.h" |
59 | | #include "source/common/shared_pool/shared_pool.h" |
60 | | #include "source/common/stats/isolated_store_impl.h" |
61 | | #include "source/common/upstream/edf_scheduler.h" |
62 | | #include "source/common/upstream/load_balancer_context_base.h" |
63 | | #include "source/common/upstream/resource_manager_impl.h" |
64 | | #include "source/common/upstream/transport_socket_match_impl.h" |
65 | | #include "source/common/upstream/upstream_factory_context_impl.h" |
66 | | #include "source/extensions/upstreams/http/config.h" |
67 | | #include "source/extensions/upstreams/tcp/config.h" |
68 | | #include "source/server/transport_socket_config_impl.h" |
69 | | |
70 | | #include "absl/container/node_hash_set.h" |
71 | | #include "absl/synchronization/mutex.h" |
72 | | |
73 | | namespace Envoy { |
74 | | namespace Upstream { |
75 | | |
// Factor with which priority levels and localities are considered overprovisioned when no
// explicit overprovisioning factor is supplied.
static constexpr uint32_t kDefaultOverProvisioningFactor{140};
78 | | |
79 | | using ClusterProto = envoy::config::cluster::v3::Cluster; |
80 | | |
81 | | using UpstreamNetworkFilterConfigProviderManager = |
82 | | Filter::FilterConfigProviderManager<Network::FilterFactoryCb, |
83 | | Server::Configuration::UpstreamFactoryContext>; |
84 | | |
85 | | class LegacyLbPolicyConfigHelper { |
86 | | public: |
87 | | struct Result { |
88 | | TypedLoadBalancerFactory* factory; |
89 | | LoadBalancerConfigPtr config; |
90 | | }; |
91 | | |
92 | | static absl::StatusOr<Result> |
93 | | getTypedLbConfigFromLegacyProtoWithoutSubset(LoadBalancerFactoryContext& lb_factory_context, |
94 | | const ClusterProto& cluster, |
95 | | ProtobufMessage::ValidationVisitor& visitor); |
96 | | |
97 | | static absl::StatusOr<Result> |
98 | | getTypedLbConfigFromLegacyProto(LoadBalancerFactoryContext& lb_factory_context, |
99 | | const ClusterProto& cluster, |
100 | | ProtobufMessage::ValidationVisitor& visitor); |
101 | | }; |
102 | | |
103 | | /** |
104 | | * Null implementation of HealthCheckHostMonitor. |
105 | | */ |
106 | | class HealthCheckHostMonitorNullImpl : public HealthCheckHostMonitor { |
107 | | public: |
108 | | // Upstream::HealthCheckHostMonitor |
109 | 0 | void setUnhealthy(UnhealthyType) override {} |
110 | | }; |
111 | | |
112 | | /** |
113 | | * Implementation of LoadMetricStats. |
114 | | */ |
115 | | class LoadMetricStatsImpl : public LoadMetricStats { |
116 | | public: |
117 | | void add(const absl::string_view key, double value) override; |
118 | | StatMapPtr latch() override; |
119 | | |
120 | | private: |
121 | | absl::Mutex mu_; |
122 | | StatMapPtr map_ ABSL_GUARDED_BY(mu_); |
123 | | }; |
124 | | |
125 | | /** |
126 | | * Null host monitor implementation. |
127 | | */ |
128 | | class DetectorHostMonitorNullImpl : public Outlier::DetectorHostMonitor { |
129 | | public: |
130 | | // Upstream::Outlier::DetectorHostMonitor |
131 | 0 | uint32_t numEjections() override { return 0; } |
132 | 1.63k | void putHttpResponseCode(uint64_t) override {} |
133 | 1.88k | void putResult(Outlier::Result, absl::optional<uint64_t>) override {} |
134 | 799 | void putResponseTime(std::chrono::milliseconds) override {} |
135 | 0 | const absl::optional<MonotonicTime>& lastEjectionTime() override { return time_; } |
136 | 0 | const absl::optional<MonotonicTime>& lastUnejectionTime() override { return time_; } |
137 | 0 | double successRate(SuccessRateMonitorType) const override { return -1; } |
138 | | |
139 | | private: |
140 | | const absl::optional<MonotonicTime> time_{}; |
141 | | }; |
142 | | |
143 | | /** |
144 | | * Base implementation of most of Upstream::HostDescription, shared between |
145 | | * HostDescriptionImpl and LogicalHost, which is in |
146 | | * source/extensions/clusters/common/logical_host.h. These differ in threading. |
147 | | * |
148 | | * HostDescriptionImpl and HostImpl are intended to be initialized in the main |
149 | | * thread, and are thereafter read-only, and thus do not require locking. |
150 | | * |
151 | | * LogicalHostImpl is intended to be dynamically changed due to DNS resolution |
152 | | * and Happy Eyeballs from multiple threads, and thus requires an address_lock |
153 | | * and lock annotations to enforce this. |
154 | | * |
155 | | * The two level implementation inheritance allows most of the implementation |
156 | | * to be shared, but sinks the ones requiring different lock semantics into |
157 | | * the leaf subclasses. |
158 | | */ |
159 | | class HostDescriptionImplBase : virtual public HostDescription, |
160 | | protected Logger::Loggable<Logger::Id::upstream> { |
161 | | public: |
162 | | HostDescriptionImplBase( |
163 | | ClusterInfoConstSharedPtr cluster, const std::string& hostname, |
164 | | Network::Address::InstanceConstSharedPtr dest_address, |
165 | | MetadataConstSharedPtr endpoint_metadata, MetadataConstSharedPtr locality_metadata, |
166 | | const envoy::config::core::v3::Locality& locality, |
167 | | const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config, |
168 | | uint32_t priority, TimeSource& time_source); |
169 | | |
170 | 48.4k | Network::UpstreamTransportSocketFactory& transportSocketFactory() const override { |
171 | 48.4k | absl::ReaderMutexLock lock(&metadata_mutex_); |
172 | 48.4k | return socket_factory_; |
173 | 48.4k | } |
174 | | |
175 | 3.37k | bool canary() const override { return canary_; } |
176 | 0 | void canary(bool is_canary) override { canary_ = is_canary; } |
177 | | |
178 | | // Metadata getter/setter. |
179 | | // |
180 | | // It's possible that the lock that guards the metadata will become highly contended (e.g.: |
181 | | // endpoints churning during a deploy of a large cluster). A possible improvement |
182 | | // would be to use TLS and post metadata updates from the main thread. This model would |
183 | | // possibly benefit other related and expensive computations too (e.g.: updating subsets). |
184 | 23.3k | MetadataConstSharedPtr metadata() const override { |
185 | 23.3k | absl::ReaderMutexLock lock(&metadata_mutex_); |
186 | 23.3k | return endpoint_metadata_; |
187 | 23.3k | } |
188 | 0 | void metadata(MetadataConstSharedPtr new_metadata) override { |
189 | 0 | auto& new_socket_factory = resolveTransportSocketFactory(address(), new_metadata.get()); |
190 | 0 | { |
191 | 0 | absl::WriterMutexLock lock(&metadata_mutex_); |
192 | 0 | endpoint_metadata_ = new_metadata; |
193 | | // Update data members dependent on metadata. |
194 | 0 | socket_factory_ = new_socket_factory; |
195 | 0 | } |
196 | 0 | } |
197 | | |
198 | 121k | const ClusterInfo& cluster() const override { return *cluster_; } |
199 | 0 | HealthCheckHostMonitor& healthChecker() const override { |
200 | 0 | if (health_checker_) { |
201 | 0 | return *health_checker_; |
202 | 0 | } |
203 | | |
204 | 0 | static HealthCheckHostMonitorNullImpl* null_health_checker = |
205 | 0 | new HealthCheckHostMonitorNullImpl(); |
206 | 0 | return *null_health_checker; |
207 | 0 | } |
208 | | |
209 | 911 | bool canCreateConnection(Upstream::ResourcePriority priority) const override { |
210 | 911 | if (stats().cx_active_.value() >= cluster().resourceManager(priority).maxConnectionsPerHost()) { |
211 | 0 | return false; |
212 | 0 | } |
213 | 911 | return cluster().resourceManager(priority).connections().canCreate(); |
214 | 911 | } |
215 | | |
216 | 4.31k | Outlier::DetectorHostMonitor& outlierDetector() const override { |
217 | 4.31k | if (outlier_detector_) { |
218 | 0 | return *outlier_detector_; |
219 | 0 | } |
220 | | |
221 | 4.31k | static DetectorHostMonitorNullImpl* null_outlier_detector = new DetectorHostMonitorNullImpl(); |
222 | 4.31k | return *null_outlier_detector; |
223 | 4.31k | } |
224 | 21.9M | HostStats& stats() const override { return stats_; } |
225 | 0 | LoadMetricStats& loadMetricStats() const override { return load_metric_stats_; } |
226 | 19.5k | const std::string& hostnameForHealthChecks() const override { return health_checks_hostname_; } |
227 | 0 | const std::string& hostname() const override { return hostname_; } |
228 | 2.40k | const envoy::config::core::v3::Locality& locality() const override { return locality_; } |
229 | 23.3k | const MetadataConstSharedPtr localityMetadata() const override { return locality_metadata_; } |
230 | 2.54k | Stats::StatName localityZoneStatName() const override { |
231 | 2.54k | return locality_zone_stat_name_.statName(); |
232 | 2.54k | } |
233 | 14 | uint32_t priority() const override { return priority_; } |
234 | 0 | void priority(uint32_t priority) override { priority_ = priority; } |
235 | | Network::UpstreamTransportSocketFactory& |
236 | | resolveTransportSocketFactory(const Network::Address::InstanceConstSharedPtr& dest_address, |
237 | | const envoy::config::core::v3::Metadata* metadata) const override; |
238 | 271M | absl::optional<MonotonicTime> lastHcPassTime() const override { return last_hc_pass_time_; } |
239 | | |
240 | 2.95k | void setHealthChecker(HealthCheckHostMonitorPtr&& health_checker) override { |
241 | 2.95k | health_checker_ = std::move(health_checker); |
242 | 2.95k | } |
243 | | |
244 | 0 | void setOutlierDetector(Outlier::DetectorHostMonitorPtr&& outlier_detector) override { |
245 | 0 | outlier_detector_ = std::move(outlier_detector); |
246 | 0 | } |
247 | | |
248 | 88.0k | void setLastHcPassTime(MonotonicTime last_hc_pass_time) override { |
249 | 88.0k | last_hc_pass_time_.emplace(std::move(last_hc_pass_time)); |
250 | 88.0k | } |
251 | | |
252 | | protected: |
253 | | /** |
254 | | * @return nullptr if address_list is empty, otherwise a shared_ptr copy of address_list. |
255 | | */ |
256 | | static SharedConstAddressVector |
257 | | makeAddressListOrNull(const Network::Address::InstanceConstSharedPtr& address, |
258 | | const AddressVector& address_list); |
259 | | |
260 | | private: |
261 | | ClusterInfoConstSharedPtr cluster_; |
262 | | const std::string hostname_; |
263 | | const std::string health_checks_hostname_; |
264 | | std::atomic<bool> canary_; |
265 | | mutable absl::Mutex metadata_mutex_; |
266 | | MetadataConstSharedPtr endpoint_metadata_ ABSL_GUARDED_BY(metadata_mutex_); |
267 | | const MetadataConstSharedPtr locality_metadata_; |
268 | | const envoy::config::core::v3::Locality locality_; |
269 | | Stats::StatNameDynamicStorage locality_zone_stat_name_; |
270 | | mutable HostStats stats_; |
271 | | mutable LoadMetricStatsImpl load_metric_stats_; |
272 | | Outlier::DetectorHostMonitorPtr outlier_detector_; |
273 | | HealthCheckHostMonitorPtr health_checker_; |
274 | | std::atomic<uint32_t> priority_; |
275 | | std::reference_wrapper<Network::UpstreamTransportSocketFactory> |
276 | | socket_factory_ ABSL_GUARDED_BY(metadata_mutex_); |
277 | | const MonotonicTime creation_time_; |
278 | | absl::optional<MonotonicTime> last_hc_pass_time_; |
279 | | }; |
280 | | |
281 | | /** |
282 | | * Final implementation of most of Upstream::HostDescription, providing const |
283 | | * of the address-related member variables. |
284 | | * |
285 | | * See also LogicalHostDescriptionImpl in |
286 | | * source/extensions/clusters/common/logical_host.h for a variant that allows |
287 | | * safe dynamic update to addresses. |
288 | | */ |
289 | | class HostDescriptionImpl : public HostDescriptionImplBase { |
290 | | public: |
291 | | HostDescriptionImpl( |
292 | | ClusterInfoConstSharedPtr cluster, const std::string& hostname, |
293 | | Network::Address::InstanceConstSharedPtr dest_address, |
294 | | MetadataConstSharedPtr endpoint_metadata, MetadataConstSharedPtr locality_metadata, |
295 | | const envoy::config::core::v3::Locality& locality, |
296 | | const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config, |
297 | | uint32_t priority, TimeSource& time_source, const AddressVector& address_list = {}); |
298 | | |
299 | | // HostDescription |
300 | 4.28k | Network::Address::InstanceConstSharedPtr address() const override { return address_; } |
301 | 23.3k | Network::Address::InstanceConstSharedPtr healthCheckAddress() const override { |
302 | 23.3k | return health_check_address_; |
303 | 23.3k | } |
304 | 911 | SharedConstAddressVector addressListOrNull() const override { return address_list_or_null_; } |
305 | | |
306 | | private: |
307 | | // No locks are required in this implementation: all address-related member |
308 | | // variables are set at construction and never change. See |
309 | | // LogicalHostDescription in source/extensions/clusters/common/logical_host.h |
310 | | // for an alternative that supports dynamic update. |
311 | | const Network::Address::InstanceConstSharedPtr address_; |
312 | | const SharedConstAddressVector address_list_or_null_; |
313 | | const Network::Address::InstanceConstSharedPtr health_check_address_; |
314 | | }; |
315 | | |
316 | | /** |
317 | | * Implementation of Upstream::Host. |
318 | | */ |
319 | | class HostImplBase : public Host, |
320 | | protected Logger::Loggable<Logger::Id::upstream>, |
321 | | public std::enable_shared_from_this<HostImplBase> { |
322 | | public: |
323 | | HostImplBase(uint32_t initial_weight, |
324 | | const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config, |
325 | | const envoy::config::core::v3::HealthStatus health_status) |
326 | 205k | : disable_active_health_check_(health_check_config.disable_active_health_check()) { |
327 | | // This EDS flags setting is still necessary for stats, configuration dump, canonical |
328 | | // coarseHealth() etc. |
329 | 205k | HostImplBase::setEdsHealthStatus(health_status); |
330 | 205k | HostImplBase::weight(initial_weight); |
331 | 205k | } |
332 | | |
333 | 2.95k | bool disableActiveHealthCheck() const override { return disable_active_health_check_; } |
334 | 0 | void setDisableActiveHealthCheck(bool disable_active_health_check) override { |
335 | 0 | disable_active_health_check_ = disable_active_health_check; |
336 | 0 | } |
337 | | |
338 | | // Upstream::Host |
339 | | std::vector<std::pair<absl::string_view, Stats::PrimitiveCounterReference>> |
340 | 0 | counters() const override { |
341 | 0 | return stats().counters(); |
342 | 0 | } |
343 | | CreateConnectionData createConnection( |
344 | | Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options, |
345 | | Network::TransportSocketOptionsConstSharedPtr transport_socket_options) const override; |
346 | | CreateConnectionData createHealthCheckConnection( |
347 | | Event::Dispatcher& dispatcher, |
348 | | Network::TransportSocketOptionsConstSharedPtr transport_socket_options, |
349 | | const envoy::config::core::v3::Metadata* metadata) const override; |
350 | | |
351 | | std::vector<std::pair<absl::string_view, Stats::PrimitiveGaugeReference>> |
352 | 0 | gauges() const override { |
353 | 0 | return stats().gauges(); |
354 | 0 | } |
355 | 1.29M | void healthFlagClear(HealthFlag flag) override { health_flags_ &= ~enumToInt(flag); } |
356 | 74.1k | bool healthFlagGet(HealthFlag flag) const override { return health_flags_ & enumToInt(flag); } |
357 | 657k | void healthFlagSet(HealthFlag flag) final { health_flags_ |= enumToInt(flag); } |
358 | 0 | uint32_t healthFlagsGetAll() const override { return health_flags_; } |
359 | 0 | void healthFlagsSetAll(uint32_t bits) override { health_flags_ |= bits; } |
360 | | |
361 | 0 | Host::HealthStatus healthStatus() const override { |
362 | | // Evaluate active health status first. |
363 | | |
364 | | // Active unhealthy. |
365 | 0 | if (healthFlagsGet(enumToInt(HealthFlag::FAILED_ACTIVE_HC) | |
366 | 0 | enumToInt(HealthFlag::FAILED_OUTLIER_CHECK))) { |
367 | 0 | return HealthStatus::UNHEALTHY; |
368 | 0 | } |
369 | | |
370 | | // Eds unhealthy. |
371 | 0 | if (eds_health_status_ == envoy::config::core::v3::UNHEALTHY || |
372 | 0 | eds_health_status_ == envoy::config::core::v3::DRAINING || |
373 | 0 | eds_health_status_ == envoy::config::core::v3::TIMEOUT) { |
374 | 0 | return eds_health_status_; |
375 | 0 | } |
376 | | |
377 | | // Active degraded. |
378 | 0 | if (healthFlagGet(HealthFlag::DEGRADED_ACTIVE_HC)) { |
379 | 0 | return HealthStatus::DEGRADED; |
380 | 0 | } |
381 | | |
382 | | // Eds degraded or healthy. |
383 | 0 | return eds_health_status_; |
384 | 0 | } |
385 | | |
386 | 133M | Host::Health coarseHealth() const override { |
387 | | // If any of the unhealthy flags are set, host is unhealthy. |
388 | 133M | if (healthFlagsGet(enumToInt(HealthFlag::FAILED_ACTIVE_HC) | |
389 | 133M | enumToInt(HealthFlag::FAILED_OUTLIER_CHECK) | |
390 | 133M | enumToInt(HealthFlag::FAILED_EDS_HEALTH) | |
391 | 133M | enumToInt(HealthFlag::EDS_STATUS_DRAINING))) { |
392 | 882k | return Host::Health::Unhealthy; |
393 | 882k | } |
394 | | |
395 | | // If any of the degraded flags are set, host is degraded. |
396 | 132M | if (healthFlagsGet(enumToInt(HealthFlag::DEGRADED_ACTIVE_HC) | |
397 | 132M | enumToInt(HealthFlag::DEGRADED_EDS_HEALTH))) { |
398 | 4.40M | return Host::Health::Degraded; |
399 | 4.40M | } |
400 | | |
401 | | // The host must have no flags or be pending removal. |
402 | 128M | ASSERT(health_flags_ == 0 || healthFlagGet(HealthFlag::PENDING_DYNAMIC_REMOVAL)); |
403 | 128M | return Host::Health::Healthy; |
404 | 128M | } |
405 | | |
406 | 205k | void setEdsHealthStatus(envoy::config::core::v3::HealthStatus eds_health_status) override { |
407 | 205k | eds_health_status_ = eds_health_status; |
408 | 205k | setEdsHealthFlag(eds_health_status); |
409 | 205k | } |
410 | 0 | Host::HealthStatus edsHealthStatus() const override { |
411 | 0 | return Host::HealthStatus(eds_health_status_.load()); |
412 | 0 | } |
413 | | |
414 | 157M | uint32_t weight() const override { return weight_; } |
415 | | void weight(uint32_t new_weight) override; |
416 | 0 | bool used() const override { return handle_count_ > 0; } |
417 | 910 | HostHandlePtr acquireHandle() const override { |
418 | 910 | return std::make_unique<HostHandleImpl>(shared_from_this()); |
419 | 910 | } |
420 | | |
421 | 0 | void setLbPolicyData(HostLbPolicyDataPtr lb_policy_data) override { |
422 | 0 | lb_policy_data_ = std::move(lb_policy_data); |
423 | 0 | } |
424 | 0 | const HostLbPolicyDataPtr& lbPolicyData() const override { return lb_policy_data_; } |
425 | | |
426 | | protected: |
427 | | static CreateConnectionData |
428 | | createConnection(Event::Dispatcher& dispatcher, const ClusterInfo& cluster, |
429 | | const Network::Address::InstanceConstSharedPtr& address, |
430 | | const SharedConstAddressVector& address_list, |
431 | | Network::UpstreamTransportSocketFactory& socket_factory, |
432 | | const Network::ConnectionSocket::OptionsSharedPtr& options, |
433 | | Network::TransportSocketOptionsConstSharedPtr transport_socket_options, |
434 | | HostDescriptionConstSharedPtr host); |
435 | | static absl::optional<Network::Address::InstanceConstSharedPtr> maybeGetProxyRedirectAddress( |
436 | | const Network::TransportSocketOptionsConstSharedPtr transport_socket_options, |
437 | | HostDescriptionConstSharedPtr host); |
438 | | |
439 | | private: |
440 | | // Helper function to check multiple health flags at once. |
441 | 265M | bool healthFlagsGet(uint32_t flags) const { return health_flags_ & flags; } |
442 | | |
443 | | void setEdsHealthFlag(envoy::config::core::v3::HealthStatus health_status); |
444 | | |
445 | | std::atomic<uint32_t> health_flags_{}; |
446 | | std::atomic<uint32_t> weight_; |
447 | | bool disable_active_health_check_; |
448 | | // TODO(wbpcode): should we store the EDS health status to health_flags_ to get unified status or |
449 | | // flag access? May be we could refactor HealthFlag to contain all these statuses and flags in the |
450 | | // future. |
451 | | std::atomic<Host::HealthStatus> eds_health_status_{}; |
452 | | HostLbPolicyDataPtr lb_policy_data_; |
453 | | |
454 | | struct HostHandleImpl : HostHandle { |
455 | 910 | HostHandleImpl(const std::shared_ptr<const HostImplBase>& parent) : parent_(parent) { |
456 | 910 | parent->handle_count_++; |
457 | 910 | } |
458 | 910 | ~HostHandleImpl() override { |
459 | 910 | if (const auto host = parent_.lock()) { |
460 | 910 | ASSERT(host->handle_count_ > 0); |
461 | 910 | host->handle_count_--; |
462 | 910 | } |
463 | 910 | } |
464 | | const std::weak_ptr<const HostImplBase> parent_; |
465 | | }; |
466 | | mutable std::atomic<uint32_t> handle_count_{}; |
467 | | }; |
468 | | |
469 | | class HostImpl : public HostImplBase, public HostDescriptionImpl { |
470 | | public: |
471 | | HostImpl(ClusterInfoConstSharedPtr cluster, const std::string& hostname, |
472 | | Network::Address::InstanceConstSharedPtr address, |
473 | | MetadataConstSharedPtr endpoint_metadata, MetadataConstSharedPtr locality_metadata, |
474 | | uint32_t initial_weight, const envoy::config::core::v3::Locality& locality, |
475 | | const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config, |
476 | | uint32_t priority, const envoy::config::core::v3::HealthStatus health_status, |
477 | | TimeSource& time_source, const AddressVector& address_list = {}) |
478 | | : HostImplBase(initial_weight, health_check_config, health_status), |
479 | | HostDescriptionImpl(cluster, hostname, address, endpoint_metadata, locality_metadata, |
480 | 205k | locality, health_check_config, priority, time_source, address_list) {} Unexecuted instantiation: Envoy::Upstream::HostImpl::HostImpl(std::__1::shared_ptr<Envoy::Upstream::ClusterInfo const>, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<Envoy::Network::Address::Instance const>, std::__1::shared_ptr<envoy::config::core::v3::Metadata const>, std::__1::shared_ptr<envoy::config::core::v3::Metadata const>, unsigned int, envoy::config::core::v3::Locality const&, envoy::config::endpoint::v3::Endpoint_HealthCheckConfig const&, unsigned int, envoy::config::core::v3::HealthStatus, Envoy::TimeSource&, std::__1::vector<std::__1::shared_ptr<Envoy::Network::Address::Instance const>, std::__1::allocator<std::__1::shared_ptr<Envoy::Network::Address::Instance const> > > const&) Envoy::Upstream::HostImpl::HostImpl(std::__1::shared_ptr<Envoy::Upstream::ClusterInfo const>, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&, std::__1::shared_ptr<Envoy::Network::Address::Instance const>, std::__1::shared_ptr<envoy::config::core::v3::Metadata const>, std::__1::shared_ptr<envoy::config::core::v3::Metadata const>, unsigned int, envoy::config::core::v3::Locality const&, envoy::config::endpoint::v3::Endpoint_HealthCheckConfig const&, unsigned int, envoy::config::core::v3::HealthStatus, Envoy::TimeSource&, std::__1::vector<std::__1::shared_ptr<Envoy::Network::Address::Instance const>, std::__1::allocator<std::__1::shared_ptr<Envoy::Network::Address::Instance const> > > const&) Line | Count | Source | 480 | 205k | locality, health_check_config, priority, time_source, address_list) {} |
|
481 | | }; |
482 | | |
483 | | class HostsPerLocalityImpl : public HostsPerLocality { |
484 | | public: |
485 | 218k | HostsPerLocalityImpl() : HostsPerLocalityImpl(std::vector<HostVector>(), false) {} |
486 | | |
487 | | // Single locality constructor |
488 | | // |
489 | | // Parameter requirements: |
490 | | // 1. All entries in hosts must have the same locality. |
491 | | // 2. If has_local_locality is true, then the locality of all entries in hosts |
492 | | // must be equal to the current envoy's locality. |
493 | | HostsPerLocalityImpl(const HostVector& hosts, bool has_local_locality = false) |
494 | 0 | : HostsPerLocalityImpl(std::vector<HostVector>({hosts}), has_local_locality) {} |
495 | | |
496 | | // Multiple localities constructor |
497 | | // |
498 | | // locality_hosts must adhere to the following ordering constraints: |
499 | | // 1. All hosts within a single HostVector bucket must have the same locality |
500 | | // 2. No hosts in different HostVector buckets can have the same locality |
501 | | // 3. If has_local_locality is true, then the locality of all hosts in the first HostVector bucket |
502 | | // must be equal to the current envoy's locality. |
503 | | // 4. All non-local HostVector buckets must be sorted in ascending order by the LocalityLess |
504 | | // comparator |
505 | | HostsPerLocalityImpl(std::vector<HostVector>&& locality_hosts, bool has_local_locality) |
506 | 259k | : local_(has_local_locality), hosts_per_locality_(std::move(locality_hosts)) { |
507 | 259k | ASSERT(!has_local_locality || !hosts_per_locality_.empty()); |
508 | 259k | } |
509 | | |
510 | 0 | bool hasLocalLocality() const override { return local_; } |
511 | 210k | const std::vector<HostVector>& get() const override { return hosts_per_locality_; } |
512 | | std::vector<HostsPerLocalityConstSharedPtr> |
513 | | filter(const std::vector<std::function<bool(const Host&)>>& predicate) const override; |
514 | | |
515 | | // The const shared pointer for the empty HostsPerLocalityImpl. |
516 | 29.2k | static HostsPerLocalityConstSharedPtr empty() { |
517 | 29.2k | static HostsPerLocalityConstSharedPtr empty = std::make_shared<HostsPerLocalityImpl>(); |
518 | 29.2k | return empty; |
519 | 29.2k | } |
520 | | |
521 | | private: |
522 | | // Does an entry exist for the local locality? |
523 | | bool local_{}; |
524 | | // The first entry is for local hosts in the local locality. |
525 | | std::vector<HostVector> hosts_per_locality_; |
526 | | }; |
527 | | |
528 | | /** |
529 | | * A class for management of the set of hosts for a given priority level. |
530 | | */ |
531 | | class HostSetImpl : public HostSet { |
532 | | public: |
533 | | HostSetImpl(uint32_t priority, absl::optional<bool> weighted_priority_health, |
534 | | absl::optional<uint32_t> overprovisioning_factor) |
535 | | : priority_(priority), overprovisioning_factor_(overprovisioning_factor.has_value() |
536 | | ? overprovisioning_factor.value() |
537 | | : kDefaultOverProvisioningFactor), |
538 | | weighted_priority_health_(weighted_priority_health.value_or(false)), |
539 | | hosts_(new HostVector()), healthy_hosts_(new HealthyHostVector()), |
540 | 7.31k | degraded_hosts_(new DegradedHostVector()), excluded_hosts_(new ExcludedHostVector()) {} |
541 | | |
542 | | /** |
543 | | * Install a callback that will be invoked when the host set membership changes. |
544 | | * @param callback supplies the callback to invoke. |
545 | | * @return Common::CallbackHandlePtr the callback handle. |
546 | | */ |
547 | | ABSL_MUST_USE_RESULT Common::CallbackHandlePtr |
548 | 7.31k | addPriorityUpdateCb(PrioritySet::PriorityUpdateCb callback) const { |
549 | 7.31k | return member_update_cb_helper_.add(callback); |
550 | 7.31k | } |
551 | | |
552 | | // Upstream::HostSet |
553 | 40.9k | const HostVector& hosts() const override { return *hosts_; } |
554 | 2.40k | HostVectorConstSharedPtr hostsPtr() const override { return hosts_; } |
555 | 23.4k | const HostVector& healthyHosts() const override { return healthy_hosts_->get(); } |
556 | 2.40k | HealthyHostVectorConstSharedPtr healthyHostsPtr() const override { return healthy_hosts_; } |
557 | 16.8k | const HostVector& degradedHosts() const override { return degraded_hosts_->get(); } |
558 | 2.40k | DegradedHostVectorConstSharedPtr degradedHostsPtr() const override { return degraded_hosts_; } |
559 | 16.8k | const HostVector& excludedHosts() const override { return excluded_hosts_->get(); } |
560 | 2.40k | ExcludedHostVectorConstSharedPtr excludedHostsPtr() const override { return excluded_hosts_; } |
561 | 2.89k | const HostsPerLocality& hostsPerLocality() const override { return *hosts_per_locality_; } |
562 | 2.40k | HostsPerLocalityConstSharedPtr hostsPerLocalityPtr() const override { |
563 | 2.40k | return hosts_per_locality_; |
564 | 2.40k | } |
565 | 19.2k | const HostsPerLocality& healthyHostsPerLocality() const override { |
566 | 19.2k | return *healthy_hosts_per_locality_; |
567 | 19.2k | } |
568 | 2.40k | HostsPerLocalityConstSharedPtr healthyHostsPerLocalityPtr() const override { |
569 | 2.40k | return healthy_hosts_per_locality_; |
570 | 2.40k | } |
571 | 19.2k | const HostsPerLocality& degradedHostsPerLocality() const override { |
572 | 19.2k | return *degraded_hosts_per_locality_; |
573 | 19.2k | } |
574 | 2.40k | HostsPerLocalityConstSharedPtr degradedHostsPerLocalityPtr() const override { |
575 | 2.40k | return degraded_hosts_per_locality_; |
576 | 2.40k | } |
577 | 0 | const HostsPerLocality& excludedHostsPerLocality() const override { |
578 | 0 | return *excluded_hosts_per_locality_; |
579 | 0 | } |
580 | 2.40k | HostsPerLocalityConstSharedPtr excludedHostsPerLocalityPtr() const override { |
581 | 2.40k | return excluded_hosts_per_locality_; |
582 | 2.40k | } |
583 | 2.40k | LocalityWeightsConstSharedPtr localityWeights() const override { return locality_weights_; } |
584 | | absl::optional<uint32_t> chooseHealthyLocality() override; |
585 | | absl::optional<uint32_t> chooseDegradedLocality() override; |
586 | 10.8k | uint32_t priority() const override { return priority_; } |
587 | 12.0k | uint32_t overprovisioningFactor() const override { return overprovisioning_factor_; } |
588 | 7.22k | bool weightedPriorityHealth() const override { return weighted_priority_health_; } |
589 | | |
590 | | static PrioritySet::UpdateHostsParams |
591 | | updateHostsParams(HostVectorConstSharedPtr hosts, |
592 | | HostsPerLocalityConstSharedPtr hosts_per_locality, |
593 | | HealthyHostVectorConstSharedPtr healthy_hosts, |
594 | | HostsPerLocalityConstSharedPtr healthy_hosts_per_locality, |
595 | | DegradedHostVectorConstSharedPtr degraded_hosts, |
596 | | HostsPerLocalityConstSharedPtr degraded_hosts_per_locality, |
597 | | ExcludedHostVectorConstSharedPtr excluded_hosts, |
598 | | HostsPerLocalityConstSharedPtr excluded_hosts_per_locality); |
599 | | static PrioritySet::UpdateHostsParams updateHostsParams(const HostSet& host_set); |
600 | | static PrioritySet::UpdateHostsParams |
601 | | partitionHosts(HostVectorConstSharedPtr hosts, HostsPerLocalityConstSharedPtr hosts_per_locality); |
602 | | |
603 | | void updateHosts(PrioritySet::UpdateHostsParams&& update_hosts_params, |
604 | | LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added, |
605 | | const HostVector& hosts_removed, uint64_t seed, |
606 | | absl::optional<bool> weighted_priority_health = absl::nullopt, |
607 | | absl::optional<uint32_t> overprovisioning_factor = absl::nullopt); |
608 | | |
609 | | protected: |
610 | 8.76k | virtual void runUpdateCallbacks(const HostVector& hosts_added, const HostVector& hosts_removed) { |
611 | 8.76k | THROW_IF_NOT_OK(member_update_cb_helper_.runCallbacks(priority_, hosts_added, hosts_removed)); |
612 | 8.76k | } |
613 | | |
614 | | private: |
615 | | // Weight for a locality taking into account health status using the provided eligible hosts per |
616 | | // locality. |
617 | | static double effectiveLocalityWeight(uint32_t index, |
618 | | const HostsPerLocality& eligible_hosts_per_locality, |
619 | | const HostsPerLocality& excluded_hosts_per_locality, |
620 | | const HostsPerLocality& all_hosts_per_locality, |
621 | | const LocalityWeights& locality_weights, |
622 | | uint32_t overprovisioning_factor); |
623 | | |
624 | | uint32_t priority_; |
625 | | uint32_t overprovisioning_factor_; |
626 | | bool weighted_priority_health_; |
627 | | HostVectorConstSharedPtr hosts_; |
628 | | HealthyHostVectorConstSharedPtr healthy_hosts_; |
629 | | DegradedHostVectorConstSharedPtr degraded_hosts_; |
630 | | ExcludedHostVectorConstSharedPtr excluded_hosts_; |
631 | | HostsPerLocalityConstSharedPtr hosts_per_locality_{HostsPerLocalityImpl::empty()}; |
632 | | HostsPerLocalityConstSharedPtr healthy_hosts_per_locality_{HostsPerLocalityImpl::empty()}; |
633 | | HostsPerLocalityConstSharedPtr degraded_hosts_per_locality_{HostsPerLocalityImpl::empty()}; |
634 | | HostsPerLocalityConstSharedPtr excluded_hosts_per_locality_{HostsPerLocalityImpl::empty()}; |
635 | | // TODO(mattklein123): Remove mutable. |
636 | | mutable Common::CallbackManager<uint32_t, const HostVector&, const HostVector&> |
637 | | member_update_cb_helper_; |
638 | | // Locality weights (used to build the WRR locality schedulers). |
639 | | LocalityWeightsConstSharedPtr locality_weights_; |
640 | | // WRR locality scheduler state. |
641 | | struct LocalityEntry { |
642 | | LocalityEntry(uint32_t index, double effective_weight) |
643 | 0 | : index_(index), effective_weight_(effective_weight) {} |
644 | | const uint32_t index_; |
645 | | const double effective_weight_; |
646 | | }; |
647 | | |
648 | | // Rebuilds the provided locality scheduler with locality entries based on the locality weights |
649 | | // and eligible hosts. |
650 | | // |
651 | | // @param locality_scheduler the locality scheduler to rebuild. Will be set to nullptr if no |
652 | | // localities are eligible. |
653 | | // @param locality_entries the vector that holds locality entries. Will be reset and populated |
654 | | // with entries corresponding to the new scheduler. |
655 | | // @param eligible_hosts_per_locality eligible hosts for this scheduler grouped by locality. |
656 | | // @param eligible_hosts all eligible hosts for this scheduler. |
657 | | // @param all_hosts_per_locality all hosts for this HostSet grouped by locality. |
658 | | // @param locality_weights the weighting of each locality. |
659 | | // @param overprovisioning_factor the overprovisioning factor to use when computing the effective |
660 | | // weight of a locality. |
661 | | // @param seed a random number of initial picks to "invoke" on the locality scheduler. This |
662 | | // allows the load to be distributed between different localities across worker threads and |
663 | | // a fleet of Envoys. |
664 | | static void |
665 | | rebuildLocalityScheduler(std::unique_ptr<EdfScheduler<LocalityEntry>>& locality_scheduler, |
666 | | std::vector<std::shared_ptr<LocalityEntry>>& locality_entries, |
667 | | const HostsPerLocality& eligible_hosts_per_locality, |
668 | | const HostVector& eligible_hosts, |
669 | | HostsPerLocalityConstSharedPtr all_hosts_per_locality, |
670 | | HostsPerLocalityConstSharedPtr excluded_hosts_per_locality, |
671 | | LocalityWeightsConstSharedPtr locality_weights, |
672 | | uint32_t overprovisioning_factor, uint64_t seed); |
673 | | |
674 | | static absl::optional<uint32_t> chooseLocality(EdfScheduler<LocalityEntry>* locality_scheduler); |
675 | | |
676 | | std::vector<std::shared_ptr<LocalityEntry>> healthy_locality_entries_; |
677 | | std::unique_ptr<EdfScheduler<LocalityEntry>> healthy_locality_scheduler_; |
678 | | std::vector<std::shared_ptr<LocalityEntry>> degraded_locality_entries_; |
679 | | std::unique_ptr<EdfScheduler<LocalityEntry>> degraded_locality_scheduler_; |
680 | | }; |
681 | | |
682 | | using HostSetImplPtr = std::unique_ptr<HostSetImpl>; |
683 | | |
684 | | /** |
685 | | * A class for management of the set of hosts in a given cluster. |
686 | | */ |
687 | | class PrioritySetImpl : public PrioritySet { |
688 | | public: |
689 | 7.31k | PrioritySetImpl() : batch_update_(false) {} |
690 | | // From PrioritySet |
691 | | ABSL_MUST_USE_RESULT Common::CallbackHandlePtr |
692 | 7.22k | addMemberUpdateCb(MemberUpdateCb callback) const override { |
693 | 7.22k | return member_update_cb_helper_.add(callback); |
694 | 7.22k | } |
695 | | ABSL_MUST_USE_RESULT Common::CallbackHandlePtr |
696 | 19.3k | addPriorityUpdateCb(PriorityUpdateCb callback) const override { |
697 | 19.3k | return priority_update_cb_helper_.add(callback); |
698 | 19.3k | } |
699 | 139k | const std::vector<std::unique_ptr<HostSet>>& hostSetsPerPriority() const override { |
700 | 139k | return host_sets_; |
701 | 139k | } |
702 | | // Get the host set for this priority level, creating it if necessary. |
703 | | const HostSet& |
704 | | getOrCreateHostSet(uint32_t priority, |
705 | | absl::optional<bool> weighted_priority_health = absl::nullopt, |
706 | | absl::optional<uint32_t> overprovisioning_factor = absl::nullopt); |
707 | | |
708 | | void updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params, |
709 | | LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added, |
710 | | const HostVector& hosts_removed, uint64_t seed, |
711 | | absl::optional<bool> weighted_priority_health = absl::nullopt, |
712 | | absl::optional<uint32_t> overprovisioning_factor = absl::nullopt, |
713 | | HostMapConstSharedPtr cross_priority_host_map = nullptr) override; |
714 | | |
715 | | void batchHostUpdate(BatchUpdateCb& callback) override; |
716 | | |
717 | 1.81k | HostMapConstSharedPtr crossPriorityHostMap() const override { |
718 | 1.81k | return const_cross_priority_host_map_; |
719 | 1.81k | } |
720 | | |
721 | | protected: |
722 | | // Allows subclasses of PrioritySetImpl to create their own type of HostSetImpl. |
723 | | virtual HostSetImplPtr createHostSet(uint32_t priority, |
724 | | absl::optional<bool> weighted_priority_health, |
725 | 7.31k | absl::optional<uint32_t> overprovisioning_factor) { |
726 | 7.31k | return std::make_unique<HostSetImpl>(priority, weighted_priority_health, |
727 | 7.31k | overprovisioning_factor); |
728 | 7.31k | } |
729 | | |
730 | 8.76k | virtual void runUpdateCallbacks(const HostVector& hosts_added, const HostVector& hosts_removed) { |
731 | 8.76k | THROW_IF_NOT_OK(member_update_cb_helper_.runCallbacks(hosts_added, hosts_removed)); |
732 | 8.76k | } |
733 | | virtual void runReferenceUpdateCallbacks(uint32_t priority, const HostVector& hosts_added, |
734 | 8.76k | const HostVector& hosts_removed) { |
735 | 8.76k | THROW_IF_NOT_OK(priority_update_cb_helper_.runCallbacks(priority, hosts_added, hosts_removed)); |
736 | 8.76k | } |
737 | | // This vector will generally have at least one member, for priority level 0. |
738 | | // It will expand as host sets are added but currently does not shrink to |
739 | | // avoid any potential lifetime issues. |
740 | | std::vector<std::unique_ptr<HostSet>> host_sets_; |
741 | | |
742 | | // Read only all host map for fast host searching. This will never be null. |
743 | | mutable HostMapConstSharedPtr const_cross_priority_host_map_{std::make_shared<HostMap>()}; |
744 | | |
745 | | private: |
746 | | // This is a matching vector to store the callback handles for host_sets_. It is kept separately |
747 | | // because host_sets_ is directly returned so we avoid translation. |
748 | | std::vector<Common::CallbackHandlePtr> host_sets_priority_update_cbs_; |
749 | | // TODO(mattklein123): Remove mutable. |
750 | | mutable Common::CallbackManager<const HostVector&, const HostVector&> member_update_cb_helper_; |
751 | | mutable Common::CallbackManager<uint32_t, const HostVector&, const HostVector&> |
752 | | priority_update_cb_helper_; |
753 | | bool batch_update_ : 1; |
754 | | |
755 | | // Helper class to maintain state as we perform multiple host updates. Keeps track of all hosts |
756 | | // that have been added/removed throughout the batch update, and ensures that we properly manage |
757 | | // the batch_update_ flag. |
758 | | class BatchUpdateScope : public HostUpdateCb { |
759 | | public: |
760 | 14 | explicit BatchUpdateScope(PrioritySetImpl& parent) : parent_(parent) { |
761 | 14 | ASSERT(!parent_.batch_update_); |
762 | 14 | parent_.batch_update_ = true; |
763 | 14 | } |
764 | 14 | ~BatchUpdateScope() override { parent_.batch_update_ = false; } |
765 | | |
766 | | void updateHosts(uint32_t priority, PrioritySet::UpdateHostsParams&& update_hosts_params, |
767 | | LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added, |
768 | | const HostVector& hosts_removed, uint64_t seed, |
769 | | absl::optional<bool> weighted_priority_health, |
770 | | absl::optional<uint32_t> overprovisioning_factor) override; |
771 | | |
772 | | absl::node_hash_set<HostSharedPtr> all_hosts_added_; |
773 | | absl::node_hash_set<HostSharedPtr> all_hosts_removed_; |
774 | | |
775 | | private: |
776 | | PrioritySetImpl& parent_; |
777 | | absl::node_hash_set<uint32_t> priorities_; |
778 | | }; |
779 | | }; |
780 | | |
781 | | /** |
782 | | * Specialized PrioritySetImpl designed for the main thread. It will update and maintain the read |
783 | | * only cross priority host map when the host set changes. |
784 | | */ |
785 | | class MainPrioritySetImpl : public PrioritySetImpl, public Logger::Loggable<Logger::Id::upstream> { |
786 | | public: |
787 | | // PrioritySet |
788 | | void updateHosts(uint32_t priority, UpdateHostsParams&& update_hosts_params, |
789 | | LocalityWeightsConstSharedPtr locality_weights, const HostVector& hosts_added, |
790 | | const HostVector& hosts_removed, uint64_t seed, |
791 | | absl::optional<bool> weighted_priority_health = absl::nullopt, |
792 | | absl::optional<uint32_t> overprovisioning_factor = absl::nullopt, |
793 | | HostMapConstSharedPtr cross_priority_host_map = nullptr) override; |
794 | | HostMapConstSharedPtr crossPriorityHostMap() const override; |
795 | | |
796 | | protected: |
797 | | void updateCrossPriorityHostMap(uint32_t priority, const HostVector& hosts_added, |
798 | | const HostVector& hosts_removed); |
799 | | |
800 | | mutable HostMapSharedPtr mutable_cross_priority_host_map_; |
801 | | }; |
802 | | |
803 | | /** |
804 | | * Implementation of ClusterInfo that reads from JSON. |
805 | | */ |
806 | | class ClusterInfoImpl : public ClusterInfo, |
807 | | public Event::DispatcherThreadDeletable, |
808 | | protected Logger::Loggable<Logger::Id::upstream> { |
809 | | public: |
810 | | using HttpProtocolOptionsConfigImpl = |
811 | | Envoy::Extensions::Upstreams::Http::ProtocolOptionsConfigImpl; |
812 | | using TcpProtocolOptionsConfigImpl = Envoy::Extensions::Upstreams::Tcp::ProtocolOptionsConfigImpl; |
813 | | ClusterInfoImpl(Init::Manager& info, Server::Configuration::ServerFactoryContext& server_context, |
814 | | const envoy::config::cluster::v3::Cluster& config, |
815 | | const absl::optional<envoy::config::core::v3::BindConfig>& bind_config, |
816 | | Runtime::Loader& runtime, TransportSocketMatcherPtr&& socket_matcher, |
817 | | Stats::ScopeSharedPtr&& stats_scope, bool added_via_api, |
818 | | Server::Configuration::TransportSocketFactoryContext&); |
819 | | |
820 | | static DeferredCreationCompatibleClusterTrafficStats |
821 | | generateStats(Stats::ScopeSharedPtr scope, const ClusterTrafficStatNames& cluster_stat_names, |
822 | | bool defer_creation); |
823 | | static ClusterLoadReportStats |
824 | | generateLoadReportStats(Stats::Scope& scope, const ClusterLoadReportStatNames& stat_names); |
825 | | static ClusterCircuitBreakersStats |
826 | | generateCircuitBreakersStats(Stats::Scope& scope, Stats::StatName prefix, bool track_remaining, |
827 | | const ClusterCircuitBreakersStatNames& stat_names); |
828 | | static ClusterRequestResponseSizeStats |
829 | | generateRequestResponseSizeStats(Stats::Scope&, |
830 | | const ClusterRequestResponseSizeStatNames& stat_names); |
831 | | static ClusterTimeoutBudgetStats |
832 | | generateTimeoutBudgetStats(Stats::Scope&, const ClusterTimeoutBudgetStatNames& stat_names); |
833 | | |
834 | | // Upstream::ClusterInfo |
835 | 4.79k | bool addedViaApi() const override { return added_via_api_; } |
836 | 2.40k | OptRef<const LoadBalancerConfig> loadBalancerConfig() const override { |
837 | 2.40k | return makeOptRefFromPtr<const LoadBalancerConfig>(load_balancer_config_.get()); |
838 | 2.40k | } |
839 | 2.40k | TypedLoadBalancerFactory& loadBalancerFactory() const override { |
840 | 2.40k | ASSERT(load_balancer_factory_ != nullptr, "null load balancer factory"); |
841 | 2.40k | return *load_balancer_factory_; |
842 | 2.40k | } |
843 | 9.64k | const envoy::config::cluster::v3::Cluster::CommonLbConfig& lbConfig() const override { |
844 | 9.64k | return *common_lb_config_; |
845 | 9.64k | } |
846 | 911 | std::chrono::milliseconds connectTimeout() const override { return connect_timeout_; } |
847 | | |
848 | | // `OptionalTimeouts` manages various `optional` values. We pack them in a separate data |
849 | | // structure for memory efficiency -- avoiding overhead of `absl::optional` per variable, and |
850 | | // avoiding overhead of storing unset timeouts. |
851 | | enum class OptionalTimeoutNames { IdleTimeout = 0, TcpPoolIdleTimeout, MaxConnectionDuration }; |
852 | | using OptionalTimeouts = PackedStruct<std::chrono::milliseconds, 3, OptionalTimeoutNames>; |
853 | | |
854 | 911 | const absl::optional<std::chrono::milliseconds> idleTimeout() const override { |
855 | 911 | auto timeout = optional_timeouts_.get<OptionalTimeoutNames::IdleTimeout>(); |
856 | 911 | if (timeout.has_value()) { |
857 | 911 | return *timeout; |
858 | 911 | } |
859 | 0 | return absl::nullopt; |
860 | 911 | } |
861 | 0 | const absl::optional<std::chrono::milliseconds> tcpPoolIdleTimeout() const override { |
862 | 0 | auto timeout = optional_timeouts_.get<OptionalTimeoutNames::TcpPoolIdleTimeout>(); |
863 | 0 | if (timeout.has_value()) { |
864 | 0 | return *timeout; |
865 | 0 | } |
866 | 0 | return absl::nullopt; |
867 | 0 | } |
868 | 898 | const absl::optional<std::chrono::milliseconds> maxConnectionDuration() const override { |
869 | 898 | auto timeout = optional_timeouts_.get<OptionalTimeoutNames::MaxConnectionDuration>(); |
870 | 898 | if (timeout.has_value()) { |
871 | 0 | return *timeout; |
872 | 0 | } |
873 | 898 | return absl::nullopt; |
874 | 898 | } |
875 | | |
876 | 2.73k | float perUpstreamPreconnectRatio() const override { return per_upstream_preconnect_ratio_; } |
877 | 1.81k | float peekaheadRatio() const override { return peekahead_ratio_; } |
878 | 911 | uint32_t perConnectionBufferLimitBytes() const override { |
879 | 911 | return per_connection_buffer_limit_bytes_; |
880 | 911 | } |
881 | 6.62k | uint64_t features() const override { return features_; } |
882 | 30 | const Http::Http1Settings& http1Settings() const override { |
883 | 30 | return http_protocol_options_->http1_settings_; |
884 | 30 | } |
885 | 2.64k | const envoy::config::core::v3::Http2ProtocolOptions& http2Options() const override { |
886 | 2.64k | return http_protocol_options_->http2_options_; |
887 | 2.64k | } |
888 | 0 | const envoy::config::core::v3::Http3ProtocolOptions& http3Options() const override { |
889 | 0 | return http_protocol_options_->http3_options_; |
890 | 0 | } |
891 | 1.76k | const envoy::config::core::v3::HttpProtocolOptions& commonHttpProtocolOptions() const override { |
892 | 1.76k | return http_protocol_options_->common_http_protocol_options_; |
893 | 1.76k | } |
894 | | absl::Status configureLbPolicies(const envoy::config::cluster::v3::Cluster& config, |
895 | | Server::Configuration::ServerFactoryContext& context); |
896 | | ProtocolOptionsConfigConstSharedPtr |
897 | | extensionProtocolOptions(const std::string& name) const override; |
898 | 0 | envoy::config::cluster::v3::Cluster::DiscoveryType type() const override { return type_; } |
899 | | |
900 | | OptRef<const envoy::config::cluster::v3::Cluster::CustomClusterType> |
901 | 0 | clusterType() const override { |
902 | 0 | if (cluster_type_ == nullptr) { |
903 | 0 | return absl::nullopt; |
904 | 0 | } |
905 | 0 | return *cluster_type_; |
906 | 0 | } |
907 | 1.81k | OptRef<const envoy::config::core::v3::TypedExtensionConfig> upstreamConfig() const override { |
908 | 1.81k | if (upstream_config_ == nullptr) { |
909 | 1.81k | return absl::nullopt; |
910 | 1.81k | } |
911 | 0 | return *upstream_config_; |
912 | 1.81k | } |
913 | | bool maintenanceMode() const override; |
914 | 1.79k | uint64_t maxRequestsPerConnection() const override { return max_requests_per_connection_; } |
915 | 911 | uint32_t maxResponseHeadersCount() const override { return max_response_headers_count_; } |
916 | 54.6k | const std::string& name() const override { return name_; } |
917 | 0 | const std::string& observabilityName() const override { |
918 | 0 | if (observability_name_ != nullptr) { |
919 | 0 | return *observability_name_; |
920 | 0 | } |
921 | 0 | return name_; |
922 | 0 | } |
923 | | ResourceManager& resourceManager(ResourcePriority priority) const override; |
924 | 2.40k | TransportSocketMatcher& transportSocketMatcher() const override { return *socket_matcher_; } |
925 | 15.9k | DeferredCreationCompatibleClusterTrafficStats& trafficStats() const override { |
926 | 15.9k | return traffic_stats_; |
927 | 15.9k | } |
928 | 7.22k | ClusterConfigUpdateStats& configUpdateStats() const override { return config_update_stats_; } |
929 | 4.81k | ClusterLbStats& lbStats() const override { return lb_stats_; } |
930 | 12.0k | ClusterEndpointStats& endpointStats() const override { return endpoint_stats_; } |
931 | 4.96k | Stats::Scope& statsScope() const override { return *stats_scope_; } |
932 | | |
933 | 1.81k | ClusterRequestResponseSizeStatsOptRef requestResponseSizeStats() const override { |
934 | 1.81k | if (optional_cluster_stats_ == nullptr || |
935 | 1.81k | optional_cluster_stats_->request_response_size_stats_ == nullptr) { |
936 | 1.81k | return absl::nullopt; |
937 | 1.81k | } |
938 | | |
939 | 0 | return std::ref(*(optional_cluster_stats_->request_response_size_stats_)); |
940 | 1.81k | } |
941 | | |
942 | 0 | ClusterLoadReportStats& loadReportStats() const override { return load_report_stats_; } |
943 | | |
944 | 2.71k | ClusterTimeoutBudgetStatsOptRef timeoutBudgetStats() const override { |
945 | 2.71k | if (optional_cluster_stats_ == nullptr || |
946 | 2.71k | optional_cluster_stats_->timeout_budget_stats_ == nullptr) { |
947 | 2.71k | return absl::nullopt; |
948 | 2.71k | } |
949 | | |
950 | 0 | return std::ref(*(optional_cluster_stats_->timeout_budget_stats_)); |
951 | 2.71k | } |
952 | | |
953 | 4.81k | bool perEndpointStatsEnabled() const override { return per_endpoint_stats_; } |
954 | | |
955 | 911 | UpstreamLocalAddressSelectorConstSharedPtr getUpstreamLocalAddressSelector() const override { |
956 | 911 | return upstream_local_address_selector_; |
957 | 911 | } |
958 | | using DefaultMetadata = ConstSingleton<envoy::config::core::v3::Metadata>; |
959 | 0 | const envoy::config::core::v3::Metadata& metadata() const override { |
960 | 0 | if (metadata_ != nullptr) { |
961 | 0 | return *metadata_; |
962 | 0 | } |
963 | 0 | return DefaultMetadata::get(); |
964 | 0 | } |
965 | | using ClusterTypedMetadata = Envoy::Config::TypedMetadataImpl<ClusterTypedMetadataFactory>; |
966 | 0 | const Envoy::Config::TypedMetadata& typedMetadata() const override { |
967 | 0 | if (typed_metadata_ != nullptr) { |
968 | 0 | return *typed_metadata_; |
969 | 0 | } |
970 | 0 | CONSTRUCT_ON_FIRST_USE(ClusterTypedMetadata, DefaultMetadata::get()); |
971 | 0 | } |
972 | | |
973 | 0 | bool drainConnectionsOnHostRemoval() const override { return drain_connections_on_host_removal_; } |
974 | 1.81k | bool connectionPoolPerDownstreamConnection() const override { |
975 | 1.81k | return connection_pool_per_downstream_connection_; |
976 | 1.81k | } |
977 | 0 | bool warmHosts() const override { return warm_hosts_; } |
978 | 911 | bool setLocalInterfaceNameOnUpstreamConnections() const override { |
979 | 911 | return set_local_interface_name_on_upstream_connections_; |
980 | 911 | } |
981 | | const absl::optional<envoy::config::core::v3::UpstreamHttpProtocolOptions>& |
982 | 1.81k | upstreamHttpProtocolOptions() const override { |
983 | 1.81k | return http_protocol_options_->upstream_http_protocol_options_; |
984 | 1.81k | } |
985 | | |
986 | | const absl::optional<const envoy::config::core::v3::AlternateProtocolsCacheOptions>& |
987 | 1.81k | alternateProtocolsCacheOptions() const override { |
988 | 1.81k | return http_protocol_options_->alternate_protocol_cache_options_; |
989 | 1.81k | } |
990 | | |
991 | 28 | const std::string& edsServiceName() const override { |
992 | 28 | return eds_service_name_ != nullptr ? *eds_service_name_ : EMPTY_STRING; |
993 | 28 | } |
994 | | |
995 | | void createNetworkFilterChain(Network::Connection&) const override; |
996 | | std::vector<Http::Protocol> |
997 | | upstreamHttpProtocol(absl::optional<Http::Protocol> downstream_protocol) const override; |
998 | | |
999 | | // Http::FilterChainFactory |
1000 | | bool createFilterChain(Http::FilterChainManager& manager, bool only_create_if_configured, |
1001 | 3.62k | const Http::FilterChainOptions&) const override { |
1002 | 3.62k | if (!has_configured_http_filters_ && only_create_if_configured) { |
1003 | 1.81k | return false; |
1004 | 1.81k | } |
1005 | 1.81k | Http::FilterChainUtility::createFilterChainForFactories( |
1006 | 1.81k | manager, Http::EmptyFilterChainOptions{}, http_filter_factories_); |
1007 | 1.81k | return true; |
1008 | 3.62k | } |
1009 | | bool createUpgradeFilterChain(absl::string_view, const UpgradeMap*, Http::FilterChainManager&, |
1010 | 0 | const Http::FilterChainOptions&) const override { |
1011 | | // Upgrade filter chains not yet supported for upstream HTTP filters. |
1012 | 0 | return false; |
1013 | 0 | } |
1014 | | |
1015 | | Http::Http1::CodecStats& http1CodecStats() const override; |
1016 | | Http::Http2::CodecStats& http2CodecStats() const override; |
1017 | | Http::Http3::CodecStats& http3CodecStats() const override; |
1018 | | Http::ClientHeaderValidatorPtr makeHeaderValidator(Http::Protocol protocol) const override; |
1019 | | |
1020 | | OptRef<const envoy::config::cluster::v3::UpstreamConnectionOptions::HappyEyeballsConfig> |
1021 | 0 | happyEyeballsConfig() const override { |
1022 | 0 | if (happy_eyeballs_config_ == nullptr) { |
1023 | 0 | return absl::nullopt; |
1024 | 0 | } |
1025 | 0 | return *happy_eyeballs_config_; |
1026 | 0 | } |
1027 | | |
1028 | 1.63k | OptRef<const Envoy::Orca::LrsReportMetricNames> lrsReportMetricNames() const override { |
1029 | 1.63k | if (lrs_report_metric_names_ == nullptr) { |
1030 | 1.63k | return absl::nullopt; |
1031 | 1.63k | } |
1032 | 0 | return *lrs_report_metric_names_; |
1033 | 1.63k | } |
1034 | | |
1035 | | protected: |
1036 | | // Gets the retry budget percent/concurrency from the circuit breaker thresholds. If the retry |
1037 | | // budget message is specified, defaults will be filled in if either params are unspecified. |
1038 | | static std::pair<absl::optional<double>, absl::optional<uint32_t>> |
1039 | | getRetryBudgetParams(const envoy::config::cluster::v3::CircuitBreakers::Thresholds& thresholds); |
1040 | | |
1041 | | private: |
1042 | | std::shared_ptr<UpstreamNetworkFilterConfigProviderManager> |
1043 | | createSingletonUpstreamNetworkFilterConfigProviderManager( |
1044 | | Server::Configuration::ServerFactoryContext& context); |
1045 | | |
1046 | | struct ResourceManagers { |
1047 | | ResourceManagers(const envoy::config::cluster::v3::Cluster& config, Runtime::Loader& runtime, |
1048 | | const std::string& cluster_name, Stats::Scope& stats_scope, |
1049 | | const ClusterCircuitBreakersStatNames& circuit_breakers_stat_names); |
1050 | | absl::StatusOr<ResourceManagerImplPtr> |
1051 | | load(const envoy::config::cluster::v3::Cluster& config, Runtime::Loader& runtime, |
1052 | | const std::string& cluster_name, Stats::Scope& stats_scope, |
1053 | | const envoy::config::core::v3::RoutingPriority& priority); |
1054 | | |
1055 | | using Managers = std::array<ResourceManagerImplPtr, NumResourcePriorities>; |
1056 | | |
1057 | | Managers managers_; |
1058 | | const ClusterCircuitBreakersStatNames& circuit_breakers_stat_names_; |
1059 | | }; |
1060 | | |
1061 | | struct OptionalClusterStats { |
1062 | | OptionalClusterStats(const envoy::config::cluster::v3::Cluster& config, |
1063 | | Stats::Scope& stats_scope, const ClusterManager& manager); |
1064 | | const ClusterTimeoutBudgetStatsPtr timeout_budget_stats_; |
1065 | | const ClusterRequestResponseSizeStatsPtr request_response_size_stats_; |
1066 | | }; |
1067 | | |
1068 | | #ifdef ENVOY_ENABLE_UHV |
1069 | | ::Envoy::Http::HeaderValidatorStats& getHeaderValidatorStats(Http::Protocol protocol) const; |
1070 | | #endif |
1071 | | |
1072 | | Runtime::Loader& runtime_; |
1073 | | const std::string name_; |
1074 | | std::unique_ptr<const std::string> observability_name_; |
1075 | | std::unique_ptr<const std::string> eds_service_name_; |
1076 | | const absl::flat_hash_map<std::string, ProtocolOptionsConfigConstSharedPtr> |
1077 | | extension_protocol_options_; |
1078 | | const std::shared_ptr<const HttpProtocolOptionsConfigImpl> http_protocol_options_; |
1079 | | const std::shared_ptr<const TcpProtocolOptionsConfigImpl> tcp_protocol_options_; |
1080 | | const uint64_t max_requests_per_connection_; |
1081 | | const std::chrono::milliseconds connect_timeout_; |
1082 | | OptionalTimeouts optional_timeouts_; |
1083 | | const float per_upstream_preconnect_ratio_; |
1084 | | const float peekahead_ratio_; |
1085 | | TransportSocketMatcherPtr socket_matcher_; |
1086 | | Stats::ScopeSharedPtr stats_scope_; |
1087 | | mutable DeferredCreationCompatibleClusterTrafficStats traffic_stats_; |
1088 | | mutable ClusterConfigUpdateStats config_update_stats_; |
1089 | | mutable ClusterLbStats lb_stats_; |
1090 | | mutable ClusterEndpointStats endpoint_stats_; |
1091 | | Stats::IsolatedStoreImpl load_report_stats_store_; |
1092 | | mutable ClusterLoadReportStats load_report_stats_; |
1093 | | const std::unique_ptr<OptionalClusterStats> optional_cluster_stats_; |
1094 | | const uint64_t features_; |
1095 | | mutable ResourceManagers resource_managers_; |
1096 | | const std::string maintenance_mode_runtime_key_; |
1097 | | UpstreamLocalAddressSelectorConstSharedPtr upstream_local_address_selector_; |
1098 | | std::unique_ptr<envoy::config::core::v3::TypedExtensionConfig> upstream_config_; |
1099 | | std::unique_ptr<const envoy::config::core::v3::Metadata> metadata_; |
1100 | | std::unique_ptr<ClusterTypedMetadata> typed_metadata_; |
1101 | | LoadBalancerConfigPtr load_balancer_config_; |
1102 | | TypedLoadBalancerFactory* load_balancer_factory_ = nullptr; |
1103 | | const std::shared_ptr<const envoy::config::cluster::v3::Cluster::CommonLbConfig> |
1104 | | common_lb_config_; |
1105 | | std::unique_ptr<const envoy::config::cluster::v3::Cluster::CustomClusterType> cluster_type_; |
1106 | | // TODO(ohadvano): http_filter_config_provider_manager_ and |
1107 | | // network_filter_config_provider_manager_ should be maintained in the ClusterManager object as |
1108 | | // a singleton. This is currently not possible due to circular dependency (filter config |
1109 | | // provider manager depends on the ClusterManager object). The circular dependency can be |
1110 | | // resolved when the following issue is resolved: |
1111 | | // https://github.com/envoyproxy/envoy/issues/26653. |
1112 | | std::shared_ptr<Http::UpstreamFilterConfigProviderManager> http_filter_config_provider_manager_; |
1113 | | std::shared_ptr<UpstreamNetworkFilterConfigProviderManager> |
1114 | | network_filter_config_provider_manager_; |
1115 | | Filter::NetworkFilterFactoriesList filter_factories_; |
1116 | | Http::FilterChainUtility::FilterFactoriesList http_filter_factories_; |
1117 | | mutable Http::Http1::CodecStats::AtomicPtr http1_codec_stats_; |
1118 | | mutable Http::Http2::CodecStats::AtomicPtr http2_codec_stats_; |
1119 | | mutable Http::Http3::CodecStats::AtomicPtr http3_codec_stats_; |
1120 | | UpstreamFactoryContextImpl upstream_context_; |
1121 | | std::unique_ptr<envoy::config::cluster::v3::UpstreamConnectionOptions::HappyEyeballsConfig> |
1122 | | happy_eyeballs_config_; |
1123 | | const std::unique_ptr<Envoy::Orca::LrsReportMetricNames> lrs_report_metric_names_; |
1124 | | |
1125 | | // Keep small values like bools and enums at the end of the class to reduce |
1126 | | // overhead via alignment |
1127 | | const uint32_t per_connection_buffer_limit_bytes_; |
1128 | | const uint32_t max_response_headers_count_; |
1129 | | const envoy::config::cluster::v3::Cluster::DiscoveryType type_; |
1130 | | const bool drain_connections_on_host_removal_ : 1; |
1131 | | const bool connection_pool_per_downstream_connection_ : 1; |
1132 | | const bool warm_hosts_ : 1; |
1133 | | const bool set_local_interface_name_on_upstream_connections_ : 1; |
1134 | | const bool added_via_api_ : 1; |
1135 | | // true iff the cluster proto specified upstream http filters. |
1136 | | bool has_configured_http_filters_ : 1; |
1137 | | const bool per_endpoint_stats_ : 1; |
1138 | | }; |
1139 | | |
1140 | | /** |
1141 | | * Function that creates a Network::UpstreamTransportSocketFactoryPtr |
1142 | | * given a cluster configuration and transport socket factory |
1143 | | * context. |
1144 | | */ |
1145 | | absl::StatusOr<Network::UpstreamTransportSocketFactoryPtr> |
1146 | | createTransportSocketFactory(const envoy::config::cluster::v3::Cluster& config, |
1147 | | Server::Configuration::TransportSocketFactoryContext& factory_context); |
1148 | | |
1149 | | /** |
1150 | | * Base class for all primary clusters. |
1151 | | */ |
1152 | | class ClusterImplBase : public Cluster, protected Logger::Loggable<Logger::Id::upstream> { |
1153 | | |
1154 | | public: |
1155 | | // Upstream::Cluster |
1156 | 19.2k | PrioritySet& prioritySet() override { return priority_set_; } |
1157 | 0 | const PrioritySet& prioritySet() const override { return priority_set_; } |
1158 | | |
1159 | | /** |
1160 | | * Optionally set the health checker for the primary cluster. This is done after cluster |
1161 | | * creation since the health checker assumes that the cluster has already been fully initialized |
1162 | | * so there is a cyclic dependency. However we want the cluster to own the health checker. |
1163 | | */ |
1164 | | void setHealthChecker(const HealthCheckerSharedPtr& health_checker); |
1165 | | |
1166 | | /** |
1167 | | * Optionally set the outlier detector for the primary cluster. Done for the same reason as |
1168 | | * documented in setHealthChecker(). |
1169 | | */ |
1170 | | void setOutlierDetector(const Outlier::DetectorSharedPtr& outlier_detector); |
1171 | | |
1172 | | /** |
1173 | | * Wrapper around Network::Address::resolveProtoAddress() that provides improved error message |
1174 | | * based on the cluster's type. |
1175 | | * @param address supplies the address proto to resolve. |
1176 | | * @return Network::Address::InstanceConstSharedPtr the resolved address. |
1177 | | */ |
1178 | | absl::StatusOr<const Network::Address::InstanceConstSharedPtr> |
1179 | | resolveProtoAddress(const envoy::config::core::v3::Address& address); |
1180 | | |
1181 | | // Partitions the provided list of hosts into three new lists containing the healthy, degraded |
1182 | | // and excluded hosts respectively. |
1183 | | static std::tuple<HealthyHostVectorConstSharedPtr, DegradedHostVectorConstSharedPtr, |
1184 | | ExcludedHostVectorConstSharedPtr> |
1185 | | partitionHostList(const HostVector& hosts); |
1186 | | |
1187 | | // Partitions the provided list of hosts per locality into three new lists containing the |
1188 | | // healthy, degraded and excluded hosts respectively. |
1189 | | static std::tuple<HostsPerLocalityConstSharedPtr, HostsPerLocalityConstSharedPtr, |
1190 | | HostsPerLocalityConstSharedPtr> |
1191 | | partitionHostsPerLocality(const HostsPerLocality& hosts); |
1192 | 0 | Config::ConstMetadataSharedPoolSharedPtr constMetadataSharedPool() { |
1193 | 0 | return const_metadata_shared_pool_; |
1194 | 0 | } |
1195 | | |
1196 | | // Upstream::Cluster |
1197 | 2.40k | HealthChecker* healthChecker() override { return health_checker_.get(); } |
1198 | 36.1k | ClusterInfoConstSharedPtr info() const override { return info_; } |
1199 | 2.40k | Outlier::Detector* outlierDetector() override { return outlier_detector_.get(); } |
1200 | 0 | const Outlier::Detector* outlierDetector() const override { return outlier_detector_.get(); } |
1201 | | void initialize(std::function<void()> callback) override; |
1202 | 2.40k | UnitFloat dropOverload() const override { return drop_overload_; } |
1203 | 2.40k | const std::string& dropCategory() const override { return drop_category_; } |
1204 | 0 | void setDropOverload(UnitFloat drop_overload) override { drop_overload_ = drop_overload; } |
1205 | 0 | void setDropCategory(absl::string_view drop_category) override { drop_category_ = drop_category; } |
1206 | | |
1207 | | protected: |
1208 | | ClusterImplBase(const envoy::config::cluster::v3::Cluster& cluster, |
1209 | | ClusterFactoryContext& cluster_context, absl::Status& creation_status); |
1210 | | |
1211 | | /** |
1212 | | * Overridden by every concrete cluster. The cluster should do whatever pre-init is needed. |
1213 | | * E.g., query DNS, contact EDS, etc. |
1214 | | */ |
1215 | | virtual void startPreInit() PURE; |
1216 | | |
1217 | | /** |
1218 | | * Called by every concrete cluster when pre-init is complete. At this point, |
1219 | | * shared init starts init_manager_ initialization and determines if there |
1220 | | * is an initial health check pass needed, etc. |
1221 | | */ |
1222 | | void onPreInitComplete(); |
1223 | | |
1224 | | /** |
1225 | | * Called by every concrete cluster after all targets registered at init manager are |
1226 | | * initialized. At this point, shared init takes over and determines if there is an initial |
1227 | | * health check pass needed, etc. |
1228 | | */ |
1229 | | void onInitDone(); |
1230 | | |
1231 | | virtual void reloadHealthyHostsHelper(const HostSharedPtr& host); |
1232 | | |
1233 | | absl::Status parseDropOverloadConfig( |
1234 | | const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment); |
1235 | | |
1236 | | // This init manager is shared via TransportSocketFactoryContext. The initialization targets |
1237 | | // that register with this init manager are expected to be for implementations of SdsApi (see |
1238 | | // SdsApi::init_target_). |
1239 | | Init::ManagerImpl init_manager_; |
1240 | | |
1241 | | // Once all targets are initialized (i.e. once all dynamic secrets are loaded), this watcher |
1242 | | // calls onInitDone() above. |
1243 | | Init::WatcherImpl init_watcher_; |
1244 | | |
1245 | | Runtime::Loader& runtime_; |
1246 | | ClusterInfoConstSharedPtr info_; // This cluster info stores the stats scope so it must be |
1247 | | // initialized first and destroyed last. |
1248 | | HealthCheckerSharedPtr health_checker_; |
1249 | | Outlier::DetectorSharedPtr outlier_detector_; |
1250 | | const bool wait_for_warm_on_init_; |
1251 | | |
1252 | | Server::Configuration::TransportSocketFactoryContextImplPtr transport_factory_context_{}; |
1253 | | |
1254 | | protected: |
1255 | | Random::RandomGenerator& random_; |
1256 | | TimeSource& time_source_; |
1257 | | MainPrioritySetImpl priority_set_; |
1258 | | |
1259 | | absl::Status validateEndpointsForZoneAwareRouting( |
1260 | | const envoy::config::endpoint::v3::LocalityLbEndpoints& endpoints) const; |
1261 | | |
1262 | | private: |
1263 | | static const absl::string_view DoNotValidateAlpnRuntimeKey; |
1264 | | static const absl::string_view DropOverloadRuntimeKey; |
1265 | | |
1266 | | void finishInitialization(); |
1267 | | void reloadHealthyHosts(const HostSharedPtr& host); |
1268 | | |
1269 | | bool initialization_started_{}; |
1270 | | std::function<void()> initialization_complete_callback_; |
1271 | | uint64_t pending_initialize_health_checks_{}; |
1272 | | const bool local_cluster_; |
1273 | | Config::ConstMetadataSharedPoolSharedPtr const_metadata_shared_pool_; |
1274 | | Common::CallbackHandlePtr priority_update_cb_; |
1275 | | UnitFloat drop_overload_{0}; |
1276 | | std::string drop_category_; |
1277 | | static constexpr int kDropOverloadSize = 1; |
1278 | | }; |
1279 | | |
1280 | | using ClusterImplBaseSharedPtr = std::shared_ptr<ClusterImplBase>; |
1281 | | |
1282 | | /**
1283 | | * Manages the PriorityState of a cluster. PriorityState is a per-priority binding of a set of
1284 | | * hosts with its corresponding locality weight map. This is useful to store
1285 | | * priorities/hosts/localities before updating the cluster priority set.
1286 | | */
1287 | | class PriorityStateManager : protected Logger::Loggable<Logger::Id::upstream> {
1288 | | public:
1289 | | PriorityStateManager(ClusterImplBase& cluster, const LocalInfo::LocalInfo& local_info,
1290 | | PrioritySet::HostUpdateCb* update_cb, Random::RandomGenerator& random);
1291 | | 
1292 | | // Initializes the PriorityState vector based on the priority specified in locality_lb_endpoint.
1293 | | void initializePriorityFor(
1294 | | const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint);
1295 | | 
1296 | | // Registers a host based on its address to the PriorityState based on the specified priority
1297 | | // (the priority is specified by locality_lb_endpoint.priority()).
1298 | | //
1299 | | // The registered host's health flag is set when the lb_endpoint health status is unhealthy,
1300 | | // draining or timeout. NOTE(review): no health_checker_flag is passed to this overload.
1301 | | void registerHostForPriority(
1302 | | const std::string& hostname, Network::Address::InstanceConstSharedPtr address,
1303 | | const HostDescription::AddressVector& address_list,
1304 | | const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint,
1305 | | const envoy::config::endpoint::v3::LbEndpoint& lb_endpoint, TimeSource& time_source);
1306 | | 
1307 | | void registerHostForPriority(
1308 | | const HostSharedPtr& host,
1309 | | const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint);
1310 | | 
1311 | | void
1312 | | updateClusterPrioritySet(const uint32_t priority, HostVectorSharedPtr&& current_hosts,
1313 | | const absl::optional<HostVector>& hosts_added,
1314 | | const absl::optional<HostVector>& hosts_removed,
1315 | | const absl::optional<Upstream::Host::HealthFlag> health_checker_flag,
1316 | | absl::optional<bool> weighted_priority_health = absl::nullopt,
1317 | | absl::optional<uint32_t> overprovisioning_factor = absl::nullopt);
1318 | | 
1319 | | // Returns the saved priority state.
1320 | 2.40k | PriorityState& priorityState() { return priority_state_; }
1321 | | 
1322 | | private:
1323 | | ClusterImplBase& parent_;
1324 | | PriorityState priority_state_;
1325 | | const envoy::config::core::v3::Node& local_info_node_;
1326 | | PrioritySet::HostUpdateCb* update_cb_;
1327 | | Random::RandomGenerator& random_;
1328 | | };
1329 | | 
1330 | | using PriorityStateManagerPtr = std::unique_ptr<PriorityStateManager>;
1331 | | |
1332 | | /**
1333 | | * Base for all dynamic cluster types.
1334 | | */
1335 | | class BaseDynamicClusterImpl : public ClusterImplBase {
1336 | | protected:
1337 | | using ClusterImplBase::ClusterImplBase;
1338 | | 
1339 | | /**
1340 | | * Updates the host list of a single priority by reconciling the list of new hosts
1341 | | * with the existing hosts.
1342 | | *
1343 | | * @param new_hosts the full list of hosts in the new configuration.
1344 | | * @param current_priority_hosts the full list of hosts for the priority to be updated. The
1345 | | * list will be modified to contain the updated list of hosts.
1346 | | * @param hosts_added_to_current_priority will be populated with hosts added to the priority.
1347 | | * @param hosts_removed_from_current_priority will be populated with hosts removed from the
1348 | | * priority.
1349 | | * @param all_hosts all known hosts prior to this host update across all priorities.
1350 | | * @param all_new_hosts addresses of all hosts in the new configuration across all priorities.
1351 | | * @return whether the hosts for the priority changed.
1352 | | */
1353 | | bool updateDynamicHostList(const HostVector& new_hosts, HostVector& current_priority_hosts,
1354 | | HostVector& hosts_added_to_current_priority,
1355 | | HostVector& hosts_removed_from_current_priority,
1356 | | const HostMap& all_hosts,
1357 | | const absl::flat_hash_set<std::string>& all_new_hosts);
1358 | | };
1359 | | |
1360 | | /**
1361 | | * Utility function to get the Network::DnsLookupFamily for a cluster configuration.
1362 | | */
1363 | | Network::DnsLookupFamily
1364 | | getDnsLookupFamilyFromCluster(const envoy::config::cluster::v3::Cluster& cluster);
1365 | | |
1366 | | /**
1367 | | * Utility function to report upstream cx destroy metrics for the given host and connection event.
1368 | | */
1369 | | void reportUpstreamCxDestroy(const Upstream::HostDescriptionConstSharedPtr& host,
1370 | | Network::ConnectionEvent event);
1371 | | |
1372 | | /**
1373 | | * Utility function to report upstream cx destroy active request metrics for the given host and event.
1374 | | */
1375 | | void reportUpstreamCxDestroyActiveRequest(const Upstream::HostDescriptionConstSharedPtr& host,
1376 | | Network::ConnectionEvent event);
1377 | | |
1378 | | /**
1379 | | * Utility function to resolve the health check address from health_check_config and host_address.
1380 | | */
1381 | | Network::Address::InstanceConstSharedPtr resolveHealthCheckAddress(
1382 | | const envoy::config::endpoint::v3::Endpoint::HealthCheckConfig& health_check_config,
1383 | | Network::Address::InstanceConstSharedPtr host_address);
1384 | | |
1385 | | } // namespace Upstream |
1386 | | } // namespace Envoy |