Line data Source code
1 : #include "source/common/upstream/cds_api_helper.h" 2 : 3 : #include "envoy/common/exception.h" 4 : #include "envoy/config/cluster/v3/cluster.pb.h" 5 : #include "envoy/config/endpoint/v3/endpoint.pb.h" 6 : #include "envoy/config/grpc_mux.h" 7 : 8 : #include "source/common/common/fmt.h" 9 : #include "source/common/config/resource_name.h" 10 : #include "source/common/runtime/runtime_features.h" 11 : 12 : #include "absl/container/flat_hash_set.h" 13 : 14 : namespace Envoy { 15 : namespace Upstream { 16 : 17 : std::vector<std::string> 18 : CdsApiHelper::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& added_resources, 19 : const Protobuf::RepeatedPtrField<std::string>& removed_resources, 20 28 : const std::string& system_version_info) { 21 28 : Config::ScopedResume maybe_resume_eds_leds_sds; 22 28 : if (cm_.adsMux()) { 23 : // A cluster update pauses sending EDS and LEDS requests. 24 28 : const std::vector<std::string> paused_xds_types{ 25 28 : Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>(), 26 28 : Config::getTypeUrl<envoy::config::endpoint::v3::LbEndpoint>(), 27 28 : Config::getTypeUrl<envoy::extensions::transport_sockets::tls::v3::Secret>()}; 28 28 : maybe_resume_eds_leds_sds = cm_.adsMux()->pause(paused_xds_types); 29 28 : } 30 : 31 28 : ENVOY_LOG(info, "{}: add {} cluster(s), remove {} cluster(s)", name_, added_resources.size(), 32 28 : removed_resources.size()); 33 : 34 28 : std::vector<std::string> exception_msgs; 35 28 : absl::flat_hash_set<std::string> cluster_names(added_resources.size()); 36 28 : bool any_applied = false; 37 28 : uint32_t added_or_updated = 0; 38 28 : uint32_t skipped = 0; 39 28 : for (const auto& resource : added_resources) { 40 28 : envoy::config::cluster::v3::Cluster cluster; 41 28 : TRY_ASSERT_MAIN_THREAD { 42 28 : cluster = dynamic_cast<const envoy::config::cluster::v3::Cluster&>(resource.get().resource()); 43 28 : if (!cluster_names.insert(cluster.name()).second) { 44 : // NOTE: at this point, the first of these duplicates has already been successfully applied. 45 0 : exception_msgs.push_back( 46 0 : fmt::format("{}: duplicate cluster {} found", cluster.name(), cluster.name())); 47 0 : continue; 48 0 : } 49 28 : if (cm_.addOrUpdateCluster(cluster, resource.get().version())) { 50 28 : any_applied = true; 51 28 : ENVOY_LOG(debug, "{}: add/update cluster '{}'", name_, cluster.name()); 52 28 : ++added_or_updated; 53 28 : } else { 54 0 : ENVOY_LOG(debug, "{}: add/update cluster '{}' skipped", name_, cluster.name()); 55 0 : ++skipped; 56 0 : } 57 28 : } 58 28 : END_TRY 59 28 : CATCH(const EnvoyException& e, 60 28 : { exception_msgs.push_back(fmt::format("{}: {}", cluster.name(), e.what())); }); 61 28 : } 62 : 63 40 : for (const auto& resource_name : removed_resources) { 64 40 : if (cm_.removeCluster(resource_name)) { 65 0 : any_applied = true; 66 0 : ENVOY_LOG(debug, "{}: remove cluster '{}'", name_, resource_name); 67 0 : } 68 40 : } 69 : 70 28 : ENVOY_LOG(info, "{}: added/updated {} cluster(s), skipped {} unmodified cluster(s)", name_, 71 28 : added_or_updated, skipped); 72 : 73 28 : if (any_applied) { 74 28 : system_version_info_ = system_version_info; 75 28 : } 76 28 : return exception_msgs; 77 28 : } 78 : 79 : } // namespace Upstream 80 : } // namespace Envoy