1
#include "source/server/hot_restarting_parent.h"
2

            
3
#include "envoy/server/instance.h"
4

            
5
#include "source/common/memory/stats.h"
6
#include "source/common/network/utility.h"
7
#include "source/common/stats/stat_merger.h"
8
#include "source/common/stats/symbol_table.h"
9
#include "source/common/stats/utility.h"
10

            
11
namespace Envoy {
12
namespace Server {
13

            
14
// Short local alias for the generated hot-restart protobuf message type used throughout this file.
using HotRestartMessage = envoy::HotRestartMessage;
15

            
16
// Sets up both RPC streams for this parent:
//  - precomputes the child's (epoch restart_epoch + 1) addresses on the main stream and on the
//    "_udp"-suffixed UDP-forwarding stream, and
//  - binds this parent's own sockets (epoch restart_epoch) on both streams.
HotRestartingParent::HotRestartingParent(int base_id, int restart_epoch,
                                         const std::string& socket_path, mode_t socket_mode)
    : HotRestartingBase(base_id), restart_epoch_(restart_epoch) {
  const std::string udp_socket_path = socket_path + "_udp";
  const int child_epoch = restart_epoch_ + 1;

  // Where we send: the child's sockets.
  child_address_ = main_rpc_stream_.createDomainSocketAddress(child_epoch, "child", socket_path,
                                                              socket_mode);
  child_address_udp_forwarding_ = udp_forwarding_rpc_stream_.createDomainSocketAddress(
      child_epoch, "child", udp_socket_path, socket_mode);

  // Where we listen: our own sockets.
  main_rpc_stream_.bindDomainSocket(restart_epoch_, "parent", socket_path, socket_mode);
  udp_forwarding_rpc_stream_.bindDomainSocket(restart_epoch_, "parent", udp_socket_path,
                                              socket_mode);
}
28

            
29
// Sends `msg` to the child over the UDP-forwarding RPC stream. The send is not done inline; it is
// posted to the dispatcher recorded by initialize(), which must have been called first (asserted).
// NOTE(review): presumably this indirection exists because handle() may run on worker threads
// (it receives a worker_index) — confirm against the dispatcher/thread model.
void HotRestartingParent::sendHotRestartMessage(envoy::HotRestartMessage&& msg) {
  ASSERT(dispatcher_.has_value());
  // The lambda must be `mutable`: without it the init-captured `msg` is const inside the body and
  // `std::move(msg)` yields a `const&&`, which silently degrades to a copy/const-ref bind.
  dispatcher_->post([this, msg = std::move(msg)]() mutable {
    udp_forwarding_rpc_stream_.sendHotRestartMessage(child_address_udp_forwarding_, std::move(msg));
  });
}
35

            
36
// Network::NonDispatchedUdpPacketHandler
37
void HotRestartingParent::Internal::handle(uint32_t worker_index,
38
1
                                           const Network::UdpRecvData& packet) {
39
1
  envoy::HotRestartMessage msg;
40
1
  auto* packet_msg = msg.mutable_request()->mutable_forwarded_udp_packet();
41
1
  packet_msg->set_local_addr(Network::Utility::urlFromDatagramAddress(*packet.addresses_.local_));
42
1
  packet_msg->set_peer_addr(Network::Utility::urlFromDatagramAddress(*packet.addresses_.peer_));
43
1
  packet_msg->set_receive_time_epoch_microseconds(
44
1
      std::chrono::duration_cast<std::chrono::microseconds>(packet.receive_time_.time_since_epoch())
45
1
          .count());
46
1
  *packet_msg->mutable_payload() = packet.buffer_->toString();
47
1
  packet_msg->set_worker_index(worker_index);
48
1
  udp_sender_.sendHotRestartMessage(std::move(msg));
49
1
}
50

            
51
10
// Starts servicing the main RPC socket on `dispatcher` and records the dispatcher (used later by
// sendHotRestartMessage) and an Internal helper bound to `server`.
void HotRestartingParent::initialize(Event::Dispatcher& dispatcher, Server::Instance& server) {
  // Edge-triggered read callback; onSocketEvent() loops until no more messages are received.
  auto on_readable = [this](uint32_t events) {
    ASSERT(events == Event::FileReadyType::Read);
    onSocketEvent();
    return absl::OkStatus();
  };
  socket_event_ =
      dispatcher.createFileEvent(main_rpc_stream_.domain_socket_, on_readable,
                                 Event::FileTriggerType::Edge, Event::FileReadyType::Read);

  dispatcher_ = dispatcher;
  internal_ = std::make_unique<Internal>(&server, *this);
}
63

            
64
void HotRestartingParent::onSocketEvent() {
65
  std::unique_ptr<HotRestartMessage> wrapped_request;
66
  while ((wrapped_request = main_rpc_stream_.receiveHotRestartMessage(RpcStream::Blocking::No))) {
67
    if (wrapped_request->requestreply_case() == HotRestartMessage::kReply) {
68
      ENVOY_LOG(error, "child sent us a HotRestartMessage reply (we want requests); ignoring.");
69
      HotRestartMessage wrapped_reply;
70
      wrapped_reply.set_didnt_recognize_your_last_message(true);
71
      main_rpc_stream_.sendHotRestartMessage(child_address_, wrapped_reply);
72
      continue;
73
    }
74
    switch (wrapped_request->request().request_case()) {
75
    case HotRestartMessage::Request::kShutdownAdmin: {
76
      main_rpc_stream_.sendHotRestartMessage(child_address_, internal_->shutdownAdmin());
77
      break;
78
    }
79

            
80
    case HotRestartMessage::Request::kPassListenSocket: {
81
      main_rpc_stream_.sendHotRestartMessage(
82
          child_address_, internal_->getListenSocketsForChild(wrapped_request->request()));
83
      break;
84
    }
85

            
86
    case HotRestartMessage::Request::kStats: {
87
      HotRestartMessage wrapped_reply;
88
      internal_->exportStatsToChild(wrapped_reply.mutable_reply()->mutable_stats());
89
      main_rpc_stream_.sendHotRestartMessage(child_address_, wrapped_reply);
90
      break;
91
    }
92

            
93
    case HotRestartMessage::Request::kDrainListeners: {
94
      internal_->drainListeners();
95
      break;
96
    }
97

            
98
    case HotRestartMessage::Request::kTerminate: {
99
      ENVOY_LOG(info, "shutting down due to child request");
100
      kill(getpid(), SIGTERM);
101
      break;
102
    }
103

            
104
    case HotRestartMessage::Request::kTestConnection: {
105
      break;
106
    }
107

            
108
    default: {
109
      ENVOY_LOG(error, "child sent us an unfamiliar type of HotRestartMessage; ignoring.");
110
      HotRestartMessage wrapped_reply;
111
      wrapped_reply.set_didnt_recognize_your_last_message(true);
112
      main_rpc_stream_.sendHotRestartMessage(child_address_, wrapped_reply);
113
      break;
114
    }
115
    }
116
  }
117
}
118

            
119
10
// Releases the file event created in initialize() for the main RPC socket.
void HotRestartingParent::shutdown() {
  socket_event_.reset();
}
120

            
121
// Stores the server and the sender used for forwarded UDP packets, then increments the
// hot-restart generation gauge found in the server's root stats scope.
HotRestartingParent::Internal::Internal(Server::Instance* server,
                                        HotRestartMessageSender& udp_sender)
    : server_(server), udp_sender_(udp_sender) {
  hotRestartGeneration(*server->stats().rootScope()).inc();
}
127

            
128
1
// Shuts down the server's admin endpoint and returns a ShutdownAdmin reply carrying the server's
// first-epoch start time and its reuse-port default.
HotRestartMessage HotRestartingParent::Internal::shutdownAdmin() {
  server_->shutdownAdmin();

  HotRestartMessage wrapped_reply;
  auto* admin_reply = wrapped_reply.mutable_reply()->mutable_shutdown_admin();
  admin_reply->set_original_start_time_unix_seconds(server_->startTimeFirstEpoch());
  admin_reply->set_enable_reuse_port_default(server_->enableReusePortDefault());
  return wrapped_reply;
}
137

            
138
// Finds the listen socket the child is asking for and returns its fd in a PassListenSocket reply.
// The reply's fd stays -1 when:
//  - no port-binding listener has a socket factory matching both the requested address
//    (optionally moved into the requested network namespace) and socket type, or
//  - the requested worker_index is not below the server's configured concurrency.
// An unresolvable address is propagated via THROW_OR_RETURN_VALUE.
HotRestartMessage
HotRestartingParent::Internal::getListenSocketsForChild(const HotRestartMessage::Request& request) {
  const auto& pass_request = request.pass_listen_socket();

  HotRestartMessage wrapped_reply;
  wrapped_reply.mutable_reply()->mutable_pass_listen_socket()->set_fd(-1);

  Network::Address::InstanceConstSharedPtr addr =
      THROW_OR_RETURN_VALUE(Network::Utility::resolveUrl(pass_request.address()),
                            Network::Address::InstanceConstSharedPtr);
  absl::string_view ns = pass_request.network_namespace();
  if (!ns.empty() && addr->ip() != nullptr) {
    addr = addr->withNetworkNamespace(ns);
  }

  // The requested socket type depends only on the URL, so parse it once instead of on every
  // address match inside the loops below.
  const StatusOr<Network::Socket::Type> socket_type =
      Network::Utility::socketTypeFromUrl(pass_request.address());
  // socketTypeFromUrl should return a valid value since resolveUrl returned a valid address.
  ASSERT(socket_type.ok());
  if (!socket_type.ok()) {
    // ASSERT may be compiled out; never dereference an error StatusOr. Reply with fd -1.
    return wrapped_reply;
  }

  for (const auto& listener : server_->listenerManager().listeners()) {
    for (auto& socket_factory : listener.get().listenSocketFactories()) {
      if (*socket_factory->localAddress() == *addr && listener.get().bindToPort() &&
          socket_factory->socketType() == *socket_type) {
        // worker_index() will default to 0 if not set which is the behavior before this field
        // was added. Thus, this should be safe for both roll forward and roll back.
        if (pass_request.worker_index() < server_->options().concurrency()) {
          wrapped_reply.mutable_reply()->mutable_pass_listen_socket()->set_fd(
              socket_factory->getListenSocket(pass_request.worker_index())
                  ->ioHandle()
                  .fdDoNotUse());
        }
        // NOTE(review): this exits only the inner loop, so a later matching listener would
        // overwrite the fd; presumably at most one bound listener matches — confirm.
        break;
      }
    }
  }
  return wrapped_reply;
}
174

            
175
// TODO(fredlas) if there are enough stats for stat name length to become an issue, this current
176
// implementation can negate the benefit of symbolized stat names by periodically reaching the
177
// magnitude of memory usage that they are meant to avoid, since this map holds full-string
178
// names. The problem can be solved by splitting the export up over many chunks.
179
4
void HotRestartingParent::Internal::exportStatsToChild(HotRestartMessage::Reply::Stats* stats) {
180
13
  server_->stats().forEachSinkedGauge(nullptr, [this, stats](Stats::Gauge& gauge) mutable {
181
13
    if (gauge.used()) {
182
12
      const std::string name = gauge.name();
183
12
      (*stats->mutable_gauges())[name] = gauge.value();
184
12
      recordDynamics(stats, name, gauge.statName());
185
12
    }
186
13
  });
187

            
188
10
  server_->stats().forEachSinkedCounter(nullptr, [this, stats](Stats::Counter& counter) mutable {
189
10
    if (counter.used()) {
190
      // The hot restart parent is expected to have stopped its normal stat exporting (and so
191
      // latching) by the time it begins exporting to the hot restart child.
192
9
      uint64_t latched_value = counter.latch();
193
9
      if (latched_value > 0) {
194
6
        const std::string name = counter.name();
195
6
        (*stats->mutable_counter_deltas())[name] = latched_value;
196
6
        recordDynamics(stats, name, counter.statName());
197
6
      }
198
9
    }
199
10
  });
200
4
  stats->set_memory_allocated(Memory::Stats::totalCurrentlyAllocated());
201
4
  stats->set_num_connections(server_->listenerManager().numConnections());
202
4
}
203

            
204
void HotRestartingParent::Internal::recordDynamics(HotRestartMessage::Reply::Stats* stats,
205
                                                   const std::string& name,
206
18
                                                   Stats::StatName stat_name) {
207
  // Compute an array of spans describing which components of the stat name are
208
  // dynamic. This is needed so that when the child recovers the StatName, it
209
  // correlates with how the system generates those stats, with the same exact
210
  // components using a dynamic representation.
211
  //
212
  // See https://github.com/envoyproxy/envoy/issues/9874 for more details.
213
18
  Stats::DynamicSpans spans = server_->stats().symbolTable().getDynamicSpans(stat_name);
214

            
215
  // Convert that C++ structure (controlled by stat_merger.cc) into a protobuf
216
  // for serialization.
217
18
  if (!spans.empty()) {
218
2
    HotRestartMessage::Reply::RepeatedSpan spans_proto;
219
2
    for (const Stats::DynamicSpan& span : spans) {
220
2
      HotRestartMessage::Reply::Span* span_proto = spans_proto.add_spans();
221
2
      span_proto->set_first(span.first);
222
2
      span_proto->set_last(span.second);
223
2
    }
224
2
    (*stats->mutable_dynamics())[name] = spans_proto;
225
2
  }
226
18
}
227

            
228
1
void HotRestartingParent::Internal::drainListeners() {
229
1
  Network::ExtraShutdownListenerOptions options;
230
1
  options.non_dispatched_udp_packet_handler_ = *this;
231
1
  server_->drainListeners(options);
232
1
}
233

            
234
} // namespace Server
235
} // namespace Envoy