LCOV - code coverage report
Current view: top level - source/extensions/clusters/original_dst - original_dst_cluster.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 1 252 0.4 %
Date: 2024-01-05 06:35:25 Functions: 1 13 7.7 %

          Line data    Source code
       1             : #include "source/extensions/clusters/original_dst/original_dst_cluster.h"
       2             : 
       3             : #include <chrono>
       4             : #include <list>
       5             : #include <string>
       6             : #include <vector>
       7             : 
       8             : #include "envoy/config/cluster/v3/cluster.pb.h"
       9             : #include "envoy/config/core/v3/base.pb.h"
      10             : #include "envoy/config/core/v3/health_check.pb.h"
      11             : #include "envoy/config/endpoint/v3/endpoint_components.pb.h"
      12             : #include "envoy/stats/scope.h"
      13             : 
      14             : #include "source/common/http/headers.h"
      15             : #include "source/common/network/address_impl.h"
      16             : #include "source/common/network/filter_state_dst_address.h"
      17             : #include "source/common/network/utility.h"
      18             : #include "source/common/protobuf/protobuf.h"
      19             : #include "source/common/protobuf/utility.h"
      20             : #include "source/common/runtime/runtime_features.h"
      21             : 
      22             : namespace Envoy {
      23             : namespace Upstream {
      24             : 
      25           0 : OriginalDstClusterHandle::~OriginalDstClusterHandle() {
      26           0 :   std::shared_ptr<OriginalDstCluster> cluster = std::move(cluster_);
      27           0 :   cluster_.reset();
      28           0 :   Event::Dispatcher& dispatcher = cluster->dispatcher_;
      29           0 :   dispatcher.post([cluster = std::move(cluster)]() mutable { cluster.reset(); });
      30           0 : }
      31             : 
      32           0 : HostConstSharedPtr OriginalDstCluster::LoadBalancer::chooseHost(LoadBalancerContext* context) {
      33           0 :   if (context) {
      34             :     // Check if filter state override is present, if yes use it before anything else.
      35           0 :     Network::Address::InstanceConstSharedPtr dst_host = filterStateOverrideHost(context);
      36             : 
      37             :     // Check if override host metadata is present, if yes use it otherwise check the header.
      38           0 :     if (dst_host == nullptr) {
      39           0 :       dst_host = metadataOverrideHost(context);
      40           0 :     }
      41             : 
      42             :     // Check if override host header is present, if yes use it otherwise check local address.
      43           0 :     if (dst_host == nullptr) {
      44           0 :       dst_host = requestOverrideHost(context);
      45           0 :     }
      46             : 
      47           0 :     if (dst_host == nullptr) {
      48           0 :       const Network::Connection* connection = context->downstreamConnection();
      49             :       // The local address of the downstream connection is the original destination address,
      50             :       // if localAddressRestored() returns 'true'.
      51           0 :       if (connection && connection->connectionInfoProvider().localAddressRestored()) {
      52           0 :         dst_host = connection->connectionInfoProvider().localAddress();
      53           0 :       }
      54           0 :     }
      55           0 :     if (dst_host && port_override_.has_value()) {
      56           0 :       dst_host = Network::Utility::getAddressWithPort(*dst_host.get(), port_override_.value());
      57           0 :     }
      58             : 
      59           0 :     if (dst_host) {
      60           0 :       const Network::Address::Instance& dst_addr = *dst_host.get();
      61             :       // Check if a host with the destination address is already in the host set.
      62           0 :       auto it = host_map_->find(dst_addr.asString());
      63           0 :       if (it != host_map_->end()) {
      64           0 :         HostConstSharedPtr host = it->second->host_;
      65           0 :         ENVOY_LOG(trace, "Using existing host {} {}.", *host, host->address()->asString());
      66           0 :         it->second->used_ = true;
      67           0 :         return host;
      68           0 :       }
      69             :       // Add a new host
      70           0 :       const Network::Address::Ip* dst_ip = dst_addr.ip();
      71           0 :       if (dst_ip) {
      72           0 :         Network::Address::InstanceConstSharedPtr host_ip_port(
      73           0 :             Network::Utility::copyInternetAddressAndPort(*dst_ip));
      74             :         // Create a host we can use immediately.
      75           0 :         auto info = parent_->cluster_->info();
      76           0 :         HostSharedPtr host(std::make_shared<HostImpl>(
      77           0 :             info, info->name() + dst_addr.asString(), std::move(host_ip_port), nullptr, 1,
      78           0 :             envoy::config::core::v3::Locality().default_instance(),
      79           0 :             envoy::config::endpoint::v3::Endpoint::HealthCheckConfig().default_instance(), 0,
      80           0 :             envoy::config::core::v3::UNKNOWN, parent_->cluster_->time_source_));
      81           0 :         ENVOY_LOG(debug, "Created host {} {}.", *host, host->address()->asString());
      82             : 
      83             :         // Tell the cluster about the new host
      84             :         // lambda cannot capture a member by value.
      85           0 :         std::weak_ptr<OriginalDstClusterHandle> post_parent = parent_;
      86           0 :         parent_->cluster_->dispatcher_.post([post_parent, host]() mutable {
      87             :           // The main cluster may have disappeared while this post was queued.
      88           0 :           if (std::shared_ptr<OriginalDstClusterHandle> parent = post_parent.lock()) {
      89           0 :             parent->cluster_->addHost(host);
      90           0 :           }
      91           0 :         });
      92           0 :         return host;
      93           0 :       } else {
      94           0 :         ENVOY_LOG(debug, "Failed to create host for {}.", dst_addr.asString());
      95           0 :       }
      96           0 :     }
      97           0 :   }
      98             :   // TODO(ramaraochavali): add a stat and move this log line to debug.
      99           0 :   ENVOY_LOG(warn, "original_dst_load_balancer: No downstream connection or no original_dst.");
     100           0 :   return nullptr;
     101           0 : }
     102             : 
     103             : Network::Address::InstanceConstSharedPtr
     104           0 : OriginalDstCluster::LoadBalancer::filterStateOverrideHost(LoadBalancerContext* context) {
     105           0 :   const auto streamInfos = {
     106           0 :       context->requestStreamInfo(),
     107           0 :       context->downstreamConnection() ? &context->downstreamConnection()->streamInfo() : nullptr};
     108           0 :   for (const auto streamInfo : streamInfos) {
     109           0 :     if (streamInfo == nullptr) {
     110           0 :       continue;
     111           0 :     }
     112           0 :     const auto* dst_address = streamInfo->filterState().getDataReadOnly<Network::AddressObject>(
     113           0 :         OriginalDstClusterFilterStateKey);
     114           0 :     if (dst_address) {
     115           0 :       return dst_address->address();
     116           0 :     }
     117           0 :   }
     118           0 :   return nullptr;
     119           0 : }
     120             : 
     121             : Network::Address::InstanceConstSharedPtr
     122           0 : OriginalDstCluster::LoadBalancer::requestOverrideHost(LoadBalancerContext* context) {
     123           0 :   if (!http_header_name_.has_value()) {
     124           0 :     return nullptr;
     125           0 :   }
     126           0 :   const Http::HeaderMap* downstream_headers = context->downstreamHeaders();
     127           0 :   if (!downstream_headers) {
     128           0 :     return nullptr;
     129           0 :   }
     130           0 :   Http::HeaderMap::GetResult override_header = downstream_headers->get(*http_header_name_);
     131           0 :   if (override_header.empty()) {
     132           0 :     return nullptr;
     133           0 :   }
     134             :   // This is an implicitly untrusted header, so per the API documentation only the first
     135             :   // value is used.
     136           0 :   const std::string request_override_host(override_header[0]->value().getStringView());
     137           0 :   Network::Address::InstanceConstSharedPtr request_host =
     138           0 :       Network::Utility::parseInternetAddressAndPortNoThrow(request_override_host, false);
     139           0 :   if (request_host == nullptr) {
     140           0 :     ENVOY_LOG(debug, "original_dst_load_balancer: invalid override header value. {}",
     141           0 :               request_override_host);
     142           0 :     parent_->cluster_->info()->trafficStats()->original_dst_host_invalid_.inc();
     143           0 :     return nullptr;
     144           0 :   }
     145           0 :   ENVOY_LOG(debug, "Using request override host {}.", request_override_host);
     146           0 :   return request_host;
     147           0 : }
     148             : 
     149             : Network::Address::InstanceConstSharedPtr
     150           0 : OriginalDstCluster::LoadBalancer::metadataOverrideHost(LoadBalancerContext* context) {
     151           0 :   if (!metadata_key_.has_value()) {
     152           0 :     return nullptr;
     153           0 :   }
     154           0 :   const auto streamInfos = {
     155           0 :       context->requestStreamInfo(),
     156           0 :       context->downstreamConnection() ? &context->downstreamConnection()->streamInfo() : nullptr};
     157           0 :   const ProtobufWkt::Value* value = nullptr;
     158           0 :   for (const auto streamInfo : streamInfos) {
     159           0 :     if (streamInfo == nullptr) {
     160           0 :       continue;
     161           0 :     }
     162           0 :     const auto& metadata = streamInfo->dynamicMetadata();
     163           0 :     value = &Config::Metadata::metadataValue(&metadata, metadata_key_.value());
     164             :     // Path can refer to a list, in which case we extract the first element.
     165           0 :     if (value->kind_case() == ProtobufWkt::Value::kListValue) {
     166           0 :       const auto& values = value->list_value().values();
     167           0 :       if (!values.empty()) {
     168           0 :         value = &(values[0]);
     169           0 :       }
     170           0 :     }
     171           0 :     if (value->kind_case() == ProtobufWkt::Value::kStringValue) {
     172           0 :       break;
     173           0 :     }
     174           0 :   }
     175           0 :   if (value == nullptr || value->kind_case() != ProtobufWkt::Value::kStringValue) {
     176           0 :     return nullptr;
     177           0 :   }
     178           0 :   const std::string& metadata_override_host = value->string_value();
     179           0 :   Network::Address::InstanceConstSharedPtr metadata_host =
     180           0 :       Network::Utility::parseInternetAddressAndPortNoThrow(metadata_override_host, false);
     181           0 :   if (metadata_host == nullptr) {
     182           0 :     ENVOY_LOG(debug, "original_dst_load_balancer: invalid override metadata value. {}",
     183           0 :               metadata_override_host);
     184           0 :     parent_->cluster_->info()->trafficStats()->original_dst_host_invalid_.inc();
     185           0 :     return nullptr;
     186           0 :   }
     187           0 :   ENVOY_LOG(debug, "Using metadata override host {}.", metadata_override_host);
     188           0 :   return metadata_host;
     189           0 : }
     190             : 
     191             : OriginalDstCluster::OriginalDstCluster(const envoy::config::cluster::v3::Cluster& config,
     192             :                                        ClusterFactoryContext& context)
     193             :     : ClusterImplBase(config, context),
     194             :       dispatcher_(context.serverFactoryContext().mainThreadDispatcher()),
     195             :       cleanup_interval_ms_(
     196             :           std::chrono::milliseconds(PROTOBUF_GET_MS_OR_DEFAULT(config, cleanup_interval, 5000))),
     197           0 :       cleanup_timer_(dispatcher_.createTimer([this]() -> void { cleanup(); })),
     198           0 :       host_map_(std::make_shared<HostMultiMap>()) {
     199           0 :   if (const auto& config_opt = info_->lbOriginalDstConfig(); config_opt.has_value()) {
     200           0 :     if (config_opt->use_http_header()) {
     201           0 :       http_header_name_ = config_opt->http_header_name().empty()
     202           0 :                               ? Http::Headers::get().EnvoyOriginalDstHost
     203           0 :                               : Http::LowerCaseString(config_opt->http_header_name());
     204           0 :     }
     205           0 :     if (config_opt->has_metadata_key()) {
     206           0 :       metadata_key_ = Config::MetadataKey(config_opt->metadata_key());
     207           0 :     }
     208           0 :     if (config_opt->has_upstream_port_override()) {
     209           0 :       port_override_ = config_opt->upstream_port_override().value();
     210           0 :     }
     211           0 :   }
     212           0 :   cleanup_timer_->enableTimer(cleanup_interval_ms_);
     213           0 : }
     214             : 
     215           0 : void OriginalDstCluster::addHost(HostSharedPtr& host) {
     216           0 :   std::string address = host->address()->asString();
     217           0 :   HostMultiMapSharedPtr new_host_map = std::make_shared<HostMultiMap>(*getCurrentHostMap());
     218           0 :   auto it = new_host_map->find(address);
     219           0 :   if (it != new_host_map->end()) {
     220             :     // If the entry already exists, that means the worker that posted this host
     221             :     // had a stale host map. Because the host is potentially in that worker's
     222             :     // connection pools, we save the host in the host map hosts_ list and the
     223             :     // cluster priority set. Subsequently, the entire hosts_ list and the
     224             :     // primary host are removed collectively, once no longer in use.
     225           0 :     it->second->hosts_.push_back(host);
     226           0 :   } else {
     227             :     // The first worker that creates a host for the address defines the primary
     228             :     // host structure.
     229           0 :     new_host_map->emplace(address, std::make_shared<HostsForAddress>(host));
     230           0 :   }
     231           0 :   ENVOY_LOG(debug, "addHost() adding {} {}.", *host, address);
     232           0 :   setHostMap(new_host_map);
     233             : 
     234             :   // Given the current config, only EDS clusters support multiple priorities.
     235           0 :   ASSERT(priority_set_.hostSetsPerPriority().size() == 1);
     236           0 :   const auto& first_host_set = priority_set_.getOrCreateHostSet(0);
     237           0 :   HostVectorSharedPtr all_hosts(new HostVector(first_host_set.hosts()));
     238           0 :   all_hosts->emplace_back(host);
     239           0 :   priority_set_.updateHosts(0,
     240           0 :                             HostSetImpl::partitionHosts(all_hosts, HostsPerLocalityImpl::empty()),
     241           0 :                             {}, {std::move(host)}, {}, absl::nullopt, absl::nullopt);
     242           0 : }
     243             : 
     244           0 : void OriginalDstCluster::cleanup() {
     245           0 :   HostVectorSharedPtr keeping_hosts(new HostVector);
     246           0 :   HostVector to_be_removed;
     247           0 :   absl::flat_hash_set<absl::string_view> removed_addresses;
     248           0 :   auto host_map = getCurrentHostMap();
     249           0 :   if (!host_map->empty()) {
     250           0 :     ENVOY_LOG(trace, "Cleaning up stale original dst hosts.");
     251           0 :     for (const auto& [addr, hosts] : *host_map) {
     252             :       // Address is kept in the cluster if either of the two things happen:
     253             :       // 1) a host has been recently selected for the address; 2) none of the
     254             :       // hosts are currently in any of the connection pools.
     255             :       // The set of hosts for a single address are treated as a unit.
     256             :       //
     257             :       // Using the used_ bit is preserved for backwards compatibility and to
     258             :       // add a delay between load balancers choosing a host and grabbing a
     259             :       // handle on the host. This prevents the following interleaving:
     260             :       //
     261             :       // 1) worker 1: pools release host h
     262             :       // 2) worker 1: auto h = lb.chooseHost(&ctx);
     263             :       // 3) main: cleanup() // deletes h because h is not used by the pools
     264             :       // 4) worker 1: auto handle = h.acquireHandle();
     265             :       //
     266             :       // Because the duration between steps 2) and 4) is O(instructions), step
     267             :       // 3) will not delete h since it takes at least one cleanup_interval for
     268             :       // the host to set used_ bit for h to false.
     269           0 :       bool keep = false;
     270           0 :       if (hosts->used_) {
     271           0 :         keep = true;
     272           0 :         hosts->used_ = false; // Mark to be removed during the next round.
     273           0 :       } else if (Runtime::runtimeFeatureEnabled(
     274           0 :                      "envoy.reloadable_features.original_dst_rely_on_idle_timeout")) {
     275             :         // Check that all hosts (first, as well as others that may have been added concurrently)
     276             :         // are not in use by any connection pool.
     277           0 :         if (hosts->host_->used()) {
     278           0 :           keep = true;
     279           0 :         } else {
     280           0 :           for (const auto& host : hosts->hosts_) {
     281           0 :             if (host->used()) {
     282           0 :               keep = true;
     283           0 :               break;
     284           0 :             }
     285           0 :           }
     286           0 :         }
     287           0 :       }
     288           0 :       if (keep) {
     289           0 :         ENVOY_LOG(trace, "Keeping active address {}.", addr);
     290           0 :         keeping_hosts->emplace_back(hosts->host_);
     291           0 :         if (!hosts->hosts_.empty()) {
     292           0 :           keeping_hosts->insert(keeping_hosts->end(), hosts->hosts_.begin(), hosts->hosts_.end());
     293           0 :         }
     294           0 :       } else {
     295           0 :         ENVOY_LOG(trace, "Removing stale address {}.", addr);
     296           0 :         removed_addresses.insert(addr);
     297           0 :         to_be_removed.emplace_back(hosts->host_);
     298           0 :         if (!hosts->hosts_.empty()) {
     299           0 :           to_be_removed.insert(to_be_removed.end(), hosts->hosts_.begin(), hosts->hosts_.end());
     300           0 :         }
     301           0 :       }
     302           0 :     }
     303           0 :   }
     304           0 :   if (!to_be_removed.empty()) {
     305           0 :     HostMultiMapSharedPtr new_host_map = std::make_shared<HostMultiMap>(*host_map);
     306           0 :     for (const auto& addr : removed_addresses) {
     307           0 :       new_host_map->erase(addr);
     308           0 :     }
     309           0 :     setHostMap(new_host_map);
     310           0 :     priority_set_.updateHosts(
     311           0 :         0, HostSetImpl::partitionHosts(keeping_hosts, HostsPerLocalityImpl::empty()), {}, {},
     312           0 :         to_be_removed, false, absl::nullopt);
     313           0 :   }
     314             : 
     315           0 :   cleanup_timer_->enableTimer(cleanup_interval_ms_);
     316           0 : }
     317             : 
     318             : absl::StatusOr<std::pair<ClusterImplBaseSharedPtr, ThreadAwareLoadBalancerPtr>>
     319             : OriginalDstClusterFactory::createClusterImpl(const envoy::config::cluster::v3::Cluster& cluster,
     320           0 :                                              ClusterFactoryContext& context) {
     321           0 :   if (cluster.lb_policy() != envoy::config::cluster::v3::Cluster::CLUSTER_PROVIDED) {
     322           0 :     return absl::InvalidArgumentError(
     323           0 :         fmt::format("cluster: LB policy {} is not valid for Cluster type {}. Only "
     324           0 :                     "'CLUSTER_PROVIDED' is allowed with cluster type 'ORIGINAL_DST'",
     325           0 :                     envoy::config::cluster::v3::Cluster::LbPolicy_Name(cluster.lb_policy()),
     326           0 :                     envoy::config::cluster::v3::Cluster::DiscoveryType_Name(cluster.type())));
     327           0 :   }
     328             : 
     329           0 :   if (cluster.has_load_assignment()) {
     330           0 :     return absl::InvalidArgumentError(
     331           0 :         "ORIGINAL_DST clusters must have no load assignment configured");
     332           0 :   }
     333             : 
     334           0 :   if (!cluster.original_dst_lb_config().use_http_header() &&
     335           0 :       !cluster.original_dst_lb_config().http_header_name().empty()) {
     336           0 :     return absl::InvalidArgumentError(fmt::format(
     337           0 :         "ORIGINAL_DST cluster: invalid config http_header_name={} and use_http_header is "
     338           0 :         "false. Set use_http_header to true if http_header_name is desired.",
     339           0 :         cluster.original_dst_lb_config().http_header_name()));
     340           0 :   }
     341             : 
     342             :   // TODO(mattklein123): The original DST load balancer type should be deprecated and instead
     343             :   //                     the cluster should directly supply the load balancer. This will remove
     344             :   //                     a special case and allow this cluster to be compiled out as an extension.
     345           0 :   auto new_cluster = std::shared_ptr<OriginalDstCluster>(new OriginalDstCluster(cluster, context));
     346           0 :   auto lb = std::make_unique<OriginalDstCluster::ThreadAwareLoadBalancer>(
     347           0 :       std::make_shared<OriginalDstClusterHandle>(new_cluster));
     348           0 :   return std::make_pair(new_cluster, std::move(lb));
     349           0 : }
     350             : 
     351             : /**
     352             :  * Static registration for the original dst cluster factory. @see RegisterFactory.
     353             :  */
     354             : REGISTER_FACTORY(OriginalDstClusterFactory, ClusterFactory);
     355             : 
     356             : class OriginalDstClusterFilterStateFactory : public Network::BaseAddressObjectFactory {
     357             : public:
     358           2 :   std::string name() const override { return std::string(OriginalDstClusterFilterStateKey); }
     359             : };
     360             : 
     361             : REGISTER_FACTORY(OriginalDstClusterFilterStateFactory, StreamInfo::FilterState::ObjectFactory);
     362             : 
     363             : } // namespace Upstream
     364             : } // namespace Envoy

Generated by: LCOV version 1.15