LCOV - code coverage report
Current view: top level - source/common/upstream - health_discovery_service.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 0 352 0.0 %
Date: 2024-01-05 06:35:25 Functions: 0 27 0.0 %

          Line data    Source code
       1             : #include "source/common/upstream/health_discovery_service.h"
       2             : 
       3             : #include "envoy/config/cluster/v3/cluster.pb.h"
       4             : #include "envoy/config/core/v3/address.pb.h"
       5             : #include "envoy/config/core/v3/base.pb.h"
       6             : #include "envoy/config/core/v3/health_check.pb.h"
       7             : #include "envoy/config/endpoint/v3/endpoint_components.pb.h"
       8             : #include "envoy/service/health/v3/hds.pb.h"
       9             : #include "envoy/service/health/v3/hds.pb.validate.h"
      10             : #include "envoy/stats/scope.h"
      11             : 
      12             : #include "source/common/protobuf/message_validator_impl.h"
      13             : #include "source/common/protobuf/protobuf.h"
      14             : #include "source/common/protobuf/utility.h"
      15             : #include "source/common/upstream/upstream_impl.h"
      16             : 
      17             : namespace Envoy {
      18             : namespace Upstream {
      19             : 
      20             : /**
      21             :  * TODO(lilika): Add API knob for RetryInitialDelayMilliseconds
      22             :  * and RetryMaxDelayMilliseconds, instead of hardcoding them.
      23             :  *
      24             :  * Parameters of the jittered backoff strategy that defines how often
      25             :  * we retry to establish a stream to the management server
      26             :  */
      27             : static constexpr uint32_t RetryInitialDelayMilliseconds = 1000;
      28             : static constexpr uint32_t RetryMaxDelayMilliseconds = 30000;
      29             : 
      30             : HdsDelegate::HdsDelegate(Server::Configuration::ServerFactoryContext& server_context,
      31             :                          Stats::Scope& scope, Grpc::RawAsyncClientPtr async_client,
      32             :                          Envoy::Stats::Store& stats, Ssl::ContextManager& ssl_context_manager,
      33             :                          ClusterInfoFactory& info_factory)
      34             :     : stats_{ALL_HDS_STATS(POOL_COUNTER_PREFIX(scope, "hds_delegate."))},
      35             :       service_method_(*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
      36             :           "envoy.service.health.v3.HealthDiscoveryService.StreamHealthCheck")),
      37             :       async_client_(std::move(async_client)), dispatcher_(server_context.mainThreadDispatcher()),
      38             :       server_context_(server_context), store_stats_(stats),
      39             :       ssl_context_manager_(ssl_context_manager), info_factory_(info_factory),
      40           0 :       tls_(server_context_.threadLocal()) {
      41           0 :   health_check_request_.mutable_health_check_request()->mutable_node()->MergeFrom(
      42           0 :       server_context.localInfo().node());
      43           0 :   backoff_strategy_ = std::make_unique<JitteredExponentialBackOffStrategy>(
      44           0 :       RetryInitialDelayMilliseconds, RetryMaxDelayMilliseconds,
      45           0 :       server_context_.api().randomGenerator());
      46           0 :   hds_retry_timer_ = dispatcher_.createTimer([this]() -> void { establishNewStream(); });
      47           0 :   hds_stream_response_timer_ = dispatcher_.createTimer([this]() -> void { sendResponse(); });
      48             : 
      49             :   // TODO(lilika): Add support for other types of healthchecks
      50           0 :   health_check_request_.mutable_health_check_request()
      51           0 :       ->mutable_capability()
      52           0 :       ->add_health_check_protocols(envoy::service::health::v3::Capability::HTTP);
      53           0 :   health_check_request_.mutable_health_check_request()
      54           0 :       ->mutable_capability()
      55           0 :       ->add_health_check_protocols(envoy::service::health::v3::Capability::TCP);
      56             : 
      57           0 :   establishNewStream();
      58           0 : }
      59             : 
      60           0 : void HdsDelegate::setHdsRetryTimer() {
      61           0 :   const auto retry_ms = std::chrono::milliseconds(backoff_strategy_->nextBackOffMs());
      62           0 :   ENVOY_LOG(warn, "HdsDelegate stream/connection failure, will retry in {} ms.", retry_ms.count());
      63             : 
      64           0 :   hds_retry_timer_->enableTimer(retry_ms);
      65           0 : }
      66             : 
      67           0 : void HdsDelegate::setHdsStreamResponseTimer() {
      68           0 :   hds_stream_response_timer_->enableTimer(std::chrono::milliseconds(server_response_ms_));
      69           0 : }
      70             : 
      71           0 : void HdsDelegate::establishNewStream() {
      72           0 :   ENVOY_LOG(debug, "Establishing new gRPC bidi stream for {}", service_method_.DebugString());
      73           0 :   stream_ = async_client_->start(service_method_, *this, Http::AsyncClient::StreamOptions());
      74           0 :   if (stream_ == nullptr) {
      75           0 :     ENVOY_LOG(warn, "Unable to establish new stream");
      76           0 :     handleFailure();
      77           0 :     return;
      78           0 :   }
      79             : 
      80           0 :   ENVOY_LOG(debug, "Sending HealthCheckRequest {} ", health_check_request_.DebugString());
      81           0 :   stream_->sendMessage(health_check_request_, false);
      82           0 :   stats_.responses_.inc();
      83           0 :   backoff_strategy_->reset();
      84           0 : }
      85             : 
      86           0 : void HdsDelegate::handleFailure() {
      87           0 :   stats_.errors_.inc();
      88           0 :   setHdsRetryTimer();
      89           0 : }
      90             : 
      91           0 : envoy::service::health::v3::HealthCheckRequestOrEndpointHealthResponse HdsDelegate::sendResponse() {
      92           0 :   envoy::service::health::v3::HealthCheckRequestOrEndpointHealthResponse response;
      93             : 
      94           0 :   for (const auto& cluster : hds_clusters_) {
      95             :     // Add cluster health response and set name.
      96           0 :     auto* cluster_health =
      97           0 :         response.mutable_endpoint_health_response()->add_cluster_endpoints_health();
      98           0 :     cluster_health->set_cluster_name(cluster->info()->name());
      99             : 
     100             :     // Iterate through all hosts in our priority set.
     101           0 :     for (const auto& hosts : cluster->prioritySet().hostSetsPerPriority()) {
     102             :       // Get a grouping of hosts by locality.
     103           0 :       for (const auto& locality_hosts : hosts->hostsPerLocality().get()) {
     104             :         // For this locality, add the response grouping.
     105           0 :         envoy::service::health::v3::LocalityEndpointsHealth* locality_health =
     106           0 :             cluster_health->add_locality_endpoints_health();
     107           0 :         locality_health->mutable_locality()->MergeFrom(locality_hosts[0]->locality());
     108             : 
     109             :         // Add all hosts to this locality.
     110           0 :         for (const auto& host : locality_hosts) {
     111             :           // Add this endpoint's health status to this locality grouping.
     112           0 :           auto* endpoint = locality_health->add_endpoints_health();
     113           0 :           Network::Utility::addressToProtobufAddress(
     114           0 :               *host->address(), *endpoint->mutable_endpoint()->mutable_address());
     115             :           // TODO(lilika): Add support for more granular options of
     116             :           // envoy::config::core::v3::HealthStatus
     117           0 :           if (host->coarseHealth() == Host::Health::Healthy) {
     118           0 :             endpoint->set_health_status(envoy::config::core::v3::HEALTHY);
     119           0 :           } else {
     120           0 :             if (host->healthFlagGet(Host::HealthFlag::ACTIVE_HC_TIMEOUT)) {
     121           0 :               endpoint->set_health_status(envoy::config::core::v3::TIMEOUT);
     122           0 :             } else {
     123           0 :               endpoint->set_health_status(envoy::config::core::v3::UNHEALTHY);
     124           0 :             }
     125           0 :           }
     126             : 
     127             :           // TODO(drewsortega): remove this once we are on v4 and endpoint_health_response is
     128             :           // removed. Copy this endpoint's health info to the legacy flat-list.
     129           0 :           response.mutable_endpoint_health_response()->add_endpoints_health()->MergeFrom(*endpoint);
     130           0 :         }
     131           0 :       }
     132           0 :     }
     133           0 :   }
     134           0 :   ENVOY_LOG(debug, "Sending EndpointHealthResponse to server {}", response.DebugString());
     135           0 :   stream_->sendMessage(response, false);
     136           0 :   stats_.responses_.inc();
     137           0 :   setHdsStreamResponseTimer();
     138           0 :   return response;
     139           0 : }
     140             : 
     141           0 : void HdsDelegate::onCreateInitialMetadata(Http::RequestHeaderMap& metadata) {
     142           0 :   UNREFERENCED_PARAMETER(metadata);
     143           0 : }
     144             : 
     145           0 : void HdsDelegate::onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&& metadata) {
     146           0 :   UNREFERENCED_PARAMETER(metadata);
     147           0 : }
     148             : 
     149             : envoy::config::cluster::v3::Cluster HdsDelegate::createClusterConfig(
     150           0 :     const envoy::service::health::v3::ClusterHealthCheck& cluster_health_check) {
     151             :   // Create HdsCluster config
     152           0 :   envoy::config::cluster::v3::Cluster cluster_config;
     153             : 
     154           0 :   cluster_config.set_name(cluster_health_check.cluster_name());
     155           0 :   cluster_config.mutable_connect_timeout()->set_seconds(ClusterTimeoutSeconds);
     156           0 :   cluster_config.mutable_per_connection_buffer_limit_bytes()->set_value(
     157           0 :       ClusterConnectionBufferLimitBytes);
     158             : 
     159             :   // Add endpoints to cluster
     160           0 :   for (const auto& locality_endpoints : cluster_health_check.locality_endpoints()) {
     161             :     // add endpoint group by locality to config
     162           0 :     auto* endpoints = cluster_config.mutable_load_assignment()->add_endpoints();
     163             :     // if this group contains locality information, save it.
     164           0 :     if (locality_endpoints.has_locality()) {
     165           0 :       endpoints->mutable_locality()->MergeFrom(locality_endpoints.locality());
     166           0 :     }
     167             : 
     168             :     // add all endpoints for this locality group to the config
     169           0 :     for (const auto& endpoint : locality_endpoints.endpoints()) {
     170           0 :       if (endpoint.has_health_check_config() &&
     171           0 :           endpoint.health_check_config().disable_active_health_check()) {
     172           0 :         ENVOY_LOG(debug, "Skip adding the endpoint {} with optional disabled health check for HDS.",
     173           0 :                   endpoint.DebugString());
     174           0 :         continue;
     175           0 :       }
     176           0 :       auto* new_endpoint = endpoints->add_lb_endpoints()->mutable_endpoint();
     177           0 :       new_endpoint->mutable_address()->MergeFrom(endpoint.address());
     178           0 :       new_endpoint->mutable_health_check_config()->MergeFrom(endpoint.health_check_config());
     179           0 :     }
     180           0 :   }
     181             : 
     182             :   // TODO(lilika): Add support for optional per-endpoint health checks
     183             : 
     184             :   // Add healthchecks to cluster
     185           0 :   for (auto& health_check : cluster_health_check.health_checks()) {
     186           0 :     cluster_config.add_health_checks()->MergeFrom(health_check);
     187           0 :   }
     188             : 
     189             :   // Add transport_socket_match to cluster for use in host connections.
     190           0 :   cluster_config.mutable_transport_socket_matches()->MergeFrom(
     191           0 :       cluster_health_check.transport_socket_matches());
     192             : 
     193           0 :   ENVOY_LOG(debug, "New HdsCluster config {} ", cluster_config.DebugString());
     194             : 
     195           0 :   return cluster_config;
     196           0 : }
     197             : 
     198             : absl::Status
     199             : HdsDelegate::updateHdsCluster(HdsClusterPtr cluster,
     200             :                               const envoy::config::cluster::v3::Cluster& cluster_config,
     201           0 :                               const envoy::config::core::v3::BindConfig& bind_config) {
     202           0 :   return cluster->update(cluster_config, bind_config, info_factory_, tls_);
     203           0 : }
     204             : 
     205             : HdsClusterPtr
     206             : HdsDelegate::createHdsCluster(const envoy::config::cluster::v3::Cluster& cluster_config,
     207           0 :                               const envoy::config::core::v3::BindConfig& bind_config) {
     208             :   // Create HdsCluster.
     209           0 :   auto new_cluster =
     210           0 :       std::make_shared<HdsCluster>(server_context_, std::move(cluster_config), bind_config,
     211           0 :                                    store_stats_, ssl_context_manager_, false, info_factory_, tls_);
     212             : 
     213             :   // Begin HCs in the background.
     214           0 :   new_cluster->initialize([] {});
     215           0 :   new_cluster->initHealthchecks();
     216             : 
     217           0 :   return new_cluster;
     218           0 : }
     219             : 
     220             : absl::Status HdsDelegate::processMessage(
     221           0 :     std::unique_ptr<envoy::service::health::v3::HealthCheckSpecifier>&& message) {
     222           0 :   ENVOY_LOG(debug, "New health check response message {} ", message->DebugString());
     223           0 :   ASSERT(message);
     224           0 :   std::vector<HdsClusterPtr> hds_clusters;
     225             :   // Maps to replace the current member variable versions.
     226           0 :   absl::flat_hash_map<std::string, HdsClusterPtr> new_hds_clusters_name_map;
     227             : 
     228           0 :   for (const auto& cluster_health_check : message->cluster_health_checks()) {
     229           0 :     if (!new_hds_clusters_name_map.contains(cluster_health_check.cluster_name())) {
     230           0 :       HdsClusterPtr cluster_ptr;
     231             : 
     232             :       // Create a new configuration for a cluster based on our different or new config.
     233           0 :       auto cluster_config = createClusterConfig(cluster_health_check);
     234             : 
     235             :       // If this particular cluster configuration happens to have a name, then it is possible
     236             :       // this particular cluster exists in the name map. We check and if we found a match,
     237             :       // attempt to update this cluster. If no match was found, either the cluster name is empty
     238             :       // or we have not seen a cluster by this name before. In either case, create a new cluster.
     239           0 :       auto cluster_map_pair = hds_clusters_name_map_.find(cluster_health_check.cluster_name());
     240           0 :       if (cluster_map_pair != hds_clusters_name_map_.end()) {
     241             :         // We have a previous cluster with this name, update.
     242           0 :         cluster_ptr = cluster_map_pair->second;
     243           0 :         absl::Status status = updateHdsCluster(cluster_ptr, cluster_config,
     244           0 :                                                cluster_health_check.upstream_bind_config());
     245           0 :         if (!status.ok()) {
     246           0 :           return status;
     247           0 :         }
     248           0 :       } else {
     249             :         // There is no cluster with this name previously or its an empty string, so just create a
     250             :         // new cluster.
     251           0 :         cluster_ptr = createHdsCluster(cluster_config, cluster_health_check.upstream_bind_config());
     252           0 :       }
     253             : 
     254             :       // If this cluster does not have a name, do not add it to the name map since cluster_name is
     255             :       // an optional field, and reconstruct these clusters on every update.
     256           0 :       if (!cluster_health_check.cluster_name().empty()) {
     257             :         // Since this cluster has a name, add it to our by-name map so we can update it later.
     258           0 :         new_hds_clusters_name_map.insert({cluster_health_check.cluster_name(), cluster_ptr});
     259           0 :       } else {
     260           0 :         ENVOY_LOG(warn,
     261           0 :                   "HDS Cluster has no cluster_name, it will be recreated instead of updated on "
     262           0 :                   "every reconfiguration.");
     263           0 :       }
     264             : 
     265             :       // Add this cluster to the flat list for health checking.
     266           0 :       hds_clusters.push_back(cluster_ptr);
     267           0 :     } else {
     268           0 :       ENVOY_LOG(warn, "An HDS Cluster with this cluster_name has already been added, not using.");
     269           0 :     }
     270           0 :   }
     271             : 
     272             :   // Overwrite our map data structures.
     273           0 :   hds_clusters_name_map_ = std::move(new_hds_clusters_name_map);
     274           0 :   hds_clusters_ = std::move(hds_clusters);
     275             : 
     276             :   // TODO: add stats reporting for number of clusters added, removed, and reused.
     277           0 :   return absl::OkStatus();
     278           0 : }
     279             : 
     280             : void HdsDelegate::onReceiveMessage(
     281           0 :     std::unique_ptr<envoy::service::health::v3::HealthCheckSpecifier>&& message) {
     282           0 :   stats_.requests_.inc();
     283           0 :   ENVOY_LOG(debug, "New health check response message {} ", message->DebugString());
     284             : 
     285           0 :   const uint64_t hash = MessageUtil::hash(*message);
     286             : 
     287           0 :   if (hash == specifier_hash_) {
     288           0 :     ENVOY_LOG(debug, "New health check specifier is unchanged, no action taken.");
     289           0 :     return;
     290           0 :   }
     291             : 
     292             :   // Validate message fields
     293           0 :   TRY_ASSERT_MAIN_THREAD {
     294           0 :     MessageUtil::validate(*message,
     295           0 :                           server_context_.messageValidationContext().dynamicValidationVisitor());
     296           0 :   }
     297           0 :   END_TRY
     298           0 :   CATCH(const ProtoValidationException& ex, {
     299             :     // Increment error count
     300           0 :     stats_.errors_.inc();
     301           0 :     ENVOY_LOG(warn, "Unable to validate health check specifier: {}", ex.what());
     302             : 
     303             :     // Do not continue processing message
     304           0 :     return;
     305           0 :   });
     306             : 
     307             :   // Set response
     308           0 :   auto server_response_ms = PROTOBUF_GET_MS_OR_DEFAULT(*message, interval, 1000);
     309             : 
     310             :   /// Process the HealthCheckSpecifier message.
     311           0 :   absl::Status status = processMessage(std::move(message));
     312           0 :   if (!status.ok()) {
     313           0 :     stats_.errors_.inc();
     314           0 :     ENVOY_LOG(warn, "Unable to validate health check specifier: {}", status.message());
     315             :     // Do not continue processing message
     316           0 :     return;
     317           0 :   }
     318             : 
     319           0 :   stats_.updates_.inc();
     320             : 
     321             :   // Update the stored hash.
     322           0 :   specifier_hash_ = hash;
     323             : 
     324           0 :   if (server_response_ms_ != server_response_ms) {
     325           0 :     server_response_ms_ = server_response_ms;
     326           0 :     setHdsStreamResponseTimer();
     327           0 :   }
     328           0 : }
     329             : 
     330           0 : void HdsDelegate::onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&& metadata) {
     331           0 :   UNREFERENCED_PARAMETER(metadata);
     332           0 : }
     333             : 
     334           0 : void HdsDelegate::onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) {
     335           0 :   ENVOY_LOG(warn, "{} gRPC config stream closed: {}, {}", service_method_.name(), status, message);
     336           0 :   hds_stream_response_timer_->disableTimer();
     337           0 :   stream_ = nullptr;
     338           0 :   server_response_ms_ = 0;
     339           0 :   specifier_hash_ = 0;
     340           0 :   handleFailure();
     341           0 : }
     342             : 
     343             : HdsCluster::HdsCluster(Server::Configuration::ServerFactoryContext& server_context,
     344             :                        envoy::config::cluster::v3::Cluster cluster,
     345             :                        const envoy::config::core::v3::BindConfig& bind_config, Stats::Store& stats,
     346             :                        Ssl::ContextManager& ssl_context_manager, bool added_via_api,
     347             :                        ClusterInfoFactory& info_factory, ThreadLocal::SlotAllocator& tls)
     348             :     : server_context_(server_context), cluster_(std::move(cluster)), stats_(stats),
     349             :       ssl_context_manager_(ssl_context_manager), added_via_api_(added_via_api),
     350           0 :       hosts_(new HostVector()), time_source_(server_context_.mainThreadDispatcher().timeSource()) {
     351           0 :   ENVOY_LOG(debug, "Creating an HdsCluster");
     352           0 :   priority_set_.getOrCreateHostSet(0);
     353             :   // Set initial hashes for possible delta updates.
     354           0 :   config_hash_ = MessageUtil::hash(cluster_);
     355           0 :   socket_match_hash_ = RepeatedPtrUtil::hash(cluster_.transport_socket_matches());
     356             : 
     357           0 :   info_ = info_factory.createClusterInfo(
     358           0 :       {server_context, cluster_, bind_config, stats_, ssl_context_manager_, added_via_api_, tls});
     359             : 
     360             :   // Temporary structure to hold Host pointers grouped by locality, to build
     361             :   // initial_hosts_per_locality_.
     362           0 :   std::vector<HostVector> hosts_by_locality;
     363           0 :   hosts_by_locality.reserve(cluster_.load_assignment().endpoints_size());
     364             : 
     365             :   // Iterate over every endpoint in every cluster.
     366           0 :   for (const auto& locality_endpoints : cluster_.load_assignment().endpoints()) {
     367             :     // Add a locality grouping to the hosts sorted by locality.
     368           0 :     hosts_by_locality.emplace_back();
     369           0 :     hosts_by_locality.back().reserve(locality_endpoints.lb_endpoints_size());
     370             : 
     371           0 :     for (const auto& host : locality_endpoints.lb_endpoints()) {
     372           0 :       const LocalityEndpointTuple endpoint_key = {locality_endpoints.locality(), host};
     373             :       // Initialize an endpoint host object.
     374           0 :       HostSharedPtr endpoint = std::make_shared<HostImpl>(
     375           0 :           info_, "", Network::Address::resolveProtoAddress(host.endpoint().address()), nullptr, 1,
     376           0 :           locality_endpoints.locality(), host.endpoint().health_check_config(), 0,
     377           0 :           envoy::config::core::v3::UNKNOWN, time_source_);
     378             :       // Add this host/endpoint pointer to our flat list of endpoints for health checking.
     379           0 :       hosts_->push_back(endpoint);
     380             :       // Add this host/endpoint pointer to our structured list by locality so results can be
     381             :       // requested by locality.
     382           0 :       hosts_by_locality.back().push_back(endpoint);
     383             :       // Add this host/endpoint pointer to our map so we can rebuild this later.
     384           0 :       hosts_map_.insert({endpoint_key, endpoint});
     385           0 :     }
     386           0 :   }
     387             :   // Create the HostsPerLocality.
     388           0 :   hosts_per_locality_ =
     389           0 :       std::make_shared<Envoy::Upstream::HostsPerLocalityImpl>(std::move(hosts_by_locality), false);
     390           0 : }
     391             : 
     392             : absl::Status HdsCluster::update(envoy::config::cluster::v3::Cluster cluster,
     393             :                                 const envoy::config::core::v3::BindConfig& bind_config,
     394           0 :                                 ClusterInfoFactory& info_factory, ThreadLocal::SlotAllocator& tls) {
     395             : 
     396             :   // check to see if the config changed. If it did, update.
     397           0 :   const uint64_t config_hash = MessageUtil::hash(cluster);
     398           0 :   if (config_hash_ != config_hash) {
     399           0 :     config_hash_ = config_hash;
     400           0 :     cluster_ = std::move(cluster);
     401             : 
     402             :     // Check to see if our list of socket matches have changed. If they have, create a new matcher
     403             :     // in info_.
     404           0 :     bool update_cluster_info = false;
     405           0 :     const uint64_t socket_match_hash = RepeatedPtrUtil::hash(cluster_.transport_socket_matches());
     406           0 :     if (socket_match_hash_ != socket_match_hash) {
     407           0 :       socket_match_hash_ = socket_match_hash;
     408           0 :       update_cluster_info = true;
     409           0 :       info_ = info_factory.createClusterInfo({server_context_, cluster_, bind_config, stats_,
     410           0 :                                               ssl_context_manager_, added_via_api_, tls});
     411           0 :     }
     412             : 
     413             :     // Check to see if anything in the endpoints list has changed.
     414           0 :     updateHosts(cluster_.load_assignment().endpoints(), update_cluster_info);
     415             : 
     416             :     // Check to see if any of the health checkers have changed.
     417           0 :     absl::Status status = updateHealthchecks(cluster_.health_checks());
     418           0 :     if (!status.ok()) {
     419           0 :       return status;
     420           0 :     }
     421           0 :   }
     422           0 :   return absl::OkStatus();
     423           0 : }
     424             : 
     425             : absl::Status HdsCluster::updateHealthchecks(
     426           0 :     const Protobuf::RepeatedPtrField<envoy::config::core::v3::HealthCheck>& health_checks) {
     427           0 :   std::vector<Upstream::HealthCheckerSharedPtr> health_checkers;
     428           0 :   HealthCheckerMap health_checkers_map;
     429             : 
     430           0 :   for (const auto& health_check : health_checks) {
     431             :     // Check to see if this exact same health_check config already has a health checker.
     432           0 :     auto health_checker = health_checkers_map_.find(health_check);
     433           0 :     if (health_checker != health_checkers_map_.end()) {
     434             :       // If it does, use it.
     435           0 :       health_checkers_map.insert({health_check, health_checker->second});
     436           0 :       health_checkers.push_back(health_checker->second);
     437           0 :     } else {
     438             :       // If it does not, create a new one.
     439           0 :       auto checker_or_error =
     440           0 :           Upstream::HealthCheckerFactory::create(health_check, *this, server_context_);
     441           0 :       RETURN_IF_STATUS_NOT_OK(checker_or_error);
     442           0 :       auto new_health_checker = checker_or_error.value();
     443           0 :       health_checkers_map.insert({health_check, new_health_checker});
     444           0 :       health_checkers.push_back(new_health_checker);
     445             : 
     446             :       // Start these health checks now because upstream assumes they already have been started.
     447           0 :       new_health_checker->start();
     448           0 :     }
     449           0 :   }
     450             : 
     451             :   // replace our member data structures with our newly created ones.
     452           0 :   health_checkers_ = std::move(health_checkers);
     453           0 :   health_checkers_map_ = std::move(health_checkers_map);
     454             : 
     455             :   // TODO: add stats reporting for number of health checkers added, removed, and reused.
     456           0 :   return absl::OkStatus();
     457           0 : }
     458             : 
     459             : void HdsCluster::updateHosts(
     460             :     const Protobuf::RepeatedPtrField<envoy::config::endpoint::v3::LocalityLbEndpoints>&
     461             :         locality_endpoints,
     462           0 :     bool update_cluster_info) {
     463             :   // Create the data structures needed for PrioritySet::update.
     464           0 :   HostVectorSharedPtr hosts = std::make_shared<std::vector<HostSharedPtr>>();
     465           0 :   std::vector<HostSharedPtr> hosts_added;
     466           0 :   std::vector<HostSharedPtr> hosts_removed;
     467           0 :   std::vector<HostVector> hosts_by_locality;
     468             : 
     469             :   // Use for delta update comparison.
     470           0 :   HostsMap hosts_map;
     471             : 
     472           0 :   for (auto& endpoints : locality_endpoints) {
     473           0 :     hosts_by_locality.emplace_back();
     474           0 :     for (auto& endpoint : endpoints.lb_endpoints()) {
     475           0 :       LocalityEndpointTuple endpoint_key = {endpoints.locality(), endpoint};
     476             : 
     477             :       // Check to see if this exact Locality+Endpoint has been seen before.
     478             :       // Also, if we made changes to our info, re-create all endpoints.
     479           0 :       auto host_pair = hosts_map_.find(endpoint_key);
     480           0 :       HostSharedPtr host;
     481           0 :       if (!update_cluster_info && host_pair != hosts_map_.end()) {
     482             :         // If we have this exact pair, save the shared pointer.
     483           0 :         host = host_pair->second;
     484           0 :       } else {
     485             :         // We do not have this endpoint saved, so create a new one.
     486           0 :         host = std::make_shared<HostImpl>(
     487           0 :             info_, "", Network::Address::resolveProtoAddress(endpoint.endpoint().address()),
     488           0 :             nullptr, 1, endpoints.locality(), endpoint.endpoint().health_check_config(), 0,
     489           0 :             envoy::config::core::v3::UNKNOWN, time_source_);
     490             : 
     491             :         // Set the initial health status as in HdsCluster::initialize.
     492           0 :         host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC);
     493             : 
     494             :         // Add to our hosts added list and save the shared pointer.
     495           0 :         hosts_added.push_back(host);
     496           0 :       }
     497             : 
     498             :       // No matter if it is reused or new, always add to these data structures.
     499           0 :       hosts_by_locality.back().push_back(host);
     500           0 :       hosts->push_back(host);
     501           0 :       hosts_map.insert({endpoint_key, host});
     502           0 :     }
     503           0 :   }
     504             : 
     505             :   // Compare the old map to the new to find out which endpoints are going to be removed.
     506           0 :   for (auto& host_pair : hosts_map_) {
     507           0 :     if (!hosts_map.contains(host_pair.first)) {
     508           0 :       hosts_removed.push_back(host_pair.second);
     509           0 :     }
     510           0 :   }
     511             : 
     512             :   // Update the member data structures.
     513           0 :   hosts_ = std::move(hosts);
     514           0 :   hosts_map_ = std::move(hosts_map);
     515             : 
     516             :   // TODO: add stats reporting for number of endpoints added, removed, and reused.
     517           0 :   ENVOY_LOG(debug, "Hosts Added: {}, Removed: {}, Reused: {}", hosts_added.size(),
     518           0 :             hosts_removed.size(), hosts_->size() - hosts_added.size());
     519             : 
     520             :   // Update the priority set.
     521           0 :   hosts_per_locality_ =
     522           0 :       std::make_shared<Envoy::Upstream::HostsPerLocalityImpl>(std::move(hosts_by_locality), false);
     523           0 :   priority_set_.updateHosts(0, HostSetImpl::partitionHosts(hosts_, hosts_per_locality_), {},
     524           0 :                             hosts_added, hosts_removed, absl::nullopt, absl::nullopt);
     525           0 : }
     526             : 
     527           0 : ClusterSharedPtr HdsCluster::create() { return nullptr; }
     528             : 
     529             : ClusterInfoConstSharedPtr
     530           0 : ProdClusterInfoFactory::createClusterInfo(const CreateClusterInfoParams& params) {
     531           0 :   Envoy::Stats::ScopeSharedPtr scope =
     532           0 :       params.stats_.createScope(fmt::format("cluster.{}.", params.cluster_.name()));
     533             : 
     534           0 :   Envoy::Server::Configuration::TransportSocketFactoryContextImpl factory_context(
     535           0 :       params.server_context_, params.ssl_context_manager_, *scope,
     536           0 :       params.server_context_.clusterManager(), params.server_context_.messageValidationVisitor());
     537             : 
     538             :   // TODO(JimmyCYJ): Support SDS for HDS cluster.
     539           0 :   Network::UpstreamTransportSocketFactoryPtr socket_factory =
     540           0 :       Upstream::createTransportSocketFactory(params.cluster_, factory_context);
     541           0 :   auto socket_matcher = std::make_unique<TransportSocketMatcherImpl>(
     542           0 :       params.cluster_.transport_socket_matches(), factory_context, socket_factory, *scope);
     543             : 
     544           0 :   return std::make_unique<ClusterInfoImpl>(
     545           0 :       params.server_context_.initManager(), params.server_context_, params.cluster_,
     546           0 :       params.bind_config_, params.server_context_.runtime(), std::move(socket_matcher),
     547           0 :       std::move(scope), params.added_via_api_, factory_context);
     548           0 : }
     549             : 
     550           0 : void HdsCluster::initHealthchecks() {
     551           0 :   for (auto& health_check : cluster_.health_checks()) {
     552           0 :     auto health_checker_or_error =
     553           0 :         Upstream::HealthCheckerFactory::create(health_check, *this, server_context_);
     554           0 :     THROW_IF_STATUS_NOT_OK(health_checker_or_error, throw);
     555             : 
     556           0 :     auto health_checker = health_checker_or_error.value();
     557           0 :     health_checkers_.push_back(health_checker);
     558           0 :     health_checkers_map_.insert({health_check, health_checker});
     559           0 :     health_checker->start();
     560           0 :   }
     561           0 : }
     562             : 
     563           0 : void HdsCluster::initialize(std::function<void()> callback) {
     564           0 :   initialization_complete_callback_ = callback;
     565             : 
     566             :   // If this function gets called again we do not want to touch the priority set again with the
     567             :   // initial hosts, because the hosts may have changed.
     568           0 :   if (!initialized_) {
     569           0 :     for (const auto& host : *hosts_) {
     570           0 :       host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC);
     571           0 :     }
     572             :     // Use the ungrouped and grouped hosts lists to retain locality structure in the priority set.
     573           0 :     priority_set_.updateHosts(0, HostSetImpl::partitionHosts(hosts_, hosts_per_locality_), {},
     574           0 :                               *hosts_, {}, absl::nullopt, absl::nullopt);
     575             : 
     576           0 :     initialized_ = true;
     577           0 :   }
     578           0 : }
     579             : 
     580           0 : void HdsCluster::setOutlierDetector(const Outlier::DetectorSharedPtr&) {}
     581             : 
     582             : } // namespace Upstream
     583             : } // namespace Envoy

Generated by: LCOV version 1.15