Line data Source code
1 : #include "source/common/tcp_proxy/upstream.h"
2 :
3 : #include "envoy/upstream/cluster_manager.h"
4 :
5 : #include "source/common/http/codec_client.h"
6 : #include "source/common/http/codes.h"
7 : #include "source/common/http/header_map_impl.h"
8 : #include "source/common/http/headers.h"
9 : #include "source/common/http/utility.h"
10 : #include "source/common/runtime/runtime_features.h"
11 :
12 : namespace Envoy {
13 : namespace TcpProxy {
14 : using TunnelingConfig =
15 : envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy_TunnelingConfig;
16 :
17 : TcpUpstream::TcpUpstream(Tcp::ConnectionPool::ConnectionDataPtr&& data,
18 : Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks)
19 0 : : upstream_conn_data_(std::move(data)) {
20 0 : Network::ClientConnection& connection = upstream_conn_data_->connection();
21 0 : connection.enableHalfClose(true);
22 0 : upstream_conn_data_->addUpstreamCallbacks(upstream_callbacks);
23 0 : }
24 :
25 0 : bool TcpUpstream::readDisable(bool disable) {
26 0 : if (upstream_conn_data_ == nullptr ||
27 0 : upstream_conn_data_->connection().state() != Network::Connection::State::Open) {
28 : // Because we flush write downstream, we can have a case where upstream has already disconnected
29 : // and we are waiting to flush. If we had a watermark event during this time we should no
30 : // longer touch the upstream connection.
31 0 : return false;
32 0 : }
33 :
34 0 : upstream_conn_data_->connection().readDisable(disable);
35 0 : return true;
36 0 : }
37 :
38 0 : void TcpUpstream::encodeData(Buffer::Instance& data, bool end_stream) {
39 0 : upstream_conn_data_->connection().write(data, end_stream);
40 0 : }
41 :
42 0 : void TcpUpstream::addBytesSentCallback(Network::Connection::BytesSentCb cb) {
43 0 : upstream_conn_data_->connection().addBytesSentCallback(cb);
44 0 : }
45 :
46 0 : bool TcpUpstream::startUpstreamSecureTransport() {
47 0 : return (upstream_conn_data_ == nullptr)
48 0 : ? false
49 0 : : upstream_conn_data_->connection().startSecureTransport();
50 0 : }
51 :
52 0 : Ssl::ConnectionInfoConstSharedPtr TcpUpstream::getUpstreamConnectionSslInfo() {
53 0 : if (upstream_conn_data_ != nullptr) {
54 0 : return upstream_conn_data_->connection().ssl();
55 0 : }
56 0 : return nullptr;
57 0 : }
58 :
59 : Tcp::ConnectionPool::ConnectionData*
60 0 : TcpUpstream::onDownstreamEvent(Network::ConnectionEvent event) {
61 : // TODO(botengyao): propagate RST back to upstream connection if RST is received from downstream.
62 0 : if (event == Network::ConnectionEvent::RemoteClose) {
63 : // The close call may result in this object being deleted. Latch the
64 : // connection locally so it can be returned for potential draining.
65 0 : auto* conn_data = upstream_conn_data_.release();
66 0 : conn_data->connection().close(
67 0 : Network::ConnectionCloseType::FlushWrite,
68 0 : StreamInfo::LocalCloseReasons::get().ClosingUpstreamTcpDueToDownstreamRemoteClose);
69 0 : return conn_data;
70 0 : } else if (event == Network::ConnectionEvent::LocalClose) {
71 0 : upstream_conn_data_->connection().close(
72 0 : Network::ConnectionCloseType::NoFlush,
73 0 : StreamInfo::LocalCloseReasons::get().ClosingUpstreamTcpDueToDownstreamLocalClose);
74 0 : }
75 0 : return nullptr;
76 0 : }
77 :
78 : HttpUpstream::HttpUpstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
79 : const TunnelingConfigHelper& config,
80 : StreamInfo::StreamInfo& downstream_info, Http::CodecType type)
81 : : config_(config), downstream_info_(downstream_info), response_decoder_(*this),
82 0 : upstream_callbacks_(callbacks), type_(type) {}
83 :
84 0 : HttpUpstream::~HttpUpstream() { resetEncoder(Network::ConnectionEvent::LocalClose); }
85 :
86 0 : bool HttpUpstream::isValidResponse(const Http::ResponseHeaderMap& headers) {
87 0 : if (type_ == Http::CodecType::HTTP1) {
88 : // According to RFC7231 any 2xx response indicates that the connection is
89 : // established.
90 : // Any 'Content-Length' or 'Transfer-Encoding' header fields MUST be ignored.
91 : // https://tools.ietf.org/html/rfc7231#section-4.3.6
92 0 : return Http::CodeUtility::is2xx(Http::Utility::getResponseStatus(headers));
93 0 : }
94 0 : return Http::Utility::getResponseStatus(headers) == 200;
95 0 : }
96 :
97 0 : void HttpUpstream::setRequestEncoder(Http::RequestEncoder& request_encoder, bool is_ssl) {
98 0 : request_encoder_ = &request_encoder;
99 0 : request_encoder_->getStream().addCallbacks(*this);
100 0 : auto headers = Http::createHeaderMap<Http::RequestHeaderMapImpl>({
101 0 : {Http::Headers::get().Method, config_.usePost() ? "POST" : "CONNECT"},
102 0 : {Http::Headers::get().Host, config_.host(downstream_info_)},
103 0 : });
104 0 : if (config_.usePost()) {
105 0 : headers->addReference(Http::Headers::get().Path, config_.postPath());
106 0 : }
107 :
108 0 : if (type_ == Http::CodecType::HTTP1) {
109 0 : request_encoder_->enableTcpTunneling();
110 0 : ASSERT(request_encoder_->http1StreamEncoderOptions() != absl::nullopt);
111 0 : } else {
112 0 : const std::string& scheme =
113 0 : is_ssl ? Http::Headers::get().SchemeValues.Https : Http::Headers::get().SchemeValues.Http;
114 :
115 0 : if (config_.usePost()) {
116 0 : headers->addReference(Http::Headers::get().Scheme, scheme);
117 0 : }
118 0 : }
119 :
120 0 : config_.headerEvaluator().evaluateHeaders(*headers, {downstream_info_.getRequestHeaders()},
121 0 : downstream_info_);
122 0 : const auto status = request_encoder_->encodeHeaders(*headers, false);
123 : // Encoding can only fail on missing required request headers.
124 0 : ASSERT(status.ok());
125 0 : }
126 :
127 0 : bool HttpUpstream::readDisable(bool disable) {
128 0 : if (!request_encoder_) {
129 0 : return false;
130 0 : }
131 0 : request_encoder_->getStream().readDisable(disable);
132 0 : return true;
133 0 : }
134 :
135 0 : void HttpUpstream::encodeData(Buffer::Instance& data, bool end_stream) {
136 0 : if (!request_encoder_) {
137 0 : return;
138 0 : }
139 0 : auto codec = type_;
140 0 : request_encoder_->encodeData(data, end_stream);
141 :
142 : // doneWriting() is being skipped for H1 codec to avoid resetEncoder() call.
143 : // This is because H1 codec does not support half-closed stream. Calling resetEncoder()
144 : // will fully close the upstream connection without flushing any pending data, rather than a http
145 : // stream reset.
146 : // More details can be found on https://github.com/envoyproxy/envoy/pull/13293
147 0 : if ((codec != Http::CodecType::HTTP1) && (end_stream)) {
148 0 : doneWriting();
149 0 : }
150 0 : }
151 :
152 0 : void HttpUpstream::addBytesSentCallback(Network::Connection::BytesSentCb) {
153 : // The HTTP tunneling mode does not tickle the idle timeout when bytes are
154 : // sent to the kernel.
155 : // This can be implemented if any user cares about the difference in time
156 : // between it being sent to the HTTP/2 stack and out to the kernel.
157 0 : }
158 :
159 : Tcp::ConnectionPool::ConnectionData*
160 0 : HttpUpstream::onDownstreamEvent(Network::ConnectionEvent event) {
161 0 : if (event == Network::ConnectionEvent::LocalClose ||
162 0 : event == Network::ConnectionEvent::RemoteClose) {
163 0 : resetEncoder(Network::ConnectionEvent::LocalClose, false);
164 0 : }
165 0 : return nullptr;
166 0 : }
167 :
168 0 : void HttpUpstream::onResetStream(Http::StreamResetReason, absl::string_view) {
169 0 : read_half_closed_ = true;
170 0 : write_half_closed_ = true;
171 0 : resetEncoder(Network::ConnectionEvent::LocalClose);
172 0 : }
173 :
174 0 : void HttpUpstream::onAboveWriteBufferHighWatermark() {
175 0 : upstream_callbacks_.onAboveWriteBufferHighWatermark();
176 0 : }
177 :
178 0 : void HttpUpstream::onBelowWriteBufferLowWatermark() {
179 0 : upstream_callbacks_.onBelowWriteBufferLowWatermark();
180 0 : }
181 :
182 0 : void HttpUpstream::resetEncoder(Network::ConnectionEvent event, bool inform_downstream) {
183 0 : if (!request_encoder_) {
184 0 : return;
185 0 : }
186 0 : request_encoder_->getStream().removeCallbacks(*this);
187 0 : if (!write_half_closed_ || !read_half_closed_) {
188 0 : request_encoder_->getStream().resetStream(Http::StreamResetReason::LocalReset);
189 0 : }
190 0 : request_encoder_ = nullptr;
191 : // If we did not receive a valid CONNECT response yet we treat this as a pool
192 : // failure, otherwise we forward the event downstream.
193 0 : if (conn_pool_callbacks_ != nullptr) {
194 0 : conn_pool_callbacks_->onFailure();
195 0 : return;
196 0 : }
197 0 : if (inform_downstream) {
198 0 : upstream_callbacks_.onEvent(event);
199 0 : }
200 0 : }
201 :
202 0 : void HttpUpstream::doneReading() {
203 0 : read_half_closed_ = true;
204 0 : if (write_half_closed_) {
205 0 : resetEncoder(Network::ConnectionEvent::LocalClose);
206 0 : }
207 0 : }
208 :
209 0 : void HttpUpstream::doneWriting() {
210 0 : write_half_closed_ = true;
211 0 : if (read_half_closed_) {
212 0 : resetEncoder(Network::ConnectionEvent::LocalClose);
213 0 : }
214 0 : }
215 :
216 : TcpConnPool::TcpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster,
217 : Upstream::LoadBalancerContext* context,
218 : Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks,
219 : StreamInfo::StreamInfo& downstream_info)
220 0 : : upstream_callbacks_(upstream_callbacks), downstream_info_(downstream_info) {
221 0 : conn_pool_data_ = thread_local_cluster.tcpConnPool(Upstream::ResourcePriority::Default, context);
222 0 : }
223 :
224 0 : TcpConnPool::~TcpConnPool() {
225 0 : if (upstream_handle_ != nullptr) {
226 0 : upstream_handle_->cancel(ConnectionPool::CancelPolicy::CloseExcess);
227 0 : }
228 0 : }
229 :
230 0 : void TcpConnPool::newStream(GenericConnectionPoolCallbacks& callbacks) {
231 0 : callbacks_ = &callbacks;
232 : // Given this function is reentrant, make sure we only reset the upstream_handle_ if given a
233 : // valid connection handle. If newConnection fails inline it may result in attempting to
234 : // select a new host, and a recursive call to establishUpstreamConnection. In this case the
235 : // first call to newConnection will return null and the inner call will persist.
236 0 : Tcp::ConnectionPool::Cancellable* handle = conn_pool_data_.value().newConnection(*this);
237 0 : if (handle) {
238 0 : ASSERT(upstream_handle_ == nullptr);
239 0 : upstream_handle_ = handle;
240 0 : }
241 0 : }
242 :
243 : void TcpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason,
244 : absl::string_view failure_reason,
245 0 : Upstream::HostDescriptionConstSharedPtr host) {
246 0 : upstream_handle_ = nullptr;
247 0 : callbacks_->onGenericPoolFailure(reason, failure_reason, host);
248 0 : }
249 :
250 : void TcpConnPool::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data,
251 0 : Upstream::HostDescriptionConstSharedPtr host) {
252 0 : if (downstream_info_.downstreamAddressProvider().connectionID()) {
253 0 : ENVOY_LOG(debug, "Attached upstream connection [C{}] to downstream connection [C{}]",
254 0 : conn_data->connection().id(),
255 0 : downstream_info_.downstreamAddressProvider().connectionID().value());
256 0 : }
257 :
258 0 : upstream_handle_ = nullptr;
259 0 : Tcp::ConnectionPool::ConnectionData* latched_data = conn_data.get();
260 0 : Network::Connection& connection = conn_data->connection();
261 :
262 0 : auto upstream = std::make_unique<TcpUpstream>(std::move(conn_data), upstream_callbacks_);
263 0 : callbacks_->onGenericPoolReady(
264 0 : &connection.streamInfo(), std::move(upstream), host,
265 0 : latched_data->connection().connectionInfoProvider(),
266 0 : latched_data->connection().streamInfo().downstreamAddressProvider().sslConnection());
267 0 : }
268 :
269 : HttpConnPool::HttpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster,
270 : Upstream::LoadBalancerContext* context,
271 : const TunnelingConfigHelper& config,
272 : Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks,
273 : Http::CodecType type, StreamInfo::StreamInfo& downstream_info)
274 : : config_(config), type_(type), upstream_callbacks_(upstream_callbacks),
275 0 : downstream_info_(downstream_info) {
276 0 : absl::optional<Http::Protocol> protocol;
277 0 : if (type_ == Http::CodecType::HTTP3) {
278 0 : protocol = Http::Protocol::Http3;
279 0 : } else if (type_ == Http::CodecType::HTTP2) {
280 0 : protocol = Http::Protocol::Http2;
281 0 : }
282 0 : conn_pool_data_ =
283 0 : thread_local_cluster.httpConnPool(Upstream::ResourcePriority::Default, protocol, context);
284 0 : }
285 :
286 0 : HttpConnPool::~HttpConnPool() {
287 0 : if (upstream_handle_ != nullptr) {
288 : // Because HTTP connections are generally shorter lived and have a higher probability of use
289 : // before going idle, they are closed with Default rather than CloseExcess.
290 0 : upstream_handle_->cancel(ConnectionPool::CancelPolicy::Default);
291 0 : }
292 0 : }
293 :
294 0 : void HttpConnPool::newStream(GenericConnectionPoolCallbacks& callbacks) {
295 0 : callbacks_ = &callbacks;
296 0 : upstream_ = std::make_unique<HttpUpstream>(upstream_callbacks_, config_, downstream_info_, type_);
297 0 : Tcp::ConnectionPool::Cancellable* handle =
298 0 : conn_pool_data_.value().newStream(upstream_->responseDecoder(), *this,
299 0 : {/*can_send_early_data_=*/false,
300 0 : /*can_use_http3_=*/true});
301 0 : if (handle != nullptr) {
302 0 : upstream_handle_ = handle;
303 0 : }
304 0 : }
305 :
306 : void HttpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason,
307 : absl::string_view failure_reason,
308 0 : Upstream::HostDescriptionConstSharedPtr host) {
309 0 : upstream_handle_ = nullptr;
310 0 : callbacks_->onGenericPoolFailure(reason, failure_reason, host);
311 0 : }
312 :
313 : void HttpConnPool::onPoolReady(Http::RequestEncoder& request_encoder,
314 : Upstream::HostDescriptionConstSharedPtr host,
315 0 : StreamInfo::StreamInfo& info, absl::optional<Http::Protocol>) {
316 0 : if (info.downstreamAddressProvider().connectionID() &&
317 0 : downstream_info_.downstreamAddressProvider().connectionID()) {
318 : // info.downstreamAddressProvider() is being called to get the upstream connection ID,
319 : // because the StreamInfo object here is of the upstream connection.
320 0 : ENVOY_LOG(debug, "Attached upstream connection [C{}] to downstream connection [C{}]",
321 0 : info.downstreamAddressProvider().connectionID().value(),
322 0 : downstream_info_.downstreamAddressProvider().connectionID().value());
323 0 : }
324 :
325 0 : upstream_handle_ = nullptr;
326 0 : upstream_->setRequestEncoder(request_encoder,
327 0 : host->transportSocketFactory().implementsSecureTransport());
328 0 : upstream_->setConnPoolCallbacks(std::make_unique<HttpConnPool::Callbacks>(
329 0 : *this, host, info.downstreamAddressProvider().sslConnection()));
330 0 : }
331 :
332 : void HttpConnPool::onGenericPoolReady(Upstream::HostDescriptionConstSharedPtr& host,
333 : const Network::ConnectionInfoProvider& address_provider,
334 0 : Ssl::ConnectionInfoConstSharedPtr ssl_info) {
335 0 : callbacks_->onGenericPoolReady(nullptr, std::move(upstream_), host, address_provider, ssl_info);
336 0 : }
337 :
338 : } // namespace TcpProxy
339 : } // namespace Envoy
|