Line data Source code
1 : #include "source/common/grpc/async_client_manager_impl.h"
2 :
3 : #include <chrono>
4 :
5 : #include "envoy/config/core/v3/grpc_service.pb.h"
6 : #include "envoy/stats/scope.h"
7 :
8 : #include "source/common/common/base64.h"
9 : #include "source/common/grpc/async_client_impl.h"
10 : #include "source/common/protobuf/utility.h"
11 :
12 : #include "absl/strings/match.h"
13 :
14 : #ifdef ENVOY_GOOGLE_GRPC
15 : #include "source/common/grpc/google_async_client_impl.h"
16 : #endif
17 :
18 : namespace Envoy {
19 : namespace Grpc {
20 : namespace {
21 :
// Fallback idle duration for cached raw async clients, expressed in milliseconds: it is the
// default argument handed to PROTOBUF_GET_MS_OR_DEFAULT for `max_cached_entry_idle_duration` and
// the result is wrapped in std::chrono::milliseconds.
constexpr uint64_t DefaultEntryIdleDuration = 50000;
23 :
24 : // Validates a string for gRPC header key compliance. This is a subset of legal HTTP characters.
25 : // See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
26 5 : bool validateGrpcHeaderChars(absl::string_view key) {
27 6 : for (auto ch : key) {
28 6 : if (!(absl::ascii_isalnum(ch) || ch == '_' || ch == '.' || ch == '-')) {
29 1 : return false;
30 1 : }
31 6 : }
32 4 : return true;
33 5 : }
34 :
35 4 : bool validateGrpcCompatibleAsciiHeaderValue(absl::string_view h_value) {
36 7 : for (auto ch : h_value) {
37 7 : if (ch < 0x20 || ch > 0x7e) {
38 0 : return false;
39 0 : }
40 7 : }
41 4 : return true;
42 4 : }
43 :
44 : } // namespace
45 :
46 : AsyncClientFactoryImpl::AsyncClientFactoryImpl(Upstream::ClusterManager& cm,
47 : const envoy::config::core::v3::GrpcService& config,
48 : bool skip_cluster_check, TimeSource& time_source)
49 33 : : cm_(cm), config_(config), time_source_(time_source) {
50 33 : if (skip_cluster_check) {
51 5 : return;
52 5 : }
53 28 : THROW_IF_NOT_OK(cm_.checkActiveStaticCluster(config.envoy_grpc().cluster_name()));
54 28 : }
55 :
56 : AsyncClientManagerImpl::AsyncClientManagerImpl(
57 : Upstream::ClusterManager& cm, ThreadLocal::Instance& tls, TimeSource& time_source,
58 : Api::Api& api, const StatNames& stat_names,
59 : const envoy::config::bootstrap::v3::Bootstrap::GrpcAsyncClientManagerConfig& config)
60 : : cm_(cm), tls_(tls), time_source_(time_source), api_(api), stat_names_(stat_names),
61 132 : raw_async_client_cache_(tls_) {
62 :
63 132 : const auto max_cached_entry_idle_duration = std::chrono::milliseconds(
64 132 : PROTOBUF_GET_MS_OR_DEFAULT(config, max_cached_entry_idle_duration, DefaultEntryIdleDuration));
65 :
66 226 : raw_async_client_cache_.set([max_cached_entry_idle_duration](Event::Dispatcher& dispatcher) {
67 226 : return std::make_shared<RawAsyncClientCache>(dispatcher, max_cached_entry_idle_duration);
68 226 : });
69 132 : #ifdef ENVOY_GOOGLE_GRPC
70 132 : google_tls_slot_ = tls.allocateSlot();
71 132 : google_tls_slot_->set(
72 226 : [&api](Event::Dispatcher&) { return std::make_shared<GoogleAsyncClientThreadLocal>(api); });
73 : #else
74 : UNREFERENCED_PARAMETER(api_);
75 : #endif
76 132 : }
77 :
78 33 : RawAsyncClientPtr AsyncClientFactoryImpl::createUncachedRawAsyncClient() {
79 33 : return std::make_unique<AsyncClientImpl>(cm_, config_, time_source_);
80 33 : }
81 :
82 : GoogleAsyncClientFactoryImpl::GoogleAsyncClientFactoryImpl(
83 : ThreadLocal::Instance& tls, ThreadLocal::Slot* google_tls_slot, Stats::Scope& scope,
84 : const envoy::config::core::v3::GrpcService& config, Api::Api& api, const StatNames& stat_names)
85 : : tls_(tls), google_tls_slot_(google_tls_slot),
86 : scope_(scope.createScope(fmt::format("grpc.{}.", config.google_grpc().stat_prefix()))),
87 3 : config_(config), api_(api), stat_names_(stat_names) {
88 :
89 : #ifndef ENVOY_GOOGLE_GRPC
90 : UNREFERENCED_PARAMETER(tls_);
91 : UNREFERENCED_PARAMETER(google_tls_slot_);
92 : UNREFERENCED_PARAMETER(scope_);
93 : UNREFERENCED_PARAMETER(config_);
94 : UNREFERENCED_PARAMETER(api_);
95 : UNREFERENCED_PARAMETER(stat_names_);
96 : throwEnvoyExceptionOrPanic("Google C++ gRPC client is not linked");
97 : #else
98 3 : ASSERT(google_tls_slot_ != nullptr);
99 3 : #endif
100 :
101 : // Check metadata for gRPC API compliance. Uppercase characters are lowered in the HeaderParser.
102 : // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
103 5 : for (const auto& header : config.initial_metadata()) {
104 : // Validate key
105 5 : if (!validateGrpcHeaderChars(header.key())) {
106 1 : throwEnvoyExceptionOrPanic(
107 1 : fmt::format("Illegal characters in gRPC initial metadata header key: {}.", header.key()));
108 1 : }
109 :
110 : // Validate value
111 : // Binary base64 encoded - handled by the GRPC library
112 4 : if (!::absl::EndsWith(header.key(), "-bin") &&
113 4 : !validateGrpcCompatibleAsciiHeaderValue(header.value())) {
114 0 : throwEnvoyExceptionOrPanic(fmt::format(
115 0 : "Illegal ASCII value for gRPC initial metadata header key: {}.", header.key()));
116 0 : }
117 4 : }
118 3 : }
119 :
120 2 : RawAsyncClientPtr GoogleAsyncClientFactoryImpl::createUncachedRawAsyncClient() {
121 2 : #ifdef ENVOY_GOOGLE_GRPC
122 2 : GoogleGenericStubFactory stub_factory;
123 2 : return std::make_unique<GoogleAsyncClientImpl>(
124 2 : tls_.dispatcher(), google_tls_slot_->getTyped<GoogleAsyncClientThreadLocal>(), stub_factory,
125 2 : scope_, config_, api_, stat_names_);
126 : #else
127 : return nullptr;
128 : #endif
129 2 : }
130 :
131 : AsyncClientFactoryPtr
132 : AsyncClientManagerImpl::factoryForGrpcService(const envoy::config::core::v3::GrpcService& config,
133 36 : Stats::Scope& scope, bool skip_cluster_check) {
134 36 : switch (config.target_specifier_case()) {
135 33 : case envoy::config::core::v3::GrpcService::TargetSpecifierCase::kEnvoyGrpc:
136 33 : return std::make_unique<AsyncClientFactoryImpl>(cm_, config, skip_cluster_check, time_source_);
137 3 : case envoy::config::core::v3::GrpcService::TargetSpecifierCase::kGoogleGrpc:
138 3 : return std::make_unique<GoogleAsyncClientFactoryImpl>(tls_, google_tls_slot_.get(), scope,
139 3 : config, api_, stat_names_);
140 0 : case envoy::config::core::v3::GrpcService::TargetSpecifierCase::TARGET_SPECIFIER_NOT_SET:
141 0 : PANIC_DUE_TO_PROTO_UNSET;
142 36 : }
143 0 : return nullptr;
144 36 : }
145 :
146 : RawAsyncClientSharedPtr AsyncClientManagerImpl::getOrCreateRawAsyncClient(
147 : const envoy::config::core::v3::GrpcService& config, Stats::Scope& scope,
148 0 : bool skip_cluster_check) {
149 0 : const GrpcServiceConfigWithHashKey config_with_hash_key = GrpcServiceConfigWithHashKey(config);
150 0 : RawAsyncClientSharedPtr client = raw_async_client_cache_->getCache(config_with_hash_key);
151 0 : if (client != nullptr) {
152 0 : return client;
153 0 : }
154 0 : client = factoryForGrpcService(config_with_hash_key.config(), scope, skip_cluster_check)
155 0 : ->createUncachedRawAsyncClient();
156 0 : raw_async_client_cache_->setCache(config_with_hash_key, client);
157 0 : return client;
158 0 : }
159 :
160 : RawAsyncClientSharedPtr AsyncClientManagerImpl::getOrCreateRawAsyncClientWithHashKey(
161 : const GrpcServiceConfigWithHashKey& config_with_hash_key, Stats::Scope& scope,
162 40 : bool skip_cluster_check) {
163 40 : RawAsyncClientSharedPtr client = raw_async_client_cache_->getCache(config_with_hash_key);
164 40 : if (client != nullptr) {
165 35 : return client;
166 35 : }
167 5 : client = factoryForGrpcService(config_with_hash_key.config(), scope, skip_cluster_check)
168 5 : ->createUncachedRawAsyncClient();
169 5 : raw_async_client_cache_->setCache(config_with_hash_key, client);
170 5 : return client;
171 40 : }
172 :
173 : AsyncClientManagerImpl::RawAsyncClientCache::RawAsyncClientCache(
174 : Event::Dispatcher& dispatcher, std::chrono::milliseconds max_cached_entry_idle_duration)
175 226 : : dispatcher_(dispatcher), max_cached_entry_idle_duration_(max_cached_entry_idle_duration) {
176 226 : cache_eviction_timer_ = dispatcher.createTimer([this] { evictEntriesAndResetEvictionTimer(); });
177 226 : }
178 :
179 : void AsyncClientManagerImpl::RawAsyncClientCache::setCache(
180 : const GrpcServiceConfigWithHashKey& config_with_hash_key,
181 5 : const RawAsyncClientSharedPtr& client) {
182 5 : ASSERT(lru_map_.find(config_with_hash_key) == lru_map_.end());
183 : // Create a new cache entry at the beginning of the list.
184 5 : lru_list_.emplace_front(config_with_hash_key, client, dispatcher_.timeSource().monotonicTime());
185 5 : lru_map_[config_with_hash_key] = lru_list_.begin();
186 : // If inserting to an empty cache, enable eviction timer.
187 5 : if (lru_list_.size() == 1) {
188 5 : evictEntriesAndResetEvictionTimer();
189 5 : }
190 5 : }
191 :
192 : RawAsyncClientSharedPtr AsyncClientManagerImpl::RawAsyncClientCache::getCache(
193 40 : const GrpcServiceConfigWithHashKey& config_with_hash_key) {
194 40 : auto it = lru_map_.find(config_with_hash_key);
195 40 : if (it == lru_map_.end()) {
196 5 : return nullptr;
197 5 : }
198 35 : const auto cache_entry = it->second;
199 : // Reset the eviction timer if the next entry to expire is accessed.
200 35 : const bool should_reset_timer = (cache_entry == --lru_list_.end());
201 35 : cache_entry->accessed_time_ = dispatcher_.timeSource().monotonicTime();
202 : // Move the cache entry to the beginning of the list upon access.
203 35 : lru_list_.splice(lru_list_.begin(), lru_list_, cache_entry);
204 : // Get the cached async client before any cache eviction.
205 35 : RawAsyncClientSharedPtr client = cache_entry->client_;
206 35 : if (should_reset_timer) {
207 35 : evictEntriesAndResetEvictionTimer();
208 35 : }
209 35 : return client;
210 40 : }
211 :
212 40 : void AsyncClientManagerImpl::RawAsyncClientCache::evictEntriesAndResetEvictionTimer() {
213 40 : MonotonicTime now = dispatcher_.timeSource().monotonicTime();
214 : // Evict all the entries that have expired.
215 40 : while (!lru_list_.empty()) {
216 40 : const MonotonicTime next_expire =
217 40 : lru_list_.back().accessed_time_ + max_cached_entry_idle_duration_;
218 40 : std::chrono::seconds time_to_next_expire_sec =
219 40 : std::chrono::duration_cast<std::chrono::seconds>(next_expire - now);
220 : // since 'now' and 'next_expire' are in nanoseconds, the following condition is to
221 : // check if the difference between them is less than 1 second. If we don't do this, the
222 : // timer will be enabled with 0 seconds, which will cause the timer to fire immediately.
223 : // This will cause cpu spike.
224 40 : if (time_to_next_expire_sec.count() <= 0) {
225 : // Erase the expired entry.
226 0 : lru_map_.erase(lru_list_.back().config_with_hash_key_);
227 0 : lru_list_.pop_back();
228 40 : } else {
229 40 : cache_eviction_timer_->enableTimer(time_to_next_expire_sec);
230 40 : return;
231 40 : }
232 40 : }
233 40 : }
234 :
235 : } // namespace Grpc
236 : } // namespace Envoy
|