Line data Source code
1 : #pragma once 2 : 3 : #include <atomic> 4 : #include <cstdint> 5 : #include <list> 6 : #include <memory> 7 : #include <string> 8 : 9 : #include "envoy/common/scope_tracker.h" 10 : #include "envoy/network/transport_socket.h" 11 : 12 : #include "source/common/buffer/watermark_buffer.h" 13 : #include "source/common/event/libevent.h" 14 : #include "source/common/network/connection_impl_base.h" 15 : #include "source/common/stream_info/stream_info_impl.h" 16 : 17 : #include "absl/types/optional.h" 18 : 19 : namespace Envoy { 20 : class RandomPauseFilter; 21 : class TestPauseFilter; 22 : 23 : namespace Network { 24 : 25 : class MultiConnectionBaseImpl; 26 : 27 : /** 28 : * Utility functions for the connection implementation. 29 : */ 30 : class ConnectionImplUtility { 31 : public: 32 : /** 33 : * Update the buffer stats for a connection. 34 : * @param delta supplies the data read/written. 35 : * @param new_total supplies the final total buffer size. 36 : * @param previous_total supplies the previous final total buffer size. previous_total will be 37 : * updated to new_total when the call is complete. 38 : * @param stat_total supplies the counter to increment with the delta. 39 : * @param stat_current supplies the gauge that should be updated with the delta of previous_total 40 : * and new_total. 41 : */ 42 : static void updateBufferStats(uint64_t delta, uint64_t new_total, uint64_t& previous_total, 43 : Stats::Counter& stat_total, Stats::Gauge& stat_current); 44 : }; 45 : 46 : /** 47 : * Implementation of Network::Connection, Network::FilterManagerConnection and 48 : * Envoy::ScopeTrackedObject. 49 : */ 50 : class ConnectionImpl : public ConnectionImplBase, public TransportSocketCallbacks { 51 : public: 52 : ConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket, 53 : TransportSocketPtr&& transport_socket, StreamInfo::StreamInfo& stream_info, 54 : bool connected); 55 : 56 : ~ConnectionImpl() override; 57 : 58 : // Network::FilterManager 59 : void addWriteFilter(WriteFilterSharedPtr filter) override; 60 : void addFilter(FilterSharedPtr filter) override; 61 : void addReadFilter(ReadFilterSharedPtr filter) override; 62 : void removeReadFilter(ReadFilterSharedPtr filter) override; 63 : bool initializeReadFilters() override; 64 : 65 : // Network::Connection 66 : void addBytesSentCallback(BytesSentCb cb) override; 67 : void enableHalfClose(bool enabled) override; 68 0 : bool isHalfCloseEnabled() const override { return enable_half_close_; } 69 : void close(ConnectionCloseType type) final; 70 552 : void close(ConnectionCloseType type, absl::string_view details) override { 71 552 : if (!details.empty()) { 72 552 : setLocalCloseReason(details); 73 552 : } 74 552 : close(type); 75 552 : } 76 98 : std::string nextProtocol() const override { return transport_socket_->protocol(); } 77 : void noDelay(bool enable) override; 78 : ReadDisableStatus readDisable(bool disable) override; 79 621 : void detectEarlyCloseWhenReadDisabled(bool value) override { detect_early_close_ = value; } 80 : bool readEnabled() const override; 81 173 : ConnectionInfoSetter& connectionInfoSetter() override { 82 173 : return socket_->connectionInfoProvider(); 83 173 : } 84 4284 : const ConnectionInfoProvider& connectionInfoProvider() const override { 85 4284 : return socket_->connectionInfoProvider(); 86 4284 : } 87 612 : ConnectionInfoProviderSharedPtr connectionInfoProviderSharedPtr() const override { 88 612 : return socket_->connectionInfoProviderSharedPtr(); 89 612 : } 90 : absl::optional<UnixDomainSocketPeerCredentials> unixSocketPeerCredentials() const override; 91 2538 : Ssl::ConnectionInfoConstSharedPtr ssl() const override { 92 : // SSL info may be overwritten by a filter in the provider. 93 2538 : return socket_->connectionInfoProvider().sslConnection(); 94 2538 : } 95 : State state() const override; 96 796 : bool connecting() const override { 97 796 : ENVOY_CONN_LOG_EVENT(debug, "connection_connecting_state", "current connecting state: {}", 98 796 : *this, connecting_); 99 796 : return connecting_; 100 796 : } 101 : void write(Buffer::Instance& data, bool end_stream) override; 102 : void setBufferLimits(uint32_t limit) override; 103 1424 : uint32_t bufferLimit() const override { return read_buffer_limit_; } 104 1018 : bool aboveHighWatermark() const override { return write_buffer_above_high_watermark_; } 105 183 : const ConnectionSocket::OptionsSharedPtr& socketOptions() const override { 106 183 : return socket_->options(); 107 183 : } 108 0 : absl::string_view requestedServerName() const override { return socket_->requestedServerName(); } 109 15144 : StreamInfo::StreamInfo& streamInfo() override { return stream_info_; } 110 183 : const StreamInfo::StreamInfo& streamInfo() const override { return stream_info_; } 111 : absl::string_view transportFailureReason() const override; 112 0 : bool startSecureTransport() override { return transport_socket_->startSecureTransport(); } 113 : absl::optional<std::chrono::milliseconds> lastRoundTripTime() const override; 114 : void configureInitialCongestionWindow(uint64_t bandwidth_bits_per_sec, 115 : std::chrono::microseconds rtt) override; 116 : absl::optional<uint64_t> congestionWindowInBytes() const override; 117 : 118 : // Network::FilterManagerConnection 119 : void rawWrite(Buffer::Instance& data, bool end_stream) override; 120 : 121 : // Network::ReadBufferSource 122 3910 : StreamBuffer getReadBuffer() override { return {*read_buffer_, read_end_stream_}; } 123 : // Network::WriteBufferSource 124 7909 : StreamBuffer getWriteBuffer() override { 125 7909 : return {*current_write_buffer_, current_write_end_stream_}; 126 7909 : } 127 : 128 : // Network::TransportSocketCallbacks 129 32481 : IoHandle& ioHandle() final { return socket_->ioHandle(); } 130 26236 : const IoHandle& ioHandle() const override { return socket_->ioHandle(); } 131 0 : Connection& connection() override { return *this; } 132 : void raiseEvent(ConnectionEvent event) override; 133 : // Should the read buffer be drained? 134 1992 : bool shouldDrainReadBuffer() override { 135 1992 : return read_buffer_limit_ > 0 && read_buffer_->length() >= read_buffer_limit_; 136 1992 : } 137 : // Mark read buffer ready to read in the event loop. This is used when yielding following 138 : // shouldDrainReadBuffer(). 139 : // TODO(htuch): While this is the basis for also yielding to other connections to provide some 140 : // fair sharing of CPU resources, the underlying event loop does not make any fairness guarantees. 141 : // Reconsider how to make fairness happen. 142 : void setTransportSocketIsReadable() override; 143 : void flushWriteBuffer() override; 144 0 : TransportSocketPtr& transportSocket() { return transport_socket_; } 145 : 146 : // Obtain global next connection ID. This should only be used in tests. 147 0 : static uint64_t nextGlobalIdForTest() { return next_global_id_; } 148 : 149 : // ScopeTrackedObject 150 : void dumpState(std::ostream& os, int indent_level) const override; 151 319 : DetectedCloseType detectedCloseType() const override { return detected_close_type_; } 152 : 153 : protected: 154 : // A convenience function which returns true if 155 : // 1) The read disable count is zero or 156 : // 2) The read disable count is one due to the read buffer being overrun. 157 : // In either case the filter chain would like to process data from the read buffer or transport 158 : // socket. If the read count is greater than one, or equal to one when the buffer is not overrun, 159 : // then the filter chain has called readDisable, and does not want additional data. 160 : bool filterChainWantsData(); 161 : 162 : // Network::ConnectionImplBase 163 : void closeConnectionImmediately() final; 164 : 165 : void closeSocket(ConnectionEvent close_type); 166 : 167 : void onReadBufferLowWatermark(); 168 : void onReadBufferHighWatermark(); 169 : void onWriteBufferLowWatermark(); 170 : void onWriteBufferHighWatermark(); 171 : 172 : // This is called when the underlying socket is connected, not when the 173 : // connected event is raised. 174 : virtual void onConnected(); 175 : 176 : void setFailureReason(absl::string_view failure_reason); 177 0 : const std::string& failureReason() const { return failure_reason_; } 178 : 179 : TransportSocketPtr transport_socket_; 180 : ConnectionSocketPtr socket_; 181 : StreamInfo::StreamInfo& stream_info_; 182 : FilterManagerImpl filter_manager_; 183 : 184 : // This must be a WatermarkBuffer, but as it is created by a factory the ConnectionImpl only has 185 : // a generic pointer. 186 : // It MUST be defined after the filter_manager_ as some filters may have callbacks that 187 : // write_buffer_ invokes during its clean up. 188 : // This buffer is always allocated, never nullptr. 189 : Buffer::InstancePtr write_buffer_; 190 : // Ensure that if the consumer of the data from this connection isn't 191 : // consuming, that the connection eventually stops reading from the wire. 192 : // This buffer is always allocated, never nullptr. 193 : Buffer::InstancePtr read_buffer_; 194 : uint32_t read_buffer_limit_ = 0; 195 : bool connecting_{false}; 196 : ConnectionEvent immediate_error_event_{ConnectionEvent::Connected}; 197 : bool bind_error_{false}; 198 : 199 : private: 200 : friend class MultiConnectionBaseImpl; 201 : friend class Envoy::RandomPauseFilter; 202 : friend class Envoy::TestPauseFilter; 203 : 204 : void onFileEvent(uint32_t events); 205 : void onRead(uint64_t read_buffer_size); 206 : void onReadReady(); 207 : void onWriteReady(); 208 : void updateReadBufferStats(uint64_t num_read, uint64_t new_size); 209 : void updateWriteBufferStats(uint64_t num_written, uint64_t new_size); 210 : 211 : // Write data to the connection bypassing filter chain (optionally). 212 : void write(Buffer::Instance& data, bool end_stream, bool through_filter_chain); 213 : 214 : // Returns true iff end of stream has been both written and read. 215 : bool bothSidesHalfClosed(); 216 : 217 : // Set the detected close type for this connection. 218 : void setDetectedCloseType(DetectedCloseType close_type); 219 : 220 : static std::atomic<uint64_t> next_global_id_; 221 : 222 : std::list<BytesSentCb> bytes_sent_callbacks_; 223 : // Should be set with setFailureReason. 224 : std::string failure_reason_; 225 : // Tracks the number of times reads have been disabled. If N different components call 226 : // readDisabled(true) this allows the connection to only resume reads when readDisabled(false) 227 : // has been called N times. 228 : uint64_t last_read_buffer_size_{}; 229 : uint64_t last_write_buffer_size_{}; 230 : Buffer::Instance* current_write_buffer_{}; 231 : uint32_t read_disable_count_{0}; 232 : DetectedCloseType detected_close_type_{DetectedCloseType::Normal}; 233 : bool write_buffer_above_high_watermark_ : 1; 234 : bool detect_early_close_ : 1; 235 : bool enable_half_close_ : 1; 236 : bool read_end_stream_raised_ : 1; 237 : bool read_end_stream_ : 1; 238 : bool write_end_stream_ : 1; 239 : bool current_write_end_stream_ : 1; 240 : bool dispatch_buffered_data_ : 1; 241 : // True if the most recent call to the transport socket's doRead method invoked 242 : // setTransportSocketIsReadable to schedule read resumption after yielding due to 243 : // shouldDrainReadBuffer(). When true, readDisable must schedule read resumption when 244 : // read_disable_count_ == 0 to ensure that read resumption happens when remaining bytes are held 245 : // in transport socket internal buffers. 246 : bool transport_wants_read_ : 1; 247 : bool enable_rst_detect_send_ : 1; 248 : }; 249 : 250 : class ServerConnectionImpl : public ConnectionImpl, virtual public ServerConnection { 251 : public: 252 : ServerConnectionImpl(Event::Dispatcher& dispatcher, ConnectionSocketPtr&& socket, 253 : TransportSocketPtr&& transport_socket, StreamInfo::StreamInfo& stream_info); 254 : 255 : // ServerConnection impl 256 : void setTransportSocketConnectTimeout(std::chrono::milliseconds timeout, 257 : Stats::Counter& timeout_stat) override; 258 : void raiseEvent(ConnectionEvent event) override; 259 : bool initializeReadFilters() override; 260 : 261 : private: 262 : void onTransportSocketConnectTimeout(); 263 : 264 : bool transport_connect_pending_{true}; 265 : // Implements a timeout for the transport socket signaling connection. The timer is enabled by a 266 : // call to setTransportSocketConnectTimeout and is reset when the connection is established. 267 : Event::TimerPtr transport_socket_connect_timer_; 268 : Stats::Counter* transport_socket_timeout_stat_; 269 : }; 270 : 271 : /** 272 : * libevent implementation of Network::ClientConnection. 273 : */ 274 : class ClientConnectionImpl : public ConnectionImpl, virtual public ClientConnection { 275 : public: 276 : ClientConnectionImpl(Event::Dispatcher& dispatcher, 277 : const Address::InstanceConstSharedPtr& remote_address, 278 : const Address::InstanceConstSharedPtr& source_address, 279 : Network::TransportSocketPtr&& transport_socket, 280 : const Network::ConnectionSocket::OptionsSharedPtr& options, 281 : const Network::TransportSocketOptionsConstSharedPtr& transport_options); 282 : 283 : ClientConnectionImpl(Event::Dispatcher& dispatcher, std::unique_ptr<ConnectionSocket> socket, 284 : const Address::InstanceConstSharedPtr& source_address, 285 : Network::TransportSocketPtr&& transport_socket, 286 : const Network::ConnectionSocket::OptionsSharedPtr& options, 287 : const Network::TransportSocketOptionsConstSharedPtr& transport_options); 288 : 289 : // Network::ClientConnection 290 : void connect() override; 291 : 292 : private: 293 : void onConnected() override; 294 : 295 : StreamInfo::StreamInfoImpl stream_info_; 296 : }; 297 : 298 : } // namespace Network 299 : } // namespace Envoy