Line data Source code
1 : #pragma once 2 : 3 : #include <cstdint> 4 : #include <memory> 5 : 6 : #include "envoy/http/codec.h" 7 : #include "envoy/tcp/conn_pool.h" 8 : #include "envoy/upstream/thread_local_cluster.h" 9 : 10 : #include "source/common/buffer/watermark_buffer.h" 11 : #include "source/common/common/cleanup.h" 12 : #include "source/common/common/logger.h" 13 : #include "source/common/config/well_known_names.h" 14 : #include "source/common/router/upstream_request.h" 15 : #include "source/common/stream_info/stream_info_impl.h" 16 : 17 : namespace Envoy { 18 : namespace Extensions { 19 : namespace Upstreams { 20 : namespace Http { 21 : namespace Tcp { 22 : 23 : class TcpConnPool : public Router::GenericConnPool, public Envoy::Tcp::ConnectionPool::Callbacks { 24 : public: 25 : TcpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster, 26 0 : Upstream::ResourcePriority priority, Upstream::LoadBalancerContext* ctx) { 27 0 : conn_pool_data_ = thread_local_cluster.tcpConnPool(priority, ctx); 28 0 : } 29 : // Router::GenericConnPool 30 0 : void newStream(Router::GenericConnectionPoolCallbacks* callbacks) override { 31 0 : callbacks_ = callbacks; 32 0 : upstream_handle_ = conn_pool_data_.value().newConnection(*this); 33 0 : } 34 0 : bool cancelAnyPendingStream() override { 35 0 : if (upstream_handle_) { 36 0 : upstream_handle_->cancel(Envoy::Tcp::ConnectionPool::CancelPolicy::Default); 37 0 : upstream_handle_ = nullptr; 38 0 : return true; 39 0 : } 40 0 : return false; 41 0 : } 42 0 : Upstream::HostDescriptionConstSharedPtr host() const override { 43 0 : return conn_pool_data_.value().host(); 44 0 : } 45 0 : bool valid() const override { return conn_pool_data_.has_value(); } 46 : 47 : // Tcp::ConnectionPool::Callbacks 48 : void onPoolFailure(ConnectionPool::PoolFailureReason reason, 49 : absl::string_view transport_failure_reason, 50 0 : Upstream::HostDescriptionConstSharedPtr host) override { 51 0 : upstream_handle_ = nullptr; 52 0 : callbacks_->onPoolFailure(reason, transport_failure_reason, host); 53 0 : } 54 : 55 : void onPoolReady(Envoy::Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, 56 : Upstream::HostDescriptionConstSharedPtr host) override; 57 : 58 : private: 59 : absl::optional<Envoy::Upstream::TcpPoolData> conn_pool_data_; 60 : Envoy::Tcp::ConnectionPool::Cancellable* upstream_handle_{}; 61 : Router::GenericConnectionPoolCallbacks* callbacks_{}; 62 : }; 63 : 64 : class TcpUpstream : public Router::GenericUpstream, 65 : public Envoy::Tcp::ConnectionPool::UpstreamCallbacks { 66 : public: 67 : TcpUpstream(Router::UpstreamToDownstream* upstream_request, 68 : Envoy::Tcp::ConnectionPool::ConnectionDataPtr&& upstream); 69 : 70 : // GenericUpstream 71 : void encodeData(Buffer::Instance& data, bool end_stream) override; 72 0 : void encodeMetadata(const Envoy::Http::MetadataMapVector&) override {} 73 : Envoy::Http::Status encodeHeaders(const Envoy::Http::RequestHeaderMap&, bool end_stream) override; 74 : void encodeTrailers(const Envoy::Http::RequestTrailerMap&) override; 75 : void readDisable(bool disable) override; 76 : void resetStream() override; 77 0 : void setAccount(Buffer::BufferMemoryAccountSharedPtr) override {} 78 : 79 : // Tcp::ConnectionPool::UpstreamCallbacks 80 : void onUpstreamData(Buffer::Instance& data, bool end_stream) override; 81 : void onEvent(Network::ConnectionEvent event) override; 82 : void onAboveWriteBufferHighWatermark() override; 83 : void onBelowWriteBufferLowWatermark() override; 84 0 : const StreamInfo::BytesMeterSharedPtr& bytesMeter() override { return bytes_meter_; } 85 : 86 : private: 87 : Router::UpstreamToDownstream* upstream_request_; 88 : Envoy::Tcp::ConnectionPool::ConnectionDataPtr upstream_conn_data_; 89 : StreamInfo::BytesMeterSharedPtr bytes_meter_{std::make_shared<StreamInfo::BytesMeter>()}; 90 : }; 91 : 92 : } // namespace Tcp 93 : } // namespace Http 94 : } // namespace Upstreams 95 : } // namespace Extensions 96 : } // namespace Envoy