Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/source/extensions/clusters/eds/eds.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/extensions/clusters/eds/eds.h"
2
3
#include "envoy/common/exception.h"
4
#include "envoy/config/cluster/v3/cluster.pb.h"
5
#include "envoy/config/core/v3/config_source.pb.h"
6
#include "envoy/service/discovery/v3/discovery.pb.h"
7
8
#include "source/common/common/assert.h"
9
#include "source/common/common/utility.h"
10
#include "source/common/config/api_version.h"
11
#include "source/common/config/decoded_resource_impl.h"
12
#include "source/common/grpc/common.h"
13
14
namespace Envoy {
15
namespace Upstream {
16
17
absl::StatusOr<std::unique_ptr<EdsClusterImpl>>
18
EdsClusterImpl::create(const envoy::config::cluster::v3::Cluster& cluster,
19
14
                       ClusterFactoryContext& cluster_context) {
20
14
  absl::Status creation_status = absl::OkStatus();
21
14
  std::unique_ptr<EdsClusterImpl> ret =
22
14
      absl::WrapUnique(new EdsClusterImpl(cluster, cluster_context, creation_status));
23
14
  RETURN_IF_NOT_OK(creation_status);
24
14
  return ret;
25
14
}
26
27
EdsClusterImpl::EdsClusterImpl(const envoy::config::cluster::v3::Cluster& cluster,
28
                               ClusterFactoryContext& cluster_context,
29
                               absl::Status& creation_status)
30
    : BaseDynamicClusterImpl(cluster, cluster_context, creation_status),
31
      Envoy::Config::SubscriptionBase<envoy::config::endpoint::v3::ClusterLoadAssignment>(
32
          cluster_context.messageValidationVisitor(), "cluster_name"),
33
      local_info_(cluster_context.serverFactoryContext().localInfo()),
34
      eds_resources_cache_(
35
          Runtime::runtimeFeatureEnabled("envoy.restart_features.use_eds_cache_for_ads")
36
              ? cluster_context.clusterManager().edsResourcesCache()
37
14
              : absl::nullopt) {
38
14
  Event::Dispatcher& dispatcher = cluster_context.serverFactoryContext().mainThreadDispatcher();
39
14
  assignment_timeout_ = dispatcher.createTimer([this]() -> void { onAssignmentTimeout(); });
40
14
  const auto& eds_config = cluster.eds_cluster_config().eds_config();
41
14
  if (Config::SubscriptionFactory::isPathBasedConfigSource(
42
14
          eds_config.config_source_specifier_case())) {
43
0
    initialize_phase_ = InitializePhase::Primary;
44
14
  } else {
45
14
    initialize_phase_ = InitializePhase::Secondary;
46
14
  }
47
14
  const auto resource_name = getResourceName();
48
14
  subscription_ = THROW_OR_RETURN_VALUE(
49
14
      cluster_context.clusterManager().subscriptionFactory().subscriptionFromConfigSource(
50
14
          eds_config, Grpc::Common::typeUrl(resource_name), info_->statsScope(), *this,
51
14
          resource_decoder_, {}),
52
14
      Config::SubscriptionPtr);
53
14
}
54
55
14
EdsClusterImpl::~EdsClusterImpl() {
56
14
  if (using_cached_resource_) {
57
    // Clear the callback as the subscription is no longer valid.
58
0
    eds_resources_cache_->removeCallback(edsServiceName(), this);
59
0
  }
60
14
}
61
62
14
void EdsClusterImpl::startPreInit() { subscription_->start({edsServiceName()}); }
63
64
14
void EdsClusterImpl::BatchUpdateHelper::batchUpdate(PrioritySet::HostUpdateCb& host_update_cb) {
65
14
  absl::flat_hash_set<std::string> all_new_hosts;
66
14
  PriorityStateManager priority_state_manager(parent_, parent_.local_info_, &host_update_cb,
67
14
                                              parent_.random_);
68
14
  for (const auto& locality_lb_endpoint : cluster_load_assignment_.endpoints()) {
69
14
    THROW_IF_NOT_OK(parent_.validateEndpointsForZoneAwareRouting(locality_lb_endpoint));
70
71
14
    priority_state_manager.initializePriorityFor(locality_lb_endpoint);
72
73
14
    if (locality_lb_endpoint.has_leds_cluster_locality_config()) {
74
      // The locality uses LEDS, fetch its dynamic data, which must be ready, or otherwise
75
      // the batchUpdate method should not have been called.
76
0
      const auto& leds_config = locality_lb_endpoint.leds_cluster_locality_config();
77
78
      // The batchUpdate call must be performed after all the endpoints of all localities
79
      // were received.
80
0
      ASSERT(parent_.leds_localities_.find(leds_config) != parent_.leds_localities_.end() &&
81
0
             parent_.leds_localities_[leds_config]->isUpdated());
82
0
      for (const auto& [_, lb_endpoint] :
83
0
           parent_.leds_localities_[leds_config]->getEndpointsMap()) {
84
0
        updateLocalityEndpoints(lb_endpoint, locality_lb_endpoint, priority_state_manager,
85
0
                                all_new_hosts);
86
0
      }
87
14
    } else {
88
14
      for (const auto& lb_endpoint : locality_lb_endpoint.lb_endpoints()) {
89
14
        updateLocalityEndpoints(lb_endpoint, locality_lb_endpoint, priority_state_manager,
90
14
                                all_new_hosts);
91
14
      }
92
14
    }
93
14
  }
94
95
  // Track whether we rebuilt any LB structures.
96
14
  bool cluster_rebuilt = false;
97
98
  // Get the map of all the latest existing hosts, which is used to filter out the existing
99
  // hosts in the process of updating cluster memberships.
100
14
  HostMapConstSharedPtr all_hosts = parent_.prioritySet().crossPriorityHostMap();
101
14
  ASSERT(all_hosts != nullptr);
102
103
14
  const uint32_t overprovisioning_factor = PROTOBUF_GET_WRAPPED_OR_DEFAULT(
104
14
      cluster_load_assignment_.policy(), overprovisioning_factor, kDefaultOverProvisioningFactor);
105
14
  const bool weighted_priority_health =
106
14
      cluster_load_assignment_.policy().weighted_priority_health();
107
108
14
  LocalityWeightsMap empty_locality_map;
109
110
  // Loop over all priorities that exist in the new configuration.
111
14
  auto& priority_state = priority_state_manager.priorityState();
112
28
  for (size_t i = 0; i < priority_state.size(); ++i) {
113
14
    if (parent_.locality_weights_map_.size() <= i) {
114
14
      parent_.locality_weights_map_.resize(i + 1);
115
14
    }
116
14
    if (priority_state[i].first != nullptr) {
117
14
      cluster_rebuilt |= parent_.updateHostsPerLocality(
118
14
          i, weighted_priority_health, overprovisioning_factor, *priority_state[i].first,
119
14
          parent_.locality_weights_map_[i], priority_state[i].second, priority_state_manager,
120
14
          *all_hosts, all_new_hosts);
121
14
    } else {
122
      // If the new update contains a priority with no hosts, call the update function with an empty
123
      // set of hosts.
124
0
      cluster_rebuilt |=
125
0
          parent_.updateHostsPerLocality(i, weighted_priority_health, overprovisioning_factor, {},
126
0
                                         parent_.locality_weights_map_[i], empty_locality_map,
127
0
                                         priority_state_manager, *all_hosts, all_new_hosts);
128
0
    }
129
14
  }
130
131
  // Loop over all priorities not present in the config that already exists. This will
132
  // empty out any remaining priority that the config update did not refer to.
133
14
  for (size_t i = priority_state.size(); i < parent_.priority_set_.hostSetsPerPriority().size();
134
14
       ++i) {
135
0
    if (parent_.locality_weights_map_.size() <= i) {
136
0
      parent_.locality_weights_map_.resize(i + 1);
137
0
    }
138
0
    cluster_rebuilt |= parent_.updateHostsPerLocality(
139
0
        i, weighted_priority_health, overprovisioning_factor, {}, parent_.locality_weights_map_[i],
140
0
        empty_locality_map, priority_state_manager, *all_hosts, all_new_hosts);
141
0
  }
142
143
14
  if (!cluster_rebuilt) {
144
0
    parent_.info_->configUpdateStats().update_no_rebuild_.inc();
145
0
  }
146
147
  // If we didn't setup to initialize when our first round of health checking is complete, just
148
  // do it now.
149
14
  parent_.onPreInitComplete();
150
14
}
151
152
void EdsClusterImpl::BatchUpdateHelper::updateLocalityEndpoints(
153
    const envoy::config::endpoint::v3::LbEndpoint& lb_endpoint,
154
    const envoy::config::endpoint::v3::LocalityLbEndpoints& locality_lb_endpoint,
155
14
    PriorityStateManager& priority_state_manager, absl::flat_hash_set<std::string>& all_new_hosts) {
156
14
  const auto address =
157
14
      THROW_OR_RETURN_VALUE(parent_.resolveProtoAddress(lb_endpoint.endpoint().address()),
158
14
                            const Network::Address::InstanceConstSharedPtr);
159
14
  std::vector<Network::Address::InstanceConstSharedPtr> address_list;
160
14
  if (!lb_endpoint.endpoint().additional_addresses().empty()) {
161
0
    address_list.push_back(address);
162
0
    for (const auto& additional_address : lb_endpoint.endpoint().additional_addresses()) {
163
0
      address_list.emplace_back(
164
0
          THROW_OR_RETURN_VALUE(parent_.resolveProtoAddress(additional_address.address()),
165
0
                                const Network::Address::InstanceConstSharedPtr));
166
0
    }
167
0
  }
168
169
  // When the configuration contains duplicate hosts, only the first one will be retained.
170
14
  const auto address_as_string = address->asString();
171
14
  if (all_new_hosts.contains(address_as_string)) {
172
0
    return;
173
0
  }
174
175
14
  priority_state_manager.registerHostForPriority(lb_endpoint.endpoint().hostname(), address,
176
14
                                                 address_list, locality_lb_endpoint, lb_endpoint,
177
14
                                                 parent_.time_source_);
178
14
  all_new_hosts.emplace(address_as_string);
179
14
}
180
181
absl::Status
182
EdsClusterImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& resources,
183
14
                               const std::string&) {
184
14
  if (resources.empty()) {
185
0
    ENVOY_LOG(debug, "Missing ClusterLoadAssignment for {} in onConfigUpdate()", edsServiceName());
186
0
    info_->configUpdateStats().update_empty_.inc();
187
0
    onPreInitComplete();
188
0
    return absl::OkStatus();
189
0
  }
190
14
  if (resources.size() != 1) {
191
0
    return absl::InvalidArgumentError(
192
0
        fmt::format("Unexpected EDS resource length: {}", resources.size()));
193
0
  }
194
195
14
  envoy::config::endpoint::v3::ClusterLoadAssignment cluster_load_assignment =
196
14
      dynamic_cast<const envoy::config::endpoint::v3::ClusterLoadAssignment&>(
197
14
          resources[0].get().resource());
198
14
  if (cluster_load_assignment.cluster_name() != edsServiceName()) {
199
0
    return absl::InvalidArgumentError(fmt::format("Unexpected EDS cluster (expecting {}): {}",
200
0
                                                  edsServiceName(),
201
0
                                                  cluster_load_assignment.cluster_name()));
202
0
  }
203
  // Validate that each locality doesn't have both LEDS and endpoints defined.
204
  // TODO(adisuissa): This is only needed for the API v3 support. In future major versions
205
  // the oneof definition will take care of it.
206
14
  for (const auto& locality : cluster_load_assignment.endpoints()) {
207
14
    if (locality.has_leds_cluster_locality_config() && locality.lb_endpoints_size() > 0) {
208
0
      return absl::InvalidArgumentError(fmt::format(
209
0
          "A ClusterLoadAssignment for cluster {} cannot include both LEDS (resource: {}) and a "
210
0
          "list of endpoints.",
211
0
          edsServiceName(), locality.leds_cluster_locality_config().leds_collection_name()));
212
0
    }
213
14
  }
214
215
  // Disable timer (if enabled) as we have received new assignment.
216
14
  if (assignment_timeout_->enabled()) {
217
0
    assignment_timeout_->disableTimer();
218
0
    if (eds_resources_cache_.has_value()) {
219
0
      eds_resources_cache_->disableExpiryTimer(edsServiceName());
220
0
    }
221
0
  }
222
  // Check if endpoint_stale_after is set.
223
14
  const uint64_t stale_after_ms =
224
14
      PROTOBUF_GET_MS_OR_DEFAULT(cluster_load_assignment.policy(), endpoint_stale_after, 0);
225
14
  if (stale_after_ms > 0) {
226
    // Stat to track how often we receive valid assignment_timeout in response.
227
0
    info_->configUpdateStats().assignment_timeout_received_.inc();
228
0
    assignment_timeout_->enableTimer(std::chrono::milliseconds(stale_after_ms));
229
0
    if (eds_resources_cache_.has_value()) {
230
0
      eds_resources_cache_->setExpiryTimer(edsServiceName(),
231
0
                                           std::chrono::milliseconds(stale_after_ms));
232
0
    }
233
0
  }
234
235
  // Drop overload configuration parsing.
236
14
  absl::Status status = parseDropOverloadConfig(cluster_load_assignment);
237
14
  if (!status.ok()) {
238
0
    return status;
239
0
  }
240
241
  // Pause LEDS messages until the EDS config is finished processing.
242
14
  Config::ScopedResume maybe_resume_leds;
243
14
  if (transport_factory_context_->clusterManager().adsMux()) {
244
14
    const auto type_url = Config::getTypeUrl<envoy::config::endpoint::v3::LbEndpoint>();
245
14
    maybe_resume_leds = transport_factory_context_->clusterManager().adsMux()->pause(type_url);
246
14
  }
247
248
14
  update(cluster_load_assignment);
249
  // If previously used a cached version, remove the subscription from the cache's
250
  // callbacks.
251
14
  if (using_cached_resource_) {
252
0
    eds_resources_cache_->removeCallback(edsServiceName(), this);
253
0
    using_cached_resource_ = false;
254
0
  }
255
14
  return absl::OkStatus();
256
14
}
257
258
void EdsClusterImpl::update(
259
14
    const envoy::config::endpoint::v3::ClusterLoadAssignment& cluster_load_assignment) {
260
  // Compare the current set of LEDS localities (localities using LEDS) to the one received in the
261
  // update. A LEDS locality can either be added, removed, or kept. If it is added we add a
262
  // subscription to it, and if it is removed we delete the subscription.
263
14
  LedsConfigSet cla_leds_configs;
264
265
14
  for (const auto& locality : cluster_load_assignment.endpoints()) {
266
14
    if (locality.has_leds_cluster_locality_config()) {
267
0
      cla_leds_configs.emplace(locality.leds_cluster_locality_config());
268
0
    }
269
14
  }
270
271
  // Remove the LEDS localities that are not needed anymore.
272
14
  absl::erase_if(leds_localities_, [&cla_leds_configs](const auto& item) {
273
0
    auto const& [leds_config, _] = item;
274
    // Returns true if the leds_config isn't in the cla_leds_configs
275
0
    return cla_leds_configs.find(leds_config) == cla_leds_configs.end();
276
0
  });
277
278
  // In case LEDS is used, store the cluster load assignment as a field
279
  // (optimize for no-copy).
280
14
  const envoy::config::endpoint::v3::ClusterLoadAssignment* used_load_assignment;
281
14
  if (!cla_leds_configs.empty() || eds_resources_cache_.has_value()) {
282
14
    cluster_load_assignment_ = std::make_unique<envoy::config::endpoint::v3::ClusterLoadAssignment>(
283
14
        std::move(cluster_load_assignment));
284
14
    used_load_assignment = cluster_load_assignment_.get();
285
14
  } else {
286
0
    cluster_load_assignment_ = nullptr;
287
0
    used_load_assignment = &cluster_load_assignment;
288
0
  }
289
290
  // Add all the LEDS localities that are new.
291
14
  for (const auto& leds_config : cla_leds_configs) {
292
0
    if (leds_localities_.find(leds_config) == leds_localities_.end()) {
293
0
      ENVOY_LOG(trace, "Found new LEDS config in EDS onConfigUpdate() for cluster {}: {}",
294
0
                edsServiceName(), leds_config.DebugString());
295
296
      // Create a new LEDS subscription and add it to the subscriptions map.
297
0
      LedsSubscriptionPtr leds_locality_subscription = std::make_unique<LedsSubscription>(
298
0
          leds_config, edsServiceName(), *transport_factory_context_, info_->statsScope(),
299
0
          [&, used_load_assignment]() {
300
            // Called upon an update to the locality.
301
0
            if (validateAllLedsUpdated()) {
302
0
              BatchUpdateHelper helper(*this, *used_load_assignment);
303
0
              priority_set_.batchHostUpdate(helper);
304
0
            }
305
0
          });
306
0
      leds_localities_.emplace(leds_config, std::move(leds_locality_subscription));
307
0
    }
308
0
  }
309
310
  // If all the LEDS localities are updated, the EDS update can occur. If not, then when the last
311
  // LEDS locality will be updated, it will trigger the EDS update helper.
312
14
  if (!validateAllLedsUpdated()) {
313
0
    return;
314
0
  }
315
316
14
  BatchUpdateHelper helper(*this, *used_load_assignment);
317
14
  priority_set_.batchHostUpdate(helper);
318
14
}
319
320
absl::Status
321
EdsClusterImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& added_resources,
322
4
                               const Protobuf::RepeatedPtrField<std::string>&, const std::string&) {
323
4
  return onConfigUpdate(added_resources, "");
324
4
}
325
326
0
void EdsClusterImpl::onAssignmentTimeout() {
327
  // We can no longer use the assignments, remove them.
328
  // TODO(vishalpowar) This is not going to work for incremental updates, and we
329
  // need to instead change the health status to indicate the assignments are
330
  // stale.
331
  // TODO(snowp): This should probably just use xDS TTLs?
332
0
  envoy::config::endpoint::v3::ClusterLoadAssignment resource;
333
0
  resource.set_cluster_name(edsServiceName());
334
0
  update(resource);
335
336
0
  if (eds_resources_cache_.has_value()) {
337
    // Clear the resource so it won't be used, and its watchers will be notified.
338
0
    eds_resources_cache_->removeResource(edsServiceName());
339
0
  }
340
  // Stat to track how often we end up with stale assignments.
341
0
  info_->configUpdateStats().assignment_stale_.inc();
342
0
}
343
344
0
void EdsClusterImpl::onCachedResourceRemoved(absl::string_view resource_name) {
345
0
  ASSERT(resource_name == edsServiceName());
346
  // Disable the timer if previously started.
347
0
  if (assignment_timeout_->enabled()) {
348
0
    assignment_timeout_->disableTimer();
349
0
    eds_resources_cache_->disableExpiryTimer(edsServiceName());
350
0
  }
351
0
  envoy::config::endpoint::v3::ClusterLoadAssignment resource;
352
0
  resource.set_cluster_name(edsServiceName());
353
0
  update(resource);
354
0
}
355
356
0
void EdsClusterImpl::reloadHealthyHostsHelper(const HostSharedPtr& host) {
357
  // Here we will see if we have a host that has been marked for deletion by service discovery
358
  // but has been stabilized due to passing active health checking. If such a host is now
359
  // failing active health checking we can remove it during this health check update.
360
0
  HostSharedPtr host_to_exclude = host;
361
0
  if (host_to_exclude != nullptr &&
362
0
      host_to_exclude->healthFlagGet(Host::HealthFlag::FAILED_ACTIVE_HC) &&
363
0
      host_to_exclude->healthFlagGet(Host::HealthFlag::PENDING_DYNAMIC_REMOVAL)) {
364
    // Empty for clarity.
365
0
  } else {
366
    // Do not exclude and remove the host during the update.
367
0
    host_to_exclude = nullptr;
368
0
  }
369
370
0
  const auto& host_sets = prioritySet().hostSetsPerPriority();
371
0
  for (size_t priority = 0; priority < host_sets.size(); ++priority) {
372
0
    const auto& host_set = host_sets[priority];
373
374
    // Filter current hosts in case we need to exclude a host.
375
0
    HostVectorSharedPtr hosts_copy(new HostVector());
376
0
    std::copy_if(host_set->hosts().begin(), host_set->hosts().end(),
377
0
                 std::back_inserter(*hosts_copy),
378
0
                 [&host_to_exclude](const HostSharedPtr& host) { return host_to_exclude != host; });
379
380
    // Setup a hosts to remove vector in case we need to exclude a host.
381
0
    HostVector hosts_to_remove;
382
0
    if (hosts_copy->size() != host_set->hosts().size()) {
383
0
      ASSERT(hosts_copy->size() == host_set->hosts().size() - 1);
384
0
      hosts_to_remove.emplace_back(host_to_exclude);
385
0
    }
386
387
    // Filter hosts per locality in case we need to exclude a host.
388
0
    HostsPerLocalityConstSharedPtr hosts_per_locality_copy = host_set->hostsPerLocality().filter(
389
0
        {[&host_to_exclude](const Host& host) { return &host != host_to_exclude.get(); }})[0];
390
391
0
    prioritySet().updateHosts(priority,
392
0
                              HostSetImpl::partitionHosts(hosts_copy, hosts_per_locality_copy),
393
0
                              host_set->localityWeights(), {}, hosts_to_remove, random_.random(),
394
0
                              absl::nullopt, absl::nullopt);
395
0
  }
396
0
}
397
398
bool EdsClusterImpl::updateHostsPerLocality(
399
    const uint32_t priority, bool weighted_priority_health, const uint32_t overprovisioning_factor,
400
    const HostVector& new_hosts, LocalityWeightsMap& locality_weights_map,
401
    LocalityWeightsMap& new_locality_weights_map, PriorityStateManager& priority_state_manager,
402
14
    const HostMap& all_hosts, const absl::flat_hash_set<std::string>& all_new_hosts) {
403
14
  const auto& host_set = priority_set_.getOrCreateHostSet(priority, overprovisioning_factor);
404
14
  HostVectorSharedPtr current_hosts_copy(new HostVector(host_set.hosts()));
405
406
14
  HostVector hosts_added;
407
14
  HostVector hosts_removed;
408
  // We need to trigger updateHosts with the new host vectors if they have changed. We also do this
409
  // when the locality weight map or the overprovisioning factor. Note calling updateDynamicHostList
410
  // is responsible for both determining whether there was a change and to perform the actual update
411
  // to current_hosts_copy, so it must be called even if we know that we need to update (e.g. if the
412
  // overprovisioning factor changes).
413
  //
414
  // TODO(htuch): We eagerly update all the host sets here on weight changes, which may have
415
  // performance implications, since this has the knock on effect that we rebuild the load balancers
416
  // and locality scheduler. See the comment in BaseDynamicClusterImpl::updateDynamicHostList
417
  // about this. In the future we may need to do better here.
418
14
  const bool hosts_updated = updateDynamicHostList(new_hosts, *current_hosts_copy, hosts_added,
419
14
                                                   hosts_removed, all_hosts, all_new_hosts);
420
14
  if (hosts_updated || host_set.weightedPriorityHealth() != weighted_priority_health ||
421
14
      host_set.overprovisioningFactor() != overprovisioning_factor ||
422
14
      locality_weights_map != new_locality_weights_map) {
423
14
    ASSERT(std::all_of(current_hosts_copy->begin(), current_hosts_copy->end(),
424
14
                       [&](const auto& host) { return host->priority() == priority; }));
425
14
    locality_weights_map = new_locality_weights_map;
426
14
    ENVOY_LOG(debug,
427
14
              "EDS hosts or locality weights changed for cluster: {} current hosts {} priority {}",
428
14
              info_->name(), host_set.hosts().size(), host_set.priority());
429
430
14
    priority_state_manager.updateClusterPrioritySet(
431
14
        priority, std::move(current_hosts_copy), hosts_added, hosts_removed, absl::nullopt,
432
14
        weighted_priority_health, overprovisioning_factor);
433
14
    return true;
434
14
  }
435
0
  return false;
436
14
}
437
438
void EdsClusterImpl::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason,
439
0
                                          const EnvoyException*) {
440
0
  ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason);
441
  // Config failure may happen if Envoy times out waiting for the EDS resource.
442
  // If it is a timeout, the eds resources cache is enabled,
443
  // and there is a cached ClusterLoadAssignment, then the cached assignment should be used.
444
0
  if (reason == Envoy::Config::ConfigUpdateFailureReason::FetchTimedout &&
445
0
      eds_resources_cache_.has_value()) {
446
0
    ENVOY_LOG(trace, "onConfigUpdateFailed due to timeout for {}, looking for cached resources",
447
0
              edsServiceName());
448
0
    auto cached_resource = eds_resources_cache_->getResource(edsServiceName(), this);
449
0
    if (cached_resource.has_value()) {
450
0
      ENVOY_LOG(
451
0
          debug,
452
0
          "Did not receive EDS response on time, using cached ClusterLoadAssignment for cluster {}",
453
0
          edsServiceName());
454
0
      envoy::config::endpoint::v3::ClusterLoadAssignment cached_load_assignment =
455
0
          dynamic_cast<const envoy::config::endpoint::v3::ClusterLoadAssignment&>(*cached_resource);
456
0
      info_->configUpdateStats().assignment_use_cached_.inc();
457
0
      using_cached_resource_ = true;
458
0
      update(cached_load_assignment);
459
0
      return;
460
0
    }
461
0
  }
462
  // We need to allow server startup to continue, even if we have a bad config.
463
0
  onPreInitComplete();
464
0
}
465
466
absl::StatusOr<std::pair<ClusterImplBaseSharedPtr, ThreadAwareLoadBalancerPtr>>
467
EdsClusterFactory::createClusterImpl(const envoy::config::cluster::v3::Cluster& cluster,
468
14
                                     ClusterFactoryContext& context) {
469
  // TODO(kbaichoo): EDS cluster should be able to support loading it's
470
  // configuration from the CustomClusterType protobuf. Currently it does not.
471
  // See: https://github.com/envoyproxy/envoy/issues/28752
472
14
  if (!cluster.has_eds_cluster_config()) {
473
0
    return absl::InvalidArgumentError("cannot create an EDS cluster without an EDS config");
474
0
  }
475
476
14
  absl::StatusOr<std::unique_ptr<EdsClusterImpl>> cluster_or_error =
477
14
      EdsClusterImpl::create(cluster, context);
478
14
  RETURN_IF_NOT_OK(cluster_or_error.status());
479
14
  return std::make_pair(std::move(*cluster_or_error), nullptr);
480
14
}
481
482
14
bool EdsClusterImpl::validateAllLedsUpdated() const {
483
  // Iterate through all LEDS based localities, and if they are all updated return true.
484
14
  for (const auto& [_, leds_subscription] : leds_localities_) {
485
0
    if (!leds_subscription->isUpdated()) {
486
0
      return false;
487
0
    }
488
0
  }
489
14
  return true;
490
14
}
491
492
/**
493
 * Static registration for the Eds cluster factory. @see RegisterFactory.
494
 */
495
REGISTER_FACTORY(EdsClusterFactory, ClusterFactory);
496
497
} // namespace Upstream
498
} // namespace Envoy