/proc/self/cwd/source/common/router/rds_impl.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "source/common/router/rds_impl.h" |
2 | | |
3 | | #include <chrono> |
4 | | #include <cstdint> |
5 | | #include <memory> |
6 | | #include <string> |
7 | | |
8 | | #include "envoy/admin/v3/config_dump.pb.h" |
9 | | #include "envoy/config/core/v3/config_source.pb.h" |
10 | | #include "envoy/extensions/filters/network/http_connection_manager/v3/http_connection_manager.pb.h" |
11 | | #include "envoy/service/discovery/v3/discovery.pb.h" |
12 | | |
13 | | #include "source/common/common/assert.h" |
14 | | #include "source/common/common/fmt.h" |
15 | | #include "source/common/config/api_version.h" |
16 | | #include "source/common/config/utility.h" |
17 | | #include "source/common/http/header_map_impl.h" |
18 | | #include "source/common/protobuf/utility.h" |
19 | | #include "source/common/router/config_impl.h" |
20 | | #include "source/common/router/route_config_update_receiver_impl.h" |
21 | | |
22 | | namespace Envoy { |
23 | | namespace Router { |
24 | | |
25 | | // TODO(htuch): If support for multiple clusters is added per #1170 cluster_name_ |
26 | | RdsRouteConfigSubscription::RdsRouteConfigSubscription( |
27 | | RouteConfigUpdatePtr&& config_update, |
28 | | Envoy::Config::OpaqueResourceDecoderSharedPtr&& resource_decoder, |
29 | | const envoy::extensions::filters::network::http_connection_manager::v3::Rds& rds, |
30 | | const uint64_t manager_identifier, Server::Configuration::ServerFactoryContext& factory_context, |
31 | | const std::string& stat_prefix, Rds::RouteConfigProviderManager& route_config_provider_manager) |
32 | | : Rds::RdsRouteConfigSubscription(std::move(config_update), std::move(resource_decoder), |
33 | | rds.config_source(), rds.route_config_name(), |
34 | | manager_identifier, factory_context, stat_prefix + "rds.", |
35 | | "RDS", route_config_provider_manager), |
36 | | config_update_info_(static_cast<RouteConfigUpdateReceiver*>( |
37 | 106 | Rds::RdsRouteConfigSubscription::config_update_info_.get())) {} |
38 | | |
39 | 106 | RdsRouteConfigSubscription::~RdsRouteConfigSubscription() { config_update_info_.release(); } |
40 | | |
41 | | absl::Status RdsRouteConfigSubscription::beforeProviderUpdate( |
42 | 18 | std::unique_ptr<Init::ManagerImpl>& noop_init_manager, std::unique_ptr<Cleanup>& resume_rds) { |
43 | 18 | if (config_update_info_->protobufConfigurationCast().has_vhds() && |
44 | 18 | config_update_info_->vhdsConfigurationChanged()) { |
45 | 0 | ENVOY_LOG(debug, |
46 | 0 | "rds: vhds configuration present/changed, (re)starting vhds: config_name={} hash={}", |
47 | 0 | route_config_name_, routeConfigUpdate()->configHash()); |
48 | 0 | ASSERT(config_update_info_->configInfo().has_value()); |
49 | 0 | maybeCreateInitManager(routeConfigUpdate()->configInfo().value().version_, noop_init_manager, |
50 | 0 | resume_rds); |
51 | 0 | auto subscription_or_error = VhdsSubscription::createVhdsSubscription( |
52 | 0 | config_update_info_, factory_context_, stat_prefix_, route_config_provider_); |
53 | 0 | RETURN_IF_NOT_OK_REF(subscription_or_error.status()); |
54 | 0 | vhds_subscription_ = std::move(subscription_or_error.value()); |
55 | 0 | vhds_subscription_->registerInitTargetWithInitManager( |
56 | 0 | noop_init_manager == nullptr ? local_init_manager_ : *noop_init_manager); |
57 | 0 | } |
58 | 18 | return absl::OkStatus(); |
59 | 18 | } |
60 | | |
61 | 18 | void RdsRouteConfigSubscription::afterProviderUpdate() { |
62 | | // RDS update removed VHDS configuration |
63 | 18 | if (!config_update_info_->protobufConfigurationCast().has_vhds()) { |
64 | 18 | vhds_subscription_.release(); |
65 | 18 | } |
66 | | |
67 | 18 | THROW_IF_NOT_OK(update_callback_manager_.runCallbacks()); |
68 | 18 | } |
69 | | |
70 | | // Initialize a no-op InitManager in case the one in the factory_context has completed |
71 | | // initialization. This can happen if an RDS config update for an already established RDS |
72 | | // subscription contains VHDS configuration. |
73 | | void RdsRouteConfigSubscription::maybeCreateInitManager( |
74 | | const std::string& version_info, std::unique_ptr<Init::ManagerImpl>& init_manager, |
75 | 0 | std::unique_ptr<Cleanup>& init_vhds) { |
76 | 0 | if (local_init_manager_.state() == Init::Manager::State::Initialized) { |
77 | 0 | init_manager = std::make_unique<Init::ManagerImpl>( |
78 | 0 | fmt::format("VHDS {}:{}", route_config_name_, version_info)); |
79 | 0 | init_vhds = std::make_unique<Cleanup>([this, &init_manager, version_info] { |
80 | | // For new RDS subscriptions created after listener warming up, we don't wait for them to warm |
81 | | // up. |
82 | 0 | Init::WatcherImpl noop_watcher( |
83 | | // Note: we just throw it away. |
84 | 0 | fmt::format("VHDS ConfigUpdate watcher {}:{}", route_config_name_, version_info), |
85 | 0 | []() { /*Do nothing.*/ }); |
86 | 0 | init_manager->initialize(noop_watcher); |
87 | 0 | }); |
88 | 0 | } |
89 | 0 | } |
90 | | |
91 | 0 | void RdsRouteConfigSubscription::updateOnDemand(const std::string& aliases) { |
92 | 0 | if (vhds_subscription_.get() == nullptr) { |
93 | 0 | return; |
94 | 0 | } |
95 | 0 | vhds_subscription_->updateOnDemand(aliases); |
96 | 0 | } |
97 | | |
98 | | RdsRouteConfigProviderImpl::RdsRouteConfigProviderImpl( |
99 | | RdsRouteConfigSubscriptionSharedPtr&& subscription, |
100 | | Server::Configuration::ServerFactoryContext& factory_context) |
101 | | : base_(subscription, factory_context), config_update_info_(subscription->routeConfigUpdate()), |
102 | 106 | factory_context_(factory_context) { |
103 | | // The subscription referenced by the 'base_' and by 'this' is the same. |
104 | | // In it the provider is already set by the 'base_' so it points to that. |
105 | | // Need to set again to point to 'this'. |
106 | 106 | base_.subscription().routeConfigProvider() = this; |
107 | 106 | } |
108 | | |
109 | 106 | RdsRouteConfigSubscription& RdsRouteConfigProviderImpl::subscription() { |
110 | 106 | return static_cast<RdsRouteConfigSubscription&>(base_.subscription()); |
111 | 106 | } |
112 | | |
113 | 18 | absl::Status RdsRouteConfigProviderImpl::onConfigUpdate() { |
114 | 18 | auto status = base_.onConfigUpdate(); |
115 | 18 | if (!status.ok()) { |
116 | 0 | return status; |
117 | 0 | } |
118 | | |
119 | 18 | const auto aliases = config_update_info_->resourceIdsInLastVhdsUpdate(); |
120 | | // Regular (non-VHDS) RDS updates don't populate aliases fields in resources. |
121 | 18 | if (aliases.empty()) { |
122 | 18 | return absl::OkStatus(); |
123 | 18 | } |
124 | | |
125 | 0 | const auto config = |
126 | 0 | std::static_pointer_cast<const ConfigImpl>(config_update_info_->parsedConfiguration()); |
127 | | // Notifies connections that RouteConfiguration update has been propagated. |
128 | | // Callbacks processing is performed in FIFO order. The callback is skipped if alias used in |
129 | | // the VHDS update request do not match the aliases in the update response |
130 | 0 | for (auto it = config_update_callbacks_.begin(); it != config_update_callbacks_.end();) { |
131 | 0 | auto found = aliases.find(it->alias_); |
132 | 0 | if (found != aliases.end()) { |
133 | | // TODO(dmitri-d) HeaderMapImpl is expensive, need to profile this |
134 | 0 | auto host_header = Http::RequestHeaderMapImpl::create(); |
135 | 0 | host_header->setHost(VhdsSubscription::aliasToDomainName(it->alias_)); |
136 | 0 | const bool host_exists = config->virtualHostExists(*host_header); |
137 | 0 | std::weak_ptr<Http::RouteConfigUpdatedCallback> current_cb(it->cb_); |
138 | 0 | it->thread_local_dispatcher_.post([current_cb, host_exists] { |
139 | 0 | if (auto cb = current_cb.lock()) { |
140 | 0 | (*cb)(host_exists); |
141 | 0 | } |
142 | 0 | }); |
143 | 0 | it = config_update_callbacks_.erase(it); |
144 | 0 | } else { |
145 | 0 | it++; |
146 | 0 | } |
147 | 0 | } |
148 | 0 | return absl::OkStatus(); |
149 | 18 | } |
150 | | |
151 | 1 | ConfigConstSharedPtr RdsRouteConfigProviderImpl::configCast() const { |
152 | 1 | ASSERT(dynamic_cast<const Config*>(RdsRouteConfigProviderImpl::config().get())); |
153 | 1 | return std::static_pointer_cast<const Config>(RdsRouteConfigProviderImpl::config()); |
154 | 1 | } |
155 | | |
156 | | // Schedules a VHDS request on the main thread and queues up the callback to use when the VHDS |
157 | | // response has been propagated to the worker thread that was the request origin. |
158 | | void RdsRouteConfigProviderImpl::requestVirtualHostsUpdate( |
159 | | const std::string& for_domain, Event::Dispatcher& thread_local_dispatcher, |
160 | 0 | std::weak_ptr<Http::RouteConfigUpdatedCallback> route_config_updated_cb) { |
161 | 0 | auto alias = VhdsSubscription::domainNameToAlias( |
162 | 0 | config_update_info_->protobufConfigurationCast().name(), for_domain); |
163 | | // The RdsRouteConfigProviderImpl instance can go away before the dispatcher has a chance to |
164 | | // execute the callback. still_alive shared_ptr will be deallocated when the current instance of |
165 | | // the RdsRouteConfigProviderImpl is deallocated; we rely on a weak_ptr to still_alive flag to |
166 | | // determine if the RdsRouteConfigProviderImpl instance is still valid. |
167 | 0 | factory_context_.mainThreadDispatcher().post([this, |
168 | 0 | maybe_still_alive = |
169 | 0 | std::weak_ptr<bool>(still_alive_), |
170 | 0 | alias, &thread_local_dispatcher, |
171 | 0 | route_config_updated_cb]() -> void { |
172 | 0 | if (maybe_still_alive.lock()) { |
173 | 0 | subscription().updateOnDemand(alias); |
174 | 0 | config_update_callbacks_.push_back({alias, thread_local_dispatcher, route_config_updated_cb}); |
175 | 0 | } |
176 | 0 | }); |
177 | 0 | } |
178 | | RouteConfigProviderSharedPtr RdsFactoryImpl::createRdsRouteConfigProvider( |
179 | | const envoy::extensions::filters::network::http_connection_manager::v3::Rds& rds, |
180 | | Server::Configuration::ServerFactoryContext& factory_context, const std::string& stat_prefix, |
181 | | Init::Manager& init_manager, ProtoTraitsImpl& proto_traits, |
182 | 133 | Rds::RouteConfigProviderManager& manager) { |
183 | 133 | auto provider = manager.addDynamicProvider( |
184 | 133 | rds, rds.route_config_name(), init_manager, |
185 | 133 | [&factory_context, &rds, &stat_prefix, &manager, &proto_traits](uint64_t manager_identifier) { |
186 | 106 | auto config_update = |
187 | 106 | std::make_unique<RouteConfigUpdateReceiverImpl>(proto_traits, factory_context); |
188 | 106 | auto resource_decoder = std::make_shared< |
189 | 106 | Envoy::Config::OpaqueResourceDecoderImpl<envoy::config::route::v3::RouteConfiguration>>( |
190 | 106 | factory_context.messageValidationContext().dynamicValidationVisitor(), "name"); |
191 | 106 | auto subscription = std::make_shared<RdsRouteConfigSubscription>( |
192 | 106 | std::move(config_update), std::move(resource_decoder), rds, manager_identifier, |
193 | 106 | factory_context, stat_prefix, manager); |
194 | 106 | auto provider = |
195 | 106 | std::make_shared<RdsRouteConfigProviderImpl>(std::move(subscription), factory_context); |
196 | 106 | return std::make_pair(provider, &provider->subscription().initTarget()); |
197 | 106 | }); |
198 | 133 | ASSERT(dynamic_cast<RouteConfigProvider*>(provider.get())); |
199 | 133 | return std::static_pointer_cast<RouteConfigProvider>(provider); |
200 | 133 | } |
201 | | |
202 | | REGISTER_FACTORY(RdsFactoryImpl, RdsFactory); |
203 | | |
204 | | } // namespace Router |
205 | | } // namespace Envoy |