LCOV - code coverage report
Current view: top level - source/common/conn_pool - conn_pool_base.cc (source / functions)
Test: coverage.dat
Date: 2024-01-05 06:35:25
             Hit    Total    Coverage
Lines:       337    566      59.5 %
Functions:   33     46       71.7 %

          Line data    Source code
       1             : #include "source/common/conn_pool/conn_pool_base.h"
       2             : 
       3             : #include "source/common/common/assert.h"
       4             : #include "source/common/common/debug_recursion_checker.h"
       5             : #include "source/common/network/transport_socket_options_impl.h"
       6             : #include "source/common/runtime/runtime_features.h"
       7             : #include "source/common/stats/timespan_impl.h"
       8             : #include "source/common/upstream/upstream_impl.h"
       9             : 
      10             : namespace Envoy {
      11             : namespace ConnectionPool {
      12             : namespace {
      13           0 : [[maybe_unused]] ssize_t connectingCapacity(const std::list<ActiveClientPtr>& connecting_clients) {
      14           0 :   ssize_t ret = 0;
      15           0 :   for (const auto& client : connecting_clients) {
      16           0 :     ret += client->currentUnusedCapacity();
      17           0 :   }
      18           0 :   return ret;
      19           0 : }
      20             : } // namespace
      21             : 
      22             : ConnPoolImplBase::ConnPoolImplBase(
      23             :     Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
      24             :     Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options,
      25             :     const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
      26             :     Upstream::ClusterConnectivityState& state)
      27             :     : state_(state), host_(host), priority_(priority), dispatcher_(dispatcher),
      28             :       socket_options_(options), transport_socket_options_(transport_socket_options),
      29         173 :       upstream_ready_cb_(dispatcher_.createSchedulableCallback([this]() { onUpstreamReady(); })) {}
      30             : 
      31         173 : ConnPoolImplBase::~ConnPoolImplBase() {
      32         173 :   ASSERT(isIdleImpl());
      33         173 :   ASSERT(connecting_stream_capacity_ == 0);
      34         173 : }
      35             : 
      36         173 : void ConnPoolImplBase::deleteIsPendingImpl() {
      37         173 :   deferred_deleting_ = true;
      38         173 :   ASSERT(isIdleImpl());
      39         173 :   ASSERT(connecting_stream_capacity_ == 0);
      40         173 : }
      41             : 
      42         173 : void ConnPoolImplBase::destructAllConnections() {
      43         692 :   for (auto* list : {&ready_clients_, &busy_clients_, &connecting_clients_, &early_data_clients_}) {
      44         692 :     while (!list->empty()) {
      45           0 :       list->front()->close();
      46           0 :     }
      47         692 :   }
      48             : 
      49             :   // Make sure all clients are destroyed before we are destroyed.
      50         173 :   dispatcher_.clearDeferredDeleteList();
      51         173 : }
      52             : 
      53             : bool ConnPoolImplBase::shouldConnect(size_t pending_streams, size_t active_streams,
      54             :                                      int64_t connecting_and_connected_capacity,
      55         432 :                                      float preconnect_ratio, bool anticipate_incoming_stream) {
      56             :   // This is set to true any time global preconnect is being calculated.
      57             :   // ClusterManagerImpl::maybePreconnect is called directly before a stream is created, so the
      58             :   // stream must be anticipated.
      59             :   //
      60             :   // Also without this, we would never pre-establish a connection as the first
      61             :   // connection in a pool because pending/active streams could both be 0.
      62         432 :   int anticipated_streams = anticipate_incoming_stream ? 1 : 0;
      63             : 
      64             :   // The number of streams we want to be provisioned for is the number of
      65             :   // pending, active, and anticipated streams times the preconnect ratio.
      66             :   // The number of streams we are (theoretically) provisioned for is the
      67             :   // connecting stream capacity plus the number of active streams.
      68             :   //
      69             :   // If the preconnect ratio is not set, it defaults to 1, and this simplifies to the
      70             :   // legacy check of pending_streams_.size() > connecting_stream_capacity_.
      71         432 :   return (pending_streams + active_streams + anticipated_streams) * preconnect_ratio >
      72         432 :          connecting_and_connected_capacity + active_streams;
      73         432 : }
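
A rough worked example of the check above; the numbers are hypothetical and not taken from any Envoy configuration or test:

  // pending_streams = 4, active_streams = 2, anticipated_streams = 0,
  // preconnect_ratio = 1.5, connecting_and_connected_capacity = 5
  //   desired provisioning: (4 + 2 + 0) * 1.5 = 9 streams
  //   current provisioning:  5 + 2            = 7 streams
  // 9 > 7, so shouldConnect() returns true and the pool tries to open another
  // connection. With the default ratio of 1.0 the same inputs give 6 > 7, which
  // is false and is equivalent to the legacy pending-vs-capacity check.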
      74             : 
      75         432 : bool ConnPoolImplBase::shouldCreateNewConnection(float global_preconnect_ratio) const {
      76             :   // If the host is not healthy, don't make it do extra work, especially as
      77             :   // upstream selection logic may result in bypassing this upstream entirely.
      78             :   // If an Envoy user wants preconnecting for degraded upstreams this could be
      79             :   // added later via extending the preconnect config.
      80         432 :   if (host_->coarseHealth() != Upstream::Host::Health::Healthy) {
      81           0 :     return pending_streams_.size() > connecting_stream_capacity_;
      82           0 :   }
      83             : 
      84             :   // Determine if we are trying to prefetch for global preconnect or local preconnect.
      85         432 :   if (global_preconnect_ratio != 0) {
      86             :     // If global preconnecting is on, and this connection is within the global
      87             :     // preconnect limit, preconnect.
      88             :     // For global preconnect, we anticipate an incoming stream to this pool, since it is
      89             :     // prefetching for the next upcoming stream, which will likely be assigned to this pool.
      90             :     // We may eventually want to track preconnect_attempts to allow more preconnecting for
      91             :     // heavily weighted upstreams or sticky picks.
      92           0 :     return shouldConnect(pending_streams_.size(), num_active_streams_, connecting_stream_capacity_,
      93           0 :                          global_preconnect_ratio, true);
      94         432 :   } else {
      95             :     // Ensure this local pool has adequate connections for the given load.
      96             :     //
      97             :     // Local preconnect does not need to anticipate a stream. It is called as
      98             :     // new streams are established or torn down and simply attempts to maintain
      99             :     // the correct ratio of streams and anticipated capacity.
     100         432 :     return shouldConnect(pending_streams_.size(), num_active_streams_, connecting_stream_capacity_,
     101         432 :                          perUpstreamPreconnectRatio());
     102         432 :   }
     103         432 : }
     104             : 
     105         432 : float ConnPoolImplBase::perUpstreamPreconnectRatio() const {
     106         432 :   return host_->cluster().perUpstreamPreconnectRatio();
     107         432 : }
     108             : 
     109         259 : ConnPoolImplBase::ConnectionResult ConnPoolImplBase::tryCreateNewConnections() {
     110         259 :   ConnPoolImplBase::ConnectionResult result;
     111             :   // Somewhat arbitrarily cap the number of connections preconnected due to new
     112             :   // incoming streams. The preconnect ratio is capped at 3, so in steady
     113             :   // state, no more than 3 connections should be preconnected. If hosts go
     114             :   // unhealthy, and connections are not immediately preconnected, it could be that
     115             :   // many connections are desired when the host becomes healthy again, but
     116             :   // overwhelming it with connections is not desirable.
     117         432 :   for (int i = 0; i < 3; ++i) {
     118         432 :     result = tryCreateNewConnection();
     119         432 :     if (result != ConnectionResult::CreatedNewConnection) {
     120         259 :       break;
     121         259 :     }
     122         432 :   }
     123         259 :   ASSERT(!is_draining_for_deletion_ || result != ConnectionResult::CreatedNewConnection);
     124         259 :   return result;
     125         259 : }
     126             : 
     127             : ConnPoolImplBase::ConnectionResult
     128         432 : ConnPoolImplBase::tryCreateNewConnection(float global_preconnect_ratio) {
     129             :   // There are already enough Connecting connections for the number of queued streams.
     130         432 :   if (!shouldCreateNewConnection(global_preconnect_ratio)) {
     131         259 :     ENVOY_LOG(trace, "not creating a new connection, shouldCreateNewConnection returned false.");
     132         259 :     return ConnectionResult::ShouldNotConnect;
     133         259 :   }
     134             : 
     135         173 :   const bool can_create_connection = host_->canCreateConnection(priority_);
     136             : 
     137         173 :   if (!can_create_connection) {
     138           0 :     host_->cluster().trafficStats()->upstream_cx_overflow_.inc();
     139           0 :   }
     140             :   // If we are at the connection circuit-breaker limit due to other upstreams having
     141             :   // too many open connections, and this upstream has no connections, always create one, to
     142             :   // prevent pending streams being queued to this upstream with no way to be processed.
     143         173 :   if (can_create_connection || (ready_clients_.empty() && busy_clients_.empty() &&
     144         173 :                                 connecting_clients_.empty() && early_data_clients_.empty())) {
     145         173 :     ENVOY_LOG(debug, "creating a new connection (connecting={})", connecting_clients_.size());
     146         173 :     ActiveClientPtr client = instantiateActiveClient();
     147         173 :     if (client.get() == nullptr) {
     148           0 :       ENVOY_LOG(trace, "connection creation failed");
     149           0 :       return ConnectionResult::FailedToCreateConnection;
     150           0 :     }
     151         173 :     ASSERT(client->state() == ActiveClient::State::Connecting);
     152         173 :     ASSERT(std::numeric_limits<uint64_t>::max() - connecting_stream_capacity_ >=
     153         173 :            static_cast<uint64_t>(client->currentUnusedCapacity()));
     154         173 :     ASSERT(client->real_host_description_);
     155             :     // Increase the connecting capacity to reflect the streams this connection can serve.
     156         173 :     incrConnectingAndConnectedStreamCapacity(client->currentUnusedCapacity(), *client);
     157         173 :     LinkedList::moveIntoList(std::move(client), owningList(client->state()));
     158         173 :     return can_create_connection ? ConnectionResult::CreatedNewConnection
     159         173 :                                  : ConnectionResult::CreatedButRateLimited;
     160         173 :   } else {
     161           0 :     ENVOY_LOG(trace, "not creating a new connection: connection constrained");
     162           0 :     return ConnectionResult::NoConnectionRateLimited;
     163           0 :   }
     164         173 : }
     165             : 
     166             : void ConnPoolImplBase::attachStreamToClient(Envoy::ConnectionPool::ActiveClient& client,
     167         202 :                                             AttachContext& context) {
     168         202 :   ASSERT(client.readyForStream());
     169             : 
     170         202 :   Upstream::ClusterTrafficStats& traffic_stats = *host_->cluster().trafficStats();
     171         202 :   if (client.state() == Envoy::ConnectionPool::ActiveClient::State::ReadyForEarlyData) {
     172           0 :     traffic_stats.upstream_rq_0rtt_.inc();
     173           0 :   }
     174             : 
     175         202 :   if (enforceMaxRequests() && !host_->cluster().resourceManager(priority_).requests().canCreate()) {
     176           0 :     ENVOY_LOG(debug, "max streams overflow");
     177           0 :     onPoolFailure(client.real_host_description_, absl::string_view(),
     178           0 :                   ConnectionPool::PoolFailureReason::Overflow, context);
     179           0 :     traffic_stats.upstream_rq_pending_overflow_.inc();
     180           0 :     return;
     181           0 :   }
     182         202 :   ENVOY_CONN_LOG(debug, "creating stream", client);
     183             : 
     184             :   // Latch capacity before updating remaining streams.
     185         202 :   uint64_t capacity = client.currentUnusedCapacity();
     186         202 :   client.remaining_streams_--;
     187         202 :   if (client.remaining_streams_ == 0) {
     188           0 :     ENVOY_CONN_LOG(debug, "maximum streams per connection, start draining", client);
     189           0 :     traffic_stats.upstream_cx_max_requests_.inc();
     190           0 :     transitionActiveClientState(client, Envoy::ConnectionPool::ActiveClient::State::Draining);
     191         202 :   } else if (capacity == 1) {
     192             :     // As soon as the new stream is created, the client will be maxed out.
     193          54 :     transitionActiveClientState(client, Envoy::ConnectionPool::ActiveClient::State::Busy);
     194          54 :   }
     195             : 
     196             :   // Decrement the capacity, as there's one less stream available for serving.
     197             :   // For HTTP/3, the capacity is updated in newStreamEncoder.
     198         202 :   if (trackStreamCapacity()) {
     199         202 :     decrConnectingAndConnectedStreamCapacity(1, client);
     200         202 :   }
     201             :   // Track the new active stream.
     202         202 :   state_.incrActiveStreams(1);
     203         202 :   num_active_streams_++;
     204         202 :   host_->stats().rq_total_.inc();
     205         202 :   host_->stats().rq_active_.inc();
     206         202 :   traffic_stats.upstream_rq_total_.inc();
     207         202 :   traffic_stats.upstream_rq_active_.inc();
     208         202 :   host_->cluster().resourceManager(priority_).requests().inc();
     209             : 
     210         202 :   onPoolReady(client, context);
     211         202 : }
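
A short sketch of the two transitions above, using hypothetical counters:

  // If remaining_streams_ was 1 before the decrement, the client has hit its
  // lifetime stream limit: the new stream still attaches, but the client moves
  // to Draining and will be closed once its last stream finishes.
  // If instead the latched capacity was 1 (say 9 of 10 concurrent streams were
  // already in use), attaching this stream maxes the client out and it moves to
  // Busy until a stream closes and frees capacity.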
     212             : 
     213             : void ConnPoolImplBase::onStreamClosed(Envoy::ConnectionPool::ActiveClient& client,
     214         202 :                                       bool delay_attaching_stream) {
     215         202 :   ENVOY_CONN_LOG(debug, "destroying stream: {} remaining", client, client.numActiveStreams());
     216         202 :   ASSERT(num_active_streams_ > 0);
     217         202 :   state_.decrActiveStreams(1);
     218         202 :   num_active_streams_--;
     219         202 :   host_->stats().rq_active_.dec();
     220         202 :   host_->cluster().trafficStats()->upstream_rq_active_.dec();
     221         202 :   host_->cluster().resourceManager(priority_).requests().dec();
     222             :   // We don't update the capacity for HTTP/3 as the stream count should only
     223             :   // increase when a MAX_STREAMS frame is received.
     224         202 :   if (trackStreamCapacity()) {
     225             :     // If the effective client capacity was limited by concurrency, increase connecting capacity.
     226         202 :     bool limited_by_concurrency =
     227         202 :         client.remaining_streams_ > client.concurrent_stream_limit_ - client.numActiveStreams() - 1;
     228             :     // The capacity calculated by concurrency could be negative if a SETTINGS frame lowered the
     229             :     // number of allowed streams. In this case, the effective client capacity was still limited
     230             :     // by concurrency; compare client.concurrent_stream_limit_ and client.numActiveStreams()
     231             :     // directly to avoid overflow.
     232         202 :     bool negative_capacity = client.concurrent_stream_limit_ < client.numActiveStreams() + 1;
     233         202 :     if (negative_capacity || limited_by_concurrency) {
     234          33 :       incrConnectingAndConnectedStreamCapacity(1, client);
     235          33 :     }
     236         202 :   }
     237         202 :   if (client.state() == ActiveClient::State::Draining && client.numActiveStreams() == 0) {
     238             :     // Close out the draining client if we no longer have active streams.
     239           0 :     client.close();
     240         202 :   } else if (client.state() == ActiveClient::State::Busy && client.currentUnusedCapacity() > 0) {
     241          33 :     if (!client.hasHandshakeCompleted()) {
     242           0 :       transitionActiveClientState(client, ActiveClient::State::ReadyForEarlyData);
     243           0 :       if (!delay_attaching_stream) {
     244           0 :         onUpstreamReadyForEarlyData(client);
     245           0 :       }
     246          33 :     } else {
     247          33 :       transitionActiveClientState(client, ActiveClient::State::Ready);
     248          33 :       if (!delay_attaching_stream) {
     249           0 :         onUpstreamReady();
     250           0 :       }
     251          33 :     }
     252          33 :   }
     253         202 : }
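
A brief illustration of the capacity bookkeeping above, with hypothetical limits:

  // concurrent_stream_limit_ = 10 and the client had 10 active streams (Busy);
  // after this close numActiveStreams() = 9 and, with remaining_streams_ = 50,
  // 50 > 10 - 9 - 1 = 0, so the client was limited by concurrency and one unit
  // of connecting/connected capacity is restored.
  // If instead remaining_streams_ = 5 with concurrent_stream_limit_ = 100 and
  // numActiveStreams() = 3, then 5 > 100 - 3 - 1 is false: the lifetime limit,
  // not concurrency, bounds the capacity, so nothing is restored.
  // If a SETTINGS frame dropped concurrent_stream_limit_ to 5 while 8 streams
  // were active, then after a close 5 < 7 + 1, the negative_capacity branch
  // fires, and capacity is still incremented.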
     254             : 
     255             : ConnectionPool::Cancellable* ConnPoolImplBase::newStreamImpl(AttachContext& context,
     256         251 :                                                              bool can_send_early_data) {
     257         251 :   ASSERT(!is_draining_for_deletion_);
     258         251 :   ASSERT(!deferred_deleting_);
     259             : 
     260         251 :   ASSERT(static_cast<ssize_t>(connecting_stream_capacity_) ==
     261         251 :          connectingCapacity(connecting_clients_) +
     262         251 :              connectingCapacity(early_data_clients_)); // O(n) debug check.
     263         251 :   if (!ready_clients_.empty()) {
     264          78 :     ActiveClient& client = *ready_clients_.front();
     265          78 :     ENVOY_CONN_LOG(debug, "using existing fully connected connection", client);
     266          78 :     attachStreamToClient(client, context);
     267             :     // Even if there's a ready client, we may want to preconnect to handle the next incoming stream.
     268          78 :     tryCreateNewConnections();
     269          78 :     return nullptr;
     270          78 :   }
     271             : 
     272         173 :   if (can_send_early_data && !early_data_clients_.empty()) {
     273           0 :     ActiveClient& client = *early_data_clients_.front();
     274           0 :     ENVOY_CONN_LOG(debug, "using existing early data ready connection", client);
     275           0 :     attachStreamToClient(client, context);
     276             :     // Even if there's an available client, we may want to preconnect to handle the next
     277             :     // incoming stream.
     278           0 :     tryCreateNewConnections();
     279           0 :     return nullptr;
     280           0 :   }
     281             : 
     282         173 :   if (!host_->cluster().resourceManager(priority_).pendingRequests().canCreate()) {
     283           0 :     ENVOY_LOG(debug, "max pending streams overflow");
     284           0 :     onPoolFailure(nullptr, absl::string_view(), ConnectionPool::PoolFailureReason::Overflow,
     285           0 :                   context);
     286           0 :     host_->cluster().trafficStats()->upstream_rq_pending_overflow_.inc();
     287           0 :     return nullptr;
     288           0 :   }
     289             : 
     290         173 :   ConnectionPool::Cancellable* pending = newPendingStream(context, can_send_early_data);
     291         173 :   ENVOY_LOG(debug, "trying to create new connection");
     292         173 :   ENVOY_LOG(trace, fmt::format("{}", *this));
     293             : 
     294         173 :   auto old_capacity = connecting_stream_capacity_;
     295             :   // This must come after newPendingStream() because this function uses the
     296             :   // length of pending_streams_ to determine if a new connection is needed.
     297         173 :   const ConnectionResult result = tryCreateNewConnections();
     298             :   // If there is not enough connecting capacity, the only reason to not
     299             :   // increase capacity is if the connection limits are exceeded.
     300         173 :   ENVOY_BUG(pending_streams_.size() <= connecting_stream_capacity_ ||
     301         173 :                 connecting_stream_capacity_ > old_capacity ||
     302         173 :                 (result == ConnectionResult::NoConnectionRateLimited ||
     303         173 :                  result == ConnectionResult::FailedToCreateConnection),
     304         173 :             fmt::format("Failed to create expected connection: {}", *this));
     305         173 :   if (result == ConnectionResult::FailedToCreateConnection) {
     306             :     // This currently only happens for HTTP/3 if secrets aren't yet loaded.
     307             :     // Trigger connection failure.
     308           0 :     pending->cancel(Envoy::ConnectionPool::CancelPolicy::CloseExcess);
     309           0 :     onPoolFailure(nullptr, absl::string_view(),
     310           0 :                   ConnectionPool::PoolFailureReason::LocalConnectionFailure, context);
     311           0 :     return nullptr;
     312           0 :   }
     313         173 :   return pending;
     314         173 : }
     315             : 
     316           0 : bool ConnPoolImplBase::maybePreconnectImpl(float global_preconnect_ratio) {
     317           0 :   ASSERT(!deferred_deleting_);
     318           0 :   return tryCreateNewConnection(global_preconnect_ratio) == ConnectionResult::CreatedNewConnection;
     319           0 : }
     320             : 
     321          33 : void ConnPoolImplBase::scheduleOnUpstreamReady() {
     322          33 :   upstream_ready_cb_->scheduleCallbackCurrentIteration();
     323          33 : }
     324             : 
     325         177 : void ConnPoolImplBase::onUpstreamReady() {
     326         301 :   while (!pending_streams_.empty() && !ready_clients_.empty()) {
     327         124 :     ActiveClientPtr& client = ready_clients_.front();
     328         124 :     ENVOY_CONN_LOG(debug, "attaching to next stream", *client);
     329             :     // Pending streams are pushed onto the front, so pull from the back.
     330         124 :     attachStreamToClient(*client, pending_streams_.back()->context());
     331         124 :     state_.decrPendingStreams(1);
     332         124 :     pending_streams_.pop_back();
     333         124 :   }
     334         177 :   if (!pending_streams_.empty()) {
     335           0 :     tryCreateNewConnections();
     336           0 :   }
     337         177 : }
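
A small sketch of the ordering assumed above; the stream names are hypothetical:

  // Streams A, B, and C enter the pool in that order. Each new pending stream
  // is pushed onto the front of pending_streams_, so the list reads [C, B, A].
  // Attaching from the back hands A to the first ready client, then B, then C,
  // preserving first-in first-out ordering.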
     338             : 
     339         850 : std::list<ActiveClientPtr>& ConnPoolImplBase::owningList(ActiveClient::State state) {
     340         850 :   switch (state) {
     341         346 :   case ActiveClient::State::Connecting:
     342         346 :     return connecting_clients_;
     343           0 :   case ActiveClient::State::ReadyForEarlyData:
     344           0 :     return early_data_clients_;
     345         396 :   case ActiveClient::State::Ready:
     346         396 :     return ready_clients_;
     347         108 :   case ActiveClient::State::Busy:
     348         108 :   case ActiveClient::State::Draining:
     349         108 :     return busy_clients_;
     350           0 :   case ActiveClient::State::Closed:
     351           0 :     break; // Fall through to PANIC.
     352         850 :   }
     353           0 :   PANIC("unexpected");
     354           0 : }
     355             : 
     356             : void ConnPoolImplBase::transitionActiveClientState(ActiveClient& client,
     357         252 :                                                    ActiveClient::State new_state) {
     358         252 :   auto& old_list = owningList(client.state());
     359         252 :   auto& new_list = owningList(new_state);
     360         252 :   client.setState(new_state);
     361             : 
     362             :   // old_list and new_list can be equal when transitioning from Busy to Draining.
     363             :   //
     364             :   // The documentation for list.splice() (which is what moveBetweenLists() calls) is
     365             :   // unclear about whether the source and destination are allowed to be the same list,
     366             :   // so check here since the move would be a no-op anyway.
     367         252 :   if (&old_list != &new_list) {
     368         252 :     client.moveBetweenLists(old_list, new_list);
     369         252 :   }
     370         252 : }
     371             : 
     372         173 : void ConnPoolImplBase::addIdleCallbackImpl(Instance::IdleCb cb) { idle_callbacks_.push_back(cb); }
     373             : 
     374           0 : void ConnPoolImplBase::closeIdleConnectionsForDrainingPool() {
     375           0 :   Common::AutoDebugRecursionChecker assert_not_in(recursion_checker_);
     376             : 
     377             :   // Create a separate list of elements to close to avoid mutate-while-iterating problems.
     378           0 :   std::list<ActiveClient*> to_close;
     379             : 
     380           0 :   for (auto& client : ready_clients_) {
     381           0 :     if (client->numActiveStreams() == 0) {
     382           0 :       to_close.push_back(client.get());
     383           0 :     }
     384           0 :   }
     385             : 
     386           0 :   if (pending_streams_.empty()) {
     387           0 :     for (auto& client : connecting_clients_) {
     388           0 :       to_close.push_back(client.get());
     389           0 :     }
     390           0 :     for (auto& client : early_data_clients_) {
     391           0 :       if (client->numActiveStreams() == 0) {
     392           0 :         to_close.push_back(client.get());
     393           0 :       }
     394           0 :     }
     395           0 :   }
     396             : 
     397           0 :   for (auto& entry : to_close) {
     398           0 :     ENVOY_LOG_EVENT(debug, "closing_idle_client", "closing idle client {} for cluster {}",
     399           0 :                     entry->id(), host_->cluster().name());
     400           0 :     entry->close();
     401           0 :   }
     402           0 : }
     403             : 
     404           0 : void ConnPoolImplBase::drainClients(std::list<ActiveClientPtr>& clients) {
     405           0 :   while (!clients.empty()) {
     406           0 :     ASSERT(clients.front()->numActiveStreams() > 0u);
     407           0 :     ENVOY_LOG_EVENT(
     408           0 :         debug, "draining_non_idle_client", "draining {} client {} for cluster {}",
     409           0 :         (clients.front()->state() == ActiveClient::State::Ready ? "ready" : "early data"),
     410           0 :         clients.front()->id(), host_->cluster().name());
     411           0 :     transitionActiveClientState(*clients.front(), ActiveClient::State::Draining);
     412           0 :   }
     413           0 : }
     414             : 
     415           0 : void ConnPoolImplBase::drainConnectionsImpl(DrainBehavior drain_behavior) {
     416           0 :   if (drain_behavior == Envoy::ConnectionPool::DrainBehavior::DrainAndDelete) {
     417           0 :     is_draining_for_deletion_ = true;
     418           0 :     checkForIdleAndCloseIdleConnsIfDraining();
     419           0 :     return;
     420           0 :   }
     421           0 :   closeIdleConnectionsForDrainingPool();
     422             :   // closeIdleConnectionsForDrainingPool() closes all connections in ready_clients_ with no active
     423             :   // streams and, if there are no pending streams, all connections in early_data_clients_ with no
     424             :   // active streams as well. All remaining entries in ready_clients_ are therefore serving
     425             :   // streams; move them, and all entries in busy_clients_, to draining.
     426           0 :   if (pending_streams_.empty()) {
     427             :     // The remaining early data clients are non-idle.
     428           0 :     drainClients(early_data_clients_);
     429           0 :   }
     430             : 
     431           0 :   drainClients(ready_clients_);
     432             : 
     433             :   // Changing busy_clients_ to Draining does not move them between lists,
     434             :   // so use a for-loop since the list is not mutated.
     435           0 :   ASSERT(&owningList(ActiveClient::State::Draining) == &busy_clients_);
     436           0 :   for (auto& busy_client : busy_clients_) {
     437           0 :     if (busy_client->state() == ActiveClient::State::Draining) {
     438           0 :       continue;
     439           0 :     }
     440           0 :     ENVOY_LOG_EVENT(debug, "draining_busy_client", "draining busy client {} for cluster {}",
     441           0 :                     busy_client->id(), host_->cluster().name());
     442           0 :     transitionActiveClientState(*busy_client, ActiveClient::State::Draining);
     443           0 :   }
     444           0 : }
     445             : 
     446         522 : bool ConnPoolImplBase::isIdleImpl() const {
     447         522 :   return pending_streams_.empty() && ready_clients_.empty() && busy_clients_.empty() &&
     448         522 :          connecting_clients_.empty() && early_data_clients_.empty();
     449         522 : }
     450             : 
     451             : /*
     452             :   This method may be invoked once or twice.
     453             :   It is called the first time in ConnPoolImplBase::onConnectionEvent for Local/RemoteClose events.
     454             :   The second time it is called from Envoy::Tcp::ActiveTcpClient::~ActiveTcpClient via
     455             :   ConnPoolImplBase::checkForIdleAndCloseIdleConnsIfDraining.
     456             : 
     457             :   The logic must be constructed in such a way that calling the method either once or twice is safe.
     458             :   See the PR 30807 description for an explanation of why idle callbacks are deleted after being called.
     459             : */
     460         522 : void ConnPoolImplBase::checkForIdleAndNotify() {
     461         522 :   if (isIdleImpl()) {
     462         173 :     ENVOY_LOG(debug, "invoking {} idle callback(s) - is_draining_for_deletion_={}",
     463         173 :               idle_callbacks_.size(), is_draining_for_deletion_);
     464         173 :     for (const Instance::IdleCb& cb : idle_callbacks_) {
     465         173 :       cb();
     466         173 :     }
     467             :     // Clear callbacks, so they are not executed if checkForIdleAndNotify is called again.
     468         173 :     idle_callbacks_.clear();
     469         173 :   }
     470         522 : }
     471             : 
     472         349 : void ConnPoolImplBase::checkForIdleAndCloseIdleConnsIfDraining() {
     473         349 :   if (is_draining_for_deletion_) {
     474           0 :     closeIdleConnectionsForDrainingPool();
     475           0 :   }
     476             : 
     477         349 :   checkForIdleAndNotify();
     478         349 : }
     479             : 
     480             : void ConnPoolImplBase::onConnectionEvent(ActiveClient& client, absl::string_view failure_reason,
     481         338 :                                          Network::ConnectionEvent event) {
     482         338 :   switch (event) {
     483         101 :   case Network::ConnectionEvent::RemoteClose:
     484         173 :   case Network::ConnectionEvent::LocalClose: {
     485         173 :     if (client.connect_timer_) {
     486           8 :       ASSERT(!client.has_handshake_completed_);
     487           8 :       client.connect_timer_->disableTimer();
     488           8 :       client.connect_timer_.reset();
     489           8 :     }
     490         173 :     decrConnectingAndConnectedStreamCapacity(client.currentUnusedCapacity(), client);
     491             : 
     492             :     // Make sure that onStreamClosed won't double count.
     493         173 :     client.remaining_streams_ = 0;
     494             :     // The client died.
     495         173 :     ENVOY_CONN_LOG(debug, "client disconnected, failure reason: {}", client, failure_reason);
     496             : 
     497         173 :     Envoy::Upstream::reportUpstreamCxDestroy(host_, event);
     498         173 :     const bool incomplete_stream = client.closingWithIncompleteStream();
     499         173 :     if (incomplete_stream) {
     500          61 :       Envoy::Upstream::reportUpstreamCxDestroyActiveRequest(host_, event);
     501          61 :     }
     502             : 
     503         173 :     if (!client.hasHandshakeCompleted()) {
     504           8 :       client.has_handshake_completed_ = true;
     505           8 :       host_->cluster().trafficStats()->upstream_cx_connect_fail_.inc();
     506           8 :       host_->stats().cx_connect_fail_.inc();
     507             : 
     508           8 :       onConnectFailed(client);
     509             :       // Purge pending streams only if this client doesn't contribute to the local connecting
     510             :       // stream capacity. In other words, the rest clients  would be able to handle all the
     511             :       // pending stream once they are connected.
     512           8 :       ConnectionPool::PoolFailureReason reason;
     513           8 :       if (client.timed_out_) {
     514           0 :         reason = ConnectionPool::PoolFailureReason::Timeout;
     515           8 :       } else if (event == Network::ConnectionEvent::RemoteClose) {
     516           8 :         reason = ConnectionPool::PoolFailureReason::RemoteConnectionFailure;
     517           8 :       } else {
     518           0 :         reason = ConnectionPool::PoolFailureReason::LocalConnectionFailure;
     519           0 :       }
     520             : 
     521             :       // Raw connect failures should never happen under normal circumstances. If we have an
     522             :       // upstream that is behaving badly, streams can get stuck here in the pending state. If we
     523             :       // see a connect failure, we purge all pending streams so that calling code can determine
     524             :       // what to do with the stream.
     525             :       // NOTE: We move the existing pending streams to a temporary list. This is done so that
     526             :       //       if retry logic submits a new stream to the pool, we don't fail it inline.
     527           8 :       purgePendingStreams(client.real_host_description_, failure_reason, reason);
     528             :       // See if we should preconnect based on active connections.
     529           8 :       if (!is_draining_for_deletion_) {
     530           8 :         tryCreateNewConnections();
     531           8 :       }
     532           8 :     }
     533             : 
     534             :     // We need to release our resourceManager() resources before checking below for
     535             :     // whether we can create a new connection. Normally this would happen when
     536             :     // the client's destructor runs, but this object needs to be deferredDelete'd(), so
     537             :     // this forces part of its cleanup to happen now.
     538         173 :     client.releaseResources();
     539             : 
     540             :     // Again, since we know this object is going to be deferredDelete'd(), we take
     541             :     // this opportunity to disable and reset the connection duration timer so that
     542             :     // it doesn't trigger while on the deferred delete list. In theory it is safe
     543             :     // to handle the Closed state in onConnectionDurationTimeout, but we handle
     544             :     // it here for simplicity and safety anyway.
     545         173 :     if (client.connection_duration_timer_) {
     546           0 :       client.connection_duration_timer_->disableTimer();
     547           0 :       client.connection_duration_timer_.reset();
     548           0 :     }
     549             : 
     550         173 :     dispatcher_.deferredDelete(client.removeFromList(owningList(client.state())));
     551             : 
     552             :     // Check if the pool transitioned to the idle state after removing the closed client
     553             :     // from one of the client tracking lists.
     554             :     // There is no need to check whether other connections are idle in a draining pool
     555             :     // because the pool will close all idle connections when it starts to
     556             :     // drain.
     557             :     // Trying to close other connections here can lead to deep recursion when
     558             :     // a large number of idle connections are closed at the start of a pool drain.
     559             :     // See CdsIntegrationTest.CdsClusterDownWithLotsOfIdleConnections for an example.
     560         173 :     checkForIdleAndNotify();
     561             : 
     562         173 :     client.setState(ActiveClient::State::Closed);
     563             : 
     564             :     // If we have pending streams and we just lost a connection we should make a new one.
     565         173 :     if (!pending_streams_.empty()) {
     566           0 :       tryCreateNewConnections();
     567           0 :     }
     568         173 :     break;
     569         101 :   }
     570         165 :   case Network::ConnectionEvent::Connected: {
     571         165 :     ASSERT(client.connect_timer_ != nullptr && !client.has_handshake_completed_);
     572         165 :     client.connect_timer_->disableTimer();
     573         165 :     client.connect_timer_.reset();
     574             : 
     575         165 :     ASSERT(connecting_stream_capacity_ >= client.currentUnusedCapacity());
     576         165 :     connecting_stream_capacity_ -= client.currentUnusedCapacity();
     577         165 :     client.has_handshake_completed_ = true;
     578         165 :     client.conn_connect_ms_->complete();
     579         165 :     client.conn_connect_ms_.reset();
     580         165 :     if (client.state() == ActiveClient::State::Connecting ||
     581         165 :         client.state() == ActiveClient::State::ReadyForEarlyData) {
     582         165 :       transitionActiveClientState(client,
     583         165 :                                   (client.currentUnusedCapacity() > 0 ? ActiveClient::State::Ready
     584         165 :                                                                       : ActiveClient::State::Busy));
     585         165 :     }
     586             : 
     587             :     // Now that the active client is ready, set up a timer for max connection duration.
     588         165 :     const absl::optional<std::chrono::milliseconds> max_connection_duration =
     589         165 :         client.parent_.host()->cluster().maxConnectionDuration();
     590         165 :     if (max_connection_duration.has_value()) {
     591           0 :       client.connection_duration_timer_ = client.parent_.dispatcher().createTimer(
     592           0 :           [&client]() { client.onConnectionDurationTimeout(); });
     593           0 :       client.connection_duration_timer_->enableTimer(max_connection_duration.value());
     594           0 :     }
     595             :     // Initialize client read filters
     596         165 :     if (Runtime::runtimeFeatureEnabled("envoy.reloadable_features.initialize_upstream_filters")) {
     597         165 :       client.initializeReadFilters();
     598         165 :     }
     599             :     // At this point, for the mixed ALPN pool, the client may be deleted. Do not
     600             :     // refer to the client after this point.
     601         165 :     onConnected(client);
     602         165 :     if (client.readyForStream()) {
     603         165 :       onUpstreamReady();
     604         165 :     }
     605         165 :     checkForIdleAndCloseIdleConnsIfDraining();
     606         165 :     break;
     607         101 :   }
     608           0 :   case Network::ConnectionEvent::ConnectedZeroRtt: {
     609           0 :     ENVOY_CONN_LOG(debug, "0-RTT connected with capacity {}", client,
     610           0 :                    client.currentUnusedCapacity());
     611             :     // No need to update connecting capacity and connect_timer_ as the client is still connecting.
     612           0 :     ASSERT(client.state() == ActiveClient::State::Connecting);
     613           0 :     host()->cluster().trafficStats()->upstream_cx_connect_with_0_rtt_.inc();
     614           0 :     transitionActiveClientState(client, (client.currentUnusedCapacity() > 0
     615           0 :                                              ? ActiveClient::State::ReadyForEarlyData
     616           0 :                                              : ActiveClient::State::Busy));
     617           0 :     break;
     618         101 :   }
     619         338 :   }
     620         338 : }
     621             : 
     622             : PendingStream::PendingStream(ConnPoolImplBase& parent, bool can_send_early_data)
     623         173 :     : parent_(parent), can_send_early_data_(can_send_early_data) {
     624         173 :   Upstream::ClusterTrafficStats& traffic_stats = *parent_.host()->cluster().trafficStats();
     625         173 :   traffic_stats.upstream_rq_pending_total_.inc();
     626         173 :   traffic_stats.upstream_rq_pending_active_.inc();
     627         173 :   parent_.host()->cluster().resourceManager(parent_.priority()).pendingRequests().inc();
     628         173 : }
     629             : 
     630         173 : PendingStream::~PendingStream() {
     631         173 :   parent_.host()->cluster().trafficStats()->upstream_rq_pending_active_.dec();
     632         173 :   parent_.host()->cluster().resourceManager(parent_.priority()).pendingRequests().dec();
     633         173 : }
     634             : 
     635          43 : void PendingStream::cancel(Envoy::ConnectionPool::CancelPolicy policy) {
     636          43 :   parent_.onPendingStreamCancel(*this, policy);
     637          43 : }
     638             : 
     639             : void ConnPoolImplBase::purgePendingStreams(
     640             :     const Upstream::HostDescriptionConstSharedPtr& host_description,
     641           8 :     absl::string_view failure_reason, ConnectionPool::PoolFailureReason reason) {
     642             :   // NOTE: We move the existing pending streams to a temporary list. This is done so that
     643             :   //       if retry logic submits a new stream to the pool, we don't fail it inline.
     644           8 :   state_.decrPendingStreams(pending_streams_.size());
     645           8 :   pending_streams_to_purge_ = std::move(pending_streams_);
     646          14 :   while (!pending_streams_to_purge_.empty()) {
     647           6 :     PendingStreamPtr stream =
     648           6 :         pending_streams_to_purge_.front()->removeFromList(pending_streams_to_purge_);
     649           6 :     host_->cluster().trafficStats()->upstream_rq_pending_failure_eject_.inc();
     650           6 :     onPoolFailure(host_description, failure_reason, reason, stream->context());
     651           6 :   }
     652           8 : }
     653             : 
     654           0 : bool ConnPoolImplBase::connectingConnectionIsExcess(const ActiveClient& client) const {
     655           0 :   ASSERT(!client.hasHandshakeCompleted());
     656           0 :   ASSERT(connecting_stream_capacity_ >= client.currentUnusedCapacity());
     657             :   // If perUpstreamPreconnectRatio is one, this simplifies to checking if there would still be
     658             :   // sufficient connecting stream capacity to serve all pending streams if the most recent client
     659             :   // were removed from the picture.
     660             :   //
     661             :   // If preconnect ratio is set, it also factors in the anticipated load based on both queued
     662             :   // streams and active streams, and makes sure the connecting capacity would still be sufficient to
     663             :   // serve that even with the most recent client removed.
     664           0 :   return (pending_streams_.size() + num_active_streams_) * perUpstreamPreconnectRatio() <=
     665           0 :          (connecting_stream_capacity_ - client.currentUnusedCapacity() + num_active_streams_);
     666           0 : }
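
A worked example of the excess check, with hypothetical values and a per-upstream preconnect ratio of 1:

  // 2 pending streams and 4 active streams give an anticipated load of
  // (2 + 4) * 1.0 = 6. With connecting_stream_capacity_ = 7 and a client whose
  // unused capacity is 2, removing it would still leave 7 - 2 + 4 = 9 streams
  // provisioned, and 6 <= 9, so the client is excess and may be closed.
  // If its unused capacity were 6 instead, 7 - 6 + 4 = 5 < 6, so it is kept.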
     667             : 
     668             : void ConnPoolImplBase::onPendingStreamCancel(PendingStream& stream,
     669          43 :                                              Envoy::ConnectionPool::CancelPolicy policy) {
     670          43 :   ENVOY_LOG(debug, "cancelling pending stream");
     671          43 :   if (!pending_streams_to_purge_.empty()) {
     672             :     // If pending_streams_to_purge_ is not empty, it means that we are called from
     673             :     // within an onPoolFailure callback invoked in purgePendingStreams (i.e. purgePendingStreams
     674             :     // is down in the call stack). Remove this stream from the list as it is cancelled,
     675             :     // and there is no need to call its onPoolFailure callback.
     676           0 :     stream.removeFromList(pending_streams_to_purge_);
     677          43 :   } else {
     678          43 :     state_.decrPendingStreams(1);
     679          43 :     stream.removeFromList(pending_streams_);
     680          43 :   }
     681          43 :   if (policy == Envoy::ConnectionPool::CancelPolicy::CloseExcess) {
     682           0 :     if (!connecting_clients_.empty() &&
     683           0 :         connectingConnectionIsExcess(*connecting_clients_.front())) {
     684           0 :       auto& client = *connecting_clients_.front();
     685           0 :       transitionActiveClientState(client, ActiveClient::State::Draining);
     686           0 :       client.close();
     687           0 :     } else if (!early_data_clients_.empty()) {
     688           0 :       for (ActiveClientPtr& client : early_data_clients_) {
     689           0 :         if (client->numActiveStreams() == 0) {
     690             :           // Find an idle early data client and check if it is excess.
     691           0 :           if (connectingConnectionIsExcess(*client)) {
     692             :             // Close the client; the break below keeps the loop from continuing with an
     693             :             // iterator that has been spliced into another list.
     693           0 :             transitionActiveClientState(*client, ActiveClient::State::Draining);
     694           0 :             client->close();
     695           0 :           }
     696           0 :           break;
     697           0 :         }
     698           0 :       }
     699           0 :     }
     700           0 :   }
     701             : 
     702          43 :   host_->cluster().trafficStats()->upstream_rq_cancelled_.inc();
     703          43 :   checkForIdleAndCloseIdleConnsIfDraining();
     704          43 : }
     705             : 
     706             : void ConnPoolImplBase::decrConnectingAndConnectedStreamCapacity(uint32_t delta,
     707         375 :                                                                 ActiveClient& client) {
     708         375 :   state_.decrConnectingAndConnectedStreamCapacity(delta);
     709         375 :   if (!client.hasHandshakeCompleted()) {
     710             :     // If still doing handshake, it is contributing to the local connecting stream capacity. Update
     711             :     // the capacity as well.
     712           8 :     ASSERT(connecting_stream_capacity_ >= delta);
     713           8 :     connecting_stream_capacity_ -= delta;
     714           8 :   }
     715         375 : }
     716             : 
     717             : void ConnPoolImplBase::incrConnectingAndConnectedStreamCapacity(uint32_t delta,
     718         206 :                                                                 ActiveClient& client) {
     719         206 :   state_.incrConnectingAndConnectedStreamCapacity(delta);
     720         206 :   if (!client.hasHandshakeCompleted()) {
     721         173 :     connecting_stream_capacity_ += delta;
     722         173 :   }
     723         206 : }
     724             : 
     725           0 : void ConnPoolImplBase::onUpstreamReadyForEarlyData(ActiveClient& client) {
     726           0 :   ASSERT(!client.hasHandshakeCompleted() && client.readyForStream());
     727             :   // Check pending streams backward for safe request.
     728             :   // Note that this is a O(n) search, but the expected size of pending_streams_ should be small. If
     729             :   // this becomes a problem, we could split pending_streams_ into 2 lists.
     730           0 :   auto it = pending_streams_.end();
     731           0 :   if (it == pending_streams_.begin()) {
     732           0 :     return;
     733           0 :   }
     734           0 :   --it;
     735           0 :   while (client.currentUnusedCapacity() > 0) {
     736           0 :     PendingStream& stream = **it;
     737           0 :     bool stop_iteration{false};
     738           0 :     if (it != pending_streams_.begin()) {
     739           0 :       --it;
     740           0 :     } else {
     741           0 :       stop_iteration = true;
     742           0 :     }
     743             : 
     744           0 :     if (stream.can_send_early_data_) {
     745           0 :       ENVOY_CONN_LOG(debug, "creating stream for early data.", client);
     746           0 :       attachStreamToClient(client, stream.context());
     747           0 :       state_.decrPendingStreams(1);
     748           0 :       stream.removeFromList(pending_streams_);
     749           0 :     }
     750           0 :     if (stop_iteration) {
     751           0 :       return;
     752           0 :     }
     753           0 :   }
     754           0 : }
     755             : 
     756             : namespace {
     757             : // Translate zero to UINT32_MAX so that the zero/unlimited case doesn't
     758             : // have to be handled specially.
     759         519 : uint32_t translateZeroToUnlimited(uint32_t limit) {
     760         519 :   return (limit != 0) ? limit : std::numeric_limits<uint32_t>::max();
     761         519 : }
     762             : } // namespace
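
A minimal illustration of the helper above (the inputs are arbitrary):

  // translateZeroToUnlimited(0)   == std::numeric_limits<uint32_t>::max(), i.e. unlimited
  // translateZeroToUnlimited(100) == 100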
     763             : 
     764             : ActiveClient::ActiveClient(ConnPoolImplBase& parent, uint32_t lifetime_stream_limit,
     765             :                            uint32_t concurrent_stream_limit)
     766             :     : ActiveClient(parent, lifetime_stream_limit, concurrent_stream_limit,
     767           0 :                    concurrent_stream_limit) {}
     768             : 
     769             : ActiveClient::ActiveClient(ConnPoolImplBase& parent, uint32_t lifetime_stream_limit,
     770             :                            uint32_t effective_concurrent_streams, uint32_t concurrent_stream_limit)
     771             :     : parent_(parent), remaining_streams_(translateZeroToUnlimited(lifetime_stream_limit)),
     772             :       configured_stream_limit_(translateZeroToUnlimited(effective_concurrent_streams)),
     773             :       concurrent_stream_limit_(translateZeroToUnlimited(concurrent_stream_limit)),
     774         173 :       connect_timer_(parent_.dispatcher().createTimer([this]() { onConnectTimeout(); })) {
     775         173 :   conn_connect_ms_ = std::make_unique<Stats::HistogramCompletableTimespanImpl>(
     776         173 :       parent_.host()->cluster().trafficStats()->upstream_cx_connect_ms_,
     777         173 :       parent_.dispatcher().timeSource());
     778         173 :   conn_length_ = std::make_unique<Stats::HistogramCompletableTimespanImpl>(
     779         173 :       parent_.host()->cluster().trafficStats()->upstream_cx_length_ms_,
     780         173 :       parent_.dispatcher().timeSource());
     781         173 :   connect_timer_->enableTimer(parent_.host()->cluster().connectTimeout());
     782         173 :   parent_.host()->stats().cx_total_.inc();
     783         173 :   parent_.host()->stats().cx_active_.inc();
     784         173 :   parent_.host()->cluster().trafficStats()->upstream_cx_total_.inc();
     785         173 :   parent_.host()->cluster().trafficStats()->upstream_cx_active_.inc();
     786         173 :   parent_.host()->cluster().resourceManager(parent_.priority()).connections().inc();
     787         173 : }
     788             : 
     789         173 : ActiveClient::~ActiveClient() { releaseResourcesBase(); }
     790             : 
     791         346 : void ActiveClient::releaseResourcesBase() {
     792         346 :   if (!resources_released_) {
     793         173 :     resources_released_ = true;
     794             : 
     795         173 :     conn_length_->complete();
     796             : 
     797         173 :     parent_.host()->cluster().trafficStats()->upstream_cx_active_.dec();
     798         173 :     parent_.host()->stats().cx_active_.dec();
     799         173 :     parent_.host()->cluster().resourceManager(parent_.priority()).connections().dec();
     800         173 :   }
     801         346 : }
     802             : 
     803           0 : void ActiveClient::onConnectTimeout() {
     804           0 :   ENVOY_CONN_LOG(debug, "connect timeout", *this);
     805           0 :   parent_.host()->cluster().trafficStats()->upstream_cx_connect_timeout_.inc();
     806           0 :   timed_out_ = true;
     807           0 :   close();
     808           0 : }
     809             : 
     810           0 : void ActiveClient::onConnectionDurationTimeout() {
     811           0 :   if (!hasHandshakeCompleted()) {
     812             :     // The connection duration timer should only have started after we were connected.
     813           0 :     ENVOY_BUG(false, "max connection duration reached while connecting");
     814           0 :     return;
     815           0 :   }
     816             : 
     817           0 :   if (state_ == ActiveClient::State::Closed) {
     818             :     // The connection duration timer should have been disabled and reset in onConnectionEvent
     819             :     // for closing connections.
     820           0 :     ENVOY_BUG(false, "max connection duration reached while closed");
     821           0 :     return;
     822           0 :   }
     823             : 
     824             :   // There's nothing to do if the client is draining.
     825           0 :   if (state_ == ActiveClient::State::Draining) {
     826           0 :     return;
     827           0 :   }
     828             : 
     829           0 :   ENVOY_CONN_LOG(debug, "max connection duration reached, start draining", *this);
     830           0 :   parent_.host()->cluster().trafficStats()->upstream_cx_max_duration_reached_.inc();
     831           0 :   parent_.transitionActiveClientState(*this, Envoy::ConnectionPool::ActiveClient::State::Draining);
     832             : 
     833             :   // Close out the draining client if we no longer have active streams.
     834             :   // We have to do this here because there won't be an onStreamClosed (because there are
     835             :   // no active streams) to do it for us later.
     836           0 :   if (numActiveStreams() == 0) {
     837           0 :     close();
     838           0 :   }
     839           0 : }
     840             : 
     841           0 : void ActiveClient::drain() {
     842           0 :   if (currentUnusedCapacity() <= 0) {
     843           0 :     return;
     844           0 :   }
     845           0 :   parent_.decrConnectingAndConnectedStreamCapacity(currentUnusedCapacity(), *this);
     846             : 
     847           0 :   remaining_streams_ = 0;
     848           0 : }
     849             : 
     850             : } // namespace ConnectionPool
     851             : } // namespace Envoy

Generated by: LCOV version 1.15