Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/source/server/drain_manager_impl.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/server/drain_manager_impl.h"
2
3
#include <chrono>
4
#include <cstdint>
5
#include <functional>
6
#include <memory>
7
8
#include "envoy/config/listener/v3/listener.pb.h"
9
#include "envoy/event/dispatcher.h"
10
#include "envoy/event/timer.h"
11
12
#include "source/common/common/assert.h"
13
14
namespace Envoy {
15
namespace Server {
16
17
// Constructs the root drain manager for the server. The children_ callback
// manager is thread-safe because child managers may be created on (and their
// drain notifications posted to) other dispatchers.
DrainManagerImpl::DrainManagerImpl(Instance& server,
                                   envoy::config::listener::v3::Listener::DrainType drain_type,
                                   Event::Dispatcher& dispatcher)
    : server_(server), dispatcher_(dispatcher), drain_type_(drain_type),
      children_(Common::ThreadSafeCallbackManager::create()) {}
22
23
// Creates a child drain manager bound to `dispatcher` with an explicit drain
// type. The child is wired to the parent so that a parent-initiated drain
// cascades into the child.
DrainManagerPtr
DrainManagerImpl::createChildManager(Event::Dispatcher& dispatcher,
                                     envoy::config::listener::v3::Listener::DrainType drain_type) {
  auto child_manager = std::make_unique<DrainManagerImpl>(server_, drain_type, dispatcher);

  // When the parent runs its drain callbacks, start the child's drain sequence
  // (unless the child already started draining on its own). The raw-pointer
  // capture is presumably kept safe by the handle stored on the child, which
  // should unregister the callback on the child's destruction — consistent with
  // CallbackHandle semantics; confirm against ThreadSafeCallbackManager.
  DrainManagerImpl* raw_child = child_manager.get();
  child_manager->parent_callback_handle_ = children_->add(dispatcher, [raw_child] {
    if (!raw_child->draining_) {
      raw_child->startDrainSequence([] {});
    }
  });
  return child_manager;
}
38
39
0
// Convenience overload: creates a child drain manager that inherits this
// manager's drain type.
DrainManagerPtr DrainManagerImpl::createChildManager(Event::Dispatcher& dispatcher) {
  return createChildManager(dispatcher, drain_type_);
}
42
43
2.69k
// Returns true when a connection/stream should be drain-closed right now.
// Per the comments in startDrainSequence(), this is called from worker threads,
// so it reads the atomic draining_ flag before ever touching drain_deadline_.
bool DrainManagerImpl::drainClose() const {
  // If we are actively health check failed and the drain type is default, always drain close.
  //
  // TODO(mattklein123): In relation to x-envoy-immediate-health-check-fail, it would be better
  // if even in the case of server health check failure we had some period of drain ramp up. This
  // would allow the other side to fail health check for the host which will require some thread
  // jumps versus immediately start GOAWAY/connection thrashing.
  if (drain_type_ == envoy::config::listener::v3::Listener::DEFAULT &&
      server_.healthCheckFailed()) {
    return true;
  }

  // Not draining: never close. drain_deadline_ is only valid once draining_ is
  // true (see the ordering notes in startDrainSequence()).
  if (!draining_) {
    return false;
  }

  // Immediate strategy: every call reports "close now".
  if (server_.options().drainStrategy() == Server::DrainStrategy::Immediate) {
    return true;
  }
  ASSERT(server_.options().drainStrategy() == Server::DrainStrategy::Gradual);

  // P(return true) = elapsed time / drain timeout
  // If the drain deadline is exceeded, skip the probability calculation.
  const MonotonicTime current_time = dispatcher_.timeSource().monotonicTime();
  if (current_time >= drain_deadline_) {
    return true;
  }

  const auto remaining_time =
      std::chrono::duration_cast<std::chrono::seconds>(drain_deadline_ - current_time);
  const auto drain_time = server_.options().drainTime();
  ASSERT(server_.options().drainTime() >= remaining_time);
  const auto drain_time_count = drain_time.count();

  // If the user hasn't specified a drain timeout it will be zero, so we'll
  // confirm the drainClose immediately. Otherwise we'll use the drain timeout
  // as a modulus to a random number to salt the drain timing.
  if (drain_time_count == 0) {
    return true;
  }
  // Random salt: the further into the drain window we are, the more likely the
  // comparison is to succeed, ramping close probability from 0 to 1.
  const auto elapsed_time = drain_time - remaining_time;
  return static_cast<uint64_t>(elapsed_time.count()) >
         (server_.api().randomGenerator().random() % drain_time_count);
}
87
88
0
// Registers a callback to run when draining begins. If draining has already
// started, the callback is invoked immediately (with a computed delay) and no
// handle is returned.
Common::CallbackHandlePtr DrainManagerImpl::addOnDrainCloseCb(DrainCloseCb cb) const {
  ASSERT(dispatcher_.isThreadSafe());

  // Not draining yet: queue the callback; startDrainSequence() runs the queued
  // callbacks when the drain begins.
  if (!draining_) {
    return cbs_.add(cb);
  }

  // Already draining: invoke the callback right away with a delay. An immediate
  // drain-strategy or an already-expired deadline yields a zero delay; otherwise
  // the delay is a random point within the remaining drain window.
  const MonotonicTime now = dispatcher_.timeSource().monotonicTime();
  std::chrono::milliseconds delay{0};
  if (server_.options().drainStrategy() != Server::DrainStrategy::Immediate &&
      now < drain_deadline_) {
    const auto remaining_ms =
        std::chrono::duration_cast<std::chrono::milliseconds>(drain_deadline_ - now).count();
    // The deadline may be less than a millisecond away (e.g. a delta of 1000
    // nanoseconds), in which case the cast above truncates to 0; guard the
    // modulus so we don't trip a divide-by-zero (SIGFPE).
    if (remaining_ms > 0) {
      delay = std::chrono::milliseconds(server_.api().randomGenerator().random() % remaining_ms);
    }
  }
  THROW_IF_NOT_OK(cb(delay));
  return nullptr;
}
117
118
4
void DrainManagerImpl::addDrainCompleteCallback(std::function<void()> cb) {
119
4
  ASSERT(dispatcher_.isThreadSafe());
120
4
  ASSERT(draining_);
121
122
  // If the drain-tick-timer is active, add the callback to the queue. If not defined
123
  // then it must have already expired, invoke the callback immediately.
124
4
  if (drain_tick_timer_) {
125
4
    drain_complete_cbs_.push_back(cb);
126
4
  } else {
127
0
    cb();
128
0
  }
129
4
}
130
131
4
// Begins the drain sequence: records the drain deadline, flips the atomic
// draining_ flag, cascades the drain to child managers, schedules the
// end-of-drain timer, and fires the registered on-drain callbacks with
// staggered delays. Must be called on this manager's dispatcher thread.
void DrainManagerImpl::startDrainSequence(std::function<void()> drain_complete_cb) {
  ASSERT(dispatcher_.isThreadSafe());
  ASSERT(drain_complete_cb);

  // If we've already started draining (either through direct invocation or through
  // parent-initiated draining), enqueue the drain_complete_cb and return
  if (draining_) {
    addDrainCompleteCallback(drain_complete_cb);
    return;
  }

  ASSERT(!drain_tick_timer_);
  const std::chrono::seconds drain_delay(server_.options().drainTime());

  // Note https://github.com/envoyproxy/envoy/issues/31457, previous to which,
  // drain_deadline_ was set *after* draining_ resulting in a read/write race between
  // the main thread running this function from admin, and the worker thread calling
  // drainClose. Note that drain_deadline_ is default-constructed which guarantees
  // to set the time-since epoch to a count of 0
  // (https://en.cppreference.com/w/cpp/chrono/time_point/time_point).
  ASSERT(drain_deadline_.time_since_epoch().count() == 0, "drain_deadline_ cannot be set twice.");

  // Since draining_ is atomic, it is safe to set drain_deadline_ without a mutex
  // as drain_close() only reads from drain_deadline_ if draining_ is true, and
  // C++ will not re-order an assign to an atomic. See
  // https://stackoverflow.com/questions/40320254/reordering-atomic-operations-in-c .
  drain_deadline_ = dispatcher_.timeSource().monotonicTime() + drain_delay;

  // Atomic assign must come after the assign to drain_deadline_.
  draining_.store(true, std::memory_order_seq_cst);

  // Signal to child drain-managers to start their drain sequence
  children_->runCallbacks();

  // Schedule callback to run at end of drain time. The timer flushes (and then
  // clears) the queued drain-complete callbacks and resets itself, which is the
  // signal addDrainCompleteCallback() uses to invoke late registrations inline.
  drain_tick_timer_ = dispatcher_.createTimer([this]() {
    for (auto& cb : drain_complete_cbs_) {
      cb();
    }
    drain_complete_cbs_.clear();
    drain_tick_timer_.reset();
  });
  addDrainCompleteCallback(drain_complete_cb);
  drain_tick_timer_->enableTimer(drain_delay);

  // Call registered on-drain callbacks - with gradual delays
  // Note: This will distribute drain events in the first 1/4th of the drain window
  //       to ensure that we initiate draining with enough time for graceful shutdowns.
  const MonotonicTime current_time = dispatcher_.timeSource().monotonicTime();
  std::chrono::seconds remaining_time{0};
  if (server_.options().drainStrategy() != Server::DrainStrategy::Immediate &&
      current_time < drain_deadline_) {
    remaining_time =
        std::chrono::duration_cast<std::chrono::seconds>(drain_deadline_ - current_time);
    ASSERT(server_.options().drainTime() >= remaining_time);
  }

  // Each successive callback gets delay = (step / 4 / num_cbs) * remaining_ms,
  // i.e. evenly spaced points across the first quarter of the drain window.
  uint32_t step_count = 0;
  size_t num_cbs = cbs_.size();
  THROW_IF_NOT_OK(cbs_.runCallbacksWith([&]() {
    // switch to floating-point math to avoid issues with integer division
    std::chrono::milliseconds delay{static_cast<int64_t>(
        static_cast<double>(step_count) / 4 / num_cbs *
        std::chrono::duration_cast<std::chrono::milliseconds>(remaining_time).count())};
    step_count++;
    return delay;
  }));
}
199
200
1.97k
void DrainManagerImpl::startParentShutdownSequence() {
201
  // Do not initiate parent shutdown sequence when hot restart is disabled.
202
1.97k
  if (server_.options().hotRestartDisabled()) {
203
1.97k
    return;
204
1.97k
  }
205
0
  ASSERT(!parent_shutdown_timer_);
206
0
  parent_shutdown_timer_ = server_.dispatcher().createTimer([this]() -> void {
207
    // Shut down the parent now. It should have already been draining.
208
0
    ENVOY_LOG(info, "shutting down parent after drain");
209
0
    server_.hotRestart().sendParentTerminateRequest();
210
0
  });
211
212
0
  parent_shutdown_timer_->enableTimer(std::chrono::duration_cast<std::chrono::milliseconds>(
213
0
      server_.options().parentShutdownTime()));
214
0
}
215
216
} // namespace Server
217
} // namespace Envoy