Line data Source code
1 : #include "source/common/tcp/async_tcp_client_impl.h" 2 : 3 : #include <cstddef> 4 : #include <memory> 5 : #include <utility> 6 : 7 : #include "envoy/event/dispatcher.h" 8 : #include "envoy/network/connection.h" 9 : #include "envoy/tcp/async_tcp_client.h" 10 : #include "envoy/upstream/upstream.h" 11 : 12 : #include "source/common/common/assert.h" 13 : #include "source/common/stats/timespan_impl.h" 14 : 15 : namespace Envoy { 16 : namespace Tcp { 17 : 18 : AsyncTcpClientImpl::AsyncTcpClientImpl(Event::Dispatcher& dispatcher, 19 : Upstream::ThreadLocalCluster& thread_local_cluster, 20 : Upstream::LoadBalancerContext* context, 21 : bool enable_half_close) 22 : : dispatcher_(dispatcher), thread_local_cluster_(thread_local_cluster), 23 : cluster_info_(thread_local_cluster_.info()), context_(context), 24 0 : connect_timer_(dispatcher.createTimer([this]() { onConnectTimeout(); })), 25 0 : enable_half_close_(enable_half_close) { 26 0 : connect_timer_->enableTimer(cluster_info_->connectTimeout()); 27 0 : cluster_info_->trafficStats()->upstream_cx_active_.inc(); 28 0 : cluster_info_->trafficStats()->upstream_cx_total_.inc(); 29 0 : } 30 : 31 0 : AsyncTcpClientImpl::~AsyncTcpClientImpl() { 32 0 : cluster_info_->trafficStats()->upstream_cx_active_.dec(); 33 0 : } 34 : 35 0 : bool AsyncTcpClientImpl::connect() { 36 0 : connection_ = std::move(thread_local_cluster_.tcpConn(context_).connection_); 37 0 : if (!connection_) { 38 0 : return false; 39 0 : } 40 0 : connection_->enableHalfClose(enable_half_close_); 41 0 : connection_->addConnectionCallbacks(*this); 42 0 : connection_->addReadFilter(std::make_shared<NetworkReadFilter>(*this)); 43 0 : conn_connect_ms_ = std::make_unique<Stats::HistogramCompletableTimespanImpl>( 44 0 : cluster_info_->trafficStats()->upstream_cx_connect_ms_, dispatcher_.timeSource()); 45 : 46 0 : connect_timer_->enableTimer(cluster_info_->connectTimeout()); 47 0 : connection_->setConnectionStats({cluster_info_->trafficStats()->upstream_cx_rx_bytes_total_, 48 0 : cluster_info_->trafficStats()->upstream_cx_rx_bytes_buffered_, 49 0 : cluster_info_->trafficStats()->upstream_cx_tx_bytes_total_, 50 0 : cluster_info_->trafficStats()->upstream_cx_tx_bytes_buffered_, 51 0 : &cluster_info_->trafficStats()->bind_errors_, nullptr}); 52 0 : connection_->noDelay(true); 53 0 : connection_->connect(); 54 0 : return true; 55 0 : } 56 : 57 0 : void AsyncTcpClientImpl::onConnectTimeout() { 58 0 : ENVOY_CONN_LOG(debug, "async tcp connection timed out", *connection_); 59 0 : cluster_info_->trafficStats()->upstream_cx_connect_timeout_.inc(); 60 0 : close(Network::ConnectionCloseType::NoFlush); 61 0 : } 62 : 63 0 : void AsyncTcpClientImpl::close(Network::ConnectionCloseType type) { 64 0 : if (connection_) { 65 0 : connection_->close(type); 66 0 : } 67 0 : } 68 : 69 0 : void AsyncTcpClientImpl::setAsyncTcpClientCallbacks(AsyncTcpClientCallbacks& callbacks) { 70 0 : callbacks_ = &callbacks; 71 0 : } 72 : 73 0 : void AsyncTcpClientImpl::write(Buffer::Instance& data, bool end_stream) { 74 0 : ASSERT(connection_ != nullptr); 75 0 : connection_->write(data, end_stream); 76 0 : } 77 : 78 0 : void AsyncTcpClientImpl::onData(Buffer::Instance& data, bool end_stream) { 79 0 : if (callbacks_) { 80 0 : callbacks_->onData(data, end_stream); 81 0 : } 82 0 : } 83 : 84 0 : void AsyncTcpClientImpl::disableConnectTimeout() { 85 0 : if (connect_timer_) { 86 0 : connect_timer_->disableTimer(); 87 0 : connect_timer_.reset(); 88 0 : } 89 0 : } 90 : 91 0 : void AsyncTcpClientImpl::reportConnectionDestroy(Network::ConnectionEvent event) { 92 0 : auto& stats = cluster_info_->trafficStats(); 93 0 : stats->upstream_cx_destroy_.inc(); 94 0 : if (event == Network::ConnectionEvent::RemoteClose) { 95 0 : stats->upstream_cx_destroy_remote_.inc(); 96 0 : } else { 97 0 : stats->upstream_cx_destroy_local_.inc(); 98 0 : } 99 0 : } 100 : 101 0 : void AsyncTcpClientImpl::onEvent(Network::ConnectionEvent event) { 102 0 : if (event == Network::ConnectionEvent::RemoteClose || 103 0 : event == Network::ConnectionEvent::LocalClose) { 104 0 : if (disconnected_) { 105 0 : cluster_info_->trafficStats()->upstream_cx_connect_fail_.inc(); 106 0 : } 107 : 108 0 : if (!disconnected_ && conn_length_ms_ != nullptr) { 109 0 : conn_length_ms_->complete(); 110 0 : conn_length_ms_.reset(); 111 0 : } 112 0 : disableConnectTimeout(); 113 0 : reportConnectionDestroy(event); 114 0 : disconnected_ = true; 115 0 : if (connection_) { 116 0 : detected_close_ = connection_->detectedCloseType(); 117 0 : } 118 : 119 0 : dispatcher_.deferredDelete(std::move(connection_)); 120 0 : if (callbacks_) { 121 0 : callbacks_->onEvent(event); 122 0 : callbacks_ = nullptr; 123 0 : } 124 0 : } else { 125 0 : disconnected_ = false; 126 0 : conn_connect_ms_->complete(); 127 0 : conn_connect_ms_.reset(); 128 0 : disableConnectTimeout(); 129 : 130 0 : if (!conn_length_ms_) { 131 0 : conn_length_ms_ = std::make_unique<Stats::HistogramCompletableTimespanImpl>( 132 0 : cluster_info_->trafficStats()->upstream_cx_length_ms_, dispatcher_.timeSource()); 133 0 : } 134 0 : if (callbacks_) { 135 0 : callbacks_->onEvent(event); 136 0 : } 137 0 : } 138 0 : } 139 : 140 : } // namespace Tcp 141 : } // namespace Envoy