LCOV - code coverage report
Current view: top level - source/common/grpc - async_client_manager_impl.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 112 133 84.2 %
Date: 2024-01-05 06:35:25 Functions: 15 17 88.2 %

          Line data    Source code
       1             : #include "source/common/grpc/async_client_manager_impl.h"
       2             : 
       3             : #include <chrono>
       4             : 
       5             : #include "envoy/config/core/v3/grpc_service.pb.h"
       6             : #include "envoy/stats/scope.h"
       7             : 
       8             : #include "source/common/common/base64.h"
       9             : #include "source/common/grpc/async_client_impl.h"
      10             : #include "source/common/protobuf/utility.h"
      11             : 
      12             : #include "absl/strings/match.h"
      13             : 
      14             : #ifdef ENVOY_GOOGLE_GRPC
      15             : #include "source/common/grpc/google_async_client_impl.h"
      16             : #endif
      17             : 
      18             : namespace Envoy {
      19             : namespace Grpc {
      20             : namespace {
      21             : 
      22             : constexpr uint64_t DefaultEntryIdleDuration{50000};
      23             : 
      24             : // Validates a string for gRPC header key compliance. This is a subset of legal HTTP characters.
      25             : // See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
      26           5 : bool validateGrpcHeaderChars(absl::string_view key) {
      27           6 :   for (auto ch : key) {
      28           6 :     if (!(absl::ascii_isalnum(ch) || ch == '_' || ch == '.' || ch == '-')) {
      29           1 :       return false;
      30           1 :     }
      31           6 :   }
      32           4 :   return true;
      33           5 : }
      34             : 
      35           4 : bool validateGrpcCompatibleAsciiHeaderValue(absl::string_view h_value) {
      36           7 :   for (auto ch : h_value) {
      37           7 :     if (ch < 0x20 || ch > 0x7e) {
      38           0 :       return false;
      39           0 :     }
      40           7 :   }
      41           4 :   return true;
      42           4 : }
      43             : 
      44             : } // namespace
      45             : 
      46             : AsyncClientFactoryImpl::AsyncClientFactoryImpl(Upstream::ClusterManager& cm,
      47             :                                                const envoy::config::core::v3::GrpcService& config,
      48             :                                                bool skip_cluster_check, TimeSource& time_source)
      49          33 :     : cm_(cm), config_(config), time_source_(time_source) {
      50          33 :   if (skip_cluster_check) {
      51           5 :     return;
      52           5 :   }
      53          28 :   THROW_IF_NOT_OK(cm_.checkActiveStaticCluster(config.envoy_grpc().cluster_name()));
      54          28 : }
      55             : 
      56             : AsyncClientManagerImpl::AsyncClientManagerImpl(
      57             :     Upstream::ClusterManager& cm, ThreadLocal::Instance& tls, TimeSource& time_source,
      58             :     Api::Api& api, const StatNames& stat_names,
      59             :     const envoy::config::bootstrap::v3::Bootstrap::GrpcAsyncClientManagerConfig& config)
      60             :     : cm_(cm), tls_(tls), time_source_(time_source), api_(api), stat_names_(stat_names),
      61         132 :       raw_async_client_cache_(tls_) {
      62             : 
      63         132 :   const auto max_cached_entry_idle_duration = std::chrono::milliseconds(
      64         132 :       PROTOBUF_GET_MS_OR_DEFAULT(config, max_cached_entry_idle_duration, DefaultEntryIdleDuration));
      65             : 
      66         226 :   raw_async_client_cache_.set([max_cached_entry_idle_duration](Event::Dispatcher& dispatcher) {
      67         226 :     return std::make_shared<RawAsyncClientCache>(dispatcher, max_cached_entry_idle_duration);
      68         226 :   });
      69         132 : #ifdef ENVOY_GOOGLE_GRPC
      70         132 :   google_tls_slot_ = tls.allocateSlot();
      71         132 :   google_tls_slot_->set(
      72         226 :       [&api](Event::Dispatcher&) { return std::make_shared<GoogleAsyncClientThreadLocal>(api); });
      73             : #else
      74             :   UNREFERENCED_PARAMETER(api_);
      75             : #endif
      76         132 : }
      77             : 
      78          33 : RawAsyncClientPtr AsyncClientFactoryImpl::createUncachedRawAsyncClient() {
      79          33 :   return std::make_unique<AsyncClientImpl>(cm_, config_, time_source_);
      80          33 : }
      81             : 
      82             : GoogleAsyncClientFactoryImpl::GoogleAsyncClientFactoryImpl(
      83             :     ThreadLocal::Instance& tls, ThreadLocal::Slot* google_tls_slot, Stats::Scope& scope,
      84             :     const envoy::config::core::v3::GrpcService& config, Api::Api& api, const StatNames& stat_names)
      85             :     : tls_(tls), google_tls_slot_(google_tls_slot),
      86             :       scope_(scope.createScope(fmt::format("grpc.{}.", config.google_grpc().stat_prefix()))),
      87           3 :       config_(config), api_(api), stat_names_(stat_names) {
      88             : 
      89             : #ifndef ENVOY_GOOGLE_GRPC
      90             :   UNREFERENCED_PARAMETER(tls_);
      91             :   UNREFERENCED_PARAMETER(google_tls_slot_);
      92             :   UNREFERENCED_PARAMETER(scope_);
      93             :   UNREFERENCED_PARAMETER(config_);
      94             :   UNREFERENCED_PARAMETER(api_);
      95             :   UNREFERENCED_PARAMETER(stat_names_);
      96             :   throwEnvoyExceptionOrPanic("Google C++ gRPC client is not linked");
      97             : #else
      98           3 :   ASSERT(google_tls_slot_ != nullptr);
      99           3 : #endif
     100             : 
     101             :   // Check metadata for gRPC API compliance. Uppercase characters are lowered in the HeaderParser.
     102             :   // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
     103           5 :   for (const auto& header : config.initial_metadata()) {
     104             :     // Validate key
     105           5 :     if (!validateGrpcHeaderChars(header.key())) {
     106           1 :       throwEnvoyExceptionOrPanic(
     107           1 :           fmt::format("Illegal characters in gRPC initial metadata header key: {}.", header.key()));
     108           1 :     }
     109             : 
     110             :     // Validate value
     111             :     // Binary base64 encoded - handled by the GRPC library
     112           4 :     if (!::absl::EndsWith(header.key(), "-bin") &&
     113           4 :         !validateGrpcCompatibleAsciiHeaderValue(header.value())) {
     114           0 :       throwEnvoyExceptionOrPanic(fmt::format(
     115           0 :           "Illegal ASCII value for gRPC initial metadata header key: {}.", header.key()));
     116           0 :     }
     117           4 :   }
     118           3 : }
     119             : 
     120           2 : RawAsyncClientPtr GoogleAsyncClientFactoryImpl::createUncachedRawAsyncClient() {
     121           2 : #ifdef ENVOY_GOOGLE_GRPC
     122           2 :   GoogleGenericStubFactory stub_factory;
     123           2 :   return std::make_unique<GoogleAsyncClientImpl>(
     124           2 :       tls_.dispatcher(), google_tls_slot_->getTyped<GoogleAsyncClientThreadLocal>(), stub_factory,
     125           2 :       scope_, config_, api_, stat_names_);
     126             : #else
     127             :   return nullptr;
     128             : #endif
     129           2 : }
     130             : 
     131             : AsyncClientFactoryPtr
     132             : AsyncClientManagerImpl::factoryForGrpcService(const envoy::config::core::v3::GrpcService& config,
     133          36 :                                               Stats::Scope& scope, bool skip_cluster_check) {
     134          36 :   switch (config.target_specifier_case()) {
     135          33 :   case envoy::config::core::v3::GrpcService::TargetSpecifierCase::kEnvoyGrpc:
     136          33 :     return std::make_unique<AsyncClientFactoryImpl>(cm_, config, skip_cluster_check, time_source_);
     137           3 :   case envoy::config::core::v3::GrpcService::TargetSpecifierCase::kGoogleGrpc:
     138           3 :     return std::make_unique<GoogleAsyncClientFactoryImpl>(tls_, google_tls_slot_.get(), scope,
     139           3 :                                                           config, api_, stat_names_);
     140           0 :   case envoy::config::core::v3::GrpcService::TargetSpecifierCase::TARGET_SPECIFIER_NOT_SET:
     141           0 :     PANIC_DUE_TO_PROTO_UNSET;
     142          36 :   }
     143           0 :   return nullptr;
     144          36 : }
     145             : 
     146             : RawAsyncClientSharedPtr AsyncClientManagerImpl::getOrCreateRawAsyncClient(
     147             :     const envoy::config::core::v3::GrpcService& config, Stats::Scope& scope,
     148           0 :     bool skip_cluster_check) {
     149           0 :   const GrpcServiceConfigWithHashKey config_with_hash_key = GrpcServiceConfigWithHashKey(config);
     150           0 :   RawAsyncClientSharedPtr client = raw_async_client_cache_->getCache(config_with_hash_key);
     151           0 :   if (client != nullptr) {
     152           0 :     return client;
     153           0 :   }
     154           0 :   client = factoryForGrpcService(config_with_hash_key.config(), scope, skip_cluster_check)
     155           0 :                ->createUncachedRawAsyncClient();
     156           0 :   raw_async_client_cache_->setCache(config_with_hash_key, client);
     157           0 :   return client;
     158           0 : }
     159             : 
     160             : RawAsyncClientSharedPtr AsyncClientManagerImpl::getOrCreateRawAsyncClientWithHashKey(
     161             :     const GrpcServiceConfigWithHashKey& config_with_hash_key, Stats::Scope& scope,
     162          40 :     bool skip_cluster_check) {
     163          40 :   RawAsyncClientSharedPtr client = raw_async_client_cache_->getCache(config_with_hash_key);
     164          40 :   if (client != nullptr) {
     165          35 :     return client;
     166          35 :   }
     167           5 :   client = factoryForGrpcService(config_with_hash_key.config(), scope, skip_cluster_check)
     168           5 :                ->createUncachedRawAsyncClient();
     169           5 :   raw_async_client_cache_->setCache(config_with_hash_key, client);
     170           5 :   return client;
     171          40 : }
     172             : 
     173             : AsyncClientManagerImpl::RawAsyncClientCache::RawAsyncClientCache(
     174             :     Event::Dispatcher& dispatcher, std::chrono::milliseconds max_cached_entry_idle_duration)
     175         226 :     : dispatcher_(dispatcher), max_cached_entry_idle_duration_(max_cached_entry_idle_duration) {
     176         226 :   cache_eviction_timer_ = dispatcher.createTimer([this] { evictEntriesAndResetEvictionTimer(); });
     177         226 : }
     178             : 
     179             : void AsyncClientManagerImpl::RawAsyncClientCache::setCache(
     180             :     const GrpcServiceConfigWithHashKey& config_with_hash_key,
     181           5 :     const RawAsyncClientSharedPtr& client) {
     182           5 :   ASSERT(lru_map_.find(config_with_hash_key) == lru_map_.end());
     183             :   // Create a new cache entry at the beginning of the list.
     184           5 :   lru_list_.emplace_front(config_with_hash_key, client, dispatcher_.timeSource().monotonicTime());
     185           5 :   lru_map_[config_with_hash_key] = lru_list_.begin();
     186             :   // If inserting to an empty cache, enable eviction timer.
     187           5 :   if (lru_list_.size() == 1) {
     188           5 :     evictEntriesAndResetEvictionTimer();
     189           5 :   }
     190           5 : }
     191             : 
     192             : RawAsyncClientSharedPtr AsyncClientManagerImpl::RawAsyncClientCache::getCache(
     193          40 :     const GrpcServiceConfigWithHashKey& config_with_hash_key) {
     194          40 :   auto it = lru_map_.find(config_with_hash_key);
     195          40 :   if (it == lru_map_.end()) {
     196           5 :     return nullptr;
     197           5 :   }
     198          35 :   const auto cache_entry = it->second;
     199             :   // Reset the eviction timer if the next entry to expire is accessed.
     200          35 :   const bool should_reset_timer = (cache_entry == --lru_list_.end());
     201          35 :   cache_entry->accessed_time_ = dispatcher_.timeSource().monotonicTime();
     202             :   // Move the cache entry to the beginning of the list upon access.
     203          35 :   lru_list_.splice(lru_list_.begin(), lru_list_, cache_entry);
     204             :   // Get the cached async client before any cache eviction.
     205          35 :   RawAsyncClientSharedPtr client = cache_entry->client_;
     206          35 :   if (should_reset_timer) {
     207          35 :     evictEntriesAndResetEvictionTimer();
     208          35 :   }
     209          35 :   return client;
     210          40 : }
     211             : 
     212          40 : void AsyncClientManagerImpl::RawAsyncClientCache::evictEntriesAndResetEvictionTimer() {
     213          40 :   MonotonicTime now = dispatcher_.timeSource().monotonicTime();
     214             :   // Evict all the entries that have expired.
     215          40 :   while (!lru_list_.empty()) {
     216          40 :     const MonotonicTime next_expire =
     217          40 :         lru_list_.back().accessed_time_ + max_cached_entry_idle_duration_;
     218          40 :     std::chrono::seconds time_to_next_expire_sec =
     219          40 :         std::chrono::duration_cast<std::chrono::seconds>(next_expire - now);
     220             :     // since 'now' and 'next_expire' are in nanoseconds, the following condition is to
     221             :     // check if the difference between them is less than 1 second. If we don't do this, the
     222             :     // timer will be enabled with 0 seconds, which will cause the timer to fire immediately.
     223             :     // This will cause cpu spike.
     224          40 :     if (time_to_next_expire_sec.count() <= 0) {
     225             :       // Erase the expired entry.
     226           0 :       lru_map_.erase(lru_list_.back().config_with_hash_key_);
     227           0 :       lru_list_.pop_back();
     228          40 :     } else {
     229          40 :       cache_eviction_timer_->enableTimer(time_to_next_expire_sec);
     230          40 :       return;
     231          40 :     }
     232          40 :   }
     233          40 : }
     234             : 
     235             : } // namespace Grpc
     236             : } // namespace Envoy

Generated by: LCOV version 1.15