LCOV - code coverage report
Current view: top level - source/common/tcp_proxy - upstream.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 0 224 0.0 %
Date: 2024-01-05 06:35:25 Functions: 0 32 0.0 %

          Line data    Source code
       1             : #include "source/common/tcp_proxy/upstream.h"
       2             : 
       3             : #include "envoy/upstream/cluster_manager.h"
       4             : 
       5             : #include "source/common/http/codec_client.h"
       6             : #include "source/common/http/codes.h"
       7             : #include "source/common/http/header_map_impl.h"
       8             : #include "source/common/http/headers.h"
       9             : #include "source/common/http/utility.h"
      10             : #include "source/common/runtime/runtime_features.h"
      11             : 
      12             : namespace Envoy {
      13             : namespace TcpProxy {
      14             : using TunnelingConfig =
      15             :     envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy_TunnelingConfig;
      16             : 
      17             : TcpUpstream::TcpUpstream(Tcp::ConnectionPool::ConnectionDataPtr&& data,
      18             :                          Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks)
      19           0 :     : upstream_conn_data_(std::move(data)) {
      20           0 :   Network::ClientConnection& connection = upstream_conn_data_->connection();
      21           0 :   connection.enableHalfClose(true);
      22           0 :   upstream_conn_data_->addUpstreamCallbacks(upstream_callbacks);
      23           0 : }
      24             : 
      25           0 : bool TcpUpstream::readDisable(bool disable) {
      26           0 :   if (upstream_conn_data_ == nullptr ||
      27           0 :       upstream_conn_data_->connection().state() != Network::Connection::State::Open) {
      28             :     // Because we flush write downstream, we can have a case where upstream has already disconnected
      29             :     // and we are waiting to flush. If we had a watermark event during this time we should no
      30             :     // longer touch the upstream connection.
      31           0 :     return false;
      32           0 :   }
      33             : 
      34           0 :   upstream_conn_data_->connection().readDisable(disable);
      35           0 :   return true;
      36           0 : }
      37             : 
// Writes downstream payload to the upstream connection. `end_stream`
// propagates half-close (enabled in the constructor) toward upstream.
void TcpUpstream::encodeData(Buffer::Instance& data, bool end_stream) {
  upstream_conn_data_->connection().write(data, end_stream);
}
      41             : 
// Registers a callback invoked as bytes are written out the upstream
// connection (forwarded straight to the Network::Connection).
void TcpUpstream::addBytesSentCallback(Network::Connection::BytesSentCb cb) {
  upstream_conn_data_->connection().addBytesSentCallback(cb);
}
      45             : 
      46           0 : bool TcpUpstream::startUpstreamSecureTransport() {
      47           0 :   return (upstream_conn_data_ == nullptr)
      48           0 :              ? false
      49           0 :              : upstream_conn_data_->connection().startSecureTransport();
      50           0 : }
      51             : 
      52           0 : Ssl::ConnectionInfoConstSharedPtr TcpUpstream::getUpstreamConnectionSslInfo() {
      53           0 :   if (upstream_conn_data_ != nullptr) {
      54           0 :     return upstream_conn_data_->connection().ssl();
      55           0 :   }
      56           0 :   return nullptr;
      57           0 : }
      58             : 
// Mirrors downstream connection events onto the upstream connection.
// On downstream RemoteClose, ownership of the connection data is released to
// the caller (for potential draining); on LocalClose the upstream is closed
// immediately and nullptr is returned.
Tcp::ConnectionPool::ConnectionData*
TcpUpstream::onDownstreamEvent(Network::ConnectionEvent event) {
  // TODO(botengyao): propagate RST back to upstream connection if RST is received from downstream.
  if (event == Network::ConnectionEvent::RemoteClose) {
    // The close call may result in this object being deleted. Latch the
    // connection locally so it can be returned for potential draining.
    // Note: release() transfers ownership out of upstream_conn_data_ before
    // close() runs, so the raw pointer stays valid even if `this` is deleted.
    auto* conn_data = upstream_conn_data_.release();
    conn_data->connection().close(
        Network::ConnectionCloseType::FlushWrite,
        StreamInfo::LocalCloseReasons::get().ClosingUpstreamTcpDueToDownstreamRemoteClose);
    return conn_data;
  } else if (event == Network::ConnectionEvent::LocalClose) {
    // Local close: drop the upstream immediately without flushing.
    upstream_conn_data_->connection().close(
        Network::ConnectionCloseType::NoFlush,
        StreamInfo::LocalCloseReasons::get().ClosingUpstreamTcpDueToDownstreamLocalClose);
  }
  return nullptr;
}
      77             : 
// Builds an HTTP tunneling upstream (CONNECT/POST). The request encoder is
// attached later via setRequestEncoder() once the pool provides a stream;
// response_decoder_ feeds decoded responses back into this object.
HttpUpstream::HttpUpstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
                           const TunnelingConfigHelper& config,
                           StreamInfo::StreamInfo& downstream_info, Http::CodecType type)
    : config_(config), downstream_info_(downstream_info), response_decoder_(*this),
      upstream_callbacks_(callbacks), type_(type) {}
      83             : 
// Tears down any outstanding tunnel stream on destruction (resets the encoder
// unless both directions were already half-closed).
HttpUpstream::~HttpUpstream() { resetEncoder(Network::ConnectionEvent::LocalClose); }
      85             : 
      86           0 : bool HttpUpstream::isValidResponse(const Http::ResponseHeaderMap& headers) {
      87           0 :   if (type_ == Http::CodecType::HTTP1) {
      88             :     //  According to RFC7231 any 2xx response indicates that the connection is
      89             :     //  established.
      90             :     //  Any 'Content-Length' or 'Transfer-Encoding' header fields MUST be ignored.
      91             :     //  https://tools.ietf.org/html/rfc7231#section-4.3.6
      92           0 :     return Http::CodeUtility::is2xx(Http::Utility::getResponseStatus(headers));
      93           0 :   }
      94           0 :   return Http::Utility::getResponseStatus(headers) == 200;
      95           0 : }
      96             : 
// Attaches the pool-provided request encoder and sends the tunnel-initiating
// request headers (CONNECT, or POST with a configured path). `is_ssl` selects
// the :scheme value for HTTP/2+/POST requests.
void HttpUpstream::setRequestEncoder(Http::RequestEncoder& request_encoder, bool is_ssl) {
  request_encoder_ = &request_encoder;
  // Register for stream reset/watermark callbacks before encoding anything.
  request_encoder_->getStream().addCallbacks(*this);
  auto headers = Http::createHeaderMap<Http::RequestHeaderMapImpl>({
      {Http::Headers::get().Method, config_.usePost() ? "POST" : "CONNECT"},
      {Http::Headers::get().Host, config_.host(downstream_info_)},
  });
  // POST tunneling requires an explicit :path; CONNECT has none.
  if (config_.usePost()) {
    headers->addReference(Http::Headers::get().Path, config_.postPath());
  }

  if (type_ == Http::CodecType::HTTP1) {
    // HTTP/1: switch the codec into raw tunneling mode after headers.
    request_encoder_->enableTcpTunneling();
    ASSERT(request_encoder_->http1StreamEncoderOptions() != absl::nullopt);
  } else {
    const std::string& scheme =
        is_ssl ? Http::Headers::get().SchemeValues.Https : Http::Headers::get().SchemeValues.Http;

    // Scheme is only meaningful for POST-style tunnels (CONNECT omits it).
    if (config_.usePost()) {
      headers->addReference(Http::Headers::get().Scheme, scheme);
    }
  }

  // Apply configured header mutations (may reference downstream request
  // headers and stream info) before encoding.
  config_.headerEvaluator().evaluateHeaders(*headers, {downstream_info_.getRequestHeaders()},
                                            downstream_info_);
  const auto status = request_encoder_->encodeHeaders(*headers, false);
  // Encoding can only fail on missing required request headers.
  ASSERT(status.ok());
}
     126             : 
     127           0 : bool HttpUpstream::readDisable(bool disable) {
     128           0 :   if (!request_encoder_) {
     129           0 :     return false;
     130           0 :   }
     131           0 :   request_encoder_->getStream().readDisable(disable);
     132           0 :   return true;
     133           0 : }
     134             : 
// Forwards downstream payload into the tunnel's HTTP request stream; on
// end_stream, half-closes the write side for non-H1 codecs.
void HttpUpstream::encodeData(Buffer::Instance& data, bool end_stream) {
  if (!request_encoder_) {
    return;
  }
  // NOTE(review): type_ is latched into a local before encodeData() —
  // presumably because encoding can invoke callbacks that destroy `this`,
  // making member access unsafe afterwards. Confirm before restructuring.
  auto codec = type_;
  request_encoder_->encodeData(data, end_stream);

  // doneWriting() is being skipped for H1 codec to avoid resetEncoder() call.
  // This is because H1 codec does not support half-closed stream. Calling resetEncoder()
  // will fully close the upstream connection without flushing any pending data, rather than a http
  // stream reset.
  // More details can be found on https://github.com/envoyproxy/envoy/pull/13293
  if ((codec != Http::CodecType::HTTP1) && (end_stream)) {
    doneWriting();
  }
}
     151             : 
// Intentionally a no-op for HTTP tunneling (see below).
void HttpUpstream::addBytesSentCallback(Network::Connection::BytesSentCb) {
  // The HTTP tunneling mode does not tickle the idle timeout when bytes are
  // sent to the kernel.
  // This can be implemented if any user cares about the difference in time
  // between it being sent to the HTTP/2 stack and out to the kernel.
}
     158             : 
     159             : Tcp::ConnectionPool::ConnectionData*
     160           0 : HttpUpstream::onDownstreamEvent(Network::ConnectionEvent event) {
     161           0 :   if (event == Network::ConnectionEvent::LocalClose ||
     162           0 :       event == Network::ConnectionEvent::RemoteClose) {
     163           0 :     resetEncoder(Network::ConnectionEvent::LocalClose, false);
     164           0 :   }
     165           0 :   return nullptr;
     166           0 : }
     167             : 
     168           0 : void HttpUpstream::onResetStream(Http::StreamResetReason, absl::string_view) {
     169           0 :   read_half_closed_ = true;
     170           0 :   write_half_closed_ = true;
     171           0 :   resetEncoder(Network::ConnectionEvent::LocalClose);
     172           0 : }
     173             : 
// Propagates upstream backpressure (high watermark) to the TCP proxy filter.
void HttpUpstream::onAboveWriteBufferHighWatermark() {
  upstream_callbacks_.onAboveWriteBufferHighWatermark();
}
     177             : 
// Propagates relief of upstream backpressure (low watermark) to the filter.
void HttpUpstream::onBelowWriteBufferLowWatermark() {
  upstream_callbacks_.onBelowWriteBufferLowWatermark();
}
     181             : 
// Detaches from the tunnel stream, resetting it if either direction is still
// open. Idempotent: does nothing once request_encoder_ is cleared. Reports a
// pool failure if the tunnel was never established; otherwise optionally
// forwards `event` downstream.
void HttpUpstream::resetEncoder(Network::ConnectionEvent event, bool inform_downstream) {
  if (!request_encoder_) {
    return;
  }
  // Remove callbacks first so the resetStream() below does not re-enter
  // onResetStream() on this object.
  request_encoder_->getStream().removeCallbacks(*this);
  if (!write_half_closed_ || !read_half_closed_) {
    request_encoder_->getStream().resetStream(Http::StreamResetReason::LocalReset);
  }
  request_encoder_ = nullptr;
  // If we did not receive a valid CONNECT response yet we treat this as a pool
  // failure, otherwise we forward the event downstream.
  if (conn_pool_callbacks_ != nullptr) {
    conn_pool_callbacks_->onFailure();
    return;
  }
  if (inform_downstream) {
    upstream_callbacks_.onEvent(event);
  }
}
     201             : 
     202           0 : void HttpUpstream::doneReading() {
     203           0 :   read_half_closed_ = true;
     204           0 :   if (write_half_closed_) {
     205           0 :     resetEncoder(Network::ConnectionEvent::LocalClose);
     206           0 :   }
     207           0 : }
     208             : 
     209           0 : void HttpUpstream::doneWriting() {
     210           0 :   write_half_closed_ = true;
     211           0 :   if (read_half_closed_) {
     212           0 :     resetEncoder(Network::ConnectionEvent::LocalClose);
     213           0 :   }
     214           0 : }
     215             : 
// Wraps a per-cluster TCP connection pool for the proxy. conn_pool_data_ may
// be unset if the cluster has no healthy hosts (callers check before use).
TcpConnPool::TcpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster,
                         Upstream::LoadBalancerContext* context,
                         Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks,
                         StreamInfo::StreamInfo& downstream_info)
    : upstream_callbacks_(upstream_callbacks), downstream_info_(downstream_info) {
  conn_pool_data_ = thread_local_cluster.tcpConnPool(Upstream::ResourcePriority::Default, context);
}
     223             : 
// Cancels any in-flight connection request; CloseExcess also closes the
// connection being established rather than returning it to the pool.
TcpConnPool::~TcpConnPool() {
  if (upstream_handle_ != nullptr) {
    upstream_handle_->cancel(ConnectionPool::CancelPolicy::CloseExcess);
  }
}
     229             : 
// Requests a new upstream TCP connection from the pool; results arrive via
// onPoolReady()/onPoolFailure().
void TcpConnPool::newStream(GenericConnectionPoolCallbacks& callbacks) {
  callbacks_ = &callbacks;
  // Given this function is reentrant, make sure we only reset the upstream_handle_ if given a
  // valid connection handle. If newConnection fails inline it may result in attempting to
  // select a new host, and a recursive call to establishUpstreamConnection. In this case the
  // first call to newConnection will return null and the inner call will persist.
  Tcp::ConnectionPool::Cancellable* handle = conn_pool_data_.value().newConnection(*this);
  if (handle) {
    ASSERT(upstream_handle_ == nullptr);
    upstream_handle_ = handle;
  }
}
     242             : 
// Pool callback: connection attempt failed. The handle is cleared first so
// the destructor will not try to cancel a completed request.
void TcpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason,
                                absl::string_view failure_reason,
                                Upstream::HostDescriptionConstSharedPtr host) {
  upstream_handle_ = nullptr;
  callbacks_->onGenericPoolFailure(reason, failure_reason, host);
}
     249             : 
// Pool callback: a TCP connection is ready. Wraps it in a TcpUpstream and
// hands it to the generic pool callbacks.
void TcpConnPool::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data,
                              Upstream::HostDescriptionConstSharedPtr host) {
  if (downstream_info_.downstreamAddressProvider().connectionID()) {
    ENVOY_LOG(debug, "Attached upstream connection [C{}] to downstream connection [C{}]",
              conn_data->connection().id(),
              downstream_info_.downstreamAddressProvider().connectionID().value());
  }

  upstream_handle_ = nullptr;
  // Latch a raw pointer and a connection reference before conn_data is moved
  // into the TcpUpstream below; they stay valid because the TcpUpstream owns
  // the connection data for the duration of the call.
  Tcp::ConnectionPool::ConnectionData* latched_data = conn_data.get();
  Network::Connection& connection = conn_data->connection();

  auto upstream = std::make_unique<TcpUpstream>(std::move(conn_data), upstream_callbacks_);
  callbacks_->onGenericPoolReady(
      &connection.streamInfo(), std::move(upstream), host,
      latched_data->connection().connectionInfoProvider(),
      latched_data->connection().streamInfo().downstreamAddressProvider().sslConnection());
}
     268             : 
     269             : HttpConnPool::HttpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster,
     270             :                            Upstream::LoadBalancerContext* context,
     271             :                            const TunnelingConfigHelper& config,
     272             :                            Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks,
     273             :                            Http::CodecType type, StreamInfo::StreamInfo& downstream_info)
     274             :     : config_(config), type_(type), upstream_callbacks_(upstream_callbacks),
     275           0 :       downstream_info_(downstream_info) {
     276           0 :   absl::optional<Http::Protocol> protocol;
     277           0 :   if (type_ == Http::CodecType::HTTP3) {
     278           0 :     protocol = Http::Protocol::Http3;
     279           0 :   } else if (type_ == Http::CodecType::HTTP2) {
     280           0 :     protocol = Http::Protocol::Http2;
     281           0 :   }
     282           0 :   conn_pool_data_ =
     283           0 :       thread_local_cluster.httpConnPool(Upstream::ResourcePriority::Default, protocol, context);
     284           0 : }
     285             : 
// Cancels any in-flight stream request on destruction.
HttpConnPool::~HttpConnPool() {
  if (upstream_handle_ != nullptr) {
    // Because HTTP connections are generally shorter lived and have a higher probability of use
    // before going idle, they are closed with Default rather than CloseExcess.
    upstream_handle_->cancel(ConnectionPool::CancelPolicy::Default);
  }
}
     293             : 
// Creates the HttpUpstream and requests a new HTTP stream from the pool; the
// upstream's response decoder receives the tunnel response. Results arrive
// via onPoolReady()/onPoolFailure().
void HttpConnPool::newStream(GenericConnectionPoolCallbacks& callbacks) {
  callbacks_ = &callbacks;
  upstream_ = std::make_unique<HttpUpstream>(upstream_callbacks_, config_, downstream_info_, type_);
  Tcp::ConnectionPool::Cancellable* handle =
      conn_pool_data_.value().newStream(upstream_->responseDecoder(), *this,
                                        {/*can_send_early_data_=*/false,
                                         /*can_use_http3_=*/true});
  // newStream() may complete inline (handle == nullptr); only latch a real
  // cancellable handle.
  if (handle != nullptr) {
    upstream_handle_ = handle;
  }
}
     305             : 
// Pool callback: stream establishment failed. The handle is cleared first so
// the destructor will not try to cancel a completed request.
void HttpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason,
                                 absl::string_view failure_reason,
                                 Upstream::HostDescriptionConstSharedPtr host) {
  upstream_handle_ = nullptr;
  callbacks_->onGenericPoolFailure(reason, failure_reason, host);
}
     312             : 
// Pool callback: an HTTP stream is ready. Attaches the encoder to the
// HttpUpstream (which sends the CONNECT/POST headers) and installs callbacks
// that fire once a valid tunnel response is decoded.
void HttpConnPool::onPoolReady(Http::RequestEncoder& request_encoder,
                               Upstream::HostDescriptionConstSharedPtr host,
                               StreamInfo::StreamInfo& info, absl::optional<Http::Protocol>) {
  if (info.downstreamAddressProvider().connectionID() &&
      downstream_info_.downstreamAddressProvider().connectionID()) {
    // info.downstreamAddressProvider() is being called to get the upstream connection ID,
    // because the StreamInfo object here is of the upstream connection.
    ENVOY_LOG(debug, "Attached upstream connection [C{}] to downstream connection [C{}]",
              info.downstreamAddressProvider().connectionID().value(),
              downstream_info_.downstreamAddressProvider().connectionID().value());
  }

  upstream_handle_ = nullptr;
  upstream_->setRequestEncoder(request_encoder,
                               host->transportSocketFactory().implementsSecureTransport());
  upstream_->setConnPoolCallbacks(std::make_unique<HttpConnPool::Callbacks>(
      *this, host, info.downstreamAddressProvider().sslConnection()));
}
     331             : 
// Invoked (via HttpConnPool::Callbacks) once the tunnel is established:
// transfers ownership of the HttpUpstream to the generic pool callbacks.
// The nullptr first argument means no upstream connection StreamInfo is
// exposed for the HTTP tunneling path.
void HttpConnPool::onGenericPoolReady(Upstream::HostDescriptionConstSharedPtr& host,
                                      const Network::ConnectionInfoProvider& address_provider,
                                      Ssl::ConnectionInfoConstSharedPtr ssl_info) {
  callbacks_->onGenericPoolReady(nullptr, std::move(upstream_), host, address_provider, ssl_info);
}
     337             : 
     338             : } // namespace TcpProxy
     339             : } // namespace Envoy

Generated by: LCOV version 1.15