1
#include "source/extensions/transport_sockets/alts/alts_channel_pool.h"

#include <algorithm>
#include <cstddef>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "absl/memory/memory.h"
#include "absl/strings/string_view.h"
#include "absl/synchronization/mutex.h"
#include "grpcpp/channel.h"
#include "grpcpp/create_channel.h"
#include "grpcpp/security/credentials.h"
16

            
17
namespace Envoy {
18
namespace Extensions {
19
namespace TransportSockets {
20
namespace Alts {
21

            
22
// TODO(matthewstevenson88): Extend this to be configurable through API.
// Number of channels that AltsChannelPool::create() places in a pool.
constexpr std::size_t ChannelPoolSize{10};

// Name of the environment variable that, when set to "true", makes
// AltsChannelPool::create() apply the keepalive channel arguments below.
constexpr char UseGrpcExperimentalAltsHandshakerKeepaliveParams[] =
    "GRPC_EXPERIMENTAL_ALTS_HANDSHAKER_KEEPALIVE_PARAMS";

// Value used for GRPC_ARG_KEEPALIVE_TIMEOUT_MS: 10 seconds, in milliseconds.
constexpr int ExperimentalKeepAliveTimeoutMs{10 * 1000};

// Value used for GRPC_ARG_KEEPALIVE_TIME_MS: 10 minutes, in milliseconds.
constexpr int ExperimentalKeepAliveTimeMs{10 * 60 * 1000};
31

            
32
std::unique_ptr<AltsChannelPool>
33
20
AltsChannelPool::create(absl::string_view handshaker_service_address) {
34
20
  std::vector<std::shared_ptr<grpc::Channel>> channel_pool;
35
20
  channel_pool.reserve(ChannelPoolSize);
36
20
  grpc::ChannelArguments channel_args;
37
20
  channel_args.SetInt(GRPC_ARG_USE_LOCAL_SUBCHANNEL_POOL, 1);
38
20
  const char* keep_alive = std::getenv(UseGrpcExperimentalAltsHandshakerKeepaliveParams);
39
20
  if (keep_alive != nullptr && std::strcmp(keep_alive, "true") == 0) {
40
1
    channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, ExperimentalKeepAliveTimeoutMs);
41
1
    channel_args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, ExperimentalKeepAliveTimeMs);
42
1
  }
43
220
  for (std::size_t i = 0; i < ChannelPoolSize; ++i) {
44
200
    channel_pool.push_back(grpc::CreateCustomChannel(
45
200
        std::string(handshaker_service_address), grpc::InsecureChannelCredentials(), channel_args));
46
200
  }
47
20
  return absl::WrapUnique(new AltsChannelPool(std::move(channel_pool)));
48
20
}
49

            
50
AltsChannelPool::AltsChannelPool(const std::vector<std::shared_ptr<grpc::Channel>>& channel_pool)
51
20
    : channel_pool_(channel_pool) {}
52

            
53
// TODO(matthewstevenson88): Add logic to limit number of outstanding channels.
54
37
std::shared_ptr<grpc::Channel> AltsChannelPool::getChannel() {
55
37
  std::shared_ptr<grpc::Channel> channel;
56
37
  {
57
37
    absl::MutexLock lock(mu_);
58
37
    channel = channel_pool_[index_];
59
37
    index_ = (index_ + 1) % channel_pool_.size();
60
37
  }
61
37
  return channel;
62
37
}
63

            
64
2
// Reports how many channels this pool holds.
std::size_t AltsChannelPool::getChannelPoolSize() const {
  return channel_pool_.size();
}
65

            
66
} // namespace Alts
67
} // namespace TransportSockets
68
} // namespace Extensions
69
} // namespace Envoy