1
#include "source/extensions/upstreams/http/tcp/upstream_request.h"
2

            
3
#include <cstdint>
4
#include <memory>
5

            
6
#include "envoy/upstream/upstream.h"
7

            
8
#include "source/common/common/assert.h"
9
#include "source/common/common/utility.h"
10
#include "source/common/http/codes.h"
11
#include "source/common/http/header_map_impl.h"
12
#include "source/common/http/headers.h"
13
#include "source/common/http/message_impl.h"
14
#include "source/common/network/transport_socket_options_impl.h"
15
#include "source/common/router/router.h"
16
#include "source/extensions/common/proxy_protocol/proxy_protocol_header.h"
17

            
18
namespace Envoy {
19
namespace Extensions {
20
namespace Upstreams {
21
namespace Http {
22
namespace Tcp {
23

            
24
void TcpConnPool::onPoolReady(Envoy::Tcp::ConnectionPool::ConnectionDataPtr&& conn_data,
25
219
                              Upstream::HostDescriptionConstSharedPtr host) {
26
219
  upstream_handle_ = nullptr;
27
219
  Network::Connection& latched_conn = conn_data->connection();
28
219
  auto upstream =
29
219
      std::make_unique<TcpUpstream>(&callbacks_->upstreamToDownstream(), std::move(conn_data));
30
219
  callbacks_->onPoolReady(std::move(upstream), host, latched_conn.connectionInfoProvider(),
31
219
                          latched_conn.streamInfo(), {});
32
219
}
33

            
34
TcpUpstream::TcpUpstream(Router::UpstreamToDownstream* upstream_request,
35
                         Envoy::Tcp::ConnectionPool::ConnectionDataPtr&& upstream)
36
227
    : upstream_request_(upstream_request), upstream_conn_data_(std::move(upstream)),
37
227
      force_reset_on_upstream_half_close_(Runtime::runtimeFeatureEnabled(
38
227
          "envoy.reloadable_features.allow_multiplexed_upstream_half_close")) {
39
227
  upstream_conn_data_->connection().enableHalfClose(true);
40
227
  upstream_conn_data_->addUpstreamCallbacks(*this);
41
227
}
42

            
43
296
void TcpUpstream::encodeData(Buffer::Instance& data, bool end_stream) {
44
296
  downstream_complete_ = end_stream;
45
296
  bytes_meter_->addWireBytesSent(data.length());
46
296
  upstream_conn_data_->connection().write(data, end_stream);
47
296
}
48

            
49
Envoy::Http::Status TcpUpstream::encodeHeaders(const Envoy::Http::RequestHeaderMap&,
50
223
                                               bool end_stream) {
51
223
  downstream_complete_ = end_stream;
52
  // Headers should only happen once, so use this opportunity to add the proxy
53
  // proto header, if configured.
54
223
  const Router::RouteEntry* route_entry = upstream_request_->route().routeEntry();
55
223
  ASSERT(route_entry != nullptr);
56
223
  if (route_entry->connectConfig().has_value()) {
57
219
    Buffer::OwnedImpl data;
58
219
    const auto& connect_config = route_entry->connectConfig();
59
219
    if (connect_config->has_proxy_protocol_config() &&
60
219
        upstream_request_->connection().has_value()) {
61
10
      Extensions::Common::ProxyProtocol::generateProxyProtoHeader(
62
10
          connect_config->proxy_protocol_config(), *upstream_request_->connection(), data);
63
10
    }
64

            
65
219
    if (data.length() != 0 || end_stream) {
66
      // Count header bytes for proxy proto.
67
14
      bytes_meter_->addHeaderBytesSent(data.length());
68
14
      bytes_meter_->addWireBytesSent(data.length());
69
14
      upstream_conn_data_->connection().write(data, end_stream);
70
14
    }
71
219
  }
72

            
73
  // TcpUpstream::encodeHeaders is called after the UpstreamRequest is fully initialized. Also use
74
  // this time to synthesize the 200 response headers downstream to complete the CONNECT handshake.
75
223
  Envoy::Http::ResponseHeaderMapPtr headers{
76
223
      Envoy::Http::createHeaderMap<Envoy::Http::ResponseHeaderMapImpl>(
77
223
          {{Envoy::Http::Headers::get().Status, "200"}})};
78
223
  upstream_request_->decodeHeaders(std::move(headers), /*end_stream=*/false);
79
223
  return Envoy::Http::okStatus();
80
223
}
81

            
82
1
void TcpUpstream::encodeTrailers(const Envoy::Http::RequestTrailerMap&) {
83
1
  downstream_complete_ = true;
84
1
  Buffer::OwnedImpl data;
85
1
  upstream_conn_data_->connection().write(data, true);
86
1
}
87

            
88
3
void TcpUpstream::readDisable(bool disable) {
89
3
  if (upstream_conn_data_->connection().state() != Network::Connection::State::Open) {
90
1
    return;
91
1
  }
92
2
  upstream_conn_data_->connection().readDisable(disable);
93
2
}
94

            
95
180
void TcpUpstream::resetStream() {
96
180
  upstream_request_ = nullptr;
97
180
  upstream_conn_data_->connection().close(Network::ConnectionCloseType::NoFlush,
98
180
                                          "tcp_upstream_reset_stream");
99
180
}
100

            
101
291
void TcpUpstream::onUpstreamData(Buffer::Instance& data, bool end_stream) {
102
  // In the TCP proxy case the filter manager used to trigger the full stream closure when the
103
  // upstream server half closed its end of the TCP connection. With the
104
  // allow_multiplexed_upstream_half_close enabled filter manager no longer closes stream that were
105
  // half closed by upstream before downstream. To keep the behavior the same for TCP proxy the
106
  // upstream force closes the connection when server half closes.
107
  //
108
  // Save the indicator to close the stream before calling the decodeData since when the
109
  // allow_multiplexed_upstream_half_close is false the call to decodeHeader with end_stream==true
110
  // will delete the TcpUpstream object.
111
  // NOTE: it this point Envoy can not support half closed TCP upstream as there is currently no
112
  // distinction between half closed vs fully closed TCP peers.
113
291
  const bool force_reset =
114
291
      force_reset_on_upstream_half_close_ && end_stream && !downstream_complete_;
115
291
  bytes_meter_->addWireBytesReceived(data.length());
116
291
  upstream_request_->decodeData(data, end_stream);
117
  // force_reset is true only when allow_multiplexed_upstream_half_close is true and in this case
118
  // the decodeData will never cause the stream to be closed and as such it safe to access
119
  // upstream_request_
120
291
  if (force_reset && upstream_request_) {
121
4
    upstream_request_->onResetStream(Envoy::Http::StreamResetReason::ConnectionTermination,
122
4
                                     "half_close_initiated_full_close");
123
4
  }
124
291
}
125

            
126
181
void TcpUpstream::onEvent(Network::ConnectionEvent event) {
127
181
  if ((event == Network::ConnectionEvent::LocalClose ||
128
181
       event == Network::ConnectionEvent::RemoteClose) &&
129
181
      upstream_request_) {
130
1
    upstream_request_->onResetStream(Envoy::Http::StreamResetReason::ConnectionTermination, "");
131
1
  }
132
181
}
133

            
134
1
void TcpUpstream::onAboveWriteBufferHighWatermark() {
135
1
  if (upstream_request_) {
136
1
    upstream_request_->onAboveWriteBufferHighWatermark();
137
1
  }
138
1
}
139

            
140
1
void TcpUpstream::onBelowWriteBufferLowWatermark() {
141
1
  if (upstream_request_) {
142
1
    upstream_request_->onBelowWriteBufferLowWatermark();
143
1
  }
144
1
}
145

            
146
} // namespace Tcp
147
} // namespace Http
148
} // namespace Upstreams
149
} // namespace Extensions
150
} // namespace Envoy