Line data Source code
1 : #pragma once 2 : 3 : #include "envoy/config/cluster/v3/cluster.pb.h" 4 : #include "envoy/config/endpoint/v3/endpoint_components.pb.h" 5 : #include "envoy/extensions/clusters/dynamic_forward_proxy/v3/cluster.pb.h" 6 : #include "envoy/extensions/clusters/dynamic_forward_proxy/v3/cluster.pb.validate.h" 7 : #include "envoy/http/conn_pool.h" 8 : 9 : #include "source/common/upstream/cluster_factory_impl.h" 10 : #include "source/extensions/clusters/common/logical_host.h" 11 : #include "source/extensions/common/dynamic_forward_proxy/cluster_store.h" 12 : #include "source/extensions/common/dynamic_forward_proxy/dns_cache.h" 13 : 14 : namespace Envoy { 15 : namespace Extensions { 16 : namespace Clusters { 17 : namespace DynamicForwardProxy { 18 : 19 : class ClusterFactory; 20 : class ClusterTest; 21 : 22 : class Cluster : public Upstream::BaseDynamicClusterImpl, 23 : public Extensions::Common::DynamicForwardProxy::DfpCluster, 24 : public Extensions::Common::DynamicForwardProxy::DnsCache::UpdateCallbacks { 25 : public: 26 : ~Cluster() override; 27 : 28 : // Upstream::Cluster 29 0 : Upstream::Cluster::InitializePhase initializePhase() const override { 30 0 : return Upstream::Cluster::InitializePhase::Primary; 31 0 : } 32 : 33 : // Upstream::ClusterImplBase 34 : void startPreInit() override; 35 : 36 : // Extensions::Common::DynamicForwardProxy::DnsCache::UpdateCallbacks 37 : void onDnsHostAddOrUpdate( 38 : const std::string& host, 39 : const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr& host_info) override; 40 : void onDnsHostRemove(const std::string& host) override; 41 : void onDnsResolutionComplete(const std::string&, 42 : const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr&, 43 0 : Network::DnsResolver::ResolutionStatus) override {} 44 : 45 0 : bool allowCoalescedConnections() const { return allow_coalesced_connections_; } 46 0 : bool enableSubCluster() const override { return enable_sub_cluster_; } 47 : Upstream::HostConstSharedPtr chooseHost(absl::string_view host, 48 : Upstream::LoadBalancerContext* context) const; 49 : std::pair<bool, absl::optional<envoy::config::cluster::v3::Cluster>> 50 : createSubClusterConfig(const std::string& cluster_name, const std::string& host, 51 : const int port) override; 52 : bool touch(const std::string& cluster_name) override; 53 : void checkIdleSubCluster(); 54 : 55 : private: 56 : friend class ClusterFactory; 57 : friend class ClusterTest; 58 : 59 : Cluster(const envoy::config::cluster::v3::Cluster& cluster, 60 : Extensions::Common::DynamicForwardProxy::DnsCacheSharedPtr&& cacahe, 61 : const envoy::extensions::clusters::dynamic_forward_proxy::v3::ClusterConfig& config, 62 : Upstream::ClusterFactoryContext& context, 63 : Extensions::Common::DynamicForwardProxy::DnsCacheManagerSharedPtr&& cache_manager); 64 : struct ClusterInfo { 65 : ClusterInfo(std::string cluster_name, Cluster& parent); 66 : void touch(); 67 : bool checkIdle(); 68 : 69 : std::string cluster_name_; 70 : Cluster& parent_; 71 : std::atomic<std::chrono::steady_clock::duration> last_used_time_; 72 : }; 73 : 74 : using ClusterInfoMap = absl::flat_hash_map<std::string, std::shared_ptr<ClusterInfo>>; 75 : 76 : struct HostInfo { 77 : HostInfo(const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr& shared_host_info, 78 : const Upstream::LogicalHostSharedPtr& logical_host) 79 0 : : shared_host_info_(shared_host_info), logical_host_(logical_host) {} 80 : 81 : const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr shared_host_info_; 82 : const Upstream::LogicalHostSharedPtr logical_host_; 83 : }; 84 : 85 : using HostInfoMap = absl::flat_hash_map<std::string, HostInfo>; 86 : 87 : class LoadBalancer : public Upstream::LoadBalancer, 88 : public Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks { 89 : public: 90 0 : LoadBalancer(const Cluster& cluster) : cluster_(cluster) {} 91 : 92 : // Upstream::LoadBalancer 93 : Upstream::HostConstSharedPtr chooseHost(Upstream::LoadBalancerContext* context) override; 94 : // Preconnecting not implemented. 95 0 : Upstream::HostConstSharedPtr peekAnotherHost(Upstream::LoadBalancerContext*) override { 96 0 : return nullptr; 97 0 : } 98 : absl::optional<Upstream::SelectedPoolAndConnection> 99 : selectExistingConnection(Upstream::LoadBalancerContext* context, const Upstream::Host& host, 100 : std::vector<uint8_t>& hash_key) override; 101 : OptRef<Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks> lifetimeCallbacks() override; 102 : 103 : // Envoy::Http::ConnectionPool::ConnectionLifetimeCallbacks 104 : void onConnectionOpen(Envoy::Http::ConnectionPool::Instance& pool, 105 : std::vector<uint8_t>& hash_key, 106 : const Network::Connection& connection) override; 107 : 108 : void onConnectionDraining(Envoy::Http::ConnectionPool::Instance& pool, 109 : std::vector<uint8_t>& hash_key, 110 : const Network::Connection& connection) override; 111 : 112 : private: 113 : struct ConnectionInfo { 114 : Envoy::Http::ConnectionPool::Instance* pool_; // Not a ref to allow assignment in remove(). 115 : const Network::Connection* connection_; // Not a ref to allow assignment in remove(). 116 : }; 117 : struct LookupKey { 118 : const std::vector<uint8_t> hash_key_; 119 : const Network::Address::Instance& peer_address_; 120 0 : bool operator==(const LookupKey& rhs) const { 121 0 : return std::tie(hash_key_, peer_address_) == std::tie(rhs.hash_key_, rhs.peer_address_); 122 0 : } 123 : }; 124 : struct LookupKeyHash { 125 0 : size_t operator()(const LookupKey& lookup_key) const { 126 0 : return std::hash<std::string>{}(lookup_key.peer_address_.asString()); 127 0 : } 128 : }; 129 : 130 : absl::flat_hash_map<LookupKey, std::vector<ConnectionInfo>, LookupKeyHash> connection_info_map_; 131 : 132 : const Cluster& cluster_; 133 : }; 134 : 135 : class LoadBalancerFactory : public Upstream::LoadBalancerFactory { 136 : public: 137 0 : LoadBalancerFactory(Cluster& cluster) : cluster_(cluster) {} 138 : 139 : // Upstream::LoadBalancerFactory 140 0 : Upstream::LoadBalancerPtr create(Upstream::LoadBalancerParams) override { 141 0 : return std::make_unique<LoadBalancer>(cluster_); 142 0 : } 143 : 144 : private: 145 : Cluster& cluster_; 146 : }; 147 : 148 : class ThreadAwareLoadBalancer : public Upstream::ThreadAwareLoadBalancer { 149 : public: 150 0 : ThreadAwareLoadBalancer(Cluster& cluster) : cluster_(cluster) {} 151 : 152 : // Upstream::ThreadAwareLoadBalancer 153 0 : Upstream::LoadBalancerFactorySharedPtr factory() override { 154 0 : return std::make_shared<LoadBalancerFactory>(cluster_); 155 0 : } 156 0 : void initialize() override {} 157 : 158 : private: 159 : Cluster& cluster_; 160 : }; 161 : 162 : void 163 : addOrUpdateHost(absl::string_view host, 164 : const Extensions::Common::DynamicForwardProxy::DnsHostInfoSharedPtr& host_info, 165 : std::unique_ptr<Upstream::HostVector>& hosts_added) 166 : ABSL_LOCKS_EXCLUDED(host_map_lock_); 167 : 168 : void updatePriorityState(const Upstream::HostVector& hosts_added, 169 : const Upstream::HostVector& hosts_removed) 170 : ABSL_LOCKS_EXCLUDED(host_map_lock_); 171 : 172 : const Extensions::Common::DynamicForwardProxy::DnsCacheManagerSharedPtr dns_cache_manager_; 173 : const Extensions::Common::DynamicForwardProxy::DnsCacheSharedPtr dns_cache_; 174 : const Extensions::Common::DynamicForwardProxy::DnsCache::AddUpdateCallbacksHandlePtr 175 : update_callbacks_handle_; 176 : const envoy::config::endpoint::v3::LocalityLbEndpoints dummy_locality_lb_endpoint_; 177 : const envoy::config::endpoint::v3::LbEndpoint dummy_lb_endpoint_; 178 : const LocalInfo::LocalInfo& local_info_; 179 : Event::Dispatcher& main_thread_dispatcher_; 180 : const envoy::config::cluster::v3::Cluster orig_cluster_config_; 181 : 182 : Event::TimerPtr idle_timer_; 183 : 184 : // True if H2 and H3 connections may be reused across different origins. 185 : const bool allow_coalesced_connections_; 186 : 187 : mutable absl::Mutex host_map_lock_; 188 : HostInfoMap host_map_ ABSL_GUARDED_BY(host_map_lock_); 189 : 190 : mutable absl::Mutex cluster_map_lock_; 191 : ClusterInfoMap cluster_map_ ABSL_GUARDED_BY(cluster_map_lock_); 192 : 193 : Upstream::ClusterManager& cm_; 194 : const size_t max_sub_clusters_; 195 : const std::chrono::milliseconds sub_cluster_ttl_; 196 : const envoy::config::cluster::v3::Cluster_LbPolicy sub_cluster_lb_policy_; 197 : const bool enable_sub_cluster_; 198 : 199 : friend class ClusterFactory; 200 : friend class ClusterTest; 201 : }; 202 : 203 : class ClusterFactory : public Upstream::ConfigurableClusterFactoryBase< 204 : envoy::extensions::clusters::dynamic_forward_proxy::v3::ClusterConfig> { 205 : public: 206 3 : ClusterFactory() : ConfigurableClusterFactoryBase("envoy.clusters.dynamic_forward_proxy") {} 207 : 208 : private: 209 : absl::StatusOr< 210 : std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>> 211 : createClusterWithConfig( 212 : const envoy::config::cluster::v3::Cluster& cluster, 213 : const envoy::extensions::clusters::dynamic_forward_proxy::v3::ClusterConfig& proto_config, 214 : Upstream::ClusterFactoryContext& context) override; 215 : }; 216 : 217 : DECLARE_FACTORY(ClusterFactory); 218 : 219 : } // namespace DynamicForwardProxy 220 : } // namespace Clusters 221 : } // namespace Extensions 222 : } // namespace Envoy