LCOV - code coverage report
Current view: top level - source/common/tcp - conn_pool.h (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 0 113 0.0 %
Date: 2024-01-05 06:35:25 Functions: 0 38 0.0 %

          Line data    Source code
       1             : #pragma once
       2             : 
       3             : #include <list>
       4             : #include <memory>
       5             : 
       6             : #include "envoy/event/deferred_deletable.h"
       7             : #include "envoy/event/timer.h"
       8             : #include "envoy/network/connection.h"
       9             : #include "envoy/network/filter.h"
      10             : #include "envoy/stats/timespan.h"
      11             : #include "envoy/tcp/conn_pool.h"
      12             : #include "envoy/upstream/upstream.h"
      13             : 
      14             : #include "source/common/common/linked_object.h"
      15             : #include "source/common/common/logger.h"
      16             : #include "source/common/http/conn_pool_base.h"
      17             : #include "source/common/network/filter_impl.h"
      18             : #include "source/common/runtime/runtime_features.h"
      19             : 
      20             : namespace Envoy {
      21             : namespace Tcp {
      22             : 
      23             : class ConnPoolImpl;
      24             : 
      25             : struct TcpAttachContext : public Envoy::ConnectionPool::AttachContext {
      26           0 :   TcpAttachContext(Tcp::ConnectionPool::Callbacks* callbacks) : callbacks_(callbacks) {}
      27             :   Tcp::ConnectionPool::Callbacks* callbacks_;
      28             : };
      29             : 
      30             : class TcpPendingStream : public Envoy::ConnectionPool::PendingStream {
      31             : public:
      32             :   TcpPendingStream(Envoy::ConnectionPool::ConnPoolImplBase& parent, bool can_send_early_data,
      33             :                    TcpAttachContext& context)
      34           0 :       : Envoy::ConnectionPool::PendingStream(parent, can_send_early_data), context_(context) {}
      35           0 :   Envoy::ConnectionPool::AttachContext& context() override { return context_; }
      36             : 
      37             :   TcpAttachContext context_;
      38             : };
      39             : 
      40             : class ActiveTcpClient : public Envoy::ConnectionPool::ActiveClient {
      41             : public:
      42             :   struct ConnReadFilter : public Network::ReadFilterBaseImpl {
      43           0 :     ConnReadFilter(ActiveTcpClient& parent) : parent_(parent) {}
      44             : 
      45             :     // Network::ReadFilter
      46           0 :     Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override {
      47           0 :       parent_.onUpstreamData(data, end_stream);
      48           0 :       return Network::FilterStatus::StopIteration;
      49           0 :     }
      50             :     ActiveTcpClient& parent_;
      51             :   };
      52             : 
      53             :   // This acts as the bridge between the ActiveTcpClient and an individual TCP connection.
      54             :   class TcpConnectionData : public Envoy::Tcp::ConnectionPool::ConnectionData {
      55             :   public:
      56             :     TcpConnectionData(ActiveTcpClient& parent, Network::ClientConnection& connection)
      57           0 :         : parent_(&parent), connection_(connection) {
      58           0 :       parent_->tcp_connection_data_ = this;
      59           0 :     }
      60           0 :     ~TcpConnectionData() override {
      61             :       // Generally it is the case that TcpConnectionData will be destroyed before the
      62             :       // ActiveTcpClient. Because ordering on the deferred delete list is not guaranteed in the
      63             :       // case of a disconnect, make sure parent_ is valid before doing clean-up.
      64           0 :       if (parent_) {
      65           0 :         parent_->clearCallbacks();
      66           0 :       }
      67           0 :     }
      68             : 
      69           0 :     Network::ClientConnection& connection() override { return connection_; }
      70           0 :     void setConnectionState(ConnectionPool::ConnectionStatePtr&& state) override {
      71           0 :       parent_->connection_state_ = std::move(state);
      72           0 :     }
      73             : 
      74           0 :     void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks) override {
      75           0 :       parent_->callbacks_ = &callbacks;
      76           0 :     }
      77           0 :     void release() { parent_ = nullptr; }
      78             : 
      79             :   protected:
      80           0 :     ConnectionPool::ConnectionState* connectionState() override {
      81           0 :       return parent_->connection_state_.get();
      82           0 :     }
      83             : 
      84             :   private:
      85             :     ActiveTcpClient* parent_;
      86             :     Network::ClientConnection& connection_;
      87             :   };
      88             : 
      89             :   ActiveTcpClient(Envoy::ConnectionPool::ConnPoolImplBase& parent,
      90             :                   const Upstream::HostConstSharedPtr& host, uint64_t concurrent_stream_limit,
      91             :                   absl::optional<std::chrono::milliseconds> idle_timeout);
      92             :   ~ActiveTcpClient() override;
      93             : 
      94             :   // Override the default's of Envoy::ConnectionPool::ActiveClient for class-specific functions.
      95             :   // Network::ConnectionCallbacks
      96             :   void onEvent(Network::ConnectionEvent event) override;
      97           0 :   void onAboveWriteBufferHighWatermark() override { callbacks_->onAboveWriteBufferHighWatermark(); }
      98           0 :   void onBelowWriteBufferLowWatermark() override { callbacks_->onBelowWriteBufferLowWatermark(); }
      99             : 
     100             :   // Undo the readDisable done in onEvent(Connected) - now that there is an associated connection,
     101             :   // drain any data.
     102           0 :   void readEnableIfNew() {
     103             :     // It is expected for Envoy use of ActiveTcpClient this function only be
     104             :     // called once. Other users of the TcpConnPool may recycle Tcp connections,
     105             :     // and this safeguards them against read-enabling too many times.
     106           0 :     if (!associated_before_) {
     107           0 :       associated_before_ = true;
     108           0 :       connection_->readDisable(false);
     109             :       // Also while we're at it, make sure the connection will proxy all TCP
     110             :       // data before picking up a FIN.
     111           0 :       connection_->detectEarlyCloseWhenReadDisabled(false);
     112           0 :     }
     113           0 :   }
     114             : 
     115           0 :   void initializeReadFilters() override { connection_->initializeReadFilters(); }
     116           0 :   absl::optional<Http::Protocol> protocol() const override { return {}; }
     117             :   void close() override;
     118           0 :   uint32_t numActiveStreams() const override { return callbacks_ ? 1 : 0; }
     119           0 :   bool closingWithIncompleteStream() const override { return false; }
     120           0 :   uint64_t id() const override { return connection_->id(); }
     121             : 
     122           0 :   void onUpstreamData(Buffer::Instance& data, bool end_stream) {
     123           0 :     if (callbacks_) {
     124           0 :       callbacks_->onUpstreamData(data, end_stream);
     125           0 :     } else {
     126           0 :       close();
     127           0 :     }
     128           0 :   }
     129             :   virtual void clearCallbacks();
     130             : 
     131             :   // Called if the underlying connection is idle over the cluster's tcpPoolIdleTimeout()
     132             :   void onIdleTimeout();
     133             :   void disableIdleTimer();
     134             :   void setIdleTimer();
     135             : 
     136             :   std::shared_ptr<ConnReadFilter> read_filter_handle_;
     137             :   Envoy::ConnectionPool::ConnPoolImplBase& parent_;
     138             :   ConnectionPool::UpstreamCallbacks* callbacks_{};
     139             :   Network::ClientConnectionPtr connection_;
     140             :   ConnectionPool::ConnectionStatePtr connection_state_;
     141             :   TcpConnectionData* tcp_connection_data_{};
     142             :   bool associated_before_{};
     143             :   absl::optional<std::chrono::milliseconds> idle_timeout_;
     144             :   Event::TimerPtr idle_timer_;
     145             : };
     146             : 
     147             : class ConnPoolImpl : public Envoy::ConnectionPool::ConnPoolImplBase,
     148             :                      public Tcp::ConnectionPool::Instance {
     149             : public:
     150             :   ConnPoolImpl(Event::Dispatcher& dispatcher, Upstream::HostConstSharedPtr host,
     151             :                Upstream::ResourcePriority priority,
     152             :                const Network::ConnectionSocket::OptionsSharedPtr& options,
     153             :                Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
     154             :                Upstream::ClusterConnectivityState& state,
     155             :                absl::optional<std::chrono::milliseconds> idle_timeout)
     156             :       : Envoy::ConnectionPool::ConnPoolImplBase(host, priority, dispatcher, options,
     157             :                                                 transport_socket_options, state),
     158           0 :         idle_timeout_(idle_timeout) {}
     159           0 :   ~ConnPoolImpl() override { destructAllConnections(); }
     160             : 
     161             :   // Event::DeferredDeletable
     162           0 :   void deleteIsPending() override { deleteIsPendingImpl(); }
     163             : 
     164           0 :   void addIdleCallback(IdleCb cb) override { addIdleCallbackImpl(cb); }
     165           0 :   bool isIdle() const override { return isIdleImpl(); }
     166           0 :   void drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) override {
     167           0 :     drainConnectionsImpl(drain_behavior);
     168           0 :     if (drain_behavior == Envoy::ConnectionPool::DrainBehavior::DrainAndDelete) {
     169           0 :       return;
     170           0 :     }
     171             :     // Legacy behavior for the TCP connection pool marks all connecting clients
     172             :     // as draining.
     173           0 :     for (auto& connecting_client : connecting_clients_) {
     174           0 :       if (connecting_client->remaining_streams_ > 1) {
     175           0 :         uint64_t old_limit = connecting_client->effectiveConcurrentStreamLimit();
     176           0 :         connecting_client->remaining_streams_ = 1;
     177           0 :         if (connecting_client->effectiveConcurrentStreamLimit() < old_limit) {
     178           0 :           decrConnectingAndConnectedStreamCapacity(
     179           0 :               old_limit - connecting_client->effectiveConcurrentStreamLimit(), *connecting_client);
     180           0 :         }
     181           0 :       }
     182           0 :     }
     183           0 :   }
     184             : 
     185           0 :   void closeConnections() override {
     186           0 :     for (auto* list : {&ready_clients_, &busy_clients_, &connecting_clients_}) {
     187           0 :       while (!list->empty()) {
     188           0 :         list->front()->close();
     189           0 :       }
     190           0 :     }
     191           0 :   }
     192           0 :   ConnectionPool::Cancellable* newConnection(Tcp::ConnectionPool::Callbacks& callbacks) override {
     193           0 :     TcpAttachContext context(&callbacks);
     194             :     // TLS early data over TCP is not supported yet.
     195           0 :     return newStreamImpl(context, /*can_send_early_data=*/false);
     196           0 :   }
     197           0 :   bool maybePreconnect(float preconnect_ratio) override {
     198           0 :     return maybePreconnectImpl(preconnect_ratio);
     199           0 :   }
     200             : 
     201             :   ConnectionPool::Cancellable* newPendingStream(Envoy::ConnectionPool::AttachContext& context,
     202           0 :                                                 bool can_send_early_data) override {
     203           0 :     Envoy::ConnectionPool::PendingStreamPtr pending_stream = std::make_unique<TcpPendingStream>(
     204           0 :         *this, can_send_early_data, typedContext<TcpAttachContext>(context));
     205           0 :     return addPendingStream(std::move(pending_stream));
     206           0 :   }
     207             : 
     208           0 :   Upstream::HostDescriptionConstSharedPtr host() const override {
     209           0 :     return Envoy::ConnectionPool::ConnPoolImplBase::host();
     210           0 :   }
     211             : 
     212           0 :   Envoy::ConnectionPool::ActiveClientPtr instantiateActiveClient() override {
     213           0 :     return std::make_unique<ActiveTcpClient>(*this, Envoy::ConnectionPool::ConnPoolImplBase::host(),
     214           0 :                                              1, idle_timeout_);
     215           0 :   }
     216             : 
     217             :   void onPoolReady(Envoy::ConnectionPool::ActiveClient& client,
     218           0 :                    Envoy::ConnectionPool::AttachContext& context) override {
     219           0 :     ActiveTcpClient* tcp_client = static_cast<ActiveTcpClient*>(&client);
     220           0 :     tcp_client->readEnableIfNew();
     221           0 :     auto* callbacks = typedContext<TcpAttachContext>(context).callbacks_;
     222           0 :     std::unique_ptr<Envoy::Tcp::ConnectionPool::ConnectionData> connection_data =
     223           0 :         std::make_unique<ActiveTcpClient::TcpConnectionData>(*tcp_client, *tcp_client->connection_);
     224           0 :     callbacks->onPoolReady(std::move(connection_data), tcp_client->real_host_description_);
     225             : 
     226             :     // The tcp client is taken over. Stop the idle timer.
     227           0 :     if (!connection_data) {
     228           0 :       tcp_client->disableIdleTimer();
     229           0 :     }
     230           0 :   }
     231             : 
     232             :   void onPoolFailure(const Upstream::HostDescriptionConstSharedPtr& host_description,
     233             :                      absl::string_view failure_reason, ConnectionPool::PoolFailureReason reason,
     234           0 :                      Envoy::ConnectionPool::AttachContext& context) override {
     235           0 :     auto* callbacks = typedContext<TcpAttachContext>(context).callbacks_;
     236           0 :     callbacks->onPoolFailure(reason, failure_reason, host_description);
     237           0 :   }
     238             : 
     239           0 :   bool enforceMaxRequests() const override { return false; }
     240             :   // These two functions exist for testing parity between old and new Tcp Connection Pools.
     241           0 :   virtual void onConnReleased(Envoy::ConnectionPool::ActiveClient&) {}
     242           0 :   virtual void onConnDestroyed() {}
     243             : 
     244             :   absl::optional<std::chrono::milliseconds> idle_timeout_;
     245             : };
     246             : 
     247             : } // namespace Tcp
     248             : } // namespace Envoy

Generated by: LCOV version 1.15