Line data Source code
1 : #include "source/extensions/clusters/dynamic_forward_proxy/cluster.h"
2 :
3 : #include <algorithm>
4 :
5 : #include "envoy/config/cluster/v3/cluster.pb.h"
6 : #include "envoy/extensions/clusters/dynamic_forward_proxy/v3/cluster.pb.h"
7 : #include "envoy/extensions/clusters/dynamic_forward_proxy/v3/cluster.pb.validate.h"
8 : #include "envoy/router/string_accessor.h"
9 : #include "envoy/stream_info/uint32_accessor.h"
10 :
11 : #include "source/common/http/utility.h"
12 : #include "source/common/network/transport_socket_options_impl.h"
13 : #include "source/common/router/string_accessor_impl.h"
14 : #include "source/common/stream_info/uint32_accessor_impl.h"
15 : #include "source/extensions/common/dynamic_forward_proxy/dns_cache_manager_impl.h"
16 : #include "source/extensions/transport_sockets/tls/cert_validator/default_validator.h"
17 : #include "source/extensions/transport_sockets/tls/utility.h"
18 :
19 : namespace Envoy {
20 : namespace Extensions {
21 : namespace Clusters {
22 : namespace DynamicForwardProxy {
23 :
24 : namespace {
25 : constexpr absl::string_view DynamicHostFilterStateKey = "envoy.upstream.dynamic_host";
26 : constexpr absl::string_view DynamicPortFilterStateKey = "envoy.upstream.dynamic_port";
27 :
28 : class DynamicHostObjectFactory : public StreamInfo::FilterState::ObjectFactory {
29 : public:
30 3 : std::string name() const override { return std::string(DynamicHostFilterStateKey); }
31 : std::unique_ptr<StreamInfo::FilterState::Object>
32 0 : createFromBytes(absl::string_view data) const override {
33 0 : return std::make_unique<Router::StringAccessorImpl>(data);
34 0 : }
35 : };
36 : class DynamicPortObjectFactory : public StreamInfo::FilterState::ObjectFactory {
37 : public:
38 3 : std::string name() const override { return std::string(DynamicPortFilterStateKey); }
39 : std::unique_ptr<StreamInfo::FilterState::Object>
40 0 : createFromBytes(absl::string_view data) const override {
41 0 : uint32_t port = 0;
42 0 : if (absl::SimpleAtoi(data, &port)) {
43 0 : return std::make_unique<StreamInfo::UInt32AccessorImpl>(port);
44 0 : }
45 0 : return nullptr;
46 0 : }
47 : };
48 :
49 : } // namespace
50 :
51 : REGISTER_FACTORY(DynamicHostObjectFactory, StreamInfo::FilterState::ObjectFactory);
52 : REGISTER_FACTORY(DynamicPortObjectFactory, StreamInfo::FilterState::ObjectFactory);
53 :
54 : Cluster::Cluster(
55 : const envoy::config::cluster::v3::Cluster& cluster,
56 : Extensions::Common::DynamicForwardProxy::DnsCacheSharedPtr&& cache,
57 : const envoy::extensions::clusters::dynamic_forward_proxy::v3::ClusterConfig& config,
58 : Upstream::ClusterFactoryContext& context,
59 : Extensions::Common::DynamicForwardProxy::DnsCacheManagerSharedPtr&& cache_manager)
60 : : Upstream::BaseDynamicClusterImpl(cluster, context),
61 : dns_cache_manager_(std::move(cache_manager)), dns_cache_(std::move(cache)),
62 : update_callbacks_handle_(dns_cache_->addUpdateCallbacks(*this)),
63 : local_info_(context.serverFactoryContext().localInfo()),
64 : main_thread_dispatcher_(context.serverFactoryContext().mainThreadDispatcher()),
65 : orig_cluster_config_(cluster),
66 : allow_coalesced_connections_(config.allow_coalesced_connections()),
67 : cm_(context.clusterManager()), max_sub_clusters_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
68 : config.sub_clusters_config(), max_sub_clusters, 1024)),
69 : sub_cluster_ttl_(
70 : PROTOBUF_GET_MS_OR_DEFAULT(config.sub_clusters_config(), sub_cluster_ttl, 300000)),
71 : sub_cluster_lb_policy_(config.sub_clusters_config().lb_policy()),
72 0 : enable_sub_cluster_(config.has_sub_clusters_config()) {
73 :
74 0 : if (enable_sub_cluster_) {
75 0 : idle_timer_ = main_thread_dispatcher_.createTimer([this]() { checkIdleSubCluster(); });
76 0 : idle_timer_->enableTimer(sub_cluster_ttl_);
77 0 : }
78 0 : }
79 :
80 0 : Cluster::~Cluster() {
81 0 : if (enable_sub_cluster_) {
82 0 : idle_timer_->disableTimer();
83 0 : idle_timer_.reset();
84 0 : }
85 0 : if (cm_.isShutdown()) {
86 0 : return;
87 0 : }
88 : // Should remove all sub clusters, otherwise, might be memory leaking.
89 : // This lock is useless, just make compiler happy.
90 0 : absl::WriterMutexLock lock{&cluster_map_lock_};
91 0 : for (auto it = cluster_map_.cbegin(); it != cluster_map_.cend();) {
92 0 : auto cluster_name = it->first;
93 0 : ENVOY_LOG(debug, "cluster='{}' removing from cluster_map & cluster manager", cluster_name);
94 0 : cluster_map_.erase(it++);
95 0 : cm_.removeCluster(cluster_name);
96 0 : }
97 0 : }
98 :
99 0 : void Cluster::startPreInit() {
100 : // If we are attaching to a pre-populated cache we need to initialize our hosts.
101 0 : std::unique_ptr<Upstream::HostVector> hosts_added;
102 0 : dns_cache_->iterateHostMap(
103 0 : [&](absl::string_view host, const Common::DynamicForwardProxy::DnsHostInfoSharedPtr& info) {
104 0 : addOrUpdateHost(host, info, hosts_added);
105 0 : });
106 0 : if (hosts_added) {
107 0 : updatePriorityState(*hosts_added, {});
108 0 : }
109 0 : onPreInitComplete();
110 0 : }
111 :
112 0 : bool Cluster::touch(const std::string& cluster_name) {
113 0 : absl::ReaderMutexLock lock{&cluster_map_lock_};
114 0 : const auto cluster_it = cluster_map_.find(cluster_name);
115 0 : if (cluster_it != cluster_map_.end()) {
116 0 : cluster_it->second->touch();
117 0 : return true;
118 0 : }
119 0 : ENVOY_LOG(debug, "cluster='{}' has been removed while touching", cluster_name);
120 0 : return false;
121 0 : }
122 :
123 0 : void Cluster::checkIdleSubCluster() {
124 0 : ASSERT(main_thread_dispatcher_.isThreadSafe());
125 0 : {
126 : // TODO: try read lock first.
127 0 : absl::WriterMutexLock lock{&cluster_map_lock_};
128 0 : for (auto it = cluster_map_.cbegin(); it != cluster_map_.cend();) {
129 0 : if (it->second->checkIdle()) {
130 0 : auto cluster_name = it->first;
131 0 : ENVOY_LOG(debug, "cluster='{}' removing from cluster_map & cluster manager", cluster_name);
132 0 : cluster_map_.erase(it++);
133 0 : cm_.removeCluster(cluster_name);
134 0 : } else {
135 0 : ++it;
136 0 : }
137 0 : }
138 0 : }
139 0 : idle_timer_->enableTimer(sub_cluster_ttl_);
140 0 : }
141 :
142 : std::pair<bool, absl::optional<envoy::config::cluster::v3::Cluster>>
143 : Cluster::createSubClusterConfig(const std::string& cluster_name, const std::string& host,
144 0 : const int port) {
145 0 : {
146 0 : absl::WriterMutexLock lock{&cluster_map_lock_};
147 0 : const auto cluster_it = cluster_map_.find(cluster_name);
148 0 : if (cluster_it != cluster_map_.end()) {
149 0 : cluster_it->second->touch();
150 0 : return std::make_pair(true, absl::nullopt);
151 0 : }
152 0 : if (cluster_map_.size() >= max_sub_clusters_) {
153 0 : ENVOY_LOG(debug, "cluster='{}' create failed due to max sub cluster limitation",
154 0 : cluster_name);
155 0 : return std::make_pair(false, absl::nullopt);
156 0 : }
157 0 : cluster_map_.emplace(cluster_name, std::make_shared<ClusterInfo>(cluster_name, *this));
158 0 : }
159 :
160 : // Inherit configuration from the parent DFP cluster.
161 0 : envoy::config::cluster::v3::Cluster config = orig_cluster_config_;
162 :
163 : // Overwrite the type.
164 0 : config.set_name(cluster_name);
165 0 : config.clear_cluster_type();
166 0 : config.set_lb_policy(sub_cluster_lb_policy_);
167 0 : config.set_type(
168 0 : envoy::config::cluster::v3::Cluster_DiscoveryType::Cluster_DiscoveryType_STRICT_DNS);
169 :
170 : // Set endpoint.
171 0 : auto load_assignments = config.mutable_load_assignment();
172 0 : load_assignments->set_cluster_name(cluster_name);
173 0 : load_assignments->clear_endpoints();
174 :
175 0 : auto socket_address = load_assignments->add_endpoints()
176 0 : ->add_lb_endpoints()
177 0 : ->mutable_endpoint()
178 0 : ->mutable_address()
179 0 : ->mutable_socket_address();
180 0 : socket_address->set_address(host);
181 0 : socket_address->set_port_value(port);
182 :
183 0 : return std::make_pair(true, absl::make_optional(config));
184 0 : }
185 :
186 : Upstream::HostConstSharedPtr Cluster::chooseHost(absl::string_view host,
187 0 : Upstream::LoadBalancerContext* context) const {
188 0 : uint16_t default_port = 80;
189 0 : if (info_->transportSocketMatcher().resolve(nullptr).factory_.implementsSecureTransport()) {
190 0 : default_port = 443;
191 0 : }
192 :
193 0 : const auto host_attributes = Http::Utility::parseAuthority(host);
194 0 : auto dynamic_host = std::string(host_attributes.host_);
195 0 : auto port = host_attributes.port_.value_or(default_port);
196 :
197 : // cluster name is prefix + host + port
198 0 : auto cluster_name = "DFPCluster:" + dynamic_host + ":" + std::to_string(port);
199 :
200 : // try again to get the sub cluster.
201 0 : auto cluster = cm_.getThreadLocalCluster(cluster_name);
202 0 : if (cluster == nullptr) {
203 0 : ENVOY_LOG(debug, "cluster='{}' get thread local failed, too short ttl?", cluster_name);
204 0 : return nullptr;
205 0 : }
206 :
207 0 : return cluster->loadBalancer().chooseHost(context);
208 0 : }
209 :
210 : Cluster::ClusterInfo::ClusterInfo(std::string cluster_name, Cluster& parent)
211 0 : : cluster_name_(cluster_name), parent_(parent) {
212 0 : ENVOY_LOG(debug, "cluster='{}' ClusterInfo created", cluster_name_);
213 0 : touch();
214 0 : }
215 :
216 0 : void Cluster::ClusterInfo::touch() {
217 0 : ENVOY_LOG(debug, "cluster='{}' updating last used time", cluster_name_);
218 0 : last_used_time_ = parent_.time_source_.monotonicTime().time_since_epoch();
219 0 : }
220 :
221 : // checkIdle run in the main thread.
222 0 : bool Cluster::ClusterInfo::checkIdle() {
223 0 : ASSERT(parent_.main_thread_dispatcher_.isThreadSafe());
224 :
225 0 : const std::chrono::steady_clock::duration now_duration =
226 0 : parent_.main_thread_dispatcher_.timeSource().monotonicTime().time_since_epoch();
227 0 : auto last_used_time = last_used_time_.load();
228 0 : ENVOY_LOG(debug, "cluster='{}' TTL check: now={} last_used={} TTL {}", cluster_name_,
229 0 : now_duration.count(), last_used_time.count(), parent_.sub_cluster_ttl_.count());
230 :
231 0 : if ((now_duration - last_used_time) > parent_.sub_cluster_ttl_) {
232 0 : ENVOY_LOG(debug, "cluster='{}' TTL expired", cluster_name_);
233 0 : return true;
234 0 : }
235 0 : return false;
236 0 : }
237 :
238 : void Cluster::addOrUpdateHost(
239 : absl::string_view host,
240 : const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr& host_info,
241 0 : std::unique_ptr<Upstream::HostVector>& hosts_added) {
242 0 : Upstream::LogicalHostSharedPtr emplaced_host;
243 0 : {
244 0 : absl::WriterMutexLock lock{&host_map_lock_};
245 :
246 : // NOTE: Right now we allow a DNS cache to be shared between multiple clusters. Though we have
247 : // connection/request circuit breakers on the cluster, we don't have any way to control the
248 : // maximum hosts on a cluster. We currently assume that host data shared via shared pointer is
249 : // a marginal memory cost above that already used by connections and requests, so relying on
250 : // connection/request circuit breakers is sufficient. We may have to revisit this in the
251 : // future.
252 0 : const auto host_map_it = host_map_.find(host);
253 0 : if (host_map_it != host_map_.end()) {
254 : // If we only have an address change, we can do that swap inline without any other updates.
255 : // The appropriate R/W locking is in place to allow this. The details of this locking are:
256 : // - Hosts are not thread local, they are global.
257 : // - We take a read lock when reading the address and a write lock when changing it.
258 : // - Address updates are very rare.
259 : // - Address reads are only done when a connection is being made and a "real" host
260 : // description is created or the host is queried via the admin endpoint. Both of
261 : // these operations are relatively rare and the read lock is held for a short period
262 : // of time.
263 : //
264 : // TODO(mattklein123): Right now the dynamic forward proxy / DNS cache works similar to how
265 : // logical DNS works, meaning that we only store a single address per
266 : // resolution. It would not be difficult to also expose strict DNS
267 : // semantics, meaning the cache would expose multiple addresses and the
268 : // cluster would create multiple logical hosts based on those addresses.
269 : // We will leave this is a follow up depending on need.
270 0 : ASSERT(host_info == host_map_it->second.shared_host_info_);
271 0 : ASSERT(host_map_it->second.shared_host_info_->address() !=
272 0 : host_map_it->second.logical_host_->address());
273 0 : ENVOY_LOG(debug, "updating dfproxy cluster host address '{}'", host);
274 0 : host_map_it->second.logical_host_->setNewAddresses(
275 0 : host_info->address(), host_info->addressList(), dummy_lb_endpoint_);
276 0 : return;
277 0 : }
278 :
279 0 : ENVOY_LOG(debug, "adding new dfproxy cluster host '{}'", host);
280 :
281 0 : emplaced_host = host_map_
282 0 : .try_emplace(host, host_info,
283 0 : std::make_shared<Upstream::LogicalHost>(
284 0 : info(), std::string{host}, host_info->address(),
285 0 : host_info->addressList(), dummy_locality_lb_endpoint_,
286 0 : dummy_lb_endpoint_, nullptr, time_source_))
287 0 : .first->second.logical_host_;
288 0 : }
289 :
290 0 : ASSERT(emplaced_host);
291 0 : if (hosts_added == nullptr) {
292 0 : hosts_added = std::make_unique<Upstream::HostVector>();
293 0 : }
294 0 : hosts_added->emplace_back(emplaced_host);
295 0 : }
296 :
297 : void Cluster::onDnsHostAddOrUpdate(
298 : const std::string& host,
299 0 : const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr& host_info) {
300 0 : ENVOY_LOG(debug, "Adding host info for {}", host);
301 :
302 0 : std::unique_ptr<Upstream::HostVector> hosts_added;
303 0 : addOrUpdateHost(host, host_info, hosts_added);
304 0 : if (hosts_added != nullptr) {
305 0 : ASSERT(!hosts_added->empty());
306 0 : updatePriorityState(*hosts_added, {});
307 0 : }
308 0 : }
309 :
310 : void Cluster::updatePriorityState(const Upstream::HostVector& hosts_added,
311 0 : const Upstream::HostVector& hosts_removed) {
312 0 : Upstream::PriorityStateManager priority_state_manager(*this, local_info_, nullptr);
313 0 : priority_state_manager.initializePriorityFor(dummy_locality_lb_endpoint_);
314 0 : {
315 0 : absl::ReaderMutexLock lock{&host_map_lock_};
316 0 : for (const auto& host : host_map_) {
317 0 : priority_state_manager.registerHostForPriority(host.second.logical_host_,
318 0 : dummy_locality_lb_endpoint_);
319 0 : }
320 0 : }
321 0 : priority_state_manager.updateClusterPrioritySet(
322 0 : 0, std::move(priority_state_manager.priorityState()[0].first), hosts_added, hosts_removed,
323 0 : absl::nullopt, absl::nullopt, absl::nullopt);
324 0 : }
325 :
326 0 : void Cluster::onDnsHostRemove(const std::string& host) {
327 0 : Upstream::HostVector hosts_removed;
328 0 : {
329 0 : absl::WriterMutexLock lock{&host_map_lock_};
330 0 : const auto host_map_it = host_map_.find(host);
331 0 : ASSERT(host_map_it != host_map_.end());
332 0 : hosts_removed.emplace_back(host_map_it->second.logical_host_);
333 0 : host_map_.erase(host);
334 0 : ENVOY_LOG(debug, "removing dfproxy cluster host '{}'", host);
335 0 : }
336 0 : updatePriorityState({}, hosts_removed);
337 0 : }
338 :
339 : Upstream::HostConstSharedPtr
340 0 : Cluster::LoadBalancer::chooseHost(Upstream::LoadBalancerContext* context) {
341 0 : if (!context) {
342 0 : return nullptr;
343 0 : }
344 :
345 0 : const Router::StringAccessor* dynamic_host_filter_state = nullptr;
346 0 : if (context->requestStreamInfo()) {
347 0 : dynamic_host_filter_state =
348 0 : context->requestStreamInfo()->filterState().getDataReadOnly<Router::StringAccessor>(
349 0 : DynamicHostFilterStateKey);
350 0 : }
351 :
352 0 : absl::string_view raw_host;
353 0 : if (dynamic_host_filter_state) {
354 0 : raw_host = dynamic_host_filter_state->asString();
355 0 : } else if (context->downstreamHeaders()) {
356 0 : raw_host = context->downstreamHeaders()->getHostValue();
357 0 : } else if (context->downstreamConnection()) {
358 0 : raw_host = context->downstreamConnection()->requestedServerName();
359 0 : }
360 :
361 : // For host lookup, we need to make sure to match the host of any DNS cache
362 : // insert. Two code points currently do DNS cache insert: the http DFP filter,
363 : // which inserts for HTTP traffic, and sets port based on the cluster's
364 : // security level, and the SNI DFP network filter which sets port based on
365 : // stream metadata, or configuration (which is then added as stream metadata).
366 0 : const bool is_secure = cluster_.info()
367 0 : ->transportSocketMatcher()
368 0 : .resolve(nullptr)
369 0 : .factory_.implementsSecureTransport();
370 0 : uint32_t port = is_secure ? 443 : 80;
371 0 : if (context->requestStreamInfo()) {
372 0 : const StreamInfo::UInt32Accessor* dynamic_port_filter_state =
373 0 : context->requestStreamInfo()->filterState().getDataReadOnly<StreamInfo::UInt32Accessor>(
374 0 : DynamicPortFilterStateKey);
375 0 : if (dynamic_port_filter_state != nullptr && dynamic_port_filter_state->value() > 0 &&
376 0 : dynamic_port_filter_state->value() <= 65535) {
377 0 : port = dynamic_port_filter_state->value();
378 0 : }
379 0 : }
380 :
381 0 : std::string host = Common::DynamicForwardProxy::DnsHostInfo::normalizeHostForDfp(raw_host, port);
382 :
383 0 : if (host.empty()) {
384 0 : ENVOY_LOG(debug, "host empty");
385 0 : return nullptr;
386 0 : }
387 :
388 0 : if (cluster_.enableSubCluster()) {
389 0 : return cluster_.chooseHost(host, context);
390 0 : }
391 :
392 0 : {
393 0 : absl::ReaderMutexLock lock{&cluster_.host_map_lock_};
394 0 : const auto host_it = cluster_.host_map_.find(host);
395 0 : if (host_it == cluster_.host_map_.end()) {
396 0 : ENVOY_LOG(debug, "host {} not found", host);
397 0 : return nullptr;
398 0 : } else {
399 0 : if (host_it->second.logical_host_->coarseHealth() == Upstream::Host::Health::Unhealthy) {
400 0 : ENVOY_LOG(debug, "host {} is unhealthy", host);
401 0 : return nullptr;
402 0 : }
403 0 : host_it->second.shared_host_info_->touch();
404 0 : return host_it->second.logical_host_;
405 0 : }
406 0 : }
407 0 : }
408 :
409 : absl::optional<Upstream::SelectedPoolAndConnection>
410 : Cluster::LoadBalancer::selectExistingConnection(Upstream::LoadBalancerContext* /*context*/,
411 : const Upstream::Host& host,
412 0 : std::vector<uint8_t>& hash_key) {
413 0 : const std::string& hostname = host.hostname();
414 0 : if (hostname.empty()) {
415 0 : return absl::nullopt;
416 0 : }
417 :
418 0 : LookupKey key = {hash_key, *host.address()};
419 0 : auto it = connection_info_map_.find(key);
420 0 : if (it == connection_info_map_.end()) {
421 0 : return absl::nullopt;
422 0 : }
423 :
424 0 : for (auto& info : it->second) {
425 0 : Envoy::Ssl::ConnectionInfoConstSharedPtr ssl = info.connection_->ssl();
426 0 : ASSERT(ssl);
427 0 : for (const std::string& san : ssl->dnsSansPeerCertificate()) {
428 0 : if (Extensions::TransportSockets::Tls::Utility::dnsNameMatch(hostname, san)) {
429 0 : return Upstream::SelectedPoolAndConnection{*info.pool_, *info.connection_};
430 0 : }
431 0 : }
432 0 : }
433 :
434 0 : return absl::nullopt;
435 0 : }
436 :
437 : OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks>
438 0 : Cluster::LoadBalancer::lifetimeCallbacks() {
439 0 : if (!cluster_.allowCoalescedConnections()) {
440 0 : return {};
441 0 : }
442 0 : return makeOptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks>(*this);
443 0 : }
444 :
445 : void Cluster::LoadBalancer::onConnectionOpen(Envoy::Http::ConnectionPool::Instance& pool,
446 : std::vector<uint8_t>& hash_key,
447 0 : const Network::Connection& connection) {
448 : // Only coalesce connections that are over TLS.
449 0 : if (!connection.ssl()) {
450 0 : return;
451 0 : }
452 0 : const std::string alpn = connection.nextProtocol();
453 0 : if (alpn != Http::Utility::AlpnNames::get().Http2 &&
454 0 : alpn != Http::Utility::AlpnNames::get().Http3) {
455 : // Only coalesce connections for HTTP/2 and HTTP/3.
456 0 : return;
457 0 : }
458 0 : const LookupKey key = {hash_key, *connection.connectionInfoProvider().remoteAddress()};
459 0 : ConnectionInfo info = {&pool, &connection};
460 0 : connection_info_map_[key].push_back(info);
461 0 : }
462 :
463 : void Cluster::LoadBalancer::onConnectionDraining(Envoy::Http::ConnectionPool::Instance& pool,
464 : std::vector<uint8_t>& hash_key,
465 0 : const Network::Connection& connection) {
466 0 : const LookupKey key = {hash_key, *connection.connectionInfoProvider().remoteAddress()};
467 0 : connection_info_map_[key].erase(
468 0 : std::remove_if(connection_info_map_[key].begin(), connection_info_map_[key].end(),
469 0 : [&pool, &connection](const ConnectionInfo& info) {
470 0 : return (info.pool_ == &pool && info.connection_ == &connection);
471 0 : }),
472 0 : connection_info_map_[key].end());
473 0 : }
474 :
475 : absl::StatusOr<std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>>
476 : ClusterFactory::createClusterWithConfig(
477 : const envoy::config::cluster::v3::Cluster& cluster,
478 : const envoy::extensions::clusters::dynamic_forward_proxy::v3::ClusterConfig& proto_config,
479 0 : Upstream::ClusterFactoryContext& context) {
480 :
481 0 : Extensions::Common::DynamicForwardProxy::DnsCacheManagerFactoryImpl cache_manager_factory(
482 0 : context.serverFactoryContext(), context.messageValidationVisitor());
483 :
484 0 : envoy::config::cluster::v3::Cluster cluster_config = cluster;
485 0 : if (!cluster_config.has_upstream_http_protocol_options()) {
486 : // This sets defaults which will only apply if using old style http config.
487 : // They will be a no-op if typed_extension_protocol_options are used for
488 : // http config.
489 0 : cluster_config.mutable_upstream_http_protocol_options()->set_auto_sni(true);
490 0 : cluster_config.mutable_upstream_http_protocol_options()->set_auto_san_validation(true);
491 0 : }
492 :
493 0 : Extensions::Common::DynamicForwardProxy::DnsCacheManagerSharedPtr cache_manager =
494 0 : cache_manager_factory.get();
495 0 : auto dns_cache_or_error = cache_manager->getCache(proto_config.dns_cache_config());
496 0 : RETURN_IF_STATUS_NOT_OK(dns_cache_or_error);
497 :
498 0 : auto new_cluster =
499 0 : std::shared_ptr<Cluster>(new Cluster(cluster_config, std::move(dns_cache_or_error.value()),
500 0 : proto_config, context, std::move(cache_manager)));
501 :
502 0 : Extensions::Common::DynamicForwardProxy::DFPClusterStoreFactory cluster_store_factory(
503 0 : context.serverFactoryContext().singletonManager());
504 0 : cluster_store_factory.get()->save(new_cluster->info()->name(), new_cluster);
505 :
506 0 : auto& options = new_cluster->info()->upstreamHttpProtocolOptions();
507 :
508 0 : if (!proto_config.allow_insecure_cluster_options()) {
509 0 : if (!options.has_value() ||
510 0 : (!options.value().auto_sni() || !options.value().auto_san_validation())) {
511 0 : return absl::InvalidArgumentError(
512 0 : "dynamic_forward_proxy cluster must have auto_sni and auto_san_validation true unless "
513 0 : "allow_insecure_cluster_options is set.");
514 0 : }
515 0 : }
516 0 : if (proto_config.has_sub_clusters_config() &&
517 0 : proto_config.sub_clusters_config().lb_policy() ==
518 0 : envoy::config::cluster::v3::Cluster_LbPolicy::Cluster_LbPolicy_CLUSTER_PROVIDED) {
519 0 : return absl::InvalidArgumentError(
520 0 : "unsupported lb_policy 'CLUSTER_PROVIDED' in sub_cluster_config");
521 0 : }
522 :
523 0 : auto lb = std::make_unique<Cluster::ThreadAwareLoadBalancer>(*new_cluster);
524 0 : return std::make_pair(new_cluster, std::move(lb));
525 0 : }
526 :
527 : REGISTER_FACTORY(ClusterFactory, Upstream::ClusterFactory);
528 :
529 : } // namespace DynamicForwardProxy
530 : } // namespace Clusters
531 : } // namespace Extensions
532 : } // namespace Envoy
|