Line data Source code
1 : #pragma once
2 :
3 : #include <functional>
4 : #include <memory>
5 :
6 : #include "envoy/common/random_generator.h"
7 : #include "envoy/config/grpc_mux.h"
8 : #include "envoy/grpc/async_client.h"
9 :
10 : #include "source/common/common/backoff_strategy.h"
11 : #include "source/common/common/token_bucket_impl.h"
12 : #include "source/common/config/utility.h"
13 : #include "source/common/grpc/typed_async_client.h"
14 : #include "source/extensions/config_subscription/grpc/grpc_stream_interface.h"
15 :
16 : namespace Envoy {
17 : namespace Config {
18 :
// Owned pointer to a decoded xDS response protobuf, as delivered by the async gRPC client.
template <class ResponseProto> using ResponseProtoPtr = std::unique_ptr<ResponseProto>;
20 :
21 : // Oversees communication for gRPC xDS implementations (parent to both regular xDS and delta
22 : // xDS variants). Reestablishes the gRPC channel when necessary, and provides rate limiting of
23 : // requests.
24 : template <class RequestProto, class ResponseProto>
25 : class GrpcStream : public GrpcStreamInterface<RequestProto, ResponseProto>,
26 : public Logger::Loggable<Logger::Id::config> {
27 : public:
28 : GrpcStream(GrpcStreamCallbacks<ResponseProto>* callbacks, Grpc::RawAsyncClientPtr async_client,
29 : const Protobuf::MethodDescriptor& service_method, Event::Dispatcher& dispatcher,
30 : Stats::Scope& scope, BackOffStrategyPtr backoff_strategy,
31 : const RateLimitSettings& rate_limit_settings)
32 : : callbacks_(callbacks), async_client_(std::move(async_client)),
33 : service_method_(service_method),
34 : control_plane_stats_(Utility::generateControlPlaneStats(scope)),
35 : time_source_(dispatcher.timeSource()), backoff_strategy_(std::move(backoff_strategy)),
36 29 : rate_limiting_enabled_(rate_limit_settings.enabled_) {
37 29 : retry_timer_ = dispatcher.createTimer([this]() -> void { establishNewStream(); });
38 29 : if (rate_limiting_enabled_) {
39 : // Default Bucket contains 100 tokens maximum and refills at 10 tokens/sec.
40 1 : limit_request_ = std::make_unique<TokenBucketImpl>(
41 1 : rate_limit_settings.max_tokens_, time_source_, rate_limit_settings.fill_rate_);
42 1 : drain_request_timer_ = dispatcher.createTimer([this]() {
43 0 : if (stream_ != nullptr) {
44 0 : callbacks_->onWriteable();
45 0 : }
46 0 : });
47 1 : }
48 29 : }
49 :
50 29 : void establishNewStream() override {
51 29 : ENVOY_LOG(debug, "Establishing new gRPC bidi stream to {} for {}", async_client_.destination(),
52 29 : service_method_.DebugString());
53 29 : if (stream_ != nullptr) {
54 0 : ENVOY_LOG(warn, "gRPC bidi stream to {} for {} already exists!", async_client_.destination(),
55 0 : service_method_.DebugString());
56 0 : return;
57 0 : }
58 29 : stream_ = async_client_->start(service_method_, *this, Http::AsyncClient::StreamOptions());
59 29 : if (stream_ == nullptr) {
60 0 : ENVOY_LOG(debug, "Unable to establish new stream to configuration server {}",
61 0 : async_client_.destination());
62 0 : callbacks_->onEstablishmentFailure();
63 0 : setRetryTimer();
64 0 : return;
65 0 : }
66 29 : control_plane_stats_.connected_state_.set(1);
67 29 : callbacks_->onStreamEstablished();
68 29 : }
69 :
70 510 : bool grpcStreamAvailable() const override { return stream_ != nullptr; }
71 :
72 360 : void sendMessage(const RequestProto& request) override { stream_->sendMessage(request, false); }
73 :
74 : // Grpc::AsyncStreamCallbacks
75 29 : void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) override {
76 29 : UNREFERENCED_PARAMETER(metadata);
77 29 : }
78 :
79 28 : void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&& metadata) override {
80 28 : UNREFERENCED_PARAMETER(metadata);
81 28 : }
82 :
83 222 : void onReceiveMessage(ResponseProtoPtr<ResponseProto>&& message) override {
84 : // Reset here so that it starts with fresh backoff interval on next disconnect.
85 222 : backoff_strategy_->reset();
86 : // Clear here instead of on stream establishment in case streams are immediately closed
87 : // repeatedly.
88 222 : clearCloseStatus();
89 : // Sometimes during hot restarts this stat's value becomes inconsistent and will continue to
90 : // have 0 until it is reconnected. Setting here ensures that it is consistent with the state of
91 : // management server connection.
92 222 : control_plane_stats_.connected_state_.set(1);
93 222 : callbacks_->onDiscoveryResponse(std::move(message), control_plane_stats_);
94 222 : }
95 :
96 28 : void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&& metadata) override {
97 28 : UNREFERENCED_PARAMETER(metadata);
98 28 : }
99 :
100 28 : void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override {
101 28 : logClose(status, message);
102 28 : stream_ = nullptr;
103 28 : control_plane_stats_.connected_state_.set(0);
104 28 : callbacks_->onEstablishmentFailure();
105 28 : setRetryTimer();
106 28 : }
107 :
108 702 : void maybeUpdateQueueSizeStat(uint64_t size) override {
109 : // Although request_queue_.push() happens elsewhere, the only time the queue is non-transiently
110 : // non-empty is when it remains non-empty after a drain attempt. (The push() doesn't matter
111 : // because we always attempt this drain immediately after the push). Basically, a change in
112 : // queue length is not "meaningful" until it has persisted until here. We need the
113 : // if(>0 || used) to keep this stat from being wrongly marked interesting by a pointless set(0)
114 : // and needlessly taking up space. The first time we set(123), used becomes true, and so we will
115 : // subsequently always do the set (including set(0)).
116 702 : if (size > 0 || control_plane_stats_.pending_requests_.used()) {
117 2 : control_plane_stats_.pending_requests_.set(size);
118 2 : }
119 702 : }
120 :
121 362 : bool checkRateLimitAllowsDrain() override {
122 362 : if (!rate_limiting_enabled_ || limit_request_->consume(1, false)) {
123 360 : return true;
124 360 : }
125 2 : ASSERT(drain_request_timer_ != nullptr);
126 2 : control_plane_stats_.rate_limit_enforced_.inc();
127 : // Enable the drain request timer.
128 2 : if (!drain_request_timer_->enabled()) {
129 1 : drain_request_timer_->enableTimer(limit_request_->nextTokenAvailable());
130 1 : }
131 2 : return false;
132 362 : }
133 :
134 0 : absl::optional<Grpc::Status::GrpcStatus> getCloseStatusForTest() const override {
135 0 : return last_close_status_;
136 0 : }
137 :
138 : private:
139 28 : void setRetryTimer() {
140 28 : retry_timer_->enableTimer(std::chrono::milliseconds(backoff_strategy_->nextBackOffMs()));
141 28 : }
142 :
143 : // Log level should be reduced when the remote close failure is `Ok` or is retriable and has only
144 : // been occurring for a short amount of time.
145 28 : void logClose(Grpc::Status::GrpcStatus status, const std::string& message) {
146 28 : if (Grpc::Status::WellKnownGrpcStatus::Ok == status) {
147 0 : ENVOY_LOG(debug, "{} gRPC config stream to {} closed: {}, {}", service_method_.name(),
148 0 : async_client_.destination(), status, message);
149 0 : return;
150 0 : }
151 :
152 28 : if (!isNonRetriableFailure(status)) {
153 : // When the failure is considered non-retriable, warn.
154 28 : ENVOY_LOG(warn, "{} gRPC config stream to {} closed: {}, {}", service_method_.name(),
155 28 : async_client_.destination(), status, message);
156 28 : return;
157 28 : }
158 :
159 0 : if (!isCloseStatusSet()) {
160 : // For the first failure, record its occurrence and log at the debug level.
161 0 : ENVOY_LOG(debug, "{} gRPC config stream to {} closed: {}, {}", service_method_.name(),
162 0 : async_client_.destination(), status, message);
163 0 : setCloseStatus(status, message);
164 0 : return;
165 0 : }
166 :
167 0 : const auto duration_since_first_close = time_source_.monotonicTime() - last_close_time_;
168 0 : const uint64_t seconds_since_first_close =
169 0 : std::chrono::duration_cast<std::chrono::seconds>(duration_since_first_close).count();
170 0 : const Grpc::Status::GrpcStatus close_status = last_close_status_.value();
171 :
172 0 : if (status != close_status) {
173 : // This is a different failure. Warn on both statuses and remember the new one.
174 0 : ENVOY_LOG(warn,
175 0 : "{} gRPC config stream to {} closed: {}, {} (previously {}, {} since {}s ago)",
176 0 : service_method_.name(), async_client_.destination(), status, message, close_status,
177 0 : last_close_message_, seconds_since_first_close);
178 0 : setCloseStatus(status, message);
179 0 : return;
180 0 : }
181 :
182 : // #18508: The error message may have changed.
183 : // To reduce noise, do not update the last close time, or use the message to distinguish the
184 : // error in the previous condition.
185 0 : last_close_message_ = message;
186 :
187 0 : const uint64_t ms_since_first_close =
188 0 : std::chrono::duration_cast<std::chrono::milliseconds>(duration_since_first_close).count();
189 0 : if (backoff_strategy_->isOverTimeLimit(ms_since_first_close)) {
190 : // Warn if we are over the time limit.
191 0 : ENVOY_LOG(warn, "{} gRPC config stream to {} closed since {}s ago: {}, {}",
192 0 : service_method_.name(), async_client_.destination(), seconds_since_first_close,
193 0 : close_status, message);
194 0 : return;
195 0 : }
196 :
197 : // Failure is retriable and new enough to only log at the debug level.
198 0 : ENVOY_LOG(debug, "{} gRPC config stream to {} closed: {}, {}", service_method_.name(),
199 0 : async_client_.destination(), status, message);
200 0 : }
201 :
202 28 : bool isNonRetriableFailure(Grpc::Status::GrpcStatus status) {
203 : // Status codes from https://grpc.github.io/grpc/core/md_doc_statuscodes.html that potentially
204 : // indicate a high likelihood of success after retrying with backoff.
205 : //
206 : // - DeadlineExceeded may be from a latency spike
207 : // - ResourceExhausted may be from a rate limit with a short window or a transient period of too
208 : // many connections
209 : // - Unavailable is meant to be used for a transient downtime
210 28 : return Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded == status ||
211 28 : Grpc::Status::WellKnownGrpcStatus::ResourceExhausted == status ||
212 28 : Grpc::Status::WellKnownGrpcStatus::Unavailable == status;
213 28 : }
214 :
215 222 : void clearCloseStatus() { last_close_status_ = absl::nullopt; }
216 0 : bool isCloseStatusSet() { return last_close_status_.has_value(); }
217 :
218 0 : void setCloseStatus(Grpc::Status::GrpcStatus status, const std::string& message) {
219 0 : last_close_status_ = status;
220 0 : last_close_time_ = time_source_.monotonicTime();
221 0 : last_close_message_ = message;
222 0 : }
223 :
224 : GrpcStreamCallbacks<ResponseProto>* const callbacks_;
225 :
226 : Grpc::AsyncClient<RequestProto, ResponseProto> async_client_;
227 : Grpc::AsyncStream<RequestProto> stream_{};
228 : const Protobuf::MethodDescriptor& service_method_;
229 : ControlPlaneStats control_plane_stats_;
230 :
231 : // Reestablishes the gRPC channel when necessary, with some backoff politeness.
232 : Event::TimerPtr retry_timer_;
233 : TimeSource& time_source_;
234 : BackOffStrategyPtr backoff_strategy_;
235 :
236 : // Prevents the Envoy from making too many requests.
237 : TokenBucketPtr limit_request_;
238 : const bool rate_limiting_enabled_;
239 : Event::TimerPtr drain_request_timer_;
240 :
241 : // Records the initial message and timestamp of the most recent remote closes with the same
242 : // status.
243 : absl::optional<Grpc::Status::GrpcStatus> last_close_status_;
244 : std::string last_close_message_;
245 : MonotonicTime last_close_time_;
246 : };
247 :
248 : } // namespace Config
249 : } // namespace Envoy
|