1
#include "source/common/config/xds_manager_impl.h"
2

            
3
#include <algorithm>
4
#include <memory>
5
#include <optional>
6
#include <string>
7
#include <utility>
8
#include <vector>
9

            
10
#include "envoy/common/exception.h"
11
#include "envoy/common/optref.h"
12
#include "envoy/config/core/v3/config_source.pb.h"
13
#include "envoy/config/core/v3/config_source.pb.validate.h"
14
#include "envoy/config/custom_config_validators.h"
15
#include "envoy/config/grpc_mux.h"
16
#include "envoy/config/subscription.h"
17
#include "envoy/config/subscription_factory.h"
18
#include "envoy/config/xds_config_tracker.h"
19
#include "envoy/config/xds_resources_delegate.h"
20
#include "envoy/grpc/async_client.h"
21
#include "envoy/grpc/async_client_manager.h"
22
#include "envoy/stats/scope.h"
23
#include "envoy/upstream/cluster_manager.h"
24

            
25
#include "source/common/common/assert.h"
26
#include "source/common/common/backoff_strategy.h"
27
#include "source/common/common/cleanup.h"
28
#include "source/common/common/logger.h"
29
#include "source/common/common/thread.h"
30
#include "source/common/config/custom_config_validators_impl.h"
31
#include "source/common/config/null_grpc_mux_impl.h"
32
#include "source/common/config/subscription_factory_impl.h"
33
#include "source/common/config/utility.h"
34
#include "source/common/config/xds_resource.h"
35
#include "source/common/protobuf/protobuf.h"
36
#include "source/common/protobuf/utility.h"
37
#include "source/common/runtime/runtime_features.h"
38
#include "source/common/upstream/load_stats_reporter_impl.h"
39

            
40
#include "absl/container/flat_hash_set.h"
41
#include "absl/status/status.h"
42
#include "absl/status/statusor.h"
43
#include "absl/strings/string_view.h"
44

            
45
namespace Envoy {
46
namespace Config {
47
namespace {
48
absl::Status createUniqueClients(Grpc::AsyncClientManager& async_client_manager,
49
                                 const envoy::config::core::v3::ApiConfigSource& config_source,
50
                                 Stats::Scope& stats_scope, bool skip_cluster_check,
51
                                 bool xdstp_config_source,
52
                                 Grpc::RawAsyncClientSharedPtr& primary_client,
53
521
                                 Grpc::RawAsyncClientSharedPtr& failover_client) {
54
521
  auto factory_primary_or_error = Config::Utility::factoryForGrpcApiConfigSource(
55
521
      async_client_manager, config_source, stats_scope, skip_cluster_check, 0 /*grpc_service_idx*/,
56
521
      xdstp_config_source);
57
521
  RETURN_IF_NOT_OK_REF(factory_primary_or_error.status());
58
519
  Grpc::AsyncClientFactoryPtr factory_failover = nullptr;
59
519
  if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) {
60
70
    auto factory_failover_or_error = Config::Utility::factoryForGrpcApiConfigSource(
61
70
        async_client_manager, config_source, stats_scope, skip_cluster_check,
62
70
        1 /*grpc_service_idx*/, xdstp_config_source);
63
70
    RETURN_IF_NOT_OK_REF(factory_failover_or_error.status());
64
69
    factory_failover = std::move(factory_failover_or_error.value());
65
69
  }
66
518
  absl::StatusOr<Grpc::RawAsyncClientPtr> success =
67
518
      factory_primary_or_error.value()->createUncachedRawAsyncClient();
68
518
  RETURN_IF_NOT_OK_REF(success.status());
69
518
  primary_client = std::move(*success);
70
518
  if (factory_failover) {
71
59
    success = factory_failover->createUncachedRawAsyncClient();
72
59
    RETURN_IF_NOT_OK_REF(success.status());
73
59
    failover_client = std::move(*success);
74
59
  }
75
518
  return absl::OkStatus();
76
518
}
77

            
78
absl::Status createSharedClients(Grpc::AsyncClientManager& async_client_manager,
79
                                 const envoy::config::core::v3::ApiConfigSource& api_config_source,
80
                                 Stats::Scope& stats_scope, bool skip_cluster_check,
81
                                 bool xdstp_config_source,
82
                                 Grpc::RawAsyncClientSharedPtr& primary_client,
83
17
                                 Grpc::RawAsyncClientSharedPtr& failover_client) {
84
17
  absl::StatusOr<Envoy::OptRef<const envoy::config::core::v3::GrpcService>> maybe_grpc_service =
85
17
      Utility::getGrpcConfigFromApiConfigSource(api_config_source, /*grpc_service_idx*/ 0,
86
17
                                                xdstp_config_source);
87
17
  RETURN_IF_NOT_OK_REF(maybe_grpc_service.status());
88
17
  if (maybe_grpc_service.value().has_value()) {
89
17
    absl::StatusOr<Grpc::RawAsyncClientSharedPtr> success =
90
17
        async_client_manager.getOrCreateRawAsyncClientWithHashKey(
91
17
            Grpc::GrpcServiceConfigWithHashKey(*maybe_grpc_service.value()), stats_scope,
92
17
            skip_cluster_check);
93
17
    RETURN_IF_NOT_OK_REF(success.status());
94
16
    primary_client = std::move(*success);
95
16
  }
96
16
  if (Runtime::runtimeFeatureEnabled("envoy.restart_features.xds_failover_support")) {
97
6
    absl::StatusOr<Envoy::OptRef<const envoy::config::core::v3::GrpcService>> maybe_grpc_service =
98
6
        Utility::getGrpcConfigFromApiConfigSource(api_config_source, /*grpc_service_idx*/ 1,
99
6
                                                  xdstp_config_source);
100
6
    RETURN_IF_NOT_OK_REF(maybe_grpc_service.status());
101
6
    if (maybe_grpc_service.value().has_value()) {
102
4
      absl::StatusOr<Grpc::RawAsyncClientSharedPtr> success =
103
4
          async_client_manager.getOrCreateRawAsyncClientWithHashKey(
104
4
              Grpc::GrpcServiceConfigWithHashKey(*maybe_grpc_service.value()), stats_scope,
105
4
              skip_cluster_check);
106
4
      RETURN_IF_NOT_OK_REF(success.status());
107
3
      failover_client = std::move(*success);
108
3
    }
109
6
  }
110
15
  return absl::OkStatus();
111
16
}
112

            
113
absl::Status createGrpcClients(Grpc::AsyncClientManager& async_client_manager,
114
                               const envoy::config::core::v3::ApiConfigSource& api_config_source,
115
                               Stats::Scope& stats_scope, bool skip_cluster_check,
116
                               bool xdstp_config_source,
117
                               Grpc::RawAsyncClientSharedPtr& primary_client,
118
538
                               Grpc::RawAsyncClientSharedPtr& failover_client) {
119

            
120
538
  if (Runtime::runtimeFeatureEnabled("envoy.restart_features.use_cached_grpc_client_for_xds")) {
121
17
    RETURN_IF_NOT_OK(createSharedClients(async_client_manager, api_config_source, stats_scope,
122
17
                                         skip_cluster_check, xdstp_config_source, primary_client,
123
17
                                         failover_client));
124
521
  } else {
125
521
    RETURN_IF_NOT_OK(createUniqueClients(async_client_manager, api_config_source, stats_scope,
126
521
                                         skip_cluster_check, xdstp_config_source, primary_client,
127
521
                                         failover_client));
128
518
  }
129
533
  if (primary_client == nullptr) {
130
2
    return absl::InvalidArgumentError("gRPC client construction failed for primary cluster.");
131
2
  }
132
531
  return absl::OkStatus();
133
533
}
134
} // namespace
135

            
136
absl::Status XdsManagerImpl::initialize(const envoy::config::bootstrap::v3::Bootstrap& bootstrap,
137
10769
                                        Upstream::ClusterManager* cm) {
138
10769
  ASSERT(cm != nullptr);
139
10769
  cm_ = cm;
140

            
141
  // Initialize the XdsResourceDelegate extension, if set on the bootstrap config.
142
10769
  if (bootstrap.has_xds_delegate_extension()) {
143
4
    auto& factory = Config::Utility::getAndCheckFactory<XdsResourcesDelegateFactory>(
144
4
        bootstrap.xds_delegate_extension());
145
4
    xds_resources_delegate_ = factory.createXdsResourcesDelegate(
146
4
        bootstrap.xds_delegate_extension().typed_config(),
147
4
        validation_context_.dynamicValidationVisitor(), api_, main_thread_dispatcher_);
148
4
  }
149

            
150
  // Initialize the XdsConfigTracker extension, if set on the bootstrap config.
151
10769
  if (bootstrap.has_xds_config_tracker_extension()) {
152
32
    auto& tracker_factory = Config::Utility::getAndCheckFactory<XdsConfigTrackerFactory>(
153
32
        bootstrap.xds_config_tracker_extension());
154
32
    xds_config_tracker_ = tracker_factory.createXdsConfigTracker(
155
32
        bootstrap.xds_config_tracker_extension().typed_config(),
156
32
        validation_context_.dynamicValidationVisitor(), api_, main_thread_dispatcher_);
157
32
  }
158

            
159
10769
  OptRef<XdsResourcesDelegate> xds_resources_delegate =
160
10769
      makeOptRefFromPtr<XdsResourcesDelegate>(xds_resources_delegate_.get());
161
10769
  OptRef<XdsConfigTracker> xds_config_tracker =
162
10769
      makeOptRefFromPtr<XdsConfigTracker>(xds_config_tracker_.get());
163

            
164
10769
  subscription_factory_ = std::make_unique<SubscriptionFactoryImpl>(
165
10769
      local_info_, main_thread_dispatcher_, *cm_, validation_context_.dynamicValidationVisitor(),
166
10769
      api_, server_, xds_resources_delegate, xds_config_tracker);
167
10769
  return absl::OkStatus();
168
10769
}
169

            
170
absl::Status
171
10764
XdsManagerImpl::initializeAdsConnections(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) {
172
  // Assumes that primary clusters were already initialized by the
173
  // cluster-manager.
174
  // Setup the xDS-TP based config-sources.
175
  // Iterate over the ConfigSources defined in the bootstrap and initialize each as an ADS source.
176
10764
  for (const auto& config_source : bootstrap.config_sources()) {
177
73
    absl::StatusOr<AuthorityData> authority_or_error = createAuthority(config_source, false);
178
73
    RETURN_IF_NOT_OK(authority_or_error.status());
179
71
    authorities_.emplace_back(std::move(*authority_or_error));
180
71
  }
181
  // Initialize the default_config_source as an ADS source.
182
10762
  if (bootstrap.has_default_config_source()) {
183
71
    absl::StatusOr<AuthorityData> authority_or_error =
184
71
        createAuthority(bootstrap.default_config_source(), true);
185
71
    RETURN_IF_NOT_OK(authority_or_error.status());
186
66
    default_authority_ = std::make_unique<AuthorityData>(std::move(*authority_or_error));
187
66
  }
188

            
189
  // TODO(adisuissa): the rest of this function should be refactored so the shared
190
  // code with "createAuthority" is only defined once.
191
  // Setup the ads_config mux.
192
10757
  const auto& dyn_resources = bootstrap.dynamic_resources();
193
  // This is the only point where distinction between delta ADS and state-of-the-world ADS is made.
194
  // After here, we just have a GrpcMux interface held in ads_mux_, which hides
195
  // whether the backing implementation is delta or SotW.
196
10757
  if (dyn_resources.has_ads_config()) {
197
385
    Config::CustomConfigValidatorsPtr custom_config_validators =
198
385
        std::make_unique<Config::CustomConfigValidatorsImpl>(
199
385
            validation_context_.dynamicValidationVisitor(), server_,
200
385
            dyn_resources.ads_config().config_validators());
201

            
202
385
    auto strategy_or_error = Config::Utility::prepareJitteredExponentialBackOffStrategy(
203
385
        dyn_resources.ads_config(), random_,
204
385
        Envoy::Config::SubscriptionFactory::RetryInitialDelayMs,
205
385
        Envoy::Config::SubscriptionFactory::RetryMaxDelayMs);
206
385
    RETURN_IF_NOT_OK_REF(strategy_or_error.status());
207
385
    JitteredExponentialBackOffStrategyPtr backoff_strategy = std::move(strategy_or_error.value());
208

            
209
385
    OptRef<XdsConfigTracker> xds_config_tracker =
210
385
        makeOptRefFromPtr<XdsConfigTracker>(xds_config_tracker_.get());
211

            
212
385
    if (dyn_resources.ads_config().api_type() ==
213
385
        envoy::config::core::v3::ApiConfigSource::DELTA_GRPC) {
214
230
      absl::Status status = Config::Utility::checkTransportVersion(dyn_resources.ads_config());
215
230
      RETURN_IF_NOT_OK(status);
216
230
      std::string name;
217
230
      if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) {
218
62
        name = "envoy.config_mux.delta_grpc_mux_factory";
219
168
      } else {
220
168
        name = "envoy.config_mux.new_grpc_mux_factory";
221
168
      }
222
230
      auto* factory = Config::Utility::getFactoryByName<Config::MuxFactory>(name);
223
230
      if (!factory) {
224
        return absl::InvalidArgumentError(fmt::format("{} not found", name));
225
      }
226
230
      Grpc::RawAsyncClientSharedPtr primary_client;
227
230
      Grpc::RawAsyncClientSharedPtr failover_client;
228
230
      RETURN_IF_NOT_OK(createGrpcClients(cm_->grpcAsyncClientManager(), dyn_resources.ads_config(),
229
230
                                         *stats_.rootScope(), /*skip_cluster_check*/ false,
230
230
                                         /*xdstp_config_source*/ false, primary_client,
231
230
                                         failover_client));
232

            
233
230
      std::function<std::unique_ptr<Upstream::LoadStatsReporter>()> lrs_factory =
234
230
          [&, primary_client]() -> std::unique_ptr<Upstream::LoadStatsReporter> {
235
        auto reporter = std::make_unique<Upstream::LoadStatsReporterImpl>(
236
            local_info_, *cm_, *stats_.rootScope(), primary_client, main_thread_dispatcher_);
237
        return reporter;
238
      };
239

            
240
230
      ads_mux_ = factory->create(std::move(primary_client), std::move(failover_client),
241
230
                                 main_thread_dispatcher_, random_, *stats_.rootScope(),
242
230
                                 dyn_resources.ads_config(), local_info_,
243
230
                                 std::move(custom_config_validators), std::move(backoff_strategy),
244
230
                                 xds_config_tracker, {}, lrs_factory);
245
261
    } else {
246
155
      absl::Status status = Config::Utility::checkTransportVersion(dyn_resources.ads_config());
247
155
      RETURN_IF_NOT_OK(status);
248
155
      std::string name;
249
155
      if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) {
250
17
        name = "envoy.config_mux.sotw_grpc_mux_factory";
251
138
      } else {
252
138
        name = "envoy.config_mux.grpc_mux_factory";
253
138
      }
254

            
255
155
      auto* factory = Config::Utility::getFactoryByName<Config::MuxFactory>(name);
256
155
      if (!factory) {
257
        return absl::InvalidArgumentError(fmt::format("{} not found", name));
258
      }
259
155
      Grpc::RawAsyncClientSharedPtr primary_client;
260
155
      Grpc::RawAsyncClientSharedPtr failover_client;
261
155
      RETURN_IF_NOT_OK(createGrpcClients(cm_->grpcAsyncClientManager(), dyn_resources.ads_config(),
262
155
                                         *stats_.rootScope(), /*skip_cluster_check*/ false,
263
155
                                         /*xdstp_config_source*/ false, primary_client,
264
155
                                         failover_client));
265

            
266
152
      std::function<std::unique_ptr<Upstream::LoadStatsReporter>()> lrs_factory =
267
152
          [&, primary_client]() -> std::unique_ptr<Upstream::LoadStatsReporter> {
268
        auto reporter = std::make_unique<Upstream::LoadStatsReporterImpl>(
269
            local_info_, *cm_, *stats_.rootScope(), primary_client, main_thread_dispatcher_);
270
        return reporter;
271
      };
272

            
273
152
      OptRef<XdsResourcesDelegate> xds_resources_delegate =
274
152
          makeOptRefFromPtr<XdsResourcesDelegate>(xds_resources_delegate_.get());
275
152
      ads_mux_ = factory->create(std::move(primary_client), std::move(failover_client),
276
152
                                 main_thread_dispatcher_, random_, *stats_.rootScope(),
277
152
                                 dyn_resources.ads_config(), local_info_,
278
152
                                 std::move(custom_config_validators), std::move(backoff_strategy),
279
152
                                 xds_config_tracker, xds_resources_delegate, lrs_factory);
280
152
    }
281
10658
  } else {
282
10372
    ads_mux_ = std::make_unique<Config::NullGrpcMuxImpl>();
283
10372
  }
284
10754
  return absl::OkStatus();
285
10757
}
286

            
287
10715
void XdsManagerImpl::startXdstpAdsMuxes() {
288
  // Start the ADS mux objects that were defined in `config_sources`.
289
10715
  for (AuthorityData& authority : authorities_) {
290
57
    authority.grpc_mux_->start();
291
57
  }
292
  // Start the ADS mux of the `default_config_source`, if defined.
293
10715
  if (default_authority_ != nullptr) {
294
57
    default_authority_->grpc_mux_->start();
295
57
  }
296
10715
}
297

            
298
absl::StatusOr<SubscriptionPtr> XdsManagerImpl::subscribeToSingletonResource(
299
    absl::string_view resource_name, OptRef<const envoy::config::core::v3::ConfigSource> config,
300
    absl::string_view type_url, Stats::Scope& scope, SubscriptionCallbacks& callbacks,
301
188
    OpaqueResourceDecoderSharedPtr resource_decoder, const SubscriptionOptions& options) {
302
  // If the resource name is not xDS-TP based, use the old subscription way.
303
188
  if (!XdsResourceIdentifier::hasXdsTpScheme(resource_name)) {
304
109
    if (!config.has_value()) {
305
1
      return absl::InvalidArgumentError(
306
1
          fmt::format("Given subscrption to resource {} must either have an xDS-TP based "
307
1
                      "resource or a config must be provided.",
308
1
                      resource_name));
309
1
    }
310
108
    return subscription_factory_->subscriptionFromConfigSource(*config, type_url, scope, callbacks,
311
108
                                                               resource_decoder, options);
312
109
  }
313
79
  absl::StatusOr<xds::core::v3::ResourceName> resource_urn_or_error =
314
79
      XdsResourceIdentifier::decodeUrn(resource_name);
315
79
  RETURN_IF_NOT_OK(resource_urn_or_error.status());
316
79
  const xds::core::v3::ResourceName resource_urn = std::move(resource_urn_or_error.value());
317
  // Otherwise look at whether there is a Peer-Config.
318
79
  if (config.has_value()) {
319
    // If the config has authorities defined, see if those authorities match the resource's
320
    // authority.
321
48
    bool matched_authority = false;
322
48
    if (!config->authorities().empty()) {
323
3
      for (const auto& authority : config->authorities()) {
324
3
        if (authority.name() == resource_urn.authority()) {
325
1
          matched_authority = true;
326
1
          break;
327
1
        }
328
3
      }
329
3
      if (matched_authority) {
330
        // TODO(adisuissa): support this use case by adding a config-source dynamically to the
331
        // XdsManager.
332
1
        return absl::UnimplementedError(
333
1
            "Dynamically using non-bootstrap defined xDS-TP config sources is not yet supported.");
334
1
      }
335
3
    }
336
48
  }
337
78
  AuthorityData* matched_authority = nullptr;
338
  // Find the right authority from the config_sources authorities by iterating over the bootstrap
339
  // defined authorities.
340
92
  for (auto it = authorities_.begin(); (it != authorities_.end()); ++it) {
341
77
    if (it->authority_names_.contains(resource_urn.authority())) {
342
      // Found the correct authority to use, subscribe using its mux.
343
63
      matched_authority = &(*it);
344
63
      break;
345
63
    }
346
77
  }
347
  // No valid authority found, fallback to use the default_config_source (if defined).
348
78
  if ((matched_authority == nullptr) && (default_authority_ != nullptr)) {
349
12
    matched_authority = default_authority_.get();
350
12
  }
351
  // If found an xdstp-based authority, use it.
352
78
  if (matched_authority != nullptr) {
353
    // Use the config-source from the authorities that were added in the bootstrap.
354
75
    return subscription_factory_->subscriptionOverAdsGrpcMux(
355
75
        matched_authority->grpc_mux_, matched_authority->config_, type_url, scope, callbacks,
356
75
        resource_decoder, options);
357
75
  }
358
  // Nothing was matched, revert to the old-way (given the config-source) if possible.
359
  // This will be used for backwards compatibility.
360
3
  if (config.has_value()) {
361
1
    return subscription_factory_->subscriptionFromConfigSource(*config, type_url, scope, callbacks,
362
1
                                                               resource_decoder, options);
363
1
  }
364
  // No actual config source was found, return an error.
365
2
  return absl::NotFoundError(
366
2
      fmt::format("No valid authority was found for the given xDS-TP resource {}.", resource_name));
367
3
}
368

            
369
24870
ScopedResume XdsManagerImpl::pause(const std::vector<std::string>& type_urls) {
370
  // Apply the pause on all "ADS" based sources (old-ADS-mux, and
371
  // xdstp-config-based sources) by collecting the per-xDS-mux scopes under a
372
  // single scope. Using a shared_ptr here so we can pass it to the Cleanup
373
  // object that is created at the return statement.
374
24870
  auto scoped_resume_collection = std::make_shared<std::vector<ScopedResume>>();
375
24870
  if (ads_mux_ != nullptr) {
376
24870
    scoped_resume_collection->emplace_back(ads_mux_->pause(type_urls));
377
24870
  }
378
24870
  for (auto& authority : authorities_) {
379
254
    scoped_resume_collection->emplace_back(authority.grpc_mux_->pause(type_urls));
380
254
  }
381
24870
  if (default_authority_ != nullptr) {
382
254
    scoped_resume_collection->emplace_back(default_authority_->grpc_mux_->pause(type_urls));
383
254
  }
384
24870
  return std::make_unique<Cleanup>([scoped_resume_collection]() {
385
    // Do nothing. After this function is called the scoped_resume_collection
386
    // will be destroyed, and all the internal cleanups will be invoked.
387
24856
  });
388
24870
}
389

            
390
absl::Status
391
30
XdsManagerImpl::setAdsConfigSource(const envoy::config::core::v3::ApiConfigSource& config_source) {
392
30
  ASSERT_IS_MAIN_OR_TEST_THREAD();
393
30
  RETURN_IF_NOT_OK(validateAdsConfig(config_source));
394

            
395
28
  return replaceAdsMux(config_source);
396
30
}
397

            
398
absl::StatusOr<XdsManagerImpl::AuthorityData>
399
XdsManagerImpl::createAuthority(const envoy::config::core::v3::ConfigSource& config_source,
400
144
                                bool allow_no_authority_names) {
401
  // Only the config_source.api_config_source can be used for authorities at the moment.
402
144
  if (!config_source.has_api_config_source()) {
403
1
    return absl::InvalidArgumentError(
404
1
        "Only api_config_source type is currently supported for xdstp-based config sources.");
405
1
  }
406

            
407
143
  if (!allow_no_authority_names && config_source.authorities().empty()) {
408
1
    return absl::InvalidArgumentError(
409
1
        "xdstp-based non-default config source must have at least one authority.");
410
1
  }
411

            
412
  // Validate that the authority names in the config source don't have repeated values.
413
142
  absl::flat_hash_set<std::string> config_source_authorities;
414
142
  config_source_authorities.reserve(config_source.authorities().size());
415
144
  for (const auto& authority : config_source.authorities()) {
416
144
    const auto ret = config_source_authorities.emplace(authority.name());
417
144
    if (!ret.second) {
418
2
      return absl::InvalidArgumentError(
419
2
          fmt::format("xdstp-based config source authority {} is configured more than once in an "
420
2
                      "xdstp-based config source.",
421
2
                      authority.name()));
422
2
    }
423
144
  }
424

            
425
140
  const auto& api_config_source = config_source.api_config_source();
426

            
427
140
  if ((api_config_source.api_type() != envoy::config::core::v3::ApiConfigSource::AGGREGATED_GRPC) &&
428
140
      (api_config_source.api_type() !=
429
83
       envoy::config::core::v3::ApiConfigSource::AGGREGATED_DELTA_GRPC)) {
430
1
    return absl::InvalidArgumentError("xdstp-based config source authority only supports "
431
1
                                      "AGGREGATED_GRPC and AGGREGATED_DELTA_GRPC types.");
432
1
  }
433

            
434
139
  Config::CustomConfigValidatorsPtr custom_config_validators =
435
139
      std::make_unique<Config::CustomConfigValidatorsImpl>(
436
139
          validation_context_.dynamicValidationVisitor(), server_,
437
139
          api_config_source.config_validators());
438

            
439
139
  auto strategy_or_error = Config::Utility::prepareJitteredExponentialBackOffStrategy(
440
139
      api_config_source, random_, Envoy::Config::SubscriptionFactory::RetryInitialDelayMs,
441
139
      Envoy::Config::SubscriptionFactory::RetryMaxDelayMs);
442
139
  RETURN_IF_NOT_OK_REF(strategy_or_error.status());
443
139
  JitteredExponentialBackOffStrategyPtr backoff_strategy = std::move(strategy_or_error.value());
444

            
445
139
  OptRef<XdsConfigTracker> xds_config_tracker =
446
139
      makeOptRefFromPtr<XdsConfigTracker>(xds_config_tracker_.get());
447

            
448
139
  GrpcMuxSharedPtr authority_mux = nullptr;
449
139
  if (api_config_source.api_type() ==
450
139
      envoy::config::core::v3::ApiConfigSource::AGGREGATED_DELTA_GRPC) {
451
82
    absl::Status status = Config::Utility::checkTransportVersion(api_config_source);
452
82
    RETURN_IF_NOT_OK(status);
453
82
    std::string name;
454
82
    if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) {
455
24
      name = "envoy.config_mux.delta_grpc_mux_factory";
456
58
    } else {
457
58
      name = "envoy.config_mux.new_grpc_mux_factory";
458
58
    }
459
82
    auto* factory = Config::Utility::getFactoryByName<Config::MuxFactory>(name);
460
82
    if (!factory) {
461
1
      return absl::InvalidArgumentError(fmt::format("{} not found", name));
462
1
    }
463
81
    Grpc::RawAsyncClientSharedPtr primary_client;
464
81
    Grpc::RawAsyncClientSharedPtr failover_client;
465
81
    RETURN_IF_NOT_OK(createGrpcClients(cm_->grpcAsyncClientManager(), api_config_source,
466
81
                                       *stats_.rootScope(), /*skip_cluster_check*/ false,
467
81
                                       /*xdstp_config_source*/ true, primary_client,
468
81
                                       failover_client));
469
81
    std::function<std::unique_ptr<Upstream::LoadStatsReporter>()> lrs_factory =
470
81
        [&, primary_client]() -> std::unique_ptr<Upstream::LoadStatsReporter> {
471
      auto reporter = std::make_unique<Upstream::LoadStatsReporterImpl>(
472
          local_info_, *cm_, *stats_.rootScope(), primary_client, main_thread_dispatcher_);
473
      return reporter;
474
    };
475

            
476
81
    authority_mux = factory->create(
477
81
        std::move(primary_client), std::move(failover_client), main_thread_dispatcher_, random_,
478
81
        *stats_.rootScope(), api_config_source, local_info_, std::move(custom_config_validators),
479
81
        std::move(backoff_strategy), xds_config_tracker, {}, lrs_factory);
480
105
  } else {
481
57
    ASSERT(api_config_source.api_type() ==
482
57
           envoy::config::core::v3::ApiConfigSource::AGGREGATED_GRPC);
483
57
    absl::Status status = Config::Utility::checkTransportVersion(api_config_source);
484
57
    RETURN_IF_NOT_OK(status);
485
57
    std::string name;
486
57
    if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.unified_mux")) {
487
      name = "envoy.config_mux.sotw_grpc_mux_factory";
488
57
    } else {
489
57
      name = "envoy.config_mux.grpc_mux_factory";
490
57
    }
491

            
492
57
    auto* factory = Config::Utility::getFactoryByName<Config::MuxFactory>(name);
493
57
    if (!factory) {
494
1
      return absl::InvalidArgumentError(fmt::format("{} not found", name));
495
1
    }
496
56
    Grpc::RawAsyncClientSharedPtr primary_client;
497
56
    Grpc::RawAsyncClientSharedPtr failover_client;
498
56
    RETURN_IF_NOT_OK(createGrpcClients(cm_->grpcAsyncClientManager(), api_config_source,
499
56
                                       *stats_.rootScope(), /*skip_cluster_check*/ false,
500
56
                                       /*xdstp_config_source*/ true, primary_client,
501
56
                                       failover_client));
502
56
    OptRef<XdsResourcesDelegate> xds_resources_delegate =
503
56
        makeOptRefFromPtr<XdsResourcesDelegate>(xds_resources_delegate_.get());
504

            
505
56
    std::function<std::unique_ptr<Upstream::LoadStatsReporter>()> lrs_factory =
506
56
        [&, primary_client]() -> std::unique_ptr<Upstream::LoadStatsReporter> {
507
      auto reporter = std::make_unique<Upstream::LoadStatsReporterImpl>(
508
          local_info_, *cm_, *stats_.rootScope(), primary_client, main_thread_dispatcher_);
509
      return reporter;
510
    };
511

            
512
56
    authority_mux = factory->create(
513
56
        std::move(primary_client), std::move(failover_client), main_thread_dispatcher_, random_,
514
56
        *stats_.rootScope(), api_config_source, local_info_, std::move(custom_config_validators),
515
56
        std::move(backoff_strategy), xds_config_tracker, xds_resources_delegate, lrs_factory);
516
56
  }
517
137
  ASSERT(authority_mux != nullptr);
518

            
519
137
  return AuthorityData(config_source, std::move(config_source_authorities),
520
137
                       std::move(authority_mux));
521
139
}
522

            
523
absl::Status
524
30
XdsManagerImpl::validateAdsConfig(const envoy::config::core::v3::ApiConfigSource& config_source) {
525
30
  auto& validation_visitor = validation_context_.staticValidationVisitor();
526
30
  TRY_ASSERT_MAIN_THREAD { MessageUtil::validate(config_source, validation_visitor); }
527
30
  END_TRY
528
30
  CATCH(const EnvoyException& e, { return absl::InternalError(e.what()); });
529
28
  return absl::OkStatus();
530
30
}
531

            
532
absl::Status
533
28
XdsManagerImpl::replaceAdsMux(const envoy::config::core::v3::ApiConfigSource& ads_config) {
534
28
  ASSERT(cm_ != nullptr);
535
  // If there was no ADS defined, reject replacement.
536
28
  const auto& bootstrap = server_.bootstrap();
537
28
  if (!bootstrap.has_dynamic_resources() || !bootstrap.dynamic_resources().has_ads_config()) {
538
2
    return absl::InternalError(
539
2
        "Cannot replace an ADS config when one wasn't previously configured in the bootstrap");
540
2
  }
541
26
  const auto& bootstrap_ads_config = server_.bootstrap().dynamic_resources().ads_config();
542

            
543
  // There is no support for switching between different ADS types.
544
26
  if (ads_config.api_type() != bootstrap_ads_config.api_type()) {
545
4
    return absl::InternalError(fmt::format(
546
4
        "Cannot replace an ADS config with a different api_type (expected: {})",
547
4
        envoy::config::core::v3::ApiConfigSource::ApiType_Name(bootstrap_ads_config.api_type())));
548
4
  }
549

            
550
  // There is no support for using a different config validator. Note that if
551
  // this is mainly because the validator could be stateful and if the delta-xDS
552
  // protocol is used, then the new validator will not have the context of the
553
  // previous one.
554
22
  if (bootstrap_ads_config.config_validators_size() != ads_config.config_validators_size()) {
555
2
    return absl::InternalError(fmt::format(
556
2
        "Cannot replace config_validators in ADS config (different size) - Previous: {}, New: {}",
557
2
        bootstrap_ads_config.config_validators_size(), ads_config.config_validators_size()));
558
20
  } else if (bootstrap_ads_config.config_validators_size() > 0) {
559
2
    const bool equal_config_validators = std::equal(
560
2
        bootstrap_ads_config.config_validators().begin(),
561
2
        bootstrap_ads_config.config_validators().end(), ads_config.config_validators().begin(),
562
2
        [](const envoy::config::core::v3::TypedExtensionConfig& a,
563
2
           const envoy::config::core::v3::TypedExtensionConfig& b) {
564
2
          return Protobuf::util::MessageDifferencer::Equivalent(a, b);
565
2
        });
566
2
    if (!equal_config_validators) {
567
2
      return absl::InternalError(fmt::format("Cannot replace config_validators in ADS config "
568
2
                                             "(different contents)\nPrevious: {}\nNew: {}",
569
2
                                             bootstrap_ads_config.DebugString(),
570
2
                                             ads_config.DebugString()));
571
2
    }
572
2
  }
573

            
574
18
  ENVOY_LOG_MISC(trace, "Replacing ADS config with:\n{}", ads_config.DebugString());
575
18
  auto strategy_or_error = Config::Utility::prepareJitteredExponentialBackOffStrategy(
576
18
      ads_config, random_, Envoy::Config::SubscriptionFactory::RetryInitialDelayMs,
577
18
      Envoy::Config::SubscriptionFactory::RetryMaxDelayMs);
578
18
  RETURN_IF_NOT_OK_REF(strategy_or_error.status());
579
16
  JitteredExponentialBackOffStrategyPtr backoff_strategy = std::move(strategy_or_error.value());
580

            
581
16
  absl::Status status = Config::Utility::checkTransportVersion(ads_config);
582
16
  RETURN_IF_NOT_OK(status);
583

            
584
16
  Grpc::RawAsyncClientSharedPtr primary_client;
585
16
  Grpc::RawAsyncClientSharedPtr failover_client;
586
16
  RETURN_IF_NOT_OK(createGrpcClients(
587
16
      cm_->grpcAsyncClientManager(), ads_config, *stats_.rootScope(), /*skip_cluster_check*/ false,
588
16
      /*xdstp_config_source*/ false, primary_client, failover_client));
589

            
590
  // Primary client must not be null, as the primary xDS source must be a valid one.
591
  // The failover_client may be null (no failover defined).
592
12
  ASSERT(primary_client != nullptr);
593

            
594
  // This will cause a disconnect from the current sources, and replacement of the clients.
595
12
  status = ads_mux_->updateMuxSource(std::move(primary_client), std::move(failover_client),
596
12
                                     *stats_.rootScope(), std::move(backoff_strategy), ads_config);
597
12
  return status;
598
16
}
599

            
600
} // namespace Config
601
} // namespace Envoy