Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/test/integration/fake_upstream.cc
Line
Count
Source (jump to first uncovered line)
1
#include "test/integration/fake_upstream.h"
2
3
#include <chrono>
4
#include <cstdint>
5
#include <memory>
6
#include <string>
7
8
#include "source/common/buffer/buffer_impl.h"
9
#include "source/common/config/utility.h"
10
#include "source/common/http/header_map_impl.h"
11
#include "source/common/http/http1/codec_impl.h"
12
#include "source/common/http/http2/codec_impl.h"
13
#include "source/common/network/address_impl.h"
14
#include "source/common/network/connection_impl.h"
15
#include "source/common/network/listen_socket_impl.h"
16
#include "source/common/network/socket_option_factory.h"
17
#include "source/common/network/utility.h"
18
#include "source/common/runtime/runtime_features.h"
19
20
#ifdef ENVOY_ENABLE_QUIC
21
#include "source/common/quic/server_codec_impl.h"
22
#include "quiche/quic/test_tools/quic_session_peer.h"
23
#endif
24
25
#include "source/common/listener_manager/connection_handler_impl.h"
26
27
#include "test/integration/utility.h"
28
#include "test/test_common/network_utility.h"
29
#include "test/test_common/utility.h"
30
31
#include "absl/strings/str_cat.h"
32
#include "absl/synchronization/notification.h"
33
34
using namespace std::chrono_literals;
35
36
using std::chrono::milliseconds;
37
using testing::AssertionFailure;
38
using testing::AssertionResult;
39
using testing::AssertionSuccess;
40
41
namespace Envoy {
42
43
FakeStream::FakeStream(FakeHttpConnection& parent, Http::ResponseEncoder& encoder,
44
                       Event::TestTimeSystem& time_system)
45
    : parent_(parent), encoder_(encoder), time_system_(time_system),
46
800
      header_validator_(parent.makeHeaderValidator()) {
47
800
  encoder.getStream().addCallbacks(*this);
48
800
}
Envoy::FakeStream::FakeStream(Envoy::FakeHttpConnection&, Envoy::Http::ResponseEncoder&, Envoy::Event::TestTimeSystem&)
Line
Count
Source
46
786
      header_validator_(parent.makeHeaderValidator()) {
47
786
  encoder.getStream().addCallbacks(*this);
48
786
}
Envoy::FakeStream::FakeStream(Envoy::FakeHttpConnection&, Envoy::Http::ResponseEncoder&, Envoy::Event::TestTimeSystem&)
Line
Count
Source
46
14
      header_validator_(parent.makeHeaderValidator()) {
47
14
  encoder.getStream().addCallbacks(*this);
48
14
}
49
50
800
void FakeStream::decodeHeaders(Http::RequestHeaderMapSharedPtr&& headers, bool end_stream) {
51
800
  absl::MutexLock lock(&lock_);
52
800
  headers_ = std::move(headers);
53
800
  if (header_validator_) {
54
0
    header_validator_->transformRequestHeaders(*headers_);
55
0
  }
56
800
  setEndStream(end_stream);
57
800
}
58
59
643
void FakeStream::decodeData(Buffer::Instance& data, bool end_stream) {
60
643
  received_data_ = true;
61
643
  absl::MutexLock lock(&lock_);
62
643
  body_.add(data);
63
643
  setEndStream(end_stream);
64
643
}
65
66
0
void FakeStream::decodeTrailers(Http::RequestTrailerMapPtr&& trailers) {
67
0
  absl::MutexLock lock(&lock_);
68
0
  setEndStream(true);
69
0
  trailers_ = std::move(trailers);
70
0
}
71
72
0
void FakeStream::decodeMetadata(Http::MetadataMapPtr&& metadata_map_ptr) {
73
0
  for (const auto& metadata : *metadata_map_ptr) {
74
0
    duplicated_metadata_key_count_[metadata.first]++;
75
0
    metadata_map_.insert(metadata);
76
0
  }
77
0
}
78
79
2.54k
void FakeStream::postToConnectionThread(std::function<void()> cb) {
80
2.54k
  parent_.postToConnectionThread(cb);
81
2.54k
}
82
83
0
void FakeStream::encode1xxHeaders(const Http::ResponseHeaderMap& headers) {
84
0
  std::shared_ptr<Http::ResponseHeaderMap> headers_copy(
85
0
      Http::createHeaderMap<Http::ResponseHeaderMapImpl>(headers));
86
0
  postToConnectionThread([this, headers_copy]() -> void {
87
0
    {
88
0
      absl::MutexLock lock(&lock_);
89
0
      if (!parent_.connected() || saw_reset_) {
90
        // Encoded already deleted.
91
0
        return;
92
0
      }
93
0
    }
94
0
    encoder_.encode1xxHeaders(*headers_copy);
95
0
  });
96
0
}
97
98
800
void FakeStream::encodeHeaders(const Http::HeaderMap& headers, bool end_stream) {
99
800
  std::shared_ptr<Http::ResponseHeaderMap> headers_copy(
100
800
      Http::createHeaderMap<Http::ResponseHeaderMapImpl>(headers));
101
800
  if (add_served_by_header_) {
102
0
    headers_copy->addCopy(Http::LowerCaseString("x-served-by"),
103
0
                          parent_.connection().connectionInfoProvider().localAddress()->asString());
104
0
  }
105
106
800
  if (header_validator_) {
107
    // Ignore validation results
108
0
    auto result = header_validator_->transformResponseHeaders(*headers_copy);
109
0
    if (result.new_headers) {
110
0
      headers_copy = std::move(result.new_headers);
111
0
    }
112
0
  }
113
114
800
  postToConnectionThread([this, headers_copy = std::move(headers_copy), end_stream]() -> void {
115
800
    {
116
800
      absl::MutexLock lock(&lock_);
117
800
      if (!parent_.connected() || saw_reset_) {
118
        // Encoded already deleted.
119
0
        return;
120
0
      }
121
800
    }
122
800
    encoder_.encodeHeaders(*headers_copy, end_stream);
123
800
  });
124
800
}
125
126
0
void FakeStream::encodeData(std::string data, bool end_stream) {
127
0
  postToConnectionThread([this, data, end_stream]() -> void {
128
0
    {
129
0
      absl::MutexLock lock(&lock_);
130
0
      if (!parent_.connected() || saw_reset_) {
131
        // Encoded already deleted.
132
0
        return;
133
0
      }
134
0
    }
135
0
    Buffer::OwnedImpl fake_data(data.data(), data.size());
136
0
    encoder_.encodeData(fake_data, end_stream);
137
0
  });
138
0
}
139
140
786
void FakeStream::encodeData(uint64_t size, bool end_stream) {
141
786
  postToConnectionThread([this, size, end_stream]() -> void {
142
786
    {
143
786
      absl::MutexLock lock(&lock_);
144
786
      if (!parent_.connected() || saw_reset_) {
145
        // Encoded already deleted.
146
0
        return;
147
0
      }
148
786
    }
149
786
    Buffer::OwnedImpl data(std::string(size, 'a'));
150
786
    encoder_.encodeData(data, end_stream);
151
786
  });
152
786
}
153
154
172
void FakeStream::encodeData(Buffer::Instance& data, bool end_stream) {
155
172
  std::shared_ptr<Buffer::Instance> data_copy = std::make_shared<Buffer::OwnedImpl>(data);
156
172
  postToConnectionThread([this, data_copy, end_stream]() -> void {
157
172
    {
158
172
      absl::MutexLock lock(&lock_);
159
172
      if (!parent_.connected() || saw_reset_) {
160
        // Encoded already deleted.
161
0
        return;
162
0
      }
163
172
    }
164
172
    encoder_.encodeData(*data_copy, end_stream);
165
172
  });
166
172
}
167
168
786
void FakeStream::encodeTrailers(const Http::HeaderMap& trailers) {
169
786
  std::shared_ptr<Http::ResponseTrailerMap> trailers_copy(
170
786
      Http::createHeaderMap<Http::ResponseTrailerMapImpl>(trailers));
171
786
  postToConnectionThread([this, trailers_copy]() -> void {
172
786
    {
173
786
      absl::MutexLock lock(&lock_);
174
786
      if (!parent_.connected() || saw_reset_) {
175
        // Encoded already deleted.
176
0
        return;
177
0
      }
178
786
    }
179
786
    encoder_.encodeTrailers(*trailers_copy);
180
786
  });
181
786
}
182
183
0
void FakeStream::encodeResetStream() {
184
0
  postToConnectionThread([this]() -> void {
185
0
    {
186
0
      absl::MutexLock lock(&lock_);
187
0
      if (!parent_.connected() || saw_reset_) {
188
        // Encoded already deleted.
189
0
        return;
190
0
      }
191
0
    }
192
0
    if (parent_.type() == Http::CodecType::HTTP1) {
193
0
      parent_.connection().close(Network::ConnectionCloseType::FlushWrite);
194
0
    } else {
195
0
      encoder_.getStream().resetStream(Http::StreamResetReason::LocalReset);
196
0
    }
197
0
  });
198
0
}
199
200
0
void FakeStream::encodeMetadata(const Http::MetadataMapVector& metadata_map_vector) {
201
0
  postToConnectionThread([this, &metadata_map_vector]() -> void {
202
0
    {
203
0
      absl::MutexLock lock(&lock_);
204
0
      if (!parent_.connected() || saw_reset_) {
205
        // Encoded already deleted.
206
0
        return;
207
0
      }
208
0
    }
209
0
    encoder_.encodeMetadata(metadata_map_vector);
210
0
  });
211
0
}
212
213
0
void FakeStream::readDisable(bool disable) {
214
0
  postToConnectionThread([this, disable]() -> void {
215
0
    {
216
0
      absl::MutexLock lock(&lock_);
217
0
      if (!parent_.connected() || saw_reset_) {
218
        // Encoded already deleted.
219
0
        return;
220
0
      }
221
0
    }
222
0
    encoder_.getStream().readDisable(disable);
223
0
  });
224
0
}
225
226
0
void FakeStream::onResetStream(Http::StreamResetReason, absl::string_view) {
227
0
  absl::MutexLock lock(&lock_);
228
0
  saw_reset_ = true;
229
0
}
230
231
0
AssertionResult FakeStream::waitForHeadersComplete(milliseconds timeout) {
232
0
  absl::MutexLock lock(&lock_);
233
0
  const auto reached = [this]()
234
0
                           ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return headers_ != nullptr; };
235
0
  if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
236
0
    return AssertionFailure() << "Timed out waiting for headers.";
237
0
  }
238
0
  return AssertionSuccess();
239
0
}
240
241
namespace {
242
// Perform a wait on a condition while still allowing for periodic client dispatcher runs that
243
// occur on the current thread.
244
bool waitForWithDispatcherRun(Event::TestTimeSystem& time_system, absl::Mutex& lock,
245
                              const std::function<bool()>& condition,
246
                              Event::Dispatcher& client_dispatcher, milliseconds timeout)
247
205
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock) {
248
205
  Event::TestTimeSystem::RealTimeBound bound(timeout);
249
208
  while (bound.withinBound()) {
250
    // Wake up periodically to run the client dispatcher.
251
208
    if (time_system.waitFor(lock, absl::Condition(&condition), 5ms * TIMEOUT_FACTOR)) {
252
205
      return true;
253
205
    }
254
255
    // Run the client dispatcher since we may need to process window updates, etc.
256
3
    client_dispatcher.run(Event::Dispatcher::RunType::NonBlock);
257
3
  }
258
0
  return false;
259
205
}
260
} // namespace
261
262
AssertionResult FakeStream::waitForData(Event::Dispatcher& client_dispatcher, uint64_t body_length,
263
177
                                        milliseconds timeout) {
264
177
  absl::MutexLock lock(&lock_);
265
177
  if (!waitForWithDispatcherRun(
266
177
          time_system_, lock_,
267
177
          [this, body_length]()
268
598
              ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return (body_.length() >= body_length); },
269
177
          client_dispatcher, timeout)) {
270
0
    return AssertionFailure() << "Timed out waiting for data.";
271
0
  }
272
177
  return AssertionSuccess();
273
177
}
274
275
AssertionResult FakeStream::waitForData(Event::Dispatcher& client_dispatcher,
276
0
                                        absl::string_view data, milliseconds timeout) {
277
0
  auto succeeded = waitForData(client_dispatcher, data.length(), timeout);
278
0
  if (succeeded) {
279
0
    Buffer::OwnedImpl buffer(data.data(), data.length());
280
0
    if (!TestUtility::buffersEqual(body(), buffer)) {
281
0
      return AssertionFailure() << body().toString() << " not equal to " << data;
282
0
    }
283
0
  }
284
0
  return succeeded;
285
0
}
286
287
AssertionResult FakeStream::waitForData(Event::Dispatcher& client_dispatcher,
288
                                        const FakeStream::ValidatorFunction& data_validator,
289
0
                                        std::chrono::milliseconds timeout) {
290
0
  absl::MutexLock lock(&lock_);
291
0
  if (!waitForWithDispatcherRun(
292
0
          time_system_, lock_,
293
0
          [this, data_validator]()
294
0
              ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return data_validator(body_.toString()); },
295
0
          client_dispatcher, timeout)) {
296
0
    return AssertionFailure() << "Timed out waiting for data.";
297
0
  }
298
0
  return AssertionSuccess();
299
0
}
300
301
AssertionResult FakeStream::waitForEndStream(Event::Dispatcher& client_dispatcher,
302
0
                                             milliseconds timeout) {
303
0
  absl::MutexLock lock(&lock_);
304
0
  if (!waitForWithDispatcherRun(
305
0
          time_system_, lock_,
306
0
          [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return end_stream_; }, client_dispatcher,
307
0
          timeout)) {
308
0
    return AssertionFailure() << "Timed out waiting for end of stream.";
309
0
  }
310
0
  return AssertionSuccess();
311
0
}
312
313
0
AssertionResult FakeStream::waitForReset(milliseconds timeout) {
314
0
  absl::MutexLock lock(&lock_);
315
0
  if (!time_system_.waitFor(lock_, absl::Condition(&saw_reset_), timeout)) {
316
0
    return AssertionFailure() << "Timed out waiting for reset.";
317
0
  }
318
0
  return AssertionSuccess();
319
0
}
320
321
14
void FakeStream::startGrpcStream(bool send_headers) {
322
14
  ASSERT(!grpc_stream_started_, "gRPC stream should not be started more than once");
323
14
  grpc_stream_started_ = true;
324
14
  if (send_headers) {
325
14
    encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false);
326
14
  }
327
14
}
328
329
0
void FakeStream::finishGrpcStream(Grpc::Status::GrpcStatus status) {
330
0
  encodeTrailers(Http::TestResponseTrailerMapImpl{
331
0
      {"grpc-status", std::to_string(static_cast<uint32_t>(status))}});
332
0
}
333
334
class TestHttp1ServerConnectionImpl : public Http::Http1::ServerConnectionImpl {
335
public:
336
  using Http::Http1::ServerConnectionImpl::ServerConnectionImpl;
337
};
338
339
class TestHttp2ServerConnectionImpl : public Http::Http2::ServerConnectionImpl {
340
public:
341
  TestHttp2ServerConnectionImpl(
342
      Network::Connection& connection, Http::ServerConnectionCallbacks& callbacks,
343
      Http::Http2::CodecStats& stats, Random::RandomGenerator& random_generator,
344
      const envoy::config::core::v3::Http2ProtocolOptions& http2_options,
345
      const uint32_t max_request_headers_kb, const uint32_t max_request_headers_count,
346
      envoy::config::core::v3::HttpProtocolOptions::HeadersWithUnderscoresAction
347
          headers_with_underscores_action,
348
      Server::OverloadManager& overload_manager)
349
      : ServerConnectionImpl(connection, callbacks, stats, random_generator, http2_options,
350
                             max_request_headers_kb, max_request_headers_count,
351
407
                             headers_with_underscores_action, overload_manager) {}
352
353
0
  void updateConcurrentStreams(uint32_t max_streams) {
354
0
    absl::InlinedVector<http2::adapter::Http2Setting, 1> settings;
355
0
    settings.push_back({http2::adapter::MAX_CONCURRENT_STREAMS, max_streams});
356
0
    adapter_->SubmitSettings(settings);
357
0
    const int rc = adapter_->Send();
358
0
    ASSERT(rc == 0);
359
0
  }
360
};
361
362
namespace {
363
// Fake upstream codec will not do path normalization, so the tests can observe
364
// the path forwarded by Envoy.
365
::envoy::extensions::http::header_validators::envoy_default::v3::HeaderValidatorConfig
366
407
fakeUpstreamHeaderValidatorConfig() {
367
407
  ::envoy::extensions::http::header_validators::envoy_default::v3::HeaderValidatorConfig config;
368
407
  config.mutable_uri_path_normalization_options()->set_skip_path_normalization(true);
369
407
  config.mutable_uri_path_normalization_options()->set_skip_merging_slashes(true);
370
407
  config.mutable_uri_path_normalization_options()->set_path_with_escaped_slashes_action(
371
407
      ::envoy::extensions::http::header_validators::envoy_default::v3::HeaderValidatorConfig::
372
407
          UriPathNormalizationOptions::KEEP_UNCHANGED);
373
407
  return config;
374
407
}
375
} // namespace
376
377
FakeHttpConnection::FakeHttpConnection(
378
    FakeUpstream& fake_upstream, SharedConnectionWrapper& shared_connection, Http::CodecType type,
379
    Event::TestTimeSystem& time_system, uint32_t max_request_headers_kb,
380
    uint32_t max_request_headers_count,
381
    envoy::config::core::v3::HttpProtocolOptions::HeadersWithUnderscoresAction
382
        headers_with_underscores_action)
383
    : FakeConnectionBase(shared_connection, time_system), type_(type),
384
      header_validator_factory_(
385
407
          IntegrationUtil::makeHeaderValidationFactory(fakeUpstreamHeaderValidatorConfig())) {
386
407
  ASSERT(max_request_headers_count != 0);
387
407
  if (type == Http::CodecType::HTTP1) {
388
0
    Http::Http1Settings http1_settings;
389
0
    http1_settings.use_balsa_parser_ =
390
0
        Runtime::runtimeFeatureEnabled("envoy.reloadable_features.http1_use_balsa_parser");
391
    // For the purpose of testing, we always have the upstream encode the trailers if any
392
0
    http1_settings.enable_trailers_ = true;
393
0
    Http::Http1::CodecStats& stats = fake_upstream.http1CodecStats();
394
0
    codec_ = std::make_unique<TestHttp1ServerConnectionImpl>(
395
0
        shared_connection_.connection(), stats, *this, http1_settings, max_request_headers_kb,
396
0
        max_request_headers_count, headers_with_underscores_action, overload_manager_);
397
407
  } else if (type == Http::CodecType::HTTP2) {
398
407
    envoy::config::core::v3::Http2ProtocolOptions http2_options = fake_upstream.http2Options();
399
407
    Http::Http2::CodecStats& stats = fake_upstream.http2CodecStats();
400
407
    codec_ = std::make_unique<TestHttp2ServerConnectionImpl>(
401
407
        shared_connection_.connection(), *this, stats, random_, http2_options,
402
407
        max_request_headers_kb, max_request_headers_count, headers_with_underscores_action,
403
407
        overload_manager_);
404
407
  } else {
405
0
    ASSERT(type == Http::CodecType::HTTP3);
406
0
#ifdef ENVOY_ENABLE_QUIC
407
0
    Http::Http3::CodecStats& stats = fake_upstream.http3CodecStats();
408
0
    codec_ = std::make_unique<Quic::QuicHttpServerConnectionImpl>(
409
0
        dynamic_cast<Quic::EnvoyQuicServerSession&>(shared_connection_.connection()), *this, stats,
410
0
        fake_upstream.http3Options(), max_request_headers_kb, max_request_headers_count,
411
0
        headers_with_underscores_action);
412
#else
413
    ASSERT(false, "running a QUIC integration test without compiling QUIC");
414
#endif
415
0
  }
416
407
  shared_connection_.connection().addReadFilter(
417
407
      Network::ReadFilterSharedPtr{new ReadFilter(*this)});
418
407
}
Envoy::FakeHttpConnection::FakeHttpConnection(Envoy::FakeUpstream&, Envoy::SharedConnectionWrapper&, Envoy::Http::CodecType, Envoy::Event::TestTimeSystem&, unsigned int, unsigned int, envoy::config::core::v3::HttpProtocolOptions_HeadersWithUnderscoresAction)
Line
Count
Source
385
393
          IntegrationUtil::makeHeaderValidationFactory(fakeUpstreamHeaderValidatorConfig())) {
386
393
  ASSERT(max_request_headers_count != 0);
387
393
  if (type == Http::CodecType::HTTP1) {
388
0
    Http::Http1Settings http1_settings;
389
0
    http1_settings.use_balsa_parser_ =
390
0
        Runtime::runtimeFeatureEnabled("envoy.reloadable_features.http1_use_balsa_parser");
391
    // For the purpose of testing, we always have the upstream encode the trailers if any
392
0
    http1_settings.enable_trailers_ = true;
393
0
    Http::Http1::CodecStats& stats = fake_upstream.http1CodecStats();
394
0
    codec_ = std::make_unique<TestHttp1ServerConnectionImpl>(
395
0
        shared_connection_.connection(), stats, *this, http1_settings, max_request_headers_kb,
396
0
        max_request_headers_count, headers_with_underscores_action, overload_manager_);
397
393
  } else if (type == Http::CodecType::HTTP2) {
398
393
    envoy::config::core::v3::Http2ProtocolOptions http2_options = fake_upstream.http2Options();
399
393
    Http::Http2::CodecStats& stats = fake_upstream.http2CodecStats();
400
393
    codec_ = std::make_unique<TestHttp2ServerConnectionImpl>(
401
393
        shared_connection_.connection(), *this, stats, random_, http2_options,
402
393
        max_request_headers_kb, max_request_headers_count, headers_with_underscores_action,
403
393
        overload_manager_);
404
393
  } else {
405
0
    ASSERT(type == Http::CodecType::HTTP3);
406
0
#ifdef ENVOY_ENABLE_QUIC
407
0
    Http::Http3::CodecStats& stats = fake_upstream.http3CodecStats();
408
0
    codec_ = std::make_unique<Quic::QuicHttpServerConnectionImpl>(
409
0
        dynamic_cast<Quic::EnvoyQuicServerSession&>(shared_connection_.connection()), *this, stats,
410
0
        fake_upstream.http3Options(), max_request_headers_kb, max_request_headers_count,
411
0
        headers_with_underscores_action);
412
#else
413
    ASSERT(false, "running a QUIC integration test without compiling QUIC");
414
#endif
415
0
  }
416
393
  shared_connection_.connection().addReadFilter(
417
393
      Network::ReadFilterSharedPtr{new ReadFilter(*this)});
418
393
}
Envoy::FakeHttpConnection::FakeHttpConnection(Envoy::FakeUpstream&, Envoy::SharedConnectionWrapper&, Envoy::Http::CodecType, Envoy::Event::TestTimeSystem&, unsigned int, unsigned int, envoy::config::core::v3::HttpProtocolOptions_HeadersWithUnderscoresAction)
Line
Count
Source
385
14
          IntegrationUtil::makeHeaderValidationFactory(fakeUpstreamHeaderValidatorConfig())) {
386
14
  ASSERT(max_request_headers_count != 0);
387
14
  if (type == Http::CodecType::HTTP1) {
388
0
    Http::Http1Settings http1_settings;
389
0
    http1_settings.use_balsa_parser_ =
390
0
        Runtime::runtimeFeatureEnabled("envoy.reloadable_features.http1_use_balsa_parser");
391
    // For the purpose of testing, we always have the upstream encode the trailers if any
392
0
    http1_settings.enable_trailers_ = true;
393
0
    Http::Http1::CodecStats& stats = fake_upstream.http1CodecStats();
394
0
    codec_ = std::make_unique<TestHttp1ServerConnectionImpl>(
395
0
        shared_connection_.connection(), stats, *this, http1_settings, max_request_headers_kb,
396
0
        max_request_headers_count, headers_with_underscores_action, overload_manager_);
397
14
  } else if (type == Http::CodecType::HTTP2) {
398
14
    envoy::config::core::v3::Http2ProtocolOptions http2_options = fake_upstream.http2Options();
399
14
    Http::Http2::CodecStats& stats = fake_upstream.http2CodecStats();
400
14
    codec_ = std::make_unique<TestHttp2ServerConnectionImpl>(
401
14
        shared_connection_.connection(), *this, stats, random_, http2_options,
402
14
        max_request_headers_kb, max_request_headers_count, headers_with_underscores_action,
403
14
        overload_manager_);
404
14
  } else {
405
0
    ASSERT(type == Http::CodecType::HTTP3);
406
0
#ifdef ENVOY_ENABLE_QUIC
407
0
    Http::Http3::CodecStats& stats = fake_upstream.http3CodecStats();
408
0
    codec_ = std::make_unique<Quic::QuicHttpServerConnectionImpl>(
409
0
        dynamic_cast<Quic::EnvoyQuicServerSession&>(shared_connection_.connection()), *this, stats,
410
0
        fake_upstream.http3Options(), max_request_headers_kb, max_request_headers_count,
411
0
        headers_with_underscores_action);
412
#else
413
    ASSERT(false, "running a QUIC integration test without compiling QUIC");
414
#endif
415
0
  }
416
14
  shared_connection_.connection().addReadFilter(
417
14
      Network::ReadFilterSharedPtr{new ReadFilter(*this)});
418
14
}
419
420
51
AssertionResult FakeConnectionBase::close(std::chrono::milliseconds timeout) {
421
51
  ENVOY_LOG(trace, "FakeConnectionBase close");
422
51
  if (!shared_connection_.connected()) {
423
0
    return AssertionSuccess();
424
0
  }
425
51
  return shared_connection_.executeOnDispatcher(
426
51
      [](Network::Connection& connection) {
427
47
        connection.close(Network::ConnectionCloseType::FlushWrite);
428
47
      },
429
51
      timeout);
430
51
}
431
432
AssertionResult FakeConnectionBase::close(Network::ConnectionCloseType close_type,
433
0
                                          std::chrono::milliseconds timeout) {
434
0
  ENVOY_LOG(trace, "FakeConnectionBase close type={}", static_cast<int>(close_type));
435
0
  if (!shared_connection_.connected()) {
436
0
    return AssertionSuccess();
437
0
  }
438
0
  return shared_connection_.executeOnDispatcher(
439
0
      [&close_type](Network::Connection& connection) { connection.close(close_type); }, timeout);
440
0
}
441
442
0
AssertionResult FakeConnectionBase::readDisable(bool disable, std::chrono::milliseconds timeout) {
443
0
  return shared_connection_.executeOnDispatcher(
444
0
      [disable](Network::Connection& connection) { connection.readDisable(disable); }, timeout);
445
0
}
446
447
namespace {
448
0
Http::Protocol codeTypeToProtocol(Http::CodecType codec_type) {
449
0
  switch (codec_type) {
450
0
  case Http::CodecType::HTTP1:
451
0
    return Http::Protocol::Http11;
452
0
  case Http::CodecType::HTTP2:
453
0
    return Http::Protocol::Http2;
454
0
  case Http::CodecType::HTTP3:
455
0
    return Http::Protocol::Http3;
456
0
  }
457
0
  PANIC_DUE_TO_CORRUPT_ENUM;
458
0
}
459
} // namespace
460
461
800
Http::ServerHeaderValidatorPtr FakeHttpConnection::makeHeaderValidator() {
462
800
  return header_validator_factory_ ? header_validator_factory_->createServerHeaderValidator(
463
0
                                         codeTypeToProtocol(type_), header_validator_stats_)
464
800
                                   : nullptr;
465
800
}
466
467
14
Http::RequestDecoder& FakeHttpConnection::newStream(Http::ResponseEncoder& encoder, bool) {
468
14
  absl::MutexLock lock(&lock_);
469
14
  new_streams_.emplace_back(new FakeStream(*this, encoder, time_system_));
470
14
  return *new_streams_.back();
471
14
}
472
473
0
void FakeHttpConnection::onGoAway(Http::GoAwayErrorCode code) {
474
0
  ASSERT(type_ != Http::CodecType::HTTP1);
475
  // Usually indicates connection level errors, no operations are needed since
476
  // the connection will be closed soon.
477
0
  ENVOY_LOG(info, "FakeHttpConnection receives GOAWAY: ", static_cast<int>(code));
478
0
}
479
480
0
void FakeHttpConnection::encodeGoAway() {
481
0
  ASSERT(type_ != Http::CodecType::HTTP1);
482
483
0
  postToConnectionThread([this]() { codec_->goAway(); });
484
0
}
485
486
0
void FakeHttpConnection::updateConcurrentStreams(uint64_t max_streams) {
487
0
  ASSERT(type_ != Http::CodecType::HTTP1);
488
489
0
  if (type_ == Http::CodecType::HTTP2) {
490
0
    postToConnectionThread([this, max_streams]() {
491
0
      auto codec = dynamic_cast<TestHttp2ServerConnectionImpl*>(codec_.get());
492
0
      codec->updateConcurrentStreams(max_streams);
493
0
    });
494
0
  } else {
495
0
#ifdef ENVOY_ENABLE_QUIC
496
0
    postToConnectionThread([this, max_streams]() {
497
0
      auto codec = dynamic_cast<Quic::QuicHttpServerConnectionImpl*>(codec_.get());
498
0
      quic::test::QuicSessionPeer::SetMaxOpenIncomingBidirectionalStreams(
499
0
          &codec->quicServerSession(), max_streams);
500
0
      codec->quicServerSession().SendMaxStreams(1, false);
501
0
    });
502
#else
503
    UNREFERENCED_PARAMETER(max_streams);
504
#endif
505
0
  }
506
0
}
507
508
0
void FakeHttpConnection::encodeProtocolError() {
509
0
  ASSERT(type_ != Http::CodecType::HTTP1);
510
511
0
  Http::Http2::ServerConnectionImpl* codec =
512
0
      dynamic_cast<Http::Http2::ServerConnectionImpl*>(codec_.get());
513
0
  ASSERT(codec != nullptr);
514
0
  postToConnectionThread([codec]() {
515
0
    Http::Status status = codec->protocolErrorForTest();
516
0
    ASSERT(Http::getStatusCode(status) == Http::StatusCode::CodecProtocolError);
517
0
  });
518
0
}
519
520
92
AssertionResult FakeConnectionBase::waitForDisconnect(milliseconds timeout) {
521
92
  ENVOY_LOG(trace, "FakeConnectionBase waiting for disconnect");
522
92
  absl::MutexLock lock(&lock_);
523
92
  const auto reached = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
524
92
    return !shared_connection_.connectedLockHeld();
525
92
  };
526
527
92
  if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
528
0
    if (timeout == TestUtility::DefaultTimeout) {
529
0
      ADD_FAILURE() << "Please don't waitForDisconnect with a 5s timeout if failure is expected\n";
530
0
    }
531
0
    return AssertionFailure() << "Timed out waiting for disconnect.";
532
0
  }
533
92
  ENVOY_LOG(trace, "FakeConnectionBase done waiting for disconnect");
534
92
  return AssertionSuccess();
535
92
}
536
537
0
AssertionResult FakeConnectionBase::waitForRstDisconnect(std::chrono::milliseconds timeout) {
538
0
  ENVOY_LOG(trace, "FakeConnectionBase waiting for RST disconnect");
539
0
  absl::MutexLock lock(&lock_);
540
0
  const auto reached = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
541
0
    return shared_connection_.rstDisconnected();
542
0
  };
543
544
0
  if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
545
0
    if (timeout == TestUtility::DefaultTimeout) {
546
0
      ADD_FAILURE()
547
0
          << "Please don't waitForRstDisconnect with a 5s timeout if failure is expected\n";
548
0
    }
549
0
    return AssertionFailure() << "Timed out waiting for RST disconnect.";
550
0
  }
551
0
  ENVOY_LOG(trace, "FakeConnectionBase done waiting for RST disconnect");
552
0
  return AssertionSuccess();
553
0
}
554
555
0
AssertionResult FakeConnectionBase::waitForHalfClose(milliseconds timeout) {
556
0
  absl::MutexLock lock(&lock_);
557
0
  if (!time_system_.waitFor(lock_, absl::Condition(&half_closed_), timeout)) {
558
0
    return AssertionFailure() << "Timed out waiting for half close.";
559
0
  }
560
0
  return AssertionSuccess();
561
0
}
562
563
0
AssertionResult FakeConnectionBase::waitForNoPost(milliseconds timeout) {
564
0
  absl::MutexLock lock(&lock_);
565
0
  if (!time_system_.waitFor(
566
0
          lock_,
567
0
          absl::Condition(
568
0
              [](void* fake_connection) -> bool {
569
0
                return static_cast<FakeConnectionBase*>(fake_connection)->pending_cbs_ == 0;
570
0
              },
571
0
              this),
572
0
          timeout)) {
573
0
    return AssertionFailure() << "Timed out waiting for ops on this connection";
574
0
  }
575
0
  return AssertionSuccess();
576
0
}
577
578
2.54k
void FakeConnectionBase::postToConnectionThread(std::function<void()> cb) {
579
2.54k
  ++pending_cbs_;
580
2.54k
  dispatcher_.post([this, cb]() {
581
2.54k
    cb();
582
2.54k
    {
583
      // Snag this lock not because it's needed but so waitForNoPost doesn't stall
584
2.54k
      absl::MutexLock lock(&lock_);
585
2.54k
      --pending_cbs_;
586
2.54k
    }
587
2.54k
  });
588
2.54k
}
589
590
AssertionResult FakeHttpConnection::waitForNewStream(Event::Dispatcher& client_dispatcher,
591
                                                     FakeStreamPtr& stream,
592
14
                                                     std::chrono::milliseconds timeout) {
593
14
  absl::MutexLock lock(&lock_);
594
14
  if (!waitForWithDispatcherRun(
595
14
          time_system_, lock_,
596
42
          [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return !new_streams_.empty(); },
597
14
          client_dispatcher, timeout)) {
598
0
    return AssertionFailure() << "Timed out waiting for new stream.";
599
0
  }
600
14
  stream = std::move(new_streams_.front());
601
14
  new_streams_.pop_front();
602
14
  return AssertionSuccess();
603
14
}
604
605
FakeUpstream::FakeUpstream(Network::DownstreamTransportSocketFactoryPtr&& transport_socket_factory,
606
                           const std::string& uds_path, const FakeUpstreamConfig& config)
607
    : FakeUpstream(std::move(transport_socket_factory),
608
                   Network::SocketPtr{new Network::UdsListenSocket(
609
                       *Network::Address::PipeInstance::create(uds_path))},
610
0
                   config) {}
611
612
static Network::SocketPtr
613
1.99k
makeTcpListenSocket(const Network::Address::InstanceConstSharedPtr& address) {
614
1.99k
  return std::make_unique<Network::TcpListenSocket>(address, nullptr, true);
615
1.99k
}
616
617
static Network::Address::InstanceConstSharedPtr makeAddress(uint32_t port,
618
14
                                                            Network::Address::IpVersion version) {
619
14
  return Network::Utility::parseInternetAddressNoThrow(
620
14
      Network::Test::getLoopbackAddressString(version), port);
621
14
}
622
623
static Network::SocketPtr
624
0
makeUdpListenSocket(const Network::Address::InstanceConstSharedPtr& address) {
625
0
  auto socket = std::make_unique<Network::UdpListenSocket>(address, nullptr, true);
626
  // TODO(mattklein123): These options are set in multiple locations. We should centralize them for
627
  // UDP listeners.
628
0
  socket->addOptions(Network::SocketOptionFactory::buildIpPacketInfoOptions());
629
0
  socket->addOptions(Network::SocketOptionFactory::buildRxQueueOverFlowOptions());
630
0
  return socket;
631
0
}
632
633
static Network::SocketPtr
634
makeListenSocket(const FakeUpstreamConfig& config,
635
1.99k
                 const Network::Address::InstanceConstSharedPtr& address) {
636
1.99k
  return (config.udp_fake_upstream_.has_value() ? makeUdpListenSocket(address)
637
1.99k
                                                : makeTcpListenSocket(address));
638
1.99k
}
639
640
FakeUpstream::FakeUpstream(uint32_t port, Network::Address::IpVersion version,
641
                           const FakeUpstreamConfig& config, const bool defer_initialization)
642
    : FakeUpstream(Network::Test::createRawBufferDownstreamSocketFactory(),
643
                   makeListenSocket(config, makeAddress(port, version)), config,
644
14
                   defer_initialization) {}
645
646
FakeUpstream::FakeUpstream(Network::DownstreamTransportSocketFactoryPtr&& transport_socket_factory,
647
                           const Network::Address::InstanceConstSharedPtr& address,
648
                           const FakeUpstreamConfig& config)
649
1.97k
    : FakeUpstream(std::move(transport_socket_factory), makeListenSocket(config, address), config) {
650
1.97k
}
651
652
FakeUpstream::FakeUpstream(Network::DownstreamTransportSocketFactoryPtr&& transport_socket_factory,
653
                           uint32_t port, Network::Address::IpVersion version,
654
                           const FakeUpstreamConfig& config)
655
    : FakeUpstream(std::move(transport_socket_factory),
656
0
                   makeListenSocket(config, makeAddress(port, version)), config) {}
657
658
FakeUpstream::FakeUpstream(Network::DownstreamTransportSocketFactoryPtr&& transport_socket_factory,
659
                           Network::SocketPtr&& listen_socket, const FakeUpstreamConfig& config,
660
                           const bool defer_initialization)
661
    : http_type_(config.upstream_protocol_), http2_options_(config.http2_options_),
662
      http3_options_(config.http3_options_), quic_options_(config.quic_options_),
663
      socket_(Network::SocketSharedPtr(listen_socket.release())),
664
      api_(Api::createApiForTest(stats_store_)), time_system_(config.time_system_),
665
      dispatcher_(api_->allocateDispatcher("fake_upstream")),
666
      handler_(new Server::ConnectionHandlerImpl(*dispatcher_, 0)), config_(config),
667
      read_disable_on_new_connection_(true), enable_half_close_(config.enable_half_close_),
668
      listener_(*this, http_type_ == Http::CodecType::HTTP3),
669
      filter_chain_(Network::Test::createEmptyFilterChain(std::move(transport_socket_factory))),
670
1.99k
      stats_scope_(stats_store_.createScope("test_server_scope")) {
671
1.99k
  socket_factories_.emplace_back(std::make_unique<FakeListenSocketFactory>(socket_));
672
1.99k
  ENVOY_LOG(info, "starting fake server at {}. UDP={} codec={}", localAddress()->asString(),
673
1.99k
            config.udp_fake_upstream_.has_value(), FakeHttpConnection::typeToString(http_type_));
674
1.99k
  if (config.udp_fake_upstream_.has_value() &&
675
1.99k
      config.udp_fake_upstream_->max_rx_datagram_size_.has_value()) {
676
0
    listener_.udp_listener_config_.config_.mutable_downstream_socket_config()
677
0
        ->mutable_max_rx_datagram_size()
678
0
        ->set_value(config.udp_fake_upstream_->max_rx_datagram_size_.value());
679
0
  }
680
681
1.99k
  if (!defer_initialization) {
682
1.99k
    initializeServer();
683
1.99k
  }
684
1.99k
}
685
686
1.99k
FakeUpstream::~FakeUpstream() { cleanUp(); };
687
688
1.99k
void FakeUpstream::initializeServer() {
689
1.99k
  if (initialized_) {
690
    // Already initialized.
691
0
    return;
692
0
  }
693
694
1.99k
  dispatcher_->post([this]() -> void {
695
1.99k
    EXPECT_TRUE(socket_factories_[0]->doFinalPreWorkerInit().ok());
696
1.99k
    handler_->addListener(absl::nullopt, listener_, runtime_, random_);
697
1.99k
    server_initialized_.setReady();
698
1.99k
  });
699
1.99k
  thread_ = api_->threadFactory().createThread([this]() -> void { threadRoutine(); });
700
1.99k
  server_initialized_.waitReady();
701
1.99k
  initialized_ = true;
702
1.99k
}
703
704
2.39k
void FakeUpstream::cleanUp() {
705
2.39k
  if (thread_.get()) {
706
1.99k
    dispatcher_->exit();
707
1.99k
    thread_->join();
708
1.99k
    thread_.reset();
709
1.99k
  }
710
2.39k
}
711
712
bool FakeUpstream::createNetworkFilterChain(Network::Connection& connection,
713
102
                                            const Filter::NetworkFilterFactoriesList&) {
714
102
  absl::MutexLock lock(&lock_);
715
102
  if (read_disable_on_new_connection_ && http_type_ != Http::CodecType::HTTP3) {
716
    // Disable early close detection to avoid closing the network connection before full
717
    // initialization is complete.
718
102
    connection.detectEarlyCloseWhenReadDisabled(false);
719
102
    connection.readDisable(true);
720
102
    if (disable_and_do_not_enable_) {
721
0
      dynamic_cast<Network::ConnectionImpl*>(&connection)->ioHandle().enableFileEvents(0);
722
0
    }
723
102
  }
724
102
  auto connection_wrapper = std::make_unique<SharedConnectionWrapper>(connection);
725
726
102
  LinkedList::moveIntoListBack(std::move(connection_wrapper), new_connections_);
727
728
  // Normally we don't associate a logical network connection with a FakeHttpConnection  until
729
  // waitForHttpConnection is called, but QUIC needs to be set up as packets come in, so we do
730
  // not lazily create for HTTP/3
731
102
  if (http_type_ == Http::CodecType::HTTP3) {
732
0
    quic_connections_.push_back(std::make_unique<FakeHttpConnection>(
733
0
        *this, consumeConnection(), http_type_, time_system_, config_.max_request_headers_kb_,
734
0
        config_.max_request_headers_count_, config_.headers_with_underscores_action_));
735
0
    quic_connections_.back()->initialize();
736
0
  }
737
102
  return true;
738
102
}
739
740
102
bool FakeUpstream::createListenerFilterChain(Network::ListenerFilterManager&) { return true; }
741
742
void FakeUpstream::createUdpListenerFilterChain(Network::UdpListenerFilterManager& udp_listener,
743
0
                                                Network::UdpReadFilterCallbacks& callbacks) {
744
0
  udp_listener.addReadFilter(std::make_unique<FakeUdpFilter>(*this, callbacks));
745
0
}
746
747
0
bool FakeUpstream::createQuicListenerFilterChain(Network::QuicListenerFilterManager&) {
748
0
  return true;
749
0
}
750
751
1.99k
void FakeUpstream::threadRoutine() {
752
1.99k
  dispatcher_->run(Event::Dispatcher::RunType::Block);
753
1.99k
  handler_.reset();
754
1.99k
  {
755
1.99k
    absl::MutexLock lock(&lock_);
756
1.99k
    new_connections_.clear();
757
1.99k
    quic_connections_.clear();
758
1.99k
    consumed_connections_.clear();
759
1.99k
  }
760
1.99k
}
761
762
AssertionResult FakeUpstream::waitForHttpConnection(Event::Dispatcher& client_dispatcher,
763
                                                    FakeHttpConnectionPtr& connection,
764
14
                                                    milliseconds timeout) {
765
14
  if (!initialized_) {
766
0
    return AssertionFailure()
767
0
           << "Must initialize the FakeUpstream first by calling initializeServer().";
768
0
  }
769
770
14
  {
771
14
    absl::MutexLock lock(&lock_);
772
773
    // As noted in createNetworkFilterChain, HTTP3 FakeHttpConnections are not
774
    // lazily created, so HTTP3 needs a different wait path here.
775
14
    if (http_type_ == Http::CodecType::HTTP3) {
776
0
      if (quic_connections_.empty() &&
777
0
          !waitForWithDispatcherRun(
778
0
              time_system_, lock_,
779
0
              [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return !quic_connections_.empty(); },
780
0
              client_dispatcher, timeout)) {
781
0
        return AssertionFailure() << "Timed out waiting for new quic connection.";
782
0
      }
783
0
      if (!quic_connections_.empty()) {
784
0
        connection = std::move(quic_connections_.front());
785
0
        quic_connections_.pop_front();
786
0
        return AssertionSuccess();
787
0
      }
788
0
    }
789
790
14
    if (!waitForWithDispatcherRun(
791
14
            time_system_, lock_,
792
14
            [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return !new_connections_.empty(); },
793
14
            client_dispatcher, timeout)) {
794
0
      if (timeout == TestUtility::DefaultTimeout) {
795
0
        ADD_FAILURE()
796
0
            << "Please don't waitForHttpConnection with a 5s timeout if failure is expected\n";
797
0
      }
798
0
      return AssertionFailure() << "Timed out waiting for new connection.";
799
0
    }
800
14
  }
801
14
  return runOnDispatcherThreadAndWait([&]() {
802
14
    absl::MutexLock lock(&lock_);
803
14
    connection = std::make_unique<FakeHttpConnection>(
804
14
        *this, consumeConnection(), http_type_, time_system_, config_.max_request_headers_kb_,
805
14
        config_.max_request_headers_count_, config_.headers_with_underscores_action_);
806
14
    connection->initialize();
807
14
    return AssertionSuccess();
808
14
  });
809
14
}
810
811
absl::StatusOr<int>
812
FakeUpstream::waitForHttpConnection(Event::Dispatcher& client_dispatcher,
813
                                    std::vector<std::unique_ptr<FakeUpstream>>& upstreams,
814
0
                                    FakeHttpConnectionPtr& connection, milliseconds timeout) {
815
0
  if (upstreams.empty()) {
816
0
    return absl::InternalError("No upstreams configured.");
817
0
  }
818
0
  Event::TestTimeSystem::RealTimeBound bound(timeout);
819
0
  while (bound.withinBound()) {
820
0
    for (size_t i = 0; i < upstreams.size(); ++i) {
821
0
      FakeUpstream& upstream = *upstreams[i];
822
0
      {
823
0
        absl::MutexLock lock(&upstream.lock_);
824
0
        if (!upstream.isInitialized()) {
825
0
          return absl::InternalError(
826
0
              "Must initialize the FakeUpstream first by calling initializeServer().");
827
0
        }
828
0
        if (!waitForWithDispatcherRun(
829
0
                upstream.time_system_, upstream.lock_,
830
0
                [&upstream]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(upstream.lock_) {
831
0
                  return !upstream.new_connections_.empty();
832
0
                },
833
0
                client_dispatcher, 5ms)) {
834
0
          continue;
835
0
        }
836
0
      }
837
838
0
      EXPECT_TRUE(upstream.runOnDispatcherThreadAndWait([&]() {
839
0
        absl::MutexLock lock(&upstream.lock_);
840
0
        connection = std::make_unique<FakeHttpConnection>(
841
0
            upstream, upstream.consumeConnection(), upstream.http_type_, upstream.timeSystem(),
842
0
            Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT,
843
0
            envoy::config::core::v3::HttpProtocolOptions::ALLOW);
844
0
        connection->initialize();
845
0
        return AssertionSuccess();
846
0
      }));
847
0
      return i;
848
0
    }
849
0
  }
850
0
  return absl::InternalError("Timed out waiting for HTTP connection.");
851
0
}
852
853
ABSL_MUST_USE_RESULT
854
0
AssertionResult FakeUpstream::assertPendingConnectionsEmpty() {
855
0
  return runOnDispatcherThreadAndWait([&]() {
856
0
    absl::MutexLock lock(&lock_);
857
0
    return new_connections_.empty() ? AssertionSuccess() : AssertionFailure();
858
0
  });
859
0
}
860
861
AssertionResult FakeUpstream::waitForRawConnection(FakeRawConnectionPtr& connection,
862
335
                                                   milliseconds timeout) {
863
335
  if (!initialized_) {
864
0
    return AssertionFailure()
865
0
           << "Must initialize the FakeUpstream first by calling initializeServer().";
866
0
  }
867
868
335
  {
869
335
    absl::MutexLock lock(&lock_);
870
722
    const auto reached = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
871
722
      return !new_connections_.empty();
872
722
    };
873
874
335
    ENVOY_LOG(debug, "waiting for raw connection");
875
335
    if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
876
249
      return AssertionFailure() << "Timed out waiting for raw connection";
877
249
    }
878
335
  }
879
880
86
  return runOnDispatcherThreadAndWait([&]() {
881
86
    absl::MutexLock lock(&lock_);
882
86
    connection = makeRawConnection(consumeConnection(), timeSystem());
883
86
    connection->initialize();
884
    // Skip enableHalfClose if the connection is already disconnected.
885
86
    if (connection->connected()) {
886
86
      connection->connection().enableHalfClose(enable_half_close_);
887
86
    }
888
86
    return AssertionSuccess();
889
86
  });
890
335
}
891
892
void FakeUpstream::convertFromRawToHttp(FakeRawConnectionPtr& raw_connection,
893
0
                                        FakeHttpConnectionPtr& connection) {
894
0
  absl::MutexLock lock(&lock_);
895
0
  SharedConnectionWrapper& shared_connection = raw_connection->sharedConnection();
896
897
0
  connection = std::make_unique<FakeHttpConnection>(
898
0
      *this, shared_connection, http_type_, time_system_, config_.max_request_headers_kb_,
899
0
      config_.max_request_headers_count_, config_.headers_with_underscores_action_);
900
0
  connection->initialize();
901
0
  raw_connection.release();
902
0
}
903
904
100
SharedConnectionWrapper& FakeUpstream::consumeConnection() {
905
100
  ASSERT(!new_connections_.empty());
906
100
  auto* const connection_wrapper = new_connections_.front().get();
907
  // Skip the thread safety check if the network connection has already been freed since there's no
908
  // alternate way to get access to the dispatcher.
909
100
  ASSERT(!connection_wrapper->connected() || connection_wrapper->dispatcher().isThreadSafe());
910
100
  connection_wrapper->setParented();
911
100
  connection_wrapper->moveBetweenLists(new_connections_, consumed_connections_);
912
100
  if (read_disable_on_new_connection_ && connection_wrapper->connected() &&
913
100
      http_type_ != Http::CodecType::HTTP3 && !disable_and_do_not_enable_) {
914
    // Re-enable read and early close detection.
915
100
    auto& connection = connection_wrapper->connection();
916
100
    connection.detectEarlyCloseWhenReadDisabled(true);
917
100
    connection.readDisable(false);
918
100
  }
919
100
  return *connection_wrapper;
920
100
}
921
922
AssertionResult FakeUpstream::waitForUdpDatagram(Network::UdpRecvData& data_to_fill,
923
0
                                                 std::chrono::milliseconds timeout) {
924
0
  if (!initialized_) {
925
0
    return AssertionFailure()
926
0
           << "Must initialize the FakeUpstream first by calling initializeServer().";
927
0
  }
928
929
0
  absl::MutexLock lock(&lock_);
930
0
  const auto reached = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
931
0
    return !received_datagrams_.empty();
932
0
  };
933
934
0
  if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
935
0
    return AssertionFailure() << "Timed out waiting for UDP datagram.";
936
0
  }
937
938
0
  data_to_fill = std::move(received_datagrams_.front());
939
0
  received_datagrams_.pop_front();
940
0
  return AssertionSuccess();
941
0
}
942
943
0
Network::FilterStatus FakeUpstream::onRecvDatagram(Network::UdpRecvData& data) {
944
0
  absl::MutexLock lock(&lock_);
945
0
  received_datagrams_.emplace_back(std::move(data));
946
947
0
  return Network::FilterStatus::StopIteration;
948
0
}
949
950
AssertionResult FakeUpstream::runOnDispatcherThreadAndWait(std::function<AssertionResult()> cb,
951
100
                                                           std::chrono::milliseconds timeout) {
952
100
  auto result = std::make_shared<AssertionResult>(AssertionSuccess());
953
100
  auto done = std::make_shared<absl::Notification>();
954
100
  ASSERT(!dispatcher_->isThreadSafe());
955
100
  dispatcher_->post([&]() {
956
100
    *result = cb();
957
100
    done->Notify();
958
100
  });
959
100
  RELEASE_ASSERT(done->WaitForNotificationWithTimeout(absl::FromChrono(timeout)),
960
100
                 "Timed out waiting for cb to run on dispatcher");
961
100
  return *result;
962
100
}
963
964
0
void FakeUpstream::runOnDispatcherThread(std::function<void()> cb) {
965
0
  ASSERT(!dispatcher_->isThreadSafe());
966
0
  dispatcher_->post([&]() { cb(); });
967
0
}
968
969
void FakeUpstream::sendUdpDatagram(const std::string& buffer,
970
0
                                   const Network::Address::InstanceConstSharedPtr& peer) {
971
0
  dispatcher_->post([this, buffer, peer] {
972
0
    const auto rc = Network::Utility::writeToSocket(socket_->ioHandle(), Buffer::OwnedImpl(buffer),
973
0
                                                    nullptr, *peer);
974
0
    EXPECT_TRUE(rc.return_value_ == buffer.length());
975
0
  });
976
0
}
977
978
AssertionResult FakeUpstream::rawWriteConnection(uint32_t index, const std::string& data,
979
                                                 bool end_stream,
980
0
                                                 std::chrono::milliseconds timeout) {
981
0
  if (!initialized_) {
982
0
    return AssertionFailure()
983
0
           << "Must initialize the FakeUpstream first by calling initializeServer().";
984
0
  }
985
986
0
  absl::MutexLock lock(&lock_);
987
0
  auto iter = consumed_connections_.begin();
988
0
  std::advance(iter, index);
989
0
  return (*iter)->executeOnDispatcher(
990
0
      [data, end_stream](Network::Connection& connection) {
991
0
        ASSERT(connection.state() == Network::Connection::State::Open);
992
0
        Buffer::OwnedImpl buffer(data);
993
0
        connection.write(buffer, end_stream);
994
0
      },
995
0
      timeout);
996
0
}
997
998
1.99k
absl::Status FakeUpstream::FakeListenSocketFactory::doFinalPreWorkerInit() {
999
1.99k
  if (socket_->socketType() == Network::Socket::Type::Stream) {
1000
1.99k
    EXPECT_EQ(0, socket_->ioHandle().listen(ENVOY_TCP_BACKLOG_SIZE).return_value_);
1001
1.99k
  } else {
1002
0
    ASSERT(socket_->socketType() == Network::Socket::Type::Datagram);
1003
0
    EXPECT_TRUE(Network::Socket::applyOptions(socket_->options(), *socket_,
1004
0
                                              envoy::config::core::v3::SocketOption::STATE_BOUND));
1005
0
  }
1006
1.99k
  return absl::OkStatus();
1007
1.99k
}
1008
1009
86
FakeRawConnection::~FakeRawConnection() {
1010
  // If the filter was already deleted, it means the shared_connection_ was too, so don't try to
1011
  // access it.
1012
86
  if (read_filter_ != nullptr) {
1013
86
    EXPECT_TRUE(shared_connection_.executeOnDispatcher(
1014
86
        [filter = std::move(read_filter_)](Network::Connection& connection) {
1015
86
          connection.removeReadFilter(filter);
1016
86
        }));
1017
86
  }
1018
86
}
1019
1020
86
void FakeRawConnection::initialize() {
1021
86
  FakeConnectionBase::initialize();
1022
86
  read_filter_ = std::make_shared<ReadFilter>(*this);
1023
86
  if (!shared_connection_.connected()) {
1024
0
    ENVOY_LOG(warn, "FakeRawConnection::initialize: network connection is already disconnected");
1025
0
    return;
1026
0
  }
1027
86
  ASSERT(shared_connection_.dispatcher().isThreadSafe());
1028
86
  shared_connection_.connection().addReadFilter(read_filter_);
1029
86
}
1030
1031
AssertionResult FakeRawConnection::waitForData(uint64_t num_bytes, std::string* data,
1032
0
                                               milliseconds timeout) {
1033
0
  absl::MutexLock lock(&lock_);
1034
0
  const auto reached = [this, num_bytes]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
1035
0
    return data_.size() == num_bytes;
1036
0
  };
1037
0
  ENVOY_LOG(debug, "waiting for {} bytes of data", num_bytes);
1038
0
  if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
1039
0
    return AssertionFailure() << fmt::format(
1040
0
               "Timed out waiting for data. Got '{}', waiting for {} bytes.", data_, num_bytes);
1041
0
  }
1042
0
  if (data != nullptr) {
1043
0
    *data = data_;
1044
0
  }
1045
0
  return AssertionSuccess();
1046
0
}
1047
1048
AssertionResult
1049
FakeRawConnection::waitForData(const std::function<bool(const std::string&)>& data_validator,
1050
0
                               std::string* data, milliseconds timeout) {
1051
0
  absl::MutexLock lock(&lock_);
1052
0
  const auto reached = [this, &data_validator]()
1053
0
                           ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return data_validator(data_); };
1054
0
  ENVOY_LOG(debug, "waiting for data");
1055
0
  if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
1056
0
    return AssertionFailure() << "Timed out waiting for data.";
1057
0
  }
1058
0
  if (data != nullptr) {
1059
0
    *data = data_;
1060
0
  }
1061
0
  return AssertionSuccess();
1062
0
}
1063
1064
AssertionResult FakeRawConnection::write(const std::string& data, bool end_stream,
1065
3.62k
                                         milliseconds timeout) {
1066
3.62k
  return shared_connection_.executeOnDispatcher(
1067
3.62k
      [data, end_stream](Network::Connection& connection) {
1068
3.57k
        Buffer::OwnedImpl to_write(data);
1069
3.57k
        connection.write(to_write, end_stream);
1070
3.57k
      },
1071
3.62k
      timeout);
1072
3.62k
}
1073
1074
Network::FilterStatus FakeRawConnection::ReadFilter::onData(Buffer::Instance& data,
1075
498
                                                            bool end_stream) {
1076
498
  absl::MutexLock lock(&parent_.lock_);
1077
498
  ENVOY_LOG(debug, "got {} bytes, end_stream {}", data.length(), end_stream);
1078
498
  parent_.data_.append(data.toString());
1079
498
  parent_.half_closed_ = end_stream;
1080
498
  data.drain(data.length());
1081
498
  return Network::FilterStatus::StopIteration;
1082
498
}
1083
1084
ABSL_MUST_USE_RESULT
1085
AssertionResult FakeHttpConnection::waitForInexactRawData(const char* data, std::string* out,
1086
0
                                                          std::chrono::milliseconds timeout) {
1087
0
  absl::MutexLock lock(&lock_);
1088
0
  const auto reached = [this, data, &out]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
1089
0
    char peek_buf[200];
1090
0
    auto result = dynamic_cast<Network::ConnectionImpl*>(&connection())
1091
0
                      ->ioHandle()
1092
0
                      .recv(peek_buf, 200, MSG_PEEK);
1093
0
    ASSERT(result.ok() || result.err_->getErrorCode() == Api::IoError::IoErrorCode::Again);
1094
0
    if (!result.ok()) {
1095
0
      return false;
1096
0
    }
1097
0
    absl::string_view peek_data(peek_buf, result.return_value_);
1098
0
    size_t index = peek_data.find(data);
1099
0
    if (index != absl::string_view::npos) {
1100
0
      Buffer::OwnedImpl buffer;
1101
0
      *out = std::string(peek_data.data(), index + 4);
1102
0
      auto result = dynamic_cast<Network::ConnectionImpl*>(&connection())
1103
0
                        ->ioHandle()
1104
0
                        .recv(peek_buf, index + 4, 0);
1105
0
      return true;
1106
0
    }
1107
0
    return false;
1108
0
  };
1109
  // Because the connection must be read disabled to not auto-consume the
1110
  // underlying data, waitFor hangs with no events to force the time system to
1111
  // continue. Break it up into smaller chunks.
1112
0
  for (int i = 0; i < timeout / 10ms; ++i) {
1113
0
    if (time_system_.waitFor(lock_, absl::Condition(&reached), 10ms)) {
1114
0
      return AssertionSuccess();
1115
0
    }
1116
0
  }
1117
0
  return AssertionFailure() << "timed out waiting for raw data";
1118
0
}
1119
1120
0
void FakeHttpConnection::writeRawData(absl::string_view data) {
1121
0
  Buffer::OwnedImpl buffer(data);
1122
0
  Api::IoCallUint64Result result =
1123
0
      dynamic_cast<Network::ConnectionImpl*>(&connection())->ioHandle().write(buffer);
1124
0
  ASSERT(result.ok());
1125
0
}
1126
1127
0
AssertionResult FakeHttpConnection::postWriteRawData(std::string data) {
1128
0
  return shared_connection_.executeOnDispatcher(
1129
0
      [data](Network::Connection& connection) {
1130
0
        Buffer::OwnedImpl to_write(data);
1131
0
        connection.write(to_write, false);
1132
0
      },
1133
0
      TestUtility::DefaultTimeout);
1134
0
}
1135
1136
} // namespace Envoy