#pragma once

#include "source/common/upstream/conn_pool_map.h"

namespace Envoy {
namespace Upstream {
template <typename KEY_TYPE, typename POOL_TYPE>
9
ConnPoolMap<KEY_TYPE, POOL_TYPE>::ConnPoolMap(Envoy::Event::Dispatcher& dispatcher,
10
                                              const HostConstSharedPtr& host,
11
                                              ResourcePriority priority)
12
23514
    : thread_local_dispatcher_(dispatcher), host_(host), priority_(priority) {}
13

            
14
23514
template <typename KEY_TYPE, typename POOL_TYPE> ConnPoolMap<KEY_TYPE, POOL_TYPE>::~ConnPoolMap() {
15
  // Clean up the pools to ensure resource tracking is kept up to date. Note that we do not call
16
  // `clear()` here to avoid doing a deferred delete. This triggers some unwanted race conditions
17
  // on shutdown where deleted resources end up putting stuff on the deferred delete list after the
18
  // worker threads have shut down.
19
23514
  clearActivePools();
20
23514
}
21

            
22
template <typename KEY_TYPE, typename POOL_TYPE>
23
typename ConnPoolMap<KEY_TYPE, POOL_TYPE>::PoolOptRef
24
47375
ConnPoolMap<KEY_TYPE, POOL_TYPE>::getPool(const KEY_TYPE& key, const PoolFactory& factory) {
25
47375
  Common::AutoDebugRecursionChecker assert_not_in(recursion_checker_);
26
  // TODO(klarose): Consider how we will change the connection pool's configuration in the future.
27
  // The plan is to change the downstream socket options... We may want to take those as a parameter
28
  // here. Maybe we'll pass them to the factory function?
29
47375
  auto pool_iter = active_pools_.find(key);
30
47375
  if (pool_iter != active_pools_.end()) {
31
35536
    return std::ref(*(pool_iter->second));
32
35536
  }
33
11839
  ResourceLimit& connPoolResource = host_->cluster().resourceManager(priority_).connectionPools();
34
  // We need a new pool. Check if we have room.
35
11839
  if (!connPoolResource.canCreate()) {
36
    // We're full. Try to free up a pool. If we can't, bail out.
37
18
    if (!freeOnePool()) {
38
10
      host_->cluster().trafficStats()->upstream_cx_pool_overflow_.inc();
39
10
      return absl::nullopt;
40
10
    }
41

            
42
8
    ASSERT(size() < connPoolResource.max(),
43
8
           "Freeing a pool should reduce the size to below the max.");
44

            
45
    // TODO(klarose): Consider some simple hysteresis here. How can we prevent iterating over all
46
    // pools when we're at the limit every time we want to allocate a new one, even if most of the
47
    // pools are not busy, while balancing that with not unnecessarily freeing all pools? If we
48
    // start freeing once we cross a threshold, then stop after we cross another, we could
49
    // achieve that balance.
50
8
  }
51

            
52
  // We have room for a new pool. Allocate one and let it know about any cached callbacks.
53
11829
  auto new_pool = factory();
54
11829
  connPoolResource.inc();
55
11829
  for (const auto& cb : cached_callbacks_) {
56
2
    new_pool->addIdleCallback(cb);
57
2
  }
58

            
59
11829
  auto inserted = active_pools_.emplace(key, std::move(new_pool));
60
11829
  return std::ref(*inserted.first->second);
61
11839
}
62

            
63
template <typename KEY_TYPE, typename POOL_TYPE>
64
11488
bool ConnPoolMap<KEY_TYPE, POOL_TYPE>::erasePool(const KEY_TYPE& key) {
65
11488
  Common::AutoDebugRecursionChecker assert_not_in(recursion_checker_);
66
11488
  auto pool_iter = active_pools_.find(key);
67

            
68
11488
  if (pool_iter != active_pools_.end()) {
69
11486
    thread_local_dispatcher_.deferredDelete(std::move(pool_iter->second));
70
11486
    active_pools_.erase(pool_iter);
71
11486
    host_->cluster().resourceManager(priority_).connectionPools().dec();
72
11486
    return true;
73
11486
  } else {
74
2
    return false;
75
2
  }
76
11488
}
77

            
78
template <typename KEY_TYPE, typename POOL_TYPE>
79
24
size_t ConnPoolMap<KEY_TYPE, POOL_TYPE>::size() const {
80
24
  return active_pools_.size();
81
24
}
82

            
83
template <typename KEY_TYPE, typename POOL_TYPE>
84
22990
bool ConnPoolMap<KEY_TYPE, POOL_TYPE>::empty() const {
85
22990
  return active_pools_.empty();
86
22990
}
87

            
88
6
template <typename KEY_TYPE, typename POOL_TYPE> void ConnPoolMap<KEY_TYPE, POOL_TYPE>::clear() {
89
6
  Common::AutoDebugRecursionChecker assert_not_in(recursion_checker_);
90
10
  for (auto& pool_pair : active_pools_) {
91
10
    thread_local_dispatcher_.deferredDelete(std::move(pool_pair.second));
92
10
  }
93
6
  clearActivePools();
94
6
}
95

            
96
template <typename KEY_TYPE, typename POOL_TYPE>
97
4
void ConnPoolMap<KEY_TYPE, POOL_TYPE>::addIdleCallback(const IdleCb& cb) {
98
4
  Common::AutoDebugRecursionChecker assert_not_in(recursion_checker_);
99
4
  for (auto& pool_pair : active_pools_) {
100
4
    pool_pair.second->addIdleCallback(cb);
101
4
  }
102

            
103
4
  cached_callbacks_.emplace_back(std::move(cb));
104
4
}
105

            
106
template <typename KEY_TYPE, typename POOL_TYPE>
107
void ConnPoolMap<KEY_TYPE, POOL_TYPE>::drainConnections(
108
380
    Envoy::ConnectionPool::DrainBehavior drain_behavior) {
109
  // Copy the `active_pools_` so that it is safe for the call to result
110
  // in deletion, and avoid iteration through a mutating container.
111
380
  std::vector<POOL_TYPE*> pools;
112
380
  pools.reserve(active_pools_.size());
113
380
  for (auto& pool_pair : active_pools_) {
114
199
    pools.push_back(pool_pair.second.get());
115
199
  }
116

            
117
380
  for (auto* pool : pools) {
118
199
    pool->drainConnections(drain_behavior);
119
199
  }
120
380
}
121

            
122
template <typename KEY_TYPE, typename POOL_TYPE>
123
18
bool ConnPoolMap<KEY_TYPE, POOL_TYPE>::freeOnePool() {
124
  // Try to find a pool that isn't doing anything.
125
18
  auto pool_iter = active_pools_.begin();
126
29
  while (pool_iter != active_pools_.end()) {
127
19
    if (!pool_iter->second->hasActiveConnections()) {
128
8
      break;
129
8
    }
130
11
    ++pool_iter;
131
11
  }
132

            
133
18
  if (pool_iter != active_pools_.end()) {
134
    // We found one. Free it up, and let the caller know.
135
8
    active_pools_.erase(pool_iter);
136
8
    host_->cluster().resourceManager(priority_).connectionPools().dec();
137
8
    return true;
138
8
  }
139

            
140
10
  return false;
141
18
}
142

            
143
template <typename KEY_TYPE, typename POOL_TYPE>
144
23520
void ConnPoolMap<KEY_TYPE, POOL_TYPE>::clearActivePools() {
145
23520
  host_->cluster().resourceManager(priority_).connectionPools().decBy(active_pools_.size());
146
23520
  active_pools_.clear();
147
23520
}
} // namespace Upstream
} // namespace Envoy