1
#pragma once
2

            
3
#include <cstdint>
4
#include <functional>
5
#include <string>
6

            
7
#include "envoy/config/cluster/v3/cluster.pb.h"
8
#include "envoy/secret/secret_manager.h"
9
#include "envoy/server/transport_socket_config.h"
10
#include "envoy/stats/scope.h"
11
#include "envoy/thread_local/thread_local.h"
12

            
13
#include "source/common/common/empty_string.h"
14
#include "source/common/common/logger.h"
15
#include "source/common/config/metadata.h"
16
#include "source/common/upstream/cluster_factory_impl.h"
17
#include "source/common/upstream/upstream_impl.h"
18

            
19
namespace Envoy {
20
namespace Upstream {
21

            
22
class OriginalDstClusterFactory;
23
class OriginalDstClusterTest;
24

            
25
struct HostsForAddress {
26
48
  HostsForAddress(HostSharedPtr& host) : host_(host), used_(true) {}
27

            
28
  // Primary host for the address. This is set by the first worker that posts
29
  // to the main to add a host. The field is read by all workers.
30
  const HostSharedPtr host_;
31
  // Hosts that are added concurrently with host_ are stored in this list.
32
  // This is populated by the subsequent workers that have not received the
33
  // updated table with set host_. The field is only accessed from the main
34
  // thread.
35
  std::vector<HostSharedPtr> hosts_;
36
  // Marks as recently used by load balancers.
37
  std::atomic<bool> used_;
38
};
39

            
40
using HostsForAddressSharedPtr = std::shared_ptr<HostsForAddress>;
41
using HostMultiMap = absl::flat_hash_map<std::string, HostsForAddressSharedPtr>;
42
using HostMultiMapSharedPtr = std::shared_ptr<HostMultiMap>;
43
using HostMultiMapConstSharedPtr = std::shared_ptr<const HostMultiMap>;
44

            
45
class OriginalDstCluster;
46

            
47
// Handle object whose sole purpose is to ensure that the destructor of the inner OriginalDstCluster
48
// is called on the main thread.
49
class OriginalDstClusterHandle {
50
public:
51
  OriginalDstClusterHandle(std::shared_ptr<OriginalDstCluster> cluster)
52
54
      : cluster_(std::move(cluster)) {}
53
  ~OriginalDstClusterHandle();
54

            
55
private:
56
  std::shared_ptr<OriginalDstCluster> cluster_;
57
  friend class OriginalDstCluster;
58
};
59

            
60
using OriginalDstClusterHandleSharedPtr = std::shared_ptr<OriginalDstClusterHandle>;
61

            
62
/**
63
 * The OriginalDstCluster is a dynamic cluster that automatically adds hosts as needed based on the
64
 * original destination address of the downstream connection. These hosts are also automatically
65
 * cleaned up after they have not seen traffic for a configurable cleanup interval time
66
 * ("cleanup_interval_ms").
67
 */
68
class OriginalDstCluster : public ClusterImplBase {
69
public:
70
35
  ~OriginalDstCluster() override {
71
35
    ASSERT_IS_MAIN_OR_TEST_THREAD();
72
35
    cleanup_timer_->disableTimer();
73
35
  }
74

            
75
  // Upstream::Cluster
76
32
  InitializePhase initializePhase() const override { return InitializePhase::Primary; }
77

            
78
  /**
79
   * Special Load Balancer for Original Dst Cluster.
80
   *
81
   * Load balancer gets called with the downstream context which can be used to make sure the
82
   * Original Dst cluster has a Host for the original destination. Normally load balancers can't
83
   * modify clusters, but in this case we access a singleton OriginalDstCluster that we can ask to
84
   * add hosts on demand. Additions are synced with all other threads so that the host set in the
85
   * cluster remains (eventually) consistent. If multiple threads add a host to the same upstream
86
   * address then two distinct HostSharedPtr's (with the same upstream IP address) will be added,
87
   * and both of them will eventually time out.
88
   */
89
  class LoadBalancer : public Upstream::LoadBalancer {
90
  public:
91
    LoadBalancer(const OriginalDstClusterHandleSharedPtr& parent)
92
108
        : parent_(parent), http_header_name_(parent->cluster_->httpHeaderName()),
93
108
          metadata_key_(parent->cluster_->metadataKey()),
94
108
          port_override_(parent->cluster_->portOverride()),
95
108
          host_map_(parent->cluster_->getCurrentHostMap()) {}
96

            
97
    // Upstream::LoadBalancer
98
    HostSelectionResponse chooseHost(LoadBalancerContext* context) override;
99
    // Preconnecting is not implemented for OriginalDstCluster
100
1
    HostConstSharedPtr peekAnotherHost(LoadBalancerContext*) override { return nullptr; }
101
    // Pool selection not implemented for OriginalDstCluster
102
    absl::optional<Upstream::SelectedPoolAndConnection>
103
    selectExistingConnection(Upstream::LoadBalancerContext* /*context*/,
104
                             const Upstream::Host& /*host*/,
105
1
                             std::vector<uint8_t>& /*hash_key*/) override {
106
1
      return absl::nullopt;
107
1
    }
108
    // Lifetime tracking not implemented for OriginalDstCluster
109
1
    OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override {
110
1
      return {};
111
1
    }
112

            
113
    Network::Address::InstanceConstSharedPtr filterStateOverrideHost(LoadBalancerContext* context);
114
    Network::Address::InstanceConstSharedPtr requestOverrideHost(LoadBalancerContext* context);
115
    Network::Address::InstanceConstSharedPtr metadataOverrideHost(LoadBalancerContext* context);
116

            
117
  private:
118
    const OriginalDstClusterHandleSharedPtr parent_;
119
    // The optional original host provider that extracts the address from HTTP header map.
120
    const absl::optional<Http::LowerCaseString>& http_header_name_;
121
    const absl::optional<Config::MetadataKey>& metadata_key_;
122
    const absl::optional<uint32_t> port_override_;
123
    HostMultiMapConstSharedPtr host_map_;
124
  };
125

            
126
108
  const absl::optional<Http::LowerCaseString>& httpHeaderName() { return http_header_name_; }
127
108
  const absl::optional<Config::MetadataKey>& metadataKey() { return metadata_key_; }
128
108
  const absl::optional<uint32_t> portOverride() { return port_override_; }
129

            
130
protected:
131
  OriginalDstCluster(const envoy::config::cluster::v3::Cluster& config,
132
                     ClusterFactoryContext& context, absl::Status& creation_status);
133

            
134
private:
135
  friend class OriginalDstClusterFactory;
136
  friend class OriginalDstClusterTest;
137

            
138
  struct LoadBalancerFactory : public Upstream::LoadBalancerFactory {
139
16
    LoadBalancerFactory(const OriginalDstClusterHandleSharedPtr& cluster) : cluster_(cluster) {}
140

            
141
    // Upstream::LoadBalancerFactory
142
78
    Upstream::LoadBalancerPtr create(Upstream::LoadBalancerParams) override {
143
78
      return std::make_unique<LoadBalancer>(cluster_);
144
78
    }
145

            
146
    const OriginalDstClusterHandleSharedPtr cluster_;
147
  };
148

            
149
  struct ThreadAwareLoadBalancer : public Upstream::ThreadAwareLoadBalancer {
150
35
    ThreadAwareLoadBalancer(const OriginalDstClusterHandleSharedPtr& cluster) : cluster_(cluster) {}
151

            
152
    // Upstream::ThreadAwareLoadBalancer
153
16
    Upstream::LoadBalancerFactorySharedPtr factory() override {
154
16
      return std::make_shared<LoadBalancerFactory>(cluster_);
155
16
    }
156
16
    absl::Status initialize() override { return absl::OkStatus(); }
157

            
158
    const OriginalDstClusterHandleSharedPtr cluster_;
159
  };
160

            
161
174
  HostMultiMapConstSharedPtr getCurrentHostMap() {
162
174
    absl::ReaderMutexLock lock(host_map_lock_);
163
174
    return host_map_;
164
174
  }
165

            
166
55
  void setHostMap(const HostMultiMapConstSharedPtr& new_host_map) {
167
55
    absl::WriterMutexLock lock(host_map_lock_);
168
55
    host_map_ = new_host_map;
169
55
  }
170

            
171
  void addHost(HostSharedPtr&);
172
  void cleanup();
173

            
174
  // ClusterImplBase
175
35
  void startPreInit() override { onPreInitComplete(); }
176

            
177
  Event::Dispatcher& dispatcher_;
178
  const std::chrono::milliseconds cleanup_interval_ms_;
179
  Event::TimerPtr cleanup_timer_;
180

            
181
  absl::Mutex host_map_lock_;
182
  HostMultiMapConstSharedPtr host_map_ ABSL_GUARDED_BY(host_map_lock_);
183
  absl::optional<Http::LowerCaseString> http_header_name_;
184
  absl::optional<Config::MetadataKey> metadata_key_;
185
  absl::optional<uint32_t> port_override_;
186
  friend class OriginalDstClusterFactory;
187
  friend class OriginalDstClusterHandle;
188
};
189

            
190
constexpr absl::string_view OriginalDstClusterFilterStateKey =
191
    "envoy.network.transport_socket.original_dst_address";
192

            
193
class OriginalDstClusterFactory : public ClusterFactoryImplBase {
194
public:
195
34
  OriginalDstClusterFactory() : ClusterFactoryImplBase("envoy.cluster.original_dst") {}
196

            
197
  absl::StatusOr<std::pair<ClusterImplBaseSharedPtr, ThreadAwareLoadBalancerPtr>>
198
  createClusterImpl(const envoy::config::cluster::v3::Cluster& cluster,
199
                    ClusterFactoryContext& context) override;
200

            
201
private:
202
  friend class OriginalDstClusterTest;
203
};
204

            
205
DECLARE_FACTORY(OriginalDstClusterFactory);
206

            
207
} // namespace Upstream
208
} // namespace Envoy