1
#include "source/common/upstream/od_cds_api_impl.h"
2

            
3
#include "source/common/common/assert.h"
4
#include "source/common/grpc/common.h"
5

            
6
#include "absl/strings/str_join.h"
7

            
8
namespace Envoy {
9
namespace Upstream {
10

            
11
absl::StatusOr<OdCdsApiSharedPtr>
12
OdCdsApiImpl::create(const envoy::config::core::v3::ConfigSource& odcds_config,
13
                     OptRef<xds::core::v3::ResourceLocator> odcds_resources_locator,
14
                     Config::XdsManager& xds_manager, ClusterManager& cm,
15
                     MissingClusterNotifier& notifier, Stats::Scope& scope,
16
                     ProtobufMessage::ValidationVisitor& validation_visitor,
17
100
                     Server::Configuration::ServerFactoryContext&) {
18
100
  absl::Status creation_status = absl::OkStatus();
19
100
  auto ret =
20
100
      OdCdsApiSharedPtr(new OdCdsApiImpl(odcds_config, odcds_resources_locator, xds_manager, cm,
21
100
                                         notifier, scope, validation_visitor, creation_status));
22
100
  RETURN_IF_NOT_OK(creation_status);
23
100
  return ret;
24
100
}
25

            
26
OdCdsApiImpl::OdCdsApiImpl(const envoy::config::core::v3::ConfigSource& odcds_config,
27
                           OptRef<xds::core::v3::ResourceLocator> odcds_resources_locator,
28
                           Config::XdsManager& xds_manager, ClusterManager& cm,
29
                           MissingClusterNotifier& notifier, Stats::Scope& scope,
30
                           ProtobufMessage::ValidationVisitor& validation_visitor,
31
                           absl::Status& creation_status)
32
100
    : Envoy::Config::SubscriptionBase<envoy::config::cluster::v3::Cluster>(validation_visitor,
33
100
                                                                           "name"),
34
100
      helper_(cm, xds_manager, "odcds"), notifier_(notifier),
35
100
      scope_(scope.createScope("cluster_manager.odcds.")) {
36
  // TODO(krnowak): Move the subscription setup to CdsApiHelper. Maybe make CdsApiHelper a base
37
  // class for CDS and ODCDS.
38
100
  const auto resource_name = getResourceName();
39
100
  absl::StatusOr<Config::SubscriptionPtr> subscription_or_error;
40
100
  if (!odcds_resources_locator.has_value()) {
41
99
    subscription_or_error = cm.subscriptionFactory().subscriptionFromConfigSource(
42
99
        odcds_config, Grpc::Common::typeUrl(resource_name), *scope_, *this, resource_decoder_, {});
43
99
  } else {
44
1
    subscription_or_error = cm.subscriptionFactory().collectionSubscriptionFromUrl(
45
1
        *odcds_resources_locator, odcds_config, resource_name, *scope_, *this, resource_decoder_);
46
1
  }
47
100
  SET_AND_RETURN_IF_NOT_OK(subscription_or_error.status(), creation_status);
48
100
  subscription_ = std::move(*subscription_or_error);
49
100
}
50

            
51
absl::Status OdCdsApiImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& resources,
52
                                          const std::string& version_info) {
53
  UNREFERENCED_PARAMETER(resources);
54
  UNREFERENCED_PARAMETER(version_info);
55
  // On-demand cluster updates are only supported for delta, not sotw.
56
  PANIC("not supported");
57
}
58

            
59
absl::Status
60
OdCdsApiImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& added_resources,
61
                             const Protobuf::RepeatedPtrField<std::string>& removed_resources,
62
68
                             const std::string& system_version_info) {
63
68
  auto [_, exception_msgs] =
64
68
      helper_.onConfigUpdate(added_resources, removed_resources, system_version_info);
65
68
  sendAwaiting();
66
68
  status_ = StartStatus::InitialFetchDone;
67
  // According to the XDS specification, the server can send a reply with names in the
68
  // removed_resources field for requested resources that do not exist. That way we can notify the
69
  // interested parties about the missing resource immediately without waiting for some timeout to
70
  // be triggered.
71
68
  for (const auto& resource_name : removed_resources) {
72
11
    ENVOY_LOG(debug, "odcds: notifying about potential missing cluster {}", resource_name);
73
11
    notifier_.notifyMissingCluster(resource_name);
74
11
  }
75
68
  if (!exception_msgs.empty()) {
76
1
    return absl::InvalidArgumentError(
77
1
        fmt::format("Error adding/updating cluster(s) {}", absl::StrJoin(exception_msgs, ", ")));
78
1
  }
79
67
  return absl::OkStatus();
80
68
}
81

            
82
void OdCdsApiImpl::onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason,
83
4
                                        const EnvoyException*) {
84
4
  ASSERT(Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure != reason);
85
4
  sendAwaiting();
86
4
  status_ = StartStatus::InitialFetchDone;
87
4
}
88

            
89
72
void OdCdsApiImpl::sendAwaiting() {
90
72
  if (awaiting_names_.empty()) {
91
69
    return;
92
69
  }
93
  // The awaiting names are sent only once. After the state transition from Starting to
94
  // InitialFetchDone (which happens on the first received response), the awaiting names list is not
95
  // used any more.
96
3
  ENVOY_LOG(debug, "odcds: sending request for awaiting cluster names {}",
97
3
            fmt::join(awaiting_names_, ", "));
98
3
  subscription_->requestOnDemandUpdate(awaiting_names_);
99
3
  awaiting_names_.clear();
100
3
}
101

            
102
117
void OdCdsApiImpl::updateOnDemand(std::string cluster_name) {
103
117
  switch (status_) {
104
81
  case StartStatus::NotStarted:
105
81
    ENVOY_LOG(trace, "odcds: starting a subscription with cluster name {}", cluster_name);
106
81
    status_ = StartStatus::Started;
107
81
    subscription_->start({std::move(cluster_name)});
108
81
    return;
109

            
110
7
  case StartStatus::Started:
111
7
    ENVOY_LOG(trace, "odcds: putting cluster name {} on awaiting list", cluster_name);
112
7
    awaiting_names_.insert(std::move(cluster_name));
113
7
    return;
114

            
115
29
  case StartStatus::InitialFetchDone:
116
29
    ENVOY_LOG(trace, "odcds: requesting for cluster name {}", cluster_name);
117
29
    subscription_->requestOnDemandUpdate({std::move(cluster_name)});
118
29
    return;
119
117
  }
120
117
}
121

            
122
// A class that maintains all the od-cds xDS-TP based singleton subscriptions,
123
// and update the cluster-manager when the resources are updated.
124
// The object will only be accessed by the main thread. It should also be a
125
// singleton object that is used by all the filters that need to access od-cds
126
// over xdstp-based config sources, and will only be allocated for the first
127
// occurrence of the filter.
128
class XdstpOdCdsApiImpl::XdstpOdcdsSubscriptionsManager : public Singleton::Instance,
129
                                                          Logger::Loggable<Logger::Id::upstream> {
130
public:
131
  XdstpOdcdsSubscriptionsManager(Config::XdsManager& xds_manager, ClusterManager& cm,
132
                                 MissingClusterNotifier& notifier, Stats::Scope& scope,
133
                                 ProtobufMessage::ValidationVisitor& validation_visitor)
134
83
      : xds_manager_(xds_manager), helper_(cm, xds_manager, "odcds-xdstp"), notifier_(notifier),
135
83
        scope_(scope.createScope("cluster_manager.odcds.")),
136
83
        validation_visitor_(validation_visitor) {}
137

            
138
  absl::Status onResourceUpdate(absl::string_view resource_name,
139
                                const Config::DecodedResourceRef& resource,
140
106
                                const std::string& system_version_info) {
141
106
    auto [_, exception_msgs] = helper_.onConfigUpdate({resource}, {}, system_version_info);
142
106
    if (!exception_msgs.empty()) {
143
      return absl::InvalidArgumentError(fmt::format("Error adding/updating cluster {} - {}",
144
                                                    resource_name,
145
                                                    absl::StrJoin(exception_msgs, ", ")));
146
    }
147
106
    return absl::OkStatus();
148
106
  }
149

            
150
  absl::Status onResourceRemoved(absl::string_view resource_name,
151
10
                                 const std::string& system_version_info) {
152
    // TODO(adisuissa): add direct `onResourceRemove(resource_name)` to `helper_`.
153
10
    Protobuf::RepeatedPtrField<std::string> removed_resource_list;
154
10
    removed_resource_list.Add(std::string(resource_name));
155
10
    auto [_, exception_msgs] =
156
10
        helper_.onConfigUpdate({}, removed_resource_list, system_version_info);
157
    // Removal of a cluster should not result in an error.
158
10
    ASSERT(exception_msgs.empty());
159
10
    notifier_.notifyMissingCluster(resource_name);
160
10
    return absl::OkStatus();
161
10
  }
162

            
163
2
  void onFailure(absl::string_view resource_name) {
164
2
    ENVOY_LOG(trace, "ODCDS-manager: failure for resource: {}", resource_name);
165
    // This function will only be invoked if the resource wasn't previously updated or removed.
166
    // Remove the resource, so if there are other resources waiting for it,
167
    // their initialization can proceed.
168
2
    notifier_.notifyMissingCluster(resource_name);
169
2
  }
170

            
171
128
  void addSubscription(absl::string_view resource_name, bool old_ads) {
172
128
    if (subscriptions_.contains(resource_name)) {
173
1
      ENVOY_LOG(debug, "ODCDS-manager: resource {} is already subscribed to, skipping",
174
1
                resource_name);
175
1
      return;
176
1
    }
177
127
    ENVOY_LOG(trace, "ODCDS-manager: adding a subscription for resource {}", resource_name);
178
    // Subscribe using the xds-manager.
179
127
    auto subscription =
180
127
        std::make_unique<PerSubscriptionData>(*this, resource_name, validation_visitor_);
181
127
    absl::Status status = subscription->initializeSubscription(old_ads);
182
127
    if (status.ok()) {
183
127
      subscriptions_.emplace(std::string(resource_name), std::move(subscription));
184
127
    } else {
185
      // There was an error while subscribing. This could be, for example, when
186
      // the cluster_name isn't a valid xdstp resource, or its config-source was
187
      // not added to the bootstrap's config_sources.
188
      ENVOY_LOG(info,
189
                "ODCDS-manager: xDS-TP resource {} could not be registered: {}. Treating as "
190
                "missing cluster",
191
                resource_name, status.message());
192
      onFailure(resource_name);
193
    }
194
127
  }
195

            
196
private:
197
  // A singleton subscription handler.
198
  class PerSubscriptionData : Envoy::Config::SubscriptionBase<envoy::config::cluster::v3::Cluster> {
199
  public:
200
    PerSubscriptionData(XdstpOdcdsSubscriptionsManager& parent, absl::string_view resource_name,
201
                        ProtobufMessage::ValidationVisitor& validation_visitor)
202
127
        : Envoy::Config::SubscriptionBase<envoy::config::cluster::v3::Cluster>(validation_visitor,
203
127
                                                                               "name"),
204
127
          parent_(parent), resource_name_(resource_name) {}
205

            
206
127
    absl::Status initializeSubscription(bool old_ads) {
207
127
      const auto resource_type = getResourceName();
208
      // If old_ads is set, creates a subscription using the staticAdsConfigSource.
209
      // Otherwise, the subscribeToSingletonResource will take care of
210
      // subscription via the ADS source.
211
127
      absl::StatusOr<Config::SubscriptionPtr> subscription_or_error =
212
127
          parent_.xds_manager_.subscribeToSingletonResource(
213
127
              resource_name_,
214
127
              old_ads
215
127
                  ? makeOptRef<const envoy::config::core::v3::ConfigSource>(staticAdsConfigSource())
216
127
                  : absl::nullopt,
217
127
              Grpc::Common::typeUrl(resource_type), *parent_.scope_, *this, resource_decoder_, {});
218
127
      RETURN_IF_NOT_OK_REF(subscription_or_error.status());
219
127
      subscription_ = std::move(subscription_or_error.value());
220
127
      subscription_->start({resource_name_});
221
127
      return absl::OkStatus();
222
127
    }
223

            
224
  private:
225
92
    const envoy::config::core::v3::ConfigSource& staticAdsConfigSource() {
226
92
      CONSTRUCT_ON_FIRST_USE(envoy::config::core::v3::ConfigSource,
227
92
                             []() -> envoy::config::core::v3::ConfigSource {
228
92
                               envoy::config::core::v3::ConfigSource ads;
229
92
                               ads.mutable_ads();
230
92
                               return ads;
231
92
                             }());
232
92
    }
233

            
234
    // Config::SubscriptionCallbacks
235
    absl::Status onConfigUpdate(const std::vector<Config::DecodedResourceRef>& resources,
236
7
                                const std::string& version_info) override {
237
      // As this is a singleton subscription, the response can either contain 1
238
      // resource that should be updated or 0 (implying the resource should be
239
      // removed).
240
7
      ASSERT(resources.empty() || (resources.size() == 1));
241
7
      resource_was_updated_ = true;
242
7
      if (resources.empty()) {
243
1
        ENVOY_LOG(trace, "ODCDS-manager: removing a single resource: {}", resource_name_);
244
1
        return parent_.onResourceRemoved(resource_name_, version_info);
245
1
      }
246
      // A single cluster update.
247
6
      ENVOY_LOG(trace, "ODCDS-manager: updating a single resource: {}", resource_name_);
248
6
      return parent_.onResourceUpdate(resource_name_, resources[0], version_info);
249
7
    }
250
    absl::Status onConfigUpdate(const std::vector<Config::DecodedResourceRef>& added_resources,
251
                                const Protobuf::RepeatedPtrField<std::string>& removed_resources,
252
109
                                const std::string& system_version_info) override {
253
      // As this is a singleton subscription, the update can either contain 1
254
      // added resource, or remove the resource.
255
109
      ASSERT(added_resources.size() + removed_resources.size() == 1);
256
109
      resource_was_updated_ = true;
257
109
      if (!removed_resources.empty()) {
258
9
        ENVOY_LOG(trace, "ODCDS-manager: removing a single resource: {}", resource_name_);
259
9
        return parent_.onResourceRemoved(resource_name_, system_version_info);
260
9
      }
261
      // A single cluster update.
262
100
      ENVOY_LOG(trace, "ODCDS-manager: updating a single resource: {}", resource_name_);
263
100
      return parent_.onResourceUpdate(resource_name_, added_resources[0], system_version_info);
264
109
    }
265
    void onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason reason,
266
2
                              const EnvoyException* e) override {
267
2
      ASSERT(reason != Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure);
268
2
      ENVOY_LOG(trace, "ODCDS-manager: error while fetching a single resource {}: {}",
269
2
                resource_name_, e->what());
270
      // If the resource wasn't previously updated, this sends a notification that it was removed,
271
      // so if there are any resources waiting for this one, they can proceed.
272
2
      if (!resource_was_updated_) {
273
2
        resource_was_updated_ = true;
274
2
        parent_.onFailure(resource_name_);
275
2
      }
276
2
    }
277

            
278
    XdstpOdcdsSubscriptionsManager& parent_;
279
    // TODO(adisuissa): this can be converted to an absl::string_view and point to the
280
    // subscriptions_ map key.
281
    const std::string resource_name_;
282
    Config::SubscriptionPtr subscription_;
283
    bool resource_was_updated_{false};
284
  };
285
  using PerSubscriptionDataPtr = std::unique_ptr<PerSubscriptionData>;
286

            
287
  Config::XdsManager& xds_manager_;
288
  CdsApiHelper helper_;
289
  MissingClusterNotifier& notifier_;
290
  Stats::ScopeSharedPtr scope_;
291
  ProtobufMessage::ValidationVisitor& validation_visitor_;
292
  // Maps a resource name to its subscription data.
293
  absl::flat_hash_map<std::string, PerSubscriptionDataPtr> subscriptions_;
294
};
295

            
296
// Register the XdstpOdcdsSubscriptionsManager singleton.
297
SINGLETON_MANAGER_REGISTRATION(xdstp_odcds_subscriptions_manager);
298

            
299
absl::StatusOr<OdCdsApiSharedPtr>
300
XdstpOdCdsApiImpl::create(const envoy::config::core::v3::ConfigSource& config_source,
301
                          OptRef<xds::core::v3::ResourceLocator>, Config::XdsManager& xds_manager,
302
                          ClusterManager& cm, MissingClusterNotifier& notifier, Stats::Scope& scope,
303
                          ProtobufMessage::ValidationVisitor& validation_visitor,
304
83
                          Server::Configuration::ServerFactoryContext& server_factory_context) {
305
83
  absl::Status creation_status = absl::OkStatus();
306
  // TODO(adisuissa): convert the config_source to optional.
307
83
  const bool old_ads = config_source.config_source_specifier_case() ==
308
83
                       envoy::config::core::v3::ConfigSource::ConfigSourceSpecifierCase::kAds;
309
83
  auto ret = OdCdsApiSharedPtr(new XdstpOdCdsApiImpl(xds_manager, cm, notifier, scope,
310
83
                                                     server_factory_context, old_ads,
311
83
                                                     validation_visitor, creation_status));
312
83
  RETURN_IF_NOT_OK(creation_status);
313
83
  return ret;
314
83
}
315

            
316
XdstpOdCdsApiImpl::XdstpOdCdsApiImpl(Config::XdsManager& xds_manager, ClusterManager& cm,
317
                                     MissingClusterNotifier& notifier, Stats::Scope& scope,
318
                                     Server::Configuration::ServerFactoryContext& server_context,
319
                                     bool old_ads,
320
                                     ProtobufMessage::ValidationVisitor& validation_visitor,
321
                                     absl::Status& creation_status)
322
83
    : old_ads_(old_ads) {
323
  // Create a singleton xdstp-based od-cds handler. This will be accessed by
324
  // the main thread and used by all the filters that need to access od-cds
325
  // over xdstp-based config sources.
326
  // The singleton object will handle all the subscriptions to OD-CDS
327
  // resources, and will apply the updates to the cluster-manager.
328
83
  subscriptions_manager_ =
329
83
      subscriptionsManager(server_context, xds_manager, cm, notifier, scope, validation_visitor);
330
  // This will always succeed as the xDS-TP config-source matching the resource name
331
  // will only be known when that resource is subscribed to.
332
83
  creation_status = absl::OkStatus();
333
83
}
334

            
335
XdstpOdCdsApiImpl::XdstpOdcdsSubscriptionsManagerSharedPtr
336
XdstpOdCdsApiImpl::subscriptionsManager(Server::Configuration::ServerFactoryContext& server_context,
337
                                        Config::XdsManager& xds_manager, ClusterManager& cm,
338
                                        MissingClusterNotifier& notifier, Stats::Scope& scope,
339
83
                                        ProtobufMessage::ValidationVisitor& validation_visitor) {
340
83
  return server_context.singletonManager().getTyped<XdstpOdcdsSubscriptionsManager>(
341
83
      SINGLETON_MANAGER_REGISTERED_NAME(xdstp_odcds_subscriptions_manager),
342
83
      [&xds_manager, &cm, &notifier, &scope, &validation_visitor] {
343
83
        return std::make_shared<XdstpOdcdsSubscriptionsManager>(xds_manager, cm, notifier, scope,
344
83
                                                                validation_visitor);
345
83
      });
346
83
}
347

            
348
128
void XdstpOdCdsApiImpl::updateOnDemand(std::string cluster_name) {
349
128
  subscriptions_manager_->addSubscription(cluster_name, old_ads_);
350
128
}
351
} // namespace Upstream
352
} // namespace Envoy