1
#include "source/common/router/scoped_rds.h"
2

            
3
#include <memory>
4

            
5
#include "envoy/admin/v3/config_dump.pb.h"
6
#include "envoy/common/exception.h"
7
#include "envoy/config/core/v3/config_source.pb.h"
8
#include "envoy/config/route/v3/scoped_route.pb.h"
9
#include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h"
10
#include "envoy/service/discovery/v3/discovery.pb.h"
11

            
12
#include "source/common/common/assert.h"
13
#include "source/common/common/cleanup.h"
14
#include "source/common/common/logger.h"
15
#include "source/common/common/utility.h"
16
#include "source/common/config/api_version.h"
17
#include "source/common/config/resource_name.h"
18
#include "source/common/config/xds_resource.h"
19
#include "source/common/init/manager_impl.h"
20
#include "source/common/init/watcher_impl.h"
21
#include "source/common/protobuf/utility.h"
22
#include "source/common/router/rds_impl.h"
23
#include "source/common/router/scoped_config_impl.h"
24

            
25
#include "absl/strings/str_join.h"
26

            
27
// Types are deeply nested under Envoy::Config::ConfigProvider; use 'using-directives' across all
28
// ConfigProvider related types for consistency.
29
using Envoy::Config::ConfigProvider;
30
using Envoy::Config::ConfigProviderInstanceType;
31
using Envoy::Config::ConfigProviderManager;
32
using Envoy::Config::ConfigProviderPtr;
33
using Envoy::Config::ScopedResume;
34

            
35
namespace Envoy {
36
namespace Router {
37
namespace ScopedRoutesConfigProviderUtil {
38
ConfigProviderPtr create(
39
    const envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager&
40
        config,
41
    Server::Configuration::ServerFactoryContext& factory_context, Init::Manager& init_manager,
42
96
    const std::string& stat_prefix, ConfigProviderManager& scoped_routes_config_provider_manager) {
43
96
  ASSERT(config.route_specifier_case() ==
44
96
         envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager::
45
96
             RouteSpecifierCase::kScopedRoutes);
46
96
  switch (config.scoped_routes().config_specifier_case()) {
47
8
  case envoy::extensions::filters::network::http_connection_manager::v3::ScopedRoutes::
48
8
      ConfigSpecifierCase::kScopedRouteConfigurationsList: {
49
8
    const envoy::extensions::filters::network::http_connection_manager::v3::
50
8
        ScopedRouteConfigurationsList& scoped_route_list =
51
8
            config.scoped_routes().scoped_route_configurations_list();
52
8
    return scoped_routes_config_provider_manager.createStaticConfigProvider(
53
8
        RepeatedPtrUtil::convertToConstMessagePtrContainer<
54
8
            envoy::config::route::v3::ScopedRouteConfiguration,
55
8
            ProtobufTypes::ConstMessagePtrVector>(scoped_route_list.scoped_route_configurations()),
56
8
        factory_context,
57
8
        ScopedRoutesConfigProviderManagerOptArg(config.scoped_routes().name(),
58
8
                                                config.scoped_routes().rds_config_source()));
59
  }
60
88
  case envoy::extensions::filters::network::http_connection_manager::v3::ScopedRoutes::
61
88
      ConfigSpecifierCase::kScopedRds:
62
88
    return scoped_routes_config_provider_manager.createXdsConfigProvider(
63
88
        config.scoped_routes().scoped_rds(), factory_context, init_manager, stat_prefix,
64
88
        ScopedRoutesConfigProviderManagerOptArg(config.scoped_routes().name(),
65
88
                                                config.scoped_routes().rds_config_source()));
66
  case envoy::extensions::filters::network::http_connection_manager::v3::ScopedRoutes::
67
      ConfigSpecifierCase::CONFIG_SPECIFIER_NOT_SET:
68
    PANIC("not implemented");
69
96
  }
70
  PANIC_DUE_TO_CORRUPT_ENUM;
71
}
72

            
73
ScopeKeyBuilderPtr createScopeKeyBuilder(
74
    const envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager&
75
92
        config) {
76
92
  ASSERT(config.route_specifier_case() ==
77
92
         envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager::
78
92
             RouteSpecifierCase::kScopedRoutes);
79
92
  auto scope_key_builder = config.scoped_routes().scope_key_builder();
80
92
  return std::make_unique<ScopeKeyBuilderImpl>(std::move(scope_key_builder));
81
92
}
82

            
83
} // namespace ScopedRoutesConfigProviderUtil
84

            
85
namespace {
86

            
87
std::vector<ScopedRouteInfoConstSharedPtr>
88
makeScopedRouteInfos(ProtobufTypes::ConstMessagePtrVector&& config_protos,
89
                     Server::Configuration::ServerFactoryContext& factory_context,
90
8
                     ScopedRoutesConfigProviderManager& config_provider_manager) {
91
8
  std::vector<ScopedRouteInfoConstSharedPtr> scopes;
92
11
  for (std::unique_ptr<const Protobuf::Message>& config_proto : config_protos) {
93
11
    auto scoped_route_config =
94
11
        MessageUtil::downcastAndValidate<const envoy::config::route::v3::ScopedRouteConfiguration&>(
95
11
            *config_proto, factory_context.messageValidationContext().staticValidationVisitor());
96
11
    if (!scoped_route_config.route_configuration_name().empty()) {
97
1
      throwEnvoyExceptionOrPanic(
98
1
          "Fetching routes via RDS (route_configuration_name) is not supported "
99
1
          "with inline scoped routes.");
100
1
    }
101
10
    if (!scoped_route_config.has_route_configuration()) {
102
1
      throwEnvoyExceptionOrPanic(
103
1
          "You must specify a route_configuration with inline scoped routes.");
104
1
    }
105
9
    RouteConfigProviderPtr route_config_provider =
106
9
        config_provider_manager.routeConfigProviderManager().createStaticRouteConfigProvider(
107
9
            scoped_route_config.route_configuration(), factory_context,
108
9
            factory_context.messageValidationContext().staticValidationVisitor());
109
9
    scopes.push_back(std::make_shared<const ScopedRouteInfo>(std::move(scoped_route_config),
110
9
                                                             route_config_provider->configCast()));
111
9
  }
112

            
113
6
  return scopes;
114
8
}
115

            
116
} // namespace
117

            
118
InlineScopedRoutesConfigProvider::InlineScopedRoutesConfigProvider(
119
    ProtobufTypes::ConstMessagePtrVector&& config_protos, std::string name,
120
    Server::Configuration::ServerFactoryContext& factory_context,
121
    ScopedRoutesConfigProviderManager& config_provider_manager,
122
    envoy::config::core::v3::ConfigSource rds_config_source)
123
8
    : Envoy::Config::ImmutableConfigProviderBase(factory_context, config_provider_manager,
124
8
                                                 ConfigProviderInstanceType::Inline,
125
8
                                                 ConfigProvider::ApiType::Delta),
126
8
      name_(std::move(name)),
127
      scopes_(
128
8
          makeScopedRouteInfos(std::move(config_protos), factory_context, config_provider_manager)),
129
8
      config_(std::make_shared<ScopedConfigImpl>(scopes_)),
130
8
      rds_config_source_(std::move(rds_config_source)) {}
131

            
132
ScopedRdsConfigSubscription::ScopedRdsConfigSubscription(
133
    const envoy::extensions::filters::network::http_connection_manager::v3::ScopedRds& scoped_rds,
134
    const uint64_t manager_identifier, const std::string& name,
135
    Server::Configuration::ServerFactoryContext& factory_context, const std::string& stat_prefix,
136
    envoy::config::core::v3::ConfigSource rds_config_source,
137
    RouteConfigProviderManager& route_config_provider_manager,
138
    ScopedRoutesConfigProviderManager& config_provider_manager)
139
105
    : DeltaConfigSubscriptionInstance("SRDS", manager_identifier, config_provider_manager,
140
105
                                      factory_context),
141
105
      Envoy::Config::SubscriptionBase<envoy::config::route::v3::ScopedRouteConfiguration>(
142
105
          factory_context.messageValidationContext().dynamicValidationVisitor(), "name"),
143
105
      factory_context_(factory_context), name_(name),
144
105
      scope_(factory_context.scope().createScope(stat_prefix + "scoped_rds." + name + ".")),
145
105
      stats_({ALL_SCOPED_RDS_STATS(POOL_COUNTER(*scope_), POOL_GAUGE(*scope_))}),
146
105
      rds_config_source_(std::move(rds_config_source)), stat_prefix_(stat_prefix),
147
105
      route_config_provider_manager_(route_config_provider_manager) {
148
105
  const auto resource_name = getResourceName();
149
105
  if (scoped_rds.srds_resources_locator().empty()) {
150
103
    subscription_ = THROW_OR_RETURN_VALUE(
151
103
        factory_context.clusterManager().subscriptionFactory().subscriptionFromConfigSource(
152
103
            scoped_rds.scoped_rds_config_source(), Grpc::Common::typeUrl(resource_name), *scope_,
153
103
            *this, resource_decoder_, {}),
154
103
        Envoy::Config::SubscriptionPtr);
155
103
  } else {
156
2
    const auto srds_resources_locator = THROW_OR_RETURN_VALUE(
157
2
        Envoy::Config::XdsResourceIdentifier::decodeUrl(scoped_rds.srds_resources_locator()),
158
2
        xds::core::v3::ResourceLocator);
159
2
    subscription_ = THROW_OR_RETURN_VALUE(
160
2
        factory_context.clusterManager().subscriptionFactory().collectionSubscriptionFromUrl(
161
2
            srds_resources_locator, scoped_rds.scoped_rds_config_source(), resource_name, *scope_,
162
2
            *this, resource_decoder_),
163
2
        Envoy::Config::SubscriptionPtr);
164
2
  }
165

            
166
  // TODO(tony612): consider not using the callback here.
167
185
  initialize([]() -> Envoy::Config::ConfigProvider::ConfigConstSharedPtr {
168
185
    return std::make_shared<ScopedConfigImpl>();
169
185
  });
170
105
}
171

            
172
// Constructor for RdsRouteConfigProviderHelper when scope is eager loading.
173
// Initialize RdsRouteConfigProvider by default.
174
ScopedRdsConfigSubscription::RdsRouteConfigProviderHelper::RdsRouteConfigProviderHelper(
175
    ScopedRdsConfigSubscription& parent, std::string scope_name,
176
    envoy::extensions::filters::network::http_connection_manager::v3::Rds& rds,
177
    Init::Manager& init_manager)
178
94
    : parent_(parent), scope_name_(scope_name), on_demand_(false) {
179
94
  initRdsConfigProvider(rds, init_manager);
180
94
}
181

            
182
// Constructor for RdsRouteConfigProviderHelper when scope is on demand.
183
// Leave the RdsRouteConfigProvider uninitialized.
184
ScopedRdsConfigSubscription::RdsRouteConfigProviderHelper::RdsRouteConfigProviderHelper(
185
    ScopedRdsConfigSubscription& parent, std::string scope_name)
186
51
    : parent_(parent), scope_name_(scope_name), on_demand_(true) {
187
51
  parent_.stats_.on_demand_scopes_.inc();
188
51
}
189

            
190
// When on demand callback is received from main thread, there are 4 cases.
191
// 1. Scope is not found, post a scope not found callback back to worker thread.
192
// 2. Scope is found but route provider has not been initialized, create route provider.
193
// 3. After route provider has been initialized, if RouteConfiguration has been fetched,
194
// post scope found callback to worker thread.
195
// 4. After route provider has been initialized, if RouteConfiguration is null,
196
// cache the callback and wait for RouteConfiguration to come.
197
void ScopedRdsConfigSubscription::RdsRouteConfigProviderHelper::addOnDemandUpdateCallback(
198
57
    std::function<void()> callback) {
199
  // If RouteConfiguration has been initialized, run the callback to continue in filter chain,
200
  // otherwise cache it and wait for the route table to be initialized. If RouteConfiguration hasn't
201
  // been initialized, routeConfig() return a shared_ptr to NullConfigImpl. The name of
202
  // NullConfigImpl is an empty string.
203
57
  if (route_provider_ != nullptr && !routeConfig()->name().empty()) {
204
5
    callback();
205
5
    return;
206
5
  }
207
52
  on_demand_update_callbacks_.push_back(callback);
208
  // Initialize the rds provider if it has not been initialized. There is potential race here
209
  // because other worker threads may also post callback to on demand update the RouteConfiguration
210
  // associated with this scope. If rds provider has been initialized, just wait for
211
  // RouteConfiguration to be updated.
212
52
  maybeInitRdsConfigProvider();
213
52
}
214

            
215
132
void ScopedRdsConfigSubscription::RdsRouteConfigProviderHelper::runOnDemandUpdateCallback() {
216
132
  for (auto& callback : on_demand_update_callbacks_) {
217
52
    callback();
218
52
  }
219
132
  on_demand_update_callbacks_.clear();
220
132
}
221

            
222
void ScopedRdsConfigSubscription::RdsRouteConfigProviderHelper::initRdsConfigProvider(
223
    envoy::extensions::filters::network::http_connection_manager::v3::Rds& rds,
224
142
    Init::Manager& init_manager) {
225
142
  route_provider_ = std::dynamic_pointer_cast<RdsRouteConfigProviderImpl>(
226
142
      parent_.route_config_provider_manager_.createRdsRouteConfigProvider(
227
142
          rds, parent_.factory_context_, parent_.stat_prefix_, init_manager));
228

            
229
144
  rds_update_callback_handle_ = route_provider_->subscription().addUpdateCallback([this]() {
230
    // Subscribe to RDS update.
231
123
    parent_.onRdsConfigUpdate(scope_name_, route_provider_->configCast());
232
123
    return absl::OkStatus();
233
123
  });
234
142
  parent_.stats_.active_scopes_.inc();
235
142
}
236

            
237
52
void ScopedRdsConfigSubscription::RdsRouteConfigProviderHelper::maybeInitRdsConfigProvider() {
238
  // If the route provider have been initialized, return and wait for rds config update.
239
52
  if (route_provider_ != nullptr) {
240
4
    return;
241
4
  }
242

            
243
  // Create a init_manager to create a rds provider.
244
  // No transitive warming dependency here because only on demand update reach this point.
245
48
  Init::ManagerImpl srds_init_mgr("SRDS on demand init manager.");
246
48
  Cleanup srds_initialization_continuation([this, &srds_init_mgr] {
247
48
    Init::WatcherImpl noop_watcher(
248
48
        fmt::format("SRDS on demand ConfigUpdate watcher: {}", scope_name_),
249
48
        []() { /*Do nothing.*/ });
250
48
    srds_init_mgr.initialize(noop_watcher);
251
48
  });
252
  // Create route provider.
253
48
  envoy::extensions::filters::network::http_connection_manager::v3::Rds rds;
254
48
  rds.mutable_config_source()->MergeFrom(parent_.rds_config_source_);
255
48
  rds.set_route_config_name(
256
48
      parent_.scoped_route_map_[scope_name_]->configProto().route_configuration_name());
257
48
  initRdsConfigProvider(rds, srds_init_mgr);
258
48
  ENVOY_LOG(debug, fmt::format("Scope on demand update: {}", scope_name_));
259
  // If RouteConfiguration hasn't been initialized, routeConfig() return a shared_ptr to
260
  // NullConfigImpl. The name of NullConfigImpl is an empty string.
261
48
  if (routeConfig()->name().empty()) {
262
39
    return;
263
39
  }
264
  // If RouteConfiguration has been initialized, apply update to all the threads.
265
9
  parent_.onRdsConfigUpdate(scope_name_, route_provider_->configCast());
266
9
}
267

            
268
absl::StatusOr<bool> ScopedRdsConfigSubscription::addOrUpdateScopes(
269
    const std::vector<Envoy::Config::DecodedResourceRef>& resources, Init::Manager& init_manager,
270
140
    const std::string& version_info) {
271
140
  bool any_applied = false;
272
140
  envoy::extensions::filters::network::http_connection_manager::v3::Rds rds;
273
140
  rds.mutable_config_source()->MergeFrom(rds_config_source_);
274
140
  std::vector<ScopedRouteInfoConstSharedPtr> updated_scopes;
275
140
  std::list<RdsRouteConfigProviderHelperPtr> to_be_removed_rds_providers;
276
180
  for (const auto& resource : resources) {
277
    // Explicit copy so that we can std::move later.
278
180
    envoy::config::route::v3::ScopedRouteConfiguration scoped_route_config =
279
180
        dynamic_cast<const envoy::config::route::v3::ScopedRouteConfiguration&>(
280
180
            resource.get().resource());
281
180
    const std::string scope_name = scoped_route_config.name();
282
180
    if (const auto& scope_info_iter = scoped_route_map_.find(scope_name);
283
180
        scope_info_iter != scoped_route_map_.end()) {
284
37
      if (scope_info_iter->second->configHash() == MessageUtil::hash(scoped_route_config)) {
285
25
        continue;
286
25
      }
287
      // Remove the old key from scope_names_by_hash_ in case the scope key has changed. (If it
288
      // hasn't, we'll just add it back anyway.)
289
12
      if (scope_name_by_hash_.find(scope_info_iter->second->scopeKey().hash()) !=
290
12
          scope_name_by_hash_.end()) {
291
12
        scope_name_by_hash_.erase(scope_info_iter->second->scopeKey().hash());
292
12
      }
293
12
    }
294
155
    std::unique_ptr<RdsRouteConfigProviderHelper> rds_config_provider_helper;
295
155
    std::shared_ptr<ScopedRouteInfo> scoped_route_info = nullptr;
296
155
    if (scoped_route_config.has_route_configuration()) {
297
9
      RouteConfigProviderPtr route_config_provider =
298
9
          route_config_provider_manager_.createStaticRouteConfigProvider(
299
9
              scoped_route_config.route_configuration(), factory_context_,
300
9
              factory_context_.messageValidationContext().staticValidationVisitor());
301
9
      scoped_route_info = std::make_shared<ScopedRouteInfo>(std::move(scoped_route_config),
302
9
                                                            route_config_provider->configCast());
303
      // If this is an update from a scoped route configuration specifying route_configuration_name
304
      // to one specifying route_configuration, then the RDS subscription is no longer needed. We
305
      // can remove the RDS config provider, but hold on to them until exiting the loop in case the
306
      // subscription is reused by another scope still to be added.
307
9
      auto rds_config_provider_helper_iter = route_provider_by_scope_.find(scope_name);
308
9
      if (rds_config_provider_helper_iter != route_provider_by_scope_.end()) {
309
4
        to_be_removed_rds_providers.emplace_back(
310
4
            std::move(rds_config_provider_helper_iter->second));
311
4
        route_provider_by_scope_.erase(rds_config_provider_helper_iter);
312
4
      }
313
146
    } else {
314
146
      if (scoped_route_config.route_configuration_name().empty()) {
315
1
        return absl::InvalidArgumentError("route_configuration_name is empty.");
316
1
      }
317
145
      rds.set_route_config_name(scoped_route_config.route_configuration_name());
318
145
      std::unique_ptr<RdsRouteConfigProviderHelper> rds_config_provider_helper;
319
145
      if (scoped_route_config.on_demand() == false) {
320
        // For default scopes, create a rds helper with rds provider initialized.
321
94
        rds_config_provider_helper =
322
94
            std::make_unique<RdsRouteConfigProviderHelper>(*this, scope_name, rds, init_manager);
323
94
        scoped_route_info = std::make_shared<ScopedRouteInfo>(
324
94
            std::move(scoped_route_config), rds_config_provider_helper->routeConfig());
325
122
      } else {
326
        // For on demand scopes, create a rds helper with rds provider uninitialized.
327
51
        rds_config_provider_helper =
328
51
            std::make_unique<RdsRouteConfigProviderHelper>(*this, scope_name);
329
        // scope_route_info->routeConfig() will be nullptr, because RouteConfiguration is not
330
        // loaded.
331
51
        scoped_route_info =
332
51
            std::make_shared<ScopedRouteInfo>(std::move(scoped_route_config), nullptr);
333
51
      }
334
145
      route_provider_by_scope_[scope_name] = std::move(rds_config_provider_helper);
335
145
    }
336
154
    scope_name_by_hash_[scoped_route_info->scopeKey().hash()] = scoped_route_info->scopeName();
337
154
    scoped_route_map_[scoped_route_info->scopeName()] = scoped_route_info;
338
154
    updated_scopes.push_back(scoped_route_info);
339
154
    any_applied = true;
340
154
    ENVOY_LOG(debug, "srds: queueing add/update of scoped_route '{}', version: {}",
341
154
              scoped_route_info->scopeName(), version_info);
342
154
  }
343

            
344
  // scoped_route_info of both eager loading and on demand scopes will be propagated to work
345
  // threads. Upon a scoped RouteConfiguration miss, if the scope exists, an on demand update
346
  // callback will be posted to main thread.
347
139
  if (!updated_scopes.empty()) {
348
124
    applyConfigUpdate([updated_scopes](ConfigProvider::ConfigConstSharedPtr config)
349
220
                          -> ConfigProvider::ConfigConstSharedPtr {
350
220
      auto* thread_local_scoped_config =
351
220
          const_cast<ScopedConfigImpl*>(static_cast<const ScopedConfigImpl*>(config.get()));
352
220
      thread_local_scoped_config->addOrUpdateRoutingScopes(updated_scopes);
353
220
      return config;
354
220
    });
355
124
  }
356
139
  return any_applied;
357
140
}
358

            
359
std::list<ScopedRdsConfigSubscription::RdsRouteConfigProviderHelperPtr>
360
ScopedRdsConfigSubscription::removeScopes(
361
140
    const Protobuf::RepeatedPtrField<std::string>& scope_names, const std::string& version_info) {
362
140
  std::list<ScopedRdsConfigSubscription::RdsRouteConfigProviderHelperPtr>
363
140
      to_be_removed_rds_providers;
364
140
  std::vector<std::string> removed_scope_names;
365
140
  for (const auto& scope_name : scope_names) {
366
18
    auto iter = scoped_route_map_.find(scope_name);
367
18
    if (iter != scoped_route_map_.end()) {
368
16
      auto rds_config_provider_helper_iter = route_provider_by_scope_.find(scope_name);
369
16
      if (rds_config_provider_helper_iter != route_provider_by_scope_.end()) {
370
16
        to_be_removed_rds_providers.emplace_back(
371
16
            std::move(rds_config_provider_helper_iter->second));
372
16
        route_provider_by_scope_.erase(rds_config_provider_helper_iter);
373
16
      }
374
16
      ASSERT(scope_name_by_hash_.find(iter->second->scopeKey().hash()) !=
375
16
             scope_name_by_hash_.end());
376
16
      scope_name_by_hash_.erase(iter->second->scopeKey().hash());
377
16
      scoped_route_map_.erase(iter);
378
16
      removed_scope_names.push_back(scope_name);
379
16
      ENVOY_LOG(debug, "srds: queueing removal of scoped route '{}', version: {}", scope_name,
380
16
                version_info);
381
16
    }
382
18
  }
383
140
  if (!removed_scope_names.empty()) {
384
16
    applyConfigUpdate([removed_scope_names](ConfigProvider::ConfigConstSharedPtr config)
385
22
                          -> ConfigProvider::ConfigConstSharedPtr {
386
22
      auto* thread_local_scoped_config =
387
22
          const_cast<ScopedConfigImpl*>(static_cast<const ScopedConfigImpl*>(config.get()));
388
22
      thread_local_scoped_config->removeRoutingScopes(removed_scope_names);
389
22
      return config;
390
22
    });
391
16
  }
392
140
  return to_be_removed_rds_providers;
393
140
}
394

            
395
absl::Status ScopedRdsConfigSubscription::onConfigUpdate(
396
    const std::vector<Envoy::Config::DecodedResourceRef>& added_resources,
397
    const Protobuf::RepeatedPtrField<std::string>& removed_resources,
398
148
    const std::string& version_info) {
399
  // Destruction of resume_rds will lift the floodgate for new RDS subscriptions.
400
  // Note in the case of partial acceptance, accepted RDS subscriptions should be started
401
  // despite of any error.
402
148
  ScopedResume resume_rds;
403
  // If new route config sources come after the local init manager's initialize() been
404
  // called, the init manager can't accept new targets. Instead we use a local override which will
405
  // start new subscriptions but not wait on them to be ready.
406
148
  std::unique_ptr<Init::ManagerImpl> srds_init_mgr;
407
  // NOTE: This should be defined after srds_init_mgr and resume_rds, as it depends on the
408
  // srds_init_mgr, and we want a single RDS discovery request to be sent to management
409
  // server.
410
148
  std::unique_ptr<Cleanup> srds_initialization_continuation;
411
148
  ASSERT(localInitManager().state() > Init::Manager::State::Uninitialized);
412
148
  const auto type_url = Envoy::Config::getTypeUrl<envoy::config::route::v3::RouteConfiguration>();
413
  // Pause RDS to not send a burst of RDS requests until we start all the new subscriptions.
414
  // In the case that localInitManager is uninitialized, RDS is already paused
415
  // either by Server init or LDS init.
416
148
  resume_rds = factory_context_.xdsManager().pause(type_url);
417
  // if local init manager is initialized, the parent init manager may have gone away.
418
148
  if (localInitManager().state() == Init::Manager::State::Initialized) {
419
46
    srds_init_mgr =
420
46
        std::make_unique<Init::ManagerImpl>(fmt::format("SRDS {}:{}", name_, version_info));
421
46
    srds_initialization_continuation =
422
46
        std::make_unique<Cleanup>([this, &srds_init_mgr, version_info] {
423
          // For new RDS subscriptions created after listener warming up, we don't wait for them to
424
          // warm up.
425
46
          Init::WatcherImpl noop_watcher(
426
              // Note: we just throw it away.
427
46
              fmt::format("SRDS ConfigUpdate watcher {}:{}", name_, version_info),
428
46
              []() { /*Do nothing.*/ });
429
46
          srds_init_mgr->initialize(noop_watcher);
430
46
        });
431
46
  }
432

            
433
148
  std::string exception_msg;
434
148
  Protobuf::RepeatedPtrField<std::string> clean_removed_resources =
435
148
      detectUpdateConflictAndCleanupRemoved(added_resources, removed_resources, exception_msg);
436
148
  if (!exception_msg.empty()) {
437
8
    return absl::InvalidArgumentError(
438
8
        fmt::format("Error adding/updating scoped route(s): {}", exception_msg));
439
8
  }
440

            
441
  // Do not delete RDS config providers just yet, in case the to be deleted RDS subscriptions could
442
  // be reused by some to be added scopes.
443
140
  std::list<ScopedRdsConfigSubscription::RdsRouteConfigProviderHelperPtr>
444
140
      to_be_removed_rds_providers = removeScopes(clean_removed_resources, version_info);
445

            
446
140
  auto status_or_applied = addOrUpdateScopes(
447
140
      added_resources, (srds_init_mgr == nullptr ? localInitManager() : *srds_init_mgr),
448
140
      version_info);
449
140
  if (!status_or_applied.status().ok()) {
450
1
    return status_or_applied.status();
451
1
  }
452
139
  const bool any_applied = status_or_applied.value();
453
139
  const auto status = ConfigSubscriptionCommonBase::onConfigUpdate();
454
139
  if (!status.ok()) {
455
    return status;
456
  }
457
139
  if (any_applied || !to_be_removed_rds_providers.empty()) {
458
136
    setLastConfigInfo(absl::optional<LastConfigInfo>({absl::nullopt, version_info}));
459
136
  }
460
139
  stats_.all_scopes_.set(scoped_route_map_.size());
461
139
  stats_.config_reload_.inc();
462
139
  stats_.config_reload_time_ms_.set(DateUtil::nowToMilliseconds(factory_context_.timeSource()));
463
139
  return absl::OkStatus();
464
139
}
465

            
466
void ScopedRdsConfigSubscription::onRdsConfigUpdate(const std::string& scope_name,
467
132
                                                    ConfigConstSharedPtr new_rds_config) {
468
132
  auto iter = scoped_route_map_.find(scope_name);
469
132
  ASSERT(iter != scoped_route_map_.end(),
470
132
         fmt::format("trying to update route config for non-existing scope {}", scope_name));
471
132
  auto new_scoped_route_info = std::make_shared<ScopedRouteInfo>(
472
132
      envoy::config::route::v3::ScopedRouteConfiguration(iter->second->configProto()),
473
132
      std::move(new_rds_config));
474
132
  scoped_route_map_[new_scoped_route_info->scopeName()] = new_scoped_route_info;
475
132
  applyConfigUpdate([new_scoped_route_info](ConfigProvider::ConfigConstSharedPtr config)
476
242
                        -> ConfigProvider::ConfigConstSharedPtr {
477
242
    auto* thread_local_scoped_config =
478
242
        const_cast<ScopedConfigImpl*>(static_cast<const ScopedConfigImpl*>(config.get()));
479
242
    thread_local_scoped_config->addOrUpdateRoutingScopes({new_scoped_route_info});
480
242
    return config;
481
242
  });
482
  // The data plane may wait for the route configuration to come back.
483
132
  route_provider_by_scope_[scope_name]->runOnDemandUpdateCallback();
484
132
}
485

            
486
// TODO(stevenzzzz): see issue #7508, consider generalizing this function as it overlaps with
487
// CdsApiImpl::onConfigUpdate.
488
absl::Status ScopedRdsConfigSubscription::onConfigUpdate(
489
    const std::vector<Envoy::Config::DecodedResourceRef>& resources,
490
78
    const std::string& version_info) {
491
78
  Protobuf::RepeatedPtrField<std::string> to_remove_repeated;
492
78
  for (const auto& scoped_route : scoped_route_map_) {
493
37
    *to_remove_repeated.Add() = scoped_route.first;
494
37
  }
495
78
  return onConfigUpdate(resources, to_remove_repeated, version_info);
496
78
}
497

            
498
Protobuf::RepeatedPtrField<std::string>
499
ScopedRdsConfigSubscription::detectUpdateConflictAndCleanupRemoved(
500
    const std::vector<Envoy::Config::DecodedResourceRef>& resources,
501
148
    const Protobuf::RepeatedPtrField<std::string>& removed_resources, std::string& exception_msg) {
502
148
  Protobuf::RepeatedPtrField<std::string> clean_removed_resources;
503
  // All the scope names to be removed or updated.
504
148
  absl::flat_hash_set<std::string> updated_or_removed_scopes;
505
148
  for (const std::string& removed_resource : removed_resources) {
506
47
    updated_or_removed_scopes.insert(removed_resource);
507
47
  }
508
194
  for (const auto& resource : resources) {
509
194
    const auto& scoped_route =
510
194
        dynamic_cast<const envoy::config::route::v3::ScopedRouteConfiguration&>(
511
194
            resource.get().resource());
512
194
    updated_or_removed_scopes.insert(scoped_route.name());
513
194
  }
514

            
515
148
  absl::flat_hash_map<uint64_t, std::string> scope_name_by_hash = scope_name_by_hash_;
516
148
  absl::erase_if(scope_name_by_hash, [&updated_or_removed_scopes](const auto& key_name) {
517
76
    auto const& [key, name] = key_name;
518
76
    UNREFERENCED_PARAMETER(key);
519
76
    return updated_or_removed_scopes.contains(name);
520
76
  });
521
148
  absl::flat_hash_map<std::string, envoy::config::route::v3::ScopedRouteConfiguration>
522
148
      scoped_routes;
523
194
  for (const auto& resource : resources) {
524
    // Throws (thus rejects all) on any error.
525
194
    const auto& scoped_route =
526
194
        dynamic_cast<const envoy::config::route::v3::ScopedRouteConfiguration&>(
527
194
            resource.get().resource());
528
194
    const std::string& scope_name = scoped_route.name();
529
194
    auto scope_config_inserted = scoped_routes.try_emplace(scope_name, std::move(scoped_route));
530
194
    if (!scope_config_inserted.second) {
531
2
      exception_msg = fmt::format("duplicate scoped route configuration '{}' found", scope_name);
532
2
      return clean_removed_resources;
533
2
    }
534
192
    envoy::config::route::v3::ScopedRouteConfiguration scoped_route_config =
535
192
        scope_config_inserted.first->second;
536
192
    const uint64_t key_fingerprint =
537
192
        ScopedRouteInfo(std::move(scoped_route_config), nullptr).scopeKey().hash();
538
192
    if (!scope_name_by_hash.try_emplace(key_fingerprint, scope_name).second) {
539
6
      exception_msg =
540
6
          fmt::format("scope key conflict found, first scope is '{}', second scope is '{}'",
541
6
                      scope_name_by_hash[key_fingerprint], scope_name);
542
6
      return clean_removed_resources;
543
6
    }
544
192
  }
545

            
546
  // only remove resources that is not going to be updated.
547
140
  for (const std::string& removed_resource : removed_resources) {
548
45
    if (!scoped_routes.contains(removed_resource)) {
549
18
      *clean_removed_resources.Add() = removed_resource;
550
18
    }
551
45
  }
552
140
  return clean_removed_resources;
553
148
}
554

            
555
void ScopedRdsConfigSubscription::onDemandRdsUpdate(
556
    std::shared_ptr<Router::ScopeKey> scope_key, Event::Dispatcher& thread_local_dispatcher,
557
    Http::RouteConfigUpdatedCallback&& route_config_updated_cb,
558
67
    std::weak_ptr<Envoy::Config::ConfigSubscriptionCommonBase> weak_subscription) {
559
67
  factory_context_.mainThreadDispatcher().post([this, &thread_local_dispatcher, scope_key,
560
67
                                                route_config_updated_cb, weak_subscription]() {
561
    // If the subscription has been destroyed, return immediately.
562
67
    if (!weak_subscription.lock()) {
563
1
      thread_local_dispatcher.post([route_config_updated_cb] { route_config_updated_cb(false); });
564
1
      return;
565
1
    }
566

            
567
66
    auto iter = scope_name_by_hash_.find(scope_key->hash());
568
    // Return to filter chain if we can't find the scope.
569
    // The scope may have been destroyed when callback reach the main thread.
570
66
    if (iter == scope_name_by_hash_.end()) {
571
9
      thread_local_dispatcher.post([route_config_updated_cb] { route_config_updated_cb(false); });
572
9
      return;
573
9
    }
574
    // Wrap the thread local dispatcher inside the callback.
575
57
    std::function<void()> thread_local_updated_callback = [route_config_updated_cb,
576
57
                                                           &thread_local_dispatcher]() {
577
57
      thread_local_dispatcher.post([route_config_updated_cb] { route_config_updated_cb(true); });
578
57
    };
579
57
    std::string scope_name = iter->second;
580
    // On demand initialization inside main thread.
581
57
    route_provider_by_scope_[scope_name]->addOnDemandUpdateCallback(thread_local_updated_callback);
582
57
  });
583
67
}
584

            
585
ScopedRdsConfigProvider::ScopedRdsConfigProvider(
586
    ScopedRdsConfigSubscriptionSharedPtr&& subscription)
587
114
    : MutableConfigProviderCommonBase(std::move(subscription), ConfigProvider::ApiType::Delta) {}
588

            
589
ProtobufTypes::MessagePtr
590
28
ScopedRoutesConfigProviderManager::dumpConfigs(const Matchers::StringMatcher& name_matcher) const {
591
28
  auto config_dump = std::make_unique<envoy::admin::v3::ScopedRoutesConfigDump>();
592
28
  for (const auto& element : configSubscriptions()) {
593
6
    auto subscription = element.second.lock();
594
6
    ASSERT(subscription);
595

            
596
6
    if (subscription->configInfo()) {
597
5
      auto* dynamic_config = config_dump->mutable_dynamic_scoped_route_configs()->Add();
598
5
      dynamic_config->set_version_info(subscription->configInfo().value().last_config_version_);
599
5
      const ScopedRdsConfigSubscription* typed_subscription =
600
5
          static_cast<ScopedRdsConfigSubscription*>(subscription.get());
601
5
      dynamic_config->set_name(typed_subscription->name());
602
5
      const ScopedRouteMap& scoped_route_map = typed_subscription->scopedRouteMap();
603
5
      for (const auto& it : scoped_route_map) {
604
3
        if (!name_matcher.match(it.second->configProto().name())) {
605
1
          continue;
606
1
        }
607
2
        dynamic_config->mutable_scoped_route_configs()->Add()->PackFrom(it.second->configProto());
608
2
      }
609
5
      TimestampUtil::systemClockToTimestamp(subscription->lastUpdated(),
610
5
                                            *dynamic_config->mutable_last_updated());
611
5
    }
612
6
  }
613

            
614
28
  for (const auto& provider : immutableConfigProviders(ConfigProviderInstanceType::Inline)) {
615
2
    const auto protos_info =
616
2
        provider->configProtoInfoVector<envoy::config::route::v3::ScopedRouteConfiguration>();
617
2
    ASSERT(protos_info != absl::nullopt);
618
2
    auto* inline_config = config_dump->mutable_inline_scoped_route_configs()->Add();
619
2
    inline_config->set_name(static_cast<InlineScopedRoutesConfigProvider*>(provider)->name());
620
3
    for (const auto& config_proto : protos_info.value().config_protos_) {
621
3
      if (!name_matcher.match(config_proto->name())) {
622
        continue;
623
      }
624
3
      inline_config->mutable_scoped_route_configs()->Add()->PackFrom(*config_proto);
625
3
    }
626
2
    TimestampUtil::systemClockToTimestamp(provider->lastUpdated(),
627
2
                                          *inline_config->mutable_last_updated());
628
2
  }
629

            
630
28
  return config_dump;
631
28
}
632

            
633
ConfigProviderPtr ScopedRoutesConfigProviderManager::createXdsConfigProvider(
634
    const Protobuf::Message& config_source_proto,
635
    Server::Configuration::ServerFactoryContext& factory_context, Init::Manager& init_manager,
636
114
    const std::string& stat_prefix, const ConfigProviderManager::OptionalArg& optarg) {
637
114
  const auto& typed_optarg = static_cast<const ScopedRoutesConfigProviderManagerOptArg&>(optarg);
638
114
  ScopedRdsConfigSubscriptionSharedPtr subscription =
639
114
      ConfigProviderManagerImplBase::getSubscription<ScopedRdsConfigSubscription>(
640
114
          config_source_proto, init_manager,
641
114
          [&config_source_proto, &factory_context, &stat_prefix,
642
114
           &typed_optarg](const uint64_t manager_identifier,
643
114
                          ConfigProviderManagerImplBase& config_provider_manager)
644
114
              -> Envoy::Config::ConfigSubscriptionCommonBaseSharedPtr {
645
105
            const auto& scoped_rds_config_source = dynamic_cast<
646
105
                const envoy::extensions::filters::network::http_connection_manager::v3::ScopedRds&>(
647
105
                config_source_proto);
648
105
            return std::make_shared<ScopedRdsConfigSubscription>(
649
105
                scoped_rds_config_source, manager_identifier, typed_optarg.scoped_routes_name_,
650
105
                factory_context, stat_prefix, typed_optarg.rds_config_source_,
651
105
                static_cast<ScopedRoutesConfigProviderManager&>(config_provider_manager)
652
105
                    .routeConfigProviderManager(),
653
105
                static_cast<ScopedRoutesConfigProviderManager&>(config_provider_manager));
654
105
          });
655

            
656
114
  return std::make_unique<ScopedRdsConfigProvider>(std::move(subscription));
657
114
}
658

            
659
ConfigProviderPtr ScopedRoutesConfigProviderManager::createStaticConfigProvider(
660
    ProtobufTypes::ConstMessagePtrVector&& config_protos,
661
    Server::Configuration::ServerFactoryContext& factory_context,
662
8
    const ConfigProviderManager::OptionalArg& optarg) {
663
8
  const auto& typed_optarg = static_cast<const ScopedRoutesConfigProviderManagerOptArg&>(optarg);
664
8
  return std::make_unique<InlineScopedRoutesConfigProvider>(
665
8
      std::move(config_protos), typed_optarg.scoped_routes_name_, factory_context, *this,
666
8
      typed_optarg.rds_config_source_);
667
8
}
668

            
669
REGISTER_FACTORY(SrdsFactoryDefault, SrdsFactory);
670

            
671
} // namespace Router
672
} // namespace Envoy