1
#pragma once
2

            
3
#include <netinet/in.h>
4
#include <sys/socket.h>
5

            
6
#include <cstdint>
7
#include <cstring>
8
#include <functional>
9
#include <string>
10
#include <vector>
11

            
12
#include "envoy/config/cluster/v3/cluster.pb.h"
13
#include "envoy/extensions/clusters/reverse_connection/v3/reverse_connection.pb.h"
14
#include "envoy/extensions/clusters/reverse_connection/v3/reverse_connection.pb.validate.h"
15

            
16
#include "source/common/common/logger.h"
17
#include "source/common/formatter/substitution_formatter.h"
18
#include "source/common/network/address_impl.h"
19
#include "source/common/network/socket_interface.h"
20
#include "source/common/upstream/cluster_factory_impl.h"
21
#include "source/common/upstream/upstream_impl.h"
22
#include "source/extensions/bootstrap/reverse_tunnel/common/reverse_connection_utility.h"
23
#include "source/extensions/bootstrap/reverse_tunnel/upstream_socket_interface/reverse_tunnel_acceptor.h"
24
#include "source/extensions/bootstrap/reverse_tunnel/upstream_socket_interface/reverse_tunnel_acceptor_extension.h"
25
#include "source/extensions/bootstrap/reverse_tunnel/upstream_socket_interface/upstream_socket_manager.h"
26

            
27
#include "absl/status/statusor.h"
28

            
29
namespace Envoy {
30
namespace Extensions {
31
namespace ReverseConnection {
32

            
33
// Short alias for the bootstrap reverse-connection namespace; used below when naming
// BootstrapReverseConnection::UpstreamSocketManager.
namespace BootstrapReverseConnection = Envoy::Extensions::Bootstrap::ReverseConnection;
34

            
35
/**
36
 * Custom address type that uses the UpstreamReverseSocketInterface.
37
 * This address will be used by RevConHost to ensure socket creation goes through
38
 * the upstream socket interface.
39
 */
40
class UpstreamReverseConnectionAddress
41
    : public Network::Address::Instance,
42
      public Envoy::Logger::Loggable<Envoy::Logger::Id::connection> {
43
public:
44
  UpstreamReverseConnectionAddress(const std::string& node_id)
45
144
      : node_id_(node_id), address_string_("127.0.0.1:0") {
46

            
47
    // Create a simple socket address for filter chain matching.
48
    // Use 127.0.0.1:0 which will match the catch-all filter chain
49
144
    synthetic_sockaddr_.sin_family = AF_INET;
50
144
    synthetic_sockaddr_.sin_port = htons(0);                 // Port 0 for reverse connections
51
144
    synthetic_sockaddr_.sin_addr.s_addr = htonl(0x7f000001); // 127.0.0.1
52
144
    memset(&synthetic_sockaddr_.sin_zero, 0, sizeof(synthetic_sockaddr_.sin_zero));
53

            
54
144
    ENVOY_LOG(
55
144
        debug,
56
144
        "UpstreamReverseConnectionAddress: node: {} using 127.0.0.1:0 for filter chain matching",
57
144
        node_id_);
58
144
  }
59

            
60
  // Network::Address::Instance.
61
6
  bool operator==(const Instance& rhs) const override {
62
6
    const auto* other = dynamic_cast<const UpstreamReverseConnectionAddress*>(&rhs);
63
6
    return other && node_id_ == other->node_id_;
64
6
  }
65

            
66
20
  Network::Address::Type type() const override { return Network::Address::Type::Ip; }
67
155
  const std::string& asString() const override { return address_string_; }
68
1
  absl::string_view asStringView() const override { return address_string_; }
69
137
  const std::string& logicalName() const override { return node_id_; }
70
16
  const Network::Address::Ip* ip() const override { return &ip_; }
71
1
  const Network::Address::Pipe* pipe() const override { return nullptr; }
72
1
  const Network::Address::EnvoyInternalAddress* envoyInternalAddress() const override {
73
1
    return nullptr;
74
1
  }
75
2
  const sockaddr* sockAddr() const override {
76
2
    return reinterpret_cast<const sockaddr*>(&synthetic_sockaddr_);
77
2
  }
78
2
  socklen_t sockAddrLen() const override { return sizeof(synthetic_sockaddr_); }
79
  // Set to default so that the default client connection factory is used to initiate connections
80
  // to. the address.
81
8
  absl::string_view addressType() const override { return "default"; }
82
132
  absl::optional<std::string> networkNamespace() const override { return absl::nullopt; }
83
  Network::Address::InstanceConstSharedPtr withNetworkNamespace(absl::string_view) const override {
84
    return nullptr;
85
  }
86

            
87
  // Override socketInterface to use the ReverseTunnelAcceptor.
88
9
  const Network::SocketInterface& socketInterface() const override {
89
9
    ENVOY_LOG(debug, "UpstreamReverseConnectionAddress: socketInterface() called for node: {}",
90
9
              node_id_);
91
9
    auto* upstream_interface =
92
9
        Network::socketInterface("envoy.bootstrap.reverse_tunnel.upstream_socket_interface");
93
9
    if (upstream_interface) {
94
8
      ENVOY_LOG(debug, "UpstreamReverseConnectionAddress: Using ReverseTunnelAcceptor for node: {}",
95
8
                node_id_);
96
8
      return *upstream_interface;
97
8
    }
98
    // Fallback to default socket interface if upstream interface is not available.
99
1
    return *Network::socketInterface(
100
1
        "envoy.extensions.network.socket_interface.default_socket_interface");
101
9
  }
102

            
103
private:
104
  // Simple IPv4 implementation for upstream reverse connection addresses.
105
  struct UpstreamReverseConnectionIp : public Network::Address::Ip {
106
1
    const std::string& addressAsString() const override { return address_string_; }
107
1
    bool isAnyAddress() const override { return true; }
108
1
    bool isUnicastAddress() const override { return false; }
109
1
    const Network::Address::Ipv4* ipv4() const override { return nullptr; }
110
1
    const Network::Address::Ipv6* ipv6() const override { return nullptr; }
111
1
    uint32_t port() const override { return 0; }
112
2
    Network::Address::IpVersion version() const override { return Network::Address::IpVersion::v4; }
113

            
114
    // Additional pure virtual methods that need implementation.
115
1
    bool isLinkLocalAddress() const override { return false; }
116
1
    bool isUniqueLocalAddress() const override { return false; }
117
1
    bool isSiteLocalAddress() const override { return false; }
118
1
    bool isTeredoAddress() const override { return false; }
119

            
120
    std::string address_string_{"0.0.0.0:0"};
121
  };
122

            
123
  std::string node_id_;
124
  std::string address_string_;
125
  UpstreamReverseConnectionIp ip_;
126
  struct sockaddr_in synthetic_sockaddr_; // Socket address for filter chain matching
127
};
128

            
129
/**
130
 * The RevConCluster is a dynamic cluster that automatically adds hosts using
131
 * request context of the downstream connection. Later, these hosts are used
132
 * to retrieve reverse connection sockets to stream data to upstream endpoints.
133
 * Also, the RevConCluster cleans these hosts if no connection pool is using them.
134
 */
135
class RevConCluster : public Upstream::ClusterImplBase {
136
  friend class ReverseConnectionClusterTest;
137

            
138
public:
139
  RevConCluster(
140
      const envoy::config::cluster::v3::Cluster& config, Upstream::ClusterFactoryContext& context,
141
      absl::Status& creation_status,
142
      const envoy::extensions::clusters::reverse_connection::v3::ReverseConnectionClusterConfig&
143
          rev_con_config);
144

            
145
37
  ~RevConCluster() override { cleanup_timer_->disableTimer(); }
146

            
147
  // Upstream::Cluster.
148
49
  InitializePhase initializePhase() const override { return InitializePhase::Primary; }
149

            
150
  class LoadBalancer : public Upstream::LoadBalancer {
151
  public:
152
48
    LoadBalancer(const std::shared_ptr<RevConCluster>& parent) : parent_(parent) {}
153

            
154
    // Chooses a host to send a downstream request over a reverse connection endpoint.
155
    // The request MUST provide a host identifier via dynamic metadata populated by a matcher
156
    // action. No header or authority/SNI fallbacks are used.
157
    Upstream::HostSelectionResponse chooseHost(Upstream::LoadBalancerContext* context) override;
158

            
159
    // Virtual functions that are not supported by our custom load-balancer.
160
1
    Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override {
161
1
      return nullptr;
162
1
    }
163
    absl::optional<Upstream::SelectedPoolAndConnection>
164
    selectExistingConnection(Upstream::LoadBalancerContext* /*context*/,
165
                             const Upstream::Host& /*host*/,
166
1
                             std::vector<uint8_t>& /*hash_key*/) override {
167
1
      return absl::nullopt;
168
1
    }
169

            
170
    // Lifetime tracking not implemented.
171
1
    OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override {
172
1
      return {};
173
1
    }
174

            
175
  private:
176
    const std::shared_ptr<RevConCluster> parent_;
177
  };
178

            
179
private:
180
  struct LoadBalancerFactory : public Upstream::LoadBalancerFactory {
181
8
    LoadBalancerFactory(const std::shared_ptr<RevConCluster>& cluster) : cluster_(cluster) {}
182

            
183
    // Upstream::LoadBalancerFactory.
184
19
    Upstream::LoadBalancerPtr create() { return std::make_unique<LoadBalancer>(cluster_); }
185
18
    Upstream::LoadBalancerPtr create(Upstream::LoadBalancerParams) override { return create(); }
186

            
187
    const std::shared_ptr<RevConCluster> cluster_;
188
  };
189

            
190
  struct ThreadAwareLoadBalancer : public Upstream::ThreadAwareLoadBalancer {
191
38
    ThreadAwareLoadBalancer(const std::shared_ptr<RevConCluster>& cluster) : cluster_(cluster) {}
192

            
193
    // Upstream::ThreadAwareLoadBalancer.
194
7
    Upstream::LoadBalancerFactorySharedPtr factory() override {
195
7
      return std::make_shared<LoadBalancerFactory>(cluster_);
196
7
    }
197
7
    absl::Status initialize() override { return absl::OkStatus(); }
198

            
199
    const std::shared_ptr<RevConCluster> cluster_;
200
  };
201

            
202
  // Periodically cleans the stale hosts from host_map_.
203
  void cleanup();
204

            
205
  // Checks if a host exists for a given host identifier and if not creates and caches it.
206
  Upstream::HostSelectionResponse checkAndCreateHost(absl::string_view host_id);
207

            
208
  // Get the upstream socket manager from the thread-local registry.
209
  BootstrapReverseConnection::UpstreamSocketManager* getUpstreamSocketManager() const;
210

            
211
  // No pre-initialize work needs to be completed by REVERSE CONNECTION cluster.
212
37
  void startPreInit() override { onPreInitComplete(); }
213

            
214
  Event::Dispatcher& dispatcher_;
215
  std::chrono::milliseconds cleanup_interval_;
216
  Event::TimerPtr cleanup_timer_;
217
  absl::Mutex host_map_lock_;
218
  absl::flat_hash_map<std::string, Upstream::HostSharedPtr> host_map_;
219
  // Formatter for computing host identifier from request context.
220
  Envoy::Formatter::FormatterPtr host_id_formatter_;
221
  // Optional formatter for computing tenant identifier from request context.
222
  // Used when tenant isolation is enabled to create tenant-scoped identifiers.
223
  Envoy::Formatter::FormatterPtr tenant_id_formatter_;
224
  friend class RevConClusterFactory;
225
};
226

            
227
// Shared-ownership pointer alias for RevConCluster.
using RevConClusterSharedPtr = std::shared_ptr<RevConCluster>;
228

            
229
class RevConClusterFactory
230
    : public Upstream::ConfigurableClusterFactoryBase<
231
          envoy::extensions::clusters::reverse_connection::v3::ReverseConnectionClusterConfig> {
232
public:
233
44
  RevConClusterFactory() : ConfigurableClusterFactoryBase("envoy.clusters.reverse_connection") {}
234

            
235
private:
236
  friend class ReverseConnectionClusterTest;
237
  absl::StatusOr<
238
      std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>>
239
  createClusterWithConfig(
240
      const envoy::config::cluster::v3::Cluster& cluster,
241
      const envoy::extensions::clusters::reverse_connection::v3::ReverseConnectionClusterConfig&
242
          proto_config,
243
      Upstream::ClusterFactoryContext& context) override;
244
};
245

            
246
// Declares RevConClusterFactory for the factory registry. NOTE(review): the matching
// registration is presumably in the implementation file - verify.
DECLARE_FACTORY(RevConClusterFactory);
247

            
248
} // namespace ReverseConnection
249
} // namespace Extensions
250
} // namespace Envoy