Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/test/integration/server.cc
Line
Count
Source (jump to first uncovered line)
1
#include "test/integration/server.h"
2
3
#include <memory>
4
#include <string>
5
6
#include "envoy/http/header_map.h"
7
8
#include "source/common/common/random_generator.h"
9
#include "source/common/common/thread.h"
10
#include "source/common/local_info/local_info_impl.h"
11
#include "source/common/network/utility.h"
12
#include "source/common/stats/thread_local_store.h"
13
#include "source/common/thread_local/thread_local_impl.h"
14
#include "source/server/hot_restart_nop_impl.h"
15
#include "source/server/instance_impl.h"
16
#include "source/server/options_impl_base.h"
17
#include "source/server/process_context_impl.h"
18
19
#include "test/integration/utility.h"
20
#include "test/mocks/common.h"
21
#include "test/mocks/runtime/mocks.h"
22
#include "test/test_common/environment.h"
23
24
#include "absl/strings/str_replace.h"
25
#include "gtest/gtest.h"
26
27
namespace Envoy {
28
namespace Server {
29
30
/**
 * Builds the server options used by integration tests.
 *
 * @param config_path path handed to setConfigPath().
 * @param config_yaml YAML handed to setConfigYaml().
 * @param ip_version local address IP version.
 * @param validation_config unknown-field validation toggles.
 * @param concurrency value handed to setConcurrency().
 * @param drain_time value handed to setDrainTime().
 * @param drain_strategy value handed to setDrainStrategy().
 * @param use_bootstrap_node_metadata when true, the service cluster/node/zone names are set to
 *        empty strings so the Bootstrap node metadata is not overridden.
 * @param config_proto optional bootstrap proto; installed into the options only when non-null.
 * @return the populated options object.
 */
OptionsImplBase
createTestOptionsImpl(const std::string& config_path, const std::string& config_yaml,
                      Network::Address::IpVersion ip_version,
                      FieldValidationConfig validation_config, uint32_t concurrency,
                      std::chrono::seconds drain_time, Server::DrainStrategy drain_strategy,
                      bool use_bootstrap_node_metadata,
                      std::unique_ptr<envoy::config::bootstrap::v3::Bootstrap>&& config_proto) {
  // Empty string values mean the Bootstrap node metadata won't be overridden.
  const bool override_node_identity = !use_bootstrap_node_metadata;
  const std::string cluster_name = override_node_identity ? "cluster_name" : "";
  const std::string node_name = override_node_identity ? "node_name" : "";
  const std::string zone_name = override_node_identity ? "zone_name" : "";

  OptionsImplBase opts;

  // Node identity and logging.
  opts.setServiceClusterName(cluster_name);
  opts.setServiceNodeName(node_name);
  opts.setServiceZone(zone_name);
  opts.setLogLevel(spdlog::level::info);

  // Configuration source and addressing.
  opts.setConfigPath(config_path);
  opts.setConfigYaml(config_yaml);
  opts.setLocalAddressIpVersion(ip_version);

  // Timing and draining.
  opts.setFileFlushIntervalMsec(std::chrono::milliseconds(50));
  opts.setDrainTime(drain_time);
  opts.setParentShutdownTime(std::chrono::seconds(2));
  opts.setDrainStrategy(drain_strategy);

  // Unknown-field validation.
  opts.setAllowUnknownFields(validation_config.allow_unknown_static_fields);
  opts.setRejectUnknownFieldsDynamic(validation_config.reject_unknown_dynamic_fields);
  opts.setIgnoreUnknownFieldsDynamic(validation_config.ignore_unknown_dynamic_fields);

  opts.setConcurrency(concurrency);
  opts.setHotRestartDisabled(true);

  if (config_proto != nullptr) {
    opts.setConfigProto(std::move(config_proto));
  }

  return opts;
}
65
66
} // namespace Server
67
68
// Constructs an IntegrationTestServerImpl, optionally registers the server-ready callback, and
// then calls start() on it with the remaining arguments. The started server is returned.
IntegrationTestServerPtr IntegrationTestServer::create(
    const std::string& config_path, const Network::Address::IpVersion version,
    std::function<void(IntegrationTestServer&)> server_ready_function,
    std::function<void()> on_server_init_function, absl::optional<uint64_t> deterministic_value,
    Event::TestTimeSystem& time_system, Api::Api& api, bool defer_listener_finalization,
    ProcessObjectOptRef process_object, Server::FieldValidationConfig validation_config,
    uint32_t concurrency, std::chrono::seconds drain_time, Server::DrainStrategy drain_strategy,
    Buffer::WatermarkFactorySharedPtr watermark_factory, bool use_real_stats,
    bool use_bootstrap_node_metadata,
    std::unique_ptr<envoy::config::bootstrap::v3::Bootstrap>&& config_proto) {
  IntegrationTestServerPtr test_server = std::make_unique<IntegrationTestServerImpl>(
      time_system, api, config_path, use_real_stats, std::move(config_proto));

  // The ready callback has to be installed before start(), which is what spawns the server.
  if (server_ready_function) {
    test_server->setOnServerReadyCb(server_ready_function);
  }

  test_server->start(version, on_server_init_function, deterministic_value,
                     defer_listener_finalization, process_object, validation_config, concurrency,
                     drain_time, drain_strategy, watermark_factory, use_bootstrap_node_metadata);
  return test_server;
}
88
89
1.97k
void IntegrationTestServer::waitUntilListenersReady() {
90
1.97k
  Thread::LockGuard guard(listeners_mutex_);
91
1.97k
  while (pending_listeners_ != 0) {
92
    // If your test is hanging forever here, you may need to create your listener manually,
93
    // after BaseIntegrationTest::initialize() is done. See cds_integration_test.cc for an example.
94
0
    listeners_cv_.wait(listeners_mutex_); // Safe since CondVar::wait won't throw.
95
0
  }
96
1.97k
  ENVOY_LOG(info, "listener wait complete");
97
1.97k
}
98
99
// Posts a callback to the server's dispatcher that sets a dynamic context parameter on the
// server's local-info context provider.
//
// @param resource_type_url resource type URL forwarded to the context provider.
// @param key parameter name.
// @param value parameter value.
void IntegrationTestServer::setDynamicContextParam(absl::string_view resource_type_url,
                                                   absl::string_view key, absl::string_view value) {
  // The arguments are non-owning views and the posted callback may run after this function has
  // returned, so the closure owns string copies instead of capturing the views themselves.
  server().dispatcher().post([this, type_url = std::string(resource_type_url),
                              param_key = std::string(key), param_value = std::string(value)]() {
    server().localInfo().contextProvider().setDynamicContextParam(type_url, param_key,
                                                                  param_value);
  });
}
105
106
// Posts a callback to the server's dispatcher that removes a dynamic context parameter from the
// server's local-info context provider.
//
// @param resource_type_url resource type URL forwarded to the context provider.
// @param key name of the parameter to remove.
void IntegrationTestServer::unsetDynamicContextParam(absl::string_view resource_type_url,
                                                     absl::string_view key) {
  // The arguments are non-owning views and the posted callback may run after this function has
  // returned, so the closure owns string copies instead of capturing the views themselves.
  server().dispatcher().post(
      [this, type_url = std::string(resource_type_url), param_key = std::string(key)]() {
        server().localInfo().contextProvider().unsetDynamicContextParam(type_url, param_key);
      });
}
112
113
void IntegrationTestServer::start(
114
    const Network::Address::IpVersion version, std::function<void()> on_server_init_function,
115
    absl::optional<uint64_t> deterministic_value, bool defer_listener_finalization,
116
    ProcessObjectOptRef process_object, Server::FieldValidationConfig validator_config,
117
    uint32_t concurrency, std::chrono::seconds drain_time, Server::DrainStrategy drain_strategy,
118
1.97k
    Buffer::WatermarkFactorySharedPtr watermark_factory, bool use_bootstrap_node_metadata) {
119
1.97k
  ENVOY_LOG(info, "starting integration test server");
120
1.97k
  ASSERT(!thread_);
121
1.97k
  thread_ = api_.threadFactory().createThread(
122
1.97k
      [version, deterministic_value, process_object, validator_config, concurrency, drain_time,
123
1.97k
       drain_strategy, watermark_factory, use_bootstrap_node_metadata, this]() -> void {
124
1.97k
        threadRoutine(version, deterministic_value, process_object, validator_config, concurrency,
125
1.97k
                      drain_time, drain_strategy, watermark_factory, use_bootstrap_node_metadata);
126
1.97k
      });
127
128
  // If any steps need to be done prior to workers starting, do them now. E.g., xDS pre-init.
129
  // Note that there is no synchronization guaranteeing this happens either
130
  // before workers starting or after server start. Any needed synchronization must occur in the
131
  // routines. These steps are executed at this point in the code to allow server initialization
132
  // to be dependent on them (e.g. control plane peers).
133
1.97k
  if (on_server_init_function != nullptr) {
134
0
    on_server_init_function();
135
0
  }
136
137
  // Wait for the server to be created and the number of initial listeners to wait for to be set.
138
1.97k
  server_set_.waitReady();
139
140
1.97k
  if (!defer_listener_finalization) {
141
    // Now wait for the initial listeners (if any) to actually be listening on the worker.
142
    // At this point the server is up and ready for testing.
143
1.97k
    waitUntilListenersReady();
144
1.97k
  }
145
146
  // If we are tapping, spin up tcpdump.
147
1.97k
  const auto tap_path = TestEnvironment::getOptionalEnvVar("TAP_PATH");
148
1.97k
  if (tap_path) {
149
0
    std::vector<uint32_t> ports;
150
0
    for (auto listener : server().listenerManager().listeners()) {
151
0
      const auto listen_addr = listener.get().listenSocketFactories()[0]->localAddress();
152
0
      if (listen_addr->type() == Network::Address::Type::Ip) {
153
0
        ports.push_back(listen_addr->ip()->port());
154
0
      }
155
0
    }
156
    // TODO(htuch): Support a different loopback interface as needed.
157
0
    const ::testing::TestInfo* const test_info =
158
0
        ::testing::UnitTest::GetInstance()->current_test_info();
159
0
    const std::string test_id =
160
0
        std::string(test_info->name()) + "_" + std::string(test_info->test_case_name());
161
0
    const std::string pcap_path =
162
0
        tap_path.value() + "_" + absl::StrReplaceAll(test_id, {{"/", "_"}}) + "_server.pcap";
163
0
    tcp_dump_ = std::make_unique<TcpDump>(pcap_path, "lo", ports);
164
0
  }
165
1.97k
}
166
167
1.97k
// Joins the server thread, if one was ever started.
IntegrationTestServer::~IntegrationTestServer() {
  // Derived class must have shutdown server.
  // thread_ stays null until start() creates the thread, so an instance that was constructed
  // but never started must not dereference it here.
  if (thread_ != nullptr) {
    thread_->join();
  }
}
171
172
1.99k
void IntegrationTestServer::onWorkerListenerAdded() {
173
1.99k
  if (on_worker_listener_added_cb_) {
174
0
    on_worker_listener_added_cb_();
175
0
  }
176
177
1.99k
  Thread::LockGuard guard(listeners_mutex_);
178
1.99k
  if (pending_listeners_ > 0) {
179
0
    pending_listeners_--;
180
0
    listeners_cv_.notifyOne();
181
0
  }
182
1.99k
}
183
184
0
void IntegrationTestServer::onWorkerListenerRemoved() {
185
0
  if (on_worker_listener_removed_cb_) {
186
0
    on_worker_listener_removed_cb_();
187
0
  }
188
0
}
189
190
1.97k
void IntegrationTestServer::serverReady() {
191
1.97k
  pending_listeners_ = server().listenerManager().listeners().size();
192
1.97k
  if (on_server_ready_cb_ != nullptr) {
193
0
    on_server_ready_cb_(*this);
194
0
  }
195
1.97k
  server_set_.setReady();
196
1.97k
}
197
198
void IntegrationTestServer::threadRoutine(
199
    const Network::Address::IpVersion version, absl::optional<uint64_t> deterministic_value,
200
    ProcessObjectOptRef process_object, Server::FieldValidationConfig validation_config,
201
    uint32_t concurrency, std::chrono::seconds drain_time, Server::DrainStrategy drain_strategy,
202
1.97k
    Buffer::WatermarkFactorySharedPtr watermark_factory, bool use_bootstrap_node_metadata) {
203
1.97k
  OptionsImplBase options(Server::createTestOptionsImpl(
204
1.97k
      config_path_, "", version, validation_config, concurrency, drain_time, drain_strategy,
205
1.97k
      use_bootstrap_node_metadata, std::move(config_proto_)));
206
1.97k
  Thread::MutexBasicLockable lock;
207
208
1.97k
  Random::RandomGeneratorPtr random_generator;
209
1.97k
  if (deterministic_value.has_value()) {
210
0
    random_generator = std::make_unique<testing::NiceMock<Random::MockRandomGenerator>>(
211
0
        deterministic_value.value());
212
1.97k
  } else {
213
1.97k
    random_generator = std::make_unique<Random::RandomGeneratorImpl>();
214
1.97k
  }
215
216
1.97k
  createAndRunEnvoyServer(options, time_system_, Network::Utility::getLocalAddress(version), *this,
217
1.97k
                          lock, *this, std::move(random_generator), process_object,
218
1.97k
                          watermark_factory);
219
1.97k
}
220
221
// Forwards to the base constructor and then creates the stats allocator: a plain AllocatorImpl
// when use_real_stats is set, otherwise a NotifyingAllocatorImpl. Both are built on
// symbol_table_.
IntegrationTestServerImpl::IntegrationTestServerImpl(
    Event::TestTimeSystem& time_system, Api::Api& api, const std::string& config_path,
    bool use_real_stats, std::unique_ptr<envoy::config::bootstrap::v3::Bootstrap>&& config_proto)
    : IntegrationTestServer(time_system, api, config_path, std::move(config_proto)) {
  if (use_real_stats) {
    stats_allocator_ = std::make_unique<Stats::AllocatorImpl>(symbol_table_);
  } else {
    stats_allocator_ = std::make_unique<Stats::NotifyingAllocatorImpl>(symbol_table_);
  }
}
229
230
void IntegrationTestServerImpl::createAndRunEnvoyServer(
231
    OptionsImplBase& options, Event::TimeSystem& time_system,
232
    Network::Address::InstanceConstSharedPtr local_address, ListenerHooks& hooks,
233
    Thread::BasicLockable& access_log_lock, Server::ComponentFactory& component_factory,
234
    Random::RandomGeneratorPtr&& random_generator, ProcessObjectOptRef process_object,
235
1.97k
    Buffer::WatermarkFactorySharedPtr watermark_factory) {
236
1.97k
  {
237
1.97k
    Init::ManagerImpl init_manager{"Server"};
238
1.97k
    Server::HotRestartNopImpl restarter;
239
1.97k
    ThreadLocal::InstanceImpl tls;
240
1.97k
    Stats::ThreadLocalStoreImpl stat_store(*stats_allocator_);
241
1.97k
    std::unique_ptr<ProcessContext> process_context;
242
1.97k
    if (process_object.has_value()) {
243
0
      process_context = std::make_unique<ProcessContextImpl>(process_object->get());
244
0
    }
245
1.97k
    Server::InstanceImpl server(init_manager, options, time_system, hooks, restarter, stat_store,
246
1.97k
                                access_log_lock, std::move(random_generator), tls,
247
1.97k
                                Thread::threadFactoryForTest(), Filesystem::fileSystemForTest(),
248
1.97k
                                std::move(process_context), watermark_factory);
249
1.97k
    server.initialize(local_address, component_factory);
250
    // This is technically thread unsafe (assigning to a shared_ptr accessed
251
    // across threads), but because we synchronize below through serverReady(), the only
252
    // consumer on the main test thread in ~IntegrationTestServerImpl will not race.
253
1.97k
    if (server.admin()) {
254
1.97k
      admin_address_ = server.admin()->socket().connectionInfoProvider().localAddress();
255
1.97k
    }
256
1.97k
    server_ = &server;
257
1.97k
    stat_store_ = &stat_store;
258
1.97k
    serverReady();
259
1.97k
    server.run();
260
1.97k
  }
261
1.97k
  server_gone_.Notify();
262
1.97k
}
263
264
1.97k
// Stops the server, either by posting a shutdown to its dispatcher or by issuing a
// /quitquitquit admin request, waits for server_gone_, and then clears the published pointers.
IntegrationTestServerImpl::~IntegrationTestServerImpl() {
  ENVOY_LOG(info, "stopping integration test server");

  if (!useAdminInterfaceToQuit()) {
    // Direct path: ask the server to shut down from its own dispatcher.
    if (server_) {
      server_->dispatcher().post([this]() { server_->shutdown(); });
      server_gone_.WaitForNotification();
    }
  } else {
    // Admin path: take a local copy of the admin address and only proceed when one was recorded.
    Network::Address::InstanceConstSharedPtr admin_address(admin_address_);
    if (admin_address != nullptr) {
      BufferingStreamDecoderPtr quit_response = IntegrationUtil::makeSingleRequest(
          admin_address, "POST", "/quitquitquit", "", Http::CodecType::HTTP1);
      EXPECT_TRUE(quit_response->complete());
      EXPECT_EQ("200", quit_response->headers().getStatusValue());
      server_gone_.WaitForNotification();
    }
  }

  server_ = nullptr;
  admin_address_ = nullptr;
  stat_store_ = nullptr;
}
287
288
} // namespace Envoy