1
#include "source/extensions/clusters/reverse_connection/reverse_connection.h"
2

            
3
#include <chrono>
4
#include <list>
5
#include <string>
6
#include <vector>
7

            
8
#include "envoy/config/cluster/v3/cluster.pb.h"
9
#include "envoy/config/core/v3/base.pb.h"
10
#include "envoy/config/core/v3/health_check.pb.h"
11
#include "envoy/config/endpoint/v3/endpoint_components.pb.h"
12

            
13
#include "source/common/common/fmt.h"
14
#include "source/common/config/utility.h"
15
#include "source/common/formatter/substitution_formatter.h"
16
#include "source/common/network/address_impl.h"
17
#include "source/common/protobuf/protobuf.h"
18
#include "source/common/protobuf/utility.h"
19
#include "source/extensions/bootstrap/reverse_tunnel/common/reverse_connection_utility.h"
20

            
21
#include "absl/status/status.h"
22
#include "absl/status/statusor.h"
23
#include "absl/synchronization/mutex.h"
24

            
25
namespace Envoy {
26
namespace Extensions {
27
namespace ReverseConnection {
28

            
29
namespace BootstrapReverseConnection = Envoy::Extensions::Bootstrap::ReverseConnection;
30

            
31
// Picks the upstream host for a request.
//
// The host identifier is produced by evaluating the cluster's host-id formatter against the
// downstream request headers and stream info. If the upstream socket manager reports that tenant
// isolation is enabled, the tenant-id formatter is evaluated as well and both values are combined
// into a tenant-scoped identifier. The resulting identifier is handed to checkAndCreateHost().
//
// @param context load balancer context of the request; may be nullptr.
// @return the result of checkAndCreateHost(), or a response holding nullptr when the context, the
//         downstream headers, a usable stream info, the host identifier or (with tenant isolation
//         enabled) the tenant identifier is unavailable.
Upstream::HostSelectionResponse
RevConCluster::LoadBalancer::chooseHost(Upstream::LoadBalancerContext* context) {
  if (context == nullptr) {
    ENVOY_LOG(error, "reverse_connection: chooseHost called with null context");
    return {nullptr};
  }

  // The formatters are evaluated against the downstream request headers, so they are mandatory.
  const auto* request_headers = context->downstreamHeaders();
  if (request_headers == nullptr) {
    ENVOY_LOG(error, "reverse_connection: missing downstream headers; cannot evaluate formatter.");
    return {nullptr};
  }

  // Prefer the request stream info and fall back to the connection stream info. Either source may
  // be absent, so the connection is checked before it is dereferenced.
  const StreamInfo::StreamInfo* stream_info = context->requestStreamInfo();
  if (stream_info == nullptr) {
    const auto* connection = context->downstreamConnection();
    if (connection == nullptr) {
      ENVOY_LOG(error, "reverse_connection: no request or connection stream info available; "
                       "cannot evaluate formatter.");
      return {nullptr};
    }
    stream_info = &connection->streamInfo();
  }

  const Envoy::Formatter::Context formatter_context{
      request_headers,                  nullptr /* response_headers */,
      nullptr /* response_trailers */,  "" /* local_reply_body */,
      AccessLog::AccessLogType::NotSet, nullptr /* active_span */};

  // "-" is the formatter default for a missing value and is treated like an empty string.
  const auto is_missing = [](const std::string& value) { return value.empty() || value == "-"; };

  const std::string host_id = parent_->host_id_formatter_->format(formatter_context, *stream_info);
  if (is_missing(host_id)) {
    ENVOY_LOG(error, "reverse_connection: host_id formatter returned empty value.");
    return {nullptr};
  }

  std::string final_host_id = host_id;
  auto* socket_manager = parent_->getUpstreamSocketManager();
  if (socket_manager != nullptr && socket_manager->tenantIsolationEnabled()) {
    // When tenant isolation is enabled, tenant_id_formatter must be configured.
    if (parent_->tenant_id_formatter_ == nullptr) {
      ENVOY_LOG(error,
                "reverse_connection: tenant isolation is enabled but tenant_id_format is not "
                "configured. tenant_id_format is required when tenant isolation is enabled.");
      return {nullptr};
    }

    // When tenant isolation is enabled, tenant_id must be derivable from the request.
    const std::string tenant_id =
        parent_->tenant_id_formatter_->format(formatter_context, *stream_info);
    if (is_missing(tenant_id)) {
      ENVOY_LOG(error,
                "reverse_connection: tenant isolation enabled but tenant_id cannot be inferred "
                "(formatter returned empty value)");
      return {nullptr};
    }

    // Combine tenant_id and host_id using the shared utility function.
    final_host_id =
        BootstrapReverseConnection::ReverseConnectionUtility::buildTenantScopedIdentifier(
            tenant_id, host_id);
    ENVOY_LOG(debug,
              "reverse_connection: tenant isolation enabled, using tenant-scoped identifier: {}",
              final_host_id);
  }

  ENVOY_LOG(debug, "reverse_connection: using host identifier: {}", final_host_id);
  return parent_->checkAndCreateHost(final_host_id);
}
99

            
100
163
// Resolves `host_id` to a node ID through the upstream socket manager and returns the host
// registered for that node in host_map_, creating and registering a new host when none exists.
//
// @param host_id identifier (optionally tenant-scoped) computed by the load balancer.
// @return the existing or newly created host, or a response holding nullptr if the host could not
//         be created.
Upstream::HostSelectionResponse RevConCluster::checkAndCreateHost(absl::string_view host_id) {
  // The bootstrap extension is validated during cluster creation, and TLS is initialized before
  // request handling, so socket_manager should always be available.
  auto* socket_manager = getUpstreamSocketManager();
  ASSERT(socket_manager != nullptr, "Socket manager should be initialized before request handling");

  // Use SocketManager to resolve the key to a node ID.
  const std::string node_id = socket_manager->getNodeWithSocket(std::string(host_id));
  ENVOY_LOG(debug, "reverse_connection: resolved key '{}' to node: '{}'", host_id, node_id);

  {
    // Fast path: look the node up under the reader lock so that an already known endpoint keeps
    // reusing the same host (and therefore the same conn_pool_container).
    absl::ReaderMutexLock rlock(host_map_lock_);
    const auto existing = host_map_.find(node_id);
    if (existing != host_map_.end()) {
      ENVOY_LOG(debug, "reverse_connection: reusing existing host for {}.", node_id);
      return {existing->second};
    }
  }

  absl::WriterMutexLock wlock(host_map_lock_);

  // Re-check under writer lock to avoid duplicate creation under contention.
  const auto raced = host_map_.find(node_id);
  if (raced != host_map_.end()) {
    ENVOY_LOG(debug, "reverse_connection: host already created for {} during contention.", node_id);
    return {raced->second};
  }

  // Create a custom address that uses the UpstreamReverseSocketInterface.
  Network::Address::InstanceConstSharedPtr host_address(
      std::make_shared<UpstreamReverseConnectionAddress>(node_id));

  // Create a standard HostImpl using the custom address.
  auto host_result = Upstream::HostImpl::create(
      info(), absl::StrCat(info()->name(), node_id), std::move(host_address),
      nullptr /* endpoint_metadata */, nullptr /* locality_metadata */, 1 /* initial_weight */,
      std::make_shared<const envoy::config::core::v3::Locality>(),
      envoy::config::endpoint::v3::Endpoint::HealthCheckConfig::default_instance(),
      0 /* priority */, envoy::config::core::v3::UNKNOWN);
  // Host creation reports failure through its status; do not touch the value in that case.
  if (!host_result.ok()) {
    ENVOY_LOG(error, "reverse_connection: failed to create host for {}: {}", node_id,
              host_result.status().message());
    return {nullptr};
  }

  // Convert unique_ptr to shared_ptr.
  Upstream::HostSharedPtr host(std::move(host_result.value()));
  ENVOY_LOG(trace, "reverse_connection: created HostImpl {} for {}.", *host, node_id);

  host_map_[node_id] = host;
  return {host};
}
151

            
152
2
void RevConCluster::cleanup() {
153
2
  absl::WriterMutexLock wlock(host_map_lock_);
154

            
155
6
  for (auto iter = host_map_.begin(); iter != host_map_.end();) {
156
    // Check if the host handle is acquired by any connection pool container or not. If not
157
    // clean those host to prevent memory leakage.
158
4
    const auto& host = iter->second;
159
4
    if (!host->used()) {
160
3
      ENVOY_LOG(debug, "Removing stale host: {}", *host);
161
3
      host_map_.erase(iter++);
162
3
    } else {
163
1
      ++iter;
164
1
    }
165
4
  }
166

            
167
  // Reschedule the cleanup after cleanup_interval_ duration.
168
2
  cleanup_timer_->enableTimer(cleanup_interval_);
169
2
}
170

            
171
329
// Looks up the upstream reverse tunnel socket interface by name and returns the socket manager
// held by its local registry. Each step is guarded by an ASSERT rather than a runtime error path.
BootstrapReverseConnection::UpstreamSocketManager* RevConCluster::getUpstreamSocketManager() const {
  // Step 1: find the registered socket interface.
  auto* socket_interface =
      Network::socketInterface("envoy.bootstrap.reverse_tunnel.upstream_socket_interface");
  ASSERT(socket_interface != nullptr,
         "Upstream reverse socket interface should be validated during cluster creation");

  // Step 2: narrow it to the reverse tunnel acceptor type.
  auto* acceptor =
      dynamic_cast<const BootstrapReverseConnection::ReverseTunnelAcceptor*>(socket_interface);
  ASSERT(acceptor != nullptr, "Socket interface type should be validated during cluster creation");

  // Step 3: fetch the local registry. TLS is set up in onServerInitialized(), which runs after
  // cluster creation and before request handling, so the registry is expected to exist here.
  auto* local_registry = acceptor->getLocalRegistry();
  ASSERT(local_registry != nullptr,
         "TLS should be initialized by onServerInitialized() before request handling");

  return local_registry->socketManager();
}
190

            
191
// Builds the cluster: initializes the base cluster, creates the periodic cleanup timer on the main
// thread dispatcher, and compiles the host-id (and optional tenant-id) format strings.
//
// @param config the cluster proto passed to ClusterImplBase.
// @param context the cluster factory context.
// @param creation_status out-parameter; set to the formatter error if a format string cannot be
//        compiled, in which case construction stops before the cleanup timer is armed.
// @param rev_con_config reverse connection specific settings (cleanup_interval, host_id_format,
//        tenant_id_format). cleanup_interval defaults to 60000 ms.
RevConCluster::RevConCluster(
    const envoy::config::cluster::v3::Cluster& config, Upstream::ClusterFactoryContext& context,
    absl::Status& creation_status,
    const envoy::extensions::clusters::reverse_connection::v3::ReverseConnectionClusterConfig&
        rev_con_config)
    : ClusterImplBase(config, context, creation_status),
      dispatcher_(context.serverFactoryContext().mainThreadDispatcher()),
      cleanup_interval_(std::chrono::milliseconds(
          PROTOBUF_GET_MS_OR_DEFAULT(rev_con_config, cleanup_interval, 60000))),
      cleanup_timer_(dispatcher_.createTimer([this]() -> void { cleanup(); })) {
  // Create the host-id formatter from the format string.
  auto formatter_or_error = Envoy::Formatter::FormatterImpl::create(
      rev_con_config.host_id_format(), /*omit_empty_values=*/false,
      Envoy::Formatter::BuiltInCommandParserFactoryHelper::commandParsers());
  // Never dereference a failed result; surface the error to the caller instead.
  if (!formatter_or_error.ok()) {
    creation_status = formatter_or_error.status();
    return;
  }
  host_id_formatter_ = std::move(*formatter_or_error);

  // Create the tenant-id formatter if configured.
  if (!rev_con_config.tenant_id_format().empty()) {
    auto tenant_formatter_or_error = Envoy::Formatter::FormatterImpl::create(
        rev_con_config.tenant_id_format(), /*omit_empty_values=*/false,
        Envoy::Formatter::BuiltInCommandParserFactoryHelper::commandParsers());
    if (!tenant_formatter_or_error.ok()) {
      creation_status = tenant_formatter_or_error.status();
      return;
    }
    tenant_id_formatter_ = std::move(*tenant_formatter_or_error);
  }

  // Schedule periodic cleanup.
  cleanup_timer_->enableTimer(cleanup_interval_);
}
218

            
219
// Validates the cluster and reverse connection configuration and, if everything checks out,
// builds the RevConCluster together with its thread aware load balancer.
//
// Checks, in order: LB policy is CLUSTER_PROVIDED, no load assignment is present, the upstream
// reverse tunnel bootstrap extension is registered and is a ReverseTunnelAcceptor, tenant_id_format
// is set when the extension enables tenant isolation, and both format strings compile.
//
// @return the cluster / load balancer pair, or the first validation or creation error.
absl::StatusOr<std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>>
RevConClusterFactory::createClusterWithConfig(
    const envoy::config::cluster::v3::Cluster& cluster,
    const envoy::extensions::clusters::reverse_connection::v3::ReverseConnectionClusterConfig&
        proto_config,
    Upstream::ClusterFactoryContext& context) {
  // Only the cluster-provided load balancer is supported.
  if (cluster.lb_policy() != envoy::config::cluster::v3::Cluster::CLUSTER_PROVIDED) {
    return absl::InvalidArgumentError(
        fmt::format("cluster: LB policy {} is not valid for Cluster type {}. Only "
                    "'CLUSTER_PROVIDED' is allowed with cluster type 'REVERSE_CONNECTION'",
                    envoy::config::cluster::v3::Cluster::LbPolicy_Name(cluster.lb_policy()),
                    cluster.cluster_type().name()));
  }

  // A load assignment is rejected for this cluster type.
  if (cluster.has_load_assignment()) {
    return absl::InvalidArgumentError(
        "Reverse Conn clusters must have no load assignment configured");
  }

  // The upstream reverse tunnel bootstrap extension has to be registered.
  const std::string required_extension = "envoy.bootstrap.reverse_tunnel.upstream_socket_interface";
  auto* bootstrap_factory =
      Config::Utility::getAndCheckFactoryByName<Server::Configuration::BootstrapExtensionFactory>(
          required_extension, /*is_optional=*/true);
  if (bootstrap_factory == nullptr) {
    return absl::InvalidArgumentError(fmt::format(
        "Reverse connection cluster requires the upstream reverse tunnel bootstrap extension '{}' "
        "to be configured. Please add it to bootstrap_extensions in your bootstrap configuration.",
        required_extension));
  }

  // ... and it has to be the ReverseTunnelAcceptor implementation.
  auto* acceptor =
      dynamic_cast<const BootstrapReverseConnection::ReverseTunnelAcceptor*>(bootstrap_factory);
  if (acceptor == nullptr) {
    return absl::InvalidArgumentError(
        fmt::format("Bootstrap extension '{}' exists but is not of the expected type "
                    "(ReverseTunnelAcceptor). This indicates a configuration error.",
                    required_extension));
  }

  // Tenant isolation in the bootstrap config makes tenant_id_format mandatory.
  auto* extension = acceptor->getExtension();
  const bool tenant_isolation_enabled = extension != nullptr && extension->enableTenantIsolation();
  if (tenant_isolation_enabled && proto_config.tenant_id_format().empty()) {
    return absl::InvalidArgumentError(
        fmt::format("tenant_id_format must be configured for reverse connection cluster '{}' when "
                    "tenant isolation is enabled in the bootstrap configuration. Please configure "
                    "tenant_id_format in the reverse connection cluster configuration.",
                    cluster.name()));
  }

  // Compile host_id_format up front so that formatter errors are reported from here.
  auto host_format_check = Envoy::Formatter::FormatterImpl::create(
      proto_config.host_id_format(), /*omit_empty_values=*/false,
      Envoy::Formatter::BuiltInCommandParserFactoryHelper::commandParsers());
  RETURN_IF_NOT_OK_REF(host_format_check.status());

  // Same for tenant_id_format, when one is given.
  if (!proto_config.tenant_id_format().empty()) {
    auto tenant_format_check = Envoy::Formatter::FormatterImpl::create(
        proto_config.tenant_id_format(), /*omit_empty_values=*/false,
        Envoy::Formatter::BuiltInCommandParserFactoryHelper::commandParsers());
    RETURN_IF_NOT_OK_REF(tenant_format_check.status());
  }

  // Build the cluster, then wrap it in its thread aware load balancer.
  absl::Status creation_status = absl::OkStatus();
  std::shared_ptr<RevConCluster> new_cluster(
      new RevConCluster(cluster, context, creation_status, proto_config));
  RETURN_IF_NOT_OK(creation_status);

  auto thread_aware_lb = std::make_unique<RevConCluster::ThreadAwareLoadBalancer>(new_cluster);
  return std::make_pair(new_cluster, std::move(thread_aware_lb));
}
293

            
294
/**
 * Static registration for the rev-con cluster factory. @see RegisterFactory.
 */
297
// Arguments: the factory class to register, then the factory type it is registered as.
REGISTER_FACTORY(RevConClusterFactory, Upstream::ClusterFactory);
298

            
299
} // namespace ReverseConnection
300
} // namespace Extensions
301
} // namespace Envoy