1
#include "source/extensions/bootstrap/reverse_tunnel/downstream_socket_interface/rc_connection_wrapper.h"
2

            
3
#include "envoy/network/address.h"
4
#include "envoy/network/connection.h"
5

            
6
#include "source/common/buffer/buffer_impl.h"
7
#include "source/common/common/logger.h"
8
#include "source/common/http/header_map_impl.h"
9
#include "source/common/http/utility.h"
10
#include "source/common/network/address_impl.h"
11
#include "source/common/network/connection_socket_impl.h"
12
#include "source/extensions/bootstrap/reverse_tunnel/common/reverse_connection_utility.h"
13
#include "source/extensions/bootstrap/reverse_tunnel/downstream_socket_interface/reverse_connection_io_handle.h"
14
#include "source/extensions/bootstrap/reverse_tunnel/downstream_socket_interface/reverse_tunnel_initiator_extension.h"
15

            
16
namespace Envoy {
17
namespace Extensions {
18
namespace Bootstrap {
19
namespace ReverseConnection {
20

            
21
// RCConnectionWrapper constructor implementation
22
RCConnectionWrapper::RCConnectionWrapper(ReverseConnectionIOHandle& parent,
23
                                         Network::ClientConnectionPtr connection,
24
                                         Upstream::HostDescriptionConstSharedPtr host,
25
                                         const std::string& cluster_name)
26
62
    : parent_(parent), connection_(std::move(connection)), host_(std::move(host)),
27
62
      cluster_name_(cluster_name) {
28
62
  ENVOY_LOG(debug, "RCConnectionWrapper: Using HTTP handshake for reverse connections");
29
62
}
30

            
31
// RCConnectionWrapper destructor implementation.
32
62
RCConnectionWrapper::~RCConnectionWrapper() {
33
62
  ENVOY_LOG(debug, "RCConnectionWrapper destructor called");
34
62
  if (!shutdown_called_) {
35
57
    this->shutdown();
36
57
  }
37
62
}
38

            
39
24
// Connection event callback (this wrapper registers itself in connect()).
// Only RemoteClose is acted upon: the parent is notified that the connection is done, with the
// "closed" flag set. All other events are ignored.
void RCConnectionWrapper::onEvent(Network::ConnectionEvent event) {
  if (event != Network::ConnectionEvent::RemoteClose) {
    return;
  }

  if (connection_ == nullptr) {
    ENVOY_LOG(debug, "RCConnectionWrapper: connection is null, skipping event handling");
    return;
  }

  // Snapshot the identifying details for logging before the parent is notified.
  const uint64_t conn_id = connection_->id();
  const std::string local_address_key =
      connection_->connectionInfoProvider().localAddress()->asString();

  ENVOY_LOG(debug, "RCConnectionWrapper: connection: {}, found connection {} remote closed",
            conn_id, local_address_key);

  // shutdown() is deliberately not called here, since it may trigger cleanup while the event is
  // still being processed. The parent is only told about the closure.
  parent_.onConnectionDone("Connection closed", this, true);
}
59

            
60
// SimpleConnReadFilter::onData implementation.
61
22
Network::FilterStatus SimpleConnReadFilter::onData(Buffer::Instance& buffer, bool end_stream) {
62
22
  if (parent_ == nullptr) {
63
1
    return Network::FilterStatus::StopIteration;
64
1
  }
65

            
66
  // Cast parent_ back to RCConnectionWrapper.
67
21
  RCConnectionWrapper* wrapper = static_cast<RCConnectionWrapper*>(parent_);
68

            
69
21
  wrapper->dispatchHttp1(buffer);
70
21
  UNREFERENCED_PARAMETER(end_stream);
71
21
  return Network::FilterStatus::StopIteration;
72
22
}
73

            
74
std::string RCConnectionWrapper::connect(const std::string& src_tenant_id,
75
                                         const std::string& src_cluster_id,
76
40
                                         const std::string& src_node_id) {
77
  // Register connection callbacks.
78
40
  ENVOY_LOG(debug, "RCConnectionWrapper: connection: {}, adding connection callbacks",
79
40
            connection_->id());
80
40
  connection_->addConnectionCallbacks(*this);
81
40
  connection_->connect();
82

            
83
  // Use HTTP handshake.
84
40
  ENVOY_LOG(debug,
85
40
            "RCConnectionWrapper: connection: {}, sending reverse connection creation "
86
40
            "request through HTTP",
87
40
            connection_->id());
88

            
89
  // Create HTTP/1 codec to parse the response.
90
40
  Http::Http1Settings http1_settings = host_->cluster().httpProtocolOptions().http1Settings();
91
40
  http1_client_codec_ = std::make_unique<Http::Http1::ClientConnectionImpl>(
92
40
      *connection_, host_->cluster().http1CodecStats(), *this, http1_settings,
93
40
      host_->cluster().maxResponseHeadersKb(), host_->cluster().maxResponseHeadersCount());
94
40
  http1_parse_connection_ = http1_client_codec_.get();
95

            
96
  // Add a tiny read filter to feed bytes into the codec for response parsing.
97
40
  connection_->addReadFilter(Network::ReadFilterSharedPtr{new SimpleConnReadFilter(this)});
98

            
99
  // Build HTTP handshake headers with identifiers.
100
40
  absl::string_view tenant_id = src_tenant_id;
101
40
  absl::string_view cluster_id = src_cluster_id;
102
40
  absl::string_view node_id = src_node_id;
103
40
  std::string host_value;
104
40
  const auto& remote_address = connection_->connectionInfoProvider().remoteAddress();
105
  // This is used when reverse connections need to be established through a HTTP proxy.
106
  // The reverse connection listener connects to an internal cluster, to which an
107
  // internal listener listens. This internal listener has tunneling configuration
108
  // to tcp proxy the reverse connection requests over HTTP/1 CONNECT to the remote
109
  // proxy.
110
40
  if (remote_address->type() == Network::Address::Type::EnvoyInternal) {
111
1
    const auto& internal_address =
112
1
        std::dynamic_pointer_cast<const Network::Address::EnvoyInternalInstance>(remote_address);
113
1
    ENVOY_LOG(debug,
114
1
              "RCConnectionWrapper: connection: {}, remote address is internal "
115
1
              "listener {}, using endpoint ID in host header",
116
1
              connection_->id(), internal_address->envoyInternalAddress()->addressId());
117
1
    host_value = internal_address->envoyInternalAddress()->endpointId();
118
39
  } else {
119
39
    host_value = remote_address->asString();
120
39
    ENVOY_LOG(debug,
121
39
              "RCConnectionWrapper: connection: {}, remote address is external, "
122
39
              "using address as host header",
123
39
              connection_->id());
124
39
  }
125
40
  const Http::LowerCaseString& node_hdr =
126
40
      ::Envoy::Extensions::Bootstrap::ReverseConnection::reverseTunnelNodeIdHeader();
127
40
  const Http::LowerCaseString& cluster_hdr =
128
40
      ::Envoy::Extensions::Bootstrap::ReverseConnection::reverseTunnelClusterIdHeader();
129
40
  const Http::LowerCaseString& tenant_hdr =
130
40
      ::Envoy::Extensions::Bootstrap::ReverseConnection::reverseTunnelTenantIdHeader();
131
40
  const Http::LowerCaseString& upstream_cluster_hdr =
132
40
      ::Envoy::Extensions::Bootstrap::ReverseConnection::reverseTunnelUpstreamClusterNameHeader();
133

            
134
40
  auto headers = Http::createHeaderMap<Http::RequestHeaderMapImpl>(
135
40
      {{Http::Headers::get().Method, Http::Headers::get().MethodValues.Get},
136
40
       {Http::Headers::get().Path, parent_.requestPath()},
137
40
       {Http::Headers::get().Host, host_value}});
138
40
  headers->addCopy(node_hdr, std::string(node_id));
139
40
  headers->addCopy(cluster_hdr, std::string(cluster_id));
140
40
  headers->addCopy(tenant_hdr, std::string(tenant_id));
141
40
  headers->addCopy(upstream_cluster_hdr, cluster_name_);
142
40
  headers->setContentLength(0);
143

            
144
  // Encode via HTTP/1 codec.
145
40
  Http::RequestEncoder& request_encoder = http1_client_codec_->newStream(*this);
146
40
  const Http::Status encode_status = request_encoder.encodeHeaders(*headers, true);
147
40
  if (!encode_status.ok()) {
148
    ENVOY_LOG(error, "RCConnectionWrapper: encodeHeaders failed: {}", encode_status.message());
149
    onHandshakeFailure(HandshakeFailureReason::encodeError());
150
  }
151

            
152
40
  return connection_->connectionInfoProvider().localAddress()->asString();
153
40
}
154

            
155
16
// Handles the handshake response headers: a 200 status is reported as handshake success, any
// other status as an HTTP status failure carrying the status code. The end-of-stream flag is
// ignored.
void RCConnectionWrapper::decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool) {
  const uint64_t response_code = Http::Utility::getResponseStatus(*headers);
  if (response_code != 200) {
    ENVOY_LOG(error, "Received non-200 HTTP response: {}", response_code);
    onHandshakeFailure(HandshakeFailureReason::httpStatusError(absl::StrCat(response_code)));
    return;
  }

  ENVOY_LOG(debug, "Received HTTP 200 OK response");
  onHandshakeSuccess();
}
165

            
166
22
// Feeds `buffer` into the HTTP/1 codec, if one has been created. A dispatch error is only
// logged; it is not propagated to the caller.
void RCConnectionWrapper::dispatchHttp1(Buffer::Instance& buffer) {
  if (http1_parse_connection_ == nullptr) {
    // No codec yet, so there is nothing to parse with.
    return;
  }

  const Http::Status dispatch_status = http1_parse_connection_->dispatch(buffer);
  if (dispatch_status.ok()) {
    return;
  }
  ENVOY_LOG(debug, "RCConnectionWrapper: HTTP/1 codec dispatch error: {}",
            dispatch_status.message());
}
174

            
175
19
// Returns the downstream extension by delegating to the parent IO handle.
ReverseTunnelInitiatorExtension* RCConnectionWrapper::getDownstreamExtension() const {
  ReverseTunnelInitiatorExtension* const extension = parent_.getDownstreamExtension();
  return extension;
}
178

            
179
16
void RCConnectionWrapper::onHandshakeSuccess() {
180
16
  std::string message = "reverse connection accepted";
181
16
  ENVOY_LOG(debug, "handshake succeeded: {}", message);
182

            
183
  // Track handshake success stats.
184
16
  auto* extension = getDownstreamExtension();
185
16
  if (extension) {
186
16
    extension->incrementHandshakeStats(cluster_name_, true, "");
187
16
  }
188

            
189
16
  parent_.onConnectionDone(message, this, false);
190
16
}
191

            
192
3
void RCConnectionWrapper::onHandshakeFailure(const HandshakeFailureReason& reason) {
193
3
  const std::string error_message = reason.getDetailedName();
194
3
  const std::string stats_failure_reason = reason.getNameForStats();
195

            
196
3
  ENVOY_LOG(trace, "handshake failed: {}", error_message);
197

            
198
  // Track handshake failure stats.
199
3
  auto* extension = getDownstreamExtension();
200
3
  if (extension) {
201
3
    extension->incrementHandshakeStats(cluster_name_, false, stats_failure_reason);
202
3
  }
203

            
204
3
  parent_.onConnectionDone(error_message, this, false);
205
3
}
206

            
207
63
void RCConnectionWrapper::shutdown() {
208
63
  if (shutdown_called_) {
209
1
    ENVOY_LOG(debug, "RCConnectionWrapper: Shutdown already called, skipping");
210
1
    return;
211
1
  }
212
62
  shutdown_called_ = true;
213

            
214
62
  if (!connection_) {
215
25
    ENVOY_LOG(error, "RCConnectionWrapper: Connection already null, nothing to shutdown");
216
25
    return;
217
25
  }
218

            
219
  // Get connection info for logging.
220
37
  uint64_t connection_id = connection_->id();
221
37
  Network::Connection::State state = connection_->state();
222
37
  ENVOY_LOG(debug, "RCConnectionWrapper: Shutting down connection ID: {}, state: {}", connection_id,
223
37
            static_cast<int>(state));
224

            
225
  // Remove connection callbacks first to prevent recursive calls during shutdown.
226
37
  if (state != Network::Connection::State::Closed) {
227
30
    connection_->removeConnectionCallbacks(*this);
228
30
    ENVOY_LOG(debug, "Connection callbacks removed");
229
30
  }
230

            
231
  // Close the connection if it's still open.
232
37
  state = connection_->state();
233
37
  if (state == Network::Connection::State::Open) {
234
29
    ENVOY_LOG(debug, "Closing open connection gracefully");
235
29
    connection_->close(Network::ConnectionCloseType::FlushWrite);
236
29
  } else if (state == Network::Connection::State::Closing) {
237
1
    ENVOY_LOG(debug, "Connection already closing");
238
7
  } else {
239
7
    ENVOY_LOG(debug, "Connection already closed");
240
7
  }
241

            
242
  // Clear the connection pointer after shutdown.
243
37
  connection_.reset();
244
37
  ENVOY_LOG(debug, "RCConnectionWrapper: Connection cleared after shutdown");
245
37
  ENVOY_LOG(debug, "RCConnectionWrapper: Shutdown completed");
246
37
}
247

            
248
} // namespace ReverseConnection
249
} // namespace Bootstrap
250
} // namespace Extensions
251
} // namespace Envoy