Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/source/common/tcp_proxy/upstream.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/common/tcp_proxy/upstream.h"
2
3
#include "envoy/http/header_map.h"
4
#include "envoy/upstream/cluster_manager.h"
5
6
#include "source/common/http/codec_client.h"
7
#include "source/common/http/codes.h"
8
#include "source/common/http/header_map_impl.h"
9
#include "source/common/http/headers.h"
10
#include "source/common/http/null_route_impl.h"
11
#include "source/common/http/utility.h"
12
#include "source/common/runtime/runtime_features.h"
13
14
namespace Envoy {
15
namespace TcpProxy {
16
17
using TunnelingConfig =
18
    envoy::extensions::filters::network::tcp_proxy::v3::TcpProxy_TunnelingConfig;
19
20
// Takes ownership of the pooled connection data, enables half-close on the underlying
// connection and registers `upstream_callbacks` on the connection data.
TcpUpstream::TcpUpstream(Tcp::ConnectionPool::ConnectionDataPtr&& data,
                         Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks)
    : upstream_conn_data_(std::move(data)) {
  upstream_conn_data_->connection().enableHalfClose(true);
  upstream_conn_data_->addUpstreamCallbacks(upstream_callbacks);
}
27
28
0
bool TcpUpstream::readDisable(bool disable) {
29
0
  if (upstream_conn_data_ == nullptr ||
30
0
      upstream_conn_data_->connection().state() != Network::Connection::State::Open) {
31
    // Because we flush write downstream, we can have a case where upstream has already disconnected
32
    // and we are waiting to flush. If we had a watermark event during this time we should no
33
    // longer touch the upstream connection.
34
0
    return false;
35
0
  }
36
37
0
  upstream_conn_data_->connection().readDisable(disable);
38
0
  return true;
39
0
}
40
41
0
// Writes `data` to the upstream connection, passing `end_stream` through to the write call.
void TcpUpstream::encodeData(Buffer::Instance& data, bool end_stream) {
  Network::ClientConnection& upstream_connection = upstream_conn_data_->connection();
  upstream_connection.write(data, end_stream);
}
44
45
0
// Registers `cb` as a bytes-sent callback on the upstream connection.
void TcpUpstream::addBytesSentCallback(Network::Connection::BytesSentCb cb) {
  Network::ClientConnection& upstream_connection = upstream_conn_data_->connection();
  upstream_connection.addBytesSentCallback(cb);
}
48
49
0
bool TcpUpstream::startUpstreamSecureTransport() {
50
0
  return (upstream_conn_data_ == nullptr)
51
0
             ? false
52
0
             : upstream_conn_data_->connection().startSecureTransport();
53
0
}
54
55
0
// Returns the SSL info of the upstream connection, or nullptr when there is no upstream
// connection data.
Ssl::ConnectionInfoConstSharedPtr TcpUpstream::getUpstreamConnectionSslInfo() {
  if (upstream_conn_data_ == nullptr) {
    return nullptr;
  }
  return upstream_conn_data_->connection().ssl();
}
61
62
// Reacts to a downstream connection event by closing the upstream connection.
//  - RemoteClose: releases ownership of the connection data, closes with FlushWrite and
//    returns the released pointer to the caller.
//  - LocalClose: closes with NoFlush and returns nullptr.
//  - Any other event: no action, returns nullptr.
Tcp::ConnectionPool::ConnectionData*
TcpUpstream::onDownstreamEvent(Network::ConnectionEvent event) {
  // TODO(botengyao): propagate RST back to upstream connection if RST is received from downstream.
  if (event == Network::ConnectionEvent::RemoteClose) {
    // The close call may result in this object being deleted. Latch the
    // connection locally so it can be returned for potential draining.
    Tcp::ConnectionPool::ConnectionData* released = upstream_conn_data_.release();
    released->connection().close(
        Network::ConnectionCloseType::FlushWrite,
        StreamInfo::LocalCloseReasons::get().ClosingUpstreamTcpDueToDownstreamRemoteClose);
    return released;
  }

  if (event == Network::ConnectionEvent::LocalClose) {
    upstream_conn_data_->connection().close(
        Network::ConnectionCloseType::NoFlush,
        StreamInfo::LocalCloseReasons::get().ClosingUpstreamTcpDueToDownstreamLocalClose);
  }
  return nullptr;
}
80
81
// Stores the tunneling config, downstream stream info, upstream callbacks and codec type, and
// constructs the response decoder with a reference to this upstream.
HttpUpstream::HttpUpstream(Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
                           const TunnelingConfigHelper& config,
                           StreamInfo::StreamInfo& downstream_info, Http::CodecType type)
    : config_(config),
      downstream_info_(downstream_info),
      response_decoder_(*this),
      upstream_callbacks_(callbacks),
      type_(type) {}
86
87
0
// Resets the request encoder (if one is still attached) with a LocalClose event.
HttpUpstream::~HttpUpstream() {
  resetEncoder(Network::ConnectionEvent::LocalClose);
}
88
89
0
// Decides whether the upstream response headers indicate an established tunnel.
// For HTTP/1 any 2xx status is accepted; for every other codec type the status must be 200.
bool HttpUpstream::isValidResponse(const Http::ResponseHeaderMap& headers) {
  const auto response_status = Http::Utility::getResponseStatus(headers);
  if (type_ != Http::CodecType::HTTP1) {
    return response_status == 200;
  }
  //  According to RFC7231 any 2xx response indicates that the connection is
  //  established.
  //  Any 'Content-Length' or 'Transfer-Encoding' header fields MUST be ignored.
  //  https://tools.ietf.org/html/rfc7231#section-4.3.6
  return Http::CodeUtility::is2xx(response_status);
}
99
100
0
// Attaches `request_encoder` to this upstream and sends the tunnel request headers.
// The request is POST (with the configured path) when the config asks for POST, otherwise
// CONNECT. For HTTP/1, TCP tunneling is enabled on the encoder; for other codecs a scheme
// header (https when `is_ssl`, else http) is added for POST requests. Configured header
// mutations are applied before the headers are encoded with end_stream=false.
void HttpUpstream::setRequestEncoder(Http::RequestEncoder& request_encoder, bool is_ssl) {
  request_encoder_ = &request_encoder;
  request_encoder_->getStream().addCallbacks(*this);

  const auto& header_names = Http::Headers::get();
  auto headers = Http::createHeaderMap<Http::RequestHeaderMapImpl>({
      {header_names.Method, config_.usePost() ? "POST" : "CONNECT"},
      {header_names.Host, config_.host(downstream_info_)},
  });
  if (config_.usePost()) {
    headers->addReference(header_names.Path, config_.postPath());
  }

  if (type_ == Http::CodecType::HTTP1) {
    request_encoder_->enableTcpTunneling();
    ASSERT(request_encoder_->http1StreamEncoderOptions() != absl::nullopt);
  } else {
    const std::string& scheme =
        is_ssl ? header_names.SchemeValues.Https : header_names.SchemeValues.Http;

    if (config_.usePost()) {
      headers->addReference(header_names.Scheme, scheme);
    }
  }

  config_.headerEvaluator().evaluateHeaders(*headers, {downstream_info_.getRequestHeaders()},
                                            downstream_info_);
  const auto status = request_encoder_->encodeHeaders(*headers, false);
  // Encoding can only fail on missing required request headers.
  ASSERT(status.ok());
}
129
130
0
bool HttpUpstream::readDisable(bool disable) {
131
0
  if (!request_encoder_) {
132
0
    return false;
133
0
  }
134
0
  request_encoder_->getStream().readDisable(disable);
135
0
  return true;
136
0
}
137
138
0
// Encodes `data` on the upstream HTTP stream; a no-op when no request encoder is attached.
// After the final chunk (`end_stream`) on a non-HTTP/1 codec, marks the write side as done.
void HttpUpstream::encodeData(Buffer::Instance& data, bool end_stream) {
  if (request_encoder_ == nullptr) {
    return;
  }
  // The codec type is read into a local before the encode call and only the local is
  // consulted afterwards.
  const auto latched_codec = type_;
  request_encoder_->encodeData(data, end_stream);

  // doneWriting() is being skipped for H1 codec to avoid resetEncoder() call.
  // This is because H1 codec does not support half-closed stream. Calling resetEncoder()
  // will fully close the upstream connection without flushing any pending data, rather than a http
  // stream reset.
  // More details can be found on https://github.com/envoyproxy/envoy/pull/13293
  if (end_stream && latched_codec != Http::CodecType::HTTP1) {
    doneWriting();
  }
}
154
155
0
// Intentionally a no-op: the supplied callback is ignored.
void HttpUpstream::addBytesSentCallback(Network::Connection::BytesSentCb) {
  // The HTTP tunneling mode does not tickle the idle timeout when bytes are
  // sent to the kernel.
  // This can be implemented if any user cares about the difference in time
  // between it being sent to the HTTP/2 stack and out to the kernel.
}
161
162
// On a downstream LocalClose or RemoteClose, resets the request encoder with a LocalClose
// event and inform_downstream=false. Always returns nullptr.
Tcp::ConnectionPool::ConnectionData*
HttpUpstream::onDownstreamEvent(Network::ConnectionEvent event) {
  const bool downstream_closed = event == Network::ConnectionEvent::LocalClose ||
                                 event == Network::ConnectionEvent::RemoteClose;
  if (downstream_closed) {
    resetEncoder(Network::ConnectionEvent::LocalClose, false);
  }
  return nullptr;
}
170
171
0
// Stream reset callback: marks both directions as half-closed, then resets the encoder with a
// LocalClose event. The reset reason and details string are ignored.
void HttpUpstream::onResetStream(Http::StreamResetReason, absl::string_view) {
  // Both flags are set first, so resetEncoder() skips its own resetStream() call.
  read_half_closed_ = true;
  write_half_closed_ = true;
  resetEncoder(Network::ConnectionEvent::LocalClose);
}
176
177
0
void HttpUpstream::onAboveWriteBufferHighWatermark() {
178
0
  upstream_callbacks_.onAboveWriteBufferHighWatermark();
179
0
}
180
181
0
void HttpUpstream::onBelowWriteBufferLowWatermark() {
182
0
  upstream_callbacks_.onBelowWriteBufferLowWatermark();
183
0
}
184
185
0
// Detaches from the upstream HTTP stream and reports the outcome.
// A no-op when no request encoder is attached. Otherwise: removes this object from the stream's
// callbacks, resets the stream with LocalReset unless both directions are already half-closed,
// and clears the encoder pointer. If connection pool callbacks are still set, reports a failure
// through them and stops; otherwise forwards `event` to the upstream callbacks when
// `inform_downstream` is true.
void HttpUpstream::resetEncoder(Network::ConnectionEvent event, bool inform_downstream) {
  if (request_encoder_ == nullptr) {
    return;
  }

  request_encoder_->getStream().removeCallbacks(*this);
  const bool fully_closed = write_half_closed_ && read_half_closed_;
  if (!fully_closed) {
    request_encoder_->getStream().resetStream(Http::StreamResetReason::LocalReset);
  }
  request_encoder_ = nullptr;

  // If we did not receive a valid CONNECT response yet we treat this as a pool
  // failure, otherwise we forward the event downstream.
  if (conn_pool_callbacks_ != nullptr) {
    conn_pool_callbacks_->onFailure();
    return;
  }

  if (inform_downstream) {
    upstream_callbacks_.onEvent(event);
  }
}
205
206
0
void HttpUpstream::doneReading() {
207
0
  read_half_closed_ = true;
208
0
  if (write_half_closed_) {
209
0
    resetEncoder(Network::ConnectionEvent::LocalClose);
210
0
  }
211
0
}
212
213
0
void HttpUpstream::doneWriting() {
214
0
  write_half_closed_ = true;
215
0
  if (read_half_closed_) {
216
0
    resetEncoder(Network::ConnectionEvent::LocalClose);
217
0
  }
218
0
}
219
220
// Stores the upstream callbacks and downstream stream info, then looks up the cluster's TCP
// connection pool at Default priority using the supplied load balancer context.
TcpConnPool::TcpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster,
                         Upstream::LoadBalancerContext* context,
                         Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks,
                         StreamInfo::StreamInfo& downstream_info)
    : upstream_callbacks_(upstream_callbacks),
      downstream_info_(downstream_info) {
  conn_pool_data_ = thread_local_cluster.tcpConnPool(Upstream::ResourcePriority::Default, context);
}
227
228
0
// Cancels a still-pending connection request, using the CloseExcess cancel policy.
TcpConnPool::~TcpConnPool() {
  if (upstream_handle_ == nullptr) {
    return;
  }
  upstream_handle_->cancel(ConnectionPool::CancelPolicy::CloseExcess);
}
233
234
0
// Records `callbacks` and requests a new connection from the TCP pool. A non-null cancellation
// handle returned by the pool is stored in upstream_handle_.
void TcpConnPool::newStream(GenericConnectionPoolCallbacks& callbacks) {
  callbacks_ = &callbacks;
  // Given this function is re-entrant, make sure we only reset the upstream_handle_ if given a
  // valid connection handle. If newConnection fails inline it may result in attempting to
  // select a new host, and a recursive call to establishUpstreamConnection. In this case the
  // first call to newConnection will return null and the inner call will persist.
  Tcp::ConnectionPool::Cancellable* pending = conn_pool_data_.value().newConnection(*this);
  if (pending != nullptr) {
    ASSERT(upstream_handle_ == nullptr);
    upstream_handle_ = pending;
  }
}
246
247
void TcpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason,
248
                                absl::string_view failure_reason,
249
0
                                Upstream::HostDescriptionConstSharedPtr host) {
250
0
  upstream_handle_ = nullptr;
251
0
  callbacks_->onGenericPoolFailure(reason, failure_reason, host);
252
0
}
253
254
void TcpConnPool::onPoolReady(Tcp::ConnectionPool::ConnectionDataPtr&& conn_data,
255
0
                              Upstream::HostDescriptionConstSharedPtr host) {
256
0
  if (downstream_info_.downstreamAddressProvider().connectionID()) {
257
0
    downstream_info_.upstreamInfo()->setUpstreamConnectionId(conn_data->connection().id());
258
0
    ENVOY_LOG(debug, "Attached upstream connection [C{}] to downstream connection [C{}]",
259
0
              conn_data->connection().id(),
260
0
              downstream_info_.downstreamAddressProvider().connectionID().value());
261
0
  }
262
263
0
  upstream_handle_ = nullptr;
264
0
  Tcp::ConnectionPool::ConnectionData* latched_data = conn_data.get();
265
0
  Network::Connection& connection = conn_data->connection();
266
267
0
  auto upstream = std::make_unique<TcpUpstream>(std::move(conn_data), upstream_callbacks_);
268
0
  callbacks_->onGenericPoolReady(
269
0
      &connection.streamInfo(), std::move(upstream), host,
270
0
      latched_data->connection().connectionInfoProvider(),
271
0
      latched_data->connection().streamInfo().downstreamAddressProvider().sslConnection());
272
0
}
273
274
// Stores the configuration and callbacks, maps the codec type to an optional HTTP protocol
// (HTTP/3 and HTTP/2 are mapped; anything else leaves it unset) and acquires a pool:
//  - with the upstream_http_filters_with_tcp_proxy runtime feature enabled, a generic
//    connection pool created via createConnPool();
//  - otherwise, the cluster's HTTP connection pool at Default priority.
HttpConnPool::HttpConnPool(Upstream::ThreadLocalCluster& thread_local_cluster,
                           Upstream::LoadBalancerContext* context,
                           const TunnelingConfigHelper& config,
                           Tcp::ConnectionPool::UpstreamCallbacks& upstream_callbacks,
                           Http::StreamDecoderFilterCallbacks& stream_decoder_callbacks,
                           Http::CodecType type, StreamInfo::StreamInfo& downstream_info)
    : config_(config), type_(type), decoder_filter_callbacks_(&stream_decoder_callbacks),
      upstream_callbacks_(upstream_callbacks), downstream_info_(downstream_info) {
  absl::optional<Http::Protocol> protocol;
  if (type_ == Http::CodecType::HTTP3) {
    protocol = Http::Protocol::Http3;
  } else if (type_ == Http::CodecType::HTTP2) {
    protocol = Http::Protocol::Http2;
  }

  const bool use_upstream_http_filters = Runtime::runtimeFeatureEnabled(
      "envoy.restart_features.upstream_http_filters_with_tcp_proxy");
  if (use_upstream_http_filters) {
    generic_conn_pool_ = createConnPool(thread_local_cluster, context, protocol);
  } else {
    conn_pool_data_ =
        thread_local_cluster.httpConnPool(Upstream::ResourcePriority::Default, protocol, context);
  }
}
297
298
std::unique_ptr<Router::GenericConnPool>
299
HttpConnPool::createConnPool(Upstream::ThreadLocalCluster& cluster,
300
                             Upstream::LoadBalancerContext* context,
301
0
                             absl::optional<Http::Protocol> protocol) {
302
0
  Router::GenericConnPoolFactory* factory = nullptr;
303
0
  factory = Envoy::Config::Utility::getFactoryByName<Router::GenericConnPoolFactory>(
304
0
      "envoy.filters.connection_pools.http.generic");
305
0
  if (!factory) {
306
0
    return nullptr;
307
0
  }
308
309
0
  return factory->createGenericConnPool(
310
0
      cluster, Envoy::Router::GenericConnPoolFactory::UpstreamProtocol::HTTP,
311
0
      decoder_filter_callbacks_->route()->routeEntry()->priority(), protocol, context);
312
0
}
313
314
0
// Cancels a still-pending stream request and, if a combined upstream is still owned by this
// pool, notifies it of a LocalClose event.
HttpConnPool::~HttpConnPool() {
  if (upstream_handle_ != nullptr) {
    // Because HTTP connections are generally shorter lived and have a higher probability of use
    // before going idle, they are closed with Default rather than CloseExcess.
    upstream_handle_->cancel(ConnectionPool::CancelPolicy::Default);
  }

  if (combined_upstream_ != nullptr) {
    combined_upstream_->onDownstreamEvent(Network::ConnectionEvent::LocalClose);
  }
}
324
325
0
// Records `callbacks` and starts a new upstream stream.
// With the upstream_http_filters_with_tcp_proxy runtime feature enabled, builds a
// CombinedUpstream, gives it a RouterUpstreamRequest that takes over the generic connection
// pool, and starts the stream through it. Otherwise creates an HttpUpstream and requests a
// stream from the HTTP connection pool, storing any non-null cancellation handle.
void HttpConnPool::newStream(GenericConnectionPoolCallbacks& callbacks) {
  callbacks_ = &callbacks;

  if (Runtime::runtimeFeatureEnabled(
          "envoy.restart_features.upstream_http_filters_with_tcp_proxy")) {
    combined_upstream_ = std::make_unique<CombinedUpstream>(
        *this, upstream_callbacks_, *decoder_filter_callbacks_, config_, downstream_info_);
    RouterUpstreamRequestPtr router_request = std::make_unique<RouterUpstreamRequest>(
        *combined_upstream_, std::move(generic_conn_pool_), /*can_send_early_data_=*/false,
        /*can_use_http3_=*/true, true /*enable_tcp_tunneling*/);
    combined_upstream_->setRouterUpstreamRequest(std::move(router_request));
    combined_upstream_->newStream(callbacks);
    return;
  }

  upstream_ = std::make_unique<HttpUpstream>(upstream_callbacks_, config_, downstream_info_, type_);
  Tcp::ConnectionPool::Cancellable* pending =
      conn_pool_data_.value().newStream(upstream_->responseDecoder(), *this,
                                        {/*can_send_early_data_=*/false,
                                         /*can_use_http3_=*/true});
  // A null handle leaves upstream_handle_ untouched.
  if (pending != nullptr) {
    upstream_handle_ = pending;
  }
}
348
349
void HttpConnPool::onPoolFailure(ConnectionPool::PoolFailureReason reason,
350
                                 absl::string_view failure_reason,
351
0
                                 Upstream::HostDescriptionConstSharedPtr host) {
352
0
  upstream_handle_ = nullptr;
353
0
  callbacks_->onGenericPoolFailure(reason, failure_reason, host);
354
0
}
355
356
void HttpConnPool::onUpstreamHostSelected(Upstream::HostDescriptionConstSharedPtr host,
357
0
                                          bool pool_success) {
358
0
  if (!pool_success) {
359
0
    return;
360
0
  }
361
0
  combined_upstream_->setConnPoolCallbacks(std::make_unique<HttpConnPool::Callbacks>(
362
0
      *this, host, downstream_info_.downstreamAddressProvider().sslConnection()));
363
0
}
364
365
void HttpConnPool::onPoolReady(Http::RequestEncoder& request_encoder,
366
                               Upstream::HostDescriptionConstSharedPtr host,
367
0
                               StreamInfo::StreamInfo& info, absl::optional<Http::Protocol>) {
368
0
  if (info.downstreamAddressProvider().connectionID() &&
369
0
      downstream_info_.downstreamAddressProvider().connectionID()) {
370
    // info.downstreamAddressProvider() is being called to get the upstream connection ID,
371
    // because the StreamInfo object here is of the upstream connection.
372
0
    downstream_info_.upstreamInfo()->setUpstreamConnectionId(
373
0
        info.downstreamAddressProvider().connectionID().value());
374
0
    ENVOY_LOG(debug, "Attached upstream connection [C{}] to downstream connection [C{}]",
375
0
              info.downstreamAddressProvider().connectionID().value(),
376
0
              downstream_info_.downstreamAddressProvider().connectionID().value());
377
0
  }
378
379
0
  upstream_handle_ = nullptr;
380
0
  upstream_->setRequestEncoder(request_encoder,
381
0
                               host->transportSocketFactory().implementsSecureTransport());
382
0
  upstream_->setConnPoolCallbacks(std::make_unique<HttpConnPool::Callbacks>(
383
0
      *this, host, info.downstreamAddressProvider().sslConnection()));
384
0
}
385
386
void HttpConnPool::onGenericPoolReady(Upstream::HostDescriptionConstSharedPtr& host,
387
                                      const Network::ConnectionInfoProvider& address_provider,
388
0
                                      Ssl::ConnectionInfoConstSharedPtr ssl_info) {
389
0
  if (Runtime::runtimeFeatureEnabled(
390
0
          "envoy.restart_features.upstream_http_filters_with_tcp_proxy")) {
391
392
0
    callbacks_->onGenericPoolReady(nullptr, std::move(combined_upstream_), host, address_provider,
393
0
                                   ssl_info);
394
0
    return;
395
0
  }
396
0
  callbacks_->onGenericPoolReady(nullptr, std::move(upstream_), host, address_provider, ssl_info);
397
0
}
398
399
// Stores configuration and callbacks and pre-builds the tunnel request headers:
// method (POST or CONNECT per the config) and host, plus path and scheme for POST. The scheme
// is https when the downstream connection reports SSL info, else http. Configured header
// mutations are applied last.
CombinedUpstream::CombinedUpstream(HttpConnPool& http_conn_pool,
                                   Tcp::ConnectionPool::UpstreamCallbacks& callbacks,
                                   Http::StreamDecoderFilterCallbacks& decoder_callbacks,
                                   const TunnelingConfigHelper& config,
                                   StreamInfo::StreamInfo& downstream_info)
    : config_(config), downstream_info_(downstream_info), parent_(http_conn_pool),
      decoder_filter_callbacks_(decoder_callbacks), response_decoder_(*this),
      upstream_callbacks_(callbacks) {
  const auto downstream_ssl = downstream_info_.downstreamAddressProvider().sslConnection();
  const auto& header_names = Http::Headers::get();
  downstream_headers_ = Http::createHeaderMap<Http::RequestHeaderMapImpl>({
      {header_names.Method, config_.usePost() ? "POST" : "CONNECT"},
      {header_names.Host, config_.host(downstream_info_)},
  });

  if (config_.usePost()) {
    const std::string& scheme =
        downstream_ssl ? header_names.SchemeValues.Https : header_names.SchemeValues.Http;
    downstream_headers_->addReference(header_names.Path, config_.postPath());
    downstream_headers_->addReference(header_names.Scheme, scheme);
  }

  config_.headerEvaluator().evaluateHeaders(
      *downstream_headers_, {downstream_info_.getRequestHeaders()}, downstream_info_);
}
423
424
void CombinedUpstream::setRouterUpstreamRequest(
425
0
    Router::UpstreamRequestPtr router_upstream_request) {
426
0
  ASSERT(!upstream_request_);
427
0
  upstream_request_ = std::move(router_upstream_request);
428
0
}
429
430
0
// Starts the upstream request by passing it the headers with end_stream=false. The callbacks
// argument is unused.
void CombinedUpstream::newStream(GenericConnectionPoolCallbacks&) {
  constexpr bool end_stream = false;
  upstream_request_->acceptHeadersFromRouter(end_stream);
}
433
434
0
// Passes `data` to the upstream request; a no-op when there is no upstream request.
// After the final chunk (`end_stream`), marks the write side as done.
void CombinedUpstream::encodeData(Buffer::Instance& data, bool end_stream) {
  if (upstream_request_ == nullptr) {
    return;
  }
  upstream_request_->acceptDataFromRouter(data, end_stream);
  if (!end_stream) {
    return;
  }
  doneWriting();
}
443
444
0
bool CombinedUpstream::readDisable(bool disable) {
445
0
  if (!upstream_request_) {
446
0
    return false;
447
0
  }
448
0
  upstream_request_->readDisableOrDefer(disable);
449
0
  return true;
450
0
}
451
452
// On a downstream LocalClose or RemoteClose, resets the upstream request's stream (if an
// upstream request exists). Always returns nullptr.
Tcp::ConnectionPool::ConnectionData*
CombinedUpstream::onDownstreamEvent(Network::ConnectionEvent event) {
  const bool downstream_closed = event == Network::ConnectionEvent::LocalClose ||
                                 event == Network::ConnectionEvent::RemoteClose;
  if (upstream_request_ && downstream_closed) {
    upstream_request_->resetStream();
  }
  return nullptr;
}
464
465
0
// Decides whether the upstream response headers indicate an established tunnel, based on the
// parent pool's codec type: any 2xx for HTTP/1, exactly 200 for HTTP/2 and HTTP/3. Returns true
// for a codec type not covered by the switch.
bool CombinedUpstream::isValidResponse(const Http::ResponseHeaderMap& headers) {
  switch (parent_.codecType()) {
  case Http::CodecType::HTTP1:
    // According to RFC7231 any 2xx response indicates that the connection is
    // established.
    // Any 'Content-Length' or 'Transfer-Encoding' header fields MUST be ignored.
    // https://tools.ietf.org/html/rfc7231#section-4.3.6
    return Http::CodeUtility::is2xx(Http::Utility::getResponseStatus(headers));
  case Http::CodecType::HTTP2:
  case Http::CodecType::HTTP3:
    return Http::Utility::getResponseStatus(headers) == 200;
  }
  return true;
}
482
483
0
// Resets the upstream request's stream on a LocalClose/RemoteClose event (when an upstream
// request exists) and reports the outcome: if connection pool callbacks are still set, reports
// a failure through them and stops; otherwise forwards `event` to the upstream callbacks when
// `inform_downstream` is true.
void CombinedUpstream::onResetEncoder(Network::ConnectionEvent event, bool inform_downstream) {
  const bool is_close_event = event == Network::ConnectionEvent::LocalClose ||
                              event == Network::ConnectionEvent::RemoteClose;
  if (is_close_event && upstream_request_) {
    upstream_request_->resetStream();
  }

  // If we did not receive a valid CONNECT response yet we treat this as a pool
  // failure, otherwise we forward the event downstream.
  if (conn_pool_callbacks_ != nullptr) {
    conn_pool_callbacks_->onFailure();
    return;
  }

  if (inform_downstream) {
    upstream_callbacks_.onEvent(event);
  }
}
502
503
// Router::RouterFilterInterface
504
void CombinedUpstream::onUpstreamHeaders([[maybe_unused]] uint64_t response_code,
505
                                         Http::ResponseHeaderMapPtr&& headers,
506
                                         [[maybe_unused]] UpstreamRequest& upstream_request,
507
0
                                         bool end_stream) {
508
0
  responseDecoder().decodeHeaders(std::move(headers), end_stream);
509
0
}
510
511
void CombinedUpstream::onUpstreamData(Buffer::Instance& data,
512
                                      [[maybe_unused]] UpstreamRequest& upstream_request,
513
0
                                      bool end_stream) {
514
0
  responseDecoder().decodeData(data, end_stream);
515
0
}
516
517
void CombinedUpstream::onUpstreamTrailers(Http::ResponseTrailerMapPtr&& trailers,
518
0
                                          UpstreamRequest&) {
519
0
  responseDecoder().decodeTrailers(std::move(trailers));
520
0
}
521
522
0
// Returns a non-owning pointer to the request headers built in the constructor.
Http::RequestHeaderMap* CombinedUpstream::downstreamHeaders() {
  return downstream_headers_.get();
}
523
524
0
void CombinedUpstream::doneReading() {
525
0
  read_half_closed_ = true;
526
0
  if (write_half_closed_) {
527
0
    onResetEncoder(Network::ConnectionEvent::LocalClose);
528
0
  }
529
0
}
530
531
void CombinedUpstream::onUpstreamReset(Http::StreamResetReason, absl::string_view,
532
0
                                       UpstreamRequest&) {
533
0
  upstream_callbacks_.onEvent(Network::ConnectionEvent::RemoteClose);
534
0
}
535
536
0
void CombinedUpstream::doneWriting() {
537
0
  write_half_closed_ = true;
538
0
  if (read_half_closed_) {
539
0
    onResetEncoder(Network::ConnectionEvent::LocalClose);
540
0
  }
541
0
}
542
} // namespace TcpProxy
543
} // namespace Envoy