#pragma once

#include "envoy/api/api.h"
#include "envoy/common/random_generator.h"
#include "envoy/config/cluster/v3/cluster.pb.h"
#include "envoy/config/core/v3/address.pb.h"
#include "envoy/event/dispatcher.h"
#include "envoy/server/instance.h"
#include "envoy/server/transport_socket_config.h"
#include "envoy/service/health/v3/hds.pb.h"
#include "envoy/ssl/context_manager.h"
#include "envoy/stats/stats_macros.h"
#include "envoy/upstream/upstream.h"

#include "source/common/common/backoff_strategy.h"
#include "source/common/common/logger.h"
#include "source/common/common/macros.h"
#include "source/common/config/utility.h"
#include "source/common/grpc/async_client_impl.h"
#include "source/common/network/resolver_impl.h"
#include "source/common/upstream/health_checker_impl.h"
#include "source/common/upstream/locality_endpoint.h"
#include "source/common/upstream/prod_cluster_info_factory.h"
#include "source/common/upstream/upstream_impl.h"
#include "source/server/transport_socket_config_impl.h"

#include "absl/container/flat_hash_map.h"

namespace Envoy {
namespace Upstream {

// Map from a LocalityEndpointTuple key to the host it identifies, using the
// project-supplied LocalityEndpointHash / LocalityEndpointEqualTo functors.
using HostsMap = absl::flat_hash_map<LocalityEndpointTuple, HostSharedPtr, LocalityEndpointHash,
                                     LocalityEndpointEqualTo>;
// Map from a HealthCheck proto to the health checker associated with it, using
// the project-supplied HealthCheckerHash / HealthCheckerEqualTo functors.
using HealthCheckerMap =
    absl::flat_hash_map<envoy::config::core::v3::HealthCheck, Upstream::HealthCheckerSharedPtr,
                        HealthCheckerHash, HealthCheckerEqualTo>;

// TODO(lilika): Add HdsClusters to the /clusters endpoint to get detailed stats about each HC host.

/**
41
 * Implementation of Upstream::Cluster for hds clusters, clusters that are used
42
 * by HdsDelegates
43
 */
44
class HdsCluster : public Cluster, Logger::Loggable<Logger::Id::upstream> {
45
public:
46
  static ClusterSharedPtr create();
47
  HdsCluster(Server::Configuration::ServerFactoryContext& server_context,
48
             envoy::config::cluster::v3::Cluster cluster,
49
             const envoy::config::core::v3::BindConfig& bind_config, Stats::Store& stats,
50
             Ssl::ContextManager& ssl_context_manager, bool added_via_api,
51
             ClusterInfoFactory& info_factory, ThreadLocal::SlotAllocator& tls);
52

            
53
  // Upstream::Cluster
54
1
  InitializePhase initializePhase() const override { return InitializePhase::Primary; }
55
101
  PrioritySet& prioritySet() override { return priority_set_; }
56
156
  const PrioritySet& prioritySet() const override { return priority_set_; }
57
  void setOutlierDetector(const Outlier::DetectorSharedPtr& outlier_detector);
58
1
  HealthChecker* healthChecker() override { return health_checker_.get(); }
59
306
  ClusterInfoConstSharedPtr info() const override { return info_; }
60
1
  Outlier::Detector* outlierDetector() override { return outlier_detector_.get(); }
61
1
  const Outlier::Detector* outlierDetector() const override { return outlier_detector_.get(); }
62
  void initialize(std::function<absl::Status()> callback) override;
63
  // Compare changes in the cluster proto, and update parts of the cluster as needed.
64
  absl::Status update(envoy::config::cluster::v3::Cluster cluster,
65
                      const envoy::config::core::v3::BindConfig& bind_config,
66
                      ClusterInfoFactory& info_factory, ThreadLocal::SlotAllocator& tls);
67
  // Creates healthcheckers and adds them to the list, then does initial start.
68
  void initHealthchecks();
69

            
70
8
  std::vector<Upstream::HealthCheckerSharedPtr> healthCheckers() { return health_checkers_; };
71
5
  std::vector<HostSharedPtr> hosts() { return *hosts_; };
72
  UnitFloat dropOverload() const override { return drop_overload_; }
73
  const std::string& dropCategory() const override { return drop_category_; }
74
  void setDropOverload(UnitFloat) override {}
75
  void setDropCategory(absl::string_view) override {}
76

            
77
protected:
78
  PrioritySetImpl priority_set_;
79
  HealthCheckerSharedPtr health_checker_;
80
  Outlier::DetectorSharedPtr outlier_detector_;
81

            
82
private:
83
  std::function<absl::Status()> initialization_complete_callback_;
84

            
85
  Server::Configuration::ServerFactoryContext& server_context_;
86
  envoy::config::cluster::v3::Cluster cluster_;
87
  Stats::Store& stats_;
88
  Ssl::ContextManager& ssl_context_manager_;
89
  bool added_via_api_;
90
  bool initialized_ = false;
91
  uint64_t config_hash_;
92
  uint64_t socket_match_hash_;
93

            
94
  HostVectorSharedPtr hosts_;
95
  HostsPerLocalitySharedPtr hosts_per_locality_;
96
  HostsMap hosts_map_;
97
  ClusterInfoConstSharedPtr info_;
98
  std::vector<Upstream::HealthCheckerSharedPtr> health_checkers_;
99
  HealthCheckerMap health_checkers_map_;
100
  UnitFloat drop_overload_{0};
101
  const std::string drop_category_;
102

            
103
  absl::Status updateHealthchecks(
104
      const Protobuf::RepeatedPtrField<envoy::config::core::v3::HealthCheck>& health_checks);
105
  void
106
  updateHosts(const Protobuf::RepeatedPtrField<envoy::config::endpoint::v3::LocalityLbEndpoints>&
107
                  locality_endpoints,
108
              bool update_socket_matches);
109
};
110

            
111
using HdsClusterPtr = std::shared_ptr<HdsCluster>;
112

            
113
/**
 * All hds stats. @see stats_macros.h
 *
 * X-macro: applies COUNTER to each of the four stat names (requests,
 * responses, errors, updates), in that order. Every line except the last must
 * end in a line continuation so that the whole list belongs to the macro.
 */
#define ALL_HDS_STATS(COUNTER)                                                                     \
  COUNTER(requests)                                                                                \
  COUNTER(responses)                                                                               \
  COUNTER(errors)                                                                                  \
  COUNTER(updates)

/**
123
 * Struct definition for all hds stats. @see stats_macros.h
124
 */
125
struct HdsDelegateStats {
126
  ALL_HDS_STATS(GENERATE_COUNTER_STRUCT)
127
};
128

            
129
// TODO(lilika): Add /config_dump support for HdsDelegate

/**
132
 * The HdsDelegate class is responsible for receiving requests from a management
133
 * server with a set of hosts to healthcheck, healthchecking them, and reporting
134
 * back the results.
135
 */
136
class HdsDelegate : Grpc::AsyncStreamCallbacks<envoy::service::health::v3::HealthCheckSpecifier>,
137
                    public Server::HdsDelegateApi {
138
public:
139
  HdsDelegate(Server::Configuration::ServerFactoryContext& server_context, Stats::Scope& scope,
140
              Grpc::RawAsyncClientPtr async_client, Envoy::Stats::Store& stats,
141
              Ssl::ContextManager& ssl_context_manager);
142

            
143
  // Grpc::AsyncStreamCallbacks
144
  void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) override;
145
  void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&& metadata) override;
146
  void onReceiveMessage(
147
      std::unique_ptr<envoy::service::health::v3::HealthCheckSpecifier>&& message) override;
148
  void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&& metadata) override;
149
  void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override;
150
  envoy::service::health::v3::HealthCheckRequestOrEndpointHealthResponse sendResponse();
151

            
152
39
  std::vector<HdsClusterPtr> hdsClusters() { return hds_clusters_; };
153

            
154
private:
155
  friend class HdsDelegateFriend;
156

            
157
  void setHdsRetryTimer();
158
  void setHdsStreamResponseTimer();
159
  void handleFailure();
160
  // Establishes a connection with the management server
161
  void establishNewStream();
162
  absl::Status
163
  processMessage(std::unique_ptr<envoy::service::health::v3::HealthCheckSpecifier>&& message);
164
  envoy::config::cluster::v3::Cluster
165
  createClusterConfig(const envoy::service::health::v3::ClusterHealthCheck& cluster_health_check);
166
  absl::Status updateHdsCluster(HdsClusterPtr cluster,
167
                                const envoy::config::cluster::v3::Cluster& cluster_health_check,
168
                                const envoy::config::core::v3::BindConfig& bind_config);
169
  HdsClusterPtr createHdsCluster(const envoy::config::cluster::v3::Cluster& cluster_health_check,
170
                                 const envoy::config::core::v3::BindConfig& bind_config);
171
  HdsDelegateStats stats_;
172
  const Protobuf::MethodDescriptor& service_method_;
173

            
174
  Grpc::AsyncClient<envoy::service::health::v3::HealthCheckRequestOrEndpointHealthResponse,
175
                    envoy::service::health::v3::HealthCheckSpecifier>
176
      async_client_;
177
  Grpc::AsyncStream<envoy::service::health::v3::HealthCheckRequestOrEndpointHealthResponse>
178
      stream_{};
179
  Event::Dispatcher& dispatcher_;
180
  Server::Configuration::ServerFactoryContext& server_context_;
181
  Envoy::Stats::Store& store_stats_;
182
  Ssl::ContextManager& ssl_context_manager_;
183
  std::unique_ptr<ClusterInfoFactory> info_factory_;
184
  ThreadLocal::SlotAllocator& tls_;
185

            
186
  envoy::service::health::v3::HealthCheckRequestOrEndpointHealthResponse health_check_request_;
187
  uint64_t specifier_hash_{0};
188

            
189
  std::vector<std::string> clusters_;
190
  std::vector<HdsClusterPtr> hds_clusters_;
191
  absl::flat_hash_map<std::string, HdsClusterPtr> hds_clusters_name_map_;
192

            
193
  Event::TimerPtr hds_stream_response_timer_;
194
  Event::TimerPtr hds_retry_timer_;
195
  BackOffStrategyPtr backoff_strategy_;
196

            
197
  // Soft limit on size of the cluster’s connections read and write buffers.
198
  static constexpr uint32_t ClusterConnectionBufferLimitBytes = 32768;
199

            
200
  // TODO(lilika): Add API knob for ClusterTimeoutSeconds, instead of
201
  // hardcoding it.
202
  // The timeout for new network connections to hosts in the cluster.
203
  static constexpr uint32_t ClusterTimeoutSeconds = 1;
204

            
205
  // How often envoy reports the healthcheck results to the server
206
  uint32_t server_response_ms_ = 0;
207
};
208

            
209
using HdsDelegatePtr = std::unique_ptr<HdsDelegate>;
210

            
211
} // namespace Upstream
} // namespace Envoy