LCOV - code coverage report
Current view: top level - source/common/network - connection_impl.h (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 42 49 85.7 %
Date: 2024-01-05 06:35:25 Functions: 19 26 73.1 %

          Line data    Source code
       1             : #pragma once
       2             : 
       3             : #include <atomic>
       4             : #include <cstdint>
       5             : #include <list>
       6             : #include <memory>
       7             : #include <string>
       8             : 
       9             : #include "envoy/common/scope_tracker.h"
      10             : #include "envoy/network/transport_socket.h"
      11             : 
      12             : #include "source/common/buffer/watermark_buffer.h"
      13             : #include "source/common/event/libevent.h"
      14             : #include "source/common/network/connection_impl_base.h"
      15             : #include "source/common/stream_info/stream_info_impl.h"
      16             : 
      17             : #include "absl/types/optional.h"
      18             : 
      19             : namespace Envoy {
      20             : class RandomPauseFilter;
      21             : class TestPauseFilter;
      22             : 
      23             : namespace Network {
      24             : 
      25             : class MultiConnectionBaseImpl;
      26             : 
      27             : /**
      28             :  * Utility functions for the connection implementation.
      29             :  */
      30             : class ConnectionImplUtility {
      31             : public:
      32             :   /**
      33             :    * Update the buffer stats for a connection.
      34             :    * @param delta supplies the data read/written.
      35             :    * @param new_total supplies the final total buffer size.
      36             :    * @param previous_total supplies the previous final total buffer size. previous_total will be
      37             :    *        updated to new_total when the call is complete.
      38             :    * @param stat_total supplies the counter to increment with the delta.
      39             :    * @param stat_current supplies the gauge that should be updated with the delta of previous_total
      40             :    *        and new_total.
      41             :    */
      42             :   static void updateBufferStats(uint64_t delta, uint64_t new_total, uint64_t& previous_total,
      43             :                                 Stats::Counter& stat_total, Stats::Gauge& stat_current);
      44             : };
      45             : 
      46             : /**
      47             :  * Implementation of Network::Connection, Network::FilterManagerConnection and
      48             :  * Envoy::ScopeTrackedObject.
      49             :  */
      50             : class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallbacks {
      51             : public:
      52             :   ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket,
      53             :                  TransportSocketPtr&& transport_socket, StreamInfo::StreamInfo& stream_info,
      54             :                  bool connected);
      55             : 
      56             :   ~ConnectionImpl() override;
      57             : 
      58             :   // Network::FilterManager
      59             :   void addWriteFilter(WriteFilterSharedPtr filter) override;
      60             :   void addFilter(FilterSharedPtr filter) override;
      61             :   void addReadFilter(ReadFilterSharedPtr filter) override;
      62             :   void removeReadFilter(ReadFilterSharedPtr filter) override;
      63             :   bool initializeReadFilters() override;
      64             : 
      65             :   // Network::Connection
      66             :   void addBytesSentCallback(BytesSentCb cb) override;
      67             :   void enableHalfClose(bool enabled) override;
      68           0 :   bool isHalfCloseEnabled() const override { return enable_half_close_; }
      69             :   void close(ConnectionCloseType type) final;
      70         552 :   void close(ConnectionCloseType type, absl::string_view details) override {
      71         552 :     if (!details.empty()) {
      72         552 :       setLocalCloseReason(details);
      73         552 :     }
      74         552 :     close(type);
      75         552 :   }
      76          98 :   std::string nextProtocol() const override { return transport_socket_->protocol(); }
      77             :   void noDelay(bool enable) override;
      78             :   ReadDisableStatus readDisable(bool disable) override;
      79         621 :   void detectEarlyCloseWhenReadDisabled(bool value) override { detect_early_close_ = value; }
      80             :   bool readEnabled() const override;
      81         173 :   ConnectionInfoSetter& connectionInfoSetter() override {
      82         173 :     return socket_->connectionInfoProvider();
      83         173 :   }
      84        4284 :   const ConnectionInfoProvider& connectionInfoProvider() const override {
      85        4284 :     return socket_->connectionInfoProvider();
      86        4284 :   }
      87         612 :   ConnectionInfoProviderSharedPtr connectionInfoProviderSharedPtr() const override {
      88         612 :     return socket_->connectionInfoProviderSharedPtr();
      89         612 :   }
      90             :   absl::optional<UnixDomainSocketPeerCredentials> unixSocketPeerCredentials() const override;
      91        2538 :   Ssl::ConnectionInfoConstSharedPtr ssl() const override {
      92             :     // SSL info may be overwritten by a filter in the provider.
      93        2538 :     return socket_->connectionInfoProvider().sslConnection();
      94        2538 :   }
      95             :   State state() const override;
      96         796 :   bool connecting() const override {
      97         796 :     ENVOY_CONN_LOG_EVENT(debug, "connection_connecting_state", "current connecting state: {}",
      98         796 :                          *this, connecting_);
      99         796 :     return connecting_;
     100         796 :   }
     101             :   void write(Buffer::Instance& data, bool end_stream) override;
     102             :   void setBufferLimits(uint32_t limit) override;
     103        1424 :   uint32_t bufferLimit() const override { return read_buffer_limit_; }
     104        1018 :   bool aboveHighWatermark() const override { return write_buffer_above_high_watermark_; }
     105         183 :   const ConnectionSocket::OptionsSharedPtr& socketOptions() const override {
     106         183 :     return socket_->options();
     107         183 :   }
     108           0 :   absl::string_view requestedServerName() const override { return socket_->requestedServerName(); }
     109       15144 :   StreamInfo::StreamInfo& streamInfo() override { return stream_info_; }
     110         183 :   const StreamInfo::StreamInfo& streamInfo() const override { return stream_info_; }
     111             :   absl::string_view transportFailureReason() const override;
     112           0 :   bool startSecureTransport() override { return transport_socket_->startSecureTransport(); }
     113             :   absl::optional<std::chrono::milliseconds> lastRoundTripTime() const override;
     114             :   void configureInitialCongestionWindow(uint64_t bandwidth_bits_per_sec,
     115             :                                         std::chrono::microseconds rtt) override;
     116             :   absl::optional<uint64_t> congestionWindowInBytes() const override;
     117             : 
     118             :   // Network::FilterManagerConnection
     119             :   void rawWrite(Buffer::Instance& data, bool end_stream) override;
     120             : 
     121             :   // Network::ReadBufferSource
     122        3910 :   StreamBuffer getReadBuffer() override { return {*read_buffer_, read_end_stream_}; }
     123             :   // Network::WriteBufferSource
     124        7909 :   StreamBuffer getWriteBuffer() override {
     125        7909 :     return {*current_write_buffer_, current_write_end_stream_};
     126        7909 :   }
     127             : 
     128             :   // Network::TransportSocketCallbacks
     129       32481 :   IoHandle& ioHandle() final { return socket_->ioHandle(); }
     130       26236 :   const IoHandle& ioHandle() const override { return socket_->ioHandle(); }
     131           0 :   Connection& connection() override { return *this; }
     132             :   void raiseEvent(ConnectionEvent event) override;
     133             :   // Should the read buffer be drained?
     134        1992 :   bool shouldDrainReadBuffer() override {
     135        1992 :     return read_buffer_limit_ > 0 && read_buffer_->length() >= read_buffer_limit_;
     136        1992 :   }
     137             :   // Mark read buffer ready to read in the event loop. This is used when yielding following
     138             :   // shouldDrainReadBuffer().
     139             :   // TODO(htuch): While this is the basis for also yielding to other connections to provide some
     140             :   // fair sharing of CPU resources, the underlying event loop does not make any fairness guarantees.
     141             :   // Reconsider how to make fairness happen.
     142             :   void setTransportSocketIsReadable() override;
     143             :   void flushWriteBuffer() override;
     144           0 :   TransportSocketPtr& transportSocket() { return transport_socket_; }
     145             : 
     146             :   // Obtain global next connection ID. This should only be used in tests.
     147           0 :   static uint64_t nextGlobalIdForTest() { return next_global_id_; }
     148             : 
     149             :   // ScopeTrackedObject
     150             :   void dumpState(std::ostream& os, int indent_level) const override;
     151         319 :   DetectedCloseType detectedCloseType() const override { return detected_close_type_; }
     152             : 
     153             : protected:
     154             :   // A convenience function which returns true if
     155             :   // 1) The read disable count is zero or
     156             :   // 2) The read disable count is one due to the read buffer being overrun.
     157             :   // In either case the filter chain would like to process data from the read buffer or transport
     158             :   // socket. If the read count is greater than one, or equal to one when the buffer is not overrun,
     159             :   // then the filter chain has called readDisable, and does not want additional data.
     160             :   bool filterChainWantsData();
     161             : 
     162             :   // Network::ConnectionImplBase
     163             :   void closeConnectionImmediately() final;
     164             : 
     165             :   void closeSocket(ConnectionEvent close_type);
     166             : 
     167             :   void onReadBufferLowWatermark();
     168             :   void onReadBufferHighWatermark();
     169             :   void onWriteBufferLowWatermark();
     170             :   void onWriteBufferHighWatermark();
     171             : 
     172             :   // This is called when the underlying socket is connected, not when the
     173             :   // connected event is raised.
     174             :   virtual void onConnected();
     175             : 
     176             :   void setFailureReason(absl::string_view failure_reason);
     177           0 :   const std::string& failureReason() const { return failure_reason_; }
     178             : 
     179             :   TransportSocketPtr transport_socket_;
     180             :   ConnectionSocketPtr socket_;
     181             :   StreamInfo::StreamInfo& stream_info_;
     182             :   FilterManagerImpl filter_manager_;
     183             : 
     184             :   // This must be a WatermarkBuffer, but as it is created by a factory the ConnectionImpl only has
     185             :   // a generic pointer.
     186             :   // It MUST be defined after the filter_manager_ as some filters may have callbacks that
     187             :   // write_buffer_ invokes during its clean up.
     188             :   // This buffer is always allocated, never nullptr.
     189             :   Buffer::InstancePtr write_buffer_;
     190             :   // Ensure that if the consumer of the data from this connection isn't
     191             :   // consuming, that the connection eventually stops reading from the wire.
     192             :   // This buffer is always allocated, never nullptr.
     193             :   Buffer::InstancePtr read_buffer_;
     194             :   uint32_t read_buffer_limit_ = 0;
     195             :   bool connecting_{false};
     196             :   ConnectionEvent immediate_error_event_{ConnectionEvent::Connected};
     197             :   bool bind_error_{false};
     198             : 
     199             : private:
     200             :   friend class MultiConnectionBaseImpl;
     201             :   friend class Envoy::RandomPauseFilter;
     202             :   friend class Envoy::TestPauseFilter;
     203             : 
     204             :   void onFileEvent(uint32_t events);
     205             :   void onRead(uint64_t read_buffer_size);
     206             :   void onReadReady();
     207             :   void onWriteReady();
     208             :   void updateReadBufferStats(uint64_t num_read, uint64_t new_size);
     209             :   void updateWriteBufferStats(uint64_t num_written, uint64_t new_size);
     210             : 
     211             :   // Write data to the connection bypassing filter chain (optionally).
     212             :   void write(Buffer::Instance& data, bool end_stream, bool through_filter_chain);
     213             : 
     214             :   // Returns true iff end of stream has been both written and read.
     215             :   bool bothSidesHalfClosed();
     216             : 
     217             :   // Set the detected close type for this connection.
     218             :   void setDetectedCloseType(DetectedCloseType close_type);
     219             : 
     220             :   static std::atomic<uint64_t> next_global_id_;
     221             : 
     222             :   std::list<BytesSentCb> bytes_sent_callbacks_;
     223             :   // Should be set with setFailureReason.
     224             :   std::string failure_reason_;
     225             :   // Tracks the number of times reads have been disabled. If N different components call
     226             :   // readDisabled(true) this allows the connection to only resume reads when readDisabled(false)
     227             :   // has been called N times.
     228             :   uint64_t last_read_buffer_size_{};
     229             :   uint64_t last_write_buffer_size_{};
     230             :   Buffer::Instance* current_write_buffer_{};
     231             :   uint32_t read_disable_count_{0};
     232             :   DetectedCloseType detected_close_type_{DetectedCloseType::Normal};
     233             :   bool write_buffer_above_high_watermark_ : 1;
     234             :   bool detect_early_close_ : 1;
     235             :   bool enable_half_close_ : 1;
     236             :   bool read_end_stream_raised_ : 1;
     237             :   bool read_end_stream_ : 1;
     238             :   bool write_end_stream_ : 1;
     239             :   bool current_write_end_stream_ : 1;
     240             :   bool dispatch_buffered_data_ : 1;
     241             :   // True if the most recent call to the transport socket's doRead method invoked
     242             :   // setTransportSocketIsReadable to schedule read resumption after yielding due to
     243             :   // shouldDrainReadBuffer(). When true, readDisable must schedule read resumption when
     244             :   // read_disable_count_ == 0 to ensure that read resumption happens when remaining bytes are held
     245             :   // in transport socket internal buffers.
     246             :   bool transport_wants_read_ : 1;
     247             :   bool enable_rst_detect_send_ : 1;
     248             : };
     249             : 
     250             : class ServerConnectionImpl : public ConnectionImpl, virtual public ServerConnection {
     251             : public:
     252             :   ServerConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket,
     253             :                        TransportSocketPtr&& transport_socket, StreamInfo::StreamInfo& stream_info);
     254             : 
     255             :   // ServerConnection impl
     256             :   void setTransportSocketConnectTimeout(std::chrono::milliseconds timeout,
     257             :                                         Stats::Counter& timeout_stat) override;
     258             :   void raiseEvent(ConnectionEvent event) override;
     259             :   bool initializeReadFilters() override;
     260             : 
     261             : private:
     262             :   void onTransportSocketConnectTimeout();
     263             : 
     264             :   bool transport_connect_pending_{true};
     265             :   // Implements a timeout for the transport socket signaling connection. The timer is enabled by a
     266             :   // call to setTransportSocketConnectTimeout and is reset when the connection is established.
     267             :   Event::TimerPtr transport_socket_connect_timer_;
     268             :   Stats::Counter* transport_socket_timeout_stat_;
     269             : };
     270             : 
     271             : /**
     272             :  * libevent implementation of Network::ClientConnection.
     273             :  */
     274             : class ClientConnectionImpl : public ConnectionImpl, virtual public ClientConnection {
     275             : public:
     276             :   ClientConnectionImpl(Event::Dispatcher& dispatcher,
     277             :                        const Address::InstanceConstSharedPtr& remote_address,
     278             :                        const Address::InstanceConstSharedPtr& source_address,
     279             :                        Network::TransportSocketPtr&& transport_socket,
     280             :                        const Network::ConnectionSocket::OptionsSharedPtr& options,
     281             :                        const Network::TransportSocketOptionsConstSharedPtr& transport_options);
     282             : 
     283             :   ClientConnectionImpl(Event::Dispatcher& dispatcher, std::unique_ptr<ConnectionSocket> socket,
     284             :                        const Address::InstanceConstSharedPtr& source_address,
     285             :                        Network::TransportSocketPtr&& transport_socket,
     286             :                        const Network::ConnectionSocket::OptionsSharedPtr& options,
     287             :                        const Network::TransportSocketOptionsConstSharedPtr& transport_options);
     288             : 
     289             :   // Network::ClientConnection
     290             :   void connect() override;
     291             : 
     292             : private:
     293             :   void onConnected() override;
     294             : 
     295             :   StreamInfo::StreamInfoImpl stream_info_;
     296             : };
     297             : 
     298             : } // namespace Network
     299             : } // namespace Envoy

Generated by: LCOV version 1.15