1
#include "source/common/grpc/async_client_manager_impl.h"
2

            
3
#include <chrono>
4

            
5
#include "envoy/config/core/v3/grpc_service.pb.h"
6
#include "envoy/stats/scope.h"
7

            
8
#include "source/common/common/base64.h"
9
#include "source/common/grpc/async_client_impl.h"
10
#include "source/common/protobuf/utility.h"
11

            
12
#include "absl/strings/match.h"
13

            
14
#ifdef ENVOY_GOOGLE_GRPC
15
#include "source/common/grpc/google_async_client_impl.h"
16
#endif
17

            
18
namespace Envoy {
19
namespace Grpc {
20
namespace {
21

            
22
constexpr uint64_t DefaultEntryIdleDuration{50000};
23

            
24
// Validates a string for gRPC header key compliance. This is a subset of legal HTTP characters.
25
// See https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
26
13
bool validateGrpcHeaderChars(absl::string_view key) {
27
141
  for (auto ch : key) {
28
141
    if (!(absl::ascii_isalnum(ch) || ch == '_' || ch == '.' || ch == '-')) {
29
1
      return false;
30
1
    }
31
141
  }
32
12
  return true;
33
13
}
34

            
35
12
bool validateGrpcCompatibleAsciiHeaderValue(absl::string_view h_value) {
36
173
  for (auto ch : h_value) {
37
173
    if (ch < 0x20 || ch > 0x7e) {
38
1
      return false;
39
1
    }
40
173
  }
41
11
  return true;
42
12
}
43

            
44
} // namespace
45

            
46
AsyncClientFactoryImpl::AsyncClientFactoryImpl(const envoy::config::core::v3::GrpcService& config,
47
                                               bool skip_cluster_check,
48
                                               Server::Configuration::CommonFactoryContext& context,
49
                                               absl::Status& creation_status)
50
1718
    : config_(config), context_(context) {
51
1718
  if (skip_cluster_check) {
52
1347
    creation_status = absl::OkStatus();
53
1641
  } else {
54
371
    creation_status =
55
371
        context_.clusterManager().checkActiveStaticCluster(config.envoy_grpc().cluster_name());
56
371
  }
57
1718
}
58

            
59
AsyncClientManagerImpl::AsyncClientManagerImpl(
60
    const envoy::config::bootstrap::v3::Bootstrap::GrpcAsyncClientManagerConfig& config,
61
    Server::Configuration::CommonFactoryContext& context, const StatNames& stat_names)
62
11078
    : context_(context), stat_names_(stat_names), raw_async_client_cache_(context.threadLocal()) {
63

            
64
11078
  const auto max_cached_entry_idle_duration = std::chrono::milliseconds(
65
11078
      PROTOBUF_GET_MS_OR_DEFAULT(config, max_cached_entry_idle_duration, DefaultEntryIdleDuration));
66

            
67
21808
  raw_async_client_cache_.set([max_cached_entry_idle_duration](Event::Dispatcher& dispatcher) {
68
21808
    return std::make_shared<RawAsyncClientCache>(dispatcher, max_cached_entry_idle_duration);
69
21808
  });
70
11078
#ifdef ENVOY_GOOGLE_GRPC
71
11078
  google_tls_slot_ = context_.threadLocal().allocateSlot();
72
11078
  Api::Api& api = context_.api();
73
11078
  google_tls_slot_->set(
74
21808
      [&api](Event::Dispatcher&) { return std::make_shared<GoogleAsyncClientThreadLocal>(api); });
75
11078
#endif
76
11078
}
77

            
78
1714
absl::StatusOr<RawAsyncClientPtr> AsyncClientFactoryImpl::createUncachedRawAsyncClient() {
79
1714
  return AsyncClientImpl::create(config_, context_);
80
1714
}
81

            
82
GoogleAsyncClientFactoryImpl::GoogleAsyncClientFactoryImpl(
83
    const envoy::config::core::v3::GrpcService& config, ThreadLocal::Slot* google_tls_slot,
84
    Stats::Scope& scope, Server::Configuration::CommonFactoryContext& context,
85
    const StatNames& stat_names, absl::Status& creation_status)
86
1030
    : google_tls_slot_(google_tls_slot),
87
1030
      scope_(scope.createScope(fmt::format("grpc.{}.", config.google_grpc().stat_prefix()))),
88
1030
      config_(config), factory_context_(context), stat_names_(stat_names) {
89
#ifndef ENVOY_GOOGLE_GRPC
90
  UNREFERENCED_PARAMETER(google_tls_slot_);
91
  UNREFERENCED_PARAMETER(scope_);
92
  UNREFERENCED_PARAMETER(config_);
93
  UNREFERENCED_PARAMETER(factory_context_);
94
  UNREFERENCED_PARAMETER(stat_names_);
95
  creation_status = absl::InvalidArgumentError("Google C++ gRPC client is not linked");
96
  return;
97
#else
98
1030
  ASSERT(google_tls_slot_ != nullptr);
99
1030
#endif
100

            
101
1030
  creation_status = absl::OkStatus();
102
  // Check metadata for gRPC API compliance. Uppercase characters are lowered in the HeaderParser.
103
  // https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md
104
1030
  for (const auto& header : config.initial_metadata()) {
105
    // Validate key
106
13
    if (!validateGrpcHeaderChars(header.key())) {
107
1
      creation_status = absl::InvalidArgumentError(
108
1
          fmt::format("Illegal characters in gRPC initial metadata header key: {}.", header.key()));
109
1
      return;
110
1
    }
111

            
112
    // Validate value
113
    // Binary base64 encoded - handled by the GRPC library
114
12
    if (!::absl::EndsWith(header.key(), "-bin") &&
115
12
        !validateGrpcCompatibleAsciiHeaderValue(header.value())) {
116
1
      creation_status = absl::InvalidArgumentError(fmt::format(
117
1
          "Illegal ASCII value for gRPC initial metadata header key: {}.", header.key()));
118
1
      return;
119
1
    }
120
12
  }
121
1030
}
122

            
123
1026
absl::StatusOr<RawAsyncClientPtr> GoogleAsyncClientFactoryImpl::createUncachedRawAsyncClient() {
124
1026
#ifdef ENVOY_GOOGLE_GRPC
125
1026
  GoogleGenericStubFactory stub_factory;
126
1026
  return std::make_unique<GoogleAsyncClientImpl>(
127
1026
      factory_context_.threadLocal().dispatcher(),
128
1026
      google_tls_slot_->getTyped<GoogleAsyncClientThreadLocal>(), stub_factory, scope_, config_,
129
1026
      factory_context_, stat_names_);
130
#else
131
  return nullptr;
132
#endif
133
1026
}
134

            
135
absl::StatusOr<AsyncClientFactoryPtr>
136
AsyncClientManagerImpl::factoryForGrpcService(const envoy::config::core::v3::GrpcService& config,
137
2748
                                              Stats::Scope& scope, bool skip_cluster_check) {
138
2748
  absl::Status creation_status = absl::OkStatus();
139
2748
  AsyncClientFactoryPtr factory;
140
2748
  switch (config.target_specifier_case()) {
141
1718
  case envoy::config::core::v3::GrpcService::TargetSpecifierCase::kEnvoyGrpc:
142
1718
    factory = std::make_unique<AsyncClientFactoryImpl>(config, skip_cluster_check, context_,
143
1718
                                                       creation_status);
144
1718
    break;
145
1030
  case envoy::config::core::v3::GrpcService::TargetSpecifierCase::kGoogleGrpc:
146
1030
    factory = std::make_unique<GoogleAsyncClientFactoryImpl>(
147
1030
        config, google_tls_slot_.get(), scope, context_, stat_names_, creation_status);
148
1030
    break;
149
  case envoy::config::core::v3::GrpcService::TargetSpecifierCase::TARGET_SPECIFIER_NOT_SET:
150
    PANIC_DUE_TO_PROTO_UNSET;
151
2748
  }
152
2748
  if (!creation_status.ok()) {
153
4
    return creation_status;
154
4
  }
155
2744
  return factory;
156
2748
}
157

            
158
absl::StatusOr<RawAsyncClientSharedPtr> AsyncClientManagerImpl::getOrCreateRawAsyncClient(
159
    const envoy::config::core::v3::GrpcService& config, Stats::Scope& scope,
160
25
    bool skip_cluster_check) {
161
25
  const GrpcServiceConfigWithHashKey config_with_hash_key = GrpcServiceConfigWithHashKey(config);
162
25
  RawAsyncClientSharedPtr client = raw_async_client_cache_->getCache(config_with_hash_key);
163
25
  if (client != nullptr) {
164
5
    return client;
165
5
  }
166
20
  auto factory_or_error =
167
20
      factoryForGrpcService(config_with_hash_key.config(), scope, skip_cluster_check);
168
20
  RETURN_IF_NOT_OK_REF(factory_or_error.status());
169
20
  auto client_or_error = factory_or_error.value()->createUncachedRawAsyncClient();
170
20
  RETURN_IF_NOT_OK_REF(client_or_error.status());
171
20
  client = std::move(*client_or_error);
172
20
  raw_async_client_cache_->setCache(config_with_hash_key, client);
173
20
  return client;
174
20
}
175

            
176
absl::StatusOr<RawAsyncClientSharedPtr>
177
AsyncClientManagerImpl::getOrCreateRawAsyncClientWithHashKey(
178
    const GrpcServiceConfigWithHashKey& config_with_hash_key, Stats::Scope& scope,
179
1286
    bool skip_cluster_check) {
180
1286
  RawAsyncClientSharedPtr client = raw_async_client_cache_->getCache(config_with_hash_key);
181
1286
  if (client != nullptr) {
182
542
    return client;
183
542
  }
184
744
  auto factory_or_error =
185
744
      factoryForGrpcService(config_with_hash_key.config(), scope, skip_cluster_check);
186
744
  RETURN_IF_NOT_OK_REF(factory_or_error.status());
187
744
  auto client_or_error = factory_or_error.value()->createUncachedRawAsyncClient();
188
744
  RETURN_IF_NOT_OK_REF(client_or_error.status());
189
744
  client = std::move(*client_or_error);
190
744
  raw_async_client_cache_->setCache(config_with_hash_key, client);
191
744
  return client;
192
744
}
193

            
194
AsyncClientManagerImpl::RawAsyncClientCache::RawAsyncClientCache(
195
    Event::Dispatcher& dispatcher, std::chrono::milliseconds max_cached_entry_idle_duration)
196
21813
    : dispatcher_(dispatcher), max_cached_entry_idle_duration_(max_cached_entry_idle_duration) {
197
21813
  cache_eviction_timer_ = dispatcher.createTimer([this] { evictEntriesAndResetEvictionTimer(); });
198
21813
}
199

            
200
void AsyncClientManagerImpl::RawAsyncClientCache::setCache(
201
    const GrpcServiceConfigWithHashKey& config_with_hash_key,
202
869
    const RawAsyncClientSharedPtr& client) {
203
869
  ASSERT(lru_map_.find(config_with_hash_key) == lru_map_.end());
204
  // Create a new cache entry at the beginning of the list.
205
869
  lru_list_.emplace_front(config_with_hash_key, client, dispatcher_.timeSource().monotonicTime());
206
869
  lru_map_[config_with_hash_key] = lru_list_.begin();
207
  // If inserting to an empty cache, enable eviction timer.
208
869
  if (lru_list_.size() == 1) {
209
758
    evictEntriesAndResetEvictionTimer();
210
758
  }
211
869
}
212

            
213
RawAsyncClientSharedPtr AsyncClientManagerImpl::RawAsyncClientCache::getCache(
214
1421
    const GrpcServiceConfigWithHashKey& config_with_hash_key) {
215
1421
  auto it = lru_map_.find(config_with_hash_key);
216
1421
  if (it == lru_map_.end()) {
217
818
    return nullptr;
218
818
  }
219
603
  const auto cache_entry = it->second;
220
  // Reset the eviction timer if the next entry to expire is accessed.
221
603
  const bool should_reset_timer = (cache_entry == --lru_list_.end());
222
603
  cache_entry->accessed_time_ = dispatcher_.timeSource().monotonicTime();
223
  // Move the cache entry to the beginning of the list upon access.
224
603
  lru_list_.splice(lru_list_.begin(), lru_list_, cache_entry);
225
  // Get the cached async client before any cache eviction.
226
603
  RawAsyncClientSharedPtr client = cache_entry->client_;
227
603
  if (should_reset_timer) {
228
603
    evictEntriesAndResetEvictionTimer();
229
603
  }
230
603
  return client;
231
1421
}
232

            
233
1367
void AsyncClientManagerImpl::RawAsyncClientCache::evictEntriesAndResetEvictionTimer() {
234
1367
  MonotonicTime now = dispatcher_.timeSource().monotonicTime();
235
  // Evict all the entries that have expired.
236
1423
  while (!lru_list_.empty()) {
237
1418
    const MonotonicTime next_expire =
238
1418
        lru_list_.back().accessed_time_ + max_cached_entry_idle_duration_;
239
1418
    std::chrono::seconds time_to_next_expire_sec =
240
1418
        std::chrono::duration_cast<std::chrono::seconds>(next_expire - now);
241
    // since 'now' and 'next_expire' are in nanoseconds, the following condition is to
242
    // check if the difference between them is less than 1 second. If we don't do this, the
243
    // timer will be enabled with 0 seconds, which will cause the timer to fire immediately.
244
    // This will cause cpu spike.
245
1418
    if (time_to_next_expire_sec.count() <= 0) {
246
      // Erase the expired entry.
247
56
      lru_map_.erase(lru_list_.back().config_with_hash_key_);
248
56
      lru_list_.pop_back();
249
1362
    } else {
250
1362
      cache_eviction_timer_->enableTimer(time_to_next_expire_sec);
251
1362
      return;
252
1362
    }
253
1418
  }
254
1367
}
255

            
256
} // namespace Grpc
257
} // namespace Envoy