LCOV - code coverage report
Current view: top level - source/common/http - conn_pool_base.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 107 135 79.3 %
Date: 2024-01-05 06:35:25 Functions: 14 15 93.3 %

          Line data    Source code
       1             : #include "source/common/http/conn_pool_base.h"
       2             : 
       3             : #include "source/common/common/assert.h"
       4             : #include "source/common/http/utility.h"
       5             : #include "source/common/network/transport_socket_options_impl.h"
       6             : #include "source/common/runtime/runtime_features.h"
       7             : #include "source/common/stats/timespan_impl.h"
       8             : #include "source/common/upstream/upstream_impl.h"
       9             : 
      10             : namespace Envoy {
      11             : namespace Http {
      12             : 
      13             : Network::TransportSocketOptionsConstSharedPtr
      14             : wrapTransportSocketOptions(Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
      15         173 :                            std::vector<Protocol> protocols) {
      16         173 :   std::vector<std::string> fallbacks;
      17         173 :   for (auto protocol : protocols) {
      18             :     // If configured to do so, we override the ALPN to use for the upstream connection to match the
      19             :     // selected protocol.
      20         173 :     switch (protocol) {
      21           0 :     case Http::Protocol::Http10:
      22           0 :       PANIC("not imlemented");
      23          68 :     case Http::Protocol::Http11:
      24          68 :       fallbacks.push_back(Http::Utility::AlpnNames::get().Http11);
      25          68 :       break;
      26         105 :     case Http::Protocol::Http2:
      27         105 :       fallbacks.push_back(Http::Utility::AlpnNames::get().Http2);
      28         105 :       break;
      29           0 :     case Http::Protocol::Http3:
      30             :       // HTTP3 ALPN is set in the QUIC stack based on supported versions.
      31           0 :       break;
      32         173 :     }
      33         173 :   }
      34             : 
      35         173 :   if (transport_socket_options) {
      36         140 :     return std::make_shared<Network::AlpnDecoratingTransportSocketOptions>(
      37         140 :         std::move(fallbacks), transport_socket_options);
      38         168 :   } else {
      39          33 :     return std::make_shared<Network::TransportSocketOptionsImpl>(
      40          33 :         "", std::vector<std::string>{}, std::vector<std::string>{}, std::move(fallbacks));
      41          33 :   }
      42         173 : }
      43             : 
      44             : HttpConnPoolImplBase::HttpConnPoolImplBase(
      45             :     Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
      46             :     Event::Dispatcher& dispatcher, const Network::ConnectionSocket::OptionsSharedPtr& options,
      47             :     const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
      48             :     Random::RandomGenerator& random_generator, Upstream::ClusterConnectivityState& state,
      49             :     std::vector<Http::Protocol> protocols)
      50             :     : Envoy::ConnectionPool::ConnPoolImplBase(
      51             :           host, priority, dispatcher, options,
      52             :           wrapTransportSocketOptions(transport_socket_options, protocols), state),
      53         173 :       random_generator_(random_generator) {
      54         173 :   ASSERT(!protocols.empty());
      55         173 : }
      56             : 
      57         173 : HttpConnPoolImplBase::~HttpConnPoolImplBase() { destructAllConnections(); }
      58             : 
      59             : ConnectionPool::Cancellable*
      60             : HttpConnPoolImplBase::newStream(Http::ResponseDecoder& response_decoder,
      61             :                                 Http::ConnectionPool::Callbacks& callbacks,
      62         251 :                                 const Instance::StreamOptions& options) {
      63         251 :   HttpAttachContext context({&response_decoder, &callbacks});
      64         251 :   return newStreamImpl(context, options.can_send_early_data_);
      65         251 : }
      66             : 
      67           0 : bool HttpConnPoolImplBase::hasActiveConnections() const {
      68           0 :   return (hasPendingStreams() || (hasActiveStreams()));
      69           0 : }
      70             : 
      71             : ConnectionPool::Cancellable*
      72             : HttpConnPoolImplBase::newPendingStream(Envoy::ConnectionPool::AttachContext& context,
      73         173 :                                        bool can_send_early_data) {
      74         173 :   Http::ResponseDecoder& decoder = *typedContext<HttpAttachContext>(context).decoder_;
      75         173 :   Http::ConnectionPool::Callbacks& callbacks = *typedContext<HttpAttachContext>(context).callbacks_;
      76         173 :   ENVOY_LOG(debug,
      77         173 :             "queueing stream due to no available connections (ready={} busy={} connecting={})",
      78         173 :             ready_clients_.size(), busy_clients_.size(), connecting_clients_.size());
      79         173 :   Envoy::ConnectionPool::PendingStreamPtr pending_stream(
      80         173 :       new HttpPendingStream(*this, decoder, callbacks, can_send_early_data));
      81         173 :   return addPendingStream(std::move(pending_stream));
      82         173 : }
      83             : 
      84             : void HttpConnPoolImplBase::onPoolReady(Envoy::ConnectionPool::ActiveClient& client,
      85         202 :                                        Envoy::ConnectionPool::AttachContext& context) {
      86         202 :   ActiveClient* http_client = static_cast<ActiveClient*>(&client);
      87         202 :   auto& http_context = typedContext<HttpAttachContext>(context);
      88         202 :   Http::ResponseDecoder& response_decoder = *http_context.decoder_;
      89         202 :   Http::ConnectionPool::Callbacks& callbacks = *http_context.callbacks_;
      90         202 :   Http::RequestEncoder& new_encoder = http_client->newStreamEncoder(response_decoder);
      91         202 :   callbacks.onPoolReady(new_encoder, client.real_host_description_,
      92         202 :                         http_client->codec_client_->streamInfo(),
      93         202 :                         http_client->codec_client_->protocol());
      94         202 : }
      95             : 
      96             : // All streams are 2^31. Client streams are half that, minus stream 0. Just to be on the safe
      97             : // side we do 2^29.
      98             : static const uint64_t DEFAULT_MAX_STREAMS = (1 << 29);
      99             : 
     100           5 : void MultiplexedActiveClientBase::onGoAway(Http::GoAwayErrorCode) {
     101           5 :   ENVOY_CONN_LOG(debug, "remote goaway", *codec_client_);
     102           5 :   parent_.host()->cluster().trafficStats()->upstream_cx_close_notify_.inc();
     103           5 :   if (state() != ActiveClient::State::Draining) {
     104           5 :     if (codec_client_->numActiveRequests() == 0) {
     105           5 :       codec_client_->close();
     106           5 :     } else {
     107           0 :       parent_.transitionActiveClientState(*this, ActiveClient::State::Draining);
     108           0 :     }
     109           5 :   }
     110           5 : }
     111             : 
     112             : // Adjust the concurrent stream limit if the negotiated concurrent stream limit
     113             : // is lower than the local max configured streams.
     114             : //
     115             : // Note: if multiple streams are assigned to a connection before the settings
     116             : // are received, they may still be reset by the peer. This could be avoided by
     117             : // not considering http/2 connections connected until the SETTINGS frame is
     118             : // received, but that would result in a latency penalty instead.
     119          68 : void MultiplexedActiveClientBase::onSettings(ReceivedSettings& settings) {
     120          68 :   if (settings.maxConcurrentStreams().has_value()) {
     121          38 :     int64_t old_unused_capacity = currentUnusedCapacity();
     122             :     // Given config limits old_unused_capacity should never exceed int32_t.
     123          38 :     ASSERT(std::numeric_limits<int32_t>::max() >= old_unused_capacity);
     124          38 :     if (parent().cache() && parent().origin().has_value()) {
     125           0 :       parent().cache()->setConcurrentStreams(*parent().origin(),
     126           0 :                                              settings.maxConcurrentStreams().value());
     127           0 :     }
     128          38 :     concurrent_stream_limit_ =
     129          38 :         std::min(settings.maxConcurrentStreams().value(), configured_stream_limit_);
     130             : 
     131          38 :     int64_t delta = old_unused_capacity - currentUnusedCapacity();
     132          38 :     if (state() == ActiveClient::State::Ready && currentUnusedCapacity() <= 0) {
     133           0 :       parent_.transitionActiveClientState(*this, ActiveClient::State::Busy);
     134          38 :     } else if (state() == ActiveClient::State::Busy && currentUnusedCapacity() > 0) {
     135           0 :       parent_.transitionActiveClientState(*this, ActiveClient::State::Ready);
     136           0 :     }
     137             : 
     138          38 :     if (delta > 0) {
     139           0 :       parent_.decrClusterStreamCapacity(delta);
     140           0 :       ENVOY_CONN_LOG(trace, "Decreasing stream capacity by {}", *codec_client_, delta);
     141          38 :     } else if (delta < 0) {
     142           0 :       parent_.incrClusterStreamCapacity(-delta);
     143           0 :       ENVOY_CONN_LOG(trace, "Increasing stream capacity by {}", *codec_client_, -delta);
     144           0 :     }
     145          38 :   }
     146          68 : }
     147             : 
     148         148 : void MultiplexedActiveClientBase::onStreamDestroy() {
     149         148 :   parent().onStreamClosed(*this, false);
     150             : 
     151             :   // If we are destroying this stream because of a disconnect, do not check for drain here. We will
     152             :   // wait until the connection has been fully drained of streams and then check in the connection
     153             :   // event callback.
     154         148 :   if (!closed_with_active_rq_) {
     155         108 :     parent().checkForIdleAndCloseIdleConnsIfDraining();
     156         108 :   }
     157         148 : }
     158             : 
     159         108 : void MultiplexedActiveClientBase::onStreamReset(Http::StreamResetReason reason) {
     160         108 :   switch (reason) {
     161          40 :   case StreamResetReason::ConnectionTermination:
     162          40 :   case StreamResetReason::LocalConnectionFailure:
     163          40 :   case StreamResetReason::RemoteConnectionFailure:
     164          40 :   case StreamResetReason::ConnectionTimeout:
     165          40 :     parent_.host()->cluster().trafficStats()->upstream_rq_pending_failure_eject_.inc();
     166          40 :     closed_with_active_rq_ = true;
     167          40 :     break;
     168          45 :   case StreamResetReason::LocalReset:
     169          68 :   case StreamResetReason::ProtocolError:
     170          68 :   case StreamResetReason::OverloadManager:
     171          68 :     parent_.host()->cluster().trafficStats()->upstream_rq_tx_reset_.inc();
     172          68 :     break;
     173           0 :   case StreamResetReason::RemoteReset:
     174           0 :     parent_.host()->cluster().trafficStats()->upstream_rq_rx_reset_.inc();
     175           0 :     break;
     176           0 :   case StreamResetReason::LocalRefusedStreamReset:
     177           0 :   case StreamResetReason::RemoteRefusedStreamReset:
     178           0 :   case StreamResetReason::Overflow:
     179           0 :   case StreamResetReason::ConnectError:
     180           0 :     break;
     181         108 :   }
     182         108 : }
     183             : 
     184         210 : uint64_t MultiplexedActiveClientBase::maxStreamsPerConnection(uint64_t max_streams_config) {
     185         210 :   return (max_streams_config != 0) ? max_streams_config : DEFAULT_MAX_STREAMS;
     186         210 : }
     187             : 
     188             : MultiplexedActiveClientBase::MultiplexedActiveClientBase(
     189             :     HttpConnPoolImplBase& parent, uint32_t effective_concurrent_streams,
     190             :     uint32_t max_configured_concurrent_streams, Stats::Counter& cx_total,
     191             :     OptRef<Upstream::Host::CreateConnectionData> data)
     192             :     : Envoy::Http::ActiveClient(
     193             :           parent, maxStreamsPerConnection(parent.host()->cluster().maxRequestsPerConnection()),
     194         105 :           effective_concurrent_streams, max_configured_concurrent_streams, data) {
     195         105 :   codec_client_->setCodecClientCallbacks(*this);
     196         105 :   codec_client_->setCodecConnectionCallbacks(*this);
     197         105 :   cx_total.inc();
     198         105 : }
     199             : 
     200         105 : bool MultiplexedActiveClientBase::closingWithIncompleteStream() const {
     201         105 :   return closed_with_active_rq_;
     202         105 : }
     203             : 
     204         148 : RequestEncoder& MultiplexedActiveClientBase::newStreamEncoder(ResponseDecoder& response_decoder) {
     205         148 :   return codec_client_->newStream(response_decoder);
     206         148 : }
     207             : 
     208             : } // namespace Http
     209             : } // namespace Envoy

Generated by: LCOV version 1.15