LCOV - code coverage report
Current view: top level - source/extensions/clusters/dynamic_forward_proxy - cluster.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 2 347 0.6 %
Date: 2024-01-05 06:35:25 Functions: 2 27 7.4 %

          Line data    Source code
       1             : #include "source/extensions/clusters/dynamic_forward_proxy/cluster.h"
       2             : 
       3             : #include <algorithm>
       4             : 
       5             : #include "envoy/config/cluster/v3/cluster.pb.h"
       6             : #include "envoy/extensions/clusters/dynamic_forward_proxy/v3/cluster.pb.h"
       7             : #include "envoy/extensions/clusters/dynamic_forward_proxy/v3/cluster.pb.validate.h"
       8             : #include "envoy/router/string_accessor.h"
       9             : #include "envoy/stream_info/uint32_accessor.h"
      10             : 
      11             : #include "source/common/http/utility.h"
      12             : #include "source/common/network/transport_socket_options_impl.h"
      13             : #include "source/common/router/string_accessor_impl.h"
      14             : #include "source/common/stream_info/uint32_accessor_impl.h"
      15             : #include "source/extensions/common/dynamic_forward_proxy/dns_cache_manager_impl.h"
      16             : #include "source/extensions/transport_sockets/tls/cert_validator/default_validator.h"
      17             : #include "source/extensions/transport_sockets/tls/utility.h"
      18             : 
      19             : namespace Envoy {
      20             : namespace Extensions {
      21             : namespace Clusters {
      22             : namespace DynamicForwardProxy {
      23             : 
      24             : namespace {
      25             : constexpr absl::string_view DynamicHostFilterStateKey = "envoy.upstream.dynamic_host";
      26             : constexpr absl::string_view DynamicPortFilterStateKey = "envoy.upstream.dynamic_port";
      27             : 
      28             : class DynamicHostObjectFactory : public StreamInfo::FilterState::ObjectFactory {
      29             : public:
      30           3 :   std::string name() const override { return std::string(DynamicHostFilterStateKey); }
      31             :   std::unique_ptr<StreamInfo::FilterState::Object>
      32           0 :   createFromBytes(absl::string_view data) const override {
      33           0 :     return std::make_unique<Router::StringAccessorImpl>(data);
      34           0 :   }
      35             : };
      36             : class DynamicPortObjectFactory : public StreamInfo::FilterState::ObjectFactory {
      37             : public:
      38           3 :   std::string name() const override { return std::string(DynamicPortFilterStateKey); }
      39             :   std::unique_ptr<StreamInfo::FilterState::Object>
      40           0 :   createFromBytes(absl::string_view data) const override {
      41           0 :     uint32_t port = 0;
      42           0 :     if (absl::SimpleAtoi(data, &port)) {
      43           0 :       return std::make_unique<StreamInfo::UInt32AccessorImpl>(port);
      44           0 :     }
      45           0 :     return nullptr;
      46           0 :   }
      47             : };
      48             : 
      49             : } // namespace
      50             : 
      51             : REGISTER_FACTORY(DynamicHostObjectFactory, StreamInfo::FilterState::ObjectFactory);
      52             : REGISTER_FACTORY(DynamicPortObjectFactory, StreamInfo::FilterState::ObjectFactory);
      53             : 
      54             : Cluster::Cluster(
      55             :     const envoy::config::cluster::v3::Cluster& cluster,
      56             :     Extensions::Common::DynamicForwardProxy::DnsCacheSharedPtr&& cache,
      57             :     const envoy::extensions::clusters::dynamic_forward_proxy::v3::ClusterConfig& config,
      58             :     Upstream::ClusterFactoryContext& context,
      59             :     Extensions::Common::DynamicForwardProxy::DnsCacheManagerSharedPtr&& cache_manager)
      60             :     : Upstream::BaseDynamicClusterImpl(cluster, context),
      61             :       dns_cache_manager_(std::move(cache_manager)), dns_cache_(std::move(cache)),
      62             :       update_callbacks_handle_(dns_cache_->addUpdateCallbacks(*this)),
      63             :       local_info_(context.serverFactoryContext().localInfo()),
      64             :       main_thread_dispatcher_(context.serverFactoryContext().mainThreadDispatcher()),
      65             :       orig_cluster_config_(cluster),
      66             :       allow_coalesced_connections_(config.allow_coalesced_connections()),
      67             :       cm_(context.clusterManager()), max_sub_clusters_(PROTOBUF_GET_WRAPPED_OR_DEFAULT(
      68             :                                          config.sub_clusters_config(), max_sub_clusters, 1024)),
      69             :       sub_cluster_ttl_(
      70             :           PROTOBUF_GET_MS_OR_DEFAULT(config.sub_clusters_config(), sub_cluster_ttl, 300000)),
      71             :       sub_cluster_lb_policy_(config.sub_clusters_config().lb_policy()),
      72           0 :       enable_sub_cluster_(config.has_sub_clusters_config()) {
      73             : 
      74           0 :   if (enable_sub_cluster_) {
      75           0 :     idle_timer_ = main_thread_dispatcher_.createTimer([this]() { checkIdleSubCluster(); });
      76           0 :     idle_timer_->enableTimer(sub_cluster_ttl_);
      77           0 :   }
      78           0 : }
      79             : 
      80           0 : Cluster::~Cluster() {
      81           0 :   if (enable_sub_cluster_) {
      82           0 :     idle_timer_->disableTimer();
      83           0 :     idle_timer_.reset();
      84           0 :   }
      85           0 :   if (cm_.isShutdown()) {
      86           0 :     return;
      87           0 :   }
      88             :   // Should remove all sub clusters, otherwise, might be memory leaking.
      89             :   // This lock is useless, just make compiler happy.
      90           0 :   absl::WriterMutexLock lock{&cluster_map_lock_};
      91           0 :   for (auto it = cluster_map_.cbegin(); it != cluster_map_.cend();) {
      92           0 :     auto cluster_name = it->first;
      93           0 :     ENVOY_LOG(debug, "cluster='{}' removing from cluster_map & cluster manager", cluster_name);
      94           0 :     cluster_map_.erase(it++);
      95           0 :     cm_.removeCluster(cluster_name);
      96           0 :   }
      97           0 : }
      98             : 
      99           0 : void Cluster::startPreInit() {
     100             :   // If we are attaching to a pre-populated cache we need to initialize our hosts.
     101           0 :   std::unique_ptr<Upstream::HostVector> hosts_added;
     102           0 :   dns_cache_->iterateHostMap(
     103           0 :       [&](absl::string_view host, const Common::DynamicForwardProxy::DnsHostInfoSharedPtr& info) {
     104           0 :         addOrUpdateHost(host, info, hosts_added);
     105           0 :       });
     106           0 :   if (hosts_added) {
     107           0 :     updatePriorityState(*hosts_added, {});
     108           0 :   }
     109           0 :   onPreInitComplete();
     110           0 : }
     111             : 
     112           0 : bool Cluster::touch(const std::string& cluster_name) {
     113           0 :   absl::ReaderMutexLock lock{&cluster_map_lock_};
     114           0 :   const auto cluster_it = cluster_map_.find(cluster_name);
     115           0 :   if (cluster_it != cluster_map_.end()) {
     116           0 :     cluster_it->second->touch();
     117           0 :     return true;
     118           0 :   }
     119           0 :   ENVOY_LOG(debug, "cluster='{}' has been removed while touching", cluster_name);
     120           0 :   return false;
     121           0 : }
     122             : 
     123           0 : void Cluster::checkIdleSubCluster() {
     124           0 :   ASSERT(main_thread_dispatcher_.isThreadSafe());
     125           0 :   {
     126             :     // TODO: try read lock first.
     127           0 :     absl::WriterMutexLock lock{&cluster_map_lock_};
     128           0 :     for (auto it = cluster_map_.cbegin(); it != cluster_map_.cend();) {
     129           0 :       if (it->second->checkIdle()) {
     130           0 :         auto cluster_name = it->first;
     131           0 :         ENVOY_LOG(debug, "cluster='{}' removing from cluster_map & cluster manager", cluster_name);
     132           0 :         cluster_map_.erase(it++);
     133           0 :         cm_.removeCluster(cluster_name);
     134           0 :       } else {
     135           0 :         ++it;
     136           0 :       }
     137           0 :     }
     138           0 :   }
     139           0 :   idle_timer_->enableTimer(sub_cluster_ttl_);
     140           0 : }
     141             : 
     142             : std::pair<bool, absl::optional<envoy::config::cluster::v3::Cluster>>
     143             : Cluster::createSubClusterConfig(const std::string& cluster_name, const std::string& host,
     144           0 :                                 const int port) {
     145           0 :   {
     146           0 :     absl::WriterMutexLock lock{&cluster_map_lock_};
     147           0 :     const auto cluster_it = cluster_map_.find(cluster_name);
     148           0 :     if (cluster_it != cluster_map_.end()) {
     149           0 :       cluster_it->second->touch();
     150           0 :       return std::make_pair(true, absl::nullopt);
     151           0 :     }
     152           0 :     if (cluster_map_.size() >= max_sub_clusters_) {
     153           0 :       ENVOY_LOG(debug, "cluster='{}' create failed due to max sub cluster limitation",
     154           0 :                 cluster_name);
     155           0 :       return std::make_pair(false, absl::nullopt);
     156           0 :     }
     157           0 :     cluster_map_.emplace(cluster_name, std::make_shared<ClusterInfo>(cluster_name, *this));
     158           0 :   }
     159             : 
     160             :   // Inherit configuration from the parent DFP cluster.
     161           0 :   envoy::config::cluster::v3::Cluster config = orig_cluster_config_;
     162             : 
     163             :   // Overwrite the type.
     164           0 :   config.set_name(cluster_name);
     165           0 :   config.clear_cluster_type();
     166           0 :   config.set_lb_policy(sub_cluster_lb_policy_);
     167           0 :   config.set_type(
     168           0 :       envoy::config::cluster::v3::Cluster_DiscoveryType::Cluster_DiscoveryType_STRICT_DNS);
     169             : 
     170             :   // Set endpoint.
     171           0 :   auto load_assignments = config.mutable_load_assignment();
     172           0 :   load_assignments->set_cluster_name(cluster_name);
     173           0 :   load_assignments->clear_endpoints();
     174             : 
     175           0 :   auto socket_address = load_assignments->add_endpoints()
     176           0 :                             ->add_lb_endpoints()
     177           0 :                             ->mutable_endpoint()
     178           0 :                             ->mutable_address()
     179           0 :                             ->mutable_socket_address();
     180           0 :   socket_address->set_address(host);
     181           0 :   socket_address->set_port_value(port);
     182             : 
     183           0 :   return std::make_pair(true, absl::make_optional(config));
     184           0 : }
     185             : 
     186             : Upstream::HostConstSharedPtr Cluster::chooseHost(absl::string_view host,
     187           0 :                                                  Upstream::LoadBalancerContext* context) const {
     188           0 :   uint16_t default_port = 80;
     189           0 :   if (info_->transportSocketMatcher().resolve(nullptr).factory_.implementsSecureTransport()) {
     190           0 :     default_port = 443;
     191           0 :   }
     192             : 
     193           0 :   const auto host_attributes = Http::Utility::parseAuthority(host);
     194           0 :   auto dynamic_host = std::string(host_attributes.host_);
     195           0 :   auto port = host_attributes.port_.value_or(default_port);
     196             : 
     197             :   // cluster name is prefix + host + port
     198           0 :   auto cluster_name = "DFPCluster:" + dynamic_host + ":" + std::to_string(port);
     199             : 
     200             :   // try again to get the sub cluster.
     201           0 :   auto cluster = cm_.getThreadLocalCluster(cluster_name);
     202           0 :   if (cluster == nullptr) {
     203           0 :     ENVOY_LOG(debug, "cluster='{}' get thread local failed, too short ttl?", cluster_name);
     204           0 :     return nullptr;
     205           0 :   }
     206             : 
     207           0 :   return cluster->loadBalancer().chooseHost(context);
     208           0 : }
     209             : 
     210             : Cluster::ClusterInfo::ClusterInfo(std::string cluster_name, Cluster& parent)
     211           0 :     : cluster_name_(cluster_name), parent_(parent) {
     212           0 :   ENVOY_LOG(debug, "cluster='{}' ClusterInfo created", cluster_name_);
     213           0 :   touch();
     214           0 : }
     215             : 
     216           0 : void Cluster::ClusterInfo::touch() {
     217           0 :   ENVOY_LOG(debug, "cluster='{}' updating last used time", cluster_name_);
     218           0 :   last_used_time_ = parent_.time_source_.monotonicTime().time_since_epoch();
     219           0 : }
     220             : 
     221             : // checkIdle run in the main thread.
     222           0 : bool Cluster::ClusterInfo::checkIdle() {
     223           0 :   ASSERT(parent_.main_thread_dispatcher_.isThreadSafe());
     224             : 
     225           0 :   const std::chrono::steady_clock::duration now_duration =
     226           0 :       parent_.main_thread_dispatcher_.timeSource().monotonicTime().time_since_epoch();
     227           0 :   auto last_used_time = last_used_time_.load();
     228           0 :   ENVOY_LOG(debug, "cluster='{}' TTL check: now={} last_used={} TTL {}", cluster_name_,
     229           0 :             now_duration.count(), last_used_time.count(), parent_.sub_cluster_ttl_.count());
     230             : 
     231           0 :   if ((now_duration - last_used_time) > parent_.sub_cluster_ttl_) {
     232           0 :     ENVOY_LOG(debug, "cluster='{}' TTL expired", cluster_name_);
     233           0 :     return true;
     234           0 :   }
     235           0 :   return false;
     236           0 : }
     237             : 
     238             : void Cluster::addOrUpdateHost(
     239             :     absl::string_view host,
     240             :     const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr& host_info,
     241           0 :     std::unique_ptr<Upstream::HostVector>& hosts_added) {
     242           0 :   Upstream::LogicalHostSharedPtr emplaced_host;
     243           0 :   {
     244           0 :     absl::WriterMutexLock lock{&host_map_lock_};
     245             : 
     246             :     // NOTE: Right now we allow a DNS cache to be shared between multiple clusters. Though we have
     247             :     // connection/request circuit breakers on the cluster, we don't have any way to control the
     248             :     // maximum hosts on a cluster. We currently assume that host data shared via shared pointer is
     249             :     // a marginal memory cost above that already used by connections and requests, so relying on
     250             :     // connection/request circuit breakers is sufficient. We may have to revisit this in the
     251             :     // future.
     252           0 :     const auto host_map_it = host_map_.find(host);
     253           0 :     if (host_map_it != host_map_.end()) {
     254             :       // If we only have an address change, we can do that swap inline without any other updates.
     255             :       // The appropriate R/W locking is in place to allow this. The details of this locking are:
     256             :       //  - Hosts are not thread local, they are global.
     257             :       //  - We take a read lock when reading the address and a write lock when changing it.
     258             :       //  - Address updates are very rare.
     259             :       //  - Address reads are only done when a connection is being made and a "real" host
     260             :       //    description is created or the host is queried via the admin endpoint. Both of
     261             :       //    these operations are relatively rare and the read lock is held for a short period
     262             :       //    of time.
     263             :       //
     264             :       // TODO(mattklein123): Right now the dynamic forward proxy / DNS cache works similar to how
     265             :       //                     logical DNS works, meaning that we only store a single address per
     266             :       //                     resolution. It would not be difficult to also expose strict DNS
     267             :       //                     semantics, meaning the cache would expose multiple addresses and the
     268             :       //                     cluster would create multiple logical hosts based on those addresses.
     269             :       //                     We will leave this is a follow up depending on need.
     270           0 :       ASSERT(host_info == host_map_it->second.shared_host_info_);
     271           0 :       ASSERT(host_map_it->second.shared_host_info_->address() !=
     272           0 :              host_map_it->second.logical_host_->address());
     273           0 :       ENVOY_LOG(debug, "updating dfproxy cluster host address '{}'", host);
     274           0 :       host_map_it->second.logical_host_->setNewAddresses(
     275           0 :           host_info->address(), host_info->addressList(), dummy_lb_endpoint_);
     276           0 :       return;
     277           0 :     }
     278             : 
     279           0 :     ENVOY_LOG(debug, "adding new dfproxy cluster host '{}'", host);
     280             : 
     281           0 :     emplaced_host = host_map_
     282           0 :                         .try_emplace(host, host_info,
     283           0 :                                      std::make_shared<Upstream::LogicalHost>(
     284           0 :                                          info(), std::string{host}, host_info->address(),
     285           0 :                                          host_info->addressList(), dummy_locality_lb_endpoint_,
     286           0 :                                          dummy_lb_endpoint_, nullptr, time_source_))
     287           0 :                         .first->second.logical_host_;
     288           0 :   }
     289             : 
     290           0 :   ASSERT(emplaced_host);
     291           0 :   if (hosts_added == nullptr) {
     292           0 :     hosts_added = std::make_unique<Upstream::HostVector>();
     293           0 :   }
     294           0 :   hosts_added->emplace_back(emplaced_host);
     295           0 : }
     296             : 
     297             : void Cluster::onDnsHostAddOrUpdate(
     298             :     const std::string& host,
     299           0 :     const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr& host_info) {
     300           0 :   ENVOY_LOG(debug, "Adding host info for {}", host);
     301             : 
     302           0 :   std::unique_ptr<Upstream::HostVector> hosts_added;
     303           0 :   addOrUpdateHost(host, host_info, hosts_added);
     304           0 :   if (hosts_added != nullptr) {
     305           0 :     ASSERT(!hosts_added->empty());
     306           0 :     updatePriorityState(*hosts_added, {});
     307           0 :   }
     308           0 : }
     309             : 
     310             : void Cluster::updatePriorityState(const Upstream::HostVector& hosts_added,
     311           0 :                                   const Upstream::HostVector& hosts_removed) {
     312           0 :   Upstream::PriorityStateManager priority_state_manager(*this, local_info_, nullptr);
     313           0 :   priority_state_manager.initializePriorityFor(dummy_locality_lb_endpoint_);
     314           0 :   {
     315           0 :     absl::ReaderMutexLock lock{&host_map_lock_};
     316           0 :     for (const auto& host : host_map_) {
     317           0 :       priority_state_manager.registerHostForPriority(host.second.logical_host_,
     318           0 :                                                      dummy_locality_lb_endpoint_);
     319           0 :     }
     320           0 :   }
     321           0 :   priority_state_manager.updateClusterPrioritySet(
     322           0 :       0, std::move(priority_state_manager.priorityState()[0].first), hosts_added, hosts_removed,
     323           0 :       absl::nullopt, absl::nullopt, absl::nullopt);
     324           0 : }
     325             : 
     326           0 : void Cluster::onDnsHostRemove(const std::string& host) {
     327           0 :   Upstream::HostVector hosts_removed;
     328           0 :   {
     329           0 :     absl::WriterMutexLock lock{&host_map_lock_};
     330           0 :     const auto host_map_it = host_map_.find(host);
     331           0 :     ASSERT(host_map_it != host_map_.end());
     332           0 :     hosts_removed.emplace_back(host_map_it->second.logical_host_);
     333           0 :     host_map_.erase(host);
     334           0 :     ENVOY_LOG(debug, "removing dfproxy cluster host '{}'", host);
     335           0 :   }
     336           0 :   updatePriorityState({}, hosts_removed);
     337           0 : }
     338             : 
     339             : Upstream::HostConstSharedPtr
     340           0 : Cluster::LoadBalancer::chooseHost(Upstream::LoadBalancerContext* context) {
     341           0 :   if (!context) {
     342           0 :     return nullptr;
     343           0 :   }
     344             : 
     345           0 :   const Router::StringAccessor* dynamic_host_filter_state = nullptr;
     346           0 :   if (context->requestStreamInfo()) {
     347           0 :     dynamic_host_filter_state =
     348           0 :         context->requestStreamInfo()->filterState().getDataReadOnly<Router::StringAccessor>(
     349           0 :             DynamicHostFilterStateKey);
     350           0 :   }
     351             : 
     352           0 :   absl::string_view raw_host;
     353           0 :   if (dynamic_host_filter_state) {
     354           0 :     raw_host = dynamic_host_filter_state->asString();
     355           0 :   } else if (context->downstreamHeaders()) {
     356           0 :     raw_host = context->downstreamHeaders()->getHostValue();
     357           0 :   } else if (context->downstreamConnection()) {
     358           0 :     raw_host = context->downstreamConnection()->requestedServerName();
     359           0 :   }
     360             : 
     361             :   // For host lookup, we need to make sure to match the host of any DNS cache
     362             :   // insert. Two code points currently do DNS cache insert: the http DFP filter,
     363             :   // which inserts for HTTP traffic, and sets port based on the cluster's
     364             :   // security level, and the SNI DFP network filter which sets port based on
     365             :   // stream metadata, or configuration (which is then added as stream metadata).
     366           0 :   const bool is_secure = cluster_.info()
     367           0 :                              ->transportSocketMatcher()
     368           0 :                              .resolve(nullptr)
     369           0 :                              .factory_.implementsSecureTransport();
     370           0 :   uint32_t port = is_secure ? 443 : 80;
     371           0 :   if (context->requestStreamInfo()) {
     372           0 :     const StreamInfo::UInt32Accessor* dynamic_port_filter_state =
     373           0 :         context->requestStreamInfo()->filterState().getDataReadOnly<StreamInfo::UInt32Accessor>(
     374           0 :             DynamicPortFilterStateKey);
     375           0 :     if (dynamic_port_filter_state != nullptr && dynamic_port_filter_state->value() > 0 &&
     376           0 :         dynamic_port_filter_state->value() <= 65535) {
     377           0 :       port = dynamic_port_filter_state->value();
     378           0 :     }
     379           0 :   }
     380             : 
     381           0 :   std::string host = Common::DynamicForwardProxy::DnsHostInfo::normalizeHostForDfp(raw_host, port);
     382             : 
     383           0 :   if (host.empty()) {
     384           0 :     ENVOY_LOG(debug, "host empty");
     385           0 :     return nullptr;
     386           0 :   }
     387             : 
     388           0 :   if (cluster_.enableSubCluster()) {
     389           0 :     return cluster_.chooseHost(host, context);
     390           0 :   }
     391             : 
     392           0 :   {
     393           0 :     absl::ReaderMutexLock lock{&cluster_.host_map_lock_};
     394           0 :     const auto host_it = cluster_.host_map_.find(host);
     395           0 :     if (host_it == cluster_.host_map_.end()) {
     396           0 :       ENVOY_LOG(debug, "host {} not found", host);
     397           0 :       return nullptr;
     398           0 :     } else {
     399           0 :       if (host_it->second.logical_host_->coarseHealth() == Upstream::Host::Health::Unhealthy) {
     400           0 :         ENVOY_LOG(debug, "host {} is unhealthy", host);
     401           0 :         return nullptr;
     402           0 :       }
     403           0 :       host_it->second.shared_host_info_->touch();
     404           0 :       return host_it->second.logical_host_;
     405           0 :     }
     406           0 :   }
     407           0 : }
     408             : 
     409             : absl::optional<Upstream::SelectedPoolAndConnection>
     410             : Cluster::LoadBalancer::selectExistingConnection(Upstream::LoadBalancerContext* /*context*/,
     411             :                                                 const Upstream::Host& host,
     412           0 :                                                 std::vector<uint8_t>& hash_key) {
     413           0 :   const std::string& hostname = host.hostname();
     414           0 :   if (hostname.empty()) {
     415           0 :     return absl::nullopt;
     416           0 :   }
     417             : 
     418           0 :   LookupKey key = {hash_key, *host.address()};
     419           0 :   auto it = connection_info_map_.find(key);
     420           0 :   if (it == connection_info_map_.end()) {
     421           0 :     return absl::nullopt;
     422           0 :   }
     423             : 
     424           0 :   for (auto& info : it->second) {
     425           0 :     Envoy::Ssl::ConnectionInfoConstSharedPtr ssl = info.connection_->ssl();
     426           0 :     ASSERT(ssl);
     427           0 :     for (const std::string& san : ssl->dnsSansPeerCertificate()) {
     428           0 :       if (Extensions::TransportSockets::Tls::Utility::dnsNameMatch(hostname, san)) {
     429           0 :         return Upstream::SelectedPoolAndConnection{*info.pool_, *info.connection_};
     430           0 :       }
     431           0 :     }
     432           0 :   }
     433             : 
     434           0 :   return absl::nullopt;
     435           0 : }
     436             : 
     437             : OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks>
     438           0 : Cluster::LoadBalancer::lifetimeCallbacks() {
     439           0 :   if (!cluster_.allowCoalescedConnections()) {
     440           0 :     return {};
     441           0 :   }
     442           0 :   return makeOptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks>(*this);
     443           0 : }
     444             : 
     445             : void Cluster::LoadBalancer::onConnectionOpen(Envoy::Http::ConnectionPool::Instance& pool,
     446             :                                              std::vector<uint8_t>& hash_key,
     447           0 :                                              const Network::Connection& connection) {
     448             :   // Only coalesce connections that are over TLS.
     449           0 :   if (!connection.ssl()) {
     450           0 :     return;
     451           0 :   }
     452           0 :   const std::string alpn = connection.nextProtocol();
     453           0 :   if (alpn != Http::Utility::AlpnNames::get().Http2 &&
     454           0 :       alpn != Http::Utility::AlpnNames::get().Http3) {
     455             :     // Only coalesce connections for HTTP/2 and HTTP/3.
     456           0 :     return;
     457           0 :   }
     458           0 :   const LookupKey key = {hash_key, *connection.connectionInfoProvider().remoteAddress()};
     459           0 :   ConnectionInfo info = {&pool, &connection};
     460           0 :   connection_info_map_[key].push_back(info);
     461           0 : }
     462             : 
     463             : void Cluster::LoadBalancer::onConnectionDraining(Envoy::Http::ConnectionPool::Instance& pool,
     464             :                                                  std::vector<uint8_t>& hash_key,
     465           0 :                                                  const Network::Connection& connection) {
     466           0 :   const LookupKey key = {hash_key, *connection.connectionInfoProvider().remoteAddress()};
     467           0 :   connection_info_map_[key].erase(
     468           0 :       std::remove_if(connection_info_map_[key].begin(), connection_info_map_[key].end(),
     469           0 :                      [&pool, &connection](const ConnectionInfo& info) {
     470           0 :                        return (info.pool_ == &pool && info.connection_ == &connection);
     471           0 :                      }),
     472           0 :       connection_info_map_[key].end());
     473           0 : }
     474             : 
     475             : absl::StatusOr<std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>>
     476             : ClusterFactory::createClusterWithConfig(
     477             :     const envoy::config::cluster::v3::Cluster& cluster,
     478             :     const envoy::extensions::clusters::dynamic_forward_proxy::v3::ClusterConfig& proto_config,
     479           0 :     Upstream::ClusterFactoryContext& context) {
     480             : 
     481           0 :   Extensions::Common::DynamicForwardProxy::DnsCacheManagerFactoryImpl cache_manager_factory(
     482           0 :       context.serverFactoryContext(), context.messageValidationVisitor());
     483             : 
     484           0 :   envoy::config::cluster::v3::Cluster cluster_config = cluster;
     485           0 :   if (!cluster_config.has_upstream_http_protocol_options()) {
     486             :     // This sets defaults which will only apply if using old style http config.
     487             :     // They will be a no-op if typed_extension_protocol_options are used for
     488             :     // http config.
     489           0 :     cluster_config.mutable_upstream_http_protocol_options()->set_auto_sni(true);
     490           0 :     cluster_config.mutable_upstream_http_protocol_options()->set_auto_san_validation(true);
     491           0 :   }
     492             : 
     493           0 :   Extensions::Common::DynamicForwardProxy::DnsCacheManagerSharedPtr cache_manager =
     494           0 :       cache_manager_factory.get();
     495           0 :   auto dns_cache_or_error = cache_manager->getCache(proto_config.dns_cache_config());
     496           0 :   RETURN_IF_STATUS_NOT_OK(dns_cache_or_error);
     497             : 
     498           0 :   auto new_cluster =
     499           0 :       std::shared_ptr<Cluster>(new Cluster(cluster_config, std::move(dns_cache_or_error.value()),
     500           0 :                                            proto_config, context, std::move(cache_manager)));
     501             : 
     502           0 :   Extensions::Common::DynamicForwardProxy::DFPClusterStoreFactory cluster_store_factory(
     503           0 :       context.serverFactoryContext().singletonManager());
     504           0 :   cluster_store_factory.get()->save(new_cluster->info()->name(), new_cluster);
     505             : 
     506           0 :   auto& options = new_cluster->info()->upstreamHttpProtocolOptions();
     507             : 
     508           0 :   if (!proto_config.allow_insecure_cluster_options()) {
     509           0 :     if (!options.has_value() ||
     510           0 :         (!options.value().auto_sni() || !options.value().auto_san_validation())) {
     511           0 :       return absl::InvalidArgumentError(
     512           0 :           "dynamic_forward_proxy cluster must have auto_sni and auto_san_validation true unless "
     513           0 :           "allow_insecure_cluster_options is set.");
     514           0 :     }
     515           0 :   }
     516           0 :   if (proto_config.has_sub_clusters_config() &&
     517           0 :       proto_config.sub_clusters_config().lb_policy() ==
     518           0 :           envoy::config::cluster::v3::Cluster_LbPolicy::Cluster_LbPolicy_CLUSTER_PROVIDED) {
     519           0 :     return absl::InvalidArgumentError(
     520           0 :         "unsupported lb_policy 'CLUSTER_PROVIDED' in sub_cluster_config");
     521           0 :   }
     522             : 
     523           0 :   auto lb = std::make_unique<Cluster::ThreadAwareLoadBalancer>(*new_cluster);
     524           0 :   return std::make_pair(new_cluster, std::move(lb));
     525           0 : }
     526             : 
     527             : REGISTER_FACTORY(ClusterFactory, Upstream::ClusterFactory);
     528             : 
     529             : } // namespace DynamicForwardProxy
     530             : } // namespace Clusters
     531             : } // namespace Extensions
     532             : } // namespace Envoy

Generated by: LCOV version 1.15