Line data Source code
1 : #include "source/server/drain_manager_impl.h" 2 : 3 : #include <chrono> 4 : #include <cstdint> 5 : #include <functional> 6 : #include <memory> 7 : 8 : #include "envoy/config/listener/v3/listener.pb.h" 9 : #include "envoy/event/dispatcher.h" 10 : #include "envoy/event/timer.h" 11 : 12 : #include "source/common/common/assert.h" 13 : 14 : namespace Envoy { 15 : namespace Server { 16 : 17 : DrainManagerImpl::DrainManagerImpl(Instance& server, 18 : envoy::config::listener::v3::Listener::DrainType drain_type, 19 : Event::Dispatcher& dispatcher) 20 : : server_(server), dispatcher_(dispatcher), drain_type_(drain_type), 21 272 : children_(Common::ThreadSafeCallbackManager::create()) {} 22 : 23 : DrainManagerPtr 24 : DrainManagerImpl::createChildManager(Event::Dispatcher& dispatcher, 25 0 : envoy::config::listener::v3::Listener::DrainType drain_type) { 26 0 : auto child = std::make_unique<DrainManagerImpl>(server_, drain_type, dispatcher); 27 : 28 : // Wire up the child so that when the parent starts draining, the child also sees the 29 : // state-change 30 0 : auto child_cb = children_->add(dispatcher, [child = child.get()] { 31 0 : if (!child->draining_) { 32 0 : child->startDrainSequence([] {}); 33 0 : } 34 0 : }); 35 0 : child->parent_callback_handle_ = std::move(child_cb); 36 0 : return child; 37 0 : } 38 : 39 0 : DrainManagerPtr DrainManagerImpl::createChildManager(Event::Dispatcher& dispatcher) { 40 0 : return createChildManager(dispatcher, drain_type_); 41 0 : } 42 : 43 936 : bool DrainManagerImpl::drainClose() const { 44 : // If we are actively health check failed and the drain type is default, always drain close. 45 : // 46 : // TODO(mattklein123): In relation to x-envoy-immediate-health-check-fail, it would be better 47 : // if even in the case of server health check failure we had some period of drain ramp up. This 48 : // would allow the other side to fail health check for the host which will require some thread 49 : // jumps versus immediately start GOAWAY/connection thrashing. 50 936 : if (drain_type_ == envoy::config::listener::v3::Listener::DEFAULT && 51 936 : server_.healthCheckFailed()) { 52 0 : return true; 53 0 : } 54 : 55 936 : if (!draining_) { 56 936 : return false; 57 936 : } 58 : 59 0 : if (server_.options().drainStrategy() == Server::DrainStrategy::Immediate) { 60 0 : return true; 61 0 : } 62 0 : ASSERT(server_.options().drainStrategy() == Server::DrainStrategy::Gradual); 63 : 64 : // P(return true) = elapsed time / drain timeout 65 : // If the drain deadline is exceeded, skip the probability calculation. 66 0 : const MonotonicTime current_time = dispatcher_.timeSource().monotonicTime(); 67 0 : if (current_time >= drain_deadline_) { 68 0 : return true; 69 0 : } 70 : 71 0 : const auto remaining_time = 72 0 : std::chrono::duration_cast<std::chrono::seconds>(drain_deadline_ - current_time); 73 0 : const auto drain_time = server_.options().drainTime(); 74 0 : ASSERT(server_.options().drainTime() >= remaining_time); 75 0 : const auto drain_time_count = drain_time.count(); 76 : 77 : // If the user hasn't specified a drain timeout it will be zero, so we'll 78 : // confirm the drainClose immediately. Otherwise we'll use the drain timeout 79 : // as a modulus to a random number to salt the drain timing. 80 0 : if (drain_time_count == 0) { 81 0 : return true; 82 0 : } 83 0 : const auto elapsed_time = drain_time - remaining_time; 84 0 : return static_cast<uint64_t>(elapsed_time.count()) > 85 0 : (server_.api().randomGenerator().random() % drain_time_count); 86 0 : } 87 : 88 0 : Common::CallbackHandlePtr DrainManagerImpl::addOnDrainCloseCb(DrainCloseCb cb) const { 89 0 : ASSERT_IS_MAIN_OR_TEST_THREAD(); 90 0 : ASSERT(dispatcher_.isThreadSafe()); 91 : 92 0 : if (draining_) { 93 0 : const MonotonicTime current_time = dispatcher_.timeSource().monotonicTime(); 94 : 95 : // Calculate the delay. If using an immediate drain-strategy or past our deadline, use 96 : // a zero millisecond delay. Otherwise, pick a random value within the remaining time-span. 97 0 : std::chrono::milliseconds drain_delay{0}; 98 0 : if (server_.options().drainStrategy() != Server::DrainStrategy::Immediate) { 99 0 : if (current_time < drain_deadline_) { 100 0 : const auto delta = drain_deadline_ - current_time; 101 0 : const auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(delta).count(); 102 : 103 : // Note; current_time may be less than drain_deadline_ by only a 104 : // microsecond (delta will be 1000 nanoseconds), in which case when we 105 : // convert to milliseconds that will be 0, which will throw a SIGFPE 106 : // if used as a modulus unguarded. 107 0 : if (ms > 0) { 108 0 : drain_delay = std::chrono::milliseconds(server_.api().randomGenerator().random() % ms); 109 0 : } 110 0 : } 111 0 : } 112 0 : cb(drain_delay); 113 0 : return nullptr; 114 0 : } 115 : 116 0 : return cbs_.add(cb); 117 0 : } 118 : 119 10 : void DrainManagerImpl::addDrainCompleteCallback(std::function<void()> cb) { 120 10 : ASSERT_IS_MAIN_OR_TEST_THREAD(); 121 10 : ASSERT(draining_); 122 : 123 : // If the drain-tick-timer is active, add the callback to the queue. If not defined 124 : // then it must have already expired, invoke the callback immediately. 125 10 : if (drain_tick_timer_) { 126 10 : drain_complete_cbs_.push_back(cb); 127 10 : } else { 128 0 : cb(); 129 0 : } 130 10 : } 131 : 132 10 : void DrainManagerImpl::startDrainSequence(std::function<void()> drain_complete_cb) { 133 10 : ASSERT_IS_MAIN_OR_TEST_THREAD(); 134 10 : ASSERT(drain_complete_cb); 135 : 136 : // If we've already started draining (either through direct invocation or through 137 : // parent-initiated draining), enqueue the drain_complete_cb and return 138 10 : if (draining_) { 139 0 : addDrainCompleteCallback(drain_complete_cb); 140 0 : return; 141 0 : } 142 : 143 10 : ASSERT(!drain_tick_timer_); 144 10 : const std::chrono::seconds drain_delay(server_.options().drainTime()); 145 : 146 : // Note https://github.com/envoyproxy/envoy/issues/31457, previous to which, 147 : // drain_deadline_ was set *after* draining_ resulting in a read/write race between 148 : // the main thread running this function from admin, and the worker thread calling 149 : // drainClose. Note that drain_deadline_ is default-constructed which guarantees 150 : // to set the time-since epoch to a count of 0 151 : // (https://en.cppreference.com/w/cpp/chrono/time_point/time_point). 152 10 : ASSERT(drain_deadline_.time_since_epoch().count() == 0, "drain_deadline_ cannot be set twice."); 153 : 154 : // Since draining_ is atomic, it is safe to set drain_deadline_ without a mutex 155 : // as drain_close() only reads from drain_deadline_ if draining_ is true, and 156 : // C++ will not re-order an assign to an atomic. See 157 : // https://stackoverflow.com/questions/40320254/reordering-atomic-operations-in-c . 158 10 : drain_deadline_ = dispatcher_.timeSource().monotonicTime() + drain_delay; 159 : 160 : // Atomic assign must come after the assign to drain_deadline_. 161 10 : draining_.store(true, std::memory_order_seq_cst); 162 : 163 : // Signal to child drain-managers to start their drain sequence 164 10 : children_->runCallbacks(); 165 : 166 : // Schedule callback to run at end of drain time 167 10 : drain_tick_timer_ = dispatcher_.createTimer([this]() { 168 0 : for (auto& cb : drain_complete_cbs_) { 169 0 : cb(); 170 0 : } 171 0 : drain_complete_cbs_.clear(); 172 0 : drain_tick_timer_.reset(); 173 0 : }); 174 10 : addDrainCompleteCallback(drain_complete_cb); 175 10 : drain_tick_timer_->enableTimer(drain_delay); 176 : 177 : // Call registered on-drain callbacks - with gradual delays 178 : // Note: This will distribute drain events in the first 1/4th of the drain window 179 : // to ensure that we initiate draining with enough time for graceful shutdowns. 180 10 : const MonotonicTime current_time = dispatcher_.timeSource().monotonicTime(); 181 10 : std::chrono::seconds remaining_time{0}; 182 10 : if (server_.options().drainStrategy() != Server::DrainStrategy::Immediate && 183 10 : current_time < drain_deadline_) { 184 10 : remaining_time = 185 10 : std::chrono::duration_cast<std::chrono::seconds>(drain_deadline_ - current_time); 186 10 : ASSERT(server_.options().drainTime() >= remaining_time); 187 10 : } 188 : 189 10 : uint32_t step_count = 0; 190 10 : size_t num_cbs = cbs_.size(); 191 10 : cbs_.runCallbacksWith([&]() { 192 : // switch to floating-point math to avoid issues with integer division 193 0 : std::chrono::milliseconds delay{static_cast<int64_t>( 194 0 : static_cast<double>(step_count) / 4 / num_cbs * 195 0 : std::chrono::duration_cast<std::chrono::milliseconds>(remaining_time).count())}; 196 0 : step_count++; 197 0 : return delay; 198 0 : }); 199 10 : } 200 : 201 94 : void DrainManagerImpl::startParentShutdownSequence() { 202 : // Do not initiate parent shutdown sequence when hot restart is disabled. 203 94 : if (server_.options().hotRestartDisabled()) { 204 94 : return; 205 94 : } 206 0 : ASSERT(!parent_shutdown_timer_); 207 0 : parent_shutdown_timer_ = server_.dispatcher().createTimer([this]() -> void { 208 : // Shut down the parent now. It should have already been draining. 209 0 : ENVOY_LOG(info, "shutting down parent after drain"); 210 0 : server_.hotRestart().sendParentTerminateRequest(); 211 0 : }); 212 : 213 0 : parent_shutdown_timer_->enableTimer(std::chrono::duration_cast<std::chrono::milliseconds>( 214 0 : server_.options().parentShutdownTime())); 215 0 : } 216 : 217 : } // namespace Server 218 : } // namespace Envoy