1
#include "source/extensions/clusters/eds/leds.h"
2

            
3
#include "envoy/common/exception.h"
4
#include "envoy/config/core/v3/config_source.pb.h"
5

            
6
#include "source/common/common/assert.h"
7
#include "source/common/config/decoded_resource_impl.h"
8
#include "source/common/config/xds_resource.h"
9

            
10
namespace Envoy {
11
namespace Upstream {
12

            
13
LedsSubscription::LedsSubscription(
14
    const envoy::config::endpoint::v3::LedsClusterLocalityConfig& leds_config,
15
    const std::string& cluster_name,
16
    Server::Configuration::TransportSocketFactoryContext& factory_context,
17
    Stats::Scope& cluster_stats_scope, const UpdateCb& callback)
18
45
    : Envoy::Config::SubscriptionBase<envoy::config::endpoint::v3::LbEndpoint>(
19
45
          factory_context.messageValidationVisitor(), leds_config.leds_collection_name()),
20
45
      local_info_(factory_context.serverFactoryContext().localInfo()), cluster_name_(cluster_name),
21
45
      stats_scope_(cluster_stats_scope.createScope("leds.")),
22
45
      stats_({ALL_LEDS_STATS(POOL_COUNTER(*stats_scope_))}), callback_(callback) {
23
45
  const xds::core::v3::ResourceLocator leds_resource_locator = THROW_OR_RETURN_VALUE(
24
45
      Config::XdsResourceIdentifier::decodeUrl(leds_config.leds_collection_name()),
25
45
      xds::core::v3::ResourceLocator);
26
45
  const auto resource_name = getResourceName();
27
45
  subscription_ = THROW_OR_RETURN_VALUE(
28
45
      factory_context.serverFactoryContext()
29
45
          .clusterManager()
30
45
          .subscriptionFactory()
31
45
          .collectionSubscriptionFromUrl(leds_resource_locator, leds_config.leds_config(),
32
45
                                         resource_name, *stats_scope_, *this, resource_decoder_),
33
45
      Config::SubscriptionPtr);
34
45
  subscription_->start({});
35
45
}
36

            
37
absl::Status
38
LedsSubscription::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& added_resources,
39
                                 const Protobuf::RepeatedPtrField<std::string>& removed_resources,
40
70
                                 const std::string&) {
41
  // At least one resource must be added or removed.
42
70
  if (added_resources.empty() && removed_resources.empty()) {
43
2
    ENVOY_LOG(debug, "No added or removed LbEndpoint entries for cluster {} in onConfigUpdate()",
44
2
              cluster_name_);
45
2
    stats_.update_empty_.inc();
46
    // If it's the first update, and it has no resources, set the locality as active,
47
    // and update whoever is waiting for it, to allow the system to initialize.
48
2
    if (!initial_update_attempt_complete_) {
49
1
      initial_update_attempt_complete_ = true;
50
1
      callback_();
51
1
    }
52
2
    return absl::OkStatus();
53
2
  }
54

            
55
68
  ENVOY_LOG(info, "{}: add {} endpoint(s), remove {} endpoints(s)", cluster_name_,
56
68
            added_resources.size(), removed_resources.size());
57

            
58
  // Update the internal host list with the removed hosts.
59
68
  for (const auto& removed_resource_name : removed_resources) {
60
    // Remove the entry from the endpoints list.
61
13
    ENVOY_LOG(debug, "Removing endpoint {} using LEDS update.", removed_resource_name);
62
13
    endpoints_map_.erase(removed_resource_name);
63
13
  }
64

            
65
  // Update the internal host list with the added hosts.
66
83
  for (const auto& added_resource : added_resources) {
67
83
    const auto& added_resource_name = added_resource.get().name();
68
83
    ENVOY_LOG(trace, "Adding/Updating endpoint {} using LEDS update.", added_resource_name);
69
83
    envoy::config::endpoint::v3::LbEndpoint lb_endpoint =
70
83
        static_cast<const envoy::config::endpoint::v3::LbEndpoint&>(
71
83
            added_resource.get().resource());
72
83
    endpoints_map_[added_resource_name] = std::move(lb_endpoint);
73
83
  }
74

            
75
  // Notify the callbacks that the host list has been modified.
76
68
  initial_update_attempt_complete_ = true;
77
68
  callback_();
78
68
  return absl::OkStatus();
79
70
}
80

            
81
void LedsSubscription::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason,
82
5
                                            const EnvoyException*) {
83
5
  ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason);
84
5
  ENVOY_LOG(debug, "LEDS update failed");
85

            
86
  // Similar to EDS, we need to let the system initialize. Set the locality as
87
  // active, and update whoever is waiting for it.
88
5
  initial_update_attempt_complete_ = true;
89
5
  callback_();
90
5
}
91

            
92
} // namespace Upstream
93
} // namespace Envoy