1
#pragma once
2

            
3
#include "envoy/config/cluster/v3/cluster.pb.h"
4
#include "envoy/config/endpoint/v3/endpoint_components.pb.h"
5
#include "envoy/extensions/clusters/dynamic_forward_proxy/v3/cluster.pb.h"
6
#include "envoy/extensions/clusters/dynamic_forward_proxy/v3/cluster.pb.validate.h"
7
#include "envoy/http/conn_pool.h"
8

            
9
#include "source/common/upstream/cluster_factory_impl.h"
10
#include "source/extensions/clusters/common/logical_host.h"
11
#include "source/extensions/common/dynamic_forward_proxy/cluster_store.h"
12
#include "source/extensions/common/dynamic_forward_proxy/dns_cache.h"
13

            
14
namespace Envoy {
15
namespace Extensions {
16
namespace Clusters {
17
namespace DynamicForwardProxy {
18

            
19
// Forward declarations: both types are named as friends of Cluster before they are defined.
class ClusterFactory;
class ClusterTest;
21

            
22
class Cluster : public Upstream::BaseDynamicClusterImpl,
23
                public Extensions::Common::DynamicForwardProxy::DfpCluster,
24
                public Extensions::Common::DynamicForwardProxy::DnsCache::UpdateCallbacks {
25
public:
26
  ~Cluster() override;
27

            
28
  // Upstream::Cluster
29
208
  Upstream::Cluster::InitializePhase initializePhase() const override {
30
208
    return Upstream::Cluster::InitializePhase::Primary;
31
208
  }
32

            
33
  // Upstream::ClusterImplBase
34
  void startPreInit() override;
35

            
36
  // Extensions::Common::DynamicForwardProxy::DnsCache::UpdateCallbacks
37
  absl::Status onDnsHostAddOrUpdate(
38
      const std::string& host,
39
      const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr& host_info) override;
40
  void onDnsHostRemove(const std::string& host) override;
41
  void onDnsResolutionComplete(const std::string&,
42
                               const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr&,
43
79
                               Network::DnsResolver::ResolutionStatus) override {}
44

            
45
11
  bool allowCoalescedConnections() const { return allow_coalesced_connections_; }
46
174
  bool enableSubCluster() const override { return enable_sub_cluster_; }
47
  Upstream::HostSelectionResponse chooseHost(absl::string_view host,
48
                                             Upstream::LoadBalancerContext* context) const;
49

            
50
  // Extensions::Common::DynamicForwardProxy::DfpCluster
51
  std::pair<bool, absl::optional<envoy::config::cluster::v3::Cluster>>
52
  createSubClusterConfig(const std::string& cluster_name, const std::string& host,
53
                         const int port) override;
54
  bool touch(const std::string& cluster_name) override;
55
  void checkIdleSubCluster();
56
  Upstream::HostConstSharedPtr findHostByName(const std::string& host) const;
57

            
58
protected:
59
  Cluster(const envoy::config::cluster::v3::Cluster& cluster,
60
          Extensions::Common::DynamicForwardProxy::DnsCacheSharedPtr&& cacahe,
61
          const envoy::extensions::clusters::dynamic_forward_proxy::v3::ClusterConfig& config,
62
          Upstream::ClusterFactoryContext& context,
63
          Extensions::Common::DynamicForwardProxy::DnsCacheManagerSharedPtr&& cache_manager,
64
          absl::Status& creation_status);
65

            
66
private:
67
  friend class ClusterFactory;
68
  friend class ClusterTest;
69

            
70
  struct ClusterInfo {
71
    ClusterInfo(std::string cluster_name, Cluster& parent);
72
    void touch();
73
    bool checkIdle();
74

            
75
    std::string cluster_name_;
76
    Cluster& parent_;
77
    std::atomic<std::chrono::steady_clock::duration> last_used_time_;
78
  };
79

            
80
  using ClusterInfoMap = absl::flat_hash_map<std::string, std::shared_ptr<ClusterInfo>>;
81

            
82
  struct HostInfo {
83
    HostInfo(const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr& shared_host_info,
84
             const Upstream::LogicalHostSharedPtr& logical_host)
85
76
        : shared_host_info_(shared_host_info), logical_host_(logical_host) {}
86

            
87
    const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr shared_host_info_;
88
    const Upstream::LogicalHostSharedPtr logical_host_;
89
  };
90

            
91
  using HostInfoMap = absl::flat_hash_map<std::string, HostInfo>;
92

            
93
  class LoadBalancer : public Upstream::LoadBalancer,
94
                       public Extensions::Common::DynamicForwardProxy::DfpLb,
95
                       public Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks {
96
  public:
97
392
    LoadBalancer(const Cluster& cluster) : cluster_(cluster) {}
98

            
99
    // DfpLb
100
    Upstream::HostConstSharedPtr findHostByName(const std::string& host) const override;
101
    // Upstream::LoadBalancer
102
    Upstream::HostSelectionResponse chooseHost(Upstream::LoadBalancerContext* context) override;
103
    // Preconnecting not implemented.
104
1
    Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override {
105
1
      return nullptr;
106
1
    }
107
    absl::optional<Upstream::SelectedPoolAndConnection>
108
    selectExistingConnection(Upstream::LoadBalancerContext* context, const Upstream::Host& host,
109
                             std::vector<uint8_t>& hash_key) override;
110
    OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override;
111

            
112
    // Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks
113
    void onConnectionOpen(Envoy::Http::ConnectionPool::Instance& pool,
114
                          std::vector<uint8_t>& hash_key,
115
                          const Network::Connection& connection) override;
116

            
117
    void onConnectionDraining(Envoy::Http::ConnectionPool::Instance& pool,
118
                              std::vector<uint8_t>& hash_key,
119
                              const Network::Connection& connection) override;
120

            
121
  private:
122
    struct ConnectionInfo {
123
      Envoy::Http::ConnectionPool::Instance* pool_; // Not a ref to allow assignment in remove().
124
      const Network::Connection* connection_;       // Not a ref to allow assignment in remove().
125
    };
126
    struct LookupKey {
127
      const std::vector<uint8_t> hash_key_;
128
      const Network::Address::Instance& peer_address_;
129
10
      bool operator==(const LookupKey& rhs) const {
130
10
        return std::tie(hash_key_, peer_address_) == std::tie(rhs.hash_key_, rhs.peer_address_);
131
10
      }
132
    };
133
    struct LookupKeyHash {
134
      size_t operator()(const LookupKey& lookup_key) const {
135
        return std::hash<std::string>{}(lookup_key.peer_address_.asString());
136
      }
137
    };
138

            
139
    absl::flat_hash_map<LookupKey, std::vector<ConnectionInfo>, LookupKeyHash> connection_info_map_;
140

            
141
    const Cluster& cluster_;
142
  };
143

            
144
  // This acts as the bridge for asynchronous host lookup. If the host is not
145
  // present in the DFP cluster, the DFPHostSelectionHandle will receive a onLoadDnsCacheComplete
146
  // call unless the LoadDnsCacheEntryHandlePtr is destroyed. Destruction of the
147
  // LoadDnsCacheEntryHandlePtr ensures that no callback will occur, at which
148
  // point it is safe to delete the DFPHostSelectionHandle.
149
  class DFPHostSelectionHandle
150
      : public Upstream::AsyncHostSelectionHandle,
151
        public Common::DynamicForwardProxy::DnsCache::LoadDnsCacheEntryCallbacks {
152
  public:
153
    DFPHostSelectionHandle(Upstream::LoadBalancerContext* context, const Cluster& cluster,
154
                           std::string hostname)
155
75
        : context_(context), cluster_(cluster), hostname_(hostname) {};
156

            
157
2
    virtual void cancel() {
158
      // Cancels the DNS callback.
159
2
      handle_.reset();
160
2
    }
161

            
162
    virtual void
163
63
    onLoadDnsCacheComplete(const Common::DynamicForwardProxy::DnsHostInfoSharedPtr& info) {
164
63
      Upstream::HostConstSharedPtr host = cluster_.findHostByName(hostname_);
165
63
      std::string details = info->details();
166
63
      context_->onAsyncHostSelection(std::move(host), std::move(details));
167
63
    }
168

            
169
65
    void setHandle(Common::DynamicForwardProxy::DnsCache::LoadDnsCacheEntryHandlePtr&& handle) {
170
65
      handle_ = std::move(handle);
171
65
    }
172
65
    void setAutoDec(Upstream::ResourceAutoIncDecPtr&& dec) { auto_dec_ = std::move(dec); }
173

            
174
  private:
175
    Upstream::LoadBalancerContext* context_;
176
    Common::DynamicForwardProxy::DnsCache::LoadDnsCacheEntryHandlePtr handle_;
177
    Upstream::ResourceAutoIncDecPtr auto_dec_;
178
    const Cluster& cluster_;
179
    std::string hostname_;
180
  };
181

            
182
  class LoadBalancerFactory : public Upstream::LoadBalancerFactory {
183
  public:
184
126
    LoadBalancerFactory(Cluster& cluster) : cluster_(cluster) {}
185

            
186
    // Upstream::LoadBalancerFactory
187
392
    Upstream::LoadBalancerPtr create(Upstream::LoadBalancerParams) override {
188
392
      return std::make_unique<LoadBalancer>(cluster_);
189
392
    }
190

            
191
  private:
192
    Cluster& cluster_;
193
  };
194

            
195
  class ThreadAwareLoadBalancer : public Upstream::ThreadAwareLoadBalancer {
196
  public:
197
127
    ThreadAwareLoadBalancer(Cluster& cluster) : cluster_(cluster) {}
198

            
199
    // Upstream::ThreadAwareLoadBalancer
200
126
    Upstream::LoadBalancerFactorySharedPtr factory() override {
201
126
      return std::make_shared<LoadBalancerFactory>(cluster_);
202
126
    }
203
108
    absl::Status initialize() override { return absl::OkStatus(); }
204

            
205
  private:
206
    Cluster& cluster_;
207
  };
208

            
209
  absl::Status
210
  addOrUpdateHost(absl::string_view host,
211
                  const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr& host_info,
212
                  std::unique_ptr<Upstream::HostVector>& hosts_added)
213
      ABSL_LOCKS_EXCLUDED(host_map_lock_);
214

            
215
  void updatePriorityState(const Upstream::HostVector& hosts_added,
216
                           const Upstream::HostVector& hosts_removed)
217
      ABSL_LOCKS_EXCLUDED(host_map_lock_);
218

            
219
  const Extensions::Common::DynamicForwardProxy::DnsCacheManagerSharedPtr dns_cache_manager_;
220
  const Extensions::Common::DynamicForwardProxy::DnsCacheSharedPtr dns_cache_;
221
  const Extensions::Common::DynamicForwardProxy::DnsCache::AddUpdateCallbacksHandlePtr
222
      update_callbacks_handle_;
223
  const envoy::config::endpoint::v3::LocalityLbEndpoints dummy_locality_lb_endpoint_;
224
  const envoy::config::endpoint::v3::LbEndpoint dummy_lb_endpoint_;
225
  const LocalInfo::LocalInfo& local_info_;
226
  Event::Dispatcher& main_thread_dispatcher_;
227
  const envoy::config::cluster::v3::Cluster orig_cluster_config_;
228

            
229
  Event::TimerPtr idle_timer_;
230

            
231
  // True if H2 and H3 connections may be reused across different origins.
232
  const bool allow_coalesced_connections_;
233

            
234
  mutable absl::Mutex host_map_lock_;
235
  HostInfoMap host_map_ ABSL_GUARDED_BY(host_map_lock_);
236

            
237
  mutable absl::Mutex cluster_map_lock_;
238
  ClusterInfoMap cluster_map_ ABSL_GUARDED_BY(cluster_map_lock_);
239

            
240
  TimeSource& time_source_;
241
  Upstream::ClusterManager& cm_;
242
  const size_t max_sub_clusters_;
243
  const std::chrono::milliseconds sub_cluster_ttl_;
244
  const envoy::config::cluster::v3::Cluster_LbPolicy sub_cluster_lb_policy_;
245
  const bool enable_sub_cluster_;
246

            
247
  friend class ClusterFactory;
248
  friend class ClusterTest;
249
};
250

            
251
// Cluster factory for the dynamic forward proxy cluster, configured by the extension's
// ClusterConfig proto.
class ClusterFactory : public Upstream::ConfigurableClusterFactoryBase<
                           envoy::extensions::clusters::dynamic_forward_proxy::v3::ClusterConfig> {
public:
  // Passes the factory name "envoy.clusters.dynamic_forward_proxy" to the base class.
  ClusterFactory() : ConfigurableClusterFactoryBase("envoy.clusters.dynamic_forward_proxy") {}

private:
  // Upstream::ConfigurableClusterFactoryBase
  // Returns either an error status or the created cluster paired with its thread aware load
  // balancer.
  absl::StatusOr<
      std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>>
  createClusterWithConfig(
      const envoy::config::cluster::v3::Cluster& cluster,
      const envoy::extensions::clusters::dynamic_forward_proxy::v3::ClusterConfig& proto_config,
      Upstream::ClusterFactoryContext& context) override;
};
264

            
265
// NOTE(review): DECLARE_FACTORY is a project macro defined outside this file; presumably it
// declares the registration hook for ClusterFactory -- confirm against the macro definition.
DECLARE_FACTORY(ClusterFactory);
266

            
267
} // namespace DynamicForwardProxy
268
} // namespace Clusters
269
} // namespace Extensions
270
} // namespace Envoy