Line data Source code
1 : #pragma once 2 : 3 : #include <chrono> 4 : #include <cstddef> 5 : #include <string> 6 : 7 : #include "envoy/common/optref.h" 8 : #include "envoy/event/dispatcher.h" 9 : #include "envoy/network/connection.h" 10 : #include "envoy/network/filter.h" 11 : #include "envoy/stats/timespan.h" 12 : #include "envoy/stream_info/stream_info.h" 13 : #include "envoy/tcp/async_tcp_client.h" 14 : #include "envoy/upstream/thread_local_cluster.h" 15 : 16 : #include "source/common/network/filter_impl.h" 17 : 18 : #include "absl/strings/string_view.h" 19 : #include "absl/types/optional.h" 20 : 21 : namespace Envoy { 22 : namespace Tcp { 23 : 24 : class AsyncTcpClientImpl : public AsyncTcpClient, 25 : public Network::ConnectionCallbacks, 26 : public Logger::Loggable<Logger::Id::client> { 27 : public: 28 : AsyncTcpClientImpl(Event::Dispatcher& dispatcher, 29 : Upstream::ThreadLocalCluster& thread_local_cluster, 30 : Upstream::LoadBalancerContext* context, bool enable_half_close); 31 : 32 : ~AsyncTcpClientImpl() override; 33 : 34 : void close(Network::ConnectionCloseType type) override; 35 : 36 0 : Network::DetectedCloseType detectedCloseType() const override { return detected_close_; } 37 : 38 : /** 39 : * @return true means a host is successfully picked from a Cluster. 40 : * This doesn't mean the connection is established. 41 : */ 42 : bool connect() override; 43 : 44 : void onConnectTimeout(); 45 : 46 : void setAsyncTcpClientCallbacks(AsyncTcpClientCallbacks& callbacks) override; 47 : 48 : void write(Buffer::Instance& data, bool end_stream) override; 49 : 50 0 : void readDisable(bool disable) override { 51 0 : if (connection_) { 52 0 : connection_->readDisable(disable); 53 0 : } 54 0 : }; 55 : 56 : /** 57 : * @return if the client connects to a peer host. 58 : */ 59 0 : bool connected() override { return !disconnected_; } 60 : 61 0 : Event::Dispatcher& dispatcher() override { return dispatcher_; } 62 : 63 0 : OptRef<StreamInfo::StreamInfo> getStreamInfo() override { 64 0 : if (connection_) { 65 0 : return connection_->streamInfo(); 66 0 : } else { 67 0 : return absl::nullopt; 68 0 : } 69 0 : } 70 : 71 : private: 72 : struct NetworkReadFilter : public Network::ReadFilterBaseImpl { 73 0 : NetworkReadFilter(AsyncTcpClientImpl& parent) : parent_(parent) {} 74 : 75 : // Network::ReadFilter 76 0 : Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override { 77 0 : parent_.onData(data, end_stream); 78 0 : return Network::FilterStatus::StopIteration; 79 0 : } 80 : 81 : AsyncTcpClientImpl& parent_; 82 : }; 83 : 84 : void onData(Buffer::Instance& data, bool end_stream); 85 : 86 : void onEvent(Network::ConnectionEvent event) override; 87 0 : void onAboveWriteBufferHighWatermark() override { 88 0 : if (callbacks_) { 89 0 : callbacks_->onAboveWriteBufferHighWatermark(); 90 0 : } 91 0 : } 92 0 : void onBelowWriteBufferLowWatermark() override { 93 0 : if (callbacks_) { 94 0 : callbacks_->onBelowWriteBufferLowWatermark(); 95 0 : } 96 0 : } 97 : 98 : void disableConnectTimeout(); 99 : void reportConnectionDestroy(Network::ConnectionEvent event); 100 : 101 : Event::Dispatcher& dispatcher_; 102 : Network::ClientConnectionPtr connection_; 103 : Upstream::ThreadLocalCluster& thread_local_cluster_; 104 : Upstream::ClusterInfoConstSharedPtr cluster_info_; 105 : Upstream::LoadBalancerContext* context_; 106 : Stats::TimespanPtr conn_connect_ms_; 107 : Stats::TimespanPtr conn_length_ms_; 108 : Event::TimerPtr connect_timer_; 109 : AsyncTcpClientCallbacks* callbacks_{}; 110 : Network::DetectedCloseType detected_close_{Network::DetectedCloseType::Normal}; 111 : bool disconnected_{true}; 112 : bool enable_half_close_{false}; 113 : }; 114 : 115 : } // namespace Tcp 116 : } // namespace Envoy