1
#include "source/common/upstream/cds_api_helper.h"
2

            
3
#include "envoy/common/exception.h"
4
#include "envoy/config/cluster/v3/cluster.pb.h"
5
#include "envoy/config/endpoint/v3/endpoint.pb.h"
6
#include "envoy/config/grpc_mux.h"
7

            
8
#include "source/common/common/fmt.h"
9
#include "source/common/config/resource_name.h"
10
#include "source/common/runtime/runtime_features.h"
11

            
12
#include "absl/container/flat_hash_set.h"
13

            
14
namespace Envoy {
15
namespace Upstream {
16

            
17
std::pair<uint32_t, std::vector<std::string>>
18
CdsApiHelper::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& added_resources,
19
                             const Protobuf::RepeatedPtrField<std::string>& removed_resources,
20
2126
                             const std::string& system_version_info) {
21
  // A cluster update pauses sending EDS and LEDS requests.
22
2126
  const std::vector<std::string> paused_xds_types{
23
2126
      Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>(),
24
2126
      Config::getTypeUrl<envoy::config::endpoint::v3::LbEndpoint>(),
25
2126
      Config::getTypeUrl<envoy::extensions::transport_sockets::tls::v3::Secret>()};
26
2126
  Config::ScopedResume resume_eds_leds_sds = xds_manager_.pause(paused_xds_types);
27

            
28
2126
  ENVOY_LOG(
29
2126
      info,
30
2126
      "{}: response indicates {} added/updated cluster(s), {} removed cluster(s); applying changes",
31
2126
      name_, added_resources.size(), removed_resources.size());
32

            
33
2126
  std::vector<std::string> exception_msgs;
34
2126
  absl::flat_hash_set<std::string> cluster_names(added_resources.size());
35
2126
  bool any_applied = false;
36
2126
  uint32_t added_or_updated = 0;
37
2126
  uint32_t skipped = 0;
38
2224
  for (const auto& resource : added_resources) {
39
    // Holds a reference to the name of the currently parsed cluster resource.
40
    // This is needed for the CATCH clause below.
41
1662
    absl::string_view cluster_name = EMPTY_STRING;
42
1662
    TRY_ASSERT_MAIN_THREAD {
43
1662
      const envoy::config::cluster::v3::Cluster& cluster =
44
1662
          dynamic_cast<const envoy::config::cluster::v3::Cluster&>(resource.get().resource());
45
1662
      cluster_name = cluster.name();
46
1662
      if (!cluster_names.insert(cluster.name()).second) {
47
        // NOTE: at this point, the first of these duplicates has already been successfully applied.
48
11
        exception_msgs.push_back(
49
11
            fmt::format("{}: duplicate cluster {} found", cluster_name, cluster_name));
50
11
        continue;
51
11
      }
52
1651
      auto update_or_error = cm_.addOrUpdateCluster(cluster, resource.get().version());
53
1651
      if (!update_or_error.status().ok()) {
54
        exception_msgs.push_back(
55
            fmt::format("{}: {}", cluster_name, update_or_error.status().message()));
56
        continue;
57
      }
58
1651
      if (*update_or_error) {
59
1490
        any_applied = true;
60
1490
        ENVOY_LOG(debug, "{}: add/update cluster '{}'", name_, cluster_name);
61
1490
        ++added_or_updated;
62
1501
      } else {
63
161
        ENVOY_LOG(debug, "{}: add/update cluster '{}' skipped", name_, cluster_name);
64
161
        ++skipped;
65
161
      }
66
1651
    }
67
1651
    END_TRY
68
1662
    CATCH(const EnvoyException& e,
69
1662
          { exception_msgs.push_back(fmt::format("{}: {}", cluster_name, e.what())); });
70
1651
  }
71

            
72
2126
  uint32_t removed = 0;
73
2588
  for (const auto& resource_name : removed_resources) {
74
1758
    if (cm_.removeCluster(resource_name)) {
75
581
      any_applied = true;
76
581
      ENVOY_LOG(debug, "{}: remove cluster '{}'", name_, resource_name);
77
581
      ++removed;
78
581
    }
79
1758
  }
80

            
81
2126
  ENVOY_LOG(
82
2126
      info,
83
2126
      "{}: added/updated {} cluster(s) (skipped {} unmodified cluster(s)); removed {} cluster(s)",
84
2126
      name_, added_or_updated, skipped, removed);
85

            
86
2126
  if (any_applied) {
87
1922
    system_version_info_ = system_version_info;
88
1922
  }
89
2126
  return std::pair{added_or_updated, exception_msgs};
90
2126
}
91

            
92
} // namespace Upstream
93
} // namespace Envoy