Line data Source code
1 : #pragma once 2 : 3 : #include "envoy/api/api.h" 4 : #include "envoy/common/random_generator.h" 5 : #include "envoy/config/cluster/v3/cluster.pb.h" 6 : #include "envoy/config/core/v3/address.pb.h" 7 : #include "envoy/event/dispatcher.h" 8 : #include "envoy/server/transport_socket_config.h" 9 : #include "envoy/service/health/v3/hds.pb.h" 10 : #include "envoy/ssl/context_manager.h" 11 : #include "envoy/stats/stats_macros.h" 12 : #include "envoy/upstream/upstream.h" 13 : 14 : #include "source/common/common/backoff_strategy.h" 15 : #include "source/common/common/logger.h" 16 : #include "source/common/common/macros.h" 17 : #include "source/common/config/utility.h" 18 : #include "source/common/grpc/async_client_impl.h" 19 : #include "source/common/network/resolver_impl.h" 20 : #include "source/common/upstream/health_checker_impl.h" 21 : #include "source/common/upstream/locality_endpoint.h" 22 : #include "source/common/upstream/upstream_impl.h" 23 : #include "source/server/transport_socket_config_impl.h" 24 : 25 : #include "absl/container/flat_hash_map.h" 26 : 27 : namespace Envoy { 28 : namespace Upstream { 29 : 30 : using HostsMap = absl::flat_hash_map<LocalityEndpointTuple, HostSharedPtr, LocalityEndpointHash, 31 : LocalityEndpointEqualTo>; 32 : using HealthCheckerMap = 33 : absl::flat_hash_map<envoy::config::core::v3::HealthCheck, Upstream::HealthCheckerSharedPtr, 34 : HealthCheckerHash, HealthCheckerEqualTo>; 35 : 36 : class ProdClusterInfoFactory : public ClusterInfoFactory, Logger::Loggable<Logger::Id::upstream> { 37 : public: 38 : ClusterInfoConstSharedPtr createClusterInfo(const CreateClusterInfoParams& params) override; 39 : }; 40 : 41 : // TODO(lilika): Add HdsClusters to the /clusters endpoint to get detailed stats about each HC host. 42 : 43 : /** 44 : * Implementation of Upstream::Cluster for hds clusters, clusters that are used 45 : * by HdsDelegates 46 : */ 47 : class HdsCluster : public Cluster, Logger::Loggable<Logger::Id::upstream> { 48 : public: 49 : static ClusterSharedPtr create(); 50 : HdsCluster(Server::Configuration::ServerFactoryContext& server_context, 51 : envoy::config::cluster::v3::Cluster cluster, 52 : const envoy::config::core::v3::BindConfig& bind_config, Stats::Store& stats, 53 : Ssl::ContextManager& ssl_context_manager, bool added_via_api, 54 : ClusterInfoFactory& info_factory, ThreadLocal::SlotAllocator& tls); 55 : 56 : // Upstream::Cluster 57 0 : InitializePhase initializePhase() const override { return InitializePhase::Primary; } 58 0 : PrioritySet& prioritySet() override { return priority_set_; } 59 0 : const PrioritySet& prioritySet() const override { return priority_set_; } 60 : void setOutlierDetector(const Outlier::DetectorSharedPtr& outlier_detector); 61 0 : HealthChecker* healthChecker() override { return health_checker_.get(); } 62 0 : ClusterInfoConstSharedPtr info() const override { return info_; } 63 0 : Outlier::Detector* outlierDetector() override { return outlier_detector_.get(); } 64 0 : const Outlier::Detector* outlierDetector() const override { return outlier_detector_.get(); } 65 : void initialize(std::function<void()> callback) override; 66 : // Compare changes in the cluster proto, and update parts of the cluster as needed. 67 : absl::Status update(envoy::config::cluster::v3::Cluster cluster, 68 : const envoy::config::core::v3::BindConfig& bind_config, 69 : ClusterInfoFactory& info_factory, ThreadLocal::SlotAllocator& tls); 70 : // Creates healthcheckers and adds them to the list, then does initial start. 71 : void initHealthchecks(); 72 : 73 0 : std::vector<Upstream::HealthCheckerSharedPtr> healthCheckers() { return health_checkers_; }; 74 0 : std::vector<HostSharedPtr> hosts() { return *hosts_; }; 75 0 : UnitFloat dropOverload() const override { return UnitFloat(0); } 76 0 : void setDropOverload(UnitFloat) override {} 77 : 78 : protected: 79 : PrioritySetImpl priority_set_; 80 : HealthCheckerSharedPtr health_checker_; 81 : Outlier::DetectorSharedPtr outlier_detector_; 82 : 83 : private: 84 : std::function<void()> initialization_complete_callback_; 85 : 86 : Server::Configuration::ServerFactoryContext& server_context_; 87 : envoy::config::cluster::v3::Cluster cluster_; 88 : Stats::Store& stats_; 89 : Ssl::ContextManager& ssl_context_manager_; 90 : bool added_via_api_; 91 : bool initialized_ = false; 92 : uint64_t config_hash_; 93 : uint64_t socket_match_hash_; 94 : 95 : HostVectorSharedPtr hosts_; 96 : HostsPerLocalitySharedPtr hosts_per_locality_; 97 : HostsMap hosts_map_; 98 : ClusterInfoConstSharedPtr info_; 99 : std::vector<Upstream::HealthCheckerSharedPtr> health_checkers_; 100 : HealthCheckerMap health_checkers_map_; 101 : TimeSource& time_source_; 102 : 103 : absl::Status updateHealthchecks( 104 : const Protobuf::RepeatedPtrField<envoy::config::core::v3::HealthCheck>& health_checks); 105 : void 106 : updateHosts(const Protobuf::RepeatedPtrField<envoy::config::endpoint::v3::LocalityLbEndpoints>& 107 : locality_endpoints, 108 : bool update_socket_matches); 109 : }; 110 : 111 : using HdsClusterPtr = std::shared_ptr<HdsCluster>; 112 : 113 : /** 114 : * All hds stats. @see stats_macros.h 115 : */ 116 : #define ALL_HDS_STATS(COUNTER) \ 117 : COUNTER(requests) \ 118 : COUNTER(responses) \ 119 : COUNTER(errors) \ 120 : COUNTER(updates) 121 : 122 : /** 123 : * Struct definition for all hds stats. @see stats_macros.h 124 : */ 125 : struct HdsDelegateStats { 126 : ALL_HDS_STATS(GENERATE_COUNTER_STRUCT) 127 : }; 128 : 129 : // TODO(lilika): Add /config_dump support for HdsDelegate 130 : 131 : /** 132 : * The HdsDelegate class is responsible for receiving requests from a management 133 : * server with a set of hosts to healthcheck, healthchecking them, and reporting 134 : * back the results. 135 : */ 136 : class HdsDelegate : Grpc::AsyncStreamCallbacks<envoy::service::health::v3::HealthCheckSpecifier>, 137 : Logger::Loggable<Logger::Id::upstream> { 138 : public: 139 : HdsDelegate(Server::Configuration::ServerFactoryContext& server_context, Stats::Scope& scope, 140 : Grpc::RawAsyncClientPtr async_client, Envoy::Stats::Store& stats, 141 : Ssl::ContextManager& ssl_context_manager, ClusterInfoFactory& info_factory); 142 : 143 : // Grpc::AsyncStreamCallbacks 144 : void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) override; 145 : void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&& metadata) override; 146 : void onReceiveMessage( 147 : std::unique_ptr<envoy::service::health::v3::HealthCheckSpecifier>&& message) override; 148 : void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&& metadata) override; 149 : void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override; 150 : envoy::service::health::v3::HealthCheckRequestOrEndpointHealthResponse sendResponse(); 151 : 152 0 : std::vector<HdsClusterPtr> hdsClusters() { return hds_clusters_; }; 153 : 154 : private: 155 : friend class HdsDelegateFriend; 156 : 157 : void setHdsRetryTimer(); 158 : void setHdsStreamResponseTimer(); 159 : void handleFailure(); 160 : // Establishes a connection with the management server 161 : void establishNewStream(); 162 : absl::Status 163 : processMessage(std::unique_ptr<envoy::service::health::v3::HealthCheckSpecifier>&& message); 164 : envoy::config::cluster::v3::Cluster 165 : createClusterConfig(const envoy::service::health::v3::ClusterHealthCheck& cluster_health_check); 166 : absl::Status updateHdsCluster(HdsClusterPtr cluster, 167 : const envoy::config::cluster::v3::Cluster& cluster_health_check, 168 : const envoy::config::core::v3::BindConfig& bind_config); 169 : HdsClusterPtr createHdsCluster(const envoy::config::cluster::v3::Cluster& cluster_health_check, 170 : const envoy::config::core::v3::BindConfig& bind_config); 171 : HdsDelegateStats stats_; 172 : const Protobuf::MethodDescriptor& service_method_; 173 : 174 : Grpc::AsyncClient<envoy::service::health::v3::HealthCheckRequestOrEndpointHealthResponse, 175 : envoy::service::health::v3::HealthCheckSpecifier> 176 : async_client_; 177 : Grpc::AsyncStream<envoy::service::health::v3::HealthCheckRequestOrEndpointHealthResponse> 178 : stream_{}; 179 : Event::Dispatcher& dispatcher_; 180 : Server::Configuration::ServerFactoryContext& server_context_; 181 : Envoy::Stats::Store& store_stats_; 182 : Ssl::ContextManager& ssl_context_manager_; 183 : ClusterInfoFactory& info_factory_; 184 : ThreadLocal::SlotAllocator& tls_; 185 : 186 : envoy::service::health::v3::HealthCheckRequestOrEndpointHealthResponse health_check_request_; 187 : uint64_t specifier_hash_{0}; 188 : 189 : std::vector<std::string> clusters_; 190 : std::vector<HdsClusterPtr> hds_clusters_; 191 : absl::flat_hash_map<std::string, HdsClusterPtr> hds_clusters_name_map_; 192 : 193 : Event::TimerPtr hds_stream_response_timer_; 194 : Event::TimerPtr hds_retry_timer_; 195 : BackOffStrategyPtr backoff_strategy_; 196 : 197 : // Soft limit on size of the cluster’s connections read and write buffers. 198 : static constexpr uint32_t ClusterConnectionBufferLimitBytes = 32768; 199 : 200 : // TODO(lilika): Add API knob for ClusterTimeoutSeconds, instead of 201 : // hardcoding it. 202 : // The timeout for new network connections to hosts in the cluster. 203 : static constexpr uint32_t ClusterTimeoutSeconds = 1; 204 : 205 : // How often envoy reports the healthcheck results to the server 206 : uint32_t server_response_ms_ = 0; 207 : }; 208 : 209 : using HdsDelegatePtr = std::unique_ptr<HdsDelegate>; 210 : 211 : } // namespace Upstream 212 : } // namespace Envoy