Line data Source code
1 : #include "source/common/upstream/cds_api_impl.h" 2 : 3 : #include "source/common/common/assert.h" 4 : #include "source/common/grpc/common.h" 5 : 6 : #include "absl/strings/str_join.h" 7 : 8 : namespace Envoy { 9 : namespace Upstream { 10 : 11 : CdsApiPtr CdsApiImpl::create(const envoy::config::core::v3::ConfigSource& cds_config, 12 : const xds::core::v3::ResourceLocator* cds_resources_locator, 13 : ClusterManager& cm, Stats::Scope& scope, 14 28 : ProtobufMessage::ValidationVisitor& validation_visitor) { 15 28 : return CdsApiPtr{ 16 28 : new CdsApiImpl(cds_config, cds_resources_locator, cm, scope, validation_visitor)}; 17 28 : } 18 : 19 : CdsApiImpl::CdsApiImpl(const envoy::config::core::v3::ConfigSource& cds_config, 20 : const xds::core::v3::ResourceLocator* cds_resources_locator, 21 : ClusterManager& cm, Stats::Scope& scope, 22 : ProtobufMessage::ValidationVisitor& validation_visitor) 23 : : Envoy::Config::SubscriptionBase<envoy::config::cluster::v3::Cluster>(validation_visitor, 24 : "name"), 25 28 : helper_(cm, "cds"), cm_(cm), scope_(scope.createScope("cluster_manager.cds.")) { 26 28 : const auto resource_name = getResourceName(); 27 28 : if (cds_resources_locator == nullptr) { 28 28 : subscription_ = cm_.subscriptionFactory().subscriptionFromConfigSource( 29 28 : cds_config, Grpc::Common::typeUrl(resource_name), *scope_, *this, resource_decoder_, {}); 30 28 : } else { 31 0 : subscription_ = cm.subscriptionFactory().collectionSubscriptionFromUrl( 32 0 : *cds_resources_locator, cds_config, resource_name, *scope_, *this, resource_decoder_); 33 0 : } 34 28 : } 35 : 36 : absl::Status CdsApiImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& resources, 37 20 : const std::string& version_info) { 38 20 : auto all_existing_clusters = cm_.clusters(); 39 : // Exclude the clusters which CDS wants to add. 40 20 : for (const auto& resource : resources) { 41 20 : all_existing_clusters.active_clusters_.erase(resource.get().name()); 42 20 : all_existing_clusters.warming_clusters_.erase(resource.get().name()); 43 20 : } 44 20 : Protobuf::RepeatedPtrField<std::string> to_remove_repeated; 45 40 : for (const auto& [cluster_name, _] : all_existing_clusters.active_clusters_) { 46 40 : UNREFERENCED_PARAMETER(_); 47 40 : *to_remove_repeated.Add() = cluster_name; 48 40 : } 49 20 : for (const auto& [cluster_name, _] : all_existing_clusters.warming_clusters_) { 50 0 : UNREFERENCED_PARAMETER(_); 51 : // Do not add the cluster twice when the cluster is both active and warming. 52 0 : if (!all_existing_clusters.active_clusters_.contains(cluster_name)) { 53 0 : *to_remove_repeated.Add() = cluster_name; 54 0 : } 55 0 : } 56 20 : return onConfigUpdate(resources, to_remove_repeated, version_info); 57 20 : } 58 : 59 : absl::Status 60 : CdsApiImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& added_resources, 61 : const Protobuf::RepeatedPtrField<std::string>& removed_resources, 62 28 : const std::string& system_version_info) { 63 28 : auto exception_msgs = 64 28 : helper_.onConfigUpdate(added_resources, removed_resources, system_version_info); 65 28 : runInitializeCallbackIfAny(); 66 28 : if (!exception_msgs.empty()) { 67 0 : return absl::InvalidArgumentError( 68 0 : fmt::format("Error adding/updating cluster(s) {}", absl::StrJoin(exception_msgs, ", "))); 69 0 : } 70 28 : return absl::OkStatus(); 71 28 : } 72 : 73 : void CdsApiImpl::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason, 74 0 : const EnvoyException*) { 75 0 : ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason); 76 : // We need to allow server startup to continue, even if we have a bad 77 : // config. 78 0 : runInitializeCallbackIfAny(); 79 0 : } 80 : 81 28 : void CdsApiImpl::runInitializeCallbackIfAny() { 82 28 : if (initialize_callback_) { 83 28 : initialize_callback_(); 84 28 : initialize_callback_ = nullptr; 85 28 : } 86 28 : } 87 : 88 : } // namespace Upstream 89 : } // namespace Envoy