Line data Source code
1 : #include "source/common/router/scoped_rds.h"
2 :
3 : #include <memory>
4 :
5 : #include "envoy/admin/v3/config_dump.pb.h"
6 : #include "envoy/config/core/v3/config_source.pb.h"
7 : #include "envoy/config/route/v3/scoped_route.pb.h"
8 : #include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h"
9 : #include "envoy/service/discovery/v3/discovery.pb.h"
10 :
11 : #include "source/common/common/assert.h"
12 : #include "source/common/common/cleanup.h"
13 : #include "source/common/common/logger.h"
14 : #include "source/common/common/utility.h"
15 : #include "source/common/config/api_version.h"
16 : #include "source/common/config/resource_name.h"
17 : #include "source/common/config/xds_resource.h"
18 : #include "source/common/init/manager_impl.h"
19 : #include "source/common/init/watcher_impl.h"
20 : #include "source/common/protobuf/utility.h"
21 : #include "source/common/router/rds_impl.h"
22 : #include "source/common/router/scoped_config_impl.h"
23 :
24 : #include "absl/strings/str_join.h"
25 :
26 : // Types are deeply nested under Envoy::Config::ConfigProvider; use 'using-directives' across all
27 : // ConfigProvider related types for consistency.
28 : using Envoy::Config::ConfigProvider;
29 : using Envoy::Config::ConfigProviderInstanceType;
30 : using Envoy::Config::ConfigProviderManager;
31 : using Envoy::Config::ConfigProviderPtr;
32 : using Envoy::Config::ScopedResume;
33 :
34 : namespace Envoy {
35 : namespace Router {
36 : namespace ScopedRoutesConfigProviderUtil {
37 : ConfigProviderPtr create(
38 : const envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager&
39 : config,
40 : Server::Configuration::ServerFactoryContext& factory_context, Init::Manager& init_manager,
41 0 : const std::string& stat_prefix, ConfigProviderManager& scoped_routes_config_provider_manager) {
42 0 : ASSERT(config.route_specifier_case() ==
43 0 : envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager::
44 0 : RouteSpecifierCase::kScopedRoutes);
45 0 : switch (config.scoped_routes().config_specifier_case()) {
46 0 : case envoy::extensions::filters::network::http_connection_manager::v3::ScopedRoutes::
47 0 : ConfigSpecifierCase::kScopedRouteConfigurationsList: {
48 0 : const envoy::extensions::filters::network::http_connection_manager::v3::
49 0 : ScopedRouteConfigurationsList& scoped_route_list =
50 0 : config.scoped_routes().scoped_route_configurations_list();
51 0 : return scoped_routes_config_provider_manager.createStaticConfigProvider(
52 0 : RepeatedPtrUtil::convertToConstMessagePtrContainer<
53 0 : envoy::config::route::v3::ScopedRouteConfiguration,
54 0 : ProtobufTypes::ConstMessagePtrVector>(scoped_route_list.scoped_route_configurations()),
55 0 : factory_context,
56 0 : ScopedRoutesConfigProviderManagerOptArg(config.scoped_routes().name(),
57 0 : config.scoped_routes().rds_config_source()));
58 0 : }
59 0 : case envoy::extensions::filters::network::http_connection_manager::v3::ScopedRoutes::
60 0 : ConfigSpecifierCase::kScopedRds:
61 0 : return scoped_routes_config_provider_manager.createXdsConfigProvider(
62 0 : config.scoped_routes().scoped_rds(), factory_context, init_manager, stat_prefix,
63 0 : ScopedRoutesConfigProviderManagerOptArg(config.scoped_routes().name(),
64 0 : config.scoped_routes().rds_config_source()));
65 0 : case envoy::extensions::filters::network::http_connection_manager::v3::ScopedRoutes::
66 0 : ConfigSpecifierCase::CONFIG_SPECIFIER_NOT_SET:
67 0 : PANIC("not implemented");
68 0 : }
69 0 : PANIC_DUE_TO_CORRUPT_ENUM;
70 0 : }
71 :
72 : ScopeKeyBuilderPtr createScopeKeyBuilder(
73 : const envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager&
74 0 : config) {
75 0 : ASSERT(config.route_specifier_case() ==
76 0 : envoy::extensions::filters::network::http_connection_manager::v3::HttpConnectionManager::
77 0 : RouteSpecifierCase::kScopedRoutes);
78 0 : auto scope_key_builder = config.scoped_routes().scope_key_builder();
79 0 : return std::make_unique<ScopeKeyBuilderImpl>(std::move(scope_key_builder));
80 0 : }
81 :
82 : } // namespace ScopedRoutesConfigProviderUtil
83 :
84 : namespace {
85 :
86 : std::vector<ScopedRouteInfoConstSharedPtr>
87 : makeScopedRouteInfos(ProtobufTypes::ConstMessagePtrVector&& config_protos,
88 : Server::Configuration::ServerFactoryContext& factory_context,
89 0 : ScopedRoutesConfigProviderManager& config_provider_manager) {
90 0 : std::vector<ScopedRouteInfoConstSharedPtr> scopes;
91 0 : for (std::unique_ptr<const Protobuf::Message>& config_proto : config_protos) {
92 0 : auto scoped_route_config =
93 0 : MessageUtil::downcastAndValidate<const envoy::config::route::v3::ScopedRouteConfiguration&>(
94 0 : *config_proto, factory_context.messageValidationContext().staticValidationVisitor());
95 0 : if (!scoped_route_config.route_configuration_name().empty()) {
96 0 : throwEnvoyExceptionOrPanic(
97 0 : "Fetching routes via RDS (route_configuration_name) is not supported "
98 0 : "with inline scoped routes.");
99 0 : }
100 0 : if (!scoped_route_config.has_route_configuration()) {
101 0 : throwEnvoyExceptionOrPanic(
102 0 : "You must specify a route_configuration with inline scoped routes.");
103 0 : }
104 0 : RouteConfigProviderPtr route_config_provider =
105 0 : config_provider_manager.routeConfigProviderManager().createStaticRouteConfigProvider(
106 0 : scoped_route_config.route_configuration(), factory_context,
107 0 : factory_context.messageValidationContext().staticValidationVisitor());
108 0 : scopes.push_back(std::make_shared<const ScopedRouteInfo>(scoped_route_config,
109 0 : route_config_provider->configCast()));
110 0 : }
111 :
112 0 : return scopes;
113 0 : }
114 :
115 : } // namespace
116 :
117 : InlineScopedRoutesConfigProvider::InlineScopedRoutesConfigProvider(
118 : ProtobufTypes::ConstMessagePtrVector&& config_protos, std::string name,
119 : Server::Configuration::ServerFactoryContext& factory_context,
120 : ScopedRoutesConfigProviderManager& config_provider_manager,
121 : envoy::config::core::v3::ConfigSource rds_config_source)
122 : : Envoy::Config::ImmutableConfigProviderBase(factory_context, config_provider_manager,
123 : ConfigProviderInstanceType::Inline,
124 : ConfigProvider::ApiType::Delta),
125 : name_(std::move(name)),
126 : scopes_(
127 : makeScopedRouteInfos(std::move(config_protos), factory_context, config_provider_manager)),
128 : config_(std::make_shared<ScopedConfigImpl>(scopes_)),
129 0 : rds_config_source_(std::move(rds_config_source)) {}
130 :
131 : ScopedRdsConfigSubscription::ScopedRdsConfigSubscription(
132 : const envoy::extensions::filters::network::http_connection_manager::v3::ScopedRds& scoped_rds,
133 : const uint64_t manager_identifier, const std::string& name,
134 : Server::Configuration::ServerFactoryContext& factory_context, const std::string& stat_prefix,
135 : envoy::config::core::v3::ConfigSource rds_config_source,
136 : RouteConfigProviderManager& route_config_provider_manager,
137 : ScopedRoutesConfigProviderManager& config_provider_manager)
138 : : DeltaConfigSubscriptionInstance("SRDS", manager_identifier, config_provider_manager,
139 : factory_context),
140 : Envoy::Config::SubscriptionBase<envoy::config::route::v3::ScopedRouteConfiguration>(
141 : factory_context.messageValidationContext().dynamicValidationVisitor(), "name"),
142 : factory_context_(factory_context), name_(name),
143 : scope_(factory_context.scope().createScope(stat_prefix + "scoped_rds." + name + ".")),
144 : stats_({ALL_SCOPED_RDS_STATS(POOL_COUNTER(*scope_), POOL_GAUGE(*scope_))}),
145 : rds_config_source_(std::move(rds_config_source)), stat_prefix_(stat_prefix),
146 0 : route_config_provider_manager_(route_config_provider_manager) {
147 0 : const auto resource_name = getResourceName();
148 0 : if (scoped_rds.srds_resources_locator().empty()) {
149 0 : subscription_ =
150 0 : factory_context.clusterManager().subscriptionFactory().subscriptionFromConfigSource(
151 0 : scoped_rds.scoped_rds_config_source(), Grpc::Common::typeUrl(resource_name), *scope_,
152 0 : *this, resource_decoder_, {});
153 0 : } else {
154 0 : const auto srds_resources_locator =
155 0 : Envoy::Config::XdsResourceIdentifier::decodeUrl(scoped_rds.srds_resources_locator());
156 0 : subscription_ =
157 0 : factory_context.clusterManager().subscriptionFactory().collectionSubscriptionFromUrl(
158 0 : srds_resources_locator, scoped_rds.scoped_rds_config_source(), resource_name, *scope_,
159 0 : *this, resource_decoder_);
160 0 : }
161 :
162 : // TODO(tony612): consider not using the callback here.
163 0 : initialize([]() -> Envoy::Config::ConfigProvider::ConfigConstSharedPtr {
164 0 : return std::make_shared<ScopedConfigImpl>();
165 0 : });
166 0 : }
167 :
168 : // Constructor for RdsRouteConfigProviderHelper when scope is eager loading.
169 : // Initialize RdsRouteConfigProvider by default.
170 : ScopedRdsConfigSubscription::RdsRouteConfigProviderHelper::RdsRouteConfigProviderHelper(
171 : ScopedRdsConfigSubscription& parent, std::string scope_name,
172 : envoy::extensions::filters::network::http_connection_manager::v3::Rds& rds,
173 : Init::Manager& init_manager)
174 0 : : parent_(parent), scope_name_(scope_name), on_demand_(false) {
175 0 : initRdsConfigProvider(rds, init_manager);
176 0 : }
177 :
178 : // Constructor for RdsRouteConfigProviderHelper when scope is on demand.
179 : // Leave the RdsRouteConfigProvider uninitialized.
180 : ScopedRdsConfigSubscription::RdsRouteConfigProviderHelper::RdsRouteConfigProviderHelper(
181 : ScopedRdsConfigSubscription& parent, std::string scope_name)
182 0 : : parent_(parent), scope_name_(scope_name), on_demand_(true) {
183 0 : parent_.stats_.on_demand_scopes_.inc();
184 0 : }
185 :
186 : // When on demand callback is received from main thread, there are 4 cases.
187 : // 1. Scope is not found, post a scope not found callback back to worker thread.
188 : // 2. Scope is found but route provider has not been initialized, create route provider.
189 : // 3. After route provider has been initialized, if RouteConfiguration has been fetched,
190 : // post scope found callback to worker thread.
191 : // 4. After route provider has been initialized, if RouteConfiguration is null,
192 : // cache the callback and wait for RouteConfiguration to come.
193 : void ScopedRdsConfigSubscription::RdsRouteConfigProviderHelper::addOnDemandUpdateCallback(
194 0 : std::function<void()> callback) {
195 : // If RouteConfiguration has been initialized, run the callback to continue in filter chain,
196 : // otherwise cache it and wait for the route table to be initialized. If RouteConfiguration hasn't
197 : // been initialized, routeConfig() return a shared_ptr to NullConfigImpl. The name of
198 : // NullConfigImpl is an empty string.
199 0 : if (route_provider_ != nullptr && !routeConfig()->name().empty()) {
200 0 : callback();
201 0 : return;
202 0 : }
203 0 : on_demand_update_callbacks_.push_back(callback);
204 : // Initialize the rds provider if it has not been initialized. There is potential race here
205 : // because other worker threads may also post callback to on demand update the RouteConfiguration
206 : // associated with this scope. If rds provider has been initialized, just wait for
207 : // RouteConfiguration to be updated.
208 0 : maybeInitRdsConfigProvider();
209 0 : }
210 :
211 0 : void ScopedRdsConfigSubscription::RdsRouteConfigProviderHelper::runOnDemandUpdateCallback() {
212 0 : for (auto& callback : on_demand_update_callbacks_) {
213 0 : callback();
214 0 : }
215 0 : on_demand_update_callbacks_.clear();
216 0 : }
217 :
218 : void ScopedRdsConfigSubscription::RdsRouteConfigProviderHelper::initRdsConfigProvider(
219 : envoy::extensions::filters::network::http_connection_manager::v3::Rds& rds,
220 0 : Init::Manager& init_manager) {
221 0 : route_provider_ = std::dynamic_pointer_cast<RdsRouteConfigProviderImpl>(
222 0 : parent_.route_config_provider_manager_.createRdsRouteConfigProvider(
223 0 : rds, parent_.factory_context_, parent_.stat_prefix_, init_manager));
224 :
225 0 : rds_update_callback_handle_ = route_provider_->subscription().addUpdateCallback([this]() {
226 : // Subscribe to RDS update.
227 0 : parent_.onRdsConfigUpdate(scope_name_, route_provider_->configCast());
228 0 : });
229 0 : parent_.stats_.active_scopes_.inc();
230 0 : }
231 :
232 0 : void ScopedRdsConfigSubscription::RdsRouteConfigProviderHelper::maybeInitRdsConfigProvider() {
233 : // If the route provider have been initialized, return and wait for rds config update.
234 0 : if (route_provider_ != nullptr) {
235 0 : return;
236 0 : }
237 :
238 : // Create a init_manager to create a rds provider.
239 : // No transitive warming dependency here because only on demand update reach this point.
240 0 : Init::ManagerImpl srds_init_mgr("SRDS on demand init manager.");
241 0 : Cleanup srds_initialization_continuation([this, &srds_init_mgr] {
242 0 : Init::WatcherImpl noop_watcher(
243 0 : fmt::format("SRDS on demand ConfigUpdate watcher: {}", scope_name_),
244 0 : []() { /*Do nothing.*/ });
245 0 : srds_init_mgr.initialize(noop_watcher);
246 0 : });
247 : // Create route provider.
248 0 : envoy::extensions::filters::network::http_connection_manager::v3::Rds rds;
249 0 : rds.mutable_config_source()->MergeFrom(parent_.rds_config_source_);
250 0 : rds.set_route_config_name(
251 0 : parent_.scoped_route_map_[scope_name_]->configProto().route_configuration_name());
252 0 : initRdsConfigProvider(rds, srds_init_mgr);
253 0 : ENVOY_LOG(debug, fmt::format("Scope on demand update: {}", scope_name_));
254 : // If RouteConfiguration hasn't been initialized, routeConfig() return a shared_ptr to
255 : // NullConfigImpl. The name of NullConfigImpl is an empty string.
256 0 : if (routeConfig()->name().empty()) {
257 0 : return;
258 0 : }
259 : // If RouteConfiguration has been initialized, apply update to all the threads.
260 0 : parent_.onRdsConfigUpdate(scope_name_, route_provider_->configCast());
261 0 : }
262 :
263 : absl::StatusOr<bool> ScopedRdsConfigSubscription::addOrUpdateScopes(
264 : const std::vector<Envoy::Config::DecodedResourceRef>& resources, Init::Manager& init_manager,
265 0 : const std::string& version_info) {
266 0 : bool any_applied = false;
267 0 : envoy::extensions::filters::network::http_connection_manager::v3::Rds rds;
268 0 : rds.mutable_config_source()->MergeFrom(rds_config_source_);
269 0 : std::vector<ScopedRouteInfoConstSharedPtr> updated_scopes;
270 0 : for (const auto& resource : resources) {
271 : // Explicit copy so that we can std::move later.
272 0 : envoy::config::route::v3::ScopedRouteConfiguration scoped_route_config =
273 0 : dynamic_cast<const envoy::config::route::v3::ScopedRouteConfiguration&>(
274 0 : resource.get().resource());
275 0 : if (scoped_route_config.route_configuration_name().empty()) {
276 0 : return absl::InvalidArgumentError("route_configuration_name is empty.");
277 0 : }
278 0 : const std::string scope_name = scoped_route_config.name();
279 0 : if (const auto& scope_info_iter = scoped_route_map_.find(scope_name);
280 0 : scope_info_iter != scoped_route_map_.end() &&
281 0 : scope_info_iter->second->configHash() == MessageUtil::hash(scoped_route_config)) {
282 0 : continue;
283 0 : }
284 0 : rds.set_route_config_name(scoped_route_config.route_configuration_name());
285 0 : std::unique_ptr<RdsRouteConfigProviderHelper> rds_config_provider_helper;
286 0 : std::shared_ptr<ScopedRouteInfo> scoped_route_info = nullptr;
287 0 : if (scoped_route_config.on_demand() == false) {
288 : // For default scopes, create a rds helper with rds provider initialized.
289 0 : rds_config_provider_helper =
290 0 : std::make_unique<RdsRouteConfigProviderHelper>(*this, scope_name, rds, init_manager);
291 0 : scoped_route_info = std::make_shared<ScopedRouteInfo>(
292 0 : std::move(scoped_route_config), rds_config_provider_helper->routeConfig());
293 0 : } else {
294 : // For on demand scopes, create a rds helper with rds provider uninitialized.
295 0 : rds_config_provider_helper =
296 0 : std::make_unique<RdsRouteConfigProviderHelper>(*this, scope_name);
297 : // scope_route_info->routeConfig() will be nullptr, because RouteConfiguration is not loaded.
298 0 : scoped_route_info =
299 0 : std::make_shared<ScopedRouteInfo>(std::move(scoped_route_config), nullptr);
300 0 : }
301 0 : route_provider_by_scope_[scope_name] = std::move(rds_config_provider_helper);
302 0 : scope_name_by_hash_[scoped_route_info->scopeKey().hash()] = scoped_route_info->scopeName();
303 0 : scoped_route_map_[scoped_route_info->scopeName()] = scoped_route_info;
304 0 : updated_scopes.push_back(scoped_route_info);
305 0 : any_applied = true;
306 0 : ENVOY_LOG(debug, "srds: queueing add/update of scoped_route '{}', version: {}",
307 0 : scoped_route_info->scopeName(), version_info);
308 0 : }
309 :
310 : // scoped_route_info of both eager loading and on demand scopes will be propagated to work
311 : // threads. Upon a scoped RouteConfiguration miss, if the scope exists, an on demand update
312 : // callback will be posted to main thread.
313 0 : if (!updated_scopes.empty()) {
314 0 : applyConfigUpdate([updated_scopes](ConfigProvider::ConfigConstSharedPtr config)
315 0 : -> ConfigProvider::ConfigConstSharedPtr {
316 0 : auto* thread_local_scoped_config =
317 0 : const_cast<ScopedConfigImpl*>(static_cast<const ScopedConfigImpl*>(config.get()));
318 0 : thread_local_scoped_config->addOrUpdateRoutingScopes(updated_scopes);
319 0 : return config;
320 0 : });
321 0 : }
322 0 : return any_applied;
323 0 : }
324 :
325 : std::list<ScopedRdsConfigSubscription::RdsRouteConfigProviderHelperPtr>
326 : ScopedRdsConfigSubscription::removeScopes(
327 0 : const Protobuf::RepeatedPtrField<std::string>& scope_names, const std::string& version_info) {
328 0 : std::list<ScopedRdsConfigSubscription::RdsRouteConfigProviderHelperPtr>
329 0 : to_be_removed_rds_providers;
330 0 : std::vector<std::string> removed_scope_names;
331 0 : for (const auto& scope_name : scope_names) {
332 0 : auto iter = scoped_route_map_.find(scope_name);
333 0 : if (iter != scoped_route_map_.end()) {
334 0 : auto rds_config_provider_helper_iter = route_provider_by_scope_.find(scope_name);
335 0 : if (rds_config_provider_helper_iter != route_provider_by_scope_.end()) {
336 0 : to_be_removed_rds_providers.emplace_back(
337 0 : std::move(rds_config_provider_helper_iter->second));
338 0 : route_provider_by_scope_.erase(rds_config_provider_helper_iter);
339 0 : }
340 0 : ASSERT(scope_name_by_hash_.find(iter->second->scopeKey().hash()) !=
341 0 : scope_name_by_hash_.end());
342 0 : scope_name_by_hash_.erase(iter->second->scopeKey().hash());
343 0 : scoped_route_map_.erase(iter);
344 0 : removed_scope_names.push_back(scope_name);
345 0 : ENVOY_LOG(debug, "srds: queueing removal of scoped route '{}', version: {}", scope_name,
346 0 : version_info);
347 0 : }
348 0 : }
349 0 : if (!removed_scope_names.empty()) {
350 0 : applyConfigUpdate([removed_scope_names](ConfigProvider::ConfigConstSharedPtr config)
351 0 : -> ConfigProvider::ConfigConstSharedPtr {
352 0 : auto* thread_local_scoped_config =
353 0 : const_cast<ScopedConfigImpl*>(static_cast<const ScopedConfigImpl*>(config.get()));
354 0 : thread_local_scoped_config->removeRoutingScopes(removed_scope_names);
355 0 : return config;
356 0 : });
357 0 : }
358 0 : return to_be_removed_rds_providers;
359 0 : }
360 :
361 : absl::Status ScopedRdsConfigSubscription::onConfigUpdate(
362 : const std::vector<Envoy::Config::DecodedResourceRef>& added_resources,
363 : const Protobuf::RepeatedPtrField<std::string>& removed_resources,
364 0 : const std::string& version_info) {
365 : // NOTE: deletes are done before adds/updates.
366 0 : absl::flat_hash_map<std::string, ScopedRouteInfoConstSharedPtr> to_be_removed_scopes;
367 : // Destruction of resume_rds will lift the floodgate for new RDS subscriptions.
368 : // Note in the case of partial acceptance, accepted RDS subscriptions should be started
369 : // despite of any error.
370 0 : ScopedResume resume_rds;
371 : // If new route config sources come after the local init manager's initialize() been
372 : // called, the init manager can't accept new targets. Instead we use a local override which will
373 : // start new subscriptions but not wait on them to be ready.
374 0 : std::unique_ptr<Init::ManagerImpl> srds_init_mgr;
375 : // NOTE: This should be defined after srds_init_mgr and resume_rds, as it depends on the
376 : // srds_init_mgr, and we want a single RDS discovery request to be sent to management
377 : // server.
378 0 : std::unique_ptr<Cleanup> srds_initialization_continuation;
379 0 : ASSERT(localInitManager().state() > Init::Manager::State::Uninitialized);
380 0 : const auto type_url = Envoy::Config::getTypeUrl<envoy::config::route::v3::RouteConfiguration>();
381 : // Pause RDS to not send a burst of RDS requests until we start all the new subscriptions.
382 : // In the case that localInitManager is uninitialized, RDS is already paused
383 : // either by Server init or LDS init.
384 0 : if (factory_context_.clusterManager().adsMux()) {
385 0 : resume_rds = factory_context_.clusterManager().adsMux()->pause(type_url);
386 0 : }
387 : // if local init manager is initialized, the parent init manager may have gone away.
388 0 : if (localInitManager().state() == Init::Manager::State::Initialized) {
389 0 : srds_init_mgr =
390 0 : std::make_unique<Init::ManagerImpl>(fmt::format("SRDS {}:{}", name_, version_info));
391 0 : srds_initialization_continuation =
392 0 : std::make_unique<Cleanup>([this, &srds_init_mgr, version_info] {
393 : // For new RDS subscriptions created after listener warming up, we don't wait for them to
394 : // warm up.
395 0 : Init::WatcherImpl noop_watcher(
396 : // Note: we just throw it away.
397 0 : fmt::format("SRDS ConfigUpdate watcher {}:{}", name_, version_info),
398 0 : []() { /*Do nothing.*/ });
399 0 : srds_init_mgr->initialize(noop_watcher);
400 0 : });
401 0 : }
402 :
403 0 : std::string exception_msg;
404 0 : Protobuf::RepeatedPtrField<std::string> clean_removed_resources =
405 0 : detectUpdateConflictAndCleanupRemoved(added_resources, removed_resources, exception_msg);
406 0 : if (!exception_msg.empty()) {
407 0 : return absl::InvalidArgumentError(
408 0 : fmt::format("Error adding/updating scoped route(s): {}", exception_msg));
409 0 : }
410 :
411 : // Do not delete RDS config providers just yet, in case the to be deleted RDS subscriptions could
412 : // be reused by some to be added scopes.
413 0 : std::list<ScopedRdsConfigSubscription::RdsRouteConfigProviderHelperPtr>
414 0 : to_be_removed_rds_providers = removeScopes(clean_removed_resources, version_info);
415 :
416 0 : auto status_or_applied = addOrUpdateScopes(
417 0 : added_resources, (srds_init_mgr == nullptr ? localInitManager() : *srds_init_mgr),
418 0 : version_info);
419 0 : if (!status_or_applied.status().ok()) {
420 0 : return status_or_applied.status();
421 0 : }
422 0 : const bool any_applied = status_or_applied.value();
423 0 : const auto status = ConfigSubscriptionCommonBase::onConfigUpdate();
424 0 : if (!status.ok()) {
425 0 : return status;
426 0 : }
427 0 : if (any_applied || !to_be_removed_rds_providers.empty()) {
428 0 : setLastConfigInfo(absl::optional<LastConfigInfo>({absl::nullopt, version_info}));
429 0 : }
430 0 : stats_.all_scopes_.set(scoped_route_map_.size());
431 0 : stats_.config_reload_.inc();
432 0 : stats_.config_reload_time_ms_.set(DateUtil::nowToMilliseconds(factory_context_.timeSource()));
433 0 : return absl::OkStatus();
434 0 : }
435 :
436 : void ScopedRdsConfigSubscription::onRdsConfigUpdate(const std::string& scope_name,
437 0 : ConfigConstSharedPtr new_rds_config) {
438 0 : auto iter = scoped_route_map_.find(scope_name);
439 0 : ASSERT(iter != scoped_route_map_.end(),
440 0 : fmt::format("trying to update route config for non-existing scope {}", scope_name));
441 0 : auto new_scoped_route_info = std::make_shared<ScopedRouteInfo>(
442 0 : envoy::config::route::v3::ScopedRouteConfiguration(iter->second->configProto()),
443 0 : std::move(new_rds_config));
444 0 : scoped_route_map_[new_scoped_route_info->scopeName()] = new_scoped_route_info;
445 0 : applyConfigUpdate([new_scoped_route_info](ConfigProvider::ConfigConstSharedPtr config)
446 0 : -> ConfigProvider::ConfigConstSharedPtr {
447 0 : auto* thread_local_scoped_config =
448 0 : const_cast<ScopedConfigImpl*>(static_cast<const ScopedConfigImpl*>(config.get()));
449 0 : thread_local_scoped_config->addOrUpdateRoutingScopes({new_scoped_route_info});
450 0 : return config;
451 0 : });
452 : // The data plane may wait for the route configuration to come back.
453 0 : route_provider_by_scope_[scope_name]->runOnDemandUpdateCallback();
454 0 : }
455 :
456 : // TODO(stevenzzzz): see issue #7508, consider generalizing this function as it overlaps with
457 : // CdsApiImpl::onConfigUpdate.
458 : absl::Status ScopedRdsConfigSubscription::onConfigUpdate(
459 : const std::vector<Envoy::Config::DecodedResourceRef>& resources,
460 0 : const std::string& version_info) {
461 0 : Protobuf::RepeatedPtrField<std::string> to_remove_repeated;
462 0 : for (const auto& scoped_route : scoped_route_map_) {
463 0 : *to_remove_repeated.Add() = scoped_route.first;
464 0 : }
465 0 : return onConfigUpdate(resources, to_remove_repeated, version_info);
466 0 : }
467 :
468 : Protobuf::RepeatedPtrField<std::string>
469 : ScopedRdsConfigSubscription::detectUpdateConflictAndCleanupRemoved(
470 : const std::vector<Envoy::Config::DecodedResourceRef>& resources,
471 0 : const Protobuf::RepeatedPtrField<std::string>& removed_resources, std::string& exception_msg) {
472 0 : Protobuf::RepeatedPtrField<std::string> clean_removed_resources;
473 : // All the scope names to be removed or updated.
474 0 : absl::flat_hash_set<std::string> updated_or_removed_scopes;
475 0 : for (const std::string& removed_resource : removed_resources) {
476 0 : updated_or_removed_scopes.insert(removed_resource);
477 0 : }
478 0 : for (const auto& resource : resources) {
479 0 : const auto& scoped_route =
480 0 : dynamic_cast<const envoy::config::route::v3::ScopedRouteConfiguration&>(
481 0 : resource.get().resource());
482 0 : updated_or_removed_scopes.insert(scoped_route.name());
483 0 : }
484 :
485 0 : absl::flat_hash_map<uint64_t, std::string> scope_name_by_hash = scope_name_by_hash_;
486 0 : absl::erase_if(scope_name_by_hash, [&updated_or_removed_scopes](const auto& key_name) {
487 0 : auto const& [key, name] = key_name;
488 0 : UNREFERENCED_PARAMETER(key);
489 0 : return updated_or_removed_scopes.contains(name);
490 0 : });
491 0 : absl::flat_hash_map<std::string, envoy::config::route::v3::ScopedRouteConfiguration>
492 0 : scoped_routes;
493 0 : for (const auto& resource : resources) {
494 : // Throws (thus rejects all) on any error.
495 0 : const auto& scoped_route =
496 0 : dynamic_cast<const envoy::config::route::v3::ScopedRouteConfiguration&>(
497 0 : resource.get().resource());
498 0 : const std::string& scope_name = scoped_route.name();
499 0 : auto scope_config_inserted = scoped_routes.try_emplace(scope_name, std::move(scoped_route));
500 0 : if (!scope_config_inserted.second) {
501 0 : exception_msg = fmt::format("duplicate scoped route configuration '{}' found", scope_name);
502 0 : return clean_removed_resources;
503 0 : }
504 0 : envoy::config::route::v3::ScopedRouteConfiguration scoped_route_config =
505 0 : scope_config_inserted.first->second;
506 0 : const uint64_t key_fingerprint =
507 0 : ScopedRouteInfo(std::move(scoped_route_config), nullptr).scopeKey().hash();
508 0 : if (!scope_name_by_hash.try_emplace(key_fingerprint, scope_name).second) {
509 0 : exception_msg =
510 0 : fmt::format("scope key conflict found, first scope is '{}', second scope is '{}'",
511 0 : scope_name_by_hash[key_fingerprint], scope_name);
512 0 : return clean_removed_resources;
513 0 : }
514 0 : }
515 :
516 : // only remove resources that is not going to be updated.
517 0 : for (const std::string& removed_resource : removed_resources) {
518 0 : if (!scoped_routes.contains(removed_resource)) {
519 0 : *clean_removed_resources.Add() = removed_resource;
520 0 : }
521 0 : }
522 0 : return clean_removed_resources;
523 0 : }
524 :
525 : void ScopedRdsConfigSubscription::onDemandRdsUpdate(
526 : std::shared_ptr<Router::ScopeKey> scope_key, Event::Dispatcher& thread_local_dispatcher,
527 : Http::RouteConfigUpdatedCallback&& route_config_updated_cb,
528 0 : std::weak_ptr<Envoy::Config::ConfigSubscriptionCommonBase> weak_subscription) {
529 0 : factory_context_.mainThreadDispatcher().post([this, &thread_local_dispatcher, scope_key,
530 0 : route_config_updated_cb, weak_subscription]() {
531 : // If the subscription has been destroyed, return immediately.
532 0 : if (!weak_subscription.lock()) {
533 0 : thread_local_dispatcher.post([route_config_updated_cb] { route_config_updated_cb(false); });
534 0 : return;
535 0 : }
536 :
537 0 : auto iter = scope_name_by_hash_.find(scope_key->hash());
538 : // Return to filter chain if we can't find the scope.
539 : // The scope may have been destroyed when callback reach the main thread.
540 0 : if (iter == scope_name_by_hash_.end()) {
541 0 : thread_local_dispatcher.post([route_config_updated_cb] { route_config_updated_cb(false); });
542 0 : return;
543 0 : }
544 : // Wrap the thread local dispatcher inside the callback.
545 0 : std::function<void()> thread_local_updated_callback = [route_config_updated_cb,
546 0 : &thread_local_dispatcher]() {
547 0 : thread_local_dispatcher.post([route_config_updated_cb] { route_config_updated_cb(true); });
548 0 : };
549 0 : std::string scope_name = iter->second;
550 : // On demand initialization inside main thread.
551 0 : route_provider_by_scope_[scope_name]->addOnDemandUpdateCallback(thread_local_updated_callback);
552 0 : });
553 0 : }
554 :
555 : ScopedRdsConfigProvider::ScopedRdsConfigProvider(
556 : ScopedRdsConfigSubscriptionSharedPtr&& subscription)
557 0 : : MutableConfigProviderCommonBase(std::move(subscription), ConfigProvider::ApiType::Delta) {}
558 :
559 : ProtobufTypes::MessagePtr
560 70 : ScopedRoutesConfigProviderManager::dumpConfigs(const Matchers::StringMatcher& name_matcher) const {
561 70 : auto config_dump = std::make_unique<envoy::admin::v3::ScopedRoutesConfigDump>();
562 70 : for (const auto& element : configSubscriptions()) {
563 0 : auto subscription = element.second.lock();
564 0 : ASSERT(subscription);
565 :
566 0 : if (subscription->configInfo()) {
567 0 : auto* dynamic_config = config_dump->mutable_dynamic_scoped_route_configs()->Add();
568 0 : dynamic_config->set_version_info(subscription->configInfo().value().last_config_version_);
569 0 : const ScopedRdsConfigSubscription* typed_subscription =
570 0 : static_cast<ScopedRdsConfigSubscription*>(subscription.get());
571 0 : dynamic_config->set_name(typed_subscription->name());
572 0 : const ScopedRouteMap& scoped_route_map = typed_subscription->scopedRouteMap();
573 0 : for (const auto& it : scoped_route_map) {
574 0 : if (!name_matcher.match(it.second->configProto().name())) {
575 0 : continue;
576 0 : }
577 0 : dynamic_config->mutable_scoped_route_configs()->Add()->PackFrom(it.second->configProto());
578 0 : }
579 0 : TimestampUtil::systemClockToTimestamp(subscription->lastUpdated(),
580 0 : *dynamic_config->mutable_last_updated());
581 0 : }
582 0 : }
583 :
584 70 : for (const auto& provider : immutableConfigProviders(ConfigProviderInstanceType::Inline)) {
585 0 : const auto protos_info =
586 0 : provider->configProtoInfoVector<envoy::config::route::v3::ScopedRouteConfiguration>();
587 0 : ASSERT(protos_info != absl::nullopt);
588 0 : auto* inline_config = config_dump->mutable_inline_scoped_route_configs()->Add();
589 0 : inline_config->set_name(static_cast<InlineScopedRoutesConfigProvider*>(provider)->name());
590 0 : for (const auto& config_proto : protos_info.value().config_protos_) {
591 0 : if (!name_matcher.match(config_proto->name())) {
592 0 : continue;
593 0 : }
594 0 : inline_config->mutable_scoped_route_configs()->Add()->PackFrom(*config_proto);
595 0 : }
596 0 : TimestampUtil::systemClockToTimestamp(provider->lastUpdated(),
597 0 : *inline_config->mutable_last_updated());
598 0 : }
599 :
600 70 : return config_dump;
601 70 : }
602 :
603 : ConfigProviderPtr ScopedRoutesConfigProviderManager::createXdsConfigProvider(
604 : const Protobuf::Message& config_source_proto,
605 : Server::Configuration::ServerFactoryContext& factory_context, Init::Manager& init_manager,
606 0 : const std::string& stat_prefix, const ConfigProviderManager::OptionalArg& optarg) {
607 0 : const auto& typed_optarg = static_cast<const ScopedRoutesConfigProviderManagerOptArg&>(optarg);
608 0 : ScopedRdsConfigSubscriptionSharedPtr subscription =
609 0 : ConfigProviderManagerImplBase::getSubscription<ScopedRdsConfigSubscription>(
610 0 : config_source_proto, init_manager,
611 0 : [&config_source_proto, &factory_context, &stat_prefix,
612 0 : &typed_optarg](const uint64_t manager_identifier,
613 0 : ConfigProviderManagerImplBase& config_provider_manager)
614 0 : -> Envoy::Config::ConfigSubscriptionCommonBaseSharedPtr {
615 0 : const auto& scoped_rds_config_source = dynamic_cast<
616 0 : const envoy::extensions::filters::network::http_connection_manager::v3::ScopedRds&>(
617 0 : config_source_proto);
618 0 : return std::make_shared<ScopedRdsConfigSubscription>(
619 0 : scoped_rds_config_source, manager_identifier, typed_optarg.scoped_routes_name_,
620 0 : factory_context, stat_prefix, typed_optarg.rds_config_source_,
621 0 : static_cast<ScopedRoutesConfigProviderManager&>(config_provider_manager)
622 0 : .routeConfigProviderManager(),
623 0 : static_cast<ScopedRoutesConfigProviderManager&>(config_provider_manager));
624 0 : });
625 :
626 0 : return std::make_unique<ScopedRdsConfigProvider>(std::move(subscription));
627 0 : }
628 :
629 : ConfigProviderPtr ScopedRoutesConfigProviderManager::createStaticConfigProvider(
630 : ProtobufTypes::ConstMessagePtrVector&& config_protos,
631 : Server::Configuration::ServerFactoryContext& factory_context,
632 0 : const ConfigProviderManager::OptionalArg& optarg) {
633 0 : const auto& typed_optarg = static_cast<const ScopedRoutesConfigProviderManagerOptArg&>(optarg);
634 0 : return std::make_unique<InlineScopedRoutesConfigProvider>(
635 0 : std::move(config_protos), typed_optarg.scoped_routes_name_, factory_context, *this,
636 0 : typed_optarg.rds_config_source_);
637 0 : }
638 :
639 : } // namespace Router
640 : } // namespace Envoy
|