1
#include "source/server/drain_manager_impl.h"
2

            
3
#include <chrono>
4
#include <cstdint>
5
#include <functional>
6
#include <memory>
7

            
8
#include "envoy/config/listener/v3/listener.pb.h"
9
#include "envoy/event/dispatcher.h"
10
#include "envoy/event/timer.h"
11

            
12
#include "source/common/common/assert.h"
13

            
14
namespace Envoy {
15
namespace Server {
16

            
17
// Constructs a drain manager for `server` that schedules its timers on `dispatcher` and uses
// `drain_type` when deciding how a failed health check affects drainClose(). The set of child
// callbacks starts out empty and is populated by createChildManager().
DrainManagerImpl::DrainManagerImpl(Instance& server,
                                   envoy::config::listener::v3::Listener::DrainType drain_type,
                                   Event::Dispatcher& dispatcher)
    : server_(server),
      dispatcher_(dispatcher),
      drain_type_(drain_type),
      children_(Common::ThreadSafeCallbackManager::create()) {}
22

            
23
// Creates a child drain manager bound to `dispatcher` with the requested `drain_type`, and
// links it to this (parent) manager so that a drain started on the parent is propagated to the
// child. The callback handle returned by the registration is stored on the child.
DrainManagerPtr
DrainManagerImpl::createChildManager(Event::Dispatcher& dispatcher,
                                     envoy::config::listener::v3::Listener::DrainType drain_type) {
  auto child = std::make_unique<DrainManagerImpl>(server_, drain_type, dispatcher);
  DrainManagerImpl* const child_ptr = child.get();

  // Register the propagation callback with the parent's child-callback list. It is run when
  // the parent starts draining, so the child observes the same state change.
  child->parent_callback_handle_ = children_->add(dispatcher, [this, child_ptr] {
    // Each atomic is loaded at most once: the child's drain pair is inspected first, and the
    // parent's pair is only read when the child still has to start draining.
    const bool child_already_draining = child_ptr->draining_.load().first;
    if (child_already_draining) {
      return;
    }
    child_ptr->startDrainSequence(this->draining_.load().second, [] {});
  });
  return child;
}
39

            
40
22
// Convenience overload: creates a child manager on `dispatcher` that reuses this manager's
// own drain type.
DrainManagerPtr DrainManagerImpl::createChildManager(Event::Dispatcher& dispatcher) {
  const auto inherited_drain_type = drain_type_;
  return createChildManager(dispatcher, inherited_drain_type);
}
43

            
44
93987
// Decides whether a connection travelling in `direction` should be closed because of draining.
//
// Returns true when:
//   - the drain type is DEFAULT and the server is failing health checks, or
//   - a drain covering `direction` is active and either the drain strategy is Immediate, the
//     drain deadline has passed, the configured drain time is zero, or a random draw falls
//     inside the elapsed part of the drain window (P(true) = elapsed time / drain time).
// Returns false when no drain is active, when `direction` is None, or when `direction` is
// greater than the direction currently being drained.
bool DrainManagerImpl::drainClose(Network::DrainDirection direction) const {
  // If we are actively health check failed and the drain type is default, always drain close.
  //
  // TODO(mattklein123): In relation to x-envoy-immediate-health-check-fail, it would be better
  // if even in the case of server health check failure we had some period of drain ramp up. This
  // would allow the other side to fail health check for the host which will require some thread
  // jumps versus immediately start GOAWAY/connection thrashing.
  if (drain_type_ == envoy::config::listener::v3::Listener::DEFAULT &&
      server_.healthCheckFailed()) {
    return true;
  }

  auto current_drain = draining_.load();

  if (!current_drain.first) {
    return false;
  }

  // If the direction passed in is greater than the current draining direction
  // (e.g. direction = ALL, but draining_.second == INBOUND_ONLY), then don't
  // drain. We also don't want to drain if the direction is None (which doesn't really
  // make sense, but it's the correct behavior).
  if (direction == Network::DrainDirection::None || direction > current_drain.second) {
    return false;
  }

  if (server_.options().drainStrategy() == Server::DrainStrategy::Immediate) {
    return true;
  }
  ASSERT(server_.options().drainStrategy() == Server::DrainStrategy::Gradual);

  // P(return true) = elapsed time / drain timeout
  // If the drain deadline is exceeded, skip the probability calculation.
  const MonotonicTime current_time = dispatcher_.timeSource().monotonicTime();

  // startDrainSequence() records a deadline only under the direction it was started with, so a
  // query for a narrower direction (which passes the check above) may have no entry of its
  // own. In that case use the deadline of the drain that is actually in progress instead of
  // dereferencing end(), which ASSERT alone does not prevent in release builds.
  auto deadline = drain_deadlines_.find(direction);
  if (deadline == drain_deadlines_.end()) {
    deadline = drain_deadlines_.find(current_drain.second);
    // The deadline of the active direction is written before draining_ is stored.
    ASSERT(deadline != drain_deadlines_.end());
  }
  // A drain is in progress; with no deadline to ramp against, behave as if it has expired.
  if (deadline == drain_deadlines_.end() || current_time >= deadline->second) {
    return true;
  }

  const auto remaining_time =
      std::chrono::duration_cast<std::chrono::seconds>(deadline->second - current_time);
  const auto drain_time = server_.options().drainTime();
  ASSERT(server_.options().drainTime() >= remaining_time);
  const auto drain_time_count = drain_time.count();

  // If the user hasn't specified a drain timeout it will be zero, so we'll
  // confirm the drainClose immediately. Otherwise we'll use the drain timeout
  // as a modulus to a random number to salt the drain timing.
  if (drain_time_count == 0) {
    return true;
  }
  const auto elapsed_time = drain_time - remaining_time;
  return static_cast<uint64_t>(elapsed_time.count()) >
         (server_.api().randomGenerator().random() % drain_time_count);
}
100

            
101
// Registers `cb` to be notified when a drain covering `direction` starts.
//
// If such a drain is already in progress, `cb` is invoked immediately with a delay (zero for
// the Immediate strategy or once the deadline has passed, otherwise a random value below the
// time remaining until the deadline) and nullptr is returned. A non-OK status from `cb` is
// raised via THROW_IF_NOT_OK. Otherwise `cb` is stored and its handle is returned.
Common::CallbackHandlePtr DrainManagerImpl::addOnDrainCloseCb(Network::DrainDirection direction,
                                                              DrainCloseCb cb) const {
  ASSERT(dispatcher_.isThreadSafe());
  auto current_drain = draining_.load();
  if (current_drain.first && direction <= current_drain.second) {
    const MonotonicTime current_time = dispatcher_.timeSource().monotonicTime();

    // Calculate the delay. If using an immediate drain-strategy or past our deadline, use
    // a zero millisecond delay. Otherwise, pick a random value within the remaining time-span.
    std::chrono::milliseconds drain_delay{0};
    if (server_.options().drainStrategy() != Server::DrainStrategy::Immediate) {
      // A deadline is recorded only under the direction a drain was started with, so a
      // narrower `direction` may have no entry of its own. Look it up once and fall back to
      // the deadline of the active drain rather than dereferencing end().
      auto deadline = drain_deadlines_.find(direction);
      if (deadline == drain_deadlines_.end()) {
        deadline = drain_deadlines_.find(current_drain.second);
      }
      if (deadline != drain_deadlines_.end() && current_time < deadline->second) {
        const auto delta = deadline->second - current_time;
        const auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(delta).count();

        // Note; current_time may be less than the deadline by only a
        // microsecond (delta will be 1000 nanoseconds), in which case when we
        // convert to milliseconds that will be 0, which will throw a SIGFPE
        // if used as a modulus unguarded.
        if (ms > 0) {
          drain_delay = std::chrono::milliseconds(server_.api().randomGenerator().random() % ms);
        }
      }
    }
    THROW_IF_NOT_OK(cb(drain_delay));
    return nullptr;
  }

  return cbs_.add(cb);
}
131

            
132
void DrainManagerImpl::addDrainCompleteCallback(Network::DrainDirection direction,
133
175
                                                std::function<void()> cb) {
134
175
  ASSERT(dispatcher_.isThreadSafe());
135
175
  auto drain_pair = draining_.load();
136
175
  ASSERT(drain_pair.first && direction <= drain_pair.second);
137

            
138
  // If the drain-tick-timer is active, add the callback to the queue. If not defined
139
  // then it must have already expired, invoke the callback immediately.
140
175
  if (drain_tick_timers_[direction]) {
141
175
    drain_complete_cbs_.push_back(cb);
142
175
  } else {
143
    cb();
144
  }
145
175
}
146

            
147
void DrainManagerImpl::startDrainSequence(Network::DrainDirection direction,
148
175
                                          std::function<void()> drain_complete_cb) {
149
175
  ASSERT(dispatcher_.isThreadSafe());
150
175
  ASSERT(drain_complete_cb);
151
175
  ASSERT(direction != Network::DrainDirection::None, "a valid direction must be specified.");
152
175
  auto current_drain = draining_.load();
153

            
154
  // If we've already started draining (either through direct invocation or through
155
  // parent-initiated draining), enqueue the drain_complete_cb and return
156
175
  if (current_drain.first && direction <= current_drain.second) {
157
1
    addDrainCompleteCallback(direction, drain_complete_cb);
158
1
    return;
159
1
  }
160
174
  ASSERT(drain_tick_timers_.count(direction) == 0,
161
174
         "cannot run two drain sequences for the same direction.");
162

            
163
174
  const std::chrono::seconds drain_delay(server_.options().drainTime());
164
  // Note https://github.com/envoyproxy/envoy/issues/31457, previous to which,
165
  // drain_deadline_ was set *after* draining_ resulting in a read/write race between
166
  // the main thread running this function from admin, and the worker thread calling
167
  // drainClose. Note that drain_deadline_ is default-constructed which guarantees
168
  // to set the time-since epoch to a count of 0
169
  // (https://en.cppreference.com/w/cpp/chrono/time_point/time_point).
170
174
  ASSERT(drain_deadlines_[direction].time_since_epoch().count() == 0,
171
174
         "drain_deadline_ cannot be set twice for the same direction");
172
  // Since draining_ is atomic, it is safe to set drain_deadline_ without a mutex
173
  // as drain_close() only reads from drain_deadline_ if draining_ is true, and
174
  // C++ will not re-order an assign to an atomic. See
175
  // https://stackoverflow.com/questions/40320254/reordering-atomic-operations-in-c .
176

            
177
174
  drain_deadlines_[direction] = dispatcher_.timeSource().monotonicTime() + drain_delay;
178
  // Atomic assign must come after the assign to drain_deadline_.
179
174
  draining_.store(DrainPair{true, direction}, std::memory_order_seq_cst);
180

            
181
  // Signal to child drain-managers to start their drain sequence
182
174
  children_->runCallbacks();
183
  // Schedule callback to run at end of drain time
184
174
  drain_tick_timers_[direction] = dispatcher_.createTimer([this, direction]() {
185
96
    for (auto& cb : drain_complete_cbs_) {
186
96
      cb();
187
96
    }
188
96
    drain_complete_cbs_.clear();
189
96
    drain_tick_timers_[direction].reset();
190
96
  });
191
174
  addDrainCompleteCallback(direction, drain_complete_cb);
192
174
  drain_tick_timers_[direction]->enableTimer(drain_delay);
193

            
194
  // Call registered on-drain callbacks - with gradual delays
195
  // Note: This will distribute drain events in the first 1/4th of the drain window
196
  //       to ensure that we initiate draining with enough time for graceful shutdowns.
197
174
  const MonotonicTime current_time = dispatcher_.timeSource().monotonicTime();
198
174
  std::chrono::seconds remaining_time{0};
199
174
  if (server_.options().drainStrategy() != Server::DrainStrategy::Immediate &&
200
174
      current_time < drain_deadlines_[direction]) {
201
146
    remaining_time = std::chrono::duration_cast<std::chrono::seconds>(drain_deadlines_[direction] -
202
146
                                                                      current_time);
203
146
    ASSERT(server_.options().drainTime() >= remaining_time);
204
146
  }
205

            
206
174
  uint32_t step_count = 0;
207
174
  size_t num_cbs = cbs_.size();
208
174
  THROW_IF_NOT_OK(cbs_.runCallbacksWith([&]() {
209
    // switch to floating-point math to avoid issues with integer division
210
174
    std::chrono::milliseconds delay{static_cast<int64_t>(
211
174
        static_cast<double>(step_count) / 4 / num_cbs *
212
174
        std::chrono::duration_cast<std::chrono::milliseconds>(remaining_time).count())};
213
174
    step_count++;
214
174
    return delay;
215
174
  }));
216
174
}
217

            
218
10475
void DrainManagerImpl::startParentShutdownSequence() {
219
  // Do not initiate parent shutdown sequence when hot restart is disabled.
220
10475
  if (server_.options().hotRestartDisabled()) {
221
10463
    return;
222
10463
  }
223
12
  ASSERT(!parent_shutdown_timer_);
224
12
  parent_shutdown_timer_ = server_.dispatcher().createTimer([this]() -> void {
225
    // Shut down the parent now. It should have already been draining.
226
11
    ENVOY_LOG(info, "shutting down parent after drain");
227
11
    server_.hotRestart().sendParentTerminateRequest();
228
11
  });
229

            
230
12
  parent_shutdown_timer_->enableTimer(std::chrono::duration_cast<std::chrono::milliseconds>(
231
12
      server_.options().parentShutdownTime()));
232
12
}
233

            
234
} // namespace Server
235
} // namespace Envoy