1
#include "source/common/tcp_proxy/upstream.h"
2

            
3
#include "envoy/http/header_map.h"
4
#include "envoy/upstream/cluster_manager.h"
5

            
6
#include "source/common/http/codec_client.h"
7
#include "source/common/http/codes.h"
8
#include "source/common/http/header_map_impl.h"
9
#include "source/common/http/headers.h"
10
#include "source/common/http/null_route_impl.h"
11
#include "source/common/http/utility.h"
12
#include "source/common/protobuf/protobuf.h"
13
#include "source/common/runtime/runtime_features.h"
14

            
15
namespace Envoy {
16
namespace TcpProxy {
17

            
18
using TunnelingConfig =
19
    envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy_TunnelingConfig;
20

            
21
// Constants for tunnel request ID metadata.
22
54
const std::string& tunnelRequestIdMetadataNamespace() {
23
54
  CONSTRUCT_ON_FIRST_USE(std::string, "envoy.filters.network.tcp_proxy");
24
54
}
25

            
26
29
const std::string& tunnelRequestIdMetadataKey() {
27
29
  CONSTRUCT_ON_FIRST_USE(std::string, "tunnel_request_id");
28
29
}
29

            
30
// Helper function to generate and store request ID in dynamic metadata.
31
void generateAndStoreRequestId(const TunnelingConfigHelper& config, Http::RequestHeaderMap& headers,
32
910
                               StreamInfo::StreamInfo& downstream_info) {
33
910
  if (config.requestIDExtension() != nullptr) {
34
    // For tunneling requests there is no way to get the external request ID as the incoming
35
    // traffic could be anything - HTTPS, MySQL, Postgres, etc.
36
54
    config.requestIDExtension()->set(headers, /*edge_request=*/true,
37
54
                                     /*keep_external_id=*/false);
38
    // Also store the request ID in dynamic metadata to allow TCP access logs to format it,
39
    // and optionally emit it under a custom key if configured.
40
54
    const auto rid_sv = headers.getRequestIdValue();
41
54
    if (!rid_sv.empty()) {
42
54
      const std::string rid(rid_sv);
43
      // If the request ID header override is configured, mirror the generated request ID to that
44
      // header and remove the default x-request-id to honor the configured header.
45
54
      const std::string& override_header = config.requestIDHeader();
46
54
      if (!override_header.empty()) {
47
25
        const Http::LowerCaseString custom_header(override_header);
48
25
        headers.setCopy(custom_header, rid);
49
25
        if (custom_header.get() != Http::Headers::get().RequestId.get()) {
50
25
          headers.remove(Http::Headers::get().RequestId);
51
25
        }
52
25
      }
53

            
54
      // Write dynamic metadata under the configured key (or default).
55
54
      const std::string& key_override = config.requestIDMetadataKey();
56
54
      const absl::string_view md_key =
57
54
          key_override.empty() ? tunnelRequestIdMetadataKey() : absl::string_view(key_override);
58
54
      Protobuf::Struct md;
59
54
      auto& fields = *md.mutable_fields();
60
      // Assign the request id to the configured key.
61
54
      fields[std::string(md_key)].mutable_string_value()->assign(rid.data(), rid.size());
62
54
      downstream_info.setDynamicMetadata(tunnelRequestIdMetadataNamespace(), md);
63
54
    }
64
54
  }
65
910
}
66

            
67
// Takes ownership of the pooled upstream connection, enables half-close on it and registers
// `upstream_callbacks` on the connection data.
TcpUpstream::TcpUpstream(Tcp::ConnectionPool::ConnectionDataPtr&& data,
                         Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks)
    : upstream_conn_data_(std::move(data)) {
  upstream_conn_data_->connection().enableHalfClose(true);
  upstream_conn_data_->addUpstreamCallbacks(upstream_callbacks);
}
74

            
75
34
bool TcpUpstream::readDisable(bool disable) {
76
34
  if (upstream_conn_data_ == nullptr ||
77
34
      upstream_conn_data_->connection().state() != Network::Connection::State::Open) {
78
    // Because we flush write downstream, we can have a case where upstream has already disconnected
79
    // and we are waiting to flush. If we had a watermark event during this time we should no
80
    // longer touch the upstream connection.
81
    return false;
82
  }
83

            
84
34
  upstream_conn_data_->connection().readDisable(disable);
85
34
  return true;
86
34
}
87

            
88
3543
// Writes `data` to the upstream connection, passing `end_stream` through to the write.
void TcpUpstream::encodeData(Buffer::Instance& data, bool end_stream) {
  auto& upstream_connection = upstream_conn_data_->connection();
  upstream_connection.write(data, end_stream);
}
91

            
92
1605
// Registers `cb` as a bytes-sent callback on the upstream connection.
void TcpUpstream::addBytesSentCallback(Network::Connection::BytesSentCb cb) {
  auto& upstream_connection = upstream_conn_data_->connection();
  upstream_connection.addBytesSentCallback(cb);
}
95

            
96
2
bool TcpUpstream::startUpstreamSecureTransport() {
97
2
  return (upstream_conn_data_ == nullptr)
98
2
             ? false
99
2
             : upstream_conn_data_->connection().startSecureTransport();
100
2
}
101

            
102
// Returns the SSL info of the upstream connection, or nullptr when the connection data has been
// released.
Ssl::ConnectionInfoConstSharedPtr TcpUpstream::getUpstreamConnectionSslInfo() {
  if (upstream_conn_data_ == nullptr) {
    return nullptr;
  }
  return upstream_conn_data_->connection().ssl();
}
108

            
109
1608
// Returns the upstream connection's local close reason, or an empty view when the connection
// data has been released.
absl::string_view TcpUpstream::localCloseReason() const {
  if (upstream_conn_data_ == nullptr) {
    return "";
  }
  return upstream_conn_data_->connection().localCloseReason();
}
115

            
116
1559
// Returns the close type recorded in the upstream connection's upstream info. Falls back to
// Normal when the connection data has been released or no upstream info is attached.
StreamInfo::DetectedCloseType TcpUpstream::detectedCloseType() const {
  if (upstream_conn_data_ == nullptr) {
    return StreamInfo::DetectedCloseType::Normal;
  }

  const auto& upstream_info = upstream_conn_data_->connection().streamInfo().upstreamInfo();
  if (!upstream_info) {
    return StreamInfo::DetectedCloseType::Normal;
  }
  return upstream_info->upstreamDetectedCloseType();
}
126

            
127
// Propagates a downstream connection event to the upstream connection.
//
// RemoteClose: releases ownership of the connection data, closes the upstream connection with
//   FlushWrite and returns the released pointer so the caller can drain it.
// LocalClose: closes the upstream connection with NoFlush, using `details` as the close reason
//   when non-empty and the default downstream-local-close reason otherwise.
// Any other event is ignored.
//
// Returns the released connection data on RemoteClose, nullptr otherwise.
Tcp::ConnectionPool::ConnectionData* TcpUpstream::onDownstreamEvent(Network::ConnectionEvent event,
                                                                    absl::string_view details) {
  // TODO(botengyao): propagate RST back to upstream connection if RST is received from downstream.
  if (event == Network::ConnectionEvent::RemoteClose) {
    // The close call may result in this object being deleted, so ownership is released into a
    // local first; the pointer is handed back to the caller for potential draining.
    Tcp::ConnectionPool::ConnectionData* released = upstream_conn_data_.release();
    released->connection().close(
        Network::ConnectionCloseType::FlushWrite,
        StreamInfo::LocalCloseReasons::get().ClosingUpstreamTcpDueToDownstreamRemoteClose);
    return released;
  }

  if (event == Network::ConnectionEvent::LocalClose) {
    upstream_conn_data_->connection().close(
        Network::ConnectionCloseType::NoFlush,
        !details.empty()
            ? details
            : StreamInfo::LocalCloseReasons::get().ClosingUpstreamTcpDueToDownstreamLocalClose);
  }
  return nullptr;
}
147

            
148
// Stores the tunneling configuration, downstream stream info, upstream callbacks and codec type.
// The response decoder is constructed with a reference to this upstream.
HttpUpstream::HttpUpstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
                           const TunnelingConfigHelper& config,
                           StreamInfo::StreamInfo& downstream_info, Http::CodecType type)
    : config_(config), downstream_info_(downstream_info), response_decoder_(*this),
      upstream_callbacks_(callbacks), type_(type) {
}
153

            
154
480
// Resets the request encoder, if one is still attached, as a local close.
HttpUpstream::~HttpUpstream() {
  resetEncoder(Network::ConnectionEvent::LocalClose);
}
155

            
156
268
// The HTTP tunneling upstream always reports a Normal close type.
StreamInfo::DetectedCloseType HttpUpstream::detectedCloseType() const {
  constexpr auto close_type = StreamInfo::DetectedCloseType::Normal;
  return close_type;
}
159

            
160
348
// Decides whether the upstream response headers indicate an established tunnel.
// HTTP/1 accepts any 2xx status; every other codec requires exactly 200.
bool HttpUpstream::isValidResponse(const Http::ResponseHeaderMap& headers) {
  const auto response_status = Http::Utility::getResponseStatus(headers);
  if (type_ != Http::CodecType::HTTP1) {
    return response_status == 200;
  }
  // According to RFC7231 any 2xx response indicates that the connection is established.
  // Any 'Content-Length' or 'Transfer-Encoding' header fields MUST be ignored.
  // https://tools.ietf.org/html/rfc7231#section-4.3.6
  return Http::CodeUtility::is2xx(response_status);
}
170

            
171
474
// Attaches the request encoder, then builds and sends the tunneling request headers.
//
// The request is POST (with the configured path) or CONNECT depending on the configuration.
// HTTP/1 enables TCP tunneling on the encoder; other codecs add a :scheme header for POST
// requests, chosen from `is_ssl`. A request ID is generated before the configured header
// evaluator runs.
void HttpUpstream::setRequestEncoder(Http::RequestEncoder& request_encoder, bool is_ssl) {
  request_encoder_ = &request_encoder;
  request_encoder_->getStream().addCallbacks(*this);

  const bool use_post = config_.usePost();
  auto headers = Http::createHeaderMap<Http::RequestHeaderMapImpl>({
      {Http::Headers::get().Method, use_post ? "POST" : "CONNECT"},
      {Http::Headers::get().Host, config_.host(downstream_info_)},
  });
  if (use_post) {
    headers->addReference(Http::Headers::get().Path, config_.postPath());
  }

  if (type_ == Http::CodecType::HTTP1) {
    request_encoder_->enableTcpTunneling();
    ASSERT(request_encoder_->http1StreamEncoderOptions() != absl::nullopt);
  } else if (use_post) {
    const auto& scheme_values = Http::Headers::get().SchemeValues;
    headers->addReference(Http::Headers::get().Scheme,
                          is_ssl ? scheme_values.Https : scheme_values.Http);
  }

  // Optionally generate a request ID before evaluating configured headers so it is available to
  // header formatters.
  generateAndStoreRequestId(config_, *headers, downstream_info_);

  config_.headerEvaluator().evaluateHeaders(*headers, {downstream_info_.getRequestHeaders()},
                                            downstream_info_);

  // Encoding can only fail on missing required request headers.
  const auto status = request_encoder_->encodeHeaders(*headers, false);
  ASSERT(status.ok());
}
204

            
205
12080
bool HttpUpstream::readDisable(bool disable) {
206
12080
  if (!request_encoder_) {
207
3
    return false;
208
3
  }
209
12077
  request_encoder_->getStream().readDisable(disable);
210
12077
  return true;
211
12080
}
212

            
213
7981
// Sends `data` upstream through the request encoder; silently drops it when no encoder is
// attached. On end of stream, marks the write side done for non-HTTP/1 codecs.
void HttpUpstream::encodeData(Buffer::Instance& data, bool end_stream) {
  if (request_encoder_ == nullptr) {
    return;
  }

  // The codec type is read into a local before the encoder is invoked.
  // NOTE(review): presumably so the check below does not read member state after encodeData()
  // has run callbacks - confirm.
  const auto latched_codec = type_;
  request_encoder_->encodeData(data, end_stream);

  // doneWriting() is being skipped for H1 codec to avoid resetEncoder() call.
  // This is because H1 codec does not support half-closed stream. Calling resetEncoder()
  // will fully close the upstream connection without flushing any pending data, rather than a http
  // stream reset.
  // More details can be found on https://github.com/envoyproxy/envoy/pull/13293
  const bool half_close_supported = latched_codec != Http::CodecType::HTTP1;
  if (half_close_supported && end_stream) {
    doneWriting();
  }
}
229

            
230
284
// Intentionally a no-op: the callback is discarded.
void HttpUpstream::addBytesSentCallback(Network::Connection::BytesSentCb /*cb*/) {
  // The HTTP tunneling mode does not tickle the idle timeout when bytes are sent to the kernel.
  // This can be implemented if any user cares about the difference in time between it being sent
  // to the HTTP/2 stack and out to the kernel.
}
236

            
237
// On a downstream local or remote close, resets the encoder as a local close without informing
// downstream. Always returns nullptr: there is no connection data to hand back.
Tcp::ConnectionPool::ConnectionData*
HttpUpstream::onDownstreamEvent(Network::ConnectionEvent event, absl::string_view /*details*/) {
  const bool downstream_closed = event == Network::ConnectionEvent::LocalClose ||
                                 event == Network::ConnectionEvent::RemoteClose;
  if (downstream_closed) {
    resetEncoder(Network::ConnectionEvent::LocalClose, false);
  }
  return nullptr;
}
245

            
246
166
// Stream reset callback: marks both directions as closed and then resets the encoder as a local
// close. The reset reason and details are ignored.
void HttpUpstream::onResetStream(Http::StreamResetReason /*reason*/,
                                 absl::string_view /*details*/) {
  // With both halves marked closed, resetEncoder() will not issue another stream reset.
  write_half_closed_ = true;
  read_half_closed_ = true;
  resetEncoder(Network::ConnectionEvent::LocalClose);
}
251

            
252
9014
void HttpUpstream::onAboveWriteBufferHighWatermark() {
253
9014
  upstream_callbacks_.onAboveWriteBufferHighWatermark();
254
9014
}
255

            
256
9014
void HttpUpstream::onBelowWriteBufferLowWatermark() {
257
9014
  upstream_callbacks_.onBelowWriteBufferLowWatermark();
258
9014
}
259

            
260
884
// Detaches from the request encoder and reports the outcome.
//
// No-op when no encoder is attached. Otherwise this upstream is removed from the stream's
// callbacks, the stream is reset unless both halves are already closed, and the encoder pointer
// is cleared. If connection pool callbacks are still pending, the failure is reported to them
// and nothing else happens; otherwise `event` is forwarded to the upstream callbacks when
// `inform_downstream` is set.
void HttpUpstream::resetEncoder(Network::ConnectionEvent event, bool inform_downstream) {
  if (request_encoder_ == nullptr) {
    return;
  }

  auto& stream = request_encoder_->getStream();
  stream.removeCallbacks(*this);
  const bool fully_closed = write_half_closed_ && read_half_closed_;
  if (!fully_closed) {
    stream.resetStream(Http::StreamResetReason::LocalReset);
  }
  request_encoder_ = nullptr;

  // If we did not receive a valid CONNECT response yet we treat this as a pool failure, otherwise
  // we forward the event downstream.
  if (conn_pool_callbacks_ != nullptr) {
    conn_pool_callbacks_->onFailure();
    return;
  }

  if (inform_downstream) {
    upstream_callbacks_.onEvent(event);
  }
}
280

            
281
165
void HttpUpstream::doneReading() {
282
165
  read_half_closed_ = true;
283
165
  if (write_half_closed_) {
284
120
    resetEncoder(Network::ConnectionEvent::LocalClose);
285
120
  }
286
165
}
287

            
288
164
void HttpUpstream::doneWriting() {
289
164
  write_half_closed_ = true;
290
164
  if (read_half_closed_) {
291
35
    resetEncoder(Network::ConnectionEvent::LocalClose);
292
35
  }
293
164
}
294

            
295
// Stores the upstream callbacks and downstream stream info, and looks up the TCP connection pool
// for `host` at default priority on the thread-local cluster.
TcpConnPool::TcpConnPool(Upstream::HostConstSharedPtr host,
                         Upstream::ThreadLocalCluster& thread_local_cluster,
                         Upstream::LoadBalancerContext* context,
                         Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks,
                         StreamInfo::StreamInfo& downstream_info)
    : upstream_callbacks_(upstream_callbacks), downstream_info_(downstream_info) {
  constexpr auto priority = Upstream::ResourcePriority::Default;
  conn_pool_data_ = thread_local_cluster.tcpConnPool(host, priority, context);
}
304

            
305
1758
// Cancels a still-pending connection request, if any, with the CloseExcess policy.
TcpConnPool::~TcpConnPool() {
  if (upstream_handle_ == nullptr) {
    return;
  }
  upstream_handle_->cancel(ConnectionPool::CancelPolicy::CloseExcess);
}
310

            
311
1733
// Records `callbacks` and requests a new connection from the pool. The cancellable handle is
// stored only when the pool returns one.
void TcpConnPool::newStream(GenericConnectionPoolCallbacks& callbacks) {
  callbacks_ = &callbacks;

  // Given this function is re-entrant, make sure we only reset the upstream_handle_ if given a
  // valid connection handle. If newConnection fails inline it may result in attempting to select
  // a new host, and a recursive call to establishUpstreamConnection. In this case the first call
  // to newConnection will return null and the inner call will persist.
  Tcp::ConnectionPool::Cancellable* pending = conn_pool_data_.value().newConnection(*this);
  if (pending == nullptr) {
    return;
  }
  ASSERT(upstream_handle_ == nullptr);
  upstream_handle_ = pending;
}
323

            
324
void TcpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason,
325
                                absl::string_view failure_reason,
326
59
                                Upstream::HostDescriptionConstSharedPtr host) {
327
59
  upstream_handle_ = nullptr;
328
59
  callbacks_->onGenericPoolFailure(reason, failure_reason, host);
329
59
}
330

            
331
void TcpConnPool::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data,
332
1623
                              Upstream::HostDescriptionConstSharedPtr host) {
333
1623
  if (downstream_info_.downstreamAddressProvider().connectionID()) {
334
1623
    uint64_t connection_id = conn_data->connection().id();
335
1623
    downstream_info_.upstreamInfo()->setUpstreamConnectionId(connection_id);
336
1623
    ENVOY_LOG(debug, "Attached upstream connection [C{}] to downstream connection [C{}]",
337
1623
              connection_id, downstream_info_.downstreamAddressProvider().connectionID().value());
338
1623
  }
339

            
340
1623
  upstream_handle_ = nullptr;
341
1623
  Tcp::ConnectionPool::ConnectionData* latched_data = conn_data.get();
342
1623
  Network::Connection& connection = conn_data->connection();
343

            
344
1623
  auto upstream = std::make_unique<TcpUpstream>(std::move(conn_data), upstream_callbacks_);
345
1623
  callbacks_->onGenericPoolReady(
346
1623
      &connection.streamInfo(), std::move(upstream), host,
347
1623
      latched_data->connection().connectionInfoProvider(),
348
1623
      latched_data->connection().streamInfo().downstreamAddressProvider().sslConnection());
349
1623
}
350

            
351
// Stores the tunneling state and acquires the upstream HTTP connection pool.
//
// The codec type is mapped to an optional protocol: HTTP/2 and HTTP/3 are set explicitly, while
// any other type leaves the protocol unset. When the upstream-HTTP-filters runtime feature is
// enabled the pool comes from createConnPool(); otherwise it is looked up directly on the
// thread-local cluster at default priority.
HttpConnPool::HttpConnPool(Upstream::HostConstSharedPtr host,
                           Upstream::ThreadLocalCluster& thread_local_cluster,
                           Upstream::LoadBalancerContext* context,
                           const TunnelingConfigHelper& config,
                           Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks,
                           Http::StreamDecoderFilterCallbacks& stream_decoder_callbacks,
                           Http::CodecType type, StreamInfo::StreamInfo& downstream_info)
    : config_(config), type_(type), decoder_filter_callbacks_(&stream_decoder_callbacks),
      upstream_callbacks_(upstream_callbacks), downstream_info_(downstream_info) {
  absl::optional<Http::Protocol> protocol;
  if (type_ == Http::CodecType::HTTP2) {
    protocol = Http::Protocol::Http2;
  } else if (type_ == Http::CodecType::HTTP3) {
    protocol = Http::Protocol::Http3;
  }

  const bool upstream_filters_enabled = Runtime::runtimeFeatureEnabled(
      "envoy.restart_features.upstream_http_filters_with_tcp_proxy");
  if (upstream_filters_enabled) {
    generic_conn_pool_ = createConnPool(host, thread_local_cluster, context, protocol);
  } else {
    conn_pool_data_ = thread_local_cluster.httpConnPool(host, Upstream::ResourcePriority::Default,
                                                        protocol, context);
  }
}
375

            
376
std::unique_ptr<Router::GenericConnPool> HttpConnPool::createConnPool(
377
    Upstream::HostConstSharedPtr host, Upstream::ThreadLocalCluster& cluster,
378
425
    Upstream::LoadBalancerContext* context, absl::optional<Http::Protocol> protocol) {
379
425
  Router::GenericConnPoolFactory* factory = nullptr;
380
425
  factory = Envoy::Config::Utility::getFactoryByName<Router::GenericConnPoolFactory>(
381
425
      "envoy.filters.connection_pools.http.generic");
382
425
  if (!factory) {
383
4
    return nullptr;
384
4
  }
385

            
386
421
  Protobuf::Any message;
387
421
  if (cluster.info()->upstreamConfig()) {
388
    message = cluster.info()->upstreamConfig()->typed_config();
389
  }
390
421
  return factory->createGenericConnPool(
391
421
      host, cluster, Envoy::Router::GenericConnPoolFactory::UpstreamProtocol::HTTP,
392
421
      decoder_filter_callbacks_->route()->routeEntry()->priority(), protocol, context, message);
393
425
}
394

            
395
835
// Cancels a still-pending stream request and, if a combined upstream is still owned here,
// notifies it of a local close.
HttpConnPool::~HttpConnPool() {
  if (upstream_handle_ != nullptr) {
    // Because HTTP connections are generally shorter lived and have a higher probability of use
    // before going idle, they are closed with Default rather than CloseExcess.
    upstream_handle_->cancel(ConnectionPool::CancelPolicy::Default);
  }

  if (combined_upstream_ != nullptr) {
    combined_upstream_->onDownstreamEvent(Network::ConnectionEvent::LocalClose);
  }
}
405

            
406
814
// Records `callbacks` and starts a new upstream stream.
//
// With the upstream-HTTP-filters runtime feature enabled, a CombinedUpstream is created around a
// router upstream request that takes over the generic connection pool. Otherwise an HttpUpstream
// is created and a stream is requested from the HTTP connection pool; the cancellable handle is
// stored only when the pool returns one.
void HttpConnPool::newStream(GenericConnectionPoolCallbacks& callbacks) {
  callbacks_ = &callbacks;

  const bool upstream_filters_enabled = Runtime::runtimeFeatureEnabled(
      "envoy.restart_features.upstream_http_filters_with_tcp_proxy");
  if (!upstream_filters_enabled) {
    upstream_ =
        std::make_unique<HttpUpstream>(upstream_callbacks_, config_, downstream_info_, type_);
    Tcp::ConnectionPool::Cancellable* pending =
        conn_pool_data_.value().newStream(upstream_->responseDecoder(), *this,
                                          {/*can_send_early_data_=*/false,
                                           /*can_use_http3_=*/true});
    if (pending != nullptr) {
      upstream_handle_ = pending;
    }
    return;
  }

  combined_upstream_ = std::make_unique<CombinedUpstream>(
      *this, upstream_callbacks_, *decoder_filter_callbacks_, config_, downstream_info_);
  RouterUpstreamRequestPtr upstream_request = std::make_unique<RouterUpstreamRequest>(
      *combined_upstream_, std::move(generic_conn_pool_), /*can_send_early_data_=*/false,
      /*can_use_http3_=*/true, true /*enable_tcp_tunneling*/);
  combined_upstream_->setRouterUpstreamRequest(std::move(upstream_request));
  combined_upstream_->newStream(callbacks);
}
429

            
430
void HttpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason,
431
                                 absl::string_view failure_reason,
432
                                 Upstream::HostDescriptionConstSharedPtr host) {
433
  upstream_handle_ = nullptr;
434
  callbacks_->onGenericPoolFailure(reason, failure_reason, host);
435
}
436

            
437
void HttpConnPool::onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host,
438
421
                                          bool pool_success) {
439
421
  if (!pool_success) {
440
    return;
441
  }
442
421
  combined_upstream_->setConnPoolCallbacks(std::make_unique<HttpConnPool::Callbacks>(
443
421
      *this, host, downstream_info_.downstreamAddressProvider().sslConnection()));
444
421
  combined_upstream_->recordUpstreamSslConnection();
445
421
}
446

            
447
void HttpConnPool::onPoolReady(Http::RequestEncoder& request_encoder,
448
                               Upstream::HostDescriptionConstSharedPtr host,
449
393
                               StreamInfo::StreamInfo& info, absl::optional<Http::Protocol>) {
450
393
  if (info.downstreamAddressProvider().connectionID() &&
451
393
      downstream_info_.downstreamAddressProvider().connectionID()) {
452
    // info.downstreamAddressProvider() is being called to get the upstream connection ID,
453
    // because the StreamInfo object here is of the upstream connection.
454
393
    uint64_t connection_id = info.downstreamAddressProvider().connectionID().value();
455
393
    downstream_info_.upstreamInfo()->setUpstreamConnectionId(connection_id);
456
393
    ENVOY_LOG(debug, "Attached upstream connection [C{}] to downstream connection [C{}]",
457
393
              connection_id, downstream_info_.downstreamAddressProvider().connectionID().value());
458
393
  }
459

            
460
393
  upstream_handle_ = nullptr;
461
393
  downstream_info_.setUpstreamBytesMeter(request_encoder.getStream().bytesMeter());
462
393
  upstream_->setRequestEncoder(request_encoder,
463
393
                               host->transportSocketFactory().implementsSecureTransport());
464
393
  upstream_->setConnPoolCallbacks(std::make_unique<HttpConnPool::Callbacks>(
465
393
      *this, host, info.downstreamAddressProvider().sslConnection()));
466
393
}
467

            
468
void HttpConnPool::onGenericPoolReady(Upstream::HostDescriptionConstSharedPtr& host,
469
                                      const Network::ConnectionInfoProvider& address_provider,
470
590
                                      Ssl::ConnectionInfoConstSharedPtr ssl_info) {
471
590
  if (Runtime::runtimeFeatureEnabled(
472
590
          "envoy.restart_features.upstream_http_filters_with_tcp_proxy")) {
473

            
474
309
    callbacks_->onGenericPoolReady(nullptr, std::move(combined_upstream_), host, address_provider,
475
309
                                   ssl_info);
476
309
    return;
477
309
  }
478
281
  callbacks_->onGenericPoolReady(nullptr, std::move(upstream_), host, address_provider, ssl_info);
479
281
}
480

            
481
// The combined upstream always reports a Normal close type.
StreamInfo::DetectedCloseType CombinedUpstream::detectedCloseType() const {
  constexpr auto close_type = StreamInfo::DetectedCloseType::Normal;
  return close_type;
}
484

            
485
// Builds the tunneling request headers up front.
//
// The codec type is taken from the owning connection pool. The headers are POST (with the
// configured path) or CONNECT depending on the configuration, carry the configured host, and get
// a generated request ID before the configured header evaluator runs.
CombinedUpstream::CombinedUpstream(HttpConnPool& http_conn_pool,
                                   Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
                                   Http::StreamDecoderFilterCallbacks& decoder_callbacks,
                                   const TunnelingConfigHelper& config,
                                   StreamInfo::StreamInfo& downstream_info)
    : config_(config), downstream_info_(downstream_info), parent_(http_conn_pool),
      decoder_filter_callbacks_(decoder_callbacks), response_decoder_(*this),
      upstream_callbacks_(callbacks) {
  type_ = parent_.codecType();

  const bool use_post = config_.usePost();
  downstream_headers_ = Http::createHeaderMap<Http::RequestHeaderMapImpl>({
      {Http::Headers::get().Method, use_post ? "POST" : "CONNECT"},
      {Http::Headers::get().Host, config_.host(downstream_info_)},
  });
  if (use_post) {
    downstream_headers_->addReference(Http::Headers::get().Path, config_.postPath());
  }

  // The request ID is generated first so that the header evaluator below can see it.
  generateAndStoreRequestId(config_, *downstream_headers_, downstream_info_);
  config_.headerEvaluator().evaluateHeaders(
      *downstream_headers_, {downstream_info_.getRequestHeaders()}, downstream_info_);
}
508

            
509
void CombinedUpstream::setRouterUpstreamRequest(
510
434
    Router::UpstreamRequestPtr router_upstream_request) {
511
434
  ASSERT(!upstream_request_);
512
434
  upstream_request_ = std::move(router_upstream_request);
513
434
}
514

            
515
434
// Starts the upstream request by handing it the headers, without ending the stream. The callbacks
// argument is unused.
void CombinedUpstream::newStream(GenericConnectionPoolCallbacks& /*callbacks*/) {
  constexpr bool end_stream = false;
  upstream_request_->acceptHeadersFromRouter(end_stream);
}
518

            
519
8019
// Passes `data` to the upstream request; silently drops it when no request is held. On end of
// stream, also marks the write side done.
void CombinedUpstream::encodeData(Buffer::Instance& data, bool end_stream) {
  if (upstream_request_ == nullptr) {
    return;
  }

  upstream_request_->acceptDataFromRouter(data, end_stream);
  if (!end_stream) {
    return;
  }
  doneWriting();
}
528

            
529
7967
bool CombinedUpstream::readDisable(bool disable) {
530
7967
  if (!upstream_request_) {
531
1
    return false;
532
1
  }
533
7966
  upstream_request_->readDisableOrDefer(disable);
534
7966
  return true;
535
7967
}
536

            
537
// Resets the upstream request's stream when downstream closes, locally or remotely. Does nothing
// when no upstream request is held. Always returns nullptr: there is no connection data to hand
// back.
Tcp::ConnectionPool::ConnectionData*
CombinedUpstream::onDownstreamEvent(Network::ConnectionEvent event, absl::string_view /*details*/) {
  const bool downstream_closed = event == Network::ConnectionEvent::LocalClose ||
                                 event == Network::ConnectionEvent::RemoteClose;
  if (upstream_request_ != nullptr && downstream_closed) {
    upstream_request_->resetStream();
  }
  return nullptr;
}
549

            
550
368
// Decides whether the upstream response headers indicate an established tunnel.
// HTTP/1 accepts any 2xx status; HTTP/2 and HTTP/3 require exactly 200. Any codec type outside
// those three is treated as valid.
bool CombinedUpstream::isValidResponse(const Http::ResponseHeaderMap& headers) {
  switch (type_) {
  case Http::CodecType::HTTP1:
    // According to RFC7231 any 2xx response indicates that the connection is established.
    // Any 'Content-Length' or 'Transfer-Encoding' header fields MUST be ignored.
    // https://tools.ietf.org/html/rfc7231#section-4.3.6
    return Http::CodeUtility::is2xx(Http::Utility::getResponseStatus(headers));
  case Http::CodecType::HTTP2:
  case Http::CodecType::HTTP3:
    return Http::Utility::getResponseStatus(headers) == 200;
  }
  return true;
}
567

            
568
218
// Resets the upstream request's stream on a close event and reports the outcome.
//
// If connection pool callbacks are still pending, the failure is reported to them and nothing
// else happens; otherwise `event` is forwarded to the upstream callbacks when
// `inform_downstream` is set.
void CombinedUpstream::onResetEncoder(Network::ConnectionEvent event, bool inform_downstream) {
  const bool is_close = event == Network::ConnectionEvent::LocalClose ||
                        event == Network::ConnectionEvent::RemoteClose;
  if (is_close && upstream_request_) {
    upstream_request_->resetStream();
  }

  // If we did not receive a valid CONNECT response yet we treat this as a pool failure, otherwise
  // we forward the event downstream.
  if (conn_pool_callbacks_ != nullptr) {
    conn_pool_callbacks_->onFailure();
    return;
  }

  if (inform_downstream) {
    upstream_callbacks_.onEvent(event);
  }
}
587

            
588
// Router::RouterFilterInterface
589
void CombinedUpstream::onUpstreamHeaders([[maybe_unused]] uint64_t response_code,
590
                                         Http::ResponseHeaderMapPtr&& headers,
591
                                         [[maybe_unused]] UpstreamRequest& upstream_request,
592
364
                                         bool end_stream) {
593
364
  responseDecoder().decodeHeaders(std::move(headers), end_stream);
594
364
}
595

            
596
void CombinedUpstream::onUpstreamData(Buffer::Instance& data,
597
                                      [[maybe_unused]] UpstreamRequest& upstream_request,
598
210809
                                      bool end_stream) {
599
210809
  responseDecoder().decodeData(data, end_stream);
600
210809
}
601

            
602
void CombinedUpstream::onUpstreamTrailers(Http::ResponseTrailerMapPtr&& trailers,
603
14
                                          UpstreamRequest&) {
604
14
  responseDecoder().decodeTrailers(std::move(trailers));
605
14
}
606

            
607
3748
// Returns a non-owning pointer to the tunneling request headers built in the constructor.
Http::RequestHeaderMap* CombinedUpstream::downstreamHeaders() {
  return downstream_headers_.get();
}
608

            
609
421
void CombinedUpstream::recordUpstreamSslConnection() {
610
421
  if ((type_ != Http::CodecType::HTTP1) && (config_.usePost())) {
611
11
    auto is_ssl = upstream_request_->streamInfo().upstreamInfo()->upstreamSslConnection();
612
11
    const std::string& scheme =
613
11
        is_ssl ? Http::Headers::get().SchemeValues.Https : Http::Headers::get().SchemeValues.Http;
614
11
    if (downstream_headers_->Scheme()) {
615
      downstream_headers_->removeScheme();
616
    }
617
11
    downstream_headers_->addReference(Http::Headers::get().Scheme, scheme);
618
11
  }
619
421
}
620

            
621
168
void CombinedUpstream::doneReading() {
622
168
  read_half_closed_ = true;
623
168
  if (write_half_closed_) {
624
123
    onResetEncoder(Network::ConnectionEvent::LocalClose);
625
123
  }
626
168
}
627

            
628
void CombinedUpstream::onUpstreamReset(Http::StreamResetReason, absl::string_view,
629
181
                                       UpstreamRequest&) {
630
181
  upstream_callbacks_.onEvent(Network::ConnectionEvent::RemoteClose);
631
181
}
632

            
633
254
void CombinedUpstream::doneWriting() {
634
254
  write_half_closed_ = true;
635
254
  if (read_half_closed_) {
636
37
    onResetEncoder(Network::ConnectionEvent::LocalClose);
637
37
  }
638
254
}
639
} // namespace TcpProxy
640
} // namespace Envoy