1
#include "source/server/hot_restarting_child.h"
2

            
3
#include "source/common/buffer/buffer_impl.h"
4
#include "source/common/common/utility.h"
5
#include "source/common/network/utility.h"
6

            
7
namespace Envoy {
8
namespace Server {
9

            
10
// File-local shorthand for the envoy::HotRestartMessage type used by every function below.
using HotRestartMessage = envoy::HotRestartMessage;
11

            
12
void HotRestartingChild::UdpForwardingContext::registerListener(
13
    Network::Address::InstanceConstSharedPtr address,
14
8
    std::shared_ptr<Network::UdpListenerConfig> listener_config) {
15
8
  const bool inserted =
16
8
      listener_map_.try_emplace(address->asString(), ForwardEntry{address, listener_config}).second;
17
8
  ASSERT(inserted, "Two udp listeners on the same address shouldn't be possible");
18
8
}
19

            
20
// Finds the registered listener that should receive a packet whose destination
// is `address`.
//
// Lookup order:
//   1. A listener registered on exactly `address`.
//   2. For an IPv4 destination, the IPv4 wildcard "0.0.0.0:<port>".
//   3. The IPv6 wildcard "[::]:<port>". For an IPv4 destination this is only
//      used when the wildcard listener is not v6only, since a v6only socket
//      cannot receive IPv4 packets.
//
// @param address destination address of the forwarded packet.
// @return the matching entry, or absl::nullopt when nothing matches or when
//         `address` has no IP component and no exact match exists.
absl::optional<HotRestartingChild::UdpForwardingContext::ForwardEntry>
HotRestartingChild::UdpForwardingContext::getListenerForDestination(
    const Network::Address::Instance& address) {
  if (const auto exact = listener_map_.find(address.asString()); exact != listener_map_.end()) {
    return exact->second;
  }

  // The wildcard fallbacks below need a port. An address without an IP
  // component has none, so guard the pointer instead of dereferencing null.
  const auto* ip = address.ip();
  if (ip == nullptr) {
    return absl::nullopt;
  }
  const uint32_t port = ip->port();
  const bool is_v6 = ip->version() == Network::Address::IpVersion::v6;

  if (!is_v6) {
    const auto v4_any = listener_map_.find(absl::StrCat("0.0.0.0:", port));
    if (v4_any != listener_map_.end()) {
      return v4_any->second;
    }
  }

  const auto v6_any = listener_map_.find(absl::StrCat("[::]:", port));
  if (v6_any == listener_map_.end()) {
    return absl::nullopt;
  }
  // If there is a default IPv6 route but it's set v6only, an IPv4 packet must not use it.
  if (!is_v6 && v6_any->second.first->ip()->ipv6()->v6only()) {
    return absl::nullopt;
  }
  return v6_any->second;
}
48

            
49
// If restart_epoch is 0 there is no parent, so it's effectively already
50
// drained and terminated.
51
HotRestartingChild::HotRestartingChild(int base_id, int restart_epoch,
52
                                       const std::string& socket_path, mode_t socket_mode,
53
                                       bool skip_hot_restart_on_no_parent, bool skip_parent_stats)
54
138
    : HotRestartingBase(base_id), restart_epoch_(restart_epoch),
55
138
      parent_terminated_(restart_epoch == 0), parent_drained_(restart_epoch == 0),
56
138
      skip_hot_restart_on_no_parent_(skip_hot_restart_on_no_parent),
57
138
      skip_parent_stats_(skip_parent_stats) {
58
138
  main_rpc_stream_.initDomainSocketAddress(&parent_address_);
59
138
  std::string socket_path_udp = socket_path + "_udp";
60
138
  udp_forwarding_rpc_stream_.initDomainSocketAddress(&parent_address_udp_forwarding_);
61
138
  if (restart_epoch_ != 0) {
62
11
    parent_address_ = main_rpc_stream_.createDomainSocketAddress(restart_epoch_ + -1, "parent",
63
11
                                                                 socket_path, socket_mode);
64
11
    parent_address_udp_forwarding_ = udp_forwarding_rpc_stream_.createDomainSocketAddress(
65
11
        restart_epoch_ + -1, "parent", socket_path_udp, socket_mode);
66
11
  }
67
138
  main_rpc_stream_.bindDomainSocket(restart_epoch_, "child", socket_path, socket_mode);
68
138
  udp_forwarding_rpc_stream_.bindDomainSocket(restart_epoch_, "child", socket_path_udp,
69
138
                                              socket_mode);
70
138
}
71

            
72
20
bool HotRestartingChild::abortDueToFailedParentConnection() {
73
20
  if (!skip_hot_restart_on_no_parent_) {
74
18
    return false;
75
18
  }
76
2
  HotRestartMessage wrapped_request;
77
2
  wrapped_request.mutable_request()->mutable_test_connection();
78
2
  if (!main_rpc_stream_.sendHotRestartMessage(parent_address_, wrapped_request, true)) {
79
1
    return true;
80
1
  }
81
1
  return false;
82
2
}
83

            
84
20
// Hooks the UDP-forwarding domain socket into `dispatcher`.
//
// If the parent connection probe fails, hot restart is abandoned: the parent is
// marked terminated and drained (under registry_mu_) and no file event is created.
// Otherwise an edge-triggered read event is registered on the forwarding socket.
void HotRestartingChild::initialize(Event::Dispatcher& dispatcher) {
  if (!abortDueToFailedParentConnection()) {
    auto on_readable = [this](uint32_t events) {
      ASSERT(events == Event::FileReadyType::Read);
      return onSocketEventUdpForwarding();
    };
    socket_event_udp_forwarding_ = dispatcher.createFileEvent(
        udp_forwarding_rpc_stream_.domain_socket_, std::move(on_readable),
        Event::FileTriggerType::Edge, Event::FileReadyType::Read);
    return;
  }

  ENVOY_LOG(warn, "hot restart sendmsg() connection refused, falling back to regular restart");
  absl::MutexLock lock(registry_mu_);
  parent_drained_ = true;
  parent_terminated_ = true;
}
99

            
100
10
// Releases the file event registered on the UDP-forwarding socket.
void HotRestartingChild::shutdown() {
  socket_event_udp_forwarding_.reset();
}
101

            
102
2
void HotRestartingChild::onForwardedUdpPacket(uint32_t worker_index, Network::UdpRecvData&& data) {
103
2
  auto addr_and_listener =
104
2
      udp_forwarding_context_.getListenerForDestination(*data.addresses_.local_);
105
2
  if (addr_and_listener.has_value()) {
106
1
    auto [addr, listener_config] = *addr_and_listener;
107
    // We send to the worker index from the parent instance.
108
    // In the case that the number of workers changes between instances,
109
    // or the quic connection id generator changes how it selects worker
110
    // ids, the hot restart packet transfer will fail.
111
    //
112
    // One option would be to dispatch an onData call to have the receiving
113
    // worker forward the packet if the calculated destination differs from
114
    // the parent instance worker index; however, this would require
115
    // temporarily disabling kernel_worker_routing_ in each instance of
116
    // ActiveQuicListener, and a much more convoluted pipeline to collect
117
    // the set of destinations (listenerWorkerRouter doesn't currently
118
    // expose the actual listeners.)
119
    //
120
    // Since the vast majority of hot restarts will change neither of these
121
    // things, this implementation is "pretty good", and much better than no
122
    // hot restart capability at all.
123
1
    listener_config->listenerWorkerRouter(*addr).deliver(worker_index, std::move(data));
124
1
  }
125
2
}
126

            
127
int HotRestartingChild::duplicateParentListenSocket(const std::string& address,
128
                                                    uint32_t worker_index,
129
32
                                                    absl::string_view network_namespace) {
130
32
  if (parent_terminated_) {
131
32
    return -1;
132
32
  }
133

            
134
  HotRestartMessage wrapped_request;
135
  wrapped_request.mutable_request()->mutable_pass_listen_socket()->set_address(address);
136
  wrapped_request.mutable_request()->mutable_pass_listen_socket()->set_worker_index(worker_index);
137
  wrapped_request.mutable_request()->mutable_pass_listen_socket()->set_network_namespace(
138
      network_namespace);
139
  main_rpc_stream_.sendHotRestartMessage(parent_address_, wrapped_request);
140

            
141
  std::unique_ptr<HotRestartMessage> wrapped_reply =
142
      main_rpc_stream_.receiveHotRestartMessage(RpcStream::Blocking::Yes);
143
  if (!main_rpc_stream_.replyIsExpectedType(wrapped_reply.get(),
144
                                            HotRestartMessage::Reply::kPassListenSocket)) {
145
    return -1;
146
  }
147
  return wrapped_reply->reply().pass_listen_socket().fd();
148
}
149

            
150
33
std::unique_ptr<HotRestartMessage> HotRestartingChild::getParentStats() {
151
33
  if (parent_terminated_ || skip_parent_stats_) {
152
33
    return nullptr;
153
33
  }
154

            
155
  HotRestartMessage wrapped_request;
156
  wrapped_request.mutable_request()->mutable_stats();
157
  main_rpc_stream_.sendHotRestartMessage(parent_address_, wrapped_request);
158

            
159
  std::unique_ptr<HotRestartMessage> wrapped_reply =
160
      main_rpc_stream_.receiveHotRestartMessage(RpcStream::Blocking::Yes);
161
  RELEASE_ASSERT(
162
      main_rpc_stream_.replyIsExpectedType(wrapped_reply.get(), HotRestartMessage::Reply::kStats),
163
      "Hot restart parent did not respond as expected to get stats request.");
164
  return wrapped_reply;
165
}
166

            
167
9
void HotRestartingChild::drainParentListeners() {
168
9
  if (parent_terminated_) {
169
8
    return;
170
8
  }
171
  // No reply expected.
172
1
  HotRestartMessage wrapped_request;
173
1
  wrapped_request.mutable_request()->mutable_drain_listeners();
174
1
  main_rpc_stream_.sendHotRestartMessage(parent_address_, wrapped_request);
175
1
}
176

            
177
void HotRestartingChild::registerUdpForwardingListener(
178
    Network::Address::InstanceConstSharedPtr address,
179
8
    std::shared_ptr<Network::UdpListenerConfig> listener_config) {
180
8
  ASSERT_IS_MAIN_OR_TEST_THREAD();
181
8
  udp_forwarding_context_.registerListener(address, listener_config);
182
8
}
183

            
184
void HotRestartingChild::registerParentDrainedCallback(
185
5
    const Network::Address::InstanceConstSharedPtr& address, absl::AnyInvocable<void()> callback) {
186
5
  absl::MutexLock lock(registry_mu_);
187
5
  if (parent_drained_) {
188
3
    callback();
189
3
  } else {
190
2
    on_drained_actions_.emplace(address->asString(), std::move(callback));
191
2
  }
192
5
}
193

            
194
2
void HotRestartingChild::allDrainsImplicitlyComplete() {
195
2
  absl::MutexLock lock(registry_mu_);
196
2
  for (auto& drain_action : on_drained_actions_) {
197
    // Call the callback.
198
2
    std::move(drain_action.second)();
199
2
  }
200
2
  on_drained_actions_.clear();
201
2
  parent_drained_ = true;
202
2
}
203

            
204
// Asks the parent to shut down its admin endpoint and blocks for the reply.
//
// @return absl::nullopt when the parent is already terminated; otherwise the
//         parent's original start time and its enable_reuse_port_default flag.
// A reply of any type other than ShutdownAdmin trips the RELEASE_ASSERT.
absl::optional<HotRestart::AdminShutdownResponse>
HotRestartingChild::sendParentAdminShutdownRequest() {
  if (parent_terminated_) {
    return absl::nullopt;
  }

  HotRestartMessage wrapped_request;
  wrapped_request.mutable_request()->mutable_shutdown_admin();
  main_rpc_stream_.sendHotRestartMessage(parent_address_, wrapped_request);

  std::unique_ptr<HotRestartMessage> wrapped_reply =
      main_rpc_stream_.receiveHotRestartMessage(RpcStream::Blocking::Yes);
  RELEASE_ASSERT(main_rpc_stream_.replyIsExpectedType(wrapped_reply.get(),
                                                      HotRestartMessage::Reply::kShutdownAdmin),
                 "Hot restart parent did not respond as expected to ShutdownParentAdmin.");

  const auto& admin_reply = wrapped_reply->reply().shutdown_admin();
  return HotRestart::AdminShutdownResponse{
      static_cast<time_t>(admin_reply.original_start_time_unix_seconds()),
      admin_reply.enable_reuse_port_default()};
}
224

            
225
3
void HotRestartingChild::sendParentTerminateRequest() {
226
3
  if (parent_terminated_) {
227
1
    return;
228
1
  }
229
2
  allDrainsImplicitlyComplete();
230

            
231
2
  HotRestartMessage wrapped_request;
232
2
  wrapped_request.mutable_request()->mutable_terminate();
233
2
  main_rpc_stream_.sendHotRestartMessage(parent_address_, wrapped_request);
234
2
  parent_terminated_ = true;
235

            
236
  // Note that the 'generation' counter needs to retain the contribution from
237
  // the parent.
238
2
  if (stat_merger_ != nullptr) {
239
    stat_merger_->retainParentGaugeValue(hot_restart_generation_stat_name_);
240

            
241
    // Now it is safe to forget our stat transferral state.
242
    //
243
    // This destruction is actually important far beyond memory efficiency. The
244
    // scope-based temporary counter logic relies on the StatMerger getting
245
    // destroyed once hot restart's stat merging is all done. (See stat_merger.h
246
    // for details).
247
    stat_merger_.reset();
248
  }
249
2
}
250

            
251
void HotRestartingChild::mergeParentStats(Stats::Store& stats_store,
252
1
                                          const HotRestartMessage::Reply::Stats& stats_proto) {
253
1
  if (!stat_merger_) {
254
1
    stat_merger_ = std::make_unique<Stats::StatMerger>(stats_store);
255
1
    hot_restart_generation_stat_name_ = hotRestartGeneration(*stats_store.rootScope()).statName();
256
1
  }
257

            
258
  // Convert the protobuf for serialized dynamic spans into the structure
259
  // required by StatMerger.
260
1
  Stats::StatMerger::DynamicsMap dynamics;
261
2
  for (const auto& iter : stats_proto.dynamics()) {
262
2
    Stats::DynamicSpans& spans = dynamics[iter.first];
263
4
    for (int i = 0; i < iter.second.spans_size(); ++i) {
264
2
      const HotRestartMessage::Reply::Span& span_proto = iter.second.spans(i);
265
2
      spans.push_back(Stats::DynamicSpan(span_proto.first(), span_proto.last()));
266
2
    }
267
2
  }
268
1
  stat_merger_->mergeStats(stats_proto.counter_deltas(), stats_proto.gauges(), dynamics);
269
1
}
270

            
271
6
// Drains every message currently readable on the UDP-forwarding socket.
//
// Replies are logged (rate limited) and skipped. Requests other than
// ForwardedUdpPacket are logged and skipped. Each ForwardedUdpPacket is rebuilt
// into a Network::UdpRecvData and handed to onForwardedUdpPacket().
//
// @return OkStatus once no more messages are available, or the error from
//         resolving a packet's local or peer address. An error ends processing
//         immediately and leaves any remaining messages unread.
absl::Status HotRestartingChild::onSocketEventUdpForwarding() {
  while (true) {
    std::unique_ptr<HotRestartMessage> message =
        udp_forwarding_rpc_stream_.receiveHotRestartMessage(RpcStream::Blocking::No);
    if (!message) {
      break;
    }

    if (message->requestreply_case() == HotRestartMessage::kReply) {
      ENVOY_LOG_PERIODIC(
          error, std::chrono::seconds(5),
          "HotRestartMessage reply received on UdpForwarding (we want only requests); ignoring.");
      continue;
    }

    const auto& request = message->request();
    if (request.request_case() != HotRestartMessage::Request::kForwardedUdpPacket) {
      // NOTE(review): the text says "child" although this handler lives in the
      // child class; confirm which side is meant. Wording kept verbatim.
      ENVOY_LOG(
          error,
          "child sent a request other than ForwardedUdpPacket on udp forwarding socket; ignoring.");
      continue;
    }

    const auto& forwarded = request.forwarded_udp_packet();
    Network::UdpRecvData rebuilt;

    auto local_or_error = Network::Utility::resolveUrl(forwarded.local_addr());
    RETURN_IF_NOT_OK(local_or_error.status());
    rebuilt.addresses_.local_ = *local_or_error;

    auto peer_or_error = Network::Utility::resolveUrl(forwarded.peer_addr());
    RETURN_IF_NOT_OK(peer_or_error.status());
    rebuilt.addresses_.peer_ = *peer_or_error;

    // Skip the packet if either resolved address is null.
    if (!rebuilt.addresses_.local_ || !rebuilt.addresses_.peer_) {
      continue;
    }

    rebuilt.receive_time_ =
        MonotonicTime(std::chrono::microseconds{forwarded.receive_time_epoch_microseconds()});
    rebuilt.buffer_ = std::make_unique<Buffer::OwnedImpl>(forwarded.payload());
    onForwardedUdpPacket(forwarded.worker_index(), std::move(rebuilt));
  }
  return absl::OkStatus();
}
310

            
311
} // namespace Server
312
} // namespace Envoy