Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/test/integration/server.h
Line
Count
Source (jump to first uncovered line)
1
#pragma once
2
3
#include <chrono>
4
#include <cstdint>
5
#include <list>
6
#include <memory>
7
#include <string>
8
9
#include "envoy/config/listener/v3/listener.pb.h"
10
#include "envoy/server/options.h"
11
#include "envoy/server/process_context.h"
12
#include "envoy/stats/histogram.h"
13
#include "envoy/stats/stats.h"
14
15
#include "source/common/common/assert.h"
16
#include "source/common/common/lock_guard.h"
17
#include "source/common/common/logger.h"
18
#include "source/common/common/thread.h"
19
#include "source/common/stats/allocator_impl.h"
20
#include "source/server/drain_manager_impl.h"
21
#include "source/server/listener_hooks.h"
22
#include "source/server/options_impl.h"
23
#include "source/server/server.h"
24
25
#include "test/integration/server_stats.h"
26
#include "test/integration/tcp_dump.h"
27
#include "test/test_common/test_time_system.h"
28
#include "test/test_common/utility.h"
29
30
#include "absl/synchronization/notification.h"
31
#include "absl/types/optional.h"
32
33
namespace Envoy {
34
namespace Server {
35
36
struct FieldValidationConfig {
37
  bool allow_unknown_static_fields = false;
38
  bool reject_unknown_dynamic_fields = false;
39
  bool ignore_unknown_dynamic_fields = false;
40
};
41
42
// Create OptionsImpl structures suitable for tests. Disables hot restart.
43
OptionsImpl
44
createTestOptionsImpl(const std::string& config_path, const std::string& config_yaml,
45
                      Network::Address::IpVersion ip_version,
46
                      FieldValidationConfig validation_config = FieldValidationConfig(),
47
                      uint32_t concurrency = 1,
48
                      std::chrono::seconds drain_time = std::chrono::seconds(1),
49
                      Server::DrainStrategy drain_strategy = Server::DrainStrategy::Gradual,
50
                      bool use_bootstrap_node_metadata = false);
51
52
class TestComponentFactory : public ComponentFactory {
53
public:
54
2.84k
  Server::DrainManagerPtr createDrainManager(Server::Instance& server) override {
55
2.84k
    return std::make_unique<Server::DrainManagerImpl>( // No raw new; ownership goes to caller.
56
2.84k
        server, envoy::config::listener::v3::Listener::MODIFY_ONLY, server.dispatcher());
57
2.84k
  }
58
  Runtime::LoaderPtr createRuntime(Server::Instance& server,
59
2.68k
                                   Server::Configuration::Initial& config) override {
60
2.68k
    return Server::InstanceUtil::createRuntime(server, config);
61
2.68k
  }
62
};
63
64
} // namespace Server
65
66
namespace Stats {
67
68
/**
69
 * This is a wrapper for Scopes for the TestIsolatedStoreImpl to ensure new scopes do
70
 * not interact with the store without grabbing the lock from TestIsolatedStoreImpl.
71
 */
72
class TestScopeWrapper : public Scope {
73
public:
74
  TestScopeWrapper(Thread::MutexBasicLockable& lock, ScopeSharedPtr wrapped_scope, Store& store)
75
35.3k
      : lock_(lock), wrapped_scope_(wrapped_scope), store_(store) {}
76
77
32.5k
  ScopeSharedPtr createScope(const std::string& name) override {
78
32.5k
    Thread::LockGuard lock(lock_);
79
32.5k
    return std::make_shared<TestScopeWrapper>(lock_, wrapped_scope_->createScope(name), store_);
80
32.5k
  }
81
82
0
  ScopeSharedPtr scopeFromStatName(StatName name) override {
83
0
    Thread::LockGuard lock(lock_);
84
0
    return std::make_shared<TestScopeWrapper>(lock_, wrapped_scope_->scopeFromStatName(name),
85
0
                                              store_);
86
0
  }
87
88
  Counter& counterFromStatNameWithTags(const StatName& name,
89
258k
                                       StatNameTagVectorOptConstRef tags) override {
90
258k
    Thread::LockGuard lock(lock_);
91
258k
    return wrapped_scope_->counterFromStatNameWithTags(name, tags);
92
258k
  }
93
94
  Gauge& gaugeFromStatNameWithTags(const StatName& name, StatNameTagVectorOptConstRef tags,
95
104k
                                   Gauge::ImportMode import_mode) override {
96
104k
    Thread::LockGuard lock(lock_);
97
104k
    return wrapped_scope_->gaugeFromStatNameWithTags(name, tags, import_mode);
98
104k
  }
99
100
  Histogram& histogramFromStatNameWithTags(const StatName& name, StatNameTagVectorOptConstRef tags,
101
12.5k
                                           Histogram::Unit unit) override {
102
12.5k
    Thread::LockGuard lock(lock_);
103
12.5k
    return wrapped_scope_->histogramFromStatNameWithTags(name, tags, unit);
104
12.5k
  }
105
106
  TextReadout& textReadoutFromStatNameWithTags(const StatName& name,
107
2.72k
                                               StatNameTagVectorOptConstRef tags) override {
108
2.72k
    Thread::LockGuard lock(lock_);
109
2.72k
    return wrapped_scope_->textReadoutFromStatNameWithTags(name, tags);
110
2.72k
  }
111
112
228k
  Counter& counterFromString(const std::string& name) override {
113
228k
    StatNameManagedStorage storage(name, symbolTable());
114
228k
    return counterFromStatName(storage.statName());
115
228k
  }
116
104k
  Gauge& gaugeFromString(const std::string& name, Gauge::ImportMode import_mode) override {
117
104k
    StatNameManagedStorage storage(name, symbolTable());
118
104k
    return gaugeFromStatName(storage.statName(), import_mode);
119
104k
  }
120
9.83k
  Histogram& histogramFromString(const std::string& name, Histogram::Unit unit) override {
121
9.83k
    StatNameManagedStorage storage(name, symbolTable());
122
9.83k
    return histogramFromStatName(storage.statName(), unit);
123
9.83k
  }
124
2.72k
  TextReadout& textReadoutFromString(const std::string& name) override {
125
2.72k
    StatNameManagedStorage storage(name, symbolTable());
126
2.72k
    return textReadoutFromStatName(storage.statName());
127
2.72k
  }
128
129
0
  CounterOptConstRef findCounter(StatName name) const override {
130
0
    Thread::LockGuard lock(lock_);
131
0
    return wrapped_scope_->findCounter(name);
132
0
  }
133
0
  GaugeOptConstRef findGauge(StatName name) const override {
134
0
    Thread::LockGuard lock(lock_);
135
0
    return wrapped_scope_->findGauge(name);
136
0
  }
137
0
  HistogramOptConstRef findHistogram(StatName name) const override {
138
0
    Thread::LockGuard lock(lock_);
139
0
    return wrapped_scope_->findHistogram(name);
140
0
  }
141
0
  TextReadoutOptConstRef findTextReadout(StatName name) const override {
142
0
    Thread::LockGuard lock(lock_);
143
0
    return wrapped_scope_->findTextReadout(name);
144
0
  }
145
146
0
  const SymbolTable& constSymbolTable() const override {
147
0
    return wrapped_scope_->constSymbolTable();
148
0
  }
149
358k
  SymbolTable& symbolTable() override { return wrapped_scope_->symbolTable(); }
150
151
0
  bool iterate(const IterateFn<Counter>& fn) const override { return wrapped_scope_->iterate(fn); }
152
0
  bool iterate(const IterateFn<Gauge>& fn) const override { return wrapped_scope_->iterate(fn); }
153
0
  bool iterate(const IterateFn<Histogram>& fn) const override {
154
0
    return wrapped_scope_->iterate(fn);
155
0
  }
156
0
  bool iterate(const IterateFn<TextReadout>& fn) const override {
157
0
    return wrapped_scope_->iterate(fn);
158
0
  }
159
0
  StatName prefix() const override { return wrapped_scope_->prefix(); }
160
0
  Store& store() override { return store_; }
161
0
  const Store& constStore() const override { return store_; }
162
163
private:
164
  Thread::MutexBasicLockable& lock_;
165
  ScopeSharedPtr wrapped_scope_;
166
  Store& store_;
167
};
168
169
// A counter which signals on a condition variable when it is incremented.
170
class NotifyingCounter : public Stats::Counter {
171
public:
172
  NotifyingCounter(Stats::Counter* counter, absl::Mutex& mutex, absl::CondVar& condvar)
173
862k
      : counter_(counter), mutex_(mutex), condvar_(condvar) {}
174
175
4.50M
  std::string name() const override { return counter_->name(); }
176
7.45M
  StatName statName() const override { return counter_->statName(); }
177
0
  TagVector tags() const override { return counter_->tags(); }
178
813k
  std::string tagExtractedName() const override { return counter_->tagExtractedName(); }
179
0
  void iterateTagStatNames(const TagStatNameIterFn& fn) const override {
180
0
    counter_->iterateTagStatNames(fn);
181
0
  }
182
192k
  void add(uint64_t amount) override {
183
192k
    counter_->add(amount);
184
192k
    absl::MutexLock l(&mutex_);
185
192k
    condvar_.Signal();
186
192k
  }
187
151k
  void inc() override { add(1); }
188
867k
  uint64_t latch() override { return counter_->latch(); }
189
0
  void reset() override { return counter_->reset(); }
190
14.6k
  uint64_t value() const override { return counter_->value(); }
191
7.58M
  void incRefCount() override { counter_->incRefCount(); }
192
7.58M
  bool decRefCount() override { return counter_->decRefCount(); }
193
0
  uint32_t use_count() const override { return counter_->use_count(); }
194
0
  StatName tagExtractedStatName() const override { return counter_->tagExtractedStatName(); }
195
0
  bool used() const override { return counter_->used(); }
196
0
  bool hidden() const override { return counter_->hidden(); }
197
0
  SymbolTable& symbolTable() override { return counter_->symbolTable(); }
198
0
  const SymbolTable& constSymbolTable() const override { return counter_->constSymbolTable(); }
199
200
private:
201
  std::unique_ptr<Stats::Counter> counter_;
202
  absl::Mutex& mutex_;
203
  absl::CondVar& condvar_;
204
};
205
206
// A stats allocator which creates NotifyingCounters rather than regular CounterImpls.
207
class NotifyingAllocatorImpl : public Stats::AllocatorImpl {
208
public:
209
  using Stats::AllocatorImpl::AllocatorImpl;
210
211
0
  void waitForCounterFromStringEq(const std::string& name, uint64_t value) {
212
0
    absl::MutexLock l(&mutex_);
213
0
    ENVOY_LOG_MISC(trace, "waiting for {} to be {}", name, value);
214
0
    while (getCounterLockHeld(name) == nullptr || getCounterLockHeld(name)->value() != value) {
215
0
      condvar_.Wait(&mutex_);
216
0
    }
217
0
    ENVOY_LOG_MISC(trace, "done waiting for {} to be {}", name, value);
218
0
  }
219
220
0
  void waitForCounterFromStringGe(const std::string& name, uint64_t value) {
221
0
    absl::MutexLock l(&mutex_);
222
0
    ENVOY_LOG_MISC(trace, "waiting for {} to be >= {}", name, value);
223
0
    while (getCounterLockHeld(name) == nullptr || getCounterLockHeld(name)->value() < value) {
224
0
      condvar_.Wait(&mutex_); // Re-check the predicate after every wakeup.
225
0
    }
226
0
    ENVOY_LOG_MISC(trace, "done waiting for {} to be >= {}", name, value);
227
0
  }
228
229
0
  void waitForCounterExists(const std::string& name) {
230
0
    absl::MutexLock l(&mutex_);
231
0
    ENVOY_LOG_MISC(trace, "waiting for {} to exist", name);
232
0
    while (getCounterLockHeld(name) == nullptr) {
233
0
      condvar_.Wait(&mutex_);
234
0
    }
235
0
    ENVOY_LOG_MISC(trace, "done waiting for {} to exist", name);
236
0
  }
237
238
protected:
239
  Stats::Counter* makeCounterInternal(StatName name, StatName tag_extracted_name,
240
862k
                                      const StatNameTagVector& stat_name_tags) override {
241
862k
    Stats::Counter* counter = new NotifyingCounter(
242
862k
        Stats::AllocatorImpl::makeCounterInternal(name, tag_extracted_name, stat_name_tags), mutex_,
243
862k
        condvar_);
244
862k
    {
245
862k
      absl::MutexLock l(&mutex_);
246
      // Allow getting the counter directly from the allocator, since it's harder to
247
      // signal when the counter has been added to a given stats store.
248
862k
      counters_.emplace(counter->name(), counter);
251
862k
      condvar_.Signal(); // Wake a waiter so it can re-check whether its counter now exists.
252
862k
    }
253
862k
    return counter;
254
862k
  }
255
256
  virtual Stats::Counter* getCounterLockHeld(const std::string& name)
257
0
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(mutex_) {
258
0
    auto it = counters_.find(name);
259
0
    if (it != counters_.end()) {
260
0
      return it->second;
261
0
    }
262
0
    return nullptr;
263
0
  }
264
265
private:
266
  absl::flat_hash_map<std::string, Stats::Counter*> counters_;
267
  absl::Mutex mutex_;
268
  absl::CondVar condvar_;
269
};
270
271
/**
272
 * This is a variant of the isolated store that has locking across all operations so that it can
273
 * be used during the integration tests.
274
 */
275
class TestIsolatedStoreImpl : public StoreRoot {
276
public:
277
  // Stats::Store
278
0
  void forEachCounter(Stats::SizeFn f_size, StatFn<Counter> f_stat) const override {
279
0
    Thread::LockGuard lock(lock_);
280
0
    store_.forEachCounter(f_size, f_stat);
281
0
  }
282
0
  void forEachGauge(Stats::SizeFn f_size, StatFn<Gauge> f_stat) const override {
283
0
    Thread::LockGuard lock(lock_);
284
0
    store_.forEachGauge(f_size, f_stat);
285
0
  }
286
0
  void forEachTextReadout(Stats::SizeFn f_size, StatFn<TextReadout> f_stat) const override {
287
0
    Thread::LockGuard lock(lock_);
288
0
    store_.forEachTextReadout(f_size, f_stat);
289
0
  }
290
0
  void forEachHistogram(Stats::SizeFn f_size, StatFn<ParentHistogram> f_stat) const override {
291
0
    Thread::LockGuard lock(lock_);
292
0
    store_.forEachHistogram(f_size, f_stat);
293
0
  }
294
  void forEachScope(std::function<void(std::size_t)> f_size,
295
0
                    StatFn<const Scope> f_scope) const override {
296
0
    Thread::LockGuard lock(lock_);
297
0
    store_.forEachScope(f_size, f_scope);
298
0
  }
299
1.11k
  void forEachSinkedCounter(Stats::SizeFn f_size, StatFn<Counter> f_stat) const override {
300
1.11k
    Thread::LockGuard lock(lock_);
301
1.11k
    store_.forEachSinkedCounter(f_size, f_stat);
302
1.11k
  }
303
1.11k
  void forEachSinkedGauge(Stats::SizeFn f_size, StatFn<Gauge> f_stat) const override {
304
1.11k
    Thread::LockGuard lock(lock_);
305
1.11k
    store_.forEachSinkedGauge(f_size, f_stat);
306
1.11k
  }
307
1.11k
  void forEachSinkedTextReadout(Stats::SizeFn f_size, StatFn<TextReadout> f_stat) const override {
308
1.11k
    Thread::LockGuard lock(lock_);
309
1.11k
    store_.forEachSinkedTextReadout(f_size, f_stat);
310
1.11k
  }
311
1.11k
  void forEachSinkedHistogram(Stats::SizeFn f_size, StatFn<ParentHistogram> f_stat) const override {
312
1.11k
    Thread::LockGuard lock(lock_);
313
1.11k
    store_.forEachSinkedHistogram(f_size, f_stat);
314
1.11k
  }
315
0
  void setSinkPredicates(std::unique_ptr<SinkPredicates>&& sink_predicates) override {
316
0
    UNREFERENCED_PARAMETER(sink_predicates);
317
0
  }
318
0
  OptRef<SinkPredicates> sinkPredicates() override { return OptRef<SinkPredicates>{}; }
319
0
  void deliverHistogramToSinks(const Histogram& histogram, uint64_t value) override {
320
0
    Thread::LockGuard lock(lock_);
321
0
    store_.deliverHistogramToSinks(histogram, value);
322
0
  }
323
0
  NullGaugeImpl& nullGauge() override { return store_.nullGauge(); }
324
0
  NullCounterImpl& nullCounter() override { return store_.nullCounter(); }
325
135k
  ScopeSharedPtr rootScope() override {
326
135k
    Thread::LockGuard lock(lock_);
327
135k
    if (lazy_default_scope_ == nullptr) {
328
2.84k
      lazy_default_scope_ = std::make_shared<TestScopeWrapper>(lock_, store_.rootScope(), *this);
329
2.84k
    }
330
135k
    return lazy_default_scope_;
331
135k
  }
332
0
  ConstScopeSharedPtr constRootScope() const override {
333
0
    return const_cast<TestIsolatedStoreImpl*>(this)->rootScope();
334
0
  }
335
0
  const SymbolTable& constSymbolTable() const override { return store_.constSymbolTable(); }
336
38.2k
  SymbolTable& symbolTable() override { return store_.symbolTable(); }
337
338
  // Stats::Store
339
0
  std::vector<CounterSharedPtr> counters() const override {
340
0
    Thread::LockGuard lock(lock_);
341
0
    return store_.counters();
342
0
  }
343
0
  std::vector<GaugeSharedPtr> gauges() const override {
344
0
    Thread::LockGuard lock(lock_);
345
0
    return store_.gauges();
346
0
  }
347
0
  std::vector<ParentHistogramSharedPtr> histograms() const override {
348
0
    Thread::LockGuard lock(lock_);
349
0
    return store_.histograms();
350
0
  }
351
0
  std::vector<TextReadoutSharedPtr> textReadouts() const override {
352
0
    Thread::LockGuard lock(lock_);
353
0
    return store_.textReadouts();
354
0
  }
355
356
0
  bool iterate(const IterateFn<Counter>& fn) const override { return store_.iterate(fn); }
357
0
  bool iterate(const IterateFn<Gauge>& fn) const override { return store_.iterate(fn); }
358
0
  bool iterate(const IterateFn<Histogram>& fn) const override { return store_.iterate(fn); }
359
0
  bool iterate(const IterateFn<TextReadout>& fn) const override { return store_.iterate(fn); }
360
361
0
  void extractAndAppendTags(StatName, StatNamePool&, StatNameTagVector&) override {} // No-op.
362
0
  void extractAndAppendTags(absl::string_view, StatNamePool&, StatNameTagVector&) override {}
363
0
  const Stats::TagVector& fixedTags() override { CONSTRUCT_ON_FIRST_USE(Stats::TagVector); }
364
365
  // Stats::StoreRoot
366
0
  void addSink(Sink&) override {}
367
2.81k
  void setTagProducer(TagProducerPtr&&) override {}
368
2.75k
  void setStatsMatcher(StatsMatcherPtr&&) override {}
369
2.70k
  void setHistogramSettings(HistogramSettingsConstPtr&&) override {}
370
2.68k
  void initializeThreading(Event::Dispatcher&, ThreadLocal::Instance&) override {}
371
2.84k
  void shutdownThreading() override {}
372
0
  void mergeHistograms(PostMergeCb cb) override { merge_cb_ = cb; }
373
374
0
  void runMergeCallback() { merge_cb_(); }
375
376
private:
377
  mutable Thread::MutexBasicLockable lock_;
378
  IsolatedStoreImpl store_;
379
  PostMergeCb merge_cb_;
380
  ScopeSharedPtr lazy_default_scope_;
381
};
382
383
} // namespace Stats
384
385
class IntegrationTestServer;
386
using IntegrationTestServerPtr = std::unique_ptr<IntegrationTestServer>;
387
388
/**
389
 * Wrapper for running the real server for the purpose of integration tests.
390
 * This class is an Abstract Base Class and delegates ownership and management
391
 * of the actual envoy server to a derived class. See the documentation for
392
 * createAndRunEnvoyServer().
393
 */
394
class IntegrationTestServer : public Logger::Loggable<Logger::Id::testing>,
395
                              public ListenerHooks,
396
                              public IntegrationTestServerStats,
397
                              public Server::ComponentFactory {
398
public:
399
  static IntegrationTestServerPtr
400
  create(const std::string& config_path, const Network::Address::IpVersion version,
401
         std::function<void(IntegrationTestServer&)> on_server_ready_function,
402
         std::function<void()> on_server_init_function,
403
         absl::optional<uint64_t> deterministic_value, Event::TestTimeSystem& time_system,
404
         Api::Api& api, bool defer_listener_finalization = false,
405
         ProcessObjectOptRef process_object = absl::nullopt,
406
         Server::FieldValidationConfig validation_config = Server::FieldValidationConfig(),
407
         uint32_t concurrency = 1, std::chrono::seconds drain_time = std::chrono::seconds(1),
408
         Server::DrainStrategy drain_strategy = Server::DrainStrategy::Gradual,
409
         Buffer::WatermarkFactorySharedPtr watermark_factory = nullptr, bool use_real_stats = false,
410
         bool use_bootstrap_node_metadata = false);
411
  // Note that the derived class is responsible for tearing down the server in its
412
  // destructor.
413
  ~IntegrationTestServer() override;
414
415
  void waitUntilListenersReady();
416
417
  void setDynamicContextParam(absl::string_view resource_type_url, absl::string_view key,
418
                              absl::string_view value);
419
  void unsetDynamicContextParam(absl::string_view resource_type_url, absl::string_view key);
420
421
0
  Server::DrainManagerImpl& drainManager() { return *drain_manager_; }
422
0
  void setOnWorkerListenerAddedCb(std::function<void()> on_worker_listener_added) {
423
0
    on_worker_listener_added_cb_ = std::move(on_worker_listener_added);
424
0
  }
425
0
  void setOnWorkerListenerRemovedCb(std::function<void()> on_worker_listener_removed) {
426
0
    on_worker_listener_removed_cb_ = std::move(on_worker_listener_removed);
427
0
  }
428
0
  void setOnServerReadyCb(std::function<void(IntegrationTestServer&)> on_server_ready) {
429
0
    on_server_ready_cb_ = std::move(on_server_ready);
430
0
  }
431
2.64k
  void onWorkersStarted() override {}
432
433
  void start(const Network::Address::IpVersion version,
434
             std::function<void()> on_server_init_function,
435
             absl::optional<uint64_t> deterministic_value, bool defer_listener_finalization,
436
             ProcessObjectOptRef process_object, Server::FieldValidationConfig validation_config,
437
             uint32_t concurrency, std::chrono::seconds drain_time,
438
             Server::DrainStrategy drain_strategy,
439
             Buffer::WatermarkFactorySharedPtr watermark_factory, bool use_bootstrap_node_metadata);
440
441
  void waitForCounterEq(const std::string& name, uint64_t value,
442
                        std::chrono::milliseconds timeout = TestUtility::DefaultTimeout,
443
550
                        Event::Dispatcher* dispatcher = nullptr) override {
444
550
    ASSERT_TRUE(
445
550
        TestUtility::waitForCounterEq(statStore(), name, value, time_system_, timeout, dispatcher));
446
550
  }
447
448
  void waitForCounterGe(const std::string& name, uint64_t value,
449
0
                        std::chrono::milliseconds timeout = TestUtility::DefaultTimeout) override {
450
0
    ASSERT_TRUE(TestUtility::waitForCounterGe(statStore(), name, value, time_system_, timeout));
451
0
  }
452
453
  void waitForGaugeEq(const std::string& name, uint64_t value,
454
402
                      std::chrono::milliseconds timeout = TestUtility::DefaultTimeout) override {
455
402
    ASSERT_TRUE(TestUtility::waitForGaugeEq(statStore(), name, value, time_system_, timeout));
456
402
  }
457
458
  void waitForGaugeGe(const std::string& name, uint64_t value,
459
0
                      std::chrono::milliseconds timeout = TestUtility::DefaultTimeout) override {
460
0
    ASSERT_TRUE(TestUtility::waitForGaugeGe(statStore(), name, value, time_system_, timeout));
461
0
  }
462
463
0
  void waitForCounterExists(const std::string& name) override {
464
0
    notifyingStatsAllocator().waitForCounterExists(name);
465
0
  }
466
467
  void waitForCounterNonexistent(const std::string& name,
468
0
                                 std::chrono::milliseconds timeout) override {
469
0
    Event::TestTimeSystem::RealTimeBound bound(timeout);
470
0
    while (TestUtility::findCounter(statStore(), name) != nullptr) {
471
0
      time_system_.advanceTimeWait(std::chrono::milliseconds(10)); // Poll in 10ms steps.
472
0
      ASSERT_TRUE(bound.withinBound()) // Fail the test once the real-time bound is exceeded.
473
0
          << "timed out waiting for counter " << name << " to not exist.";
474
0
    }
475
0
  }
476
477
  void waitForProactiveOverloadResourceUsageEq(
478
      const Server::OverloadProactiveResourceName resource_name, int64_t value,
479
      Event::Dispatcher& dispatcher,
480
0
      std::chrono::milliseconds timeout = TestUtility::DefaultTimeout) {
481
0
    ASSERT_TRUE(TestUtility::waitForProactiveOverloadResourceUsageEq(
482
0
        overloadState(), resource_name, value, time_system_, dispatcher, timeout));
483
0
  }
484
485
  // TODO(#17956): Add Gauge type to NotifyingAllocator and adopt it in this method.
486
0
  void waitForGaugeDestroyed(const std::string& name) override {
487
0
    ASSERT_TRUE(TestUtility::waitForGaugeDestroyed(statStore(), name, time_system_));
488
0
  }
489
490
  void waitUntilHistogramHasSamples(
491
      const std::string& name,
492
0
      std::chrono::milliseconds timeout = std::chrono::milliseconds::zero()) override {
493
0
    waitForNumHistogramSamplesGe(name, 1, timeout);
494
0
  }
495
496
  void waitForNumHistogramSamplesGe(
497
      const std::string& name, uint64_t sample_count,
498
0
      std::chrono::milliseconds timeout = std::chrono::milliseconds::zero()) override {
499
0
    ASSERT_TRUE(TestUtility::waitForNumHistogramSamplesGe(
500
0
        statStore(), name, sample_count, time_system_, server().dispatcher(), timeout));
501
0
  }
502
503
16.6k
  Stats::CounterSharedPtr counter(const std::string& name) override {
504
    // When using the thread local store, only counters() is thread safe. This also allows us
505
    // to test if a counter exists at all versus just defaulting to zero.
506
16.6k
    return TestUtility::findCounter(statStore(), name);
507
16.6k
  }
508
509
42
  Stats::GaugeSharedPtr gauge(const std::string& name) override {
510
    // When using the thread local store, only gauges() is thread safe. This also allows us
511
    // to test if a gauge exists at all versus just defaulting to zero.
512
42
    return TestUtility::findGauge(statStore(), name);
513
42
  }
514
515
0
  Stats::ParentHistogramSharedPtr histogram(const std::string& name) {
516
0
    return TestUtility::findHistogram(statStore(), name);
517
0
  }
518
519
0
  std::vector<Stats::CounterSharedPtr> counters() override { return statStore().counters(); }
520
521
0
  std::vector<Stats::GaugeSharedPtr> gauges() override { return statStore().gauges(); }
522
523
0
  std::vector<Stats::ParentHistogramSharedPtr> histograms() override {
524
0
    return statStore().histograms();
525
0
  }
526
527
  // ListenerHooks
528
  void onWorkerListenerAdded() override;
529
  void onWorkerListenerRemoved() override;
530
531
  // Server::ComponentFactory
532
2.64k
  Server::DrainManagerPtr createDrainManager(Server::Instance& server) override {
533
2.64k
    drain_manager_ = new Server::DrainManagerImpl(
534
2.64k
        server, envoy::config::listener::v3::Listener::MODIFY_ONLY, server.dispatcher());
535
2.64k
    return Server::DrainManagerPtr{drain_manager_};
536
2.64k
  }
537
  Runtime::LoaderPtr createRuntime(Server::Instance& server,
538
2.64k
                                   Server::Configuration::Initial& config) override {
539
2.64k
    return Server::InstanceUtil::createRuntime(server, config);
540
2.64k
  }
541
542
  // Should not be called until createAndRunEnvoyServer() is called.
543
  virtual Server::Instance& server() PURE;
544
  virtual Stats::Store& statStore() PURE;
545
  virtual Server::ThreadLocalOverloadState& overloadState() PURE;
546
  virtual Network::Address::InstanceConstSharedPtr adminAddress() PURE;
547
  virtual Stats::NotifyingAllocatorImpl& notifyingStatsAllocator() PURE;
548
0
  void useAdminInterfaceToQuit(bool use) { use_admin_interface_to_quit_ = use; }
549
2.64k
  bool useAdminInterfaceToQuit() { return use_admin_interface_to_quit_; }
550
551
protected:
552
  IntegrationTestServer(Event::TestTimeSystem& time_system, Api::Api& api,
553
                        const std::string& config_path)
554
2.64k
      : time_system_(time_system), api_(api), config_path_(config_path) {}
555
556
  // Create the running envoy server. This function will call serverReady() when the virtual
557
  // functions server(), statStore(), and adminAddress() may be called, but before the server
558
  // has been started.
559
  // The subclass is also responsible for tearing down this server in its destructor.
560
  virtual void createAndRunEnvoyServer(OptionsImpl& options, Event::TimeSystem& time_system,
561
                                       Network::Address::InstanceConstSharedPtr local_address,
562
                                       ListenerHooks& hooks, Thread::BasicLockable& access_log_lock,
563
                                       Server::ComponentFactory& component_factory,
564
                                       Random::RandomGeneratorPtr&& random_generator,
565
                                       ProcessObjectOptRef process_object,
566
                                       Buffer::WatermarkFactorySharedPtr watermark_factory) PURE;
567
568
  // Will be called by subclass on server thread when the server is ready to be accessed. The
569
  // server may not have been run yet, but all server access methods (server(), statStore(),
570
  // adminAddress()) will be available.
571
  void serverReady();
572
573
private:
574
  /**
575
   * Runs the real server on a thread.
576
   */
577
  void threadRoutine(const Network::Address::IpVersion version,
578
                     absl::optional<uint64_t> deterministic_value,
579
                     ProcessObjectOptRef process_object,
580
                     Server::FieldValidationConfig validation_config, uint32_t concurrency,
581
                     std::chrono::seconds drain_time, Server::DrainStrategy drain_strategy,
582
                     Buffer::WatermarkFactorySharedPtr watermark_factory,
583
                     bool use_bootstrap_node_metadata);
584
585
  Event::TestTimeSystem& time_system_;
586
  Api::Api& api_;
587
  const std::string config_path_;
588
  Thread::ThreadPtr thread_;
589
  Thread::CondVar listeners_cv_;
590
  Thread::MutexBasicLockable listeners_mutex_;
591
  uint64_t pending_listeners_{}; // Value-initialized so it never holds an indeterminate value.
592
  ConditionalInitializer server_set_;
593
  Server::DrainManagerImpl* drain_manager_{};
594
  std::function<void()> on_worker_listener_added_cb_;
595
  std::function<void()> on_worker_listener_removed_cb_;
596
  TcpDumpPtr tcp_dump_;
597
  std::function<void(IntegrationTestServer&)> on_server_ready_cb_;
598
  bool use_admin_interface_to_quit_{};
599
};
600
601
// Default implementation of IntegrationTestServer
602
class IntegrationTestServerImpl : public IntegrationTestServer {
603
public:
604
  IntegrationTestServerImpl(Event::TestTimeSystem& time_system, Api::Api& api,
605
                            const std::string& config_path, bool real_stats = false);
606
607
  ~IntegrationTestServerImpl() override;
608
609
13.2k
  Server::Instance& server() override {
610
13.2k
    RELEASE_ASSERT(server_ != nullptr, "");
611
13.2k
    return *server_;
612
13.2k
  }
613
25.6k
  Stats::Store& statStore() override {
614
25.6k
    RELEASE_ASSERT(stat_store_ != nullptr, "");
615
25.6k
    return *stat_store_;
616
25.6k
  }
617
0
  Server::ThreadLocalOverloadState& overloadState() override {
618
0
    RELEASE_ASSERT(server_ != nullptr, "");
619
0
    return server_->overloadManager().getThreadLocalOverloadState();
620
0
  }
621
622
2.64k
  Network::Address::InstanceConstSharedPtr adminAddress() override { return admin_address_; }
623
624
0
  Stats::NotifyingAllocatorImpl& notifyingStatsAllocator() override {
625
0
    auto* ret = dynamic_cast<Stats::NotifyingAllocatorImpl*>(stats_allocator_.get());
626
0
    RELEASE_ASSERT(ret != nullptr,
627
0
                   "notifyingStatsAllocator() is not created when real_stats is true");
628
0
    return *ret;
629
0
  }
630
631
private:
632
  void createAndRunEnvoyServer(OptionsImpl& options, Event::TimeSystem& time_system,
633
                               Network::Address::InstanceConstSharedPtr local_address,
634
                               ListenerHooks& hooks, Thread::BasicLockable& access_log_lock,
635
                               Server::ComponentFactory& component_factory,
636
                               Random::RandomGeneratorPtr&& random_generator,
637
                               ProcessObjectOptRef process_object,
638
                               Buffer::WatermarkFactorySharedPtr watermark_factory) override;
639
640
  // Owned by this class. An owning pointer is not used because the actual allocation is done
641
  // on a stack in a non-main thread.
642
  Server::Instance* server_{};
643
  Stats::Store* stat_store_{};
644
  Network::Address::InstanceConstSharedPtr admin_address_;
645
  absl::Notification server_gone_;
646
  Stats::SymbolTableImpl symbol_table_;
647
  std::unique_ptr<Stats::AllocatorImpl> stats_allocator_;
648
};
649
650
} // namespace Envoy