LCOV - code coverage report
Current view: top level - source/extensions/config_subscription/grpc - grpc_subscription_impl.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 59 97 60.8 %
Date: 2024-01-05 06:35:25 Functions: 6 12 50.0 %

          Line data    Source code
       1             : #include "source/extensions/config_subscription/grpc/grpc_subscription_impl.h"
       2             : 
       3             : #include <chrono>
       4             : 
       5             : #include "envoy/config/subscription.h"
       6             : 
       7             : #include "source/common/common/assert.h"
       8             : #include "source/common/common/logger.h"
       9             : #include "source/common/common/utility.h"
      10             : #include "source/common/config/xds_resource.h"
      11             : #include "source/common/grpc/common.h"
      12             : #include "source/common/protobuf/protobuf.h"
      13             : #include "source/common/protobuf/utility.h"
      14             : 
      15             : namespace Envoy {
      16             : namespace Config {
      17             : 
      18             : constexpr std::chrono::milliseconds UpdateDurationLogThreshold = std::chrono::milliseconds(50);
      19             : 
      20             : GrpcSubscriptionImpl::GrpcSubscriptionImpl(GrpcMuxSharedPtr grpc_mux,
      21             :                                            SubscriptionCallbacks& callbacks,
      22             :                                            OpaqueResourceDecoderSharedPtr resource_decoder,
      23             :                                            SubscriptionStats stats, absl::string_view type_url,
      24             :                                            Event::Dispatcher& dispatcher,
      25             :                                            std::chrono::milliseconds init_fetch_timeout,
      26             :                                            bool is_aggregated, const SubscriptionOptions& options)
      27             :     : grpc_mux_(grpc_mux), callbacks_(callbacks), resource_decoder_(resource_decoder),
      28             :       stats_(stats), type_url_(type_url), dispatcher_(dispatcher),
      29         135 :       init_fetch_timeout_(init_fetch_timeout), is_aggregated_(is_aggregated), options_(options) {}
      30             : 
      31             : // Config::Subscription
      32         135 : void GrpcSubscriptionImpl::start(const absl::flat_hash_set<std::string>& resources) {
      33         135 :   if (init_fetch_timeout_.count() > 0) {
      34         135 :     init_fetch_timeout_timer_ = dispatcher_.createTimer([this]() -> void {
      35           0 :       onConfigUpdateFailed(Envoy::Config::ConfigUpdateFailureReason::FetchTimedout, nullptr);
      36           0 :     });
      37         135 :     init_fetch_timeout_timer_->enableTimer(init_fetch_timeout_);
      38         135 :   }
      39             : 
      40         135 :   watch_ = grpc_mux_->addWatch(type_url_, resources, *this, resource_decoder_, options_);
      41             : 
      42             :   // The attempt stat here is maintained for the purposes of having consistency between ADS and
      43             :   // gRPC/filesystem/REST Subscriptions. Since ADS is push based and muxed, the notion of an
      44             :   // "attempt" for a given xDS API combined by ADS is not really that meaningful.
      45         135 :   stats_.update_attempt_.inc();
      46             : 
      47             :   // ADS initial request batching relies on the users of the GrpcMux *not* calling start on it,
      48             :   // whereas non-ADS xDS users must call it themselves.
      49         135 :   if (!is_aggregated_) {
      50           0 :     grpc_mux_->start();
      51           0 :   }
      52         135 : }
      53             : 
      54             : void GrpcSubscriptionImpl::updateResourceInterest(
      55           0 :     const absl::flat_hash_set<std::string>& update_to_these_names) {
      56           0 :   watch_->update(update_to_these_names);
      57           0 :   stats_.update_attempt_.inc();
      58           0 : }
      59             : 
      60             : void GrpcSubscriptionImpl::requestOnDemandUpdate(
      61           0 :     const absl::flat_hash_set<std::string>& for_update) {
      62           0 :   grpc_mux_->requestOnDemandUpdate(type_url_, for_update);
      63           0 :   stats_.update_attempt_.inc();
      64           0 : }
      65             : 
      66             : // Config::SubscriptionCallbacks
      67             : absl::Status
      68             : GrpcSubscriptionImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& resources,
      69         198 :                                      const std::string& version_info) {
      70         198 :   disableInitFetchTimeoutTimer();
      71             :   // TODO(mattklein123): In the future if we start tracking per-resource versions, we need to
      72             :   // supply those versions to onConfigUpdate() along with the xDS response ("system")
      73             :   // version_info. This way, both types of versions can be tracked and exposed for debugging by
      74             :   // the configuration update targets.
      75         198 :   auto start = dispatcher_.timeSource().monotonicTime();
      76         198 :   absl::Status status = callbacks_.onConfigUpdate(resources, version_info);
      77         198 :   if (!status.ok()) {
      78           0 :     return status;
      79           0 :   }
      80         198 :   std::chrono::milliseconds update_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
      81         198 :       dispatcher_.timeSource().monotonicTime() - start);
      82         198 :   stats_.update_success_.inc();
      83         198 :   stats_.update_attempt_.inc();
      84         198 :   stats_.update_time_.set(DateUtil::nowToMilliseconds(dispatcher_.timeSource()));
      85         198 :   stats_.version_.set(HashUtil::xxHash64(version_info));
      86         198 :   stats_.version_text_.set(version_info);
      87         198 :   stats_.update_duration_.recordValue(update_duration.count());
      88         198 :   ENVOY_LOG(debug, "gRPC config for {} accepted with {} resources with version {}", type_url_,
      89         198 :             resources.size(), version_info);
      90             : 
      91         198 :   if (update_duration > UpdateDurationLogThreshold) {
      92           0 :     ENVOY_LOG(debug, "gRPC config update took {} ms! Resources names: {}", update_duration.count(),
      93           0 :               absl::StrJoin(resources, ",", ResourceNameFormatter()));
      94           0 :   }
      95         198 :   return absl::OkStatus();
      96         198 : }
      97             : 
      98             : absl::Status GrpcSubscriptionImpl::onConfigUpdate(
      99             :     const std::vector<Config::DecodedResourceRef>& added_resources,
     100             :     const Protobuf::RepeatedPtrField<std::string>& removed_resources,
     101          46 :     const std::string& system_version_info) {
     102          46 :   disableInitFetchTimeoutTimer();
     103          46 :   stats_.update_attempt_.inc();
     104          46 :   auto start = dispatcher_.timeSource().monotonicTime();
     105          46 :   absl::Status status =
     106          46 :       callbacks_.onConfigUpdate(added_resources, removed_resources, system_version_info);
     107          46 :   if (!status.ok()) {
     108           0 :     return status;
     109           0 :   }
     110          46 :   std::chrono::milliseconds update_duration = std::chrono::duration_cast<std::chrono::milliseconds>(
     111          46 :       dispatcher_.timeSource().monotonicTime() - start);
     112          46 :   stats_.update_success_.inc();
     113          46 :   stats_.update_time_.set(DateUtil::nowToMilliseconds(dispatcher_.timeSource()));
     114          46 :   stats_.version_.set(HashUtil::xxHash64(system_version_info));
     115          46 :   stats_.version_text_.set(system_version_info);
     116          46 :   stats_.update_duration_.recordValue(update_duration.count());
     117          46 :   return absl::OkStatus();
     118          46 : }
     119             : 
     120             : void GrpcSubscriptionImpl::onConfigUpdateFailed(ConfigUpdateFailureReason reason,
     121         130 :                                                 const EnvoyException* e) {
     122         130 :   switch (reason) {
     123         130 :   case Envoy::Config::ConfigUpdateFailureReason::ConnectionFailure:
     124         130 :     stats_.update_failure_.inc();
     125         130 :     ENVOY_LOG(debug, "gRPC update for {} failed", type_url_);
     126         130 :     break;
     127           0 :   case Envoy::Config::ConfigUpdateFailureReason::FetchTimedout:
     128           0 :     stats_.init_fetch_timeout_.inc();
     129           0 :     disableInitFetchTimeoutTimer();
     130           0 :     ENVOY_LOG(warn, "gRPC config: initial fetch timed out for {}", type_url_);
     131           0 :     callbacks_.onConfigUpdateFailed(reason, e);
     132           0 :     break;
     133           0 :   case Envoy::Config::ConfigUpdateFailureReason::UpdateRejected:
     134             :     // We expect Envoy exception to be thrown when update is rejected.
     135           0 :     ASSERT(e != nullptr);
     136           0 :     disableInitFetchTimeoutTimer();
     137           0 :     stats_.update_rejected_.inc();
     138           0 :     ENVOY_LOG(warn, "gRPC config for {} rejected: {}", type_url_, e->what());
     139           0 :     callbacks_.onConfigUpdateFailed(reason, e);
     140           0 :     break;
     141         130 :   }
     142             : 
     143         130 :   stats_.update_attempt_.inc();
     144         130 : }
     145             : 
     146           0 : ScopedResume GrpcSubscriptionImpl::pause() { return grpc_mux_->pause(type_url_); }
     147             : 
     148         244 : void GrpcSubscriptionImpl::disableInitFetchTimeoutTimer() {
     149         244 :   if (init_fetch_timeout_timer_) {
     150         116 :     init_fetch_timeout_timer_->disableTimer();
     151         116 :     init_fetch_timeout_timer_.reset();
     152         116 :   }
     153         244 : }
     154             : 
     155             : GrpcCollectionSubscriptionImpl::GrpcCollectionSubscriptionImpl(
     156             :     const xds::core::v3::ResourceLocator& collection_locator, GrpcMuxSharedPtr grpc_mux,
     157             :     SubscriptionCallbacks& callbacks, OpaqueResourceDecoderSharedPtr resource_decoder,
     158             :     SubscriptionStats stats, Event::Dispatcher& dispatcher,
     159             :     std::chrono::milliseconds init_fetch_timeout, bool is_aggregated,
     160             :     const SubscriptionOptions& options)
     161             :     : GrpcSubscriptionImpl(
     162             :           grpc_mux, callbacks, resource_decoder, stats,
     163             :           TypeUtil::descriptorFullNameToTypeUrl(collection_locator.resource_type()), dispatcher,
     164             :           init_fetch_timeout, is_aggregated, options),
     165           0 :       collection_locator_(collection_locator) {}
     166             : 
     167           0 : void GrpcCollectionSubscriptionImpl::start(const absl::flat_hash_set<std::string>& resource_names) {
     168           0 :   ASSERT(resource_names.empty());
     169           0 :   GrpcSubscriptionImpl::start({XdsResourceIdentifier::encodeUrl(collection_locator_)});
     170           0 : }
     171             : 
     172             : } // namespace Config
     173             : } // namespace Envoy

Generated by: LCOV version 1.15