Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/common/tcp_proxy/upstream.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/common/tcp_proxy/upstream.h"
2
3
#include "envoy/upstream/cluster_manager.h"
4
5
#include "source/common/http/codec_client.h"
6
#include "source/common/http/codes.h"
7
#include "source/common/http/header_map_impl.h"
8
#include "source/common/http/headers.h"
9
#include "source/common/http/utility.h"
10
#include "source/common/runtime/runtime_features.h"
11
12
namespace Envoy {
13
namespace TcpProxy {
14
using TunnelingConfig =
15
    envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy_TunnelingConfig;
16
17
// Takes ownership of the upstream connection data, enables half-close on its connection and
// registers the supplied upstream callbacks on it.
TcpUpstream::TcpUpstream(Tcp::ConnectionPool::ConnectionDataPtr&& data,
                         Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks)
    : upstream_conn_data_(std::move(data)) {
  upstream_conn_data_->connection().enableHalfClose(true);
  upstream_conn_data_->addUpstreamCallbacks(upstream_callbacks);
}
24
25
0
bool TcpUpstream::readDisable(bool disable) {
26
0
  if (upstream_conn_data_ == nullptr ||
27
0
      upstream_conn_data_->connection().state() != Network::Connection::State::Open) {
28
    // Because we flush write downstream, we can have a case where upstream has already disconnected
29
    // and we are waiting to flush. If we had a watermark event during this time we should no
30
    // longer touch the upstream connection.
31
0
    return false;
32
0
  }
33
34
0
  upstream_conn_data_->connection().readDisable(disable);
35
0
  return true;
36
0
}
37
38
0
// Writes `data` to the upstream connection, passing `end_stream` through unchanged.
void TcpUpstream::encodeData(Buffer::Instance& data, bool end_stream) {
  auto& connection = upstream_conn_data_->connection();
  connection.write(data, end_stream);
}
41
42
0
// Registers `cb` as a bytes-sent callback on the upstream connection.
void TcpUpstream::addBytesSentCallback(Network::Connection::BytesSentCb cb) {
  auto& connection = upstream_conn_data_->connection();
  connection.addBytesSentCallback(cb);
}
45
46
0
bool TcpUpstream::startUpstreamSecureTransport() {
47
0
  return (upstream_conn_data_ == nullptr)
48
0
             ? false
49
0
             : upstream_conn_data_->connection().startSecureTransport();
50
0
}
51
52
0
// Returns the SSL info of the upstream connection, or nullptr when there is no upstream
// connection data.
Ssl::ConnectionInfoConstSharedPtr TcpUpstream::getUpstreamConnectionSslInfo() {
  if (upstream_conn_data_ == nullptr) {
    return nullptr;
  }
  return upstream_conn_data_->connection().ssl();
}
58
59
// Reacts to a downstream connection event by closing the upstream connection.
// On a downstream remote close the released connection data is returned to the caller; for every
// other event nullptr is returned.
Tcp::ConnectionPool::ConnectionData*
TcpUpstream::onDownstreamEvent(Network::ConnectionEvent event) {
  // TODO(botengyao): propagate RST back to upstream connection if RST is received from downstream.
  if (event == Network::ConnectionEvent::LocalClose) {
    upstream_conn_data_->connection().close(
        Network::ConnectionCloseType::NoFlush,
        StreamInfo::LocalCloseReasons::get().ClosingUpstreamTcpDueToDownstreamLocalClose);
    return nullptr;
  }

  if (event != Network::ConnectionEvent::RemoteClose) {
    return nullptr;
  }

  // The close call may result in this object being deleted. Release the connection data into a
  // local first so it can still be handed back for potential draining.
  Tcp::ConnectionPool::ConnectionData* released = upstream_conn_data_.release();
  released->connection().close(
      Network::ConnectionCloseType::FlushWrite,
      StreamInfo::LocalCloseReasons::get().ClosingUpstreamTcpDueToDownstreamRemoteClose);
  return released;
}
77
78
// Stores the tunneling config, the downstream stream info and the upstream callbacks, and binds
// the response decoder to this upstream.
HttpUpstream::HttpUpstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
                           const TunnelingConfigHelper& config,
                           StreamInfo::StreamInfo& downstream_info)
    : config_(config),
      downstream_info_(downstream_info),
      response_decoder_(*this),
      upstream_callbacks_(callbacks) {
}
83
84
0
// Tears down any request encoder that is still attached, reporting it as a local close.
HttpUpstream::~HttpUpstream() {
  resetEncoder(Network::ConnectionEvent::LocalClose);
}
85
86
0
bool HttpUpstream::readDisable(bool disable) {
87
0
  if (!request_encoder_) {
88
0
    return false;
89
0
  }
90
0
  request_encoder_->getStream().readDisable(disable);
91
0
  return true;
92
0
}
93
94
0
// Encodes `data` on the request encoder; does nothing when no encoder is attached.
// When `end_stream` is set, the write side is marked as finished afterwards.
void HttpUpstream::encodeData(Buffer::Instance& data, bool end_stream) {
  if (request_encoder_) {
    request_encoder_->encodeData(data, end_stream);
    if (end_stream) {
      doneWriting();
    }
  }
}
103
104
0
// Intentionally a no-op: the callback is ignored.
void HttpUpstream::addBytesSentCallback(Network::Connection::BytesSentCb) {
  // In HTTP tunneling mode the idle timeout is not tickled when bytes are sent to the kernel.
  // This could be implemented if any user cares about the difference in time between data being
  // handed to the HTTP/2 stack and it going out to the kernel.
}
110
111
// Resets the request encoder when the downstream connection closes (locally or remotely),
// without informing downstream. Always returns nullptr.
Tcp::ConnectionPool::ConnectionData*
HttpUpstream::onDownstreamEvent(Network::ConnectionEvent event) {
  const bool downstream_closed = event == Network::ConnectionEvent::LocalClose ||
                                 event == Network::ConnectionEvent::RemoteClose;
  if (!downstream_closed) {
    return nullptr;
  }
  resetEncoder(Network::ConnectionEvent::LocalClose, false);
  return nullptr;
}
119
120
0
// Stream reset callback: marks both directions as closed and then resets the encoder, reporting
// a local close. The reset reason and details are not used.
void HttpUpstream::onResetStream(Http::StreamResetReason, absl::string_view) {
  write_half_closed_ = true;
  read_half_closed_ = true;
  resetEncoder(Network::ConnectionEvent::LocalClose);
}
125
126
0
void HttpUpstream::onAboveWriteBufferHighWatermark() {
127
0
  upstream_callbacks_.onAboveWriteBufferHighWatermark();
128
0
}
129
130
0
void HttpUpstream::onBelowWriteBufferLowWatermark() {
131
0
  upstream_callbacks_.onBelowWriteBufferLowWatermark();
132
0
}
133
134
0
// Detaches this upstream from the request encoder's stream and drops the encoder.
// The stream is reset unless both the read and the write side are already closed. Afterwards
// either the connection pool callbacks are told about a failure, or (when `inform_downstream`
// is set) `event` is forwarded to the upstream callbacks. A no-op when no encoder is attached.
void HttpUpstream::resetEncoder(Network::ConnectionEvent event, bool inform_downstream) {
  if (request_encoder_ == nullptr) {
    return;
  }

  auto& stream = request_encoder_->getStream();
  stream.removeCallbacks(*this);
  const bool fully_closed = write_half_closed_ && read_half_closed_;
  if (!fully_closed) {
    stream.resetStream(Http::StreamResetReason::LocalReset);
  }
  request_encoder_ = nullptr;

  // Until a valid CONNECT response has been received this counts as a pool failure; after that
  // the event is forwarded downstream instead.
  if (conn_pool_callbacks_ != nullptr) {
    conn_pool_callbacks_->onFailure();
  } else if (inform_downstream) {
    upstream_callbacks_.onEvent(event);
  }
}
153
154
0
void HttpUpstream::doneReading() {
155
0
  read_half_closed_ = true;
156
0
  if (write_half_closed_) {
157
0
    resetEncoder(Network::ConnectionEvent::LocalClose);
158
0
  }
159
0
}
160
161
0
void HttpUpstream::doneWriting() {
162
0
  write_half_closed_ = true;
163
0
  if (read_half_closed_) {
164
0
    resetEncoder(Network::ConnectionEvent::LocalClose);
165
0
  }
166
0
}
167
168
// Remembers the upstream callbacks and downstream stream info, and looks up the default-priority
// TCP connection pool of the given cluster for the load balancer context.
TcpConnPool::TcpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster,
                         Upstream::LoadBalancerContext* context,
                         Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks,
                         StreamInfo::StreamInfo& downstream_info)
    : upstream_callbacks_(upstream_callbacks), downstream_info_(downstream_info) {
  constexpr auto priority = Upstream::ResourcePriority::Default;
  conn_pool_data_ = thread_local_cluster.tcpConnPool(priority, context);
}
175
176
0
// Cancels a still-pending connection request, if any, using the CloseExcess policy.
TcpConnPool::~TcpConnPool() {
  if (upstream_handle_ == nullptr) {
    return;
  }
  upstream_handle_->cancel(ConnectionPool::CancelPolicy::CloseExcess);
}
181
182
0
// Records the generic pool callbacks and requests a new connection from the TCP pool. A non-null
// cancellable handle returned by the pool is stored in upstream_handle_.
void TcpConnPool::newStream(GenericConnectionPoolCallbacks& callbacks) {
  callbacks_ = &callbacks;
  // This function is reentrant, so upstream_handle_ is only overwritten when a valid connection
  // handle comes back. If newConnection fails inline it may result in attempting to select a new
  // host, and a recursive call to establishUpstreamConnection. In that case the first call to
  // newConnection returns null and the handle set by the inner call persists.
  auto* const pending = conn_pool_data_.value().newConnection(*this);
  if (pending == nullptr) {
    return;
  }
  ASSERT(upstream_handle_ == nullptr);
  upstream_handle_ = pending;
}
194
195
void TcpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason,
196
                                absl::string_view failure_reason,
197
0
                                Upstream::HostDescriptionConstSharedPtr host) {
198
0
  upstream_handle_ = nullptr;
199
0
  callbacks_->onGenericPoolFailure(reason, failure_reason, host);
200
0
}
201
202
void TcpConnPool::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data,
203
0
                              Upstream::HostDescriptionConstSharedPtr host) {
204
0
  if (downstream_info_.downstreamAddressProvider().connectionID()) {
205
0
    ENVOY_LOG(debug, "Attached upstream connection [C{}] to downstream connection [C{}]",
206
0
              conn_data->connection().id(),
207
0
              downstream_info_.downstreamAddressProvider().connectionID().value());
208
0
  }
209
210
0
  upstream_handle_ = nullptr;
211
0
  Tcp::ConnectionPool::ConnectionData* latched_data = conn_data.get();
212
0
  Network::Connection& connection = conn_data->connection();
213
214
0
  auto upstream = std::make_unique<TcpUpstream>(std::move(conn_data), upstream_callbacks_);
215
0
  callbacks_->onGenericPoolReady(
216
0
      &connection.streamInfo(), std::move(upstream), host,
217
0
      latched_data->connection().connectionInfoProvider(),
218
0
      latched_data->connection().streamInfo().downstreamAddressProvider().sslConnection());
219
0
}
220
221
// Stores the configuration and callbacks, then looks up the default-priority HTTP connection
// pool of the given cluster. The protocol hint is Http3 or Http2 for the matching codec types
// and left unset for any other codec type.
HttpConnPool::HttpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster,
                           Upstream::LoadBalancerContext* context,
                           const TunnelingConfigHelper& config,
                           Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks,
                           Http::CodecType type, StreamInfo::StreamInfo& downstream_info)
    : config_(config), type_(type), upstream_callbacks_(upstream_callbacks),
      downstream_info_(downstream_info) {
  absl::optional<Http::Protocol> protocol;
  switch (type_) {
  case Http::CodecType::HTTP3:
    protocol = Http::Protocol::Http3;
    break;
  case Http::CodecType::HTTP2:
    protocol = Http::Protocol::Http2;
    break;
  default:
    break;
  }
  conn_pool_data_ =
      thread_local_cluster.httpConnPool(Upstream::ResourcePriority::Default, protocol, context);
}
237
238
0
// Cancels a still-pending stream request, if any.
HttpConnPool::~HttpConnPool() {
  if (upstream_handle_ == nullptr) {
    return;
  }
  // HTTP connections are generally shorter lived and more likely to be used before going idle,
  // so they are cancelled with Default rather than CloseExcess.
  upstream_handle_->cancel(ConnectionPool::CancelPolicy::Default);
}
245
246
0
// Records the generic pool callbacks, creates the upstream matching the codec type (Http1Upstream
// for HTTP1, Http2Upstream otherwise) and requests a new stream from the HTTP pool. A non-null
// cancellable handle returned by the pool is stored in upstream_handle_.
void HttpConnPool::newStream(GenericConnectionPoolCallbacks& callbacks) {
  callbacks_ = &callbacks;

  if (type_ != Http::CodecType::HTTP1) {
    upstream_ = std::make_unique<Http2Upstream>(upstream_callbacks_, config_, downstream_info_);
  } else {
    upstream_ = std::make_unique<Http1Upstream>(upstream_callbacks_, config_, downstream_info_);
  }

  Tcp::ConnectionPool::Cancellable* const pending =
      conn_pool_data_.value().newStream(upstream_->responseDecoder(), *this,
                                        {/*can_send_early_data_=*/false,
                                         /*can_use_http3_=*/true});
  if (pending == nullptr) {
    return;
  }
  upstream_handle_ = pending;
}
261
262
void HttpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason,
263
                                 absl::string_view failure_reason,
264
0
                                 Upstream::HostDescriptionConstSharedPtr host) {
265
0
  upstream_handle_ = nullptr;
266
0
  callbacks_->onGenericPoolFailure(reason, failure_reason, host);
267
0
}
268
269
void HttpConnPool::onPoolReady(Http::RequestEncoder& request_encoder,
270
                               Upstream::HostDescriptionConstSharedPtr host,
271
0
                               StreamInfo::StreamInfo& info, absl::optional<Http::Protocol>) {
272
0
  if (info.downstreamAddressProvider().connectionID() &&
273
0
      downstream_info_.downstreamAddressProvider().connectionID()) {
274
    // info.downstreamAddressProvider() is being called to get the upstream connection ID,
275
    // because the StreamInfo object here is of the upstream connection.
276
0
    ENVOY_LOG(debug, "Attached upstream connection [C{}] to downstream connection [C{}]",
277
0
              info.downstreamAddressProvider().connectionID().value(),
278
0
              downstream_info_.downstreamAddressProvider().connectionID().value());
279
0
  }
280
281
0
  upstream_handle_ = nullptr;
282
0
  upstream_->setRequestEncoder(request_encoder,
283
0
                               host->transportSocketFactory().implementsSecureTransport());
284
0
  upstream_->setConnPoolCallbacks(std::make_unique<HttpConnPool::Callbacks>(
285
0
      *this, host, info.downstreamAddressProvider().sslConnection()));
286
0
}
287
288
void HttpConnPool::onGenericPoolReady(Upstream::HostDescriptionConstSharedPtr& host,
289
                                      const Network::ConnectionInfoProvider& address_provider,
290
0
                                      Ssl::ConnectionInfoConstSharedPtr ssl_info) {
291
0
  callbacks_->onGenericPoolReady(nullptr, std::move(upstream_), host, address_provider, ssl_info);
292
0
}
293
294
// Forwards all arguments to the HttpUpstream base constructor.
Http2Upstream::Http2Upstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
                             const TunnelingConfigHelper& config,
                             StreamInfo::StreamInfo& downstream_info)
    : HttpUpstream(callbacks, config, downstream_info) {
}
298
299
0
// A response is accepted only when its status is exactly 200.
bool Http2Upstream::isValidResponse(const Http::ResponseHeaderMap& headers) {
  return Http::Utility::getResponseStatus(headers) == 200;
}
305
306
0
// Attaches the request encoder, registers this upstream for stream callbacks and sends the
// tunneling request headers (POST or CONNECT, depending on the config) without ending the
// stream. For POST, path and scheme headers are added; the scheme follows `is_ssl`.
void Http2Upstream::setRequestEncoder(Http::RequestEncoder& request_encoder, bool is_ssl) {
  request_encoder_ = &request_encoder;
  request_encoder_->getStream().addCallbacks(*this);

  const char* method = config_.usePost() ? "POST" : "CONNECT";
  auto headers = Http::createHeaderMap<Http::RequestHeaderMapImpl>({
      {Http::Headers::get().Method, method},
      {Http::Headers::get().Host, config_.host(downstream_info_)},
  });

  if (config_.usePost()) {
    const auto& scheme_values = Http::Headers::get().SchemeValues;
    const std::string& scheme = is_ssl ? scheme_values.Https : scheme_values.Http;
    headers->addReference(Http::Headers::get().Path, config_.postPath());
    headers->addReference(Http::Headers::get().Scheme, scheme);
  }

  config_.headerEvaluator().evaluateHeaders(*headers, {downstream_info_.getRequestHeaders()},
                                            downstream_info_);

  // Encoding can only fail on missing required request headers.
  const auto status = request_encoder_->encodeHeaders(*headers, false);
  ASSERT(status.ok());
}
328
329
// Forwards all arguments to the HttpUpstream base constructor.
Http1Upstream::Http1Upstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
                             const TunnelingConfigHelper& config,
                             StreamInfo::StreamInfo& downstream_info)
    : HttpUpstream(callbacks, config, downstream_info) {
}
333
334
0
// Attaches the request encoder, registers this upstream for stream callbacks, enables TCP
// tunneling on the encoder and sends the tunneling request headers (POST or CONNECT, depending
// on the config) without ending the stream. The secure-transport flag is not used.
void Http1Upstream::setRequestEncoder(Http::RequestEncoder& request_encoder, bool) {
  request_encoder_ = &request_encoder;
  request_encoder_->getStream().addCallbacks(*this);
  request_encoder_->enableTcpTunneling();
  ASSERT(request_encoder_->http1StreamEncoderOptions() != absl::nullopt);

  const char* method = config_.usePost() ? "POST" : "CONNECT";
  auto headers = Http::createHeaderMap<Http::RequestHeaderMapImpl>({
      {Http::Headers::get().Method, method},
      {Http::Headers::get().Host, config_.host(downstream_info_)},
  });

  // Path is required for POST requests.
  if (config_.usePost()) {
    headers->addReference(Http::Headers::get().Path, config_.postPath());
  }

  config_.headerEvaluator().evaluateHeaders(*headers, {downstream_info_.getRequestHeaders()},
                                            downstream_info_);

  // Encoding can only fail on missing required request headers.
  const auto status = request_encoder_->encodeHeaders(*headers, false);
  ASSERT(status.ok());
}
356
357
0
// Accepts any 2xx response.
bool Http1Upstream::isValidResponse(const Http::ResponseHeaderMap& headers) {
  // Per RFC7231, any 2xx response indicates that the connection is established, and any
  // 'Content-Length' or 'Transfer-Encoding' header fields MUST be ignored.
  // https://tools.ietf.org/html/rfc7231#section-4.3.6
  const auto response_status = Http::Utility::getResponseStatus(headers);
  return Http::CodeUtility::is2xx(response_status);
}
364
365
0
// Encodes `data` on the request encoder, passing `end_stream` through; does nothing when no
// encoder is attached.
void Http1Upstream::encodeData(Buffer::Instance& data, bool end_stream) {
  if (request_encoder_) {
    request_encoder_->encodeData(data, end_stream);
  }
}
371
372
} // namespace TcpProxy
373
} // namespace Envoy