Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/test/integration/server.cc
Line
Count
Source (jump to first uncovered line)
1
#include "test/integration/server.h"
2
3
#include <memory>
4
#include <string>
5
6
#include "envoy/http/header_map.h"
7
8
#include "source/common/common/random_generator.h"
9
#include "source/common/common/thread.h"
10
#include "source/common/local_info/local_info_impl.h"
11
#include "source/common/network/utility.h"
12
#include "source/common/stats/thread_local_store.h"
13
#include "source/common/thread_local/thread_local_impl.h"
14
#include "source/server/hot_restart_nop_impl.h"
15
#include "source/server/instance_impl.h"
16
#include "source/server/options_impl.h"
17
#include "source/server/process_context_impl.h"
18
19
#include "test/integration/utility.h"
20
#include "test/mocks/common.h"
21
#include "test/mocks/runtime/mocks.h"
22
#include "test/test_common/environment.h"
23
24
#include "absl/strings/str_replace.h"
25
#include "gtest/gtest.h"
26
27
namespace Envoy {
28
namespace Server {
29
30
/**
 * Builds the OptionsImpl used to launch an integration test server.
 *
 * @param config_path path handed to setConfigPath().
 * @param config_yaml YAML handed to setConfigYaml().
 * @param ip_version value handed to setLocalAddressIpVersion().
 * @param validation_config source of the three unknown-field validation toggles.
 * @param concurrency value handed to setConcurrency().
 * @param drain_time value handed to setDrainTime().
 * @param drain_strategy value handed to setDrainStrategy().
 * @param use_bootstrap_node_metadata when true, the service cluster/node/zone are passed as
 *        empty strings; otherwise fixed test identifiers are used.
 * @return the populated options object.
 */
OptionsImpl createTestOptionsImpl(const std::string& config_path, const std::string& config_yaml,
                                  Network::Address::IpVersion ip_version,
                                  FieldValidationConfig validation_config, uint32_t concurrency,
                                  std::chrono::seconds drain_time,
                                  Server::DrainStrategy drain_strategy,
                                  bool use_bootstrap_node_metadata) {
  // Empty string values mean the Bootstrap node metadata won't be overridden.
  const bool override_node_identity = !use_bootstrap_node_metadata;
  const std::string cluster{override_node_identity ? "cluster_name" : ""};
  const std::string node{override_node_identity ? "node_name" : ""};
  const std::string zone{override_node_identity ? "zone_name" : ""};

  OptionsImpl options(cluster, node, zone, spdlog::level::info);

  // Configuration source and addressing.
  options.setConfigPath(config_path);
  options.setConfigYaml(config_yaml);
  options.setLocalAddressIpVersion(ip_version);

  // Timing: 50ms file flush interval, caller-provided drain time, 2s parent shutdown time.
  options.setFileFlushIntervalMsec(std::chrono::milliseconds(50));
  options.setDrainTime(drain_time);
  options.setParentShutdownTime(std::chrono::seconds(2));
  options.setDrainStrategy(drain_strategy);

  // Unknown-field validation behavior.
  options.setAllowUnknownFields(validation_config.allow_unknown_static_fields);
  options.setRejectUnknownFieldsDynamic(validation_config.reject_unknown_dynamic_fields);
  options.setIgnoreUnknownFieldsDynamic(validation_config.ignore_unknown_dynamic_fields);

  options.setConcurrency(concurrency);
  options.setHotRestartDisabled(true);

  return options;
}
57
58
} // namespace Server
59
60
/**
 * Constructs an IntegrationTestServerImpl, optionally registers the server-ready callback, and
 * starts it before handing ownership back to the caller.
 *
 * All remaining arguments are forwarded unchanged to start().
 *
 * @param server_ready_function if non-empty, installed via setOnServerReadyCb() before start().
 * @return the started server.
 */
IntegrationTestServerPtr IntegrationTestServer::create(
    const std::string& config_path, const Network::Address::IpVersion version,
    std::function<void(IntegrationTestServer&)> server_ready_function,
    std::function<void()> on_server_init_function, absl::optional<uint64_t> deterministic_value,
    Event::TestTimeSystem& time_system, Api::Api& api, bool defer_listener_finalization,
    ProcessObjectOptRef process_object, Server::FieldValidationConfig validation_config,
    uint32_t concurrency, std::chrono::seconds drain_time, Server::DrainStrategy drain_strategy,
    Buffer::WatermarkFactorySharedPtr watermark_factory, bool use_real_stats,
    bool use_bootstrap_node_metadata) {
  IntegrationTestServerPtr test_server =
      std::make_unique<IntegrationTestServerImpl>(time_system, api, config_path, use_real_stats);

  // The ready callback has to be in place before start(), which blocks until the server is ready.
  if (server_ready_function) {
    test_server->setOnServerReadyCb(server_ready_function);
  }

  test_server->start(version, on_server_init_function, deterministic_value,
                     defer_listener_finalization, process_object, validation_config, concurrency,
                     drain_time, drain_strategy, watermark_factory, use_bootstrap_node_metadata);
  return test_server;
}
79
80
2.64k
void IntegrationTestServer::waitUntilListenersReady() {
81
2.64k
  Thread::LockGuard guard(listeners_mutex_);
82
2.64k
  while (pending_listeners_ != 0) {
83
    // If your test is hanging forever here, you may need to create your listener manually,
84
    // after BaseIntegrationTest::initialize() is done. See cds_integration_test.cc for an example.
85
0
    listeners_cv_.wait(listeners_mutex_); // Safe since CondVar::wait won't throw.
86
0
  }
87
2.64k
  ENVOY_LOG(info, "listener wait complete");
88
2.64k
}
89
90
/**
 * Schedules an update of a dynamic context parameter on the server's dispatcher.
 *
 * The update is posted rather than executed inline, so it may run after this call has returned.
 * The arguments are therefore copied into owning strings: a captured absl::string_view would
 * only reference the caller's storage, which may no longer exist when the callback runs.
 *
 * @param resource_type_url resource type URL passed to the context provider.
 * @param key parameter name.
 * @param value parameter value.
 */
void IntegrationTestServer::setDynamicContextParam(absl::string_view resource_type_url,
                                                   absl::string_view key, absl::string_view value) {
  server().dispatcher().post([this, owned_type_url = std::string(resource_type_url),
                              owned_key = std::string(key), owned_value = std::string(value)]() {
    server().localInfo().contextProvider().setDynamicContextParam(owned_type_url, owned_key,
                                                                  owned_value);
  });
}
96
97
/**
 * Schedules removal of a dynamic context parameter on the server's dispatcher.
 *
 * The removal is posted rather than executed inline, so it may run after this call has returned.
 * The arguments are therefore copied into owning strings: a captured absl::string_view would
 * only reference the caller's storage, which may no longer exist when the callback runs.
 *
 * @param resource_type_url resource type URL passed to the context provider.
 * @param key name of the parameter to remove.
 */
void IntegrationTestServer::unsetDynamicContextParam(absl::string_view resource_type_url,
                                                     absl::string_view key) {
  server().dispatcher().post(
      [this, owned_type_url = std::string(resource_type_url), owned_key = std::string(key)]() {
        server().localInfo().contextProvider().unsetDynamicContextParam(owned_type_url, owned_key);
      });
}
103
104
void IntegrationTestServer::start(
105
    const Network::Address::IpVersion version, std::function<void()> on_server_init_function,
106
    absl::optional<uint64_t> deterministic_value, bool defer_listener_finalization,
107
    ProcessObjectOptRef process_object, Server::FieldValidationConfig validator_config,
108
    uint32_t concurrency, std::chrono::seconds drain_time, Server::DrainStrategy drain_strategy,
109
2.64k
    Buffer::WatermarkFactorySharedPtr watermark_factory, bool use_bootstrap_node_metadata) {
110
2.64k
  ENVOY_LOG(info, "starting integration test server");
111
2.64k
  ASSERT(!thread_);
112
2.64k
  thread_ = api_.threadFactory().createThread(
113
2.64k
      [version, deterministic_value, process_object, validator_config, concurrency, drain_time,
114
2.64k
       drain_strategy, watermark_factory, use_bootstrap_node_metadata, this]() -> void {
115
2.64k
        threadRoutine(version, deterministic_value, process_object, validator_config, concurrency,
116
2.64k
                      drain_time, drain_strategy, watermark_factory, use_bootstrap_node_metadata);
117
2.64k
      });
118
119
  // If any steps need to be done prior to workers starting, do them now. E.g., xDS pre-init.
120
  // Note that there is no synchronization guaranteeing this happens either
121
  // before workers starting or after server start. Any needed synchronization must occur in the
122
  // routines. These steps are executed at this point in the code to allow server initialization
123
  // to be dependent on them (e.g. control plane peers).
124
2.64k
  if (on_server_init_function != nullptr) {
125
0
    on_server_init_function();
126
0
  }
127
128
  // Wait for the server to be created and the number of initial listeners to wait for to be set.
129
2.64k
  server_set_.waitReady();
130
131
2.64k
  if (!defer_listener_finalization) {
132
    // Now wait for the initial listeners (if any) to actually be listening on the worker.
133
    // At this point the server is up and ready for testing.
134
2.64k
    waitUntilListenersReady();
135
2.64k
  }
136
137
  // If we are tapping, spin up tcpdump.
138
2.64k
  const auto tap_path = TestEnvironment::getOptionalEnvVar("TAP_PATH");
139
2.64k
  if (tap_path) {
140
0
    std::vector<uint32_t> ports;
141
0
    for (auto listener : server().listenerManager().listeners()) {
142
0
      const auto listen_addr = listener.get().listenSocketFactories()[0]->localAddress();
143
0
      if (listen_addr->type() == Network::Address::Type::Ip) {
144
0
        ports.push_back(listen_addr->ip()->port());
145
0
      }
146
0
    }
147
    // TODO(htuch): Support a different loopback interface as needed.
148
0
    const ::testing::TestInfo* const test_info =
149
0
        ::testing::UnitTest::GetInstance()->current_test_info();
150
0
    const std::string test_id =
151
0
        std::string(test_info->name()) + "_" + std::string(test_info->test_case_name());
152
0
    const std::string pcap_path =
153
0
        tap_path.value() + "_" + absl::StrReplaceAll(test_id, {{"/", "_"}}) + "_server.pcap";
154
0
    tcp_dump_ = std::make_unique<TcpDump>(pcap_path, "lo", ports);
155
0
  }
156
2.64k
}
157
158
2.64k
/**
 * Joins the server thread, if one was ever started.
 *
 * thread_ is only assigned in start(), so an instance that was constructed but never started
 * has nothing to join; dereferencing it unconditionally would crash in that case.
 */
IntegrationTestServer::~IntegrationTestServer() {
  // Derived class must have shutdown server.
  if (thread_) {
    thread_->join();
  }
}
162
163
2.66k
void IntegrationTestServer::onWorkerListenerAdded() {
164
2.66k
  if (on_worker_listener_added_cb_) {
165
0
    on_worker_listener_added_cb_();
166
0
  }
167
168
2.66k
  Thread::LockGuard guard(listeners_mutex_);
169
2.66k
  if (pending_listeners_ > 0) {
170
0
    pending_listeners_--;
171
0
    listeners_cv_.notifyOne();
172
0
  }
173
2.66k
}
174
175
0
void IntegrationTestServer::onWorkerListenerRemoved() {
176
0
  if (on_worker_listener_removed_cb_) {
177
0
    on_worker_listener_removed_cb_();
178
0
  }
179
0
}
180
181
2.64k
void IntegrationTestServer::serverReady() {
182
2.64k
  pending_listeners_ = server().listenerManager().listeners().size();
183
2.64k
  if (on_server_ready_cb_ != nullptr) {
184
0
    on_server_ready_cb_(*this);
185
0
  }
186
2.64k
  server_set_.setReady();
187
2.64k
}
188
189
void IntegrationTestServer::threadRoutine(
190
    const Network::Address::IpVersion version, absl::optional<uint64_t> deterministic_value,
191
    ProcessObjectOptRef process_object, Server::FieldValidationConfig validation_config,
192
    uint32_t concurrency, std::chrono::seconds drain_time, Server::DrainStrategy drain_strategy,
193
2.64k
    Buffer::WatermarkFactorySharedPtr watermark_factory, bool use_bootstrap_node_metadata) {
194
2.64k
  OptionsImpl options(Server::createTestOptionsImpl(config_path_, "", version, validation_config,
195
2.64k
                                                    concurrency, drain_time, drain_strategy,
196
2.64k
                                                    use_bootstrap_node_metadata));
197
2.64k
  Thread::MutexBasicLockable lock;
198
199
2.64k
  Random::RandomGeneratorPtr random_generator;
200
2.64k
  if (deterministic_value.has_value()) {
201
0
    random_generator = std::make_unique<testing::NiceMock<Random::MockRandomGenerator>>(
202
0
        deterministic_value.value());
203
2.64k
  } else {
204
2.64k
    random_generator = std::make_unique<Random::RandomGeneratorImpl>();
205
2.64k
  }
206
207
2.64k
  createAndRunEnvoyServer(options, time_system_, Network::Utility::getLocalAddress(version), *this,
208
2.64k
                          lock, *this, std::move(random_generator), process_object,
209
2.64k
                          watermark_factory);
210
2.64k
}
211
212
/**
 * Forwards the time system, API and config path to the base class and selects the stats
 * allocator.
 *
 * @param use_real_stats when true a plain Stats::AllocatorImpl is used; otherwise a
 *        Stats::NotifyingAllocatorImpl is used.
 */
IntegrationTestServerImpl::IntegrationTestServerImpl(Event::TestTimeSystem& time_system,
                                                     Api::Api& api, const std::string& config_path,
                                                     bool use_real_stats)
    : IntegrationTestServer(time_system, api, config_path) {
  if (use_real_stats) {
    stats_allocator_ = std::make_unique<Stats::AllocatorImpl>(symbol_table_);
  } else {
    stats_allocator_ = std::make_unique<Stats::NotifyingAllocatorImpl>(symbol_table_);
  }
}
220
221
void IntegrationTestServerImpl::createAndRunEnvoyServer(
222
    OptionsImpl& options, Event::TimeSystem& time_system,
223
    Network::Address::InstanceConstSharedPtr local_address, ListenerHooks& hooks,
224
    Thread::BasicLockable& access_log_lock, Server::ComponentFactory& component_factory,
225
    Random::RandomGeneratorPtr&& random_generator, ProcessObjectOptRef process_object,
226
2.64k
    Buffer::WatermarkFactorySharedPtr watermark_factory) {
227
2.64k
  {
228
2.64k
    Init::ManagerImpl init_manager{"Server"};
229
2.64k
    Server::HotRestartNopImpl restarter;
230
2.64k
    ThreadLocal::InstanceImpl tls;
231
2.64k
    Stats::ThreadLocalStoreImpl stat_store(*stats_allocator_);
232
2.64k
    std::unique_ptr<ProcessContext> process_context;
233
2.64k
    if (process_object.has_value()) {
234
0
      process_context = std::make_unique<ProcessContextImpl>(process_object->get());
235
0
    }
236
2.64k
    Server::InstanceImpl server(init_manager, options, time_system, hooks, restarter, stat_store,
237
2.64k
                                access_log_lock, std::move(random_generator), tls,
238
2.64k
                                Thread::threadFactoryForTest(), Filesystem::fileSystemForTest(),
239
2.64k
                                std::move(process_context), watermark_factory);
240
2.64k
    server.initialize(local_address, component_factory);
241
    // This is technically thread unsafe (assigning to a shared_ptr accessed
242
    // across threads), but because we synchronize below through serverReady(), the only
243
    // consumer on the main test thread in ~IntegrationTestServerImpl will not race.
244
2.64k
    if (server.admin()) {
245
2.64k
      admin_address_ = server.admin()->socket().connectionInfoProvider().localAddress();
246
2.64k
    }
247
2.64k
    server_ = &server;
248
2.64k
    stat_store_ = &stat_store;
249
2.64k
    serverReady();
250
2.64k
    server.run();
251
2.64k
  }
252
2.64k
  server_gone_.Notify();
253
2.64k
}
254
255
2.64k
/**
 * Stops the running server and waits for it to be gone before clearing the published pointers.
 *
 * Two shutdown paths exist:
 *  - admin path (useAdminInterfaceToQuit()): POST /quitquitquit to the admin address, if one was
 *    recorded, and expect a complete 200 response;
 *  - dispatcher path: post a shutdown() call onto the server's dispatcher, if a server exists.
 * Either path then blocks on server_gone_.
 */
IntegrationTestServerImpl::~IntegrationTestServerImpl() {
  ENVOY_LOG(info, "stopping integration test server");

  if (useAdminInterfaceToQuit()) {
    // Work on a local copy of the shared pointer.
    Network::Address::InstanceConstSharedPtr admin_address(admin_address_);
    if (admin_address != nullptr) {
      BufferingStreamDecoderPtr response = IntegrationUtil::makeSingleRequest(
          admin_address, "POST", "/quitquitquit", "", Http::CodecType::HTTP1);
      EXPECT_TRUE(response->complete());
      EXPECT_EQ("200", response->headers().getStatusValue());
      server_gone_.WaitForNotification();
    }
  } else if (server_) {
    server_->dispatcher().post([this]() { server_->shutdown(); });
    server_gone_.WaitForNotification();
  }

  // The server objects lived on the server thread's stack; drop every reference to them.
  server_ = nullptr;
  admin_address_ = nullptr;
  stat_store_ = nullptr;
}
278
279
} // namespace Envoy