1
#include "source/common/upstream/health_discovery_service.h"
2

            
3
#include "envoy/config/cluster/v3/cluster.pb.h"
4
#include "envoy/config/core/v3/address.pb.h"
5
#include "envoy/config/core/v3/base.pb.h"
6
#include "envoy/config/core/v3/health_check.pb.h"
7
#include "envoy/config/endpoint/v3/endpoint_components.pb.h"
8
#include "envoy/service/health/v3/hds.pb.h"
9
#include "envoy/service/health/v3/hds.pb.validate.h"
10
#include "envoy/stats/scope.h"
11

            
12
#include "source/common/protobuf/message_validator_impl.h"
13
#include "source/common/protobuf/protobuf.h"
14
#include "source/common/protobuf/utility.h"
15
#include "source/common/upstream/upstream_impl.h"
16

            
17
namespace Envoy {
18
namespace Upstream {
19

            
20
/**
21
 * TODO(lilika): Add API knob for RetryInitialDelayMilliseconds
22
 * and RetryMaxDelayMilliseconds, instead of hardcoding them.
23
 *
24
 * Parameters of the jittered backoff strategy that defines how often
25
 * we retry to establish a stream to the management server
26
 */
27
static constexpr uint32_t RetryInitialDelayMilliseconds = 1000;
28
static constexpr uint32_t RetryMaxDelayMilliseconds = 30000;
29

            
30
// Construct the HDS delegate: build delegate stats, resolve the
// StreamHealthCheck gRPC method descriptor, prepare the initial
// HealthCheckRequest (node identity + supported protocols), create the
// retry/response timers, and immediately attempt to establish the stream.
HdsDelegate::HdsDelegate(Server::Configuration::ServerFactoryContext& server_context,
                         Stats::Scope& scope, Grpc::RawAsyncClientPtr async_client,
                         Envoy::Stats::Store& stats, Ssl::ContextManager& ssl_context_manager)
    : stats_{ALL_HDS_STATS(POOL_COUNTER_PREFIX(scope, "hds_delegate."))},
      service_method_(*Protobuf::DescriptorPool::generated_pool()->FindMethodByName(
          "envoy.service.health.v3.HealthDiscoveryService.StreamHealthCheck")),
      async_client_(std::move(async_client)), dispatcher_(server_context.mainThreadDispatcher()),
      server_context_(server_context), store_stats_(stats),
      ssl_context_manager_(ssl_context_manager),
      info_factory_(std::make_unique<ProdClusterInfoFactory>()),
      tls_(server_context_.threadLocal()) {
  // Identify this Envoy instance to the management server in the request.
  health_check_request_.mutable_health_check_request()->mutable_node()->MergeFrom(
      server_context.localInfo().node());
  // Jittered exponential backoff governs how quickly stream re-establishment
  // is retried (see RetryInitialDelayMilliseconds / RetryMaxDelayMilliseconds).
  backoff_strategy_ = std::make_unique<JitteredExponentialBackOffStrategy>(
      RetryInitialDelayMilliseconds, RetryMaxDelayMilliseconds,
      server_context_.api().randomGenerator());
  // Retry timer reconnects the stream; response timer drives periodic reports.
  hds_retry_timer_ = dispatcher_.createTimer([this]() -> void { establishNewStream(); });
  hds_stream_response_timer_ = dispatcher_.createTimer([this]() -> void { sendResponse(); });

  // TODO(lilika): Add support for other types of healthchecks
  // Advertise which health checking protocols this client can perform.
  health_check_request_.mutable_health_check_request()
      ->mutable_capability()
      ->add_health_check_protocols(envoy::service::health::v3::Capability::HTTP);
  health_check_request_.mutable_health_check_request()
      ->mutable_capability()
      ->add_health_check_protocols(envoy::service::health::v3::Capability::TCP);

  establishNewStream();
}
59

            
60
49
void HdsDelegate::setHdsRetryTimer() {
61
49
  const auto retry_ms = std::chrono::milliseconds(backoff_strategy_->nextBackOffMs());
62
49
  ENVOY_LOG(warn, "HdsDelegate stream/connection failure, will retry in {} ms.", retry_ms.count());
63

            
64
49
  hds_retry_timer_->enableTimer(retry_ms);
65
49
}
66

            
67
140
void HdsDelegate::setHdsStreamResponseTimer() {
68
140
  hds_stream_response_timer_->enableTimer(std::chrono::milliseconds(server_response_ms_));
69
140
}
70

            
71
72
// Open (or re-open) the bidirectional gRPC stream to the HDS management
// server and send the initial HealthCheckRequest advertising our
// capabilities. On failure, schedules a backoff retry via handleFailure().
void HdsDelegate::establishNewStream() {
  ENVOY_LOG(debug, "Establishing new gRPC bidi stream for {}", service_method_.DebugString());
  stream_ = async_client_->start(service_method_, *this, Http::AsyncClient::StreamOptions());
  if (stream_ == nullptr) {
    // Could not create the stream; count the error and retry with backoff.
    ENVOY_LOG(warn, "Unable to establish new stream");
    handleFailure();
    return;
  }

  ENVOY_LOG(debug, "Sending HealthCheckRequest {} ", health_check_request_.DebugString());
  stream_->sendMessage(health_check_request_, false);
  // NOTE(review): this increments responses_ for an outgoing request message;
  // looks inconsistent with onReceiveMessage() incrementing requests_ for an
  // incoming one — confirm intended stats semantics before changing.
  stats_.responses_.inc();
  // Successful stream start resets backoff so future failures begin again at
  // the initial delay.
  backoff_strategy_->reset();
}
85

            
86
49
void HdsDelegate::handleFailure() {
87
49
  stats_.errors_.inc();
88
49
  setHdsRetryTimer();
89
49
}
90

            
91
81
// Build and send an EndpointHealthResponse covering every HDS cluster we are
// health checking, grouped by cluster and locality, then re-arm the response
// timer. Returns the response (used by tests). Each endpoint's health is also
// copied into the legacy flat endpoints_health list for pre-v4 servers.
envoy::service::health::v3::HealthCheckRequestOrEndpointHealthResponse HdsDelegate::sendResponse() {
  envoy::service::health::v3::HealthCheckRequestOrEndpointHealthResponse response;

  for (const auto& cluster : hds_clusters_) {
    // Add cluster health response and set name.
    auto* cluster_health =
        response.mutable_endpoint_health_response()->add_cluster_endpoints_health();
    cluster_health->set_cluster_name(cluster->info()->name());

    // Iterate through all hosts in our priority set.
    for (const auto& hosts : cluster->prioritySet().hostSetsPerPriority()) {
      // Get a grouping of hosts by locality.
      for (const auto& locality_hosts : hosts->hostsPerLocality().get()) {
        // For this locality, add the response grouping.
        // NOTE(review): assumes each locality grouping is non-empty
        // (locality_hosts[0] below) — confirm hostsPerLocality() guarantees it.
        envoy::service::health::v3::LocalityEndpointsHealth* locality_health =
            cluster_health->add_locality_endpoints_health();
        locality_health->mutable_locality()->MergeFrom(locality_hosts[0]->locality());

        // Add all hosts to this locality.
        for (const auto& host : locality_hosts) {
          // Add this endpoint's health status to this locality grouping.
          auto* endpoint = locality_health->add_endpoints_health();
          Network::Utility::addressToProtobufAddress(
              *host->address(), *endpoint->mutable_endpoint()->mutable_address());
          // TODO(lilika): Add support for more granular options of
          // envoy::config::core::v3::HealthStatus
          if (host->coarseHealth() == Host::Health::Healthy) {
            endpoint->set_health_status(envoy::config::core::v3::HEALTHY);
          } else {
            // Distinguish timeout failures from other unhealthy states.
            if (host->healthFlagGet(Host::HealthFlag::ACTIVE_HC_TIMEOUT)) {
              endpoint->set_health_status(envoy::config::core::v3::TIMEOUT);
            } else {
              endpoint->set_health_status(envoy::config::core::v3::UNHEALTHY);
            }
          }

          // TODO(drewsortega): remove this once we are on v4 and endpoint_health_response is
          // removed. Copy this endpoint's health info to the legacy flat-list.
          response.mutable_endpoint_health_response()->add_endpoints_health()->MergeFrom(*endpoint);
        }
      }
    }
  }
  ENVOY_LOG(debug, "Sending EndpointHealthResponse to server {}", response.DebugString());
  stream_->sendMessage(response, false);
  stats_.responses_.inc();
  // Keep reporting on the server-configured cadence.
  setHdsStreamResponseTimer();
  return response;
}
140

            
141
44
void HdsDelegate::onCreateInitialMetadata(Http::RequestHeaderMap& metadata) {
142
44
  UNREFERENCED_PARAMETER(metadata);
143
44
}
144

            
145
44
void HdsDelegate::onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&& metadata) {
146
44
  UNREFERENCED_PARAMETER(metadata);
147
44
}
148

            
149
// Translate an HDS ClusterHealthCheck message into a Cluster proto suitable
// for constructing or updating an HdsCluster: name, fixed connect timeout and
// buffer limit, locality-grouped endpoints, health checks, and transport
// socket matches.
envoy::config::cluster::v3::Cluster HdsDelegate::createClusterConfig(
    const envoy::service::health::v3::ClusterHealthCheck& cluster_health_check) {
  envoy::config::cluster::v3::Cluster config;

  config.set_name(cluster_health_check.cluster_name());
  config.mutable_connect_timeout()->set_seconds(ClusterTimeoutSeconds);
  config.mutable_per_connection_buffer_limit_bytes()->set_value(ClusterConnectionBufferLimitBytes);

  // Copy every locality group, and every endpoint within it, into the
  // cluster's load assignment.
  for (const auto& locality_group : cluster_health_check.locality_endpoints()) {
    auto* lb_group = config.mutable_load_assignment()->add_endpoints();
    // Preserve locality information when present so results can be grouped.
    if (locality_group.has_locality()) {
      lb_group->mutable_locality()->MergeFrom(locality_group.locality());
    }

    for (const auto& source_endpoint : locality_group.endpoints()) {
      // Endpoints that explicitly disable active health checking are skipped.
      if (source_endpoint.has_health_check_config() &&
          source_endpoint.health_check_config().disable_active_health_check()) {
        ENVOY_LOG(debug, "Skip adding the endpoint {} with optional disabled health check for HDS.",
                  source_endpoint.DebugString());
        continue;
      }
      auto* target = lb_group->add_lb_endpoints()->mutable_endpoint();
      target->mutable_address()->MergeFrom(source_endpoint.address());
      target->mutable_health_check_config()->MergeFrom(source_endpoint.health_check_config());
    }
  }

  // TODO(lilika): Add support for optional per-endpoint health checks

  // Copy the health check definitions verbatim.
  for (const auto& health_check : cluster_health_check.health_checks()) {
    config.add_health_checks()->MergeFrom(health_check);
  }

  // Transport socket matches are needed when establishing host connections.
  config.mutable_transport_socket_matches()->MergeFrom(
      cluster_health_check.transport_socket_matches());

  ENVOY_LOG(debug, "New HdsCluster config {} ", config.DebugString());

  return config;
}
197

            
198
// Forward a refreshed configuration to an existing HdsCluster's own
// delta-update logic.
absl::Status HdsDelegate::updateHdsCluster(
    HdsClusterPtr cluster, const envoy::config::cluster::v3::Cluster& cluster_config,
    const envoy::config::core::v3::BindConfig& bind_config) {
  return cluster->update(cluster_config, bind_config, *info_factory_, tls_);
}
204

            
205
// Construct a brand-new HdsCluster from the given config and immediately
// start its health checks in the background. Returns the shared cluster
// pointer for storage in the delegate's cluster lists.
HdsClusterPtr
HdsDelegate::createHdsCluster(const envoy::config::cluster::v3::Cluster& cluster_config,
                              const envoy::config::core::v3::BindConfig& bind_config) {
  // Create HdsCluster. (The previous std::move(cluster_config) was a no-op on
  // a const reference — the proto is necessarily copied into the cluster, so
  // the misleading cast is removed.)
  auto new_cluster =
      std::make_shared<HdsCluster>(server_context_, cluster_config, bind_config, store_stats_,
                                   ssl_context_manager_, false, *info_factory_, tls_);

  // Begin HCs in the background.
  new_cluster->initialize([] { return absl::OkStatus(); });
  new_cluster->initHealthchecks();

  return new_cluster;
}
219

            
220
// Rebuild the set of HdsClusters from a validated HealthCheckSpecifier.
// Named clusters present in the previous map are updated in place; unnamed or
// previously-unseen clusters are created fresh. Duplicate names within a
// single specifier are skipped with a warning. Returns a non-OK status if any
// cluster update fails, leaving the member maps untouched in that case.
absl::Status HdsDelegate::processMessage(
    std::unique_ptr<envoy::service::health::v3::HealthCheckSpecifier>&& message) {
  // Fix: assert on the pointer BEFORE dereferencing it for logging (the
  // original logged message->DebugString() first).
  ASSERT(message);
  ENVOY_LOG(debug, "New health check response message {} ", message->DebugString());
  std::vector<HdsClusterPtr> hds_clusters;
  // Maps to replace the current member variable versions.
  absl::flat_hash_map<std::string, HdsClusterPtr> new_hds_clusters_name_map;

  for (const auto& cluster_health_check : message->cluster_health_checks()) {
    if (!new_hds_clusters_name_map.contains(cluster_health_check.cluster_name())) {
      HdsClusterPtr cluster_ptr;

      // Create a new configuration for a cluster based on our different or new config.
      auto cluster_config = createClusterConfig(cluster_health_check);

      // If this particular cluster configuration happens to have a name, then it is possible
      // this particular cluster exists in the name map. We check and if we found a match,
      // attempt to update this cluster. If no match was found, either the cluster name is empty
      // or we have not seen a cluster by this name before. In either case, create a new cluster.
      auto cluster_map_pair = hds_clusters_name_map_.find(cluster_health_check.cluster_name());
      if (cluster_map_pair != hds_clusters_name_map_.end()) {
        // We have a previous cluster with this name, update.
        cluster_ptr = cluster_map_pair->second;
        absl::Status status = updateHdsCluster(cluster_ptr, cluster_config,
                                               cluster_health_check.upstream_bind_config());
        if (!status.ok()) {
          return status;
        }
      } else {
        // There is no cluster with this name previously or its an empty string, so just create a
        // new cluster.
        cluster_ptr = createHdsCluster(cluster_config, cluster_health_check.upstream_bind_config());
      }

      // If this cluster does not have a name, do not add it to the name map since cluster_name is
      // an optional field, and reconstruct these clusters on every update.
      if (!cluster_health_check.cluster_name().empty()) {
        // Since this cluster has a name, add it to our by-name map so we can update it later.
        new_hds_clusters_name_map.insert({cluster_health_check.cluster_name(), cluster_ptr});
      } else {
        ENVOY_LOG(warn,
                  "HDS Cluster has no cluster_name, it will be recreated instead of updated on "
                  "every reconfiguration.");
      }

      // Add this cluster to the flat list for health checking.
      hds_clusters.push_back(cluster_ptr);
    } else {
      ENVOY_LOG(warn, "An HDS Cluster with this cluster_name has already been added, not using.");
    }
  }

  // Overwrite our map data structures.
  hds_clusters_name_map_ = std::move(new_hds_clusters_name_map);
  hds_clusters_ = std::move(hds_clusters);

  // TODO: add stats reporting for number of clusters added, removed, and reused.
  return absl::OkStatus();
}
279

            
280
// Entry point for a HealthCheckSpecifier pushed by the HDS server. Skips
// no-op updates via a message hash, validates the proto, rebuilds clusters
// through processMessage(), and (re)arms the response timer when the
// reporting interval changed.
void HdsDelegate::onReceiveMessage(
    std::unique_ptr<envoy::service::health::v3::HealthCheckSpecifier>&& message) {
  stats_.requests_.inc();
  ENVOY_LOG(debug, "New health check response message {} ", message->DebugString());

  const uint64_t hash = MessageUtil::hash(*message);

  // Identical specifier to the last successfully-applied one: nothing to do.
  if (hash == specifier_hash_) {
    ENVOY_LOG(debug, "New health check specifier is unchanged, no action taken.");
    return;
  }

  // Validate message fields
  TRY_ASSERT_MAIN_THREAD {
    MessageUtil::validate(*message,
                          server_context_.messageValidationContext().dynamicValidationVisitor());
  }
  END_TRY
  CATCH(const ProtoValidationException& ex, {
    // Increment error count
    stats_.errors_.inc();
    ENVOY_LOG(warn, "Unable to validate health check specifier: {}", ex.what());

    // Do not continue processing message
    return;
  });

  // Set response
  // Defaults to a 1000ms reporting interval when the specifier omits one.
  auto server_response_ms = PROTOBUF_GET_MS_OR_DEFAULT(*message, interval, 1000);

  /// Process the HealthCheckSpecifier message.
  absl::Status status = processMessage(std::move(message));
  if (!status.ok()) {
    stats_.errors_.inc();
    ENVOY_LOG(warn, "Unable to validate health check specifier: {}", status.message());
    // Do not continue processing message
    return;
  }

  stats_.updates_.inc();

  // Update the stored hash.
  // The hash is only stored after a fully successful update, so a failed
  // specifier will be reprocessed if the server re-sends it.
  specifier_hash_ = hash;

  // Only touch the timer when the interval actually changed.
  if (server_response_ms_ != server_response_ms) {
    server_response_ms_ = server_response_ms;
    setHdsStreamResponseTimer();
  }
}
329

            
330
44
void HdsDelegate::onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&& metadata) {
331
44
  UNREFERENCED_PARAMETER(metadata);
332
44
}
333

            
334
44
// Handle server-side stream termination: stop periodic reporting, clear all
// per-stream state, and kick off the reconnect/backoff path.
void HdsDelegate::onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) {
  ENVOY_LOG(warn, "{} gRPC config stream closed: {}, {}", service_method_.name(), status, message);
  // Without a stream there is nothing to report to.
  hds_stream_response_timer_->disableTimer();
  // Reset everything tied to the closed stream so the next specifier received
  // on a fresh stream is treated as brand new.
  stream_ = nullptr;
  specifier_hash_ = 0;
  server_response_ms_ = 0;
  handleFailure();
}
342

            
343
// Build an HdsCluster from a Cluster proto: create the ClusterInfo, resolve
// every endpoint address, and group hosts by locality for later reporting.
// Hashes of the config and of the transport socket matches are cached so that
// update() can cheaply detect deltas.
HdsCluster::HdsCluster(Server::Configuration::ServerFactoryContext& server_context,
                       envoy::config::cluster::v3::Cluster cluster,
                       const envoy::config::core::v3::BindConfig& bind_config, Stats::Store& stats,
                       Ssl::ContextManager& ssl_context_manager, bool added_via_api,
                       ClusterInfoFactory& info_factory, ThreadLocal::SlotAllocator& tls)
    : server_context_(server_context), cluster_(std::move(cluster)), stats_(stats),
      ssl_context_manager_(ssl_context_manager), added_via_api_(added_via_api),
      hosts_(new HostVector()) {
  ENVOY_LOG(debug, "Creating an HdsCluster");
  // Single priority (0) is used for all HDS hosts.
  priority_set_.getOrCreateHostSet(0);
  // Set initial hashes for possible delta updates.
  config_hash_ = MessageUtil::hash(cluster_);
  socket_match_hash_ = RepeatedPtrUtil::hash(cluster_.transport_socket_matches());

  info_ = info_factory.createClusterInfo(
      {server_context, cluster_, bind_config, stats_, ssl_context_manager_, added_via_api_, tls});

  // Temporary structure to hold Host pointers grouped by locality, to build
  // initial_hosts_per_locality_.
  std::vector<HostVector> hosts_by_locality;
  hosts_by_locality.reserve(cluster_.load_assignment().endpoints_size());

  // Iterate over every endpoint in every cluster.
  for (const auto& locality_endpoints : cluster_.load_assignment().endpoints()) {
    // Add a locality grouping to the hosts sorted by locality.
    hosts_by_locality.emplace_back();
    hosts_by_locality.back().reserve(locality_endpoints.lb_endpoints_size());

    for (const auto& host : locality_endpoints.lb_endpoints()) {
      // The (locality, endpoint) pair uniquely identifies a host for delta
      // updates (see updateHosts()).
      const LocalityEndpointTuple endpoint_key = {locality_endpoints.locality(), host};
      // Initialize an endpoint host object.
      auto address_or_error = Network::Address::resolveProtoAddress(host.endpoint().address());
      THROW_IF_NOT_OK_REF(address_or_error.status());
      auto const_locality_shared_pool = LocalityPool::getConstLocalitySharedPool(
          server_context_.singletonManager(), server_context_.mainThreadDispatcher());
      HostSharedPtr endpoint = std::shared_ptr<HostImpl>(THROW_OR_RETURN_VALUE(
          HostImpl::create(info_, "", std::move(address_or_error.value()), nullptr, nullptr, 1,
                           const_locality_shared_pool->getObject(locality_endpoints.locality()),
                           host.endpoint().health_check_config(), 0,
                           envoy::config::core::v3::UNKNOWN),
          std::unique_ptr<HostImpl>));
      // Add this host/endpoint pointer to our flat list of endpoints for health checking.
      hosts_->push_back(endpoint);
      // Add this host/endpoint pointer to our structured list by locality so results can be
      // requested by locality.
      hosts_by_locality.back().push_back(endpoint);
      // Add this host/endpoint pointer to our map so we can rebuild this later.
      hosts_map_.insert({endpoint_key, endpoint});
    }
  }
  // Create the HostsPerLocality.
  hosts_per_locality_ =
      std::make_shared<Envoy::Upstream::HostsPerLocalityImpl>(std::move(hosts_by_locality), false);
}
397

            
398
// Apply a (possibly changed) cluster configuration. The cached config hash
// makes an unchanged config a no-op. When it changed: rebuild ClusterInfo
// only if the transport socket matches changed, reconcile the host list, and
// reconcile the health checkers. Returns a non-OK status if health checker
// creation fails.
absl::Status HdsCluster::update(envoy::config::cluster::v3::Cluster cluster,
                                const envoy::config::core::v3::BindConfig& bind_config,
                                ClusterInfoFactory& info_factory, ThreadLocal::SlotAllocator& tls) {

  // check to see if the config changed. If it did, update.
  const uint64_t config_hash = MessageUtil::hash(cluster);
  if (config_hash_ != config_hash) {
    config_hash_ = config_hash;
    cluster_ = std::move(cluster);

    // Check to see if our list of socket matches have changed. If they have, create a new matcher
    // in info_.
    bool update_cluster_info = false;
    const uint64_t socket_match_hash = RepeatedPtrUtil::hash(cluster_.transport_socket_matches());
    if (socket_match_hash_ != socket_match_hash) {
      socket_match_hash_ = socket_match_hash;
      // New ClusterInfo forces all hosts to be recreated (see updateHosts()).
      update_cluster_info = true;
      info_ = info_factory.createClusterInfo({server_context_, cluster_, bind_config, stats_,
                                              ssl_context_manager_, added_via_api_, tls});
    }

    // Check to see if anything in the endpoints list has changed.
    updateHosts(cluster_.load_assignment().endpoints(), update_cluster_info);

    // Check to see if any of the health checkers have changed.
    absl::Status status = updateHealthchecks(cluster_.health_checks());
    if (!status.ok()) {
      return status;
    }
  }
  return absl::OkStatus();
}
430

            
431
// Reconcile health checkers against a new config list: reuse a checker whose
// exact config is unchanged, create and start a new one otherwise. On success
// the member structures are replaced wholesale; on factory failure we return
// early and the old member structures are left intact.
absl::Status HdsCluster::updateHealthchecks(
    const Protobuf::RepeatedPtrField<envoy::config::core::v3::HealthCheck>& health_checks) {
  std::vector<Upstream::HealthCheckerSharedPtr> health_checkers;
  HealthCheckerMap health_checkers_map;

  for (const auto& health_check : health_checks) {
    // Check to see if this exact same health_check config already has a health checker.
    auto health_checker = health_checkers_map_.find(health_check);
    if (health_checker != health_checkers_map_.end()) {
      // If it does, use it.
      health_checkers_map.insert({health_check, health_checker->second});
      health_checkers.push_back(health_checker->second);
    } else {
      // If it does not, create a new one.
      auto checker_or_error =
          Upstream::HealthCheckerFactory::create(health_check, *this, server_context_);
      // Early return: the previously-started checkers created in this loop
      // are dropped, but the member maps remain unchanged.
      RETURN_IF_NOT_OK_REF(checker_or_error.status());
      auto new_health_checker = checker_or_error.value();
      health_checkers_map.insert({health_check, new_health_checker});
      health_checkers.push_back(new_health_checker);

      // Start these health checks now because upstream assumes they already have been started.
      new_health_checker->start();
    }
  }

  // replace our member data structures with our newly created ones.
  health_checkers_ = std::move(health_checkers);
  health_checkers_map_ = std::move(health_checkers_map);

  // TODO: add stats reporting for number of health checkers added, removed, and reused.
  return absl::OkStatus();
}
464

            
465
// Reconcile this cluster's hosts against a new endpoint list. Hosts whose
// (locality, endpoint) key already exists are reused unless update_cluster_info
// forces full recreation (ClusterInfo changed); new hosts are created with the
// FAILED_ACTIVE_HC flag set; hosts absent from the new list are removed. The
// priority set (priority 0) is updated with the resulting add/remove deltas.
void HdsCluster::updateHosts(
    const Protobuf::RepeatedPtrField<envoy::config::endpoint::v3::LocalityLbEndpoints>&
        locality_endpoints,
    bool update_cluster_info) {
  // Create the data structures needed for PrioritySet::update.
  HostVectorSharedPtr hosts = std::make_shared<std::vector<HostSharedPtr>>();
  std::vector<HostSharedPtr> hosts_added;
  std::vector<HostSharedPtr> hosts_removed;
  std::vector<HostVector> hosts_by_locality;

  // Use for delta update comparison.
  HostsMap hosts_map;

  for (auto& endpoints : locality_endpoints) {
    hosts_by_locality.emplace_back();
    for (auto& endpoint : endpoints.lb_endpoints()) {
      LocalityEndpointTuple endpoint_key = {endpoints.locality(), endpoint};

      // Check to see if this exact Locality+Endpoint has been seen before.
      // Also, if we made changes to our info, re-create all endpoints.
      auto host_pair = hosts_map_.find(endpoint_key);
      HostSharedPtr host;
      if (!update_cluster_info && host_pair != hosts_map_.end()) {
        // If we have this exact pair, save the shared pointer.
        host = host_pair->second;
      } else {
        // We do not have this endpoint saved, so create a new one.
        auto address_or_error =
            Network::Address::resolveProtoAddress(endpoint.endpoint().address());
        THROW_IF_NOT_OK_REF(address_or_error.status());
        auto const_locality_shared_pool = LocalityPool::getConstLocalitySharedPool(
            server_context_.singletonManager(), server_context_.mainThreadDispatcher());
        host = std::shared_ptr<HostImpl>(THROW_OR_RETURN_VALUE(
            HostImpl::create(info_, "", std::move(address_or_error.value()), nullptr, nullptr, 1,
                             const_locality_shared_pool->getObject(endpoints.locality()),
                             endpoint.endpoint().health_check_config(), 0,
                             envoy::config::core::v3::UNKNOWN),
            std::unique_ptr<HostImpl>));

        // Set the initial health status as in HdsCluster::initialize.
        host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC);

        // Add to our hosts added list and save the shared pointer.
        hosts_added.push_back(host);
      }

      // No matter if it is reused or new, always add to these data structures.
      hosts_by_locality.back().push_back(host);
      hosts->push_back(host);
      hosts_map.insert({endpoint_key, host});
    }
  }

  // Compare the old map to the new to find out which endpoints are going to be removed.
  for (auto& host_pair : hosts_map_) {
    if (!hosts_map.contains(host_pair.first)) {
      hosts_removed.push_back(host_pair.second);
    }
  }

  // Update the member data structures.
  hosts_ = std::move(hosts);
  hosts_map_ = std::move(hosts_map);

  // TODO: add stats reporting for number of endpoints added, removed, and reused.
  ENVOY_LOG(debug, "Hosts Added: {}, Removed: {}, Reused: {}", hosts_added.size(),
            hosts_removed.size(), hosts_->size() - hosts_added.size());

  // Update the priority set.
  hosts_per_locality_ =
      std::make_shared<Envoy::Upstream::HostsPerLocalityImpl>(std::move(hosts_by_locality), false);
  priority_set_.updateHosts(0, HostSetImpl::partitionHosts(hosts_, hosts_per_locality_), {},
                            hosts_added, hosts_removed, absl::nullopt, absl::nullopt);
}
539

            
540
ClusterSharedPtr HdsCluster::create() {
  // Factory-style creation is unsupported for HDS clusters; always nullptr.
  return nullptr;
}
541

            
542
76
void HdsCluster::initHealthchecks() {
543
76
  for (auto& health_check : cluster_.health_checks()) {
544
75
    auto health_checker_or_error =
545
75
        Upstream::HealthCheckerFactory::create(health_check, *this, server_context_);
546
75
    THROW_IF_NOT_OK_REF(health_checker_or_error.status());
547

            
548
75
    auto health_checker = health_checker_or_error.value();
549
75
    health_checkers_.push_back(health_checker);
550
75
    health_checkers_map_.insert({health_check, health_checker});
551
75
    health_checker->start();
552
75
  }
553
76
}
554

            
555
76
// Store the initialization callback and, on the first call only, mark every
// initial host as failed-active-HC (none has been checked yet) and publish
// the hosts to the priority set with their locality structure preserved.
void HdsCluster::initialize(std::function<absl::Status()> callback) {
  initialization_complete_callback_ = callback;

  // If this function gets called again we do not want to touch the priority set again with the
  // initial hosts, because the hosts may have changed.
  if (!initialized_) {
    for (const auto& host : *hosts_) {
      host->healthFlagSet(Host::HealthFlag::FAILED_ACTIVE_HC);
    }
    // Use the ungrouped and grouped hosts lists to retain locality structure in the priority set.
    priority_set_.updateHosts(0, HostSetImpl::partitionHosts(hosts_, hosts_per_locality_), {},
                              *hosts_, {}, absl::nullopt, absl::nullopt);

    initialized_ = true;
  }
}
571

            
572
// Outlier detection is not used by HDS clusters; the detector is ignored.
void HdsCluster::setOutlierDetector(const Outlier::DetectorSharedPtr&) {}
573

            
574
} // namespace Upstream
575
} // namespace Envoy