Line data Source code
1 : #pragma once 2 : 3 : #include "source/common/upstream/conn_pool_map.h" 4 : 5 : namespace Envoy { 6 : namespace Upstream { 7 : 8 : template <typename KEY_TYPE, typename POOL_TYPE> 9 : ConnPoolMap<KEY_TYPE, POOL_TYPE>::ConnPoolMap(Envoy::Event::Dispatcher& dispatcher, 10 : const HostConstSharedPtr& host, 11 : ResourcePriority priority) 12 346 : : thread_local_dispatcher_(dispatcher), host_(host), priority_(priority) {} 13 : 14 346 : template <typename KEY_TYPE, typename POOL_TYPE> ConnPoolMap<KEY_TYPE, POOL_TYPE>::~ConnPoolMap() { 15 : // Clean up the pools to ensure resource tracking is kept up to date. Note that we do not call 16 : // `clear()` here to avoid doing a deferred delete. This triggers some unwanted race conditions 17 : // on shutdown where deleted resources end up putting stuff on the deferred delete list after the 18 : // worker threads have shut down. 19 346 : clearActivePools(); 20 346 : } 21 : 22 : template <typename KEY_TYPE, typename POOL_TYPE> 23 : typename ConnPoolMap<KEY_TYPE, POOL_TYPE>::PoolOptRef 24 251 : ConnPoolMap<KEY_TYPE, POOL_TYPE>::getPool(const KEY_TYPE& key, const PoolFactory& factory) { 25 251 : Common::AutoDebugRecursionChecker assert_not_in(recursion_checker_); 26 : // TODO(klarose): Consider how we will change the connection pool's configuration in the future. 27 : // The plan is to change the downstream socket options... We may want to take those as a parameter 28 : // here. Maybe we'll pass them to the factory function? 29 251 : auto pool_iter = active_pools_.find(key); 30 251 : if (pool_iter != active_pools_.end()) { 31 78 : return std::ref(*(pool_iter->second)); 32 78 : } 33 173 : ResourceLimit& connPoolResource = host_->cluster().resourceManager(priority_).connectionPools(); 34 : // We need a new pool. Check if we have room. 35 173 : if (!connPoolResource.canCreate()) { 36 : // We're full. Try to free up a pool. If we can't, bail out. 37 0 : if (!freeOnePool()) { 38 0 : host_->cluster().trafficStats()->upstream_cx_pool_overflow_.inc(); 39 0 : return absl::nullopt; 40 0 : } 41 : 42 0 : ASSERT(size() < connPoolResource.max(), 43 0 : "Freeing a pool should reduce the size to below the max."); 44 : 45 : // TODO(klarose): Consider some simple hysteresis here. How can we prevent iterating over all 46 : // pools when we're at the limit every time we want to allocate a new one, even if most of the 47 : // pools are not busy, while balancing that with not unnecessarily freeing all pools? If we 48 : // start freeing once we cross a threshold, then stop after we cross another, we could 49 : // achieve that balance. 50 0 : } 51 : 52 : // We have room for a new pool. Allocate one and let it know about any cached callbacks. 53 173 : auto new_pool = factory(); 54 173 : connPoolResource.inc(); 55 173 : for (const auto& cb : cached_callbacks_) { 56 0 : new_pool->addIdleCallback(cb); 57 0 : } 58 : 59 173 : auto inserted = active_pools_.emplace(key, std::move(new_pool)); 60 173 : return std::ref(*inserted.first->second); 61 173 : } 62 : 63 : template <typename KEY_TYPE, typename POOL_TYPE> 64 173 : bool ConnPoolMap<KEY_TYPE, POOL_TYPE>::erasePool(const KEY_TYPE& key) { 65 173 : Common::AutoDebugRecursionChecker assert_not_in(recursion_checker_); 66 173 : auto pool_iter = active_pools_.find(key); 67 : 68 173 : if (pool_iter != active_pools_.end()) { 69 173 : thread_local_dispatcher_.deferredDelete(std::move(pool_iter->second)); 70 173 : active_pools_.erase(pool_iter); 71 173 : host_->cluster().resourceManager(priority_).connectionPools().dec(); 72 173 : return true; 73 173 : } else { 74 0 : return false; 75 0 : } 76 173 : } 77 : 78 : template <typename KEY_TYPE, typename POOL_TYPE> 79 0 : size_t ConnPoolMap<KEY_TYPE, POOL_TYPE>::size() const { 80 0 : return active_pools_.size(); 81 0 : } 82 : 83 : template <typename KEY_TYPE, typename POOL_TYPE> 84 346 : bool ConnPoolMap<KEY_TYPE, POOL_TYPE>::empty() const { 85 346 : return active_pools_.empty(); 86 346 : } 87 : 88 : template <typename KEY_TYPE, typename POOL_TYPE> void ConnPoolMap<KEY_TYPE, POOL_TYPE>::clear() { 89 : Common::AutoDebugRecursionChecker assert_not_in(recursion_checker_); 90 : for (auto& pool_pair : active_pools_) { 91 : thread_local_dispatcher_.deferredDelete(std::move(pool_pair.second)); 92 : } 93 : clearActivePools(); 94 : } 95 : 96 : template <typename KEY_TYPE, typename POOL_TYPE> 97 : void ConnPoolMap<KEY_TYPE, POOL_TYPE>::addIdleCallback(const IdleCb& cb) { 98 : Common::AutoDebugRecursionChecker assert_not_in(recursion_checker_); 99 : for (auto& pool_pair : active_pools_) { 100 : pool_pair.second->addIdleCallback(cb); 101 : } 102 : 103 : cached_callbacks_.emplace_back(std::move(cb)); 104 : } 105 : 106 : template <typename KEY_TYPE, typename POOL_TYPE> 107 : void ConnPoolMap<KEY_TYPE, POOL_TYPE>::drainConnections( 108 0 : Envoy::ConnectionPool::DrainBehavior drain_behavior) { 109 : // Copy the `active_pools_` so that it is safe for the call to result 110 : // in deletion, and avoid iteration through a mutating container. 111 0 : std::vector<POOL_TYPE*> pools; 112 0 : pools.reserve(active_pools_.size()); 113 0 : for (auto& pool_pair : active_pools_) { 114 0 : pools.push_back(pool_pair.second.get()); 115 0 : } 116 : 117 0 : for (auto* pool : pools) { 118 0 : pool->drainConnections(drain_behavior); 119 0 : } 120 0 : } 121 : 122 : template <typename KEY_TYPE, typename POOL_TYPE> 123 0 : bool ConnPoolMap<KEY_TYPE, POOL_TYPE>::freeOnePool() { 124 : // Try to find a pool that isn't doing anything. 125 0 : auto pool_iter = active_pools_.begin(); 126 0 : while (pool_iter != active_pools_.end()) { 127 0 : if (!pool_iter->second->hasActiveConnections()) { 128 0 : break; 129 0 : } 130 0 : ++pool_iter; 131 0 : } 132 : 133 0 : if (pool_iter != active_pools_.end()) { 134 : // We found one. Free it up, and let the caller know. 135 0 : active_pools_.erase(pool_iter); 136 0 : host_->cluster().resourceManager(priority_).connectionPools().dec(); 137 0 : return true; 138 0 : } 139 : 140 0 : return false; 141 0 : } 142 : 143 : template <typename KEY_TYPE, typename POOL_TYPE> 144 346 : void ConnPoolMap<KEY_TYPE, POOL_TYPE>::clearActivePools() { 145 346 : host_->cluster().resourceManager(priority_).connectionPools().decBy(active_pools_.size()); 146 346 : active_pools_.clear(); 147 346 : } 148 : } // namespace Upstream 149 : } // namespace Envoy