1
#include "source/common/upstream/cds_api_impl.h"
2

            
3
#include "source/common/common/assert.h"
4
#include "source/common/grpc/common.h"
5

            
6
#include "absl/strings/str_join.h"
7

            
8
namespace Envoy {
9
namespace Upstream {
10

            
11
absl::StatusOr<CdsApiPtr>
12
CdsApiImpl::create(const envoy::config::core::v3::ConfigSource& cds_config,
13
                   const xds::core::v3::ResourceLocator* cds_resources_locator, ClusterManager& cm,
14
                   Stats::Scope& scope, ProtobufMessage::ValidationVisitor& validation_visitor,
15
                   Server::Configuration::ServerFactoryContext& factory_context,
16
730
                   bool support_multi_ads_sources) {
17
730
  absl::Status creation_status = absl::OkStatus();
18
730
  auto ret =
19
730
      CdsApiPtr{new CdsApiImpl(cds_config, cds_resources_locator, cm, scope, validation_visitor,
20
730
                               factory_context, support_multi_ads_sources, creation_status)};
21
730
  RETURN_IF_NOT_OK(creation_status);
22
730
  return ret;
23
730
}
24

            
25
CdsApiImpl::CdsApiImpl(const envoy::config::core::v3::ConfigSource& cds_config,
26
                       const xds::core::v3::ResourceLocator* cds_resources_locator,
27
                       ClusterManager& cm, Stats::Scope& scope,
28
                       ProtobufMessage::ValidationVisitor& validation_visitor,
29
                       Server::Configuration::ServerFactoryContext& factory_context,
30
                       bool support_multi_ads_sources, absl::Status& creation_status)
31
730
    : Envoy::Config::SubscriptionBase<envoy::config::cluster::v3::Cluster>(validation_visitor,
32
730
                                                                           "name"),
33
730
      helper_(cm, factory_context.xdsManager(), "cds"), cm_(cm),
34
730
      scope_(scope.createScope("cluster_manager.cds.")), factory_context_(factory_context),
35
730
      stats_({ALL_CDS_STATS(POOL_COUNTER(*scope_), POOL_GAUGE(*scope_))}),
36
730
      support_multi_ads_sources_(support_multi_ads_sources) {
37
730
  const auto resource_name = getResourceName();
38
730
  absl::StatusOr<Config::SubscriptionPtr> subscription_or_error;
39
730
  if (cds_resources_locator == nullptr) {
40
716
    subscription_or_error = cm_.subscriptionFactory().subscriptionFromConfigSource(
41
716
        cds_config, Grpc::Common::typeUrl(resource_name), *scope_, *this, resource_decoder_, {});
42
716
  } else {
43
14
    subscription_or_error = cm.subscriptionFactory().collectionSubscriptionFromUrl(
44
14
        *cds_resources_locator, cds_config, resource_name, *scope_, *this, resource_decoder_);
45
14
  }
46
730
  SET_AND_RETURN_IF_NOT_OK(subscription_or_error.status(), creation_status);
47
730
  subscription_ = std::move(*subscription_or_error);
48
730
}
49

            
50
absl::Status CdsApiImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& resources,
51
1036
                                        const std::string& version_info) {
52
  // If another source may be adding clusters to the cluster-manager, Envoy needs to
53
  // track which clusters are received via the SotW CDS configuration, so only
54
  // clusters that were added through SotW CDS and are not updated will be removed.
55
1036
  if (support_multi_ads_sources_) {
56
    // The input resources will be the next sotw_resource_names_.
57
20
    absl::flat_hash_set<std::string> next_sotw_resource_names;
58
20
    next_sotw_resource_names.reserve(resources.size());
59
20
    std::transform(resources.cbegin(), resources.cend(),
60
20
                   std::inserter(next_sotw_resource_names, next_sotw_resource_names.begin()),
61
20
                   [](const Config::DecodedResourceRef resource) { return resource.get().name(); });
62
    // Find all the clusters that are currently used, but no longer appear in
63
    // the next step.
64
20
    Protobuf::RepeatedPtrField<std::string> to_remove;
65
20
    for (const std::string& cluster_name : sotw_resource_names_) {
66
8
      if (!next_sotw_resource_names.contains(cluster_name)) {
67
1
        to_remove.Add(std::string(cluster_name));
68
1
      }
69
8
    }
70
20
    absl::Status status = onConfigUpdate(resources, to_remove, version_info);
71
    // Even if the onConfigUpdate() above returns an error, some of the clusters
72
    // may have been updated. Either way, we use the new update to override the
73
    // contents.
74
    // TODO(adisuissa): This will not be needed once the xDS-Cache layer is
75
    // introduced, as it will keep track of only the valid resources.
76
20
    sotw_resource_names_ = std::move(next_sotw_resource_names);
77
20
    return status;
78
20
  }
79

            
80
1016
  auto all_existing_clusters = cm_.clusters();
81
  // Exclude the clusters which CDS wants to add.
82
1077
  for (const auto& resource : resources) {
83
839
    all_existing_clusters.active_clusters_.erase(resource.get().name());
84
839
    all_existing_clusters.warming_clusters_.erase(resource.get().name());
85
839
  }
86
1016
  Protobuf::RepeatedPtrField<std::string> to_remove_repeated;
87
1624
  for (const auto& [cluster_name, _] : all_existing_clusters.active_clusters_) {
88
1459
    UNREFERENCED_PARAMETER(_);
89
1459
    *to_remove_repeated.Add() = cluster_name;
90
1459
  }
91
1016
  for (const auto& [cluster_name, _] : all_existing_clusters.warming_clusters_) {
92
2
    UNREFERENCED_PARAMETER(_);
93
    // Do not add the cluster twice when the cluster is both active and warming.
94
2
    if (!all_existing_clusters.active_clusters_.contains(cluster_name)) {
95
2
      *to_remove_repeated.Add() = cluster_name;
96
2
    }
97
2
  }
98
1016
  return onConfigUpdate(resources, to_remove_repeated, version_info);
99
1036
}
100

            
101
absl::Status
102
CdsApiImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& added_resources,
103
                           const Protobuf::RepeatedPtrField<std::string>& removed_resources,
104
1942
                           const std::string& system_version_info) {
105
1942
  auto [added_or_updated, exception_msgs] =
106
1942
      helper_.onConfigUpdate(added_resources, removed_resources, system_version_info);
107
1942
  runInitializeCallbackIfAny();
108
1942
  if (!exception_msgs.empty()) {
109
17
    return absl::InvalidArgumentError(
110
17
        fmt::format("Error adding/updating cluster(s) {}", absl::StrJoin(exception_msgs, ", ")));
111
17
  }
112
1925
  if (added_or_updated > 0) {
113
1312
    stats_.config_reload_.inc();
114
1312
    stats_.config_reload_time_ms_.set(DateUtil::nowToMilliseconds(factory_context_.timeSource()));
115
1312
  }
116
1925
  return absl::OkStatus();
117
1942
}
118

            
119
void CdsApiImpl::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason,
120
39
                                      const EnvoyException*) {
121
39
  ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason);
122
  // We need to allow server startup to continue, even if we have a bad
123
  // config.
124
39
  runInitializeCallbackIfAny();
125
39
}
126

            
127
1981
void CdsApiImpl::runInitializeCallbackIfAny() {
128
1981
  if (initialize_callback_) {
129
703
    initialize_callback_();
130
703
    initialize_callback_ = nullptr;
131
703
  }
132
1981
}
133

            
134
} // namespace Upstream
135
} // namespace Envoy