LCOV - code coverage report
Current view: top level - source/common/conn_pool - conn_pool_base.h (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 41 75 54.7 %
Date: 2024-01-05 06:35:25 Functions: 18 31 58.1 %

          Line data    Source code
       1             : #pragma once
       2             : 
       3             : #include "envoy/common/conn_pool.h"
       4             : #include "envoy/event/dispatcher.h"
       5             : #include "envoy/network/connection.h"
       6             : #include "envoy/stats/timespan.h"
       7             : #include "envoy/upstream/cluster_manager.h"
       8             : 
       9             : #include "source/common/common/debug_recursion_checker.h"
      10             : #include "source/common/common/dump_state_utils.h"
      11             : #include "source/common/common/linked_object.h"
      12             : 
      13             : #include "absl/strings/string_view.h"
      14             : #include "fmt/ostream.h"
      15             : 
      16             : namespace Envoy {
      17             : namespace ConnectionPool {
      18             : 
      19             : class ConnPoolImplBase;
      20             : 
      21             : // A placeholder struct for whatever data a given connection pool needs to
      22             : // successfully attach an upstream connection to a downstream connection.
      23             : struct AttachContext {
      24             :   // Add a virtual destructor to allow for the dynamic_cast ASSERT in typedContext.
      25         424 :   virtual ~AttachContext() = default;
      26             : };
      27             : 
      28             : // ActiveClient provides a base class for connection pool clients that handles connection timings
      29             : // as well as managing the connection timeout.
      30             : class ActiveClient : public LinkedObject<ActiveClient>,
      31             :                      public Network::ConnectionCallbacks,
      32             :                      public Event::DeferredDeletable,
      33             :                      protected Logger::Loggable<Logger::Id::pool> {
      34             : public:
      35             :   ActiveClient(ConnPoolImplBase& parent, uint32_t lifetime_stream_limit,
      36             :                uint32_t effective_concurrent_streams, uint32_t concurrent_stream_limit);
      37             :   ActiveClient(ConnPoolImplBase& parent, uint32_t lifetime_stream_limit,
      38             :                uint32_t concurrent_stream_limit);
      39             :   ~ActiveClient() override;
      40             : 
      41         173 :   virtual void releaseResources() { releaseResourcesBase(); }
      42             :   void releaseResourcesBase();
      43             : 
      44             :   // Network::ConnectionCallbacks
      45           0 :   void onAboveWriteBufferHighWatermark() override {}
      46           0 :   void onBelowWriteBufferLowWatermark() override {}
      47             : 
      48             :   // Called if the connection does not complete within the cluster's connectTimeout()
      49             :   void onConnectTimeout();
      50             : 
      51             :   // Called if the maximum connection duration is reached.
      52             :   void onConnectionDurationTimeout();
      53             : 
      54             :   // Returns the concurrent stream limit, accounting for if the total stream limit
      55             :   // is less than the concurrent stream limit.
      56           0 :   virtual uint32_t effectiveConcurrentStreamLimit() const {
      57           0 :     return std::min(remaining_streams_, concurrent_stream_limit_);
      58           0 :   }
      59             : 
      60             :   // Returns the application protocol, or absl::nullopt for TCP.
      61             :   virtual absl::optional<Http::Protocol> protocol() const PURE;
      62             : 
      63        1025 :   virtual int64_t currentUnusedCapacity() const {
      64        1025 :     int64_t remaining_concurrent_streams =
      65        1025 :         static_cast<int64_t>(concurrent_stream_limit_) - numActiveStreams();
      66             : 
      67        1025 :     return std::min<int64_t>(remaining_streams_, remaining_concurrent_streams);
      68        1025 :   }
      69             : 
      70             :   // Initialize upstream read filters. Called when connected.
      71             :   virtual void initializeReadFilters() PURE;
      72             : 
      73             :   // Closes the underlying connection.
      74             :   virtual void close() PURE;
      75             :   // Returns the ID of the underlying connection.
      76             :   virtual uint64_t id() const PURE;
      77             :   // Returns true if this closed with an incomplete stream, for stats tracking purposes.
      78             :   virtual bool closingWithIncompleteStream() const PURE;
      79             :   // Returns the number of active streams on this connection.
      80             :   virtual uint32_t numActiveStreams() const PURE;
      81             : 
      82             :   // Return true if it is ready to dispatch the next stream.
      83         165 :   virtual bool readyForStream() const {
      84         165 :     ASSERT(!supportsEarlyData());
      85         165 :     return state_ == State::Ready;
      86         165 :   }
      87             : 
      88             :   // This function is called onStreamClosed to see if there was a negative delta
      89             :   // and (if necessary) update associated bookkeeping.
      90             :   // HTTP/1 and TCP pools can not have negative delta so the default implementation simply returns
      91             :   // false. The HTTP/2 connection pool can have this state, so overrides this function.
      92           0 :   virtual bool hadNegativeDeltaOnStreamClosed() { return false; }
      93             : 
      94             :   enum class State {
      95             :     Connecting,        // Connection is not yet established.
      96             :     ReadyForEarlyData, // Any additional early data stream can be immediately dispatched to this
      97             :                        // connection.
      98             :     Ready,             // Additional streams may be immediately dispatched to this connection.
      99             :     Busy,              // Connection is at its concurrent stream limit.
     100             :     Draining,          // No more streams can be dispatched to this connection, and it will be
     101             :                        // closed when all streams complete.
     102             :     Closed             // Connection is closed and object is queued for destruction.
     103             :   };
     104             : 
     105        1450 :   State state() const { return state_; }
     106             : 
     107         425 :   void setState(State state) {
     108         425 :     if (state == State::ReadyForEarlyData && !supportsEarlyData()) {
     109           0 :       IS_ENVOY_BUG("Unable to set state to ReadyForEarlyData in a client which does not support "
     110           0 :                    "early data.");
     111           0 :       return;
     112           0 :     }
     113             :     // If the client is transitioning to draining, update the remaining
     114             :     // streams and pool and cluster capacity.
     115         425 :     if (state == State::Draining) {
     116           0 :       drain();
     117           0 :     }
     118         425 :     state_ = state;
     119         425 :   }
     120             : 
     121             :   // Sets the remaining streams to 0, and updates pool and cluster capacity.
     122             :   virtual void drain();
     123             : 
     124         787 :   virtual bool hasHandshakeCompleted() const {
     125         787 :     ASSERT(!supportsEarlyData());
     126         787 :     return state_ != State::Connecting;
     127         787 :   }
     128             : 
     129             :   ConnPoolImplBase& parent_;
     130             :   // The count of remaining streams allowed for this connection.
     131             :   // This will start out as the total number of streams per connection if capped
     132             :   // by configuration, or it will be set to std::numeric_limits<uint32_t>::max() to be
     133             :   // (functionally) unlimited.
     134             :   // TODO: this could be moved to an optional to make it actually unlimited.
     135             :   uint32_t remaining_streams_;
     136             :   // This will start out as the upper limit of max concurrent streams for this connection
     137             :   // if capped by configuration, or it will be set to std::numeric_limits<uint32_t>::max()
     138             :   // to be (functionally) unlimited.
     139             :   uint32_t configured_stream_limit_;
     140             :   // The max concurrent streams for this connection. It is initialized from
     141             :   // `configured_stream_limit_` and can be adjusted by a SETTINGS frame, but it can never exceed
     142             :   // `configured_stream_limit_`.
     143             :   uint32_t concurrent_stream_limit_;
     144             :   Upstream::HostDescriptionConstSharedPtr real_host_description_;
     145             :   Stats::TimespanPtr conn_connect_ms_;
     146             :   Stats::TimespanPtr conn_length_;
     147             :   Event::TimerPtr connect_timer_;
     148             :   Event::TimerPtr connection_duration_timer_;
     149             :   bool resources_released_{false};
     150             :   bool timed_out_{false};
     151             :   // TODO(danzh) remove this once http codec exposes the handshake state for h3.
     152             :   bool has_handshake_completed_{false};
     153             : 
     154             : protected:
     155             :   // HTTP/3 subclass should override this.
     156           0 :   virtual bool supportsEarlyData() const { return false; }
     157             : 
     158             : private:
     159             :   State state_{State::Connecting};
     160             : };
     161             : 
     162             : // PendingStream is the base class tracking streams for which a connection has been created but not
     163             : // yet established.
     164             : class PendingStream : public LinkedObject<PendingStream>, public ConnectionPool::Cancellable {
     165             : public:
     166             :   PendingStream(ConnPoolImplBase& parent, bool can_send_early_data);
     167             :   ~PendingStream() override;
     168             : 
     169             :   // ConnectionPool::Cancellable
     170             :   void cancel(Envoy::ConnectionPool::CancelPolicy policy) override;
     171             : 
     172             :   // The context here returns a pointer to whatever context is provided with newStream(),
     173             :   // which will be passed back to the parent in onPoolReady or onPoolFailure.
     174             :   virtual AttachContext& context() PURE;
     175             : 
     176             :   ConnPoolImplBase& parent_;
     177             :   // The request can be sent as early data.
     178             :   bool can_send_early_data_;
     179             : };
     180             : 
     181             : using PendingStreamPtr = std::unique_ptr<PendingStream>;
     182             : 
     183             : using ActiveClientPtr = std::unique_ptr<ActiveClient>;
     184             : 
     185             : // Base class that handles stream queueing logic shared between connection pool implementations.
     186             : class ConnPoolImplBase : protected Logger::Loggable<Logger::Id::pool> {
     187             : public:
     188             :   ConnPoolImplBase(Upstream::HostConstSharedPtr host, Upstream::ResourcePriority priority,
     189             :                    Event::Dispatcher& dispatcher,
     190             :                    const Network::ConnectionSocket::OptionsSharedPtr& options,
     191             :                    const Network::TransportSocketOptionsConstSharedPtr& transport_socket_options,
     192             :                    Upstream::ClusterConnectivityState& state);
     193             :   virtual ~ConnPoolImplBase();
     194             : 
     195             :   void deleteIsPendingImpl();
     196             :   // By default, the connection pool will track connected and connecting stream
     197             :   // capacity as streams are created and destroyed. QUIC does custom stream
     198             :   // accounting so will override this to false.
     199         404 :   virtual bool trackStreamCapacity() { return true; }
     200             : 
     201             :   // A helper function to get the specific context type from the base class context.
     202         554 :   template <class T> T& typedContext(AttachContext& context) {
     203         554 :     ASSERT(dynamic_cast<T*>(&context) != nullptr);
     204         554 :     return *static_cast<T*>(&context);
     205         554 :   }
     206             : 
     207             :   // Determines if preconnecting is warranted based on the number of streams in
     208             :   // use, pending streams, anticipated and/or currently unused capacity, and
     209             :   // preconnect configuration.
     210             :   //
     211             :   // If anticipate_incoming_stream is true this assumes a call to newStream is
     212             :   // pending, which is true for global preconnect.
     213             :   static bool shouldConnect(size_t pending_streams, size_t active_streams,
     214             :                             int64_t connecting_and_connected_capacity, float preconnect_ratio,
     215             :                             bool anticipate_incoming_stream = false);
     216             : 
     217             :   // Envoy::ConnectionPool::Instance implementation helpers
     218             :   void addIdleCallbackImpl(Instance::IdleCb cb);
     219             :   // Returns true if the pool is idle.
     220             :   bool isIdleImpl() const;
     221             :   void drainConnectionsImpl(DrainBehavior drain_behavior);
     222        3219 :   const Upstream::HostConstSharedPtr& host() const { return host_; }
     223             :   // Called if this pool is likely to be picked soon, to determine if it's worth preconnecting.
     224             :   bool maybePreconnectImpl(float global_preconnect_ratio);
     225             : 
     226             :   // Closes and destroys all connections. This must be called in the destructor of
     227             :   // derived classes because the derived ActiveClient will downcast parent_ to a more
     228             :   // specific type of ConnPoolImplBase, but if the more specific part is already destructed
     229             :   // (due to bottom-up destructor ordering in c++) that access will be invalid.
     230             :   void destructAllConnections();
     231             : 
     232             :   // Returns a new instance of ActiveClient.
     233             :   virtual ActiveClientPtr instantiateActiveClient() PURE;
     234             : 
     235             :   // Gets a reference to the list that owns clients in the given state.
     236             :   std::list<ActiveClientPtr>& owningList(ActiveClient::State state);
     237             : 
     238             :   // Removes the PendingStream from the list of streams. Called when the PendingStream is
     239             :   // cancelled, e.g. when the stream is reset before a connection has been established.
     240             :   void onPendingStreamCancel(PendingStream& stream, Envoy::ConnectionPool::CancelPolicy policy);
     241             : 
     242             :   // Fails all pending streams, calling onPoolFailure on the associated callbacks.
     243             :   void purgePendingStreams(const Upstream::HostDescriptionConstSharedPtr& host_description,
     244             :                            absl::string_view failure_reason,
     245             :                            ConnectionPool::PoolFailureReason pool_failure_reason);
     246             : 
     247             :   // Closes any idle connections as this pool is drained.
     248             :   void closeIdleConnectionsForDrainingPool();
     249             : 
     250             :   // Changes the state_ of an ActiveClient and moves it to the appropriate list.
     251             :   void transitionActiveClientState(ActiveClient& client, ActiveClient::State new_state);
     252             : 
     253             :   void onConnectionEvent(ActiveClient& client, absl::string_view failure_reason,
     254             :                          Network::ConnectionEvent event);
     255             : 
     256             :   // Check if the pool has gone idle and invoke idle notification callbacks.
     257             :   void checkForIdleAndNotify();
     258             : 
     259             :   // See if the pool has gone idle. If we're draining, this will also close idle connections.
     260             :   void checkForIdleAndCloseIdleConnsIfDraining();
     261             : 
     262             :   void scheduleOnUpstreamReady();
     263             :   ConnectionPool::Cancellable* newStreamImpl(AttachContext& context, bool can_send_early_data);
     264             : 
     265             :   virtual ConnectionPool::Cancellable* newPendingStream(AttachContext& context,
     266             :                                                         bool can_send_early_data) PURE;
     267             : 
     268             :   virtual void attachStreamToClient(Envoy::ConnectionPool::ActiveClient& client,
     269             :                                     AttachContext& context);
     270             : 
     271             :   virtual void onPoolFailure(const Upstream::HostDescriptionConstSharedPtr& host_description,
     272             :                              absl::string_view failure_reason,
     273             :                              ConnectionPool::PoolFailureReason pool_failure_reason,
     274             :                              AttachContext& context) PURE;
     275             :   virtual void onPoolReady(ActiveClient& client, AttachContext& context) PURE;
     276             :   // Called by derived classes any time a stream is completed or destroyed for any reason.
     277             :   void onStreamClosed(Envoy::ConnectionPool::ActiveClient& client, bool delay_attaching_stream);
     278             : 
     279         933 :   Event::Dispatcher& dispatcher() { return dispatcher_; }
     280         692 :   Upstream::ResourcePriority priority() const { return priority_; }
     281         173 :   const Network::ConnectionSocket::OptionsSharedPtr& socketOptions() { return socket_options_; }
     282         346 :   const Network::TransportSocketOptionsConstSharedPtr& transportSocketOptions() {
     283         346 :     return transport_socket_options_;
     284         346 :   }
     285           0 :   bool hasPendingStreams() const { return !pending_streams_.empty(); }
     286             : 
     287           0 :   void decrClusterStreamCapacity(uint32_t delta) {
     288           0 :     state_.decrConnectingAndConnectedStreamCapacity(delta);
     289           0 :   }
     290           0 :   void incrClusterStreamCapacity(uint32_t delta) {
     291           0 :     state_.incrConnectingAndConnectedStreamCapacity(delta);
     292           0 :   }
     293           0 :   void dumpState(std::ostream& os, int indent_level = 0) const {
     294           0 :     const char* spaces = spacesForLevel(indent_level);
     295           0 :     os << spaces << "ConnPoolImplBase " << this << DUMP_MEMBER(ready_clients_.size())
     296           0 :        << DUMP_MEMBER(busy_clients_.size()) << DUMP_MEMBER(connecting_clients_.size())
     297           0 :        << DUMP_MEMBER(connecting_stream_capacity_) << DUMP_MEMBER(num_active_streams_)
     298           0 :        << DUMP_MEMBER(pending_streams_.size())
     299           0 :        << " per upstream preconnect ratio: " << perUpstreamPreconnectRatio();
     300           0 :   }
     301             : 
     302           0 :   friend std::ostream& operator<<(std::ostream& os, const ConnPoolImplBase& s) {
     303           0 :     s.dumpState(os);
     304           0 :     return os;
     305           0 :   }
     306           0 :   Upstream::ClusterConnectivityState& state() { return state_; }
     307             : 
     308             :   void decrConnectingAndConnectedStreamCapacity(uint32_t delta, ActiveClient& client);
     309             :   void incrConnectingAndConnectedStreamCapacity(uint32_t delta, ActiveClient& client);
     310             : 
     311             :   // Called when an upstream is ready to serve pending streams.
     312             :   void onUpstreamReady();
     313             : 
     314             :   // Called when an upstream is ready to serve early data streams.
     315             :   void onUpstreamReadyForEarlyData(ActiveClient& client);
     316             : 
     317             : protected:
     318         165 :   virtual void onConnected(Envoy::ConnectionPool::ActiveClient&) {}
     319           8 :   virtual void onConnectFailed(Envoy::ConnectionPool::ActiveClient&) {}
     320             : 
     321             :   enum class ConnectionResult {
     322             :     FailedToCreateConnection,
     323             :     CreatedNewConnection,
     324             :     ShouldNotConnect,
     325             :     NoConnectionRateLimited,
     326             :     CreatedButRateLimited,
     327             :   };
     328             :   // Creates up to 3 connections, based on the preconnect ratio.
     329             :   // Returns the ConnectionResult of the last attempt.
     330             :   ConnectionResult tryCreateNewConnections();
     331             : 
     332             :   // Creates a new connection if there is sufficient demand, it is allowed by resourceManager, or
     333             :   // to avoid starving this pool.
     334             :   // Demand is determined either by perUpstreamPreconnectRatio() or global_preconnect_ratio
     335             :   // if this is called by maybePreconnect()
     336             :   ConnectionResult tryCreateNewConnection(float global_preconnect_ratio = 0);
     337             : 
     338             :   // A helper function which determines if a canceled pending connection should
     339             :   // be closed as excess or not.
     340             :   bool connectingConnectionIsExcess(const ActiveClient& client) const;
     341             : 
     342             :   // A helper function which determines if a new incoming stream should trigger
     343             :   // connection preconnect.
     344             :   bool shouldCreateNewConnection(float global_preconnect_ratio) const;
     345             : 
     346             :   float perUpstreamPreconnectRatio() const;
     347             : 
     348             :   ConnectionPool::Cancellable*
     349         173 :   addPendingStream(Envoy::ConnectionPool::PendingStreamPtr&& pending_stream) {
     350         173 :     LinkedList::moveIntoList(std::move(pending_stream), pending_streams_);
     351         173 :     state_.incrPendingStreams(1);
     352         173 :     return pending_streams_.front().get();
     353         173 :   }
     354             : 
     355           0 :   bool hasActiveStreams() const { return num_active_streams_ > 0; }
     356             : 
     357             :   Upstream::ClusterConnectivityState& state_;
     358             : 
     359             :   const Upstream::HostConstSharedPtr host_;
     360             :   const Upstream::ResourcePriority priority_;
     361             : 
     362             :   Event::Dispatcher& dispatcher_;
     363             :   const Network::ConnectionSocket::OptionsSharedPtr socket_options_;
     364             :   const Network::TransportSocketOptionsConstSharedPtr transport_socket_options_;
     365             : 
     366             :   // True if the max requests circuit breakers apply.
     367             :   // This will be false for the TCP pool, true otherwise.
     368         202 :   virtual bool enforceMaxRequests() const { return true; }
     369             : 
     370             :   std::list<Instance::IdleCb> idle_callbacks_;
     371             : 
     372             :   // When calling purgePendingStreams, this list will be used to hold the streams we are about
     373             :   // to purge. We need this if one cancelled stream cancels a different pending stream.
     374             :   std::list<PendingStreamPtr> pending_streams_to_purge_;
     375             : 
     376             :   // Clients that are ready to handle additional streams.
     377             :   // All entries are in state Ready.
     378             :   std::list<ActiveClientPtr> ready_clients_;
     379             : 
     380             :   // Clients that are not ready to handle additional streams due to being Busy or Draining.
     381             :   std::list<ActiveClientPtr> busy_clients_;
     382             : 
     383             :   // Clients that are not ready to handle additional streams because they are Connecting.
     384             :   std::list<ActiveClientPtr> connecting_clients_;
     385             : 
     386             :   // Clients that are ready to handle additional early data streams because they have 0-RTT
     387             :   // credentials.
     388             :   std::list<ActiveClientPtr> early_data_clients_;
     389             : 
     390             :   // The number of streams that can be immediately dispatched
     391             :   // if all Connecting connections become connected.
     392             :   uint32_t connecting_stream_capacity_{0};
     393             : 
     394             : private:
     395             :   // Drain all the clients in the given list.
     396             :   // Prerequisite: the given clients shouldn't be idle.
     397             :   void drainClients(std::list<ActiveClientPtr>& clients);
     398             : 
     399             :   std::list<PendingStreamPtr> pending_streams_;
     400             : 
     401             :   // The number of streams currently attached to clients.
     402             :   uint32_t num_active_streams_{0};
     403             : 
     404             :   // Whether the connection pool is currently in the process of closing
     405             :   // all connections so that it can be gracefully deleted.
     406             :   bool is_draining_for_deletion_{false};
     407             : 
     408             :   // True iff this object is in the deferred delete list.
     409             :   bool deferred_deleting_{false};
     410             : 
     411             :   Event::SchedulableCallbackPtr upstream_ready_cb_;
     412             :   Common::DebugRecursionChecker recursion_checker_;
     413             : };
     414             : 
     415             : } // namespace ConnectionPool
     416             : } // namespace Envoy
     417             : 
     418             : // NOLINT(namespace-envoy)
     419             : namespace fmt {
     420             : template <> struct formatter<Envoy::ConnectionPool::ConnPoolImplBase> : ostream_formatter {};
     421             : } // namespace fmt

Generated by: LCOV version 1.15