#pragma once

#include <chrono>
#include <functional>
#include <memory>
#include <string>

#include "envoy/common/random_generator.h"
#include "envoy/config/grpc_mux.h"
#include "envoy/grpc/async_client.h"
#include "envoy/grpc/status.h"

#include "source/common/common/backoff_strategy.h"
#include "source/common/common/token_bucket_impl.h"
#include "source/common/config/utility.h"
#include "source/common/grpc/typed_async_client.h"
#include "source/extensions/config_subscription/grpc/grpc_stream_interface.h"

namespace Envoy {
18
namespace Config {
19

            
20
template <class ResponseProto> using ResponseProtoPtr = std::unique_ptr<ResponseProto>;
21

            
22
// Oversees communication for gRPC xDS implementations (parent to both regular xDS and delta
23
// xDS variants). Reestablishes the gRPC channel when necessary, and provides rate limiting of
24
// requests.
25
template <class RequestProto, class ResponseProto>
26
class GrpcStream : public GrpcStreamInterface<RequestProto, ResponseProto>,
27
                   public Logger::Loggable<Logger::Id::config> {
28
public:
29
  // The entry value corresponding to the grpc stream's configuration entry index.
30
  enum class ConnectedStateValue {
31
    // The first entry in the config corresponds to the primary xDS source.
32
    FirstEntry = 1,
33
    // The second entry in the config corresponds to the failover xDS source.
34
    SecondEntry
35
  };
36

            
37
  GrpcStream(GrpcStreamCallbacks<ResponseProto>* callbacks,
38
             Grpc::RawAsyncClientSharedPtr&& async_client,
39
             const Protobuf::MethodDescriptor& service_method, Event::Dispatcher& dispatcher,
40
             Stats::Scope& scope, BackOffStrategyPtr backoff_strategy,
41
             const RateLimitSettings& rate_limit_settings, ConnectedStateValue connected_state_val)
42
1976
      : callbacks_(callbacks), async_client_(std::move(async_client)),
43
1976
        service_method_(service_method),
44
1976
        control_plane_stats_(Utility::generateControlPlaneStats(scope)),
45
1976
        time_source_(dispatcher.timeSource()), backoff_strategy_(std::move(backoff_strategy)),
46
1976
        rate_limiting_enabled_(rate_limit_settings.enabled_),
47
1976
        connected_state_val_(connected_state_val) {
48
1980
    retry_timer_ = dispatcher.createTimer([this]() -> void { establishNewStream(); });
49
1976
    if (rate_limiting_enabled_) {
50
      // Default Bucket contains 100 tokens maximum and refills at 10 tokens/sec.
51
12
      limit_request_ = std::make_unique<TokenBucketImpl>(
52
12
          rate_limit_settings.max_tokens_, time_source_, rate_limit_settings.fill_rate_);
53
12
      drain_request_timer_ = dispatcher.createTimer([this]() {
54
4
        if (stream_ != nullptr) {
55
4
          callbacks_->onWriteable();
56
4
        }
57
4
      });
58
12
    }
59
1976
  }
60

            
61
2397
  void establishNewStream() override {
62
2397
    stream_intentionally_closed_ = false;
63
2397
    ENVOY_LOG(debug, "Establishing new gRPC bidi stream to {} for {}", async_client_.destination(),
64
2397
              service_method_.DebugString());
65
2397
    if (stream_ != nullptr) {
66
1
      ENVOY_LOG(warn, "gRPC bidi stream to {} for {} already exists!", async_client_.destination(),
67
1
                service_method_.DebugString());
68
1
      return;
69
1
    }
70
2396
    stream_ = async_client_->start(service_method_, *this, Http::AsyncClient::StreamOptions());
71
2396
    if (stream_ == nullptr) {
72
78
      ENVOY_LOG(debug, "Unable to establish new stream to configuration server {}",
73
78
                async_client_.destination());
74
78
      callbacks_->onEstablishmentFailure(true);
75
78
      setRetryTimer();
76
78
      return;
77
78
    }
78
2318
    control_plane_stats_.connected_state_.set(static_cast<uint64_t>(connected_state_val_));
79
2318
    callbacks_->onStreamEstablished();
80
2318
  }
81

            
82
15293
  bool grpcStreamAvailable() const override { return stream_ != nullptr; }
83

            
84
9777
  void sendMessage(const RequestProto& request) override { stream_->sendMessage(request, false); }
85

            
86
  // Grpc::AsyncStreamCallbacks
87
2073
  void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) override {
88
2073
    UNREFERENCED_PARAMETER(metadata);
89
2073
  }
90

            
91
1959
  void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&& metadata) override {
92
1959
    UNREFERENCED_PARAMETER(metadata);
93
1959
  }
94

            
95
4870
  void onReceiveMessage(ResponseProtoPtr<ResponseProto>&& message) override {
96
    // Reset here so that it starts with fresh backoff interval on next disconnect.
97
4870
    backoff_strategy_->reset();
98
    // Clear here instead of on stream establishment in case streams are immediately closed
99
    // repeatedly.
100
4870
    clearCloseStatus();
101
    // Sometimes during hot restarts this stat's value becomes inconsistent and will continue to
102
    // have 0 until it is reconnected. Setting here ensures that it is consistent with the state of
103
    // management server connection.
104
4870
    control_plane_stats_.connected_state_.set(static_cast<uint64_t>(connected_state_val_));
105
4870
    callbacks_->onDiscoveryResponse(std::move(message), control_plane_stats_);
106
4870
  }
107

            
108
2023
  void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&& metadata) override {
109
2023
    UNREFERENCED_PARAMETER(metadata);
110
2023
  }
111

            
112
2117
  void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override {
113
2117
    logClose(status, message);
114
2117
    stream_ = nullptr;
115
2117
    control_plane_stats_.connected_state_.set(0);
116
    // By default Envoy will reconnect to the same server, so pass true here.
117
    // This will be overridden by the mux-failover if Envoy will reconnect to a
118
    // different server.
119
2117
    callbacks_->onEstablishmentFailure(true);
120
    // Only retry the timer if not intentionally closed by Envoy.
121
2117
    if (!stream_intentionally_closed_) {
122
2008
      setRetryTimer();
123
2008
    }
124
2117
  }
125

            
126
15939
  void maybeUpdateQueueSizeStat(uint64_t size) override {
127
    // Although request_queue_.push() happens elsewhere, the only time the queue is non-transiently
128
    // non-empty is when it remains non-empty after a drain attempt. (The push() doesn't matter
129
    // because we always attempt this drain immediately after the push). Basically, a change in
130
    // queue length is not "meaningful" until it has persisted until here. We need the
131
    // if(>0 || used) to keep this stat from being wrongly marked interesting by a pointless set(0)
132
    // and needlessly taking up space. The first time we set(123), used becomes true, and so we will
133
    // subsequently always do the set (including set(0)).
134
15939
    if (size > 0 || control_plane_stats_.pending_requests_.used()) {
135
453
      control_plane_stats_.pending_requests_.set(size);
136
453
    }
137
15939
  }
138

            
139
9868
  bool checkRateLimitAllowsDrain() override {
140
9868
    if (!rate_limiting_enabled_ || limit_request_->consume(1, false)) {
141
9780
      return true;
142
9780
    }
143
88
    ASSERT(drain_request_timer_ != nullptr);
144
88
    control_plane_stats_.rate_limit_enforced_.inc();
145
    // Enable the drain request timer.
146
88
    if (!drain_request_timer_->enabled()) {
147
8
      drain_request_timer_->enableTimer(limit_request_->nextTokenAvailable());
148
8
    }
149
88
    return false;
150
9868
  }
151

            
152
133
  void closeStream() override {
153
133
    ENVOY_LOG_MISC(debug, "Intentionally closing the gRPC stream to {}",
154
133
                   async_client_.destination());
155
133
    retry_timer_->disableTimer();
156
133
    if (rate_limiting_enabled_) {
157
      drain_request_timer_->disableTimer();
158
    }
159
133
    control_plane_stats_.connected_state_.set(0);
160
133
    logClose(Grpc::Status::WellKnownGrpcStatus::Ok, "Envoy initiated close.");
161
133
    if (stream_ != nullptr) {
162
25
      stream_.resetStream();
163
25
      stream_ = nullptr;
164
25
    }
165
133
    stream_intentionally_closed_ = true;
166
133
  }
167

            
168
11
  absl::optional<Grpc::Status::GrpcStatus> getCloseStatusForTest() const {
169
11
    return last_close_status_;
170
11
  }
171

            
172
private:
173
2086
  void setRetryTimer() {
174
2086
    retry_timer_->enableTimer(std::chrono::milliseconds(backoff_strategy_->nextBackOffMs()));
175
2086
  }
176

            
177
  // Log level should be reduced when the remote close failure is `Ok` or is retriable and has only
178
  // been occurring for a short amount of time.
179
2250
  void logClose(Grpc::Status::GrpcStatus status, const std::string& message) {
180
2250
    if (Grpc::Status::WellKnownGrpcStatus::Ok == status) {
181
143
      ENVOY_LOG(debug, "{} gRPC config stream to {} closed: {}, {}", service_method_.name(),
182
143
                async_client_.destination(), status, message);
183
143
      return;
184
143
    }
185

            
186
2107
    if (!isNonRetriableFailure(status)) {
187
      // When the failure is considered non-retriable, warn.
188
1171
      ENVOY_LOG(warn, "{} gRPC config stream to {} closed: {}, {}", service_method_.name(),
189
1171
                async_client_.destination(), status, message);
190
1171
      return;
191
1171
    }
192

            
193
936
    if (!isCloseStatusSet()) {
194
      // For the first failure, record its occurrence and log at the debug level.
195
826
      ENVOY_LOG(debug, "{} gRPC config stream to {} closed: {}, {}", service_method_.name(),
196
826
                async_client_.destination(), status, message);
197
826
      setCloseStatus(status, message);
198
826
      return;
199
826
    }
200

            
201
110
    const auto duration_since_first_close = time_source_.monotonicTime() - last_close_time_;
202
110
    const uint64_t seconds_since_first_close =
203
110
        std::chrono::duration_cast<std::chrono::seconds>(duration_since_first_close).count();
204
110
    const Grpc::Status::GrpcStatus close_status = last_close_status_.value();
205

            
206
110
    if (status != close_status) {
207
      // This is a different failure. Warn on both statuses and remember the new one.
208
2
      ENVOY_LOG(warn,
209
2
                "{} gRPC config stream to {} closed: {}, {} (previously {}, {} since {}s ago)",
210
2
                service_method_.name(), async_client_.destination(), status, message, close_status,
211
2
                last_close_message_, seconds_since_first_close);
212
2
      setCloseStatus(status, message);
213
2
      return;
214
2
    }
215

            
216
    // #18508: The error message may have changed.
217
    // To reduce noise, do not update the last close time, or use the message to distinguish the
218
    // error in the previous condition.
219
108
    last_close_message_ = message;
220

            
221
108
    const uint64_t ms_since_first_close =
222
108
        std::chrono::duration_cast<std::chrono::milliseconds>(duration_since_first_close).count();
223
108
    if (backoff_strategy_->isOverTimeLimit(ms_since_first_close)) {
224
      // Warn if we are over the time limit.
225
46
      ENVOY_LOG(warn, "{} gRPC config stream to {} closed since {}s ago: {}, {}",
226
46
                service_method_.name(), async_client_.destination(), seconds_since_first_close,
227
46
                close_status, message);
228
46
      return;
229
46
    }
230

            
231
    // Failure is retriable and new enough to only log at the debug level.
232
62
    ENVOY_LOG(debug, "{} gRPC config stream to {} closed: {}, {}", service_method_.name(),
233
62
              async_client_.destination(), status, message);
234
62
  }
235

            
236
2107
  bool isNonRetriableFailure(Grpc::Status::GrpcStatus status) {
237
    // Status codes from https://grpc.github.io/grpc/core/md_doc_statuscodes.html that potentially
238
    // indicate a high likelihood of success after retrying with backoff.
239
    //
240
    // - DeadlineExceeded may be from a latency spike
241
    // - ResourceExhausted may be from a rate limit with a short window or a transient period of too
242
    //   many connections
243
    // - Unavailable is meant to be used for a transient downtime
244
2107
    return Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded == status ||
245
2107
           Grpc::Status::WellKnownGrpcStatus::ResourceExhausted == status ||
246
2107
           Grpc::Status::WellKnownGrpcStatus::Unavailable == status;
247
2107
  }
248

            
249
4870
  void clearCloseStatus() { last_close_status_ = absl::nullopt; }
250
936
  bool isCloseStatusSet() { return last_close_status_.has_value(); }
251

            
252
828
  void setCloseStatus(Grpc::Status::GrpcStatus status, const std::string& message) {
253
828
    last_close_status_ = status;
254
828
    last_close_time_ = time_source_.monotonicTime();
255
828
    last_close_message_ = message;
256
828
  }
257

            
258
  GrpcStreamCallbacks<ResponseProto>* const callbacks_;
259

            
260
  Grpc::AsyncClient<RequestProto, ResponseProto> async_client_;
261
  Grpc::AsyncStream<RequestProto> stream_{};
262
  const Protobuf::MethodDescriptor& service_method_;
263
  ControlPlaneStats control_plane_stats_;
264

            
265
  // Reestablishes the gRPC channel when necessary, with some backoff politeness.
266
  Event::TimerPtr retry_timer_;
267
  TimeSource& time_source_;
268
  BackOffStrategyPtr backoff_strategy_;
269

            
270
  // Prevents the Envoy from making too many requests.
271
  TokenBucketPtr limit_request_;
272
  const bool rate_limiting_enabled_;
273
  Event::TimerPtr drain_request_timer_;
274

            
275
  // A stream value to be set in the control_plane.connected_state gauge once
276
  // the gRPC-stream is establishing a connection or connected to the server.
277
  ConnectedStateValue connected_state_val_;
278

            
279
  // Records the initial message and timestamp of the most recent remote closes with the same
280
  // status.
281
  absl::optional<Grpc::Status::GrpcStatus> last_close_status_;
282
  std::string last_close_message_;
283
  MonotonicTime last_close_time_;
284

            
285
  bool stream_intentionally_closed_{false};
286
};
287

            
288
} // namespace Config
289
} // namespace Envoy