1
#include "source/common/grpc/async_client_manager_impl.h"
2

            
3
#include <chrono>
4

            
5
#include "envoy/config/core/v3/grpc_service.pb.h"
6
#include "envoy/stats/scope.h"
7

            
8
#include "source/common/common/base64.h"
9
#include "source/common/grpc/async_client_impl.h"
10
#include "source/common/protobuf/utility.h"
11

            
12
#include "absl/strings/match.h"
13

            
14
#ifdef ENVOY_GOOGLE_GRPC
15
#include "source/common/grpc/google_async_client_impl.h"
16
#endif
17

            
18
namespace Envoy {
19
namespace Grpc {
20
namespace {
21

            
22
// Fallback idle duration for cached raw async client entries. It is handed to
// PROTOBUF_GET_MS_OR_DEFAULT for max_cached_entry_idle_duration and the result is wrapped in
// std::chrono::milliseconds, so this value is interpreted as milliseconds (50 seconds).
constexpr uint64_t DefaultEntryIdleDuration{50000};
23

            
24
// Validates a string for gRPC header key compliance. This is a subset of legal HTTP characters.
25
// See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
26
13
bool validateGrpcHeaderChars(absl::string_view key) {
  // A key is accepted only if every character is an ASCII letter or digit, or one of '_', '.',
  // '-'. An empty key has no offending character and is therefore accepted.
  for (const char c : key) {
    const bool allowed = absl::ascii_isalnum(c) || c == '_' || c == '.' || c == '-';
    if (!allowed) {
      return false;
    }
  }
  return true;
}
34

            
35
12
// Returns true when every character of the header value lies in the range [0x20, 0x7e];
// returns false as soon as a character outside that range is found.
bool validateGrpcCompatibleAsciiHeaderValue(absl::string_view h_value) {
  for (const char c : h_value) {
    const bool in_range = c >= 0x20 && c <= 0x7e;
    if (!in_range) {
      return false;
    }
  }
  return true;
}
43

            
44
} // namespace
45

            
46
// Stores the gRPC service config and factory context. The outcome is reported through
// creation_status: OK when the cluster check is skipped, otherwise whatever the cluster manager's
// checkActiveStaticCluster() returns for the configured Envoy gRPC cluster name.
AsyncClientFactoryImpl::AsyncClientFactoryImpl(const envoy::config::core::v3::GrpcService& config,
                                               bool skip_cluster_check,
                                               Server::Configuration::CommonFactoryContext& context,
                                               absl::Status& creation_status)
    : config_(config), context_(context) {
  if (skip_cluster_check) {
    creation_status = absl::OkStatus();
    return;
  }
  const auto& cluster_name = config.envoy_grpc().cluster_name();
  creation_status = context_.clusterManager().checkActiveStaticCluster(cluster_name);
}
58

            
59
// Builds the manager: installs the initializer for the raw async client cache and, when compiled
// with ENVOY_GOOGLE_GRPC, allocates and initializes the slot used by Google gRPC clients.
AsyncClientManagerImpl::AsyncClientManagerImpl(
    const envoy::config::bootstrap::v3::Bootstrap::GrpcAsyncClientManagerConfig& config,
    Server::Configuration::CommonFactoryContext& context, const StatNames& stat_names)
    : context_(context), stat_names_(stat_names), raw_async_client_cache_(context.threadLocal()) {

  // Idle duration after which a cached entry may be evicted; falls back to
  // DefaultEntryIdleDuration when PROTOBUF_GET_MS_OR_DEFAULT yields the default.
  const auto idle_duration = std::chrono::milliseconds(
      PROTOBUF_GET_MS_OR_DEFAULT(config, max_cached_entry_idle_duration, DefaultEntryIdleDuration));

  // The callback is given a dispatcher and creates the cache bound to it.
  raw_async_client_cache_.set([idle_duration](Event::Dispatcher& dispatcher) {
    return std::make_shared<RawAsyncClientCache>(dispatcher, idle_duration);
  });
#ifdef ENVOY_GOOGLE_GRPC
  google_tls_slot_ = context_.threadLocal().allocateSlot();
  // The api object is captured by reference, exactly as a named local reference would be.
  google_tls_slot_->set([&api = context_.api()](Event::Dispatcher&) {
    return std::make_shared<GoogleAsyncClientThreadLocal>(api);
  });
#endif
}
77

            
78
1676
// Creates a new client via AsyncClientImpl::create() from the stored config and context; the
// result (client or error status) is returned unchanged. No caching happens here.
absl::StatusOr<RawAsyncClientPtr> AsyncClientFactoryImpl::createUncachedRawAsyncClient() {
  absl::StatusOr<RawAsyncClientPtr> client_or_error = AsyncClientImpl::create(config_, context_);
  return client_or_error;
}
81

            
82
// Stores the inputs needed to build Google gRPC clients and validates the configured initial
// metadata. The outcome is reported through creation_status:
//  - without ENVOY_GOOGLE_GRPC: always an InvalidArgument error (client not linked);
//  - otherwise OK, unless a metadata key or a non-binary metadata value fails validation.
GoogleAsyncClientFactoryImpl::GoogleAsyncClientFactoryImpl(
    const envoy::config::core::v3::GrpcService& config, ThreadLocal::Slot* google_tls_slot,
    Stats::Scope& scope, Server::Configuration::CommonFactoryContext& context,
    const StatNames& stat_names, absl::Status& creation_status)
    : google_tls_slot_(google_tls_slot),
      scope_(scope.createScope(fmt::format("grpc.{}.", config.google_grpc().stat_prefix()))),
      config_(config), factory_context_(context), stat_names_(stat_names) {
#ifndef ENVOY_GOOGLE_GRPC
  UNREFERENCED_PARAMETER(google_tls_slot_);
  UNREFERENCED_PARAMETER(scope_);
  UNREFERENCED_PARAMETER(config_);
  UNREFERENCED_PARAMETER(factory_context_);
  UNREFERENCED_PARAMETER(stat_names_);
  creation_status = absl::InvalidArgumentError("Google C++ gRPC client is not linked");
  return;
#else
  ASSERT(google_tls_slot_ != nullptr);
#endif

  creation_status = absl::OkStatus();
  // Check metadata for gRPC API compliance. Uppercase characters are lowered in the HeaderParser.
  // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
  for (const auto& metadata : config.initial_metadata()) {
    const auto& key = metadata.key();

    // The key is checked first; the first offending entry ends validation.
    if (!validateGrpcHeaderChars(key)) {
      creation_status = absl::InvalidArgumentError(
          fmt::format("Illegal characters in gRPC initial metadata header key: {}.", key));
      return;
    }

    // Values of "-bin" keys are binary base64 encoded - handled by the GRPC library, so they
    // are not checked here.
    if (::absl::EndsWith(key, "-bin")) {
      continue;
    }
    if (!validateGrpcCompatibleAsciiHeaderValue(metadata.value())) {
      creation_status = absl::InvalidArgumentError(
          fmt::format("Illegal ASCII value for gRPC initial metadata header key: {}.", key));
      return;
    }
  }
}
122

            
123
1037
// Creates a new, uncached Google gRPC client from the state captured at construction. When the
// build lacks ENVOY_GOOGLE_GRPC, a null client pointer is returned instead.
absl::StatusOr<RawAsyncClientPtr> GoogleAsyncClientFactoryImpl::createUncachedRawAsyncClient() {
#ifdef ENVOY_GOOGLE_GRPC
  GoogleGenericStubFactory stub_factory;
  Event::Dispatcher& dispatcher = factory_context_.threadLocal().dispatcher();
  auto& google_tls = google_tls_slot_->getTyped<GoogleAsyncClientThreadLocal>();
  return std::make_unique<GoogleAsyncClientImpl>(dispatcher, google_tls, stub_factory, scope_,
                                                 config_, factory_context_, stat_names_);
#else
  return nullptr;
#endif
}
134

            
135
// Builds the client factory that matches the config's target specifier (Envoy gRPC or Google
// gRPC). Each factory constructor reports its outcome through a status out-parameter; a non-OK
// status is returned to the caller instead of the factory. An unset target specifier panics.
absl::StatusOr<AsyncClientFactoryPtr>
AsyncClientManagerImpl::factoryForGrpcService(const envoy::config::core::v3::GrpcService& config,
                                              Stats::Scope& scope, bool skip_cluster_check) {
  using TargetCase = envoy::config::core::v3::GrpcService::TargetSpecifierCase;

  absl::Status status = absl::OkStatus();
  AsyncClientFactoryPtr built_factory;
  switch (config.target_specifier_case()) {
  case TargetCase::kEnvoyGrpc:
    built_factory =
        std::make_unique<AsyncClientFactoryImpl>(config, skip_cluster_check, context_, status);
    break;
  case TargetCase::kGoogleGrpc:
    built_factory = std::make_unique<GoogleAsyncClientFactoryImpl>(
        config, google_tls_slot_.get(), scope, context_, stat_names_, status);
    break;
  case TargetCase::TARGET_SPECIFIER_NOT_SET:
    PANIC_DUE_TO_PROTO_UNSET;
  }
  if (status.ok()) {
    return built_factory;
  }
  return status;
}
157

            
158
// Returns the cached raw async client for `config`, creating and caching one on a miss.
//
// This overload only computes the hash key for `config`; the lookup, factory creation, client
// creation and cache insertion are all performed by getOrCreateRawAsyncClientWithHashKey(), so
// the two entry points cannot diverge. Errors from factory or client creation are returned as a
// non-OK status.
absl::StatusOr<RawAsyncClientSharedPtr> AsyncClientManagerImpl::getOrCreateRawAsyncClient(
    const envoy::config::core::v3::GrpcService& config, Stats::Scope& scope,
    bool skip_cluster_check) {
  const GrpcServiceConfigWithHashKey config_with_hash_key(config);
  return getOrCreateRawAsyncClientWithHashKey(config_with_hash_key, scope, skip_cluster_check);
}
175

            
176
// Returns the cached raw async client for the given config/hash key. On a cache miss a factory is
// built for the config, a new client is created from it, stored in the cache and returned. A
// failure in either creation step is returned as a non-OK status and nothing is cached.
absl::StatusOr<RawAsyncClientSharedPtr>
AsyncClientManagerImpl::getOrCreateRawAsyncClientWithHashKey(
    const GrpcServiceConfigWithHashKey& config_with_hash_key, Stats::Scope& scope,
    bool skip_cluster_check) {
  // Cache hit: hand back the existing client.
  if (RawAsyncClientSharedPtr cached = raw_async_client_cache_->getCache(config_with_hash_key);
      cached != nullptr) {
    return cached;
  }

  // Cache miss: build the factory, then the client, bailing out on the first error.
  auto factory = factoryForGrpcService(config_with_hash_key.config(), scope, skip_cluster_check);
  RETURN_IF_NOT_OK_REF(factory.status());
  auto created = (*factory)->createUncachedRawAsyncClient();
  RETURN_IF_NOT_OK_REF(created.status());

  RawAsyncClientSharedPtr client = std::move(*created);
  raw_async_client_cache_->setCache(config_with_hash_key, client);
  return client;
}
193

            
194
// Binds the cache to a dispatcher and an idle duration, and creates (but does not arm) the timer
// whose callback runs evictEntriesAndResetEvictionTimer().
AsyncClientManagerImpl::RawAsyncClientCache::RawAsyncClientCache(
    Event::Dispatcher& dispatcher, std::chrono::milliseconds max_cached_entry_idle_duration)
    : dispatcher_(dispatcher), max_cached_entry_idle_duration_(max_cached_entry_idle_duration) {
  cache_eviction_timer_ =
      dispatcher.createTimer([this]() -> void { evictEntriesAndResetEvictionTimer(); });
}
199

            
200
void AsyncClientManagerImpl::RawAsyncClientCache::setCache(
201
    const GrpcServiceConfigWithHashKey& config_with_hash_key,
202
875
    const RawAsyncClientSharedPtr& client) {
203
875
  ASSERT(lru_map_.find(config_with_hash_key) == lru_map_.end());
204
  // Create a new cache entry at the beginning of the list.
205
875
  lru_list_.emplace_front(config_with_hash_key, client, dispatcher_.timeSource().monotonicTime());
206
875
  lru_map_[config_with_hash_key] = lru_list_.begin();
207
  // If inserting to an empty cache, enable eviction timer.
208
875
  if (lru_list_.size() == 1) {
209
764
    evictEntriesAndResetEvictionTimer();
210
764
  }
211
875
}
212

            
213
// Looks up the client cached for the given key. Returns nullptr on a miss. On a hit the entry's
// access time is refreshed and the entry is moved to the front of the LRU list.
RawAsyncClientSharedPtr AsyncClientManagerImpl::RawAsyncClientCache::getCache(
    const GrpcServiceConfigWithHashKey& config_with_hash_key) {
  const auto found = lru_map_.find(config_with_hash_key);
  if (found == lru_map_.end()) {
    return nullptr;
  }
  const auto entry = found->second;

  // The entry at the back of the list is the next one to expire. If that is the entry being
  // accessed, the eviction timer must be re-evaluated. This has to be determined before the
  // entry is moved to the front.
  auto last_entry = lru_list_.end();
  --last_entry;
  const bool was_next_to_expire = (entry == last_entry);

  entry->accessed_time_ = dispatcher_.timeSource().monotonicTime();
  lru_list_.splice(lru_list_.begin(), lru_list_, entry);

  // Copy the client out before running eviction, which may remove entries from the cache.
  RawAsyncClientSharedPtr client = entry->client_;
  if (was_next_to_expire) {
    evictEntriesAndResetEvictionTimer();
  }
  return client;
}
232

            
233
1373
void AsyncClientManagerImpl::RawAsyncClientCache::evictEntriesAndResetEvictionTimer() {
234
1373
  MonotonicTime now = dispatcher_.timeSource().monotonicTime();
235
  // Evict all the entries that have expired.
236
1429
  while (!lru_list_.empty()) {
237
1424
    const MonotonicTime next_expire =
238
1424
        lru_list_.back().accessed_time_ + max_cached_entry_idle_duration_;
239
1424
    std::chrono::seconds time_to_next_expire_sec =
240
1424
        std::chrono::duration_cast<std::chrono::seconds>(next_expire - now);
241
    // since 'now' and 'next_expire' are in nanoseconds, the following condition is to
242
    // check if the difference between them is less than 1 second. If we don't do this, the
243
    // timer will be enabled with 0 seconds, which will cause the timer to fire immediately.
244
    // This will cause cpu spike.
245
1424
    if (time_to_next_expire_sec.count() <= 0) {
246
      // Erase the expired entry.
247
56
      lru_map_.erase(lru_list_.back().config_with_hash_key_);
248
56
      lru_list_.pop_back();
249
1368
    } else {
250
1368
      cache_eviction_timer_->enableTimer(time_to_next_expire_sec);
251
1368
      return;
252
1368
    }
253
1424
  }
254
1373
}
255

            
256
} // namespace Grpc
257
} // namespace Envoy