Line data Source code
1 : #include "source/common/tcp/conn_pool.h" 2 : 3 : #include <memory> 4 : 5 : #include "envoy/event/dispatcher.h" 6 : #include "envoy/upstream/upstream.h" 7 : 8 : #include "source/common/stats/timespan_impl.h" 9 : #include "source/common/upstream/upstream_impl.h" 10 : 11 : namespace Envoy { 12 : namespace Tcp { 13 : 14 : ActiveTcpClient::ActiveTcpClient(Envoy::ConnectionPool::ConnPoolImplBase& parent, 15 : const Upstream::HostConstSharedPtr& host, 16 : uint64_t concurrent_stream_limit, 17 : absl::optional<std::chrono::milliseconds> idle_timeout) 18 : : Envoy::ConnectionPool::ActiveClient(parent, host->cluster().maxRequestsPerConnection(), 19 : concurrent_stream_limit), 20 0 : parent_(parent), idle_timeout_(idle_timeout) { 21 0 : Upstream::Host::CreateConnectionData data = host->createConnection( 22 0 : parent_.dispatcher(), parent_.socketOptions(), parent_.transportSocketOptions()); 23 0 : real_host_description_ = data.host_description_; 24 0 : connection_ = std::move(data.connection_); 25 0 : connection_->addConnectionCallbacks(*this); 26 0 : read_filter_handle_ = std::make_shared<ConnReadFilter>(*this); 27 0 : connection_->addReadFilter(read_filter_handle_); 28 0 : Upstream::ClusterTrafficStats& cluster_traffic_stats = *host->cluster().trafficStats(); 29 0 : connection_->setConnectionStats({cluster_traffic_stats.upstream_cx_rx_bytes_total_, 30 0 : cluster_traffic_stats.upstream_cx_rx_bytes_buffered_, 31 0 : cluster_traffic_stats.upstream_cx_tx_bytes_total_, 32 0 : cluster_traffic_stats.upstream_cx_tx_bytes_buffered_, 33 0 : &cluster_traffic_stats.bind_errors_, nullptr}); 34 0 : connection_->noDelay(true); 35 0 : connection_->connect(); 36 : 37 0 : if (idle_timeout_.has_value()) { 38 0 : idle_timer_ = connection_->dispatcher().createTimer([this]() -> void { onIdleTimeout(); }); 39 0 : setIdleTimer(); 40 0 : } 41 0 : } 42 : 43 0 : ActiveTcpClient::~ActiveTcpClient() { 44 : // Handle the case where deferred delete results in the ActiveClient being destroyed before 45 : // TcpConnectionData. Make sure the TcpConnectionData will not refer to this ActiveTcpClient 46 : // and handle clean up normally done in clearCallbacks() 47 0 : if (tcp_connection_data_) { 48 0 : ASSERT(state() == ActiveClient::State::Closed); 49 0 : tcp_connection_data_->release(); 50 0 : parent_.onStreamClosed(*this, true); 51 0 : parent_.checkForIdleAndCloseIdleConnsIfDraining(); 52 0 : } 53 0 : } 54 : 55 0 : void ActiveTcpClient::close() { connection_->close(Network::ConnectionCloseType::NoFlush); } 56 : 57 0 : void ActiveTcpClient::clearCallbacks() { 58 0 : if (state() == Envoy::ConnectionPool::ActiveClient::State::Busy && parent_.hasPendingStreams()) { 59 0 : auto* pool = &parent_; 60 0 : pool->scheduleOnUpstreamReady(); 61 0 : } 62 0 : callbacks_ = nullptr; 63 0 : tcp_connection_data_ = nullptr; 64 0 : parent_.onStreamClosed(*this, true); 65 0 : setIdleTimer(); 66 0 : parent_.checkForIdleAndCloseIdleConnsIfDraining(); 67 0 : } 68 : 69 0 : void ActiveTcpClient::onEvent(Network::ConnectionEvent event) { 70 : // If this is a newly established TCP connection, readDisable. This is to handle a race condition 71 : // for TCP for protocols like MySQL where the upstream writes first, and the data needs to be 72 : // preserved until a downstream connection is associated. 73 : // This is also necessary for prefetch to be used with such protocols. 74 0 : if (event == Network::ConnectionEvent::Connected) { 75 0 : connection_->readDisable(true); 76 0 : } 77 0 : ENVOY_BUG(event != Network::ConnectionEvent::ConnectedZeroRtt, 78 0 : "Unexpected 0-RTT event from the underlying TCP connection."); 79 0 : parent_.onConnectionEvent(*this, connection_->transportFailureReason(), event); 80 : 81 0 : if (event == Network::ConnectionEvent::LocalClose || 82 0 : event == Network::ConnectionEvent::RemoteClose) { 83 0 : disableIdleTimer(); 84 : 85 : // Do not pass the Connected event to any session which registered during onEvent above. 86 : // Consumers of connection pool connections assume they are receiving already connected 87 : // connections. 88 0 : if (callbacks_) { 89 0 : if (tcp_connection_data_) { 90 0 : Envoy::Upstream::reportUpstreamCxDestroyActiveRequest(parent_.host(), event); 91 0 : } 92 0 : callbacks_->onEvent(event); 93 : // After receiving a disconnect event, the owner of callbacks_ will likely self-destruct. 94 : // Clear the pointer to avoid using it again. 95 0 : callbacks_ = nullptr; 96 0 : } 97 0 : } 98 0 : } 99 : 100 0 : void ActiveTcpClient::onIdleTimeout() { 101 0 : ENVOY_CONN_LOG(debug, "per client idle timeout", *connection_); 102 0 : parent_.host()->cluster().trafficStats()->upstream_cx_idle_timeout_.inc(); 103 0 : close(); 104 0 : } 105 : 106 0 : void ActiveTcpClient::disableIdleTimer() { 107 0 : if (idle_timer_ != nullptr) { 108 0 : idle_timer_->disableTimer(); 109 0 : } 110 0 : } 111 : 112 0 : void ActiveTcpClient::setIdleTimer() { 113 0 : if (idle_timer_ != nullptr) { 114 0 : ASSERT(idle_timeout_.has_value()); 115 : 116 0 : idle_timer_->enableTimer(idle_timeout_.value()); 117 0 : } 118 0 : } 119 : 120 : } // namespace Tcp 121 : } // namespace Envoy