#include "source/common/upstream/health_discovery_service.h"

#include "envoy/config/cluster/v3/cluster.pb.h"
#include "envoy/config/core/v3/address.pb.h"
#include "envoy/config/core/v3/base.pb.h"
#include "envoy/config/core/v3/health_check.pb.h"
#include "envoy/config/endpoint/v3/endpoint_components.pb.h"
#include "envoy/service/health/v3/hds.pb.h"
#include "envoy/service/health/v3/hds.pb.validate.h"
#include "envoy/stats/scope.h"

#include "source/common/protobuf/message_validator_impl.h"
#include "source/common/protobuf/protobuf.h"
#include "source/common/protobuf/utility.h"
#include "source/common/upstream/upstream_impl.h"

namespace Envoy {
namespace Upstream {

/**
 * TODO(lilika): Add API knobs for RetryInitialDelayMilliseconds
 * and RetryMaxDelayMilliseconds, instead of hardcoding them.
 *
 * Parameters of the jittered backoff strategy that define how often
 * we retry establishing a stream to the management server.
 */
static constexpr uint32_t RetryInitialDelayMilliseconds = 1000;
static constexpr uint32_t RetryMaxDelayMilliseconds = 30000;

HdsDelegate::HdsDelegate(Server::Configuration::ServerFactoryContext& server_context,
                         Stats::Scope& scope, Grpc::RawAsyncClientPtr async_client,
                         Envoy::Stats::Store& stats, Ssl::ContextManager& ssl_context_manager,
                         ClusterInfoFactory& info_factory)
    : stats_{ALL_HDS_STATS(POOL_COUNTER_PREFIX(scope, "hds_delegate."))},
      service_method_(*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
          "envoy.service.health.v3.HealthDiscoveryService.StreamHealthCheck")),
      async_client_(std::move(async_client)), dispatcher_(server_context.mainThreadDispatcher()),
      server_context_(server_context), store_stats_(stats),
      ssl_context_manager_(ssl_context_manager), info_factory_(info_factory),
      tls_(server_context_.threadLocal()) {
  health_check_request_.mutable_health_check_request()->mutable_node()->MergeFrom(
      server_context.localInfo().node());
  backoff_strategy_ = std::make_unique<JitteredExponentialBackOffStrategy>(
      RetryInitialDelayMilliseconds, RetryMaxDelayMilliseconds,
      server_context_.api().randomGenerator());
  hds_retry_timer_ = dispatcher_.createTimer([this]() -> void { establishNewStream(); });
  hds_stream_response_timer_ = dispatcher_.createTimer([this]() -> void { sendResponse(); });

  // TODO(lilika): Add support for other types of health checks.
  health_check_request_.mutable_health_check_request()
      ->mutable_capability()
      ->add_health_check_protocols(envoy::service::health::v3::Capability::HTTP);
  health_check_request_.mutable_health_check_request()
      ->mutable_capability()
      ->add_health_check_protocols(envoy::service::health::v3::Capability::TCP);

  establishNewStream();
}

void HdsDelegate::setHdsRetryTimer() {
  const auto retry_ms = std::chrono::milliseconds(backoff_strategy_->nextBackOffMs());
  ENVOY_LOG(warn, "HdsDelegate stream/connection failure, will retry in {} ms.", retry_ms.count());

  hds_retry_timer_->enableTimer(retry_ms);
}

void HdsDelegate::setHdsStreamResponseTimer() {
  hds_stream_response_timer_->enableTimer(std::chrono::milliseconds(server_response_ms_));
}

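// Opens the bidirectional StreamHealthCheck stream and sends the initial
// HealthCheckRequest advertising this node and its health check capabilities.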
void HdsDelegate::establishNewStream() {
  ENVOY_LOG(debug, "Establishing new gRPC bidi stream for {}", service_method_.DebugString());
  stream_ = async_client_->start(service_method_, *this, Http::AsyncClient::StreamOptions());
  if (stream_ == nullptr) {
    ENVOY_LOG(warn, "Unable to establish new stream");
    handleFailure();
    return;
  }

  ENVOY_LOG(debug, "Sending HealthCheckRequest {} ", health_check_request_.DebugString());
  stream_->sendMessage(health_check_request_, false);
  stats_.responses_.inc();
  backoff_strategy_->reset();
}

void HdsDelegate::handleFailure() {
  stats_.errors_.inc();
  setHdsRetryTimer();
}

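// Builds an EndpointHealthResponse that reports the current health of every endpoint,
// grouped by cluster and locality, sends it on the stream, and re-arms the response
// timer so the report repeats at the server-specified interval.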
envoy::service::health::v3::HealthCheckRequestOrEndpointHealthResponse HdsDelegate::sendResponse() {
  envoy::service::health::v3::HealthCheckRequestOrEndpointHealthResponse response;

  for (const auto& cluster : hds_clusters_) {
    // Add cluster health response and set name.
    auto* cluster_health =
        response.mutable_endpoint_health_response()->add_cluster_endpoints_health();
    cluster_health->set_cluster_name(cluster->info()->name());

    // Iterate through all hosts in our priority set.
    for (const auto& hosts : cluster->prioritySet().hostSetsPerPriority()) {
      // Get a grouping of hosts by locality.
      for (const auto& locality_hosts : hosts->hostsPerLocality().get()) {
        // For this locality, add the response grouping.
        envoy::service::health::v3::LocalityEndpointsHealth* locality_health =
            cluster_health->add_locality_endpoints_health();
        locality_health->mutable_locality()->MergeFrom(locality_hosts[0]->locality());

        // Add all hosts to this locality.
        for (const auto& host : locality_hosts) {
          // Add this endpoint's health status to this locality grouping.
          auto* endpoint = locality_health->add_endpoints_health();
          Network::Utility::addressToProtobufAddress(
              *host->address(), *endpoint->mutable_endpoint()->mutable_address());
          // TODO(lilika): Add support for more granular options of
          // envoy::config::core::v3::HealthStatus
          if (host->coarseHealth() == Host::Health::Healthy) {
            endpoint->set_health_status(envoy::config::core::v3::HEALTHY);
          } else {
            if (host->healthFlagGet(Host::HealthFlag::ACTIVE_HC_TIMEOUT)) {
              endpoint->set_health_status(envoy::config::core::v3::TIMEOUT);
            } else {
              endpoint->set_health_status(envoy::config::core::v3::UNHEALTHY);
            }
          }

          // TODO(drewsortega): remove this once we are on v4 and endpoint_health_response is
          // removed. Copy this endpoint's health info to the legacy flat-list.
          response.mutable_endpoint_health_response()->add_endpoints_health()->MergeFrom(*endpoint);
        }
      }
    }
  }
  ENVOY_LOG(debug, "Sending EndpointHealthResponse to server {}", response.DebugString());
  stream_->sendMessage(response, false);
  stats_.responses_.inc();
  setHdsStreamResponseTimer();
  return response;
}

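// HDS makes no use of gRPC stream metadata; these callbacks are intentionally no-ops.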
void HdsDelegate::onCreateInitialMetadata(Http::RequestHeaderMap& metadata) {
  UNREFERENCED_PARAMETER(metadata);
}

void HdsDelegate::onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&& metadata) {
  UNREFERENCED_PARAMETER(metadata);
}

envoy::config::cluster::v3::Cluster HdsDelegate::createClusterConfig(
    const envoy::service::health::v3::ClusterHealthCheck& cluster_health_check) {
  // Create the HdsCluster config.
  envoy::config::cluster::v3::Cluster cluster_config;

  cluster_config.set_name(cluster_health_check.cluster_name());
  cluster_config.mutable_connect_timeout()->set_seconds(ClusterTimeoutSeconds);
  cluster_config.mutable_per_connection_buffer_limit_bytes()->set_value(
      ClusterConnectionBufferLimitBytes);

  // Add endpoints to the cluster.
  for (const auto& locality_endpoints : cluster_health_check.locality_endpoints()) {
    // Add an endpoint group, keyed by locality, to the config.
    auto* endpoints = cluster_config.mutable_load_assignment()->add_endpoints();
    // If this group contains locality information, save it.
    if (locality_endpoints.has_locality()) {
      endpoints->mutable_locality()->MergeFrom(locality_endpoints.locality());
    }

    // Add all endpoints for this locality group to the config.
    for (const auto& endpoint : locality_endpoints.endpoints()) {
      if (endpoint.has_health_check_config() &&
          endpoint.health_check_config().disable_active_health_check()) {
        ENVOY_LOG(debug, "Skip adding the endpoint {} with optional disabled health check for HDS.",
                  endpoint.DebugString());
        continue;
      }
      auto* new_endpoint = endpoints->add_lb_endpoints()->mutable_endpoint();
      new_endpoint->mutable_address()->MergeFrom(endpoint.address());
      new_endpoint->mutable_health_check_config()->MergeFrom(endpoint.health_check_config());
    }
  }

  // TODO(lilika): Add support for optional per-endpoint health checks.

  // Add health checks to the cluster.
  for (auto& health_check : cluster_health_check.health_checks()) {
    cluster_config.add_health_checks()->MergeFrom(health_check);
  }

  // Add transport_socket_matches to the cluster for use in host connections.
  cluster_config.mutable_transport_socket_matches()->MergeFrom(
      cluster_health_check.transport_socket_matches());

  ENVOY_LOG(debug, "New HdsCluster config {} ", cluster_config.DebugString());

  return cluster_config;
}

absl::Status
HdsDelegate::updateHdsCluster(HdsClusterPtr cluster,
                              const envoy::config::cluster::v3::Cluster& cluster_config,
                              const envoy::config::core::v3::BindConfig& bind_config) {
  return cluster->update(cluster_config, bind_config, info_factory_, tls_);
}

HdsClusterPtr
HdsDelegate::createHdsCluster(const envoy::config::cluster::v3::Cluster& cluster_config,
                              const envoy::config::core::v3::BindConfig& bind_config) {
  // Create the HdsCluster.
  auto new_cluster =
      std::make_shared<HdsCluster>(server_context_, std::move(cluster_config), bind_config,
                                   store_stats_, ssl_context_manager_, false, info_factory_, tls_);

  // Begin health checks in the background.
  new_cluster->initialize([] {});
  new_cluster->initHealthchecks();

  return new_cluster;
}

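// Rebuilds the set of HDS clusters from a HealthCheckSpecifier: named clusters seen on
// a previous specifier are updated in place, unnamed or new clusters are created from
// scratch, and the member maps are then replaced wholesale.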
absl::Status HdsDelegate::processMessage(
    std::unique_ptr<envoy::service::health::v3::HealthCheckSpecifier>&& message) {
  ASSERT(message);
  ENVOY_LOG(debug, "New health check response message {} ", message->DebugString());

  std::vector<HdsClusterPtr> hds_clusters;
  // Map to replace the current member variable version.
  absl::flat_hash_map<std::string, HdsClusterPtr> new_hds_clusters_name_map;

  for (const auto& cluster_health_check : message->cluster_health_checks()) {
    if (!new_hds_clusters_name_map.contains(cluster_health_check.cluster_name())) {
      HdsClusterPtr cluster_ptr;

      // Create a new configuration for a cluster based on our different or new config.
      auto cluster_config = createClusterConfig(cluster_health_check);

      // If this particular cluster configuration has a name, it may already exist in the
      // name map. If we find a match, attempt to update that cluster. If no match is found,
      // either the cluster name is empty or we have not seen a cluster by this name before.
      // In either case, create a new cluster.
      auto cluster_map_pair = hds_clusters_name_map_.find(cluster_health_check.cluster_name());
      if (cluster_map_pair != hds_clusters_name_map_.end()) {
        // We have a previous cluster with this name, update it.
        cluster_ptr = cluster_map_pair->second;
        absl::Status status = updateHdsCluster(cluster_ptr, cluster_config,
                                               cluster_health_check.upstream_bind_config());
        if (!status.ok()) {
          return status;
        }
      } else {
        // There is no previous cluster with this name, or the name is an empty string, so
        // just create a new cluster.
        cluster_ptr = createHdsCluster(cluster_config, cluster_health_check.upstream_bind_config());
      }

      // If this cluster does not have a name, do not add it to the name map, since
      // cluster_name is an optional field; such clusters are reconstructed on every update.
      if (!cluster_health_check.cluster_name().empty()) {
        // Since this cluster has a name, add it to our by-name map so we can update it later.
        new_hds_clusters_name_map.insert({cluster_health_check.cluster_name(), cluster_ptr});
      } else {
        ENVOY_LOG(warn,
                  "HDS Cluster has no cluster_name, it will be recreated instead of updated on "
                  "every reconfiguration.");
      }

      // Add this cluster to the flat list for health checking.
      hds_clusters.push_back(cluster_ptr);
    } else {
      ENVOY_LOG(warn,
                "An HDS Cluster with this cluster_name has already been added; ignoring the "
                "duplicate.");
    }
  }

  // Overwrite our map data structures.
  hds_clusters_name_map_ = std::move(new_hds_clusters_name_map);
  hds_clusters_ = std::move(hds_clusters);

  // TODO: add stats reporting for number of clusters added, removed, and reused.
  return absl::OkStatus();
}

void HdsDelegate::onReceiveMessage(
    std::unique_ptr<envoy::service::health::v3::HealthCheckSpecifier>&& message) {
  stats_.requests_.inc();
  ENVOY_LOG(debug, "New health check response message {} ", message->DebugString());

  const uint64_t hash = MessageUtil::hash(*message);

  if (hash == specifier_hash_) {
    ENVOY_LOG(debug, "New health check specifier is unchanged, no action taken.");
    return;
  }

  // Validate the message fields.
  TRY_ASSERT_MAIN_THREAD {
    MessageUtil::validate(*message,
                          server_context_.messageValidationContext().dynamicValidationVisitor());
  }
  END_TRY
  CATCH(const ProtoValidationException& ex, {
    // Increment the error count.
    stats_.errors_.inc();
    ENVOY_LOG(warn, "Unable to validate health check specifier: {}", ex.what());

    // Do not continue processing the message.
    return;
  });

  // Capture the server-specified response interval (defaults to 1000ms).
  auto server_response_ms = PROTOBUF_GET_MS_OR_DEFAULT(*message, interval, 1000);

  // Process the HealthCheckSpecifier message.
  absl::Status status = processMessage(std::move(message));
  if (!status.ok()) {
    stats_.errors_.inc();
    ENVOY_LOG(warn, "Unable to process health check specifier: {}", status.message());
    // Do not continue processing the message.
    return;
  }

  stats_.updates_.inc();

  // Update the stored hash.
  specifier_hash_ = hash;

  if (server_response_ms_ != server_response_ms) {
    server_response_ms_ = server_response_ms;
    setHdsStreamResponseTimer();
  }
}

void HdsDelegate::onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&& metadata) {
  UNREFERENCED_PARAMETER(metadata);
}

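// Resetting specifier_hash_ here guarantees that the first specifier received on a
// re-established stream is treated as new and fully reprocessed.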
void HdsDelegate::onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) {
  ENVOY_LOG(warn, "{} gRPC config stream closed: {}, {}", service_method_.name(), status, message);
  hds_stream_response_timer_->disableTimer();
  stream_ = nullptr;
  server_response_ms_ = 0;
  specifier_hash_ = 0;
  handleFailure();
}

HdsCluster::HdsCluster(Server::Configuration::ServerFactoryContext& server_context,
                       envoy::config::cluster::v3::Cluster cluster,
                       const envoy::config::core::v3::BindConfig& bind_config, Stats::Store& stats,
                       Ssl::ContextManager& ssl_context_manager, bool added_via_api,
                       ClusterInfoFactory& info_factory, ThreadLocal::SlotAllocator& tls)
    : server_context_(server_context), cluster_(std::move(cluster)), stats_(stats),
      ssl_context_manager_(ssl_context_manager), added_via_api_(added_via_api),
      hosts_(new HostVector()), time_source_(server_context_.mainThreadDispatcher().timeSource()) {
  ENVOY_LOG(debug, "Creating an HdsCluster");
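  // HDS clusters use a single priority level (0).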
  priority_set_.getOrCreateHostSet(0);
  // Set initial hashes for possible delta updates.
  config_hash_ = MessageUtil::hash(cluster_);
  socket_match_hash_ = RepeatedPtrUtil::hash(cluster_.transport_socket_matches());

  info_ = info_factory.createClusterInfo(
      {server_context, cluster_, bind_config, stats_, ssl_context_manager_, added_via_api_, tls});

  // Temporary structure to hold Host pointers grouped by locality, to build
  // hosts_per_locality_.
  std::vector<HostVector> hosts_by_locality;
  hosts_by_locality.reserve(cluster_.load_assignment().endpoints_size());

  // Iterate over every endpoint in every locality group.
  for (const auto& locality_endpoints : cluster_.load_assignment().endpoints()) {
    // Add a locality grouping to the hosts sorted by locality.
    hosts_by_locality.emplace_back();
    hosts_by_locality.back().reserve(locality_endpoints.lb_endpoints_size());

    for (const auto& host : locality_endpoints.lb_endpoints()) {
      const LocalityEndpointTuple endpoint_key = {locality_endpoints.locality(), host};
      // Initialize an endpoint host object.
      HostSharedPtr endpoint = std::make_shared<HostImpl>(
          info_, "", Network::Address::resolveProtoAddress(host.endpoint().address()), nullptr, 1,
          locality_endpoints.locality(), host.endpoint().health_check_config(), 0,
          envoy::config::core::v3::UNKNOWN, time_source_);
      // Add this host/endpoint pointer to our flat list of endpoints for health checking.
      hosts_->push_back(endpoint);
      // Add this host/endpoint pointer to our structured list by locality so results can be
      // requested by locality.
      hosts_by_locality.back().push_back(endpoint);
      // Add this host/endpoint pointer to our map so we can rebuild this later.
      hosts_map_.insert({endpoint_key, endpoint});
    }
  }
  // Create the HostsPerLocality.
  hosts_per_locality_ =
      std::make_shared<Envoy::Upstream::HostsPerLocalityImpl>(std::move(hosts_by_locality), false);
}

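// Applies a delta update to this cluster. Proto hash comparisons decide which of the
// cluster info, host list, and health checkers actually need to be rebuilt.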
absl::Status HdsCluster::update(envoy::config::cluster::v3::Cluster cluster,
                                const envoy::config::core::v3::BindConfig& bind_config,
                                ClusterInfoFactory& info_factory, ThreadLocal::SlotAllocator& tls) {
  // Check to see if the config changed. If it did, update.
  const uint64_t config_hash = MessageUtil::hash(cluster);
  if (config_hash_ != config_hash) {
    config_hash_ = config_hash;
    cluster_ = std::move(cluster);

    // Check to see if our list of socket matches has changed. If it has, create a new matcher
    // in info_.
    bool update_cluster_info = false;
    const uint64_t socket_match_hash = RepeatedPtrUtil::hash(cluster_.transport_socket_matches());
    if (socket_match_hash_ != socket_match_hash) {
      socket_match_hash_ = socket_match_hash;
      update_cluster_info = true;
      info_ = info_factory.createClusterInfo({server_context_, cluster_, bind_config, stats_,
                                              ssl_context_manager_, added_via_api_, tls});
    }

    // Check to see if anything in the endpoints list has changed.
    updateHosts(cluster_.load_assignment().endpoints(), update_cluster_info);

    // Check to see if any of the health checkers have changed.
    absl::Status status = updateHealthchecks(cluster_.health_checks());
    if (!status.ok()) {
      return status;
    }
  }
  return absl::OkStatus();
}

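// Rebuilds the health checker set, reusing existing checkers whose HealthCheck config
// is unchanged and creating (and immediately starting) checkers for new configs.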
absl::Status HdsCluster::updateHealthchecks(
    const Protobuf::RepeatedPtrField<envoy::config::core::v3::HealthCheck>& health_checks) {
  std::vector<Upstream::HealthCheckerSharedPtr> health_checkers;
  HealthCheckerMap health_checkers_map;

  for (const auto& health_check : health_checks) {
    // Check to see if this exact same health_check config already has a health checker.
    auto health_checker = health_checkers_map_.find(health_check);
    if (health_checker != health_checkers_map_.end()) {
      // If it does, use it.
      health_checkers_map.insert({health_check, health_checker->second});
      health_checkers.push_back(health_checker->second);
    } else {
      // If it does not, create a new one.
      auto checker_or_error =
          Upstream::HealthCheckerFactory::create(health_check, *this, server_context_);
      RETURN_IF_STATUS_NOT_OK(checker_or_error);
      auto new_health_checker = checker_or_error.value();
      health_checkers_map.insert({health_check, new_health_checker});
      health_checkers.push_back(new_health_checker);

      // Start these health checks now because the upstream assumes they have already been started.
      new_health_checker->start();
    }
  }

  // Replace our member data structures with the newly created ones.
  health_checkers_ = std::move(health_checkers);
  health_checkers_map_ = std::move(health_checkers_map);

  // TODO: add stats reporting for number of health checkers added, removed, and reused.
  return absl::OkStatus();
}

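// Rebuilds the host list from the new locality endpoints, reusing Host objects whose
// locality and endpoint config are unchanged, and feeds the added/removed deltas into
// the priority set.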
void HdsCluster::updateHosts(
    const Protobuf::RepeatedPtrField<envoy::config::endpoint::v3::LocalityLbEndpoints>&
        locality_endpoints,
    bool update_cluster_info) {
  // Create the data structures needed for PrioritySet::update.
  HostVectorSharedPtr hosts = std::make_shared<std::vector<HostSharedPtr>>();
  std::vector<HostSharedPtr> hosts_added;
  std::vector<HostSharedPtr> hosts_removed;
  std::vector<HostVector> hosts_by_locality;

  // Used for delta update comparison.
  HostsMap hosts_map;

  for (auto& endpoints : locality_endpoints) {
    hosts_by_locality.emplace_back();
    for (auto& endpoint : endpoints.lb_endpoints()) {
      LocalityEndpointTuple endpoint_key = {endpoints.locality(), endpoint};

      // Check to see if this exact Locality+Endpoint has been seen before.
      // Also, if we made changes to our info, re-create all endpoints.
      auto host_pair = hosts_map_.find(endpoint_key);
      HostSharedPtr host;
      if (!update_cluster_info && host_pair != hosts_map_.end()) {
        // If we have this exact pair, save the shared pointer.
        host = host_pair->second;
      } else {
        // We do not have this endpoint saved, so create a new one.
        host = std::make_shared<HostImpl>(
            info_, "", Network::Address::resolveProtoAddress(endpoint.endpoint().address()),
            nullptr, 1, endpoints.locality(), endpoint.endpoint().health_check_config(), 0,
            envoy::config::core::v3::UNKNOWN, time_source_);

        // Set the initial health status as in HdsCluster::initialize.
        host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC);

        // Add to our hosts-added list and save the shared pointer.
        hosts_added.push_back(host);
      }

      // Whether it is reused or new, always add the host to these data structures.
      hosts_by_locality.back().push_back(host);
      hosts->push_back(host);
      hosts_map.insert({endpoint_key, host});
    }
  }

  // Compare the old map to the new one to find out which endpoints are going to be removed.
  for (auto& host_pair : hosts_map_) {
    if (!hosts_map.contains(host_pair.first)) {
      hosts_removed.push_back(host_pair.second);
    }
  }

  // Update the member data structures.
  hosts_ = std::move(hosts);
  hosts_map_ = std::move(hosts_map);

  // TODO: add stats reporting for number of endpoints added, removed, and reused.
  ENVOY_LOG(debug, "Hosts Added: {}, Removed: {}, Reused: {}", hosts_added.size(),
            hosts_removed.size(), hosts_->size() - hosts_added.size());

  // Update the priority set.
  hosts_per_locality_ =
      std::make_shared<Envoy::Upstream::HostsPerLocalityImpl>(std::move(hosts_by_locality), false);
  priority_set_.updateHosts(0, HostSetImpl::partitionHosts(hosts_, hosts_per_locality_), {},
                            hosts_added, hosts_removed, absl::nullopt, absl::nullopt);
}

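// Unused factory stub; HdsDelegate constructs HdsCluster instances directly.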
ClusterSharedPtr HdsCluster::create() { return nullptr; }

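// Builds the ClusterInfo for an HDS cluster, including its per-cluster stats scope and
// transport socket matcher.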
ClusterInfoConstSharedPtr
ProdClusterInfoFactory::createClusterInfo(const CreateClusterInfoParams& params) {
  Envoy::Stats::ScopeSharedPtr scope =
      params.stats_.createScope(fmt::format("cluster.{}.", params.cluster_.name()));

  Envoy::Server::Configuration::TransportSocketFactoryContextImpl factory_context(
      params.server_context_, params.ssl_context_manager_, *scope,
      params.server_context_.clusterManager(), params.server_context_.messageValidationVisitor());

  // TODO(JimmyCYJ): Support SDS for HDS cluster.
  Network::UpstreamTransportSocketFactoryPtr socket_factory =
      Upstream::createTransportSocketFactory(params.cluster_, factory_context);
  auto socket_matcher = std::make_unique<TransportSocketMatcherImpl>(
      params.cluster_.transport_socket_matches(), factory_context, socket_factory, *scope);

  return std::make_unique<ClusterInfoImpl>(
      params.server_context_.initManager(), params.server_context_, params.cluster_,
      params.bind_config_, params.server_context_.runtime(), std::move(socket_matcher),
      std::move(scope), params.added_via_api_, factory_context);
}

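// Creates, registers, and starts one health checker per configured health check.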
void HdsCluster::initHealthchecks() {
  for (auto& health_check : cluster_.health_checks()) {
    auto health_checker_or_error =
        Upstream::HealthCheckerFactory::create(health_check, *this, server_context_);
    THROW_IF_STATUS_NOT_OK(health_checker_or_error, throw);

    auto health_checker = health_checker_or_error.value();
    health_checkers_.push_back(health_checker);
    health_checkers_map_.insert({health_check, health_checker});
    health_checker->start();
  }
}

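// Publishes the initial host set with every host marked as failed-active-HC; hosts
// become healthy only after their first successful health check.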
void HdsCluster::initialize(std::function<void()> callback) {
  initialization_complete_callback_ = callback;

  // If this function gets called again, we do not want to touch the priority set again with the
  // initial hosts, because the hosts may have changed.
  if (!initialized_) {
    for (const auto& host : *hosts_) {
      host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC);
    }
    // Use the ungrouped and grouped hosts lists to retain locality structure in the priority set.
    priority_set_.updateHosts(0, HostSetImpl::partitionHosts(hosts_, hosts_per_locality_), {},
                              *hosts_, {}, absl::nullopt, absl::nullopt);

    initialized_ = true;
  }
}

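// Outlier detection is not supported for HDS clusters; this is a no-op.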
void HdsCluster::setOutlierDetector(const Outlier::DetectorSharedPtr&) {}

} // namespace Upstream
} // namespace Envoy