1
#pragma once
2

            
3
#include <memory>
4

            
5
#include "envoy/common/backoff_strategy.h"
6
#include "envoy/common/exception.h"
7
#include "envoy/common/pure.h"
8
#include "envoy/config/custom_config_validators.h"
9
#include "envoy/config/eds_resources_cache.h"
10
#include "envoy/config/subscription.h"
11
#include "envoy/grpc/async_client.h"
12
#include "envoy/stats/stats_macros.h"
13
#include "envoy/upstream/load_stats_reporter.h"
14

            
15
#include "source/common/common/cleanup.h"
16
#include "source/common/protobuf/protobuf.h"
17

            
18
namespace Envoy {
19
namespace Config {
20

            
21
using ScopedResume = std::unique_ptr<Cleanup>;
/**
 * All control plane related stats. @see stats_macros.h
 *
 * X-macro: expands to one COUNTER(name), GAUGE(name, import_mode) or TEXT_READOUT(name)
 * invocation per stat, in the order listed below. The continuation lines must stay contiguous;
 * anything placed between them ends the macro early.
 */
#define ALL_CONTROL_PLANE_STATS(COUNTER, GAUGE, TEXT_READOUT)                                      \
  COUNTER(rate_limit_enforced)                                                                     \
  GAUGE(connected_state, NeverImport)                                                              \
  GAUGE(pending_requests, Accumulate)                                                              \
  TEXT_READOUT(identifier)
30

            
31
/**
32
 * Struct definition for all control plane stats. @see stats_macros.h
33
 */
34
struct ControlPlaneStats {
35
  ALL_CONTROL_PLANE_STATS(GENERATE_COUNTER_STRUCT, GENERATE_GAUGE_STRUCT,
36
                          GENERATE_TEXT_READOUT_STRUCT)
37
};
38

            
39
/**
40
 * Handle on a muxed gRPC subscription. The subscription is canceled on destruction.
41
 */
42
class GrpcMuxWatch {
43
public:
44
2813
  virtual ~GrpcMuxWatch() = default;
45

            
46
  /**
47
   * Updates the set of resources that the watch is interested in.
48
   * @param resources set of resource names to watch for
49
   */
50
  virtual void update(const absl::flat_hash_set<std::string>& resources) PURE;
51
};
52

            
53
using GrpcMuxWatchPtr = std::unique_ptr<GrpcMuxWatch>;
54

            
55
/**
56
 * Manage one or more gRPC subscriptions on a single stream to management server. This can be used
57
 * for a single xDS API, e.g. EDS, or to combined multiple xDS APIs for ADS.
58
 */
59
class GrpcMux {
60
public:
61
12733
  virtual ~GrpcMux() = default;
62

            
63
  /**
64
   * Initiate stream with management server.
65
   */
66
  virtual void start() PURE;
67

            
68
  /**
69
   * Pause discovery requests for a given API type. This is useful when we're processing an update
70
   * for LDS or CDS and don't want a flood of updates for RDS or EDS respectively. Discovery
71
   * requests may later be resumed with resume().
72
   * @param type_url type URL corresponding to xDS API, e.g.
73
   * type.googleapis.com/envoy.api.v2.Cluster.
74
   *
75
   * @return a ScopedResume object, which when destructed, resumes the paused discovery requests.
76
   * A discovery request will be sent if one would have been sent during the pause.
77
   */
78
  ABSL_MUST_USE_RESULT virtual ScopedResume pause(const std::string& type_url) PURE;
79

            
80
  /**
81
   * Pause discovery requests for given API types. This is useful when we're processing an update
82
   * for LDS or CDS and don't want a flood of updates for RDS or EDS respectively. Discovery
83
   * requests may later be resumed with resume().
84
   * @param type_urls type URLs corresponding to xDS API, e.g.
85
   * type.googleapis.com/envoy.api.v2.Cluster.
86
   *
87
   * @return a ScopedResume object, which when destructed, resumes the paused discovery requests.
88
   * A discovery request will be sent if one would have been sent during the pause.
89
   */
90
  ABSL_MUST_USE_RESULT virtual ScopedResume pause(const std::vector<std::string> type_urls) PURE;
91

            
92
  /**
93
   * Start a configuration subscription asynchronously for some API type and resources.
94
   * @param type_url type URL corresponding to xDS API, e.g.
95
   * type.googleapis.com/envoy.api.v2.Cluster.
96
   * @param resources set of resource names to watch for. If this is empty, then all
97
   *                  resources for type_url will result in callbacks.
98
   * @param callbacks the callbacks to be notified of configuration updates. These must be valid
99
   *                  until GrpcMuxWatch is destroyed.
100
   * @param resource_decoder how incoming opaque resource objects are to be decoded.
101
   * @param options subscription options.
102
   * @return GrpcMuxWatchPtr a handle to cancel the subscription with. E.g. when a cluster goes
103
   * away, its EDS updates should be cancelled by destroying the GrpcMuxWatchPtr.
104
   */
105
  virtual GrpcMuxWatchPtr addWatch(const std::string& type_url,
106
                                   const absl::flat_hash_set<std::string>& resources,
107
                                   SubscriptionCallbacks& callbacks,
108
                                   OpaqueResourceDecoderSharedPtr resource_decoder,
109
                                   const SubscriptionOptions& options) PURE;
110

            
111
  virtual void requestOnDemandUpdate(const std::string& type_url,
112
                                     const absl::flat_hash_set<std::string>& for_update) PURE;
113

            
114
  /**
115
   * Returns an EdsResourcesCache for this GrpcMux if there is one.
116
   * @return EdsResourcesCacheOptRef optional eds resources cache for the gRPC-mux.
117
   */
118
  virtual EdsResourcesCacheOptRef edsResourcesCache() PURE;
119

            
120
  /**
121
   * Updates the current gRPC-Mux object to use a new gRPC client, and config.
122
   */
123
  virtual absl::Status
124
  updateMuxSource(Grpc::RawAsyncClientSharedPtr&& primary_async_client,
125
                  Grpc::RawAsyncClientSharedPtr&& failover_async_client, Stats::Scope& scope,
126
                  BackOffStrategyPtr&& backoff_strategy,
127
                  const envoy::config::core::v3::ApiConfigSource& ads_config_source) PURE;
128

            
129
  /**
130
   * Returns a load-stats-reporter that was created for the gRPC-Mux.
131
   * Returns nullptr if a load-stats-reporter wasn't created for the gRPC-Mux.
132
   */
133
  virtual Upstream::LoadStatsReporter* loadStatsReporter() const PURE;
134

            
135
  /**
136
   * Returns a load-stats-reporter if it was previously created for the
137
   * gRPC-Mux, or creates one and returns it. Enables lazy-initialization of the
138
   * load-stats-reporter.
139
   */
140
  virtual Upstream::LoadStatsReporter* maybeCreateLoadStatsReporter() PURE;
141
};
142

            
143
using GrpcMuxPtr = std::unique_ptr<GrpcMux>;
144
using GrpcMuxSharedPtr = std::shared_ptr<GrpcMux>;
145

            
/**
 * Uniquely-owning pointer to a response proto of type ResponseProto.
 */
template <class ResponseProto> using ResponseProtoPtr = std::unique_ptr<ResponseProto>;
147
/**
148
 * A grouping of callbacks that a GrpcMux should provide to its GrpcStream.
149
 */
150
template <class ResponseProto> class GrpcStreamCallbacks {
151
public:
152
12840
  virtual ~GrpcStreamCallbacks() = default;
153

            
154
  /**
155
   * For the GrpcStream to prompt the context to take appropriate action in response to the
156
   * gRPC stream having been successfully established.
157
   */
158
  virtual void onStreamEstablished() PURE;
159

            
160
  /**
161
   * For the GrpcStream to prompt the context to take appropriate action in response to
162
   * failure to establish the gRPC stream.
163
   * @param next_attempt_may_send_initial_resource_version a flag indicating whether the
164
   *        next reconnection attempt will be to the same source that was previously successful
165
   *        or not (used to pass primary/failover reconnection information to the GrpcMux).
166
   */
167
  virtual void onEstablishmentFailure(bool next_attempt_may_send_initial_resource_version) PURE;
168

            
169
  /**
170
   * For the GrpcStream to pass received protos to the context.
171
   */
172
  virtual void onDiscoveryResponse(ResponseProtoPtr<ResponseProto>&& message,
173
                                   ControlPlaneStats& control_plane_stats) PURE;
174

            
175
  /**
176
   * For the GrpcStream to call when its rate limiting logic allows more requests to be sent.
177
   */
178
  virtual void onWriteable() PURE;
179
};
180

            
181
} // namespace Config
182
} // namespace Envoy