Line data Source code
1 : #include "source/common/upstream/od_cds_api_impl.h"
2 :
3 : #include "source/common/common/assert.h"
4 : #include "source/common/grpc/common.h"
5 :
6 : #include "absl/strings/str_join.h"
7 :
8 : namespace Envoy {
9 : namespace Upstream {
10 :
11 : OdCdsApiSharedPtr
12 : OdCdsApiImpl::create(const envoy::config::core::v3::ConfigSource& odcds_config,
13 : OptRef<xds::core::v3::ResourceLocator> odcds_resources_locator,
14 : ClusterManager& cm, MissingClusterNotifier& notifier, Stats::Scope& scope,
15 0 : ProtobufMessage::ValidationVisitor& validation_visitor) {
16 0 : return OdCdsApiSharedPtr(new OdCdsApiImpl(odcds_config, odcds_resources_locator, cm, notifier,
17 0 : scope, validation_visitor));
18 0 : }
19 :
20 : OdCdsApiImpl::OdCdsApiImpl(const envoy::config::core::v3::ConfigSource& odcds_config,
21 : OptRef<xds::core::v3::ResourceLocator> odcds_resources_locator,
22 : ClusterManager& cm, MissingClusterNotifier& notifier,
23 : Stats::Scope& scope,
24 : ProtobufMessage::ValidationVisitor& validation_visitor)
25 : : Envoy::Config::SubscriptionBase<envoy::config::cluster::v3::Cluster>(validation_visitor,
26 : "name"),
27 : helper_(cm, "odcds"), cm_(cm), notifier_(notifier),
28 0 : scope_(scope.createScope("cluster_manager.odcds.")) {
29 : // TODO(krnowak): Move the subscription setup to CdsApiHelper. Maybe make CdsApiHelper a base
30 : // class for CDS and ODCDS.
31 0 : const auto resource_name = getResourceName();
32 0 : if (!odcds_resources_locator.has_value()) {
33 0 : subscription_ = cm_.subscriptionFactory().subscriptionFromConfigSource(
34 0 : odcds_config, Grpc::Common::typeUrl(resource_name), *scope_, *this, resource_decoder_, {});
35 0 : } else {
36 0 : subscription_ = cm.subscriptionFactory().collectionSubscriptionFromUrl(
37 0 : *odcds_resources_locator, odcds_config, resource_name, *scope_, *this, resource_decoder_);
38 0 : }
39 0 : }
40 :
41 : absl::Status OdCdsApiImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& resources,
42 0 : const std::string& version_info) {
43 0 : UNREFERENCED_PARAMETER(resources);
44 0 : UNREFERENCED_PARAMETER(version_info);
45 : // On-demand cluster updates are only supported for delta, not sotw.
46 0 : PANIC("not supported");
47 0 : }
48 :
49 : absl::Status
50 : OdCdsApiImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& added_resources,
51 : const Protobuf::RepeatedPtrField<std::string>& removed_resources,
52 0 : const std::string& system_version_info) {
53 0 : auto exception_msgs =
54 0 : helper_.onConfigUpdate(added_resources, removed_resources, system_version_info);
55 0 : sendAwaiting();
56 0 : status_ = StartStatus::InitialFetchDone;
57 : // According to the XDS specification, the server can send a reply with names in the
58 : // removed_resources field for requested resources that do not exist. That way we can notify the
59 : // interested parties about the missing resource immediately without waiting for some timeout to
60 : // be triggered.
61 0 : for (const auto& resource_name : removed_resources) {
62 0 : ENVOY_LOG(debug, "odcds: notifying about potential missing cluster {}", resource_name);
63 0 : notifier_.notifyMissingCluster(resource_name);
64 0 : }
65 0 : if (!exception_msgs.empty()) {
66 0 : return absl::InvalidArgumentError(
67 0 : fmt::format("Error adding/updating cluster(s) {}", absl::StrJoin(exception_msgs, ", ")));
68 0 : }
69 0 : return absl::OkStatus();
70 0 : }
71 :
72 : void OdCdsApiImpl::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason,
73 0 : const EnvoyException*) {
74 0 : ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason);
75 0 : sendAwaiting();
76 0 : status_ = StartStatus::InitialFetchDone;
77 0 : }
78 :
79 0 : void OdCdsApiImpl::sendAwaiting() {
80 0 : if (awaiting_names_.empty()) {
81 0 : return;
82 0 : }
83 : // The awaiting names are sent only once. After the state transition from Starting to
84 : // InitialFetchDone (which happens on the first received response), the awaiting names list is not
85 : // used any more.
86 0 : ENVOY_LOG(debug, "odcds: sending request for awaiting cluster names {}",
87 0 : fmt::join(awaiting_names_, ", "));
88 0 : subscription_->requestOnDemandUpdate(awaiting_names_);
89 0 : awaiting_names_.clear();
90 0 : }
91 :
92 0 : void OdCdsApiImpl::updateOnDemand(std::string cluster_name) {
93 0 : switch (status_) {
94 0 : case StartStatus::NotStarted:
95 0 : ENVOY_LOG(trace, "odcds: starting a subscription with cluster name {}", cluster_name);
96 0 : status_ = StartStatus::Started;
97 0 : subscription_->start({std::move(cluster_name)});
98 0 : return;
99 :
100 0 : case StartStatus::Started:
101 0 : ENVOY_LOG(trace, "odcds: putting cluster name {} on awaiting list", cluster_name);
102 0 : awaiting_names_.insert(std::move(cluster_name));
103 0 : return;
104 :
105 0 : case StartStatus::InitialFetchDone:
106 0 : ENVOY_LOG(trace, "odcds: requesting for cluster name {}", cluster_name);
107 0 : subscription_->requestOnDemandUpdate({std::move(cluster_name)});
108 0 : return;
109 0 : }
110 0 : PANIC("corrupt enum");
111 0 : }
112 :
113 : } // namespace Upstream
114 : } // namespace Envoy
|