/proc/self/cwd/source/common/tcp_proxy/upstream.cc
#include "source/common/tcp_proxy/upstream.h"

#include "envoy/upstream/cluster_manager.h"

#include "source/common/http/codec_client.h"
#include "source/common/http/codes.h"
#include "source/common/http/header_map_impl.h"
#include "source/common/http/headers.h"
#include "source/common/http/utility.h"
#include "source/common/runtime/runtime_features.h"

namespace Envoy {
namespace TcpProxy {
using TunnelingConfig =
    envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy_TunnelingConfig;

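// TcpUpstream wraps a raw upstream TCP connection obtained from the connection
// pool. Half-close is enabled so that an end_stream seen on one side of the
// proxy can be propagated as a half-close on the other side.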
TcpUpstream::TcpUpstream(Tcp::ConnectionPool::ConnectionDataPtr&& data,
                         Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks)
    : upstream_conn_data_(std::move(data)) {
  Network::ClientConnection& connection = upstream_conn_data_->connection();
  connection.enableHalfClose(true);
  upstream_conn_data_->addUpstreamCallbacks(upstream_callbacks);
}

bool TcpUpstream::readDisable(bool disable) {
  if (upstream_conn_data_ == nullptr ||
      upstream_conn_data_->connection().state() != Network::Connection::State::Open) {
    // Because we flush write downstream, we can have a case where upstream has already
    // disconnected and we are waiting to flush. If we had a watermark event during this time we
    // should no longer touch the upstream connection.
    return false;
  }

  upstream_conn_data_->connection().readDisable(disable);
  return true;
}

void TcpUpstream::encodeData(Buffer::Instance& data, bool end_stream) {
  upstream_conn_data_->connection().write(data, end_stream);
}

void TcpUpstream::addBytesSentCallback(Network::Connection::BytesSentCb cb) {
  upstream_conn_data_->connection().addBytesSentCallback(cb);
}

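// startUpstreamSecureTransport() upgrades the existing upstream connection to a
// secure transport in place (the StartTLS flow); it reports failure when the
// upstream connection is already gone.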
bool TcpUpstream::startUpstreamSecureTransport() {
  return (upstream_conn_data_ == nullptr)
             ? false
             : upstream_conn_data_->connection().startSecureTransport();
}

Ssl::ConnectionInfoConstSharedPtr TcpUpstream::getUpstreamConnectionSslInfo() {
  if (upstream_conn_data_ != nullptr) {
    return upstream_conn_data_->connection().ssl();
  }
  return nullptr;
}

Tcp::ConnectionPool::ConnectionData*
TcpUpstream::onDownstreamEvent(Network::ConnectionEvent event) {
  // TODO(botengyao): propagate RST back to upstream connection if RST is received from downstream.
  if (event == Network::ConnectionEvent::RemoteClose) {
    // The close call may result in this object being deleted. Latch the
    // connection locally so it can be returned for potential draining.
    auto* conn_data = upstream_conn_data_.release();
    conn_data->connection().close(
        Network::ConnectionCloseType::FlushWrite,
        StreamInfo::LocalCloseReasons::get().ClosingUpstreamTcpDueToDownstreamRemoteClose);
    return conn_data;
  } else if (event == Network::ConnectionEvent::LocalClose) {
    upstream_conn_data_->connection().close(
        Network::ConnectionCloseType::NoFlush,
        StreamInfo::LocalCloseReasons::get().ClosingUpstreamTcpDueToDownstreamLocalClose);
  }
  return nullptr;
}

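// HttpUpstream is the shared base for tunneling the downstream TCP stream over
// an HTTP request (CONNECT, or POST when use_post is configured): downstream
// bytes become request body data and response body data is written back
// downstream. Illustrative tunneling_config sketch (field names from the
// TcpProxy_TunnelingConfig proto; values are examples only):
//
//   tunneling_config:
//     hostname: tunnel.example.com:443
//     use_post: true
//     post_path: /tunnel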
HttpUpstream::HttpUpstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
                           const TunnelingConfigHelper& config,
                           StreamInfo::StreamInfo& downstream_info)
    : config_(config), downstream_info_(downstream_info), response_decoder_(*this),
      upstream_callbacks_(callbacks) {}

HttpUpstream::~HttpUpstream() { resetEncoder(Network::ConnectionEvent::LocalClose); }

bool HttpUpstream::readDisable(bool disable) {
  if (!request_encoder_) {
    return false;
  }
  request_encoder_->getStream().readDisable(disable);
  return true;
}

void HttpUpstream::encodeData(Buffer::Instance& data, bool end_stream) {
  if (!request_encoder_) {
    return;
  }
  request_encoder_->encodeData(data, end_stream);
  if (end_stream) {
    doneWriting();
  }
}

void HttpUpstream::addBytesSentCallback(Network::Connection::BytesSentCb) {
  // The HTTP tunneling mode does not tickle the idle timeout when bytes are
  // sent to the kernel.
  // This can be implemented if any user cares about the difference in time
  // between it being sent to the HTTP/2 stack and out to the kernel.
}

Tcp::ConnectionPool::ConnectionData*
HttpUpstream::onDownstreamEvent(Network::ConnectionEvent event) {
  if (event == Network::ConnectionEvent::LocalClose ||
      event == Network::ConnectionEvent::RemoteClose) {
    resetEncoder(Network::ConnectionEvent::LocalClose, false);
  }
  return nullptr;
}

void HttpUpstream::onResetStream(Http::StreamResetReason, absl::string_view) {
  read_half_closed_ = true;
  write_half_closed_ = true;
  resetEncoder(Network::ConnectionEvent::LocalClose);
}

void HttpUpstream::onAboveWriteBufferHighWatermark() {
  upstream_callbacks_.onAboveWriteBufferHighWatermark();
}

void HttpUpstream::onBelowWriteBufferLowWatermark() {
  upstream_callbacks_.onBelowWriteBufferLowWatermark();
}

void HttpUpstream::resetEncoder(Network::ConnectionEvent event, bool inform_downstream) {
  if (!request_encoder_) {
    return;
  }
  request_encoder_->getStream().removeCallbacks(*this);
  if (!write_half_closed_ || !read_half_closed_) {
    request_encoder_->getStream().resetStream(Http::StreamResetReason::LocalReset);
  }
  request_encoder_ = nullptr;
  // If we have not yet received a valid CONNECT response, treat this as a pool
  // failure; otherwise forward the event downstream.
  if (conn_pool_callbacks_ != nullptr) {
    conn_pool_callbacks_->onFailure();
    return;
  }
  if (inform_downstream) {
    upstream_callbacks_.onEvent(event);
  }
}

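// doneReading()/doneWriting() track half-close in each direction of the
// tunneled stream. Once both directions have finished, the encoder is released
// without resetting the stream; resetEncoder() only issues a local reset while
// a direction is still open.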
void HttpUpstream::doneReading() {
  read_half_closed_ = true;
  if (write_half_closed_) {
    resetEncoder(Network::ConnectionEvent::LocalClose);
  }
}

void HttpUpstream::doneWriting() {
  write_half_closed_ = true;
  if (read_half_closed_) {
    resetEncoder(Network::ConnectionEvent::LocalClose);
  }
}

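// TcpConnPool asks the cluster's TCP connection pool for a raw upstream
// connection and, when it is ready, wraps it in a TcpUpstream and hands it to
// the generic connection pool callbacks.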
TcpConnPool::TcpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster,
                         Upstream::LoadBalancerContext* context,
                         Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks,
                         StreamInfo::StreamInfo& downstream_info)
    : upstream_callbacks_(upstream_callbacks), downstream_info_(downstream_info) {
  conn_pool_data_ = thread_local_cluster.tcpConnPool(Upstream::ResourcePriority::Default, context);
}

TcpConnPool::~TcpConnPool() {
  if (upstream_handle_ != nullptr) {
    upstream_handle_->cancel(ConnectionPool::CancelPolicy::CloseExcess);
  }
}

void TcpConnPool::newStream(GenericConnectionPoolCallbacks& callbacks) {
  callbacks_ = &callbacks;
  // Given this function is reentrant, make sure we only set the upstream_handle_ when given a
  // valid connection handle. If newConnection fails inline it may result in attempting to
  // select a new host, and a recursive call to establishUpstreamConnection. In this case the
  // first call to newConnection will return null and the inner call will persist.
  Tcp::ConnectionPool::Cancellable* handle = conn_pool_data_.value().newConnection(*this);
  if (handle) {
    ASSERT(upstream_handle_ == nullptr);
    upstream_handle_ = handle;
  }
}

void TcpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason,
                                absl::string_view failure_reason,
                                Upstream::HostDescriptionConstSharedPtr host) {
  upstream_handle_ = nullptr;
  callbacks_->onGenericPoolFailure(reason, failure_reason, host);
}

void TcpConnPool::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data,
                              Upstream::HostDescriptionConstSharedPtr host) {
  if (downstream_info_.downstreamAddressProvider().connectionID()) {
    ENVOY_LOG(debug, "Attached upstream connection [C{}] to downstream connection [C{}]",
              conn_data->connection().id(),
              downstream_info_.downstreamAddressProvider().connectionID().value());
  }

  upstream_handle_ = nullptr;
  Tcp::ConnectionPool::ConnectionData* latched_data = conn_data.get();
  Network::Connection& connection = conn_data->connection();

  auto upstream = std::make_unique<TcpUpstream>(std::move(conn_data), upstream_callbacks_);
  callbacks_->onGenericPoolReady(
      &connection.streamInfo(), std::move(upstream), host,
      latched_data->connection().connectionInfoProvider(),
      latched_data->connection().streamInfo().downstreamAddressProvider().sslConnection());
}

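// HttpConnPool requests an HTTP stream from the cluster's HTTP connection pool.
// The protocol hint passed to the pool follows the configured codec type
// (HTTP/2 or HTTP/3); for HTTP/1.1 no explicit protocol is requested.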
HttpConnPool::HttpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster,
                           Upstream::LoadBalancerContext* context,
                           const TunnelingConfigHelper& config,
                           Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks,
                           Http::CodecType type, StreamInfo::StreamInfo& downstream_info)
    : config_(config), type_(type), upstream_callbacks_(upstream_callbacks),
      downstream_info_(downstream_info) {
  absl::optional<Http::Protocol> protocol;
  if (type_ == Http::CodecType::HTTP3) {
    protocol = Http::Protocol::Http3;
  } else if (type_ == Http::CodecType::HTTP2) {
    protocol = Http::Protocol::Http2;
  }
  conn_pool_data_ =
      thread_local_cluster.httpConnPool(Upstream::ResourcePriority::Default, protocol, context);
}

HttpConnPool::~HttpConnPool() {
  if (upstream_handle_ != nullptr) {
    // Because HTTP connections are generally shorter lived and have a higher probability of use
    // before going idle, they are closed with Default rather than CloseExcess.
    upstream_handle_->cancel(ConnectionPool::CancelPolicy::Default);
  }
}

void HttpConnPool::newStream(GenericConnectionPoolCallbacks& callbacks) {
  callbacks_ = &callbacks;
  if (type_ == Http::CodecType::HTTP1) {
    upstream_ = std::make_unique<Http1Upstream>(upstream_callbacks_, config_, downstream_info_);
  } else {
    upstream_ = std::make_unique<Http2Upstream>(upstream_callbacks_, config_, downstream_info_);
  }
  Tcp::ConnectionPool::Cancellable* handle =
      conn_pool_data_.value().newStream(upstream_->responseDecoder(), *this,
                                        {/*can_send_early_data_=*/false,
                                         /*can_use_http3_=*/true});
  if (handle != nullptr) {
    upstream_handle_ = handle;
  }
}

void HttpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason,
                                 absl::string_view failure_reason,
                                 Upstream::HostDescriptionConstSharedPtr host) {
  upstream_handle_ = nullptr;
  callbacks_->onGenericPoolFailure(reason, failure_reason, host);
}

void HttpConnPool::onPoolReady(Http::RequestEncoder& request_encoder,
                               Upstream::HostDescriptionConstSharedPtr host,
                               StreamInfo::StreamInfo& info, absl::optional<Http::Protocol>) {
  if (info.downstreamAddressProvider().connectionID() &&
      downstream_info_.downstreamAddressProvider().connectionID()) {
    // Here `info` is the StreamInfo of the upstream connection, so calling
    // info.downstreamAddressProvider() yields the upstream connection ID.
    ENVOY_LOG(debug, "Attached upstream connection [C{}] to downstream connection [C{}]",
              info.downstreamAddressProvider().connectionID().value(),
              downstream_info_.downstreamAddressProvider().connectionID().value());
  }

  upstream_handle_ = nullptr;
  upstream_->setRequestEncoder(request_encoder,
                               host->transportSocketFactory().implementsSecureTransport());
  upstream_->setConnPoolCallbacks(std::make_unique<HttpConnPool::Callbacks>(
      *this, host, info.downstreamAddressProvider().sslConnection()));
}

void HttpConnPool::onGenericPoolReady(Upstream::HostDescriptionConstSharedPtr& host,
                                      const Network::ConnectionInfoProvider& address_provider,
                                      Ssl::ConnectionInfoConstSharedPtr ssl_info) {
  callbacks_->onGenericPoolReady(nullptr, std::move(upstream_), host, address_provider, ssl_info);
}

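// Http2Upstream tunnels the TCP stream over an HTTP/2 (or HTTP/3) CONNECT
// request. When use_post is configured it sends a POST with the configured
// path instead, and sets :scheme according to whether the upstream transport
// is TLS. Only a 200 response is accepted for the tunnel.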
Http2Upstream::Http2Upstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
                             const TunnelingConfigHelper& config,
                             StreamInfo::StreamInfo& downstream_info)
    : HttpUpstream(callbacks, config, downstream_info) {}

bool Http2Upstream::isValidResponse(const Http::ResponseHeaderMap& headers) {
  if (Http::Utility::getResponseStatus(headers) != 200) {
    return false;
  }
  return true;
}

void Http2Upstream::setRequestEncoder(Http::RequestEncoder& request_encoder, bool is_ssl) {
  request_encoder_ = &request_encoder;
  request_encoder_->getStream().addCallbacks(*this);

  const std::string& scheme =
      is_ssl ? Http::Headers::get().SchemeValues.Https : Http::Headers::get().SchemeValues.Http;
  auto headers = Http::createHeaderMap<Http::RequestHeaderMapImpl>({
      {Http::Headers::get().Method, config_.usePost() ? "POST" : "CONNECT"},
      {Http::Headers::get().Host, config_.host(downstream_info_)},
  });

  if (config_.usePost()) {
    headers->addReference(Http::Headers::get().Path, config_.postPath());
    headers->addReference(Http::Headers::get().Scheme, scheme);
  }

  config_.headerEvaluator().evaluateHeaders(*headers, {downstream_info_.getRequestHeaders()},
                                            downstream_info_);
  const auto status = request_encoder_->encodeHeaders(*headers, false);
  // Encoding can only fail on missing required request headers.
  ASSERT(status.ok());
}

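// Http1Upstream tunnels the TCP stream over an HTTP/1.1 CONNECT (or POST)
// request. enableTcpTunneling() puts the HTTP/1 stream into tunneling mode so
// that body data is expected to pass through as raw bytes rather than chunked
// framing, and any 2xx response establishes the tunnel.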
Http1Upstream::Http1Upstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
                             const TunnelingConfigHelper& config,
                             StreamInfo::StreamInfo& downstream_info)
    : HttpUpstream(callbacks, config, downstream_info) {}

void Http1Upstream::setRequestEncoder(Http::RequestEncoder& request_encoder, bool) {
  request_encoder_ = &request_encoder;
  request_encoder_->getStream().addCallbacks(*this);
  request_encoder_->enableTcpTunneling();
  ASSERT(request_encoder_->http1StreamEncoderOptions() != absl::nullopt);

  auto headers = Http::createHeaderMap<Http::RequestHeaderMapImpl>({
      {Http::Headers::get().Method, config_.usePost() ? "POST" : "CONNECT"},
      {Http::Headers::get().Host, config_.host(downstream_info_)},
  });

  if (config_.usePost()) {
    // Path is required for POST requests.
    headers->addReference(Http::Headers::get().Path, config_.postPath());
  }

  config_.headerEvaluator().evaluateHeaders(*headers, {downstream_info_.getRequestHeaders()},
                                            downstream_info_);
  const auto status = request_encoder_->encodeHeaders(*headers, false);
  // Encoding can only fail on missing required request headers.
  ASSERT(status.ok());
}

bool Http1Upstream::isValidResponse(const Http::ResponseHeaderMap& headers) {
  // According to RFC7231 any 2xx response indicates that the connection is
  // established.
  // Any 'Content-Length' or 'Transfer-Encoding' header fields MUST be ignored.
  // https://tools.ietf.org/html/rfc7231#section-4.3.6
  return Http::CodeUtility::is2xx(Http::Utility::getResponseStatus(headers));
}

void Http1Upstream::encodeData(Buffer::Instance& data, bool end_stream) {
  if (!request_encoder_) {
    return;
  }
  request_encoder_->encodeData(data, end_stream);
}

} // namespace TcpProxy
} // namespace Envoy