Line data Source code
1 : #include "source/extensions/upstreams/http/tcp/upstream_request.h" 2 : 3 : #include <cstdint> 4 : #include <memory> 5 : 6 : #include "envoy/upstream/upstream.h" 7 : 8 : #include "source/common/common/assert.h" 9 : #include "source/common/common/utility.h" 10 : #include "source/common/http/codes.h" 11 : #include "source/common/http/header_map_impl.h" 12 : #include "source/common/http/headers.h" 13 : #include "source/common/http/message_impl.h" 14 : #include "source/common/network/transport_socket_options_impl.h" 15 : #include "source/common/router/router.h" 16 : #include "source/extensions/common/proxy_protocol/proxy_protocol_header.h" 17 : 18 : namespace Envoy { 19 : namespace Extensions { 20 : namespace Upstreams { 21 : namespace Http { 22 : namespace Tcp { 23 : 24 : void TcpConnPool::onPoolReady(Envoy::Tcp::ConnectionPool::ConnectionDataPtr&& conn_data, 25 0 : Upstream::HostDescriptionConstSharedPtr host) { 26 0 : upstream_handle_ = nullptr; 27 0 : Network::Connection& latched_conn = conn_data->connection(); 28 0 : auto upstream = 29 0 : std::make_unique<TcpUpstream>(&callbacks_->upstreamToDownstream(), std::move(conn_data)); 30 0 : callbacks_->onPoolReady(std::move(upstream), host, latched_conn.connectionInfoProvider(), 31 0 : latched_conn.streamInfo(), {}); 32 0 : } 33 : 34 : TcpUpstream::TcpUpstream(Router::UpstreamToDownstream* upstream_request, 35 : Envoy::Tcp::ConnectionPool::ConnectionDataPtr&& upstream) 36 0 : : upstream_request_(upstream_request), upstream_conn_data_(std::move(upstream)) { 37 0 : upstream_conn_data_->connection().enableHalfClose(true); 38 0 : upstream_conn_data_->addUpstreamCallbacks(*this); 39 0 : } 40 : 41 0 : void TcpUpstream::encodeData(Buffer::Instance& data, bool end_stream) { 42 0 : bytes_meter_->addWireBytesSent(data.length()); 43 0 : upstream_conn_data_->connection().write(data, end_stream); 44 0 : } 45 : 46 : Envoy::Http::Status TcpUpstream::encodeHeaders(const Envoy::Http::RequestHeaderMap&, 47 0 : bool end_stream) { 48 : // Headers should only happen once, so use this opportunity to add the proxy 49 : // proto header, if configured. 50 0 : const Router::RouteEntry* route_entry = upstream_request_->route().routeEntry(); 51 0 : ASSERT(route_entry != nullptr); 52 0 : if (route_entry->connectConfig().has_value()) { 53 0 : Buffer::OwnedImpl data; 54 0 : const auto& connect_config = route_entry->connectConfig(); 55 0 : if (connect_config->has_proxy_protocol_config() && 56 0 : upstream_request_->connection().has_value()) { 57 0 : Extensions::Common::ProxyProtocol::generateProxyProtoHeader( 58 0 : connect_config->proxy_protocol_config(), *upstream_request_->connection(), data); 59 0 : } 60 : 61 0 : if (data.length() != 0 || end_stream) { 62 : // Count header bytes for proxy proto. 63 0 : bytes_meter_->addHeaderBytesSent(data.length()); 64 0 : bytes_meter_->addWireBytesSent(data.length()); 65 0 : upstream_conn_data_->connection().write(data, end_stream); 66 0 : } 67 0 : } 68 : 69 : // TcpUpstream::encodeHeaders is called after the UpstreamRequest is fully initialized. Also use 70 : // this time to synthesize the 200 response headers downstream to complete the CONNECT handshake. 71 0 : Envoy::Http::ResponseHeaderMapPtr headers{ 72 0 : Envoy::Http::createHeaderMap<Envoy::Http::ResponseHeaderMapImpl>( 73 0 : {{Envoy::Http::Headers::get().Status, "200"}})}; 74 0 : upstream_request_->decodeHeaders(std::move(headers), /*end_stream=*/false); 75 0 : return Envoy::Http::okStatus(); 76 0 : } 77 : 78 0 : void TcpUpstream::encodeTrailers(const Envoy::Http::RequestTrailerMap&) { 79 0 : Buffer::OwnedImpl data; 80 0 : upstream_conn_data_->connection().write(data, true); 81 0 : } 82 : 83 0 : void TcpUpstream::readDisable(bool disable) { 84 0 : if (upstream_conn_data_->connection().state() != Network::Connection::State::Open) { 85 0 : return; 86 0 : } 87 0 : upstream_conn_data_->connection().readDisable(disable); 88 0 : } 89 : 90 0 : void TcpUpstream::resetStream() { 91 0 : upstream_request_ = nullptr; 92 0 : upstream_conn_data_->connection().close(Network::ConnectionCloseType::NoFlush, 93 0 : "tcp_upstream_reset_stream"); 94 0 : } 95 : 96 0 : void TcpUpstream::onUpstreamData(Buffer::Instance& data, bool end_stream) { 97 0 : bytes_meter_->addWireBytesReceived(data.length()); 98 0 : upstream_request_->decodeData(data, end_stream); 99 0 : } 100 : 101 0 : void TcpUpstream::onEvent(Network::ConnectionEvent event) { 102 0 : if ((event == Network::ConnectionEvent::LocalClose || 103 0 : event == Network::ConnectionEvent::RemoteClose) && 104 0 : upstream_request_) { 105 0 : upstream_request_->onResetStream(Envoy::Http::StreamResetReason::ConnectionTermination, ""); 106 0 : } 107 0 : } 108 : 109 0 : void TcpUpstream::onAboveWriteBufferHighWatermark() { 110 0 : if (upstream_request_) { 111 0 : upstream_request_->onAboveWriteBufferHighWatermark(); 112 0 : } 113 0 : } 114 : 115 0 : void TcpUpstream::onBelowWriteBufferLowWatermark() { 116 0 : if (upstream_request_) { 117 0 : upstream_request_->onBelowWriteBufferLowWatermark(); 118 0 : } 119 0 : } 120 : 121 : } // namespace Tcp 122 : } // namespace Http 123 : } // namespace Upstreams 124 : } // namespace Extensions 125 : } // namespace Envoy