Line data Source code
1 : #pragma once 2 : 3 : #include <list> 4 : #include <memory> 5 : 6 : #include "envoy/event/deferred_deletable.h" 7 : #include "envoy/event/timer.h" 8 : #include "envoy/network/connection.h" 9 : #include "envoy/network/filter.h" 10 : #include "envoy/stats/timespan.h" 11 : #include "envoy/tcp/conn_pool.h" 12 : #include "envoy/upstream/upstream.h" 13 : 14 : #include "source/common/common/linked_object.h" 15 : #include "source/common/common/logger.h" 16 : #include "source/common/http/conn_pool_base.h" 17 : #include "source/common/network/filter_impl.h" 18 : #include "source/common/runtime/runtime_features.h" 19 : 20 : namespace Envoy { 21 : namespace Tcp { 22 : 23 : class ConnPoolImpl; 24 : 25 : struct TcpAttachContext : public Envoy::ConnectionPool::AttachContext { 26 0 : TcpAttachContext(Tcp::ConnectionPool::Callbacks* callbacks) : callbacks_(callbacks) {} 27 : Tcp::ConnectionPool::Callbacks* callbacks_; 28 : }; 29 : 30 : class TcpPendingStream : public Envoy::ConnectionPool::PendingStream { 31 : public: 32 : TcpPendingStream(Envoy::ConnectionPool::ConnPoolImplBase& parent, bool can_send_early_data, 33 : TcpAttachContext& context) 34 0 : : Envoy::ConnectionPool::PendingStream(parent, can_send_early_data), context_(context) {} 35 0 : Envoy::ConnectionPool::AttachContext& context() override { return context_; } 36 : 37 : TcpAttachContext context_; 38 : }; 39 : 40 : class ActiveTcpClient : public Envoy::ConnectionPool::ActiveClient { 41 : public: 42 : struct ConnReadFilter : public Network::ReadFilterBaseImpl { 43 0 : ConnReadFilter(ActiveTcpClient& parent) : parent_(parent) {} 44 : 45 : // Network::ReadFilter 46 0 : Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override { 47 0 : parent_.onUpstreamData(data, end_stream); 48 0 : return Network::FilterStatus::StopIteration; 49 0 : } 50 : ActiveTcpClient& parent_; 51 : }; 52 : 53 : // This acts as the bridge between the ActiveTcpClient and an individual TCP connection. 54 : class TcpConnectionData : public Envoy::Tcp::ConnectionPool::ConnectionData { 55 : public: 56 : TcpConnectionData(ActiveTcpClient& parent, Network::ClientConnection& connection) 57 0 : : parent_(&parent), connection_(connection) { 58 0 : parent_->tcp_connection_data_ = this; 59 0 : } 60 0 : ~TcpConnectionData() override { 61 : // Generally it is the case that TcpConnectionData will be destroyed before the 62 : // ActiveTcpClient. Because ordering on the deferred delete list is not guaranteed in the 63 : // case of a disconnect, make sure parent_ is valid before doing clean-up. 64 0 : if (parent_) { 65 0 : parent_->clearCallbacks(); 66 0 : } 67 0 : } 68 : 69 0 : Network::ClientConnection& connection() override { return connection_; } 70 0 : void setConnectionState(ConnectionPool::ConnectionStatePtr&& state) override { 71 0 : parent_->connection_state_ = std::move(state); 72 0 : } 73 : 74 0 : void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks) override { 75 0 : parent_->callbacks_ = &callbacks; 76 0 : } 77 0 : void release() { parent_ = nullptr; } 78 : 79 : protected: 80 0 : ConnectionPool::ConnectionState* connectionState() override { 81 0 : return parent_->connection_state_.get(); 82 0 : } 83 : 84 : private: 85 : ActiveTcpClient* parent_; 86 : Network::ClientConnection& connection_; 87 : }; 88 : 89 : ActiveTcpClient(Envoy::ConnectionPool::ConnPoolImplBase& parent, 90 : const Upstream::HostConstSharedPtr& host, uint64_t concurrent_stream_limit, 91 : absl::optional<std::chrono::milliseconds> idle_timeout); 92 : ~ActiveTcpClient() override; 93 : 94 : // Override the default's of Envoy::ConnectionPool::ActiveClient for class-specific functions. 95 : // Network::ConnectionCallbacks 96 : void onEvent(Network::ConnectionEvent event) override; 97 0 : void onAboveWriteBufferHighWatermark() override { callbacks_->onAboveWriteBufferHighWatermark(); } 98 0 : void onBelowWriteBufferLowWatermark() override { callbacks_->onBelowWriteBufferLowWatermark(); } 99 : 100 : // Undo the readDisable done in onEvent(Connected) - now that there is an associated connection, 101 : // drain any data. 102 0 : void readEnableIfNew() { 103 : // It is expected for Envoy use of ActiveTcpClient this function only be 104 : // called once. Other users of the TcpConnPool may recycle Tcp connections, 105 : // and this safeguards them against read-enabling too many times. 106 0 : if (!associated_before_) { 107 0 : associated_before_ = true; 108 0 : connection_->readDisable(false); 109 : // Also while we're at it, make sure the connection will proxy all TCP 110 : // data before picking up a FIN. 111 0 : connection_->detectEarlyCloseWhenReadDisabled(false); 112 0 : } 113 0 : } 114 : 115 0 : void initializeReadFilters() override { connection_->initializeReadFilters(); } 116 0 : absl::optional<Http::Protocol> protocol() const override { return {}; } 117 : void close() override; 118 0 : uint32_t numActiveStreams() const override { return callbacks_ ? 1 : 0; } 119 0 : bool closingWithIncompleteStream() const override { return false; } 120 0 : uint64_t id() const override { return connection_->id(); } 121 : 122 0 : void onUpstreamData(Buffer::Instance& data, bool end_stream) { 123 0 : if (callbacks_) { 124 0 : callbacks_->onUpstreamData(data, end_stream); 125 0 : } else { 126 0 : close(); 127 0 : } 128 0 : } 129 : virtual void clearCallbacks(); 130 : 131 : // Called if the underlying connection is idle over the cluster's tcpPoolIdleTimeout() 132 : void onIdleTimeout(); 133 : void disableIdleTimer(); 134 : void setIdleTimer(); 135 : 136 : std::shared_ptr<ConnReadFilter> read_filter_handle_; 137 : Envoy::ConnectionPool::ConnPoolImplBase& parent_; 138 : ConnectionPool::UpstreamCallbacks* callbacks_{}; 139 : Network::ClientConnectionPtr connection_; 140 : ConnectionPool::ConnectionStatePtr connection_state_; 141 : TcpConnectionData* tcp_connection_data_{}; 142 : bool associated_before_{}; 143 : absl::optional<std::chrono::milliseconds> idle_timeout_; 144 : Event::TimerPtr idle_timer_; 145 : }; 146 : 147 : class ConnPoolImpl : public Envoy::ConnectionPool::ConnPoolImplBase, 148 : public Tcp::ConnectionPool::Instance { 149 : public: 150 : ConnPoolImpl(Event::Dispatcher& dispatcher, Upstream::HostConstSharedPtr host, 151 : Upstream::ResourcePriority priority, 152 : const Network::ConnectionSocket::OptionsSharedPtr& options, 153 : Network::TransportSocketOptionsConstSharedPtr transport_socket_options, 154 : Upstream::ClusterConnectivityState& state, 155 : absl::optional<std::chrono::milliseconds> idle_timeout) 156 : : Envoy::ConnectionPool::ConnPoolImplBase(host, priority, dispatcher, options, 157 : transport_socket_options, state), 158 0 : idle_timeout_(idle_timeout) {} 159 0 : ~ConnPoolImpl() override { destructAllConnections(); } 160 : 161 : // Event::DeferredDeletable 162 0 : void deleteIsPending() override { deleteIsPendingImpl(); } 163 : 164 0 : void addIdleCallback(IdleCb cb) override { addIdleCallbackImpl(cb); } 165 0 : bool isIdle() const override { return isIdleImpl(); } 166 0 : void drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) override { 167 0 : drainConnectionsImpl(drain_behavior); 168 0 : if (drain_behavior == Envoy::ConnectionPool::DrainBehavior::DrainAndDelete) { 169 0 : return; 170 0 : } 171 : // Legacy behavior for the TCP connection pool marks all connecting clients 172 : // as draining. 173 0 : for (auto& connecting_client : connecting_clients_) { 174 0 : if (connecting_client->remaining_streams_ > 1) { 175 0 : uint64_t old_limit = connecting_client->effectiveConcurrentStreamLimit(); 176 0 : connecting_client->remaining_streams_ = 1; 177 0 : if (connecting_client->effectiveConcurrentStreamLimit() < old_limit) { 178 0 : decrConnectingAndConnectedStreamCapacity( 179 0 : old_limit - connecting_client->effectiveConcurrentStreamLimit(), *connecting_client); 180 0 : } 181 0 : } 182 0 : } 183 0 : } 184 : 185 0 : void closeConnections() override { 186 0 : for (auto* list : {&ready_clients_, &busy_clients_, &connecting_clients_}) { 187 0 : while (!list->empty()) { 188 0 : list->front()->close(); 189 0 : } 190 0 : } 191 0 : } 192 0 : ConnectionPool::Cancellable* newConnection(Tcp::ConnectionPool::Callbacks& callbacks) override { 193 0 : TcpAttachContext context(&callbacks); 194 : // TLS early data over TCP is not supported yet. 195 0 : return newStreamImpl(context, /*can_send_early_data=*/false); 196 0 : } 197 0 : bool maybePreconnect(float preconnect_ratio) override { 198 0 : return maybePreconnectImpl(preconnect_ratio); 199 0 : } 200 : 201 : ConnectionPool::Cancellable* newPendingStream(Envoy::ConnectionPool::AttachContext& context, 202 0 : bool can_send_early_data) override { 203 0 : Envoy::ConnectionPool::PendingStreamPtr pending_stream = std::make_unique<TcpPendingStream>( 204 0 : *this, can_send_early_data, typedContext<TcpAttachContext>(context)); 205 0 : return addPendingStream(std::move(pending_stream)); 206 0 : } 207 : 208 0 : Upstream::HostDescriptionConstSharedPtr host() const override { 209 0 : return Envoy::ConnectionPool::ConnPoolImplBase::host(); 210 0 : } 211 : 212 0 : Envoy::ConnectionPool::ActiveClientPtr instantiateActiveClient() override { 213 0 : return std::make_unique<ActiveTcpClient>(*this, Envoy::ConnectionPool::ConnPoolImplBase::host(), 214 0 : 1, idle_timeout_); 215 0 : } 216 : 217 : void onPoolReady(Envoy::ConnectionPool::ActiveClient& client, 218 0 : Envoy::ConnectionPool::AttachContext& context) override { 219 0 : ActiveTcpClient* tcp_client = static_cast<ActiveTcpClient*>(&client); 220 0 : tcp_client->readEnableIfNew(); 221 0 : auto* callbacks = typedContext<TcpAttachContext>(context).callbacks_; 222 0 : std::unique_ptr<Envoy::Tcp::ConnectionPool::ConnectionData> connection_data = 223 0 : std::make_unique<ActiveTcpClient::TcpConnectionData>(*tcp_client, *tcp_client->connection_); 224 0 : callbacks->onPoolReady(std::move(connection_data), tcp_client->real_host_description_); 225 : 226 : // The tcp client is taken over. Stop the idle timer. 227 0 : if (!connection_data) { 228 0 : tcp_client->disableIdleTimer(); 229 0 : } 230 0 : } 231 : 232 : void onPoolFailure(const Upstream::HostDescriptionConstSharedPtr& host_description, 233 : absl::string_view failure_reason, ConnectionPool::PoolFailureReason reason, 234 0 : Envoy::ConnectionPool::AttachContext& context) override { 235 0 : auto* callbacks = typedContext<TcpAttachContext>(context).callbacks_; 236 0 : callbacks->onPoolFailure(reason, failure_reason, host_description); 237 0 : } 238 : 239 0 : bool enforceMaxRequests() const override { return false; } 240 : // These two functions exist for testing parity between old and new Tcp Connection Pools. 241 0 : virtual void onConnReleased(Envoy::ConnectionPool::ActiveClient&) {} 242 0 : virtual void onConnDestroyed() {} 243 : 244 : absl::optional<std::chrono::milliseconds> idle_timeout_; 245 : }; 246 : 247 : } // namespace Tcp 248 : } // namespace Envoy