#pragma once

#include <functional>
#include <memory>

#include "envoy/common/random_generator.h"
#include "envoy/config/grpc_mux.h"
#include "envoy/grpc/async_client.h"

#include "source/common/common/backoff_strategy.h"
#include "source/common/common/token_bucket_impl.h"
#include "source/common/config/utility.h"
#include "source/common/grpc/typed_async_client.h"
#include "source/extensions/config_subscription/grpc/grpc_stream_interface.h"

namespace Envoy {
namespace Config {

template <class ResponseProto> using ResponseProtoPtr = std::unique_ptr<ResponseProto>;

// Oversees communication for gRPC xDS implementations (parent to both regular xDS and delta
// xDS variants). Reestablishes the gRPC channel when necessary, and provides rate limiting of
// requests.
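//
// A rough usage sketch (illustrative only: the proto types and surrounding wiring are assumptions
// based on how a SotW mux might drive this class, not a verbatim excerpt from the mux code):
//
//   GrpcStream<envoy::service::discovery::v3::DiscoveryRequest,
//              envoy::service::discovery::v3::DiscoveryResponse>
//       stream(&mux_callbacks, std::move(async_client), service_method, dispatcher, scope,
//              std::move(backoff_strategy), rate_limit_settings);
//   stream.establishNewStream();
//   if (stream.grpcStreamAvailable() && stream.checkRateLimitAllowsDrain()) {
//     stream.sendMessage(request);
//   }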
template <class RequestProto, class ResponseProto>
class GrpcStream : public GrpcStreamInterface<RequestProto, ResponseProto>,
                   public Logger::Loggable<Logger::Id::config> {
public:
  GrpcStream(GrpcStreamCallbacks<ResponseProto>* callbacks, Grpc::RawAsyncClientPtr async_client,
             const Protobuf::MethodDescriptor& service_method, Event::Dispatcher& dispatcher,
             Stats::Scope& scope, BackOffStrategyPtr backoff_strategy,
             const RateLimitSettings& rate_limit_settings)
      : callbacks_(callbacks), async_client_(std::move(async_client)),
        service_method_(service_method),
        control_plane_stats_(Utility::generateControlPlaneStats(scope)),
        time_source_(dispatcher.timeSource()), backoff_strategy_(std::move(backoff_strategy)),
        rate_limiting_enabled_(rate_limit_settings.enabled_) {
    retry_timer_ = dispatcher.createTimer([this]() -> void { establishNewStream(); });
    if (rate_limiting_enabled_) {
      // By default the token bucket holds a maximum of 100 tokens and refills at 10 tokens/sec.
      limit_request_ = std::make_unique<TokenBucketImpl>(
          rate_limit_settings.max_tokens_, time_source_, rate_limit_settings.fill_rate_);
      drain_request_timer_ = dispatcher.createTimer([this]() {
        if (stream_ != nullptr) {
          callbacks_->onWriteable();
        }
      });
    }
  }

  void establishNewStream() override {
    ENVOY_LOG(debug, "Establishing new gRPC bidi stream to {} for {}", async_client_.destination(),
              service_method_.DebugString());
    if (stream_ != nullptr) {
      ENVOY_LOG(warn, "gRPC bidi stream to {} for {} already exists!", async_client_.destination(),
                service_method_.DebugString());
      return;
    }
    stream_ = async_client_->start(service_method_, *this, Http::AsyncClient::StreamOptions());
    if (stream_ == nullptr) {
      ENVOY_LOG(debug, "Unable to establish new stream to configuration server {}",
                async_client_.destination());
      callbacks_->onEstablishmentFailure();
      setRetryTimer();
      return;
    }
    control_plane_stats_.connected_state_.set(1);
    callbacks_->onStreamEstablished();
  }

  bool grpcStreamAvailable() const override { return stream_ != nullptr; }

  void sendMessage(const RequestProto& request) override { stream_->sendMessage(request, false); }

  // Grpc::AsyncStreamCallbacks
  void onCreateInitialMetadata(Http::RequestHeaderMap& metadata) override {
    UNREFERENCED_PARAMETER(metadata);
  }

  void onReceiveInitialMetadata(Http::ResponseHeaderMapPtr&& metadata) override {
    UNREFERENCED_PARAMETER(metadata);
  }

  void onReceiveMessage(ResponseProtoPtr<ResponseProto>&& message) override {
    // Reset here so that the next disconnect starts with a fresh backoff interval.
    backoff_strategy_->reset();
    // Clear here instead of on stream establishment in case streams are immediately closed
    // repeatedly.
    clearCloseStatus();
    // Sometimes during hot restarts this stat's value becomes inconsistent and will stay at 0
    // until reconnection. Setting it here keeps it consistent with the state of the management
    // server connection.
    control_plane_stats_.connected_state_.set(1);
    callbacks_->onDiscoveryResponse(std::move(message), control_plane_stats_);
  }

  void onReceiveTrailingMetadata(Http::ResponseTrailerMapPtr&& metadata) override {
    UNREFERENCED_PARAMETER(metadata);
  }

  void onRemoteClose(Grpc::Status::GrpcStatus status, const std::string& message) override {
    logClose(status, message);
    stream_ = nullptr;
    control_plane_stats_.connected_state_.set(0);
    callbacks_->onEstablishmentFailure();
    setRetryTimer();
  }

  void maybeUpdateQueueSizeStat(uint64_t size) override {
    // Although request_queue_.push() happens elsewhere, the only time the queue is non-transiently
    // non-empty is when it remains non-empty after a drain attempt (the push() itself doesn't
    // matter, because a drain is always attempted immediately after the push). In other words, a
    // change in queue length is not "meaningful" until it has persisted long enough to be observed
    // here. The (size > 0 || used()) check keeps this stat from being wrongly marked interesting by
    // a pointless set(0) and needlessly taking up space. The first time we set(123), used() becomes
    // true, and from then on every set (including set(0)) is recorded.
    if (size > 0 || control_plane_stats_.pending_requests_.used()) {
      control_plane_stats_.pending_requests_.set(size);
    }
  }

  bool checkRateLimitAllowsDrain() override {
    if (!rate_limiting_enabled_ || limit_request_->consume(1, false)) {
      return true;
    }
    ASSERT(drain_request_timer_ != nullptr);
    control_plane_stats_.rate_limit_enforced_.inc();
    // Enable the drain request timer.
    if (!drain_request_timer_->enabled()) {
      drain_request_timer_->enableTimer(limit_request_->nextTokenAvailable());
    }
    return false;
  }

  absl::optional<Grpc::Status::GrpcStatus> getCloseStatusForTest() const override {
    return last_close_status_;
  }

private:
  void setRetryTimer() {
    retry_timer_->enableTimer(std::chrono::milliseconds(backoff_strategy_->nextBackOffMs()));
  }

  // Log level should be reduced when the remote close failure is `Ok` or is retriable and has only
  // been occurring for a short amount of time.
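  // Derived from the branches below, the behavior is roughly: `Ok` logs at debug; a non-retriable
  // status warns immediately; the first close with a retriable status logs at debug and is
  // recorded; repeated closes with the same retriable status stay at debug until the backoff time
  // limit is exceeded, after which they warn; a retriable close with a status different from the
  // recorded one warns and resets the record.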
  void logClose(Grpc::Status::GrpcStatus status, const std::string& message) {
    if (Grpc::Status::WellKnownGrpcStatus::Ok == status) {
      ENVOY_LOG(debug, "{} gRPC config stream to {} closed: {}, {}", service_method_.name(),
                async_client_.destination(), status, message);
      return;
    }

    if (isNonRetriableFailure(status)) {
      // Warn immediately when the failure is considered non-retriable.
      ENVOY_LOG(warn, "{} gRPC config stream to {} closed: {}, {}", service_method_.name(),
                async_client_.destination(), status, message);
      return;
    }

    if (!isCloseStatusSet()) {
      // For the first failure, record its occurrence and log at the debug level.
      ENVOY_LOG(debug, "{} gRPC config stream to {} closed: {}, {}", service_method_.name(),
                async_client_.destination(), status, message);
      setCloseStatus(status, message);
      return;
    }

    const auto duration_since_first_close = time_source_.monotonicTime() - last_close_time_;
    const uint64_t seconds_since_first_close =
        std::chrono::duration_cast<std::chrono::seconds>(duration_since_first_close).count();
    const Grpc::Status::GrpcStatus close_status = last_close_status_.value();

    if (status != close_status) {
      // This is a different failure. Warn on both statuses and remember the new one.
      ENVOY_LOG(warn,
                "{} gRPC config stream to {} closed: {}, {} (previously {}, {} since {}s ago)",
                service_method_.name(), async_client_.destination(), status, message, close_status,
                last_close_message_, seconds_since_first_close);
      setCloseStatus(status, message);
      return;
    }

    // #18508: The error message may have changed. To reduce noise, the message is not used to
    // distinguish errors in the condition above, and the last close time is not updated; only the
    // latest message is recorded.
    last_close_message_ = message;

    const uint64_t ms_since_first_close =
        std::chrono::duration_cast<std::chrono::milliseconds>(duration_since_first_close).count();
    if (backoff_strategy_->isOverTimeLimit(ms_since_first_close)) {
      // Warn if we are over the time limit.
      ENVOY_LOG(warn, "{} gRPC config stream to {} closed since {}s ago: {}, {}",
                service_method_.name(), async_client_.destination(), seconds_since_first_close,
                close_status, message);
      return;
    }

    // Failure is retriable and new enough to only log at the debug level.
    ENVOY_LOG(debug, "{} gRPC config stream to {} closed: {}, {}", service_method_.name(),
              async_client_.destination(), status, message);
  }

  bool isNonRetriableFailure(Grpc::Status::GrpcStatus status) {
    // Status codes from https://grpc.github.io/grpc/core/md_doc_statuscodes.html that potentially
    // indicate a high likelihood of success after retrying with backoff; any other failure is
    // treated as non-retriable.
    //
    // - DeadlineExceeded may be from a latency spike.
    // - ResourceExhausted may be from a rate limit with a short window or a transient period of too
    //   many connections.
    // - Unavailable is meant to be used for a transient downtime.
    return !(Grpc::Status::WellKnownGrpcStatus::DeadlineExceeded == status ||
             Grpc::Status::WellKnownGrpcStatus::ResourceExhausted == status ||
             Grpc::Status::WellKnownGrpcStatus::Unavailable == status);
  }

  void clearCloseStatus() { last_close_status_ = absl::nullopt; }
  bool isCloseStatusSet() { return last_close_status_.has_value(); }

  void setCloseStatus(Grpc::Status::GrpcStatus status, const std::string& message) {
    last_close_status_ = status;
    last_close_time_ = time_source_.monotonicTime();
    last_close_message_ = message;
  }

  GrpcStreamCallbacks<ResponseProto>* const callbacks_;

  Grpc::AsyncClient<RequestProto, ResponseProto> async_client_;
  Grpc::AsyncStream<RequestProto> stream_{};
  const Protobuf::MethodDescriptor& service_method_;
  ControlPlaneStats control_plane_stats_;

  // Reestablishes the gRPC channel when necessary, with some backoff politeness.
  Event::TimerPtr retry_timer_;
  TimeSource& time_source_;
  BackOffStrategyPtr backoff_strategy_;

  // Prevents Envoy from making too many requests to the management server.
  TokenBucketPtr limit_request_;
  const bool rate_limiting_enabled_;
  Event::TimerPtr drain_request_timer_;

  // Records the status, initial message, and timestamp of the most recent run of remote closes
  // sharing the same status.
  absl::optional<Grpc::Status::GrpcStatus> last_close_status_;
  std::string last_close_message_;
  MonotonicTime last_close_time_;
};

} // namespace Config
} // namespace Envoy
