Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/test/integration/fake_upstream.cc
Line
Count
Source (jump to first uncovered line)
1
#include "test/integration/fake_upstream.h"
2
3
#include <chrono>
4
#include <cstdint>
5
#include <memory>
6
#include <string>
7
8
#include "source/common/buffer/buffer_impl.h"
9
#include "source/common/config/utility.h"
10
#include "source/common/http/header_map_impl.h"
11
#include "source/common/http/http1/codec_impl.h"
12
#include "source/common/http/http2/codec_impl.h"
13
#include "source/common/network/address_impl.h"
14
#include "source/common/network/connection_impl.h"
15
#include "source/common/network/listen_socket_impl.h"
16
#include "source/common/network/socket_option_factory.h"
17
#include "source/common/network/utility.h"
18
#include "source/common/runtime/runtime_features.h"
19
20
#ifdef ENVOY_ENABLE_QUIC
21
#include "source/common/quic/server_codec_impl.h"
22
#include "quiche/quic/test_tools/quic_session_peer.h"
23
#endif
24
25
#include "source/extensions/listener_managers/listener_manager/connection_handler_impl.h"
26
27
#include "test/integration/utility.h"
28
#include "test/test_common/network_utility.h"
29
#include "test/test_common/utility.h"
30
31
#include "absl/strings/str_cat.h"
32
#include "absl/synchronization/notification.h"
33
34
using namespace std::chrono_literals;
35
36
using std::chrono::milliseconds;
37
using testing::AssertionFailure;
38
using testing::AssertionResult;
39
using testing::AssertionSuccess;
40
41
namespace Envoy {
42
43
FakeStream::FakeStream(FakeHttpConnection& parent, Http::ResponseEncoder& encoder,
44
                       Event::TestTimeSystem& time_system)
45
    : parent_(parent), encoder_(encoder), time_system_(time_system),
46
1.09k
      header_validator_(parent.makeHeaderValidator()) {
47
1.09k
  encoder.getStream().addCallbacks(*this);
48
1.09k
}
Envoy::FakeStream::FakeStream(Envoy::FakeHttpConnection&, Envoy::Http::ResponseEncoder&, Envoy::Event::TestTimeSystem&)
Line
Count
Source
46
1.07k
      header_validator_(parent.makeHeaderValidator()) {
47
1.07k
  encoder.getStream().addCallbacks(*this);
48
1.07k
}
Envoy::FakeStream::FakeStream(Envoy::FakeHttpConnection&, Envoy::Http::ResponseEncoder&, Envoy::Event::TestTimeSystem&)
Line
Count
Source
46
14
      header_validator_(parent.makeHeaderValidator()) {
47
14
  encoder.getStream().addCallbacks(*this);
48
14
}
49
50
1.09k
void FakeStream::decodeHeaders(Http::RequestHeaderMapSharedPtr&& headers, bool end_stream) {
51
1.09k
  absl::MutexLock lock(&lock_);
52
1.09k
  headers_ = std::move(headers);
53
1.09k
  if (header_validator_) {
54
0
    header_validator_->transformRequestHeaders(*headers_);
55
0
  }
56
1.09k
  setEndStream(end_stream);
57
1.09k
}
58
59
1.15k
void FakeStream::decodeData(Buffer::Instance& data, bool end_stream) {
60
1.15k
  received_data_ = true;
61
1.15k
  absl::MutexLock lock(&lock_);
62
1.15k
  body_.add(data);
63
1.15k
  setEndStream(end_stream);
64
1.15k
}
65
66
0
void FakeStream::decodeTrailers(Http::RequestTrailerMapPtr&& trailers) {
67
0
  absl::MutexLock lock(&lock_);
68
0
  setEndStream(true);
69
0
  trailers_ = std::move(trailers);
70
0
}
71
72
0
void FakeStream::decodeMetadata(Http::MetadataMapPtr&& metadata_map_ptr) {
73
0
  for (const auto& metadata : *metadata_map_ptr) {
74
0
    duplicated_metadata_key_count_[metadata.first]++;
75
0
    metadata_map_.insert(metadata);
76
0
  }
77
0
}
78
79
3.41k
void FakeStream::postToConnectionThread(std::function<void()> cb) {
80
3.41k
  parent_.postToConnectionThread(cb);
81
3.41k
}
82
83
0
void FakeStream::encode1xxHeaders(const Http::ResponseHeaderMap& headers) {
84
0
  std::shared_ptr<Http::ResponseHeaderMap> headers_copy(
85
0
      Http::createHeaderMap<Http::ResponseHeaderMapImpl>(headers));
86
0
  postToConnectionThread([this, headers_copy]() -> void {
87
0
    {
88
0
      absl::MutexLock lock(&lock_);
89
0
      if (!parent_.connected() || saw_reset_) {
90
        // Encoded already deleted.
91
0
        return;
92
0
      }
93
0
    }
94
0
    encoder_.encode1xxHeaders(*headers_copy);
95
0
  });
96
0
}
97
98
1.09k
void FakeStream::encodeHeaders(const Http::HeaderMap& headers, bool end_stream) {
99
1.09k
  std::shared_ptr<Http::ResponseHeaderMap> headers_copy(
100
1.09k
      Http::createHeaderMap<Http::ResponseHeaderMapImpl>(headers));
101
1.09k
  if (add_served_by_header_) {
102
0
    headers_copy->addCopy(Http::LowerCaseString("x-served-by"),
103
0
                          parent_.connection().connectionInfoProvider().localAddress()->asString());
104
0
  }
105
106
1.09k
  if (header_validator_) {
107
    // Ignore validation results
108
0
    auto result = header_validator_->transformResponseHeaders(*headers_copy);
109
0
    if (result.new_headers) {
110
0
      headers_copy = std::move(result.new_headers);
111
0
    }
112
0
  }
113
114
1.09k
  postToConnectionThread([this, headers_copy, end_stream]() -> void {
115
1.09k
    {
116
1.09k
      absl::MutexLock lock(&lock_);
117
1.09k
      if (!parent_.connected() || saw_reset_) {
118
        // Encoded already deleted.
119
0
        return;
120
0
      }
121
1.09k
    }
122
1.09k
    encoder_.encodeHeaders(*headers_copy, end_stream);
123
1.09k
  });
124
1.09k
}
125
126
0
void FakeStream::encodeData(std::string data, bool end_stream) {
127
0
  postToConnectionThread([this, data, end_stream]() -> void {
128
0
    {
129
0
      absl::MutexLock lock(&lock_);
130
0
      if (!parent_.connected() || saw_reset_) {
131
        // Encoded already deleted.
132
0
        return;
133
0
      }
134
0
    }
135
0
    Buffer::OwnedImpl fake_data(data.data(), data.size());
136
0
    encoder_.encodeData(fake_data, end_stream);
137
0
  });
138
0
}
139
140
1.07k
void FakeStream::encodeData(uint64_t size, bool end_stream) {
141
1.07k
  postToConnectionThread([this, size, end_stream]() -> void {
142
1.07k
    {
143
1.07k
      absl::MutexLock lock(&lock_);
144
1.07k
      if (!parent_.connected() || saw_reset_) {
145
        // Encoded already deleted.
146
0
        return;
147
0
      }
148
1.07k
    }
149
1.07k
    Buffer::OwnedImpl data(std::string(size, 'a'));
150
1.07k
    encoder_.encodeData(data, end_stream);
151
1.07k
  });
152
1.07k
}
153
154
172
void FakeStream::encodeData(Buffer::Instance& data, bool end_stream) {
155
172
  std::shared_ptr<Buffer::Instance> data_copy = std::make_shared<Buffer::OwnedImpl>(data);
156
172
  postToConnectionThread([this, data_copy, end_stream]() -> void {
157
172
    {
158
172
      absl::MutexLock lock(&lock_);
159
172
      if (!parent_.connected() || saw_reset_) {
160
        // Encoded already deleted.
161
0
        return;
162
0
      }
163
172
    }
164
172
    encoder_.encodeData(*data_copy, end_stream);
165
172
  });
166
172
}
167
168
1.07k
void FakeStream::encodeTrailers(const Http::HeaderMap& trailers) {
169
1.07k
  std::shared_ptr<Http::ResponseTrailerMap> trailers_copy(
170
1.07k
      Http::createHeaderMap<Http::ResponseTrailerMapImpl>(trailers));
171
1.07k
  postToConnectionThread([this, trailers_copy]() -> void {
172
1.07k
    {
173
1.07k
      absl::MutexLock lock(&lock_);
174
1.07k
      if (!parent_.connected() || saw_reset_) {
175
        // Encoded already deleted.
176
0
        return;
177
0
      }
178
1.07k
    }
179
1.07k
    encoder_.encodeTrailers(*trailers_copy);
180
1.07k
  });
181
1.07k
}
182
183
0
void FakeStream::encodeResetStream() {
184
0
  postToConnectionThread([this]() -> void {
185
0
    {
186
0
      absl::MutexLock lock(&lock_);
187
0
      if (!parent_.connected() || saw_reset_) {
188
        // Encoded already deleted.
189
0
        return;
190
0
      }
191
0
    }
192
0
    encoder_.getStream().resetStream(Http::StreamResetReason::LocalReset);
193
0
  });
194
0
}
195
196
0
void FakeStream::encodeMetadata(const Http::MetadataMapVector& metadata_map_vector) {
197
0
  postToConnectionThread([this, &metadata_map_vector]() -> void {
198
0
    {
199
0
      absl::MutexLock lock(&lock_);
200
0
      if (!parent_.connected() || saw_reset_) {
201
        // Encoded already deleted.
202
0
        return;
203
0
      }
204
0
    }
205
0
    encoder_.encodeMetadata(metadata_map_vector);
206
0
  });
207
0
}
208
209
0
void FakeStream::readDisable(bool disable) {
210
0
  postToConnectionThread([this, disable]() -> void {
211
0
    {
212
0
      absl::MutexLock lock(&lock_);
213
0
      if (!parent_.connected() || saw_reset_) {
214
        // Encoded already deleted.
215
0
        return;
216
0
      }
217
0
    }
218
0
    encoder_.getStream().readDisable(disable);
219
0
  });
220
0
}
221
222
0
void FakeStream::onResetStream(Http::StreamResetReason, absl::string_view) {
223
0
  absl::MutexLock lock(&lock_);
224
0
  saw_reset_ = true;
225
0
}
226
227
0
AssertionResult FakeStream::waitForHeadersComplete(milliseconds timeout) {
228
0
  absl::MutexLock lock(&lock_);
229
0
  const auto reached = [this]()
230
0
                           ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return headers_ != nullptr; };
231
0
  if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
232
0
    return AssertionFailure() << "Timed out waiting for headers.";
233
0
  }
234
0
  return AssertionSuccess();
235
0
}
236
237
namespace {
238
// Perform a wait on a condition while still allowing for periodic client dispatcher runs that
239
// occur on the current thread.
240
bool waitForWithDispatcherRun(Event::TestTimeSystem& time_system, absl::Mutex& lock,
241
                              const std::function<bool()>& condition,
242
                              Event::Dispatcher& client_dispatcher, milliseconds timeout)
243
195
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock) {
244
195
  Event::TestTimeSystem::RealTimeBound bound(timeout);
245
274
  while (bound.withinBound()) {
246
    // Wake up periodically to run the client dispatcher.
247
274
    if (time_system.waitFor(lock, absl::Condition(&condition), 5ms * TSAN_TIMEOUT_FACTOR)) {
248
195
      return true;
249
195
    }
250
251
    // Run the client dispatcher since we may need to process window updates, etc.
252
79
    client_dispatcher.run(Event::Dispatcher::RunType::NonBlock);
253
79
  }
254
0
  return false;
255
195
}
256
} // namespace
257
258
AssertionResult FakeStream::waitForData(Event::Dispatcher& client_dispatcher, uint64_t body_length,
259
167
                                        milliseconds timeout) {
260
167
  absl::MutexLock lock(&lock_);
261
167
  if (!waitForWithDispatcherRun(
262
167
          time_system_, lock_,
263
167
          [this, body_length]()
264
599
              ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return (body_.length() >= body_length); },
265
167
          client_dispatcher, timeout)) {
266
0
    return AssertionFailure() << "Timed out waiting for data.";
267
0
  }
268
167
  return AssertionSuccess();
269
167
}
270
271
AssertionResult FakeStream::waitForData(Event::Dispatcher& client_dispatcher,
272
0
                                        absl::string_view data, milliseconds timeout) {
273
0
  auto succeeded = waitForData(client_dispatcher, data.length(), timeout);
274
0
  if (succeeded) {
275
0
    Buffer::OwnedImpl buffer(data.data(), data.length());
276
0
    if (!TestUtility::buffersEqual(body(), buffer)) {
277
0
      return AssertionFailure() << body().toString() << " not equal to " << data;
278
0
    }
279
0
  }
280
0
  return succeeded;
281
0
}
282
283
AssertionResult FakeStream::waitForData(Event::Dispatcher& client_dispatcher,
284
                                        const FakeStream::ValidatorFunction& data_validator,
285
0
                                        std::chrono::milliseconds timeout) {
286
0
  absl::MutexLock lock(&lock_);
287
0
  if (!waitForWithDispatcherRun(
288
0
          time_system_, lock_,
289
0
          [this, data_validator]()
290
0
              ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return data_validator(body_.toString()); },
291
0
          client_dispatcher, timeout)) {
292
0
    return AssertionFailure() << "Timed out waiting for data.";
293
0
  }
294
0
  return AssertionSuccess();
295
0
}
296
297
AssertionResult FakeStream::waitForEndStream(Event::Dispatcher& client_dispatcher,
298
0
                                             milliseconds timeout) {
299
0
  absl::MutexLock lock(&lock_);
300
0
  if (!waitForWithDispatcherRun(
301
0
          time_system_, lock_,
302
0
          [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return end_stream_; }, client_dispatcher,
303
0
          timeout)) {
304
0
    return AssertionFailure() << "Timed out waiting for end of stream.";
305
0
  }
306
0
  return AssertionSuccess();
307
0
}
308
309
0
AssertionResult FakeStream::waitForReset(milliseconds timeout) {
310
0
  absl::MutexLock lock(&lock_);
311
0
  if (!time_system_.waitFor(lock_, absl::Condition(&saw_reset_), timeout)) {
312
0
    return AssertionFailure() << "Timed out waiting for reset.";
313
0
  }
314
0
  return AssertionSuccess();
315
0
}
316
317
14
void FakeStream::startGrpcStream(bool send_headers) {
318
14
  ASSERT(!grpc_stream_started_, "gRPC stream should not be started more than once");
319
14
  grpc_stream_started_ = true;
320
14
  if (send_headers) {
321
14
    encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false);
322
14
  }
323
14
}
324
325
0
void FakeStream::finishGrpcStream(Grpc::Status::GrpcStatus status) {
326
0
  encodeTrailers(Http::TestResponseTrailerMapImpl{
327
0
      {"grpc-status", std::to_string(static_cast<uint32_t>(status))}});
328
0
}
329
330
class TestHttp1ServerConnectionImpl : public Http::Http1::ServerConnectionImpl {
331
public:
332
  using Http::Http1::ServerConnectionImpl::ServerConnectionImpl;
333
};
334
335
class TestHttp2ServerConnectionImpl : public Http::Http2::ServerConnectionImpl {
336
public:
337
  TestHttp2ServerConnectionImpl(
338
      Network::Connection& connection, Http::ServerConnectionCallbacks& callbacks,
339
      Http::Http2::CodecStats& stats, Random::RandomGenerator& random_generator,
340
      const envoy::config::core::v3::Http2ProtocolOptions& http2_options,
341
      const uint32_t max_request_headers_kb, const uint32_t max_request_headers_count,
342
      envoy::config::core::v3::HttpProtocolOptions::HeadersWithUnderscoresAction
343
          headers_with_underscores_action,
344
      Server::OverloadManager& overload_manager)
345
      : ServerConnectionImpl(connection, callbacks, stats, random_generator, http2_options,
346
                             max_request_headers_kb, max_request_headers_count,
347
591
                             headers_with_underscores_action, overload_manager) {}
348
349
0
  void updateConcurrentStreams(uint32_t max_streams) {
350
0
    absl::InlinedVector<http2::adapter::Http2Setting, 1> settings;
351
0
    settings.push_back({http2::adapter::MAX_CONCURRENT_STREAMS, max_streams});
352
0
    adapter_->SubmitSettings(settings);
353
0
    const int rc = adapter_->Send();
354
0
    ASSERT(rc == 0);
355
0
  }
356
};
357
358
namespace {
359
// Fake upstream codec will not do path normalization, so the tests can observe
360
// the path forwarded by Envoy.
361
::envoy::extensions::http::header_validators::envoy_default::v3::HeaderValidatorConfig
362
591
fakeUpstreamHeaderValidatorConfig() {
363
591
  ::envoy::extensions::http::header_validators::envoy_default::v3::HeaderValidatorConfig config;
364
591
  config.mutable_uri_path_normalization_options()->set_skip_path_normalization(true);
365
591
  config.mutable_uri_path_normalization_options()->set_skip_merging_slashes(true);
366
591
  config.mutable_uri_path_normalization_options()->set_path_with_escaped_slashes_action(
367
591
      ::envoy::extensions::http::header_validators::envoy_default::v3::HeaderValidatorConfig::
368
591
          UriPathNormalizationOptions::KEEP_UNCHANGED);
369
591
  return config;
370
591
}
371
} // namespace
372
373
FakeHttpConnection::FakeHttpConnection(
374
    FakeUpstream& fake_upstream, SharedConnectionWrapper& shared_connection, Http::CodecType type,
375
    Event::TestTimeSystem& time_system, uint32_t max_request_headers_kb,
376
    uint32_t max_request_headers_count,
377
    envoy::config::core::v3::HttpProtocolOptions::HeadersWithUnderscoresAction
378
        headers_with_underscores_action)
379
    : FakeConnectionBase(shared_connection, time_system), type_(type),
380
      header_validator_factory_(
381
591
          IntegrationUtil::makeHeaderValidationFactory(fakeUpstreamHeaderValidatorConfig())) {
382
591
  ASSERT(max_request_headers_count != 0);
383
591
  if (type == Http::CodecType::HTTP1) {
384
0
    Http::Http1Settings http1_settings;
385
0
    http1_settings.use_balsa_parser_ =
386
0
        Runtime::runtimeFeatureEnabled("envoy.reloadable_features.http1_use_balsa_parser");
387
    // For the purpose of testing, we always have the upstream encode the trailers if any
388
0
    http1_settings.enable_trailers_ = true;
389
0
    Http::Http1::CodecStats& stats = fake_upstream.http1CodecStats();
390
0
    codec_ = std::make_unique<TestHttp1ServerConnectionImpl>(
391
0
        shared_connection_.connection(), stats, *this, http1_settings, max_request_headers_kb,
392
0
        max_request_headers_count, headers_with_underscores_action, overload_manager_);
393
591
  } else if (type == Http::CodecType::HTTP2) {
394
591
    envoy::config::core::v3::Http2ProtocolOptions http2_options = fake_upstream.http2Options();
395
591
    Http::Http2::CodecStats& stats = fake_upstream.http2CodecStats();
396
591
    codec_ = std::make_unique<TestHttp2ServerConnectionImpl>(
397
591
        shared_connection_.connection(), *this, stats, random_, http2_options,
398
591
        max_request_headers_kb, max_request_headers_count, headers_with_underscores_action,
399
591
        overload_manager_);
400
591
  } else {
401
0
    ASSERT(type == Http::CodecType::HTTP3);
402
0
#ifdef ENVOY_ENABLE_QUIC
403
0
    Http::Http3::CodecStats& stats = fake_upstream.http3CodecStats();
404
0
    codec_ = std::make_unique<Quic::QuicHttpServerConnectionImpl>(
405
0
        dynamic_cast<Quic::EnvoyQuicServerSession&>(shared_connection_.connection()), *this, stats,
406
0
        fake_upstream.http3Options(), max_request_headers_kb, max_request_headers_count,
407
0
        headers_with_underscores_action);
408
#else
409
    ASSERT(false, "running a QUIC integration test without compiling QUIC");
410
#endif
411
0
  }
412
591
  shared_connection_.connection().addReadFilter(
413
591
      Network::ReadFilterSharedPtr{new ReadFilter(*this)});
414
591
}
Envoy::FakeHttpConnection::FakeHttpConnection(Envoy::FakeUpstream&, Envoy::SharedConnectionWrapper&, Envoy::Http::CodecType, Envoy::Event::TestTimeSystem&, unsigned int, unsigned int, envoy::config::core::v3::HttpProtocolOptions_HeadersWithUnderscoresAction)
Line
Count
Source
381
577
          IntegrationUtil::makeHeaderValidationFactory(fakeUpstreamHeaderValidatorConfig())) {
382
577
  ASSERT(max_request_headers_count != 0);
383
577
  if (type == Http::CodecType::HTTP1) {
384
0
    Http::Http1Settings http1_settings;
385
0
    http1_settings.use_balsa_parser_ =
386
0
        Runtime::runtimeFeatureEnabled("envoy.reloadable_features.http1_use_balsa_parser");
387
    // For the purpose of testing, we always have the upstream encode the trailers if any
388
0
    http1_settings.enable_trailers_ = true;
389
0
    Http::Http1::CodecStats& stats = fake_upstream.http1CodecStats();
390
0
    codec_ = std::make_unique<TestHttp1ServerConnectionImpl>(
391
0
        shared_connection_.connection(), stats, *this, http1_settings, max_request_headers_kb,
392
0
        max_request_headers_count, headers_with_underscores_action, overload_manager_);
393
577
  } else if (type == Http::CodecType::HTTP2) {
394
577
    envoy::config::core::v3::Http2ProtocolOptions http2_options = fake_upstream.http2Options();
395
577
    Http::Http2::CodecStats& stats = fake_upstream.http2CodecStats();
396
577
    codec_ = std::make_unique<TestHttp2ServerConnectionImpl>(
397
577
        shared_connection_.connection(), *this, stats, random_, http2_options,
398
577
        max_request_headers_kb, max_request_headers_count, headers_with_underscores_action,
399
577
        overload_manager_);
400
577
  } else {
401
0
    ASSERT(type == Http::CodecType::HTTP3);
402
0
#ifdef ENVOY_ENABLE_QUIC
403
0
    Http::Http3::CodecStats& stats = fake_upstream.http3CodecStats();
404
0
    codec_ = std::make_unique<Quic::QuicHttpServerConnectionImpl>(
405
0
        dynamic_cast<Quic::EnvoyQuicServerSession&>(shared_connection_.connection()), *this, stats,
406
0
        fake_upstream.http3Options(), max_request_headers_kb, max_request_headers_count,
407
0
        headers_with_underscores_action);
408
#else
409
    ASSERT(false, "running a QUIC integration test without compiling QUIC");
410
#endif
411
0
  }
412
577
  shared_connection_.connection().addReadFilter(
413
577
      Network::ReadFilterSharedPtr{new ReadFilter(*this)});
414
577
}
Envoy::FakeHttpConnection::FakeHttpConnection(Envoy::FakeUpstream&, Envoy::SharedConnectionWrapper&, Envoy::Http::CodecType, Envoy::Event::TestTimeSystem&, unsigned int, unsigned int, envoy::config::core::v3::HttpProtocolOptions_HeadersWithUnderscoresAction)
Line
Count
Source
381
14
          IntegrationUtil::makeHeaderValidationFactory(fakeUpstreamHeaderValidatorConfig())) {
382
14
  ASSERT(max_request_headers_count != 0);
383
14
  if (type == Http::CodecType::HTTP1) {
384
0
    Http::Http1Settings http1_settings;
385
0
    http1_settings.use_balsa_parser_ =
386
0
        Runtime::runtimeFeatureEnabled("envoy.reloadable_features.http1_use_balsa_parser");
387
    // For the purpose of testing, we always have the upstream encode the trailers if any
388
0
    http1_settings.enable_trailers_ = true;
389
0
    Http::Http1::CodecStats& stats = fake_upstream.http1CodecStats();
390
0
    codec_ = std::make_unique<TestHttp1ServerConnectionImpl>(
391
0
        shared_connection_.connection(), stats, *this, http1_settings, max_request_headers_kb,
392
0
        max_request_headers_count, headers_with_underscores_action, overload_manager_);
393
14
  } else if (type == Http::CodecType::HTTP2) {
394
14
    envoy::config::core::v3::Http2ProtocolOptions http2_options = fake_upstream.http2Options();
395
14
    Http::Http2::CodecStats& stats = fake_upstream.http2CodecStats();
396
14
    codec_ = std::make_unique<TestHttp2ServerConnectionImpl>(
397
14
        shared_connection_.connection(), *this, stats, random_, http2_options,
398
14
        max_request_headers_kb, max_request_headers_count, headers_with_underscores_action,
399
14
        overload_manager_);
400
14
  } else {
401
0
    ASSERT(type == Http::CodecType::HTTP3);
402
0
#ifdef ENVOY_ENABLE_QUIC
403
0
    Http::Http3::CodecStats& stats = fake_upstream.http3CodecStats();
404
0
    codec_ = std::make_unique<Quic::QuicHttpServerConnectionImpl>(
405
0
        dynamic_cast<Quic::EnvoyQuicServerSession&>(shared_connection_.connection()), *this, stats,
406
0
        fake_upstream.http3Options(), max_request_headers_kb, max_request_headers_count,
407
0
        headers_with_underscores_action);
408
#else
409
    ASSERT(false, "running a QUIC integration test without compiling QUIC");
410
#endif
411
0
  }
412
14
  shared_connection_.connection().addReadFilter(
413
14
      Network::ReadFilterSharedPtr{new ReadFilter(*this)});
414
14
}
415
416
78
AssertionResult FakeConnectionBase::close(std::chrono::milliseconds timeout) {
417
78
  ENVOY_LOG(trace, "FakeConnectionBase close");
418
78
  if (!shared_connection_.connected()) {
419
0
    return AssertionSuccess();
420
0
  }
421
78
  return shared_connection_.executeOnDispatcher(
422
78
      [](Network::Connection& connection) {
423
74
        connection.close(Network::ConnectionCloseType::FlushWrite);
424
74
      },
425
78
      timeout);
426
78
}
427
428
AssertionResult FakeConnectionBase::close(Network::ConnectionCloseType close_type,
429
0
                                          std::chrono::milliseconds timeout) {
430
0
  ENVOY_LOG(trace, "FakeConnectionBase close type={}", static_cast<int>(close_type));
431
0
  if (!shared_connection_.connected()) {
432
0
    return AssertionSuccess();
433
0
  }
434
0
  return shared_connection_.executeOnDispatcher(
435
0
      [&close_type](Network::Connection& connection) { connection.close(close_type); }, timeout);
436
0
}
437
438
0
AssertionResult FakeConnectionBase::readDisable(bool disable, std::chrono::milliseconds timeout) {
439
0
  return shared_connection_.executeOnDispatcher(
440
0
      [disable](Network::Connection& connection) { connection.readDisable(disable); }, timeout);
441
0
}
442
443
namespace {
444
0
Http::Protocol codeTypeToProtocol(Http::CodecType codec_type) {
445
0
  switch (codec_type) {
446
0
  case Http::CodecType::HTTP1:
447
0
    return Http::Protocol::Http11;
448
0
  case Http::CodecType::HTTP2:
449
0
    return Http::Protocol::Http2;
450
0
  case Http::CodecType::HTTP3:
451
0
    return Http::Protocol::Http3;
452
0
  }
453
0
  PANIC_DUE_TO_CORRUPT_ENUM;
454
0
}
455
} // namespace
456
457
1.09k
Http::ServerHeaderValidatorPtr FakeHttpConnection::makeHeaderValidator() {
458
1.09k
  return header_validator_factory_ ? header_validator_factory_->createServerHeaderValidator(
459
0
                                         codeTypeToProtocol(type_), header_validator_stats_)
460
1.09k
                                   : nullptr;
461
1.09k
}
462
463
14
Http::RequestDecoder& FakeHttpConnection::newStream(Http::ResponseEncoder& encoder, bool) {
464
14
  absl::MutexLock lock(&lock_);
465
14
  new_streams_.emplace_back(new FakeStream(*this, encoder, time_system_));
466
14
  return *new_streams_.back();
467
14
}
468
469
0
void FakeHttpConnection::onGoAway(Http::GoAwayErrorCode code) {
470
0
  ASSERT(type_ != Http::CodecType::HTTP1);
471
  // Usually indicates connection level errors, no operations are needed since
472
  // the connection will be closed soon.
473
0
  ENVOY_LOG(info, "FakeHttpConnection receives GOAWAY: ", static_cast<int>(code));
474
0
}
475
476
0
void FakeHttpConnection::encodeGoAway() {
477
0
  ASSERT(type_ != Http::CodecType::HTTP1);
478
479
0
  postToConnectionThread([this]() { codec_->goAway(); });
480
0
}
481
482
0
void FakeHttpConnection::updateConcurrentStreams(uint64_t max_streams) {
483
0
  ASSERT(type_ != Http::CodecType::HTTP1);
484
485
0
  if (type_ == Http::CodecType::HTTP2) {
486
0
    postToConnectionThread([this, max_streams]() {
487
0
      auto codec = dynamic_cast<TestHttp2ServerConnectionImpl*>(codec_.get());
488
0
      codec->updateConcurrentStreams(max_streams);
489
0
    });
490
0
  } else {
491
0
#ifdef ENVOY_ENABLE_QUIC
492
0
    postToConnectionThread([this, max_streams]() {
493
0
      auto codec = dynamic_cast<Quic::QuicHttpServerConnectionImpl*>(codec_.get());
494
0
      quic::test::QuicSessionPeer::SetMaxOpenIncomingBidirectionalStreams(
495
0
          &codec->quicServerSession(), max_streams);
496
0
      codec->quicServerSession().SendMaxStreams(1, false);
497
0
    });
498
#else
499
    UNREFERENCED_PARAMETER(max_streams);
500
#endif
501
0
  }
502
0
}
503
504
0
void FakeHttpConnection::encodeProtocolError() {
505
0
  ASSERT(type_ != Http::CodecType::HTTP1);
506
507
0
  Http::Http2::ServerConnectionImpl* codec =
508
0
      dynamic_cast<Http::Http2::ServerConnectionImpl*>(codec_.get());
509
0
  ASSERT(codec != nullptr);
510
0
  postToConnectionThread([codec]() {
511
0
    Http::Status status = codec->protocolErrorForTest();
512
0
    ASSERT(Http::getStatusCode(status) == Http::StatusCode::CodecProtocolError);
513
0
  });
514
0
}
515
516
115
AssertionResult FakeConnectionBase::waitForDisconnect(milliseconds timeout) {
517
115
  ENVOY_LOG(trace, "FakeConnectionBase waiting for disconnect");
518
115
  absl::MutexLock lock(&lock_);
519
117
  const auto reached = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
520
117
    return !shared_connection_.connectedLockHeld();
521
117
  };
522
523
115
  if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
524
0
    if (timeout == TestUtility::DefaultTimeout) {
525
0
      ADD_FAILURE() << "Please don't waitForDisconnect with a 5s timeout if failure is expected\n";
526
0
    }
527
0
    return AssertionFailure() << "Timed out waiting for disconnect.";
528
0
  }
529
115
  ENVOY_LOG(trace, "FakeConnectionBase done waiting for disconnect");
530
115
  return AssertionSuccess();
531
115
}
532
533
0
AssertionResult FakeConnectionBase::waitForRstDisconnect(std::chrono::milliseconds timeout) {
534
0
  ENVOY_LOG(trace, "FakeConnectionBase waiting for RST disconnect");
535
0
  absl::MutexLock lock(&lock_);
536
0
  const auto reached = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
537
0
    return shared_connection_.rstDisconnected();
538
0
  };
539
540
0
  if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
541
0
    if (timeout == TestUtility::DefaultTimeout) {
542
0
      ADD_FAILURE()
543
0
          << "Please don't waitForRstDisconnect with a 5s timeout if failure is expected\n";
544
0
    }
545
0
    return AssertionFailure() << "Timed out waiting for RST disconnect.";
546
0
  }
547
0
  ENVOY_LOG(trace, "FakeConnectionBase done waiting for RST disconnect");
548
0
  return AssertionSuccess();
549
0
}
550
551
0
AssertionResult FakeConnectionBase::waitForHalfClose(milliseconds timeout) {
552
0
  absl::MutexLock lock(&lock_);
553
0
  if (!time_system_.waitFor(lock_, absl::Condition(&half_closed_), timeout)) {
554
0
    return AssertionFailure() << "Timed out waiting for half close.";
555
0
  }
556
0
  return AssertionSuccess();
557
0
}
558
559
0
AssertionResult FakeConnectionBase::waitForNoPost(milliseconds timeout) {
560
0
  absl::MutexLock lock(&lock_);
561
0
  if (!time_system_.waitFor(
562
0
          lock_,
563
0
          absl::Condition(
564
0
              [](void* fake_connection) -> bool {
565
0
                return static_cast<FakeConnectionBase*>(fake_connection)->pending_cbs_ == 0;
566
0
              },
567
0
              this),
568
0
          timeout)) {
569
0
    return AssertionFailure() << "Timed out waiting for ops on this connection";
570
0
  }
571
0
  return AssertionSuccess();
572
0
}
573
574
3.41k
void FakeConnectionBase::postToConnectionThread(std::function<void()> cb) {
575
3.41k
  ++pending_cbs_;
576
3.41k
  dispatcher_.post([this, cb]() {
577
3.41k
    cb();
578
3.41k
    --pending_cbs_;
579
3.41k
  });
580
3.41k
}
581
582
AssertionResult FakeHttpConnection::waitForNewStream(Event::Dispatcher& client_dispatcher,
583
                                                     FakeStreamPtr& stream,
584
14
                                                     std::chrono::milliseconds timeout) {
585
14
  absl::MutexLock lock(&lock_);
586
14
  if (!waitForWithDispatcherRun(
587
14
          time_system_, lock_,
588
20
          [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return !new_streams_.empty(); },
589
14
          client_dispatcher, timeout)) {
590
0
    return AssertionFailure() << "Timed out waiting for new stream.";
591
0
  }
592
14
  stream = std::move(new_streams_.front());
593
14
  new_streams_.pop_front();
594
14
  return AssertionSuccess();
595
14
}
596
597
FakeUpstream::FakeUpstream(Network::DownstreamTransportSocketFactoryPtr&& transport_socket_factory,
598
                           const std::string& uds_path, const FakeUpstreamConfig& config)
599
    : FakeUpstream(std::move(transport_socket_factory),
600
                   Network::SocketPtr{new Network::UdsListenSocket(
601
                       std::make_shared<Network::Address::PipeInstance>(uds_path))},
602
0
                   config) {}
603
604
static Network::SocketPtr
605
2.66k
makeTcpListenSocket(const Network::Address::InstanceConstSharedPtr& address) {
606
2.66k
  return std::make_unique<Network::TcpListenSocket>(address, nullptr, true);
607
2.66k
}
608
609
static Network::Address::InstanceConstSharedPtr makeAddress(uint32_t port,
610
14
                                                            Network::Address::IpVersion version) {
611
14
  return Network::Utility::parseInternetAddress(Network::Test::getLoopbackAddressString(version),
612
14
                                                port);
613
14
}
614
615
static Network::SocketPtr
616
0
makeUdpListenSocket(const Network::Address::InstanceConstSharedPtr& address) {
617
0
  auto socket = std::make_unique<Network::UdpListenSocket>(address, nullptr, true);
618
  // TODO(mattklein123): These options are set in multiple locations. We should centralize them for
619
  // UDP listeners.
620
0
  socket->addOptions(Network::SocketOptionFactory::buildIpPacketInfoOptions());
621
0
  socket->addOptions(Network::SocketOptionFactory::buildRxQueueOverFlowOptions());
622
0
  return socket;
623
0
}
624
625
static Network::SocketPtr
626
makeListenSocket(const FakeUpstreamConfig& config,
627
2.66k
                 const Network::Address::InstanceConstSharedPtr& address) {
628
2.66k
  return (config.udp_fake_upstream_.has_value() ? makeUdpListenSocket(address)
629
2.66k
                                                : makeTcpListenSocket(address));
630
2.66k
}
631
632
FakeUpstream::FakeUpstream(uint32_t port, Network::Address::IpVersion version,
633
                           const FakeUpstreamConfig& config, const bool defer_initialization)
634
    : FakeUpstream(Network::Test::createRawBufferDownstreamSocketFactory(),
635
                   makeListenSocket(config, makeAddress(port, version)), config,
636
14
                   defer_initialization) {}
637
638
FakeUpstream::FakeUpstream(Network::DownstreamTransportSocketFactoryPtr&& transport_socket_factory,
639
                           const Network::Address::InstanceConstSharedPtr& address,
640
                           const FakeUpstreamConfig& config)
641
2.64k
    : FakeUpstream(std::move(transport_socket_factory), makeListenSocket(config, address), config) {
642
2.64k
}
643
644
FakeUpstream::FakeUpstream(Network::DownstreamTransportSocketFactoryPtr&& transport_socket_factory,
645
                           uint32_t port, Network::Address::IpVersion version,
646
                           const FakeUpstreamConfig& config)
647
    : FakeUpstream(std::move(transport_socket_factory),
648
0
                   makeListenSocket(config, makeAddress(port, version)), config) {}
649
650
FakeUpstream::FakeUpstream(Network::DownstreamTransportSocketFactoryPtr&& transport_socket_factory,
651
                           Network::SocketPtr&& listen_socket, const FakeUpstreamConfig& config,
652
                           const bool defer_initialization)
653
    : http_type_(config.upstream_protocol_), http2_options_(config.http2_options_),
654
      http3_options_(config.http3_options_), quic_options_(config.quic_options_),
655
      socket_(Network::SocketSharedPtr(listen_socket.release())),
656
      api_(Api::createApiForTest(stats_store_)), time_system_(config.time_system_),
657
      dispatcher_(api_->allocateDispatcher("fake_upstream")),
658
      handler_(new Server::ConnectionHandlerImpl(*dispatcher_, 0)), config_(config),
659
      read_disable_on_new_connection_(true), enable_half_close_(config.enable_half_close_),
660
      listener_(*this, http_type_ == Http::CodecType::HTTP3),
661
      filter_chain_(Network::Test::createEmptyFilterChain(std::move(transport_socket_factory))),
662
2.66k
      stats_scope_(stats_store_.createScope("test_server_scope")) {
663
2.66k
  socket_factories_.emplace_back(std::make_unique<FakeListenSocketFactory>(socket_));
664
2.66k
  ENVOY_LOG(info, "starting fake server at {}. UDP={} codec={}", localAddress()->asString(),
665
2.66k
            config.udp_fake_upstream_.has_value(), FakeHttpConnection::typeToString(http_type_));
666
2.66k
  if (config.udp_fake_upstream_.has_value() &&
667
2.66k
      config.udp_fake_upstream_->max_rx_datagram_size_.has_value()) {
668
0
    listener_.udp_listener_config_.config_.mutable_downstream_socket_config()
669
0
        ->mutable_max_rx_datagram_size()
670
0
        ->set_value(config.udp_fake_upstream_->max_rx_datagram_size_.value());
671
0
  }
672
673
2.66k
  if (!defer_initialization) {
674
2.66k
    initializeServer();
675
2.66k
  }
676
2.66k
}
677
678
2.66k
FakeUpstream::~FakeUpstream() { cleanUp(); };
679
680
2.66k
void FakeUpstream::initializeServer() {
681
2.66k
  if (initialized_) {
682
    // Already initialized.
683
0
    return;
684
0
  }
685
686
2.66k
  dispatcher_->post([this]() -> void {
687
2.66k
    socket_factories_[0]->doFinalPreWorkerInit();
688
2.66k
    handler_->addListener(absl::nullopt, listener_, runtime_);
689
2.66k
    server_initialized_.setReady();
690
2.66k
  });
691
2.66k
  thread_ = api_->threadFactory().createThread([this]() -> void { threadRoutine(); });
692
2.66k
  server_initialized_.waitReady();
693
2.66k
  initialized_ = true;
694
2.66k
}
695
696
3.25k
void FakeUpstream::cleanUp() {
697
3.25k
  if (thread_.get()) {
698
2.66k
    dispatcher_->exit();
699
2.66k
    thread_->join();
700
2.66k
    thread_.reset();
701
2.66k
  }
702
3.25k
}
703
704
bool FakeUpstream::createNetworkFilterChain(Network::Connection& connection,
705
144
                                            const Filter::NetworkFilterFactoriesList&) {
706
144
  absl::MutexLock lock(&lock_);
707
144
  if (read_disable_on_new_connection_ && http_type_ != Http::CodecType::HTTP3) {
708
    // Disable early close detection to avoid closing the network connection before full
709
    // initialization is complete.
710
144
    connection.detectEarlyCloseWhenReadDisabled(false);
711
144
    connection.readDisable(true);
712
144
    if (disable_and_do_not_enable_) {
713
0
      dynamic_cast<Network::ConnectionImpl*>(&connection)->ioHandle().enableFileEvents(0);
714
0
    }
715
144
  }
716
144
  auto connection_wrapper = std::make_unique<SharedConnectionWrapper>(connection);
717
718
144
  LinkedList::moveIntoListBack(std::move(connection_wrapper), new_connections_);
719
720
  // Normally we don't associate a logical network connection with a FakeHttpConnection  until
721
  // waitForHttpConnection is called, but QUIC needs to be set up as packets come in, so we do
722
  // not lazily create for HTTP/3
723
144
  if (http_type_ == Http::CodecType::HTTP3) {
724
0
    quic_connections_.push_back(std::make_unique<FakeHttpConnection>(
725
0
        *this, consumeConnection(), http_type_, time_system_, config_.max_request_headers_kb_,
726
0
        config_.max_request_headers_count_, config_.headers_with_underscores_action_));
727
0
    quic_connections_.back()->initialize();
728
0
  }
729
144
  return true;
730
144
}
731
732
144
bool FakeUpstream::createListenerFilterChain(Network::ListenerFilterManager&) { return true; }
733
734
void FakeUpstream::createUdpListenerFilterChain(Network::UdpListenerFilterManager& udp_listener,
735
0
                                                Network::UdpReadFilterCallbacks& callbacks) {
736
0
  udp_listener.addReadFilter(std::make_unique<FakeUdpFilter>(*this, callbacks));
737
0
}
738
739
0
bool FakeUpstream::createQuicListenerFilterChain(Network::QuicListenerFilterManager&) {
740
0
  return true;
741
0
}
742
743
2.66k
void FakeUpstream::threadRoutine() {
744
2.66k
  dispatcher_->run(Event::Dispatcher::RunType::Block);
745
2.66k
  handler_.reset();
746
2.66k
  {
747
2.66k
    absl::MutexLock lock(&lock_);
748
2.66k
    new_connections_.clear();
749
2.66k
    quic_connections_.clear();
750
2.66k
    consumed_connections_.clear();
751
2.66k
  }
752
2.66k
}
753
754
AssertionResult FakeUpstream::waitForHttpConnection(Event::Dispatcher& client_dispatcher,
755
                                                    FakeHttpConnectionPtr& connection,
756
14
                                                    milliseconds timeout) {
757
14
  if (!initialized_) {
758
0
    return AssertionFailure()
759
0
           << "Must initialize the FakeUpstream first by calling initializeServer().";
760
0
  }
761
762
14
  {
763
14
    absl::MutexLock lock(&lock_);
764
765
    // As noted in createNetworkFilterChain, HTTP3 FakeHttpConnections are not
766
    // lazily created, so HTTP3 needs a different wait path here.
767
14
    if (http_type_ == Http::CodecType::HTTP3) {
768
0
      if (quic_connections_.empty() &&
769
0
          !waitForWithDispatcherRun(
770
0
              time_system_, lock_,
771
0
              [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return !quic_connections_.empty(); },
772
0
              client_dispatcher, timeout)) {
773
0
        return AssertionFailure() << "Timed out waiting for new quic connection.";
774
0
      }
775
0
      if (!quic_connections_.empty()) {
776
0
        connection = std::move(quic_connections_.front());
777
0
        quic_connections_.pop_front();
778
0
        return AssertionSuccess();
779
0
      }
780
0
    }
781
782
14
    if (!waitForWithDispatcherRun(
783
14
            time_system_, lock_,
784
14
            [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return !new_connections_.empty(); },
785
14
            client_dispatcher, timeout)) {
786
0
      if (timeout == TestUtility::DefaultTimeout) {
787
0
        ADD_FAILURE()
788
0
            << "Please don't waitForHttpConnection with a 5s timeout if failure is expected\n";
789
0
      }
790
0
      return AssertionFailure() << "Timed out waiting for new connection.";
791
0
    }
792
14
  }
793
14
  return runOnDispatcherThreadAndWait([&]() {
794
14
    absl::MutexLock lock(&lock_);
795
14
    connection = std::make_unique<FakeHttpConnection>(
796
14
        *this, consumeConnection(), http_type_, time_system_, config_.max_request_headers_kb_,
797
14
        config_.max_request_headers_count_, config_.headers_with_underscores_action_);
798
14
    connection->initialize();
799
14
    return AssertionSuccess();
800
14
  });
801
14
}
802
803
AssertionResult
804
FakeUpstream::waitForHttpConnection(Event::Dispatcher& client_dispatcher,
805
                                    std::vector<std::unique_ptr<FakeUpstream>>& upstreams,
806
0
                                    FakeHttpConnectionPtr& connection, milliseconds timeout) {
807
0
  if (upstreams.empty()) {
808
0
    return AssertionFailure() << "No upstreams configured.";
809
0
  }
810
0
  Event::TestTimeSystem::RealTimeBound bound(timeout);
811
0
  while (bound.withinBound()) {
812
0
    for (auto& it : upstreams) {
813
0
      FakeUpstream& upstream = *it;
814
0
      {
815
0
        absl::MutexLock lock(&upstream.lock_);
816
0
        if (!upstream.isInitialized()) {
817
0
          return AssertionFailure()
818
0
                 << "Must initialize the FakeUpstream first by calling initializeServer().";
819
0
        }
820
0
        if (!waitForWithDispatcherRun(
821
0
                upstream.time_system_, upstream.lock_,
822
0
                [&upstream]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(upstream.lock_) {
823
0
                  return !upstream.new_connections_.empty();
824
0
                },
825
0
                client_dispatcher, 5ms)) {
826
0
          continue;
827
0
        }
828
0
      }
829
830
0
      return upstream.runOnDispatcherThreadAndWait([&]() {
831
0
        absl::MutexLock lock(&upstream.lock_);
832
0
        connection = std::make_unique<FakeHttpConnection>(
833
0
            upstream, upstream.consumeConnection(), upstream.http_type_, upstream.timeSystem(),
834
0
            Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT,
835
0
            envoy::config::core::v3::HttpProtocolOptions::ALLOW);
836
0
        connection->initialize();
837
0
        return AssertionSuccess();
838
0
      });
839
0
    }
840
0
  }
841
0
  return AssertionFailure() << "Timed out waiting for HTTP connection.";
842
0
}
843
844
ABSL_MUST_USE_RESULT
845
0
AssertionResult FakeUpstream::assertPendingConnectionsEmpty() {
846
0
  return runOnDispatcherThreadAndWait([&]() {
847
0
    absl::MutexLock lock(&lock_);
848
0
    return new_connections_.empty() ? AssertionSuccess() : AssertionFailure();
849
0
  });
850
0
}
851
852
AssertionResult FakeUpstream::waitForRawConnection(FakeRawConnectionPtr& connection,
853
261
                                                   milliseconds timeout) {
854
261
  if (!initialized_) {
855
0
    return AssertionFailure()
856
0
           << "Must initialize the FakeUpstream first by calling initializeServer().";
857
0
  }
858
859
261
  {
860
261
    absl::MutexLock lock(&lock_);
861
570
    const auto reached = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
862
570
      return !new_connections_.empty();
863
570
    };
864
865
261
    ENVOY_LOG(debug, "waiting for raw connection");
866
261
    if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
867
149
      return AssertionFailure() << "Timed out waiting for raw connection";
868
149
    }
869
261
  }
870
871
112
  return runOnDispatcherThreadAndWait([&]() {
872
112
    absl::MutexLock lock(&lock_);
873
112
    connection = makeRawConnection(consumeConnection(), timeSystem());
874
112
    connection->initialize();
875
    // Skip enableHalfClose if the connection is already disconnected.
876
112
    if (connection->connected()) {
877
112
      connection->connection().enableHalfClose(enable_half_close_);
878
112
    }
879
112
    return AssertionSuccess();
880
112
  });
881
261
}
882
883
void FakeUpstream::convertFromRawToHttp(FakeRawConnectionPtr& raw_connection,
884
0
                                        FakeHttpConnectionPtr& connection) {
885
0
  absl::MutexLock lock(&lock_);
886
0
  SharedConnectionWrapper& shared_connection = raw_connection->sharedConnection();
887
888
0
  connection = std::make_unique<FakeHttpConnection>(
889
0
      *this, shared_connection, http_type_, time_system_, config_.max_request_headers_kb_,
890
0
      config_.max_request_headers_count_, config_.headers_with_underscores_action_);
891
0
  connection->initialize();
892
0
  raw_connection.release();
893
0
}
894
895
126
SharedConnectionWrapper& FakeUpstream::consumeConnection() {
896
126
  ASSERT(!new_connections_.empty());
897
126
  auto* const connection_wrapper = new_connections_.front().get();
898
  // Skip the thread safety check if the network connection has already been freed since there's no
899
  // alternate way to get access to the dispatcher.
900
126
  ASSERT(!connection_wrapper->connected() || connection_wrapper->dispatcher().isThreadSafe());
901
126
  connection_wrapper->setParented();
902
126
  connection_wrapper->moveBetweenLists(new_connections_, consumed_connections_);
903
126
  if (read_disable_on_new_connection_ && connection_wrapper->connected() &&
904
126
      http_type_ != Http::CodecType::HTTP3 && !disable_and_do_not_enable_) {
905
    // Re-enable read and early close detection.
906
126
    auto& connection = connection_wrapper->connection();
907
126
    connection.detectEarlyCloseWhenReadDisabled(true);
908
126
    connection.readDisable(false);
909
126
  }
910
126
  return *connection_wrapper;
911
126
}
912
913
AssertionResult FakeUpstream::waitForUdpDatagram(Network::UdpRecvData& data_to_fill,
914
0
                                                 std::chrono::milliseconds timeout) {
915
0
  if (!initialized_) {
916
0
    return AssertionFailure()
917
0
           << "Must initialize the FakeUpstream first by calling initializeServer().";
918
0
  }
919
920
0
  absl::MutexLock lock(&lock_);
921
0
  const auto reached = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
922
0
    return !received_datagrams_.empty();
923
0
  };
924
925
0
  if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
926
0
    return AssertionFailure() << "Timed out waiting for UDP datagram.";
927
0
  }
928
929
0
  data_to_fill = std::move(received_datagrams_.front());
930
0
  received_datagrams_.pop_front();
931
0
  return AssertionSuccess();
932
0
}
933
934
0
Network::FilterStatus FakeUpstream::onRecvDatagram(Network::UdpRecvData& data) {
935
0
  absl::MutexLock lock(&lock_);
936
0
  received_datagrams_.emplace_back(std::move(data));
937
938
0
  return Network::FilterStatus::StopIteration;
939
0
}
940
941
AssertionResult FakeUpstream::runOnDispatcherThreadAndWait(std::function<AssertionResult()> cb,
942
126
                                                           std::chrono::milliseconds timeout) {
943
126
  auto result = std::make_shared<AssertionResult>(AssertionSuccess());
944
126
  auto done = std::make_shared<absl::Notification>();
945
126
  ASSERT(!dispatcher_->isThreadSafe());
946
126
  dispatcher_->post([&]() {
947
126
    *result = cb();
948
126
    done->Notify();
949
126
  });
950
126
  RELEASE_ASSERT(done->WaitForNotificationWithTimeout(absl::FromChrono(timeout)),
951
126
                 "Timed out waiting for cb to run on dispatcher");
952
126
  return *result;
953
126
}
954
955
void FakeUpstream::sendUdpDatagram(const std::string& buffer,
956
0
                                   const Network::Address::InstanceConstSharedPtr& peer) {
957
0
  dispatcher_->post([this, buffer, peer] {
958
0
    const auto rc = Network::Utility::writeToSocket(socket_->ioHandle(), Buffer::OwnedImpl(buffer),
959
0
                                                    nullptr, *peer);
960
0
    EXPECT_TRUE(rc.return_value_ == buffer.length());
961
0
  });
962
0
}
963
964
AssertionResult FakeUpstream::rawWriteConnection(uint32_t index, const std::string& data,
965
                                                 bool end_stream,
966
0
                                                 std::chrono::milliseconds timeout) {
967
0
  if (!initialized_) {
968
0
    return AssertionFailure()
969
0
           << "Must initialize the FakeUpstream first by calling initializeServer().";
970
0
  }
971
972
0
  absl::MutexLock lock(&lock_);
973
0
  auto iter = consumed_connections_.begin();
974
0
  std::advance(iter, index);
975
0
  return (*iter)->executeOnDispatcher(
976
0
      [data, end_stream](Network::Connection& connection) {
977
0
        ASSERT(connection.state() == Network::Connection::State::Open);
978
0
        Buffer::OwnedImpl buffer(data);
979
0
        connection.write(buffer, end_stream);
980
0
      },
981
0
      timeout);
982
0
}
983
984
2.66k
void FakeUpstream::FakeListenSocketFactory::doFinalPreWorkerInit() {
985
2.66k
  if (socket_->socketType() == Network::Socket::Type::Stream) {
986
2.66k
    ASSERT_EQ(0, socket_->ioHandle().listen(ENVOY_TCP_BACKLOG_SIZE).return_value_);
987
2.66k
  } else {
988
0
    ASSERT(socket_->socketType() == Network::Socket::Type::Datagram);
989
0
    ASSERT_TRUE(Network::Socket::applyOptions(socket_->options(), *socket_,
990
0
                                              envoy::config::core::v3::SocketOption::STATE_BOUND));
991
0
  }
992
2.66k
}
993
994
112
FakeRawConnection::~FakeRawConnection() {
995
  // If the filter was already deleted, it means the shared_connection_ was too, so don't try to
996
  // access it.
997
112
  if (read_filter_ != nullptr) {
998
112
    EXPECT_TRUE(shared_connection_.executeOnDispatcher(
999
112
        [filter = std::move(read_filter_)](Network::Connection& connection) {
1000
112
          connection.removeReadFilter(filter);
1001
112
        }));
1002
112
  }
1003
112
}
1004
1005
112
void FakeRawConnection::initialize() {
1006
112
  FakeConnectionBase::initialize();
1007
112
  read_filter_ = std::make_shared<ReadFilter>(*this);
1008
112
  if (!shared_connection_.connected()) {
1009
0
    ENVOY_LOG(warn, "FakeRawConnection::initialize: network connection is already disconnected");
1010
0
    return;
1011
0
  }
1012
112
  ASSERT(shared_connection_.dispatcher().isThreadSafe());
1013
112
  shared_connection_.connection().addReadFilter(read_filter_);
1014
112
}
1015
1016
AssertionResult FakeRawConnection::waitForData(uint64_t num_bytes, std::string* data,
1017
0
                                               milliseconds timeout) {
1018
0
  absl::MutexLock lock(&lock_);
1019
0
  const auto reached = [this, num_bytes]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
1020
0
    return data_.size() == num_bytes;
1021
0
  };
1022
0
  ENVOY_LOG(debug, "waiting for {} bytes of data", num_bytes);
1023
0
  if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
1024
0
    return AssertionFailure() << fmt::format(
1025
0
               "Timed out waiting for data. Got '{}', waiting for {} bytes.", data_, num_bytes);
1026
0
  }
1027
0
  if (data != nullptr) {
1028
0
    *data = data_;
1029
0
  }
1030
0
  return AssertionSuccess();
1031
0
}
1032
1033
AssertionResult
1034
FakeRawConnection::waitForData(const std::function<bool(const std::string&)>& data_validator,
1035
0
                               std::string* data, milliseconds timeout) {
1036
0
  absl::MutexLock lock(&lock_);
1037
0
  const auto reached = [this, &data_validator]()
1038
0
                           ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return data_validator(data_); };
1039
0
  ENVOY_LOG(debug, "waiting for data");
1040
0
  if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
1041
0
    return AssertionFailure() << "Timed out waiting for data.";
1042
0
  }
1043
0
  if (data != nullptr) {
1044
0
    *data = data_;
1045
0
  }
1046
0
  return AssertionSuccess();
1047
0
}
1048
1049
AssertionResult FakeRawConnection::write(const std::string& data, bool end_stream,
1050
2.18k
                                         milliseconds timeout) {
1051
2.18k
  return shared_connection_.executeOnDispatcher(
1052
2.18k
      [data, end_stream](Network::Connection& connection) {
1053
2.17k
        Buffer::OwnedImpl to_write(data);
1054
2.17k
        connection.write(to_write, end_stream);
1055
2.17k
      },
1056
2.18k
      timeout);
1057
2.18k
}
1058
1059
Network::FilterStatus FakeRawConnection::ReadFilter::onData(Buffer::Instance& data,
1060
244
                                                            bool end_stream) {
1061
244
  absl::MutexLock lock(&parent_.lock_);
1062
244
  ENVOY_LOG(debug, "got {} bytes, end_stream {}", data.length(), end_stream);
1063
244
  parent_.data_.append(data.toString());
1064
244
  parent_.half_closed_ = end_stream;
1065
244
  data.drain(data.length());
1066
244
  return Network::FilterStatus::StopIteration;
1067
244
}
1068
1069
ABSL_MUST_USE_RESULT
1070
AssertionResult FakeHttpConnection::waitForInexactRawData(const char* data, std::string* out,
1071
0
                                                          std::chrono::milliseconds timeout) {
1072
0
  absl::MutexLock lock(&lock_);
1073
0
  const auto reached = [this, data, &out]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
1074
0
    char peek_buf[200];
1075
0
    auto result = dynamic_cast<Network::ConnectionImpl*>(&connection())
1076
0
                      ->ioHandle()
1077
0
                      .recv(peek_buf, 200, MSG_PEEK);
1078
0
    ASSERT(result.ok() || result.err_->getErrorCode() == Api::IoError::IoErrorCode::Again);
1079
0
    if (!result.ok()) {
1080
0
      return false;
1081
0
    }
1082
0
    absl::string_view peek_data(peek_buf, result.return_value_);
1083
0
    size_t index = peek_data.find(data);
1084
0
    if (index != absl::string_view::npos) {
1085
0
      Buffer::OwnedImpl buffer;
1086
0
      *out = std::string(peek_data.data(), index + 4);
1087
0
      auto result = dynamic_cast<Network::ConnectionImpl*>(&connection())
1088
0
                        ->ioHandle()
1089
0
                        .recv(peek_buf, index + 4, 0);
1090
0
      return true;
1091
0
    }
1092
0
    return false;
1093
0
  };
1094
  // Because the connection must be read disabled to not auto-consume the
1095
  // underlying data, waitFor hangs with no events to force the time system to
1096
  // continue. Break it up into smaller chunks.
1097
0
  for (int i = 0; i < timeout / 10ms; ++i) {
1098
0
    if (time_system_.waitFor(lock_, absl::Condition(&reached), 10ms)) {
1099
0
      return AssertionSuccess();
1100
0
    }
1101
0
  }
1102
0
  return AssertionFailure() << "timed out waiting for raw data";
1103
0
}
1104
1105
0
void FakeHttpConnection::writeRawData(absl::string_view data) {
1106
0
  Buffer::OwnedImpl buffer(data);
1107
0
  Api::IoCallUint64Result result =
1108
0
      dynamic_cast<Network::ConnectionImpl*>(&connection())->ioHandle().write(buffer);
1109
0
  ASSERT(result.ok());
1110
0
}
1111
1112
0
AssertionResult FakeHttpConnection::postWriteRawData(std::string data) {
1113
0
  return shared_connection_.executeOnDispatcher(
1114
0
      [data](Network::Connection& connection) {
1115
0
        Buffer::OwnedImpl to_write(data);
1116
0
        connection.write(to_write, false);
1117
0
      },
1118
0
      TestUtility::DefaultTimeout);
1119
0
}
1120
1121
} // namespace Envoy