Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/test/integration/fake_upstream.h
Line
Count
Source (jump to first uncovered line)
1
#pragma once
2
3
#include <cstdint>
4
#include <list>
5
#include <memory>
6
#include <string>
7
8
#include "envoy/api/api.h"
9
#include "envoy/config/core/v3/base.pb.h"
10
#include "envoy/config/listener/v3/quic_config.pb.h"
11
#include "envoy/grpc/status.h"
12
#include "envoy/http/codec.h"
13
#include "envoy/network/connection.h"
14
#include "envoy/network/connection_handler.h"
15
#include "envoy/network/filter.h"
16
#include "envoy/network/listener.h"
17
#include "envoy/stats/scope.h"
18
19
#include "source/common/buffer/buffer_impl.h"
20
#include "source/common/buffer/zero_copy_input_stream_impl.h"
21
#include "source/common/common/basic_resource_impl.h"
22
#include "source/common/common/callback_impl.h"
23
#include "source/common/common/linked_object.h"
24
#include "source/common/common/lock_guard.h"
25
#include "source/common/common/thread.h"
26
#include "source/common/config/utility.h"
27
#include "source/common/grpc/codec.h"
28
#include "source/common/grpc/common.h"
29
#include "source/common/http/http1/codec_impl.h"
30
#include "source/common/http/http2/codec_impl.h"
31
#include "source/common/http/http3/codec_stats.h"
32
#include "source/common/network/connection_balancer_impl.h"
33
#include "source/common/network/filter_impl.h"
34
#include "source/common/network/listen_socket_impl.h"
35
#include "source/common/network/udp_listener_impl.h"
36
#include "source/common/network/udp_packet_writer_handler_impl.h"
37
#include "source/common/stats/isolated_store_impl.h"
38
39
#include "test/mocks/http/header_validator.h"
40
#include "test/mocks/protobuf/mocks.h"
41
#include "test/mocks/server/instance.h"
42
43
#if defined(ENVOY_ENABLE_QUIC)
44
#include "source/common/quic/active_quic_listener.h"
45
#include "source/common/quic/quic_stat_names.h"
46
#endif
47
48
#include "source/common/listener_manager/active_raw_udp_listener_config.h"
49
50
#include "test/mocks/common.h"
51
#include "test/mocks/runtime/mocks.h"
52
#include "test/mocks/server/overload_manager.h"
53
#include "test/test_common/test_time_system.h"
54
#include "test/test_common/utility.h"
55
56
// TODO(mattklein123): A lot of code should be moved from this header file into the cc file.
57
58
namespace Envoy {
59
60
class FakeHttpConnection;
61
class FakeUpstream;
62
63
/**
64
 * Provides a fake HTTP stream for integration testing.
65
 */
66
class FakeStream : public Http::RequestDecoder,
67
                   public Http::StreamCallbacks,
68
                   Logger::Loggable<Logger::Id::testing> {
69
public:
70
  FakeStream(FakeHttpConnection& parent, Http::ResponseEncoder& encoder,
71
             Event::TestTimeSystem& time_system);
72
73
0
  uint64_t bodyLength() {
74
0
    absl::MutexLock lock(&lock_);
75
0
    return body_.length();
76
0
  }
77
0
  Buffer::Instance& body() {
78
0
    absl::MutexLock lock(&lock_);
79
0
    return body_;
80
0
  }
81
0
  bool complete() {
82
0
    absl::MutexLock lock(&lock_);
83
0
    return end_stream_;
84
0
  }
85
86
  // Execute a callback using the dispatcher associated with the FakeStream's connection. This
87
  // allows execution of non-interrupted sequences of operations on the fake stream which may run
88
  // into trouble if client-side events are interleaved.
89
  void postToConnectionThread(std::function<void()> cb);
90
  void encode1xxHeaders(const Http::ResponseHeaderMap& headers);
91
  void encodeHeaders(const Http::HeaderMap& headers, bool end_stream);
92
  void encodeData(uint64_t size, bool end_stream);
93
  void encodeData(Buffer::Instance& data, bool end_stream);
94
  void encodeData(std::string data, bool end_stream);
95
  void encodeTrailers(const Http::HeaderMap& trailers);
96
  void encodeResetStream();
97
  void encodeMetadata(const Http::MetadataMapVector& metadata_map_vector);
98
  void readDisable(bool disable);
99
0
  const Http::RequestHeaderMap& headers() {
100
0
    absl::MutexLock lock(&lock_);
101
0
    return *headers_;
102
0
  }
103
0
  void setAddServedByHeader(bool add_header) { add_served_by_header_ = add_header; }
104
0
  const Http::RequestTrailerMapPtr& trailers() { return trailers_; }
105
0
  bool receivedData() { return received_data_; }
106
0
  Http::Http1StreamEncoderOptionsOptRef http1StreamEncoderOptions() {
107
0
    return encoder_.http1StreamEncoderOptions();
108
0
  }
109
  void
110
  sendLocalReply(Http::Code code, absl::string_view body,
111
                 const std::function<void(Http::ResponseHeaderMap& headers)>& /*modify_headers*/,
112
                 const absl::optional<Grpc::Status::GrpcStatus> grpc_status,
113
0
                 absl::string_view /*details*/) override {
114
0
    bool is_head_request;
115
0
    {
116
0
      absl::MutexLock lock(&lock_);
117
0
      is_head_request = headers_ != nullptr &&
118
0
                        headers_->getMethodValue() == Http::Headers::get().MethodValues.Head;
119
0
    }
120
0
    Http::Utility::sendLocalReply(
121
0
        false,
122
0
        Http::Utility::EncodeFunctions(
123
0
            {nullptr, nullptr,
124
0
             [&](Http::ResponseHeaderMapPtr&& headers, bool end_stream) -> void {
125
0
               encoder_.encodeHeaders(*headers, end_stream);
126
0
             },
127
0
             [&](Buffer::Instance& data, bool end_stream) -> void {
128
0
               encoder_.encodeData(data, end_stream);
129
0
             }}),
130
0
        Http::Utility::LocalReplyData({false, code, body, grpc_status, is_head_request}));
131
0
  }
132
133
  ABSL_MUST_USE_RESULT
134
  testing::AssertionResult
135
  waitForHeadersComplete(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
136
137
  ABSL_MUST_USE_RESULT
138
  testing::AssertionResult
139
  waitForData(Event::Dispatcher& client_dispatcher, uint64_t body_length,
140
              std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
141
142
  ABSL_MUST_USE_RESULT
143
  testing::AssertionResult
144
  waitForData(Event::Dispatcher& client_dispatcher, absl::string_view body,
145
              std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
146
147
  using ValidatorFunction = const std::function<bool(const std::string&)>;
148
  ABSL_MUST_USE_RESULT
149
  testing::AssertionResult
150
  waitForData(Event::Dispatcher& client_dispatcher, const ValidatorFunction& data_validator,
151
              std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
152
153
  ABSL_MUST_USE_RESULT
154
  testing::AssertionResult
155
  waitForEndStream(Event::Dispatcher& client_dispatcher,
156
                   std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
157
158
  ABSL_MUST_USE_RESULT
159
  testing::AssertionResult
160
  waitForReset(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
161
162
  // gRPC convenience methods.
163
  void startGrpcStream(bool send_headers = true);
164
  void finishGrpcStream(Grpc::Status::GrpcStatus status);
165
172
  template <class T> void sendGrpcMessage(const T& message) {
166
172
    ASSERT(grpc_stream_started_,
167
172
           "start gRPC stream by calling startGrpcStream before sending a message");
168
172
    auto serialized_response = Grpc::Common::serializeToGrpcFrame(message);
169
172
    encodeData(*serialized_response, false);
170
172
    ENVOY_LOG(debug, "Sent gRPC message: {}", message.DebugString());
171
172
  }
void Envoy::FakeStream::sendGrpcMessage<envoy::service::discovery::v3::DeltaDiscoveryResponse>(envoy::service::discovery::v3::DeltaDiscoveryResponse const&)
Line
Count
Source
165
38
  template <class T> void sendGrpcMessage(const T& message) {
166
38
    ASSERT(grpc_stream_started_,
167
38
           "start gRPC stream by calling startGrpcStream before sending a message");
168
38
    auto serialized_response = Grpc::Common::serializeToGrpcFrame(message);
169
38
    encodeData(*serialized_response, false);
170
38
    ENVOY_LOG(debug, "Sent gRPC message: {}", message.DebugString());
171
38
  }
void Envoy::FakeStream::sendGrpcMessage<envoy::service::discovery::v3::DiscoveryResponse>(envoy::service::discovery::v3::DiscoveryResponse const&)
Line
Count
Source
165
134
  template <class T> void sendGrpcMessage(const T& message) {
166
134
    ASSERT(grpc_stream_started_,
167
134
           "start gRPC stream by calling startGrpcStream before sending a message");
168
134
    auto serialized_response = Grpc::Common::serializeToGrpcFrame(message);
169
134
    encodeData(*serialized_response, false);
170
134
    ENVOY_LOG(debug, "Sent gRPC message: {}", message.DebugString());
171
134
  }
172
278
  template <class T> void decodeGrpcFrame(T& message) {
173
278
    EXPECT_GE(decoded_grpc_frames_.size(), 1);
174
278
    if (decoded_grpc_frames_[0].length_ == 0) {
175
0
      decoded_grpc_frames_.erase(decoded_grpc_frames_.begin());
176
0
      return;
177
0
    }
178
278
    Buffer::ZeroCopyInputStreamImpl stream(std::move(decoded_grpc_frames_[0].data_));
179
278
    EXPECT_TRUE(decoded_grpc_frames_[0].flags_ == Grpc::GRPC_FH_DEFAULT);
180
278
    EXPECT_TRUE(message.ParseFromZeroCopyStream(&stream));
181
278
    ENVOY_LOG(debug, "Received gRPC message: {}", message.DebugString());
182
278
    decoded_grpc_frames_.erase(decoded_grpc_frames_.begin());
183
278
  }
void Envoy::FakeStream::decodeGrpcFrame<envoy::service::discovery::v3::DiscoveryRequest>(envoy::service::discovery::v3::DiscoveryRequest&)
Line
Count
Source
172
216
  template <class T> void decodeGrpcFrame(T& message) {
173
216
    EXPECT_GE(decoded_grpc_frames_.size(), 1);
174
216
    if (decoded_grpc_frames_[0].length_ == 0) {
175
0
      decoded_grpc_frames_.erase(decoded_grpc_frames_.begin());
176
0
      return;
177
0
    }
178
216
    Buffer::ZeroCopyInputStreamImpl stream(std::move(decoded_grpc_frames_[0].data_));
179
216
    EXPECT_TRUE(decoded_grpc_frames_[0].flags_ == Grpc::GRPC_FH_DEFAULT);
180
216
    EXPECT_TRUE(message.ParseFromZeroCopyStream(&stream));
181
216
    ENVOY_LOG(debug, "Received gRPC message: {}", message.DebugString());
182
216
    decoded_grpc_frames_.erase(decoded_grpc_frames_.begin());
183
216
  }
void Envoy::FakeStream::decodeGrpcFrame<envoy::service::discovery::v3::DeltaDiscoveryRequest>(envoy::service::discovery::v3::DeltaDiscoveryRequest&)
Line
Count
Source
172
62
  template <class T> void decodeGrpcFrame(T& message) {
173
62
    EXPECT_GE(decoded_grpc_frames_.size(), 1);
174
62
    if (decoded_grpc_frames_[0].length_ == 0) {
175
0
      decoded_grpc_frames_.erase(decoded_grpc_frames_.begin());
176
0
      return;
177
0
    }
178
62
    Buffer::ZeroCopyInputStreamImpl stream(std::move(decoded_grpc_frames_[0].data_));
179
62
    EXPECT_TRUE(decoded_grpc_frames_[0].flags_ == Grpc::GRPC_FH_DEFAULT);
180
62
    EXPECT_TRUE(message.ParseFromZeroCopyStream(&stream));
181
62
    ENVOY_LOG(debug, "Received gRPC message: {}", message.DebugString());
182
62
    decoded_grpc_frames_.erase(decoded_grpc_frames_.begin());
183
62
  }
184
  template <class T>
185
  ABSL_MUST_USE_RESULT testing::AssertionResult
186
  waitForGrpcMessage(Event::Dispatcher& client_dispatcher, T& message,
187
278
                     std::chrono::milliseconds timeout = TestUtility::DefaultTimeout) {
188
278
    Event::TestTimeSystem::RealTimeBound bound(timeout);
189
278
    ENVOY_LOG(debug, "Waiting for gRPC message...");
190
278
    if (!decoded_grpc_frames_.empty()) {
191
101
      decodeGrpcFrame(message);
192
101
      return AssertionSuccess();
193
101
    }
194
177
    if (!waitForData(client_dispatcher, 5, timeout)) {
195
0
      return testing::AssertionFailure() << "Timed out waiting for start of gRPC message.";
196
0
    }
197
177
    int last_body_size = 0;
198
177
    {
199
177
      absl::MutexLock lock(&lock_);
200
177
      last_body_size = body_.length();
201
177
      if (!grpc_decoder_.decode(body_, decoded_grpc_frames_).ok()) {
202
0
        return testing::AssertionFailure()
203
0
               << "Couldn't decode gRPC data frame: " << body_.toString();
204
0
      }
205
177
    }
206
177
    if (decoded_grpc_frames_.empty()) {
207
0
      if (!waitForData(client_dispatcher, grpc_decoder_.length() - last_body_size,
208
0
                       bound.timeLeft())) {
209
0
        return testing::AssertionFailure() << "Timed out waiting for end of gRPC message.";
210
0
      }
211
0
      {
212
0
        absl::MutexLock lock(&lock_);
213
0
        if (!grpc_decoder_.decode(body_, decoded_grpc_frames_).ok()) {
214
0
          return testing::AssertionFailure()
215
0
                 << "Couldn't decode gRPC data frame: " << body_.toString();
216
0
        }
217
0
      }
218
0
    }
219
177
    decodeGrpcFrame(message);
220
177
    return AssertionSuccess();
221
177
  }
testing::AssertionResult Envoy::FakeStream::waitForGrpcMessage<envoy::service::discovery::v3::DiscoveryRequest>(Envoy::Event::Dispatcher&, envoy::service::discovery::v3::DiscoveryRequest&, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000l> >)
Line
Count
Source
187
216
                     std::chrono::milliseconds timeout = TestUtility::DefaultTimeout) {
188
216
    Event::TestTimeSystem::RealTimeBound bound(timeout);
189
216
    ENVOY_LOG(debug, "Waiting for gRPC message...");
190
216
    if (!decoded_grpc_frames_.empty()) {
191
72
      decodeGrpcFrame(message);
192
72
      return AssertionSuccess();
193
72
    }
194
144
    if (!waitForData(client_dispatcher, 5, timeout)) {
195
0
      return testing::AssertionFailure() << "Timed out waiting for start of gRPC message.";
196
0
    }
197
144
    int last_body_size = 0;
198
144
    {
199
144
      absl::MutexLock lock(&lock_);
200
144
      last_body_size = body_.length();
201
144
      if (!grpc_decoder_.decode(body_, decoded_grpc_frames_).ok()) {
202
0
        return testing::AssertionFailure()
203
0
               << "Couldn't decode gRPC data frame: " << body_.toString();
204
0
      }
205
144
    }
206
144
    if (decoded_grpc_frames_.empty()) {
207
0
      if (!waitForData(client_dispatcher, grpc_decoder_.length() - last_body_size,
208
0
                       bound.timeLeft())) {
209
0
        return testing::AssertionFailure() << "Timed out waiting for end of gRPC message.";
210
0
      }
211
0
      {
212
0
        absl::MutexLock lock(&lock_);
213
0
        if (!grpc_decoder_.decode(body_, decoded_grpc_frames_).ok()) {
214
0
          return testing::AssertionFailure()
215
0
                 << "Couldn't decode gRPC data frame: " << body_.toString();
216
0
        }
217
0
      }
218
0
    }
219
144
    decodeGrpcFrame(message);
220
144
    return AssertionSuccess();
221
144
  }
testing::AssertionResult Envoy::FakeStream::waitForGrpcMessage<envoy::service::discovery::v3::DeltaDiscoveryRequest>(Envoy::Event::Dispatcher&, envoy::service::discovery::v3::DeltaDiscoveryRequest&, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000l> >)
Line
Count
Source
187
62
                     std::chrono::milliseconds timeout = TestUtility::DefaultTimeout) {
188
62
    Event::TestTimeSystem::RealTimeBound bound(timeout);
189
62
    ENVOY_LOG(debug, "Waiting for gRPC message...");
190
62
    if (!decoded_grpc_frames_.empty()) {
191
29
      decodeGrpcFrame(message);
192
29
      return AssertionSuccess();
193
29
    }
194
33
    if (!waitForData(client_dispatcher, 5, timeout)) {
195
0
      return testing::AssertionFailure() << "Timed out waiting for start of gRPC message.";
196
0
    }
197
33
    int last_body_size = 0;
198
33
    {
199
33
      absl::MutexLock lock(&lock_);
200
33
      last_body_size = body_.length();
201
33
      if (!grpc_decoder_.decode(body_, decoded_grpc_frames_).ok()) {
202
0
        return testing::AssertionFailure()
203
0
               << "Couldn't decode gRPC data frame: " << body_.toString();
204
0
      }
205
33
    }
206
33
    if (decoded_grpc_frames_.empty()) {
207
0
      if (!waitForData(client_dispatcher, grpc_decoder_.length() - last_body_size,
208
0
                       bound.timeLeft())) {
209
0
        return testing::AssertionFailure() << "Timed out waiting for end of gRPC message.";
210
0
      }
211
0
      {
212
0
        absl::MutexLock lock(&lock_);
213
0
        if (!grpc_decoder_.decode(body_, decoded_grpc_frames_).ok()) {
214
0
          return testing::AssertionFailure()
215
0
                 << "Couldn't decode gRPC data frame: " << body_.toString();
216
0
        }
217
0
      }
218
0
    }
219
33
    decodeGrpcFrame(message);
220
33
    return AssertionSuccess();
221
33
  }
222
223
  // Http::StreamDecoder
224
  void decodeData(Buffer::Instance& data, bool end_stream) override;
225
  void decodeMetadata(Http::MetadataMapPtr&& metadata_map_ptr) override;
226
227
  // Http::RequestDecoder
228
  void decodeHeaders(Http::RequestHeaderMapSharedPtr&& headers, bool end_stream) override;
229
  void decodeTrailers(Http::RequestTrailerMapPtr&& trailers) override;
230
0
  StreamInfo::StreamInfo& streamInfo() override {
231
0
    RELEASE_ASSERT(false, "initialize if this is needed");
232
0
    return *stream_info_;
233
0
  }
234
0
  std::list<AccessLog::InstanceSharedPtr> accessLogHandlers() override {
235
0
    return access_log_handlers_;
236
0
  }
237
238
  // Http::StreamCallbacks
239
  void onResetStream(Http::StreamResetReason reason,
240
                     absl::string_view transport_failure_reason) override;
241
0
  void onAboveWriteBufferHighWatermark() override {}
242
0
  void onBelowWriteBufferLowWatermark() override {}
243
244
1.44k
  virtual void setEndStream(bool end) ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { end_stream_ = end; }
245
246
0
  Event::TestTimeSystem& timeSystem() { return time_system_; }
247
248
0
  Http::MetadataMap& metadataMap() { return metadata_map_; }
249
0
  absl::node_hash_map<std::string, uint64_t>& duplicatedMetadataKeyCount() {
250
0
    return duplicated_metadata_key_count_;
251
0
  }
252
253
protected:
254
  absl::Mutex lock_;
255
  Http::RequestHeaderMapSharedPtr headers_ ABSL_GUARDED_BY(lock_);
256
  Buffer::OwnedImpl body_ ABSL_GUARDED_BY(lock_);
257
  FakeHttpConnection& parent_;
258
259
private:
260
  Http::ResponseEncoder& encoder_;
261
  Http::RequestTrailerMapPtr trailers_ ABSL_GUARDED_BY(lock_);
262
  bool end_stream_ ABSL_GUARDED_BY(lock_){};
263
  bool saw_reset_ ABSL_GUARDED_BY(lock_){};
264
  Grpc::Decoder grpc_decoder_;
265
  std::vector<Grpc::Frame> decoded_grpc_frames_;
266
  bool add_served_by_header_{};
267
  Event::TestTimeSystem& time_system_;
268
  Http::MetadataMap metadata_map_;
269
  absl::node_hash_map<std::string, uint64_t> duplicated_metadata_key_count_;
270
  std::shared_ptr<StreamInfo::StreamInfo> stream_info_;
271
  std::list<AccessLog::InstanceSharedPtr> access_log_handlers_;
272
  bool received_data_{false};
273
  bool grpc_stream_started_{false};
274
  Http::ServerHeaderValidatorPtr header_validator_;
275
};
276
277
using FakeStreamPtr = std::unique_ptr<FakeStream>;
278
279
// Encapsulates various state and functionality related to sharing a Connection object across
280
// threads. With FakeUpstream fabricated objects, we have a Connection that is associated with a
281
// dispatcher on a thread managed by FakeUpstream. We want to be able to safely invoke methods on
282
// this object from other threads (e.g. the main test thread) and be able to track connection state
283
// (e.g. are we disconnected and the Connection is now possibly deleted). We manage this via a
284
// SharedConnectionWrapper that lives from when the Connection is added to the accepted connection
285
// queue and then through the lifetime of the Fake{Raw,Http}Connection that manages the Connection
286
// through active use.
287
class SharedConnectionWrapper : public Network::ConnectionCallbacks,
288
                                public LinkedObject<SharedConnectionWrapper> {
289
public:
290
  using DisconnectCallback = std::function<void()>;
291
292
  SharedConnectionWrapper(Network::Connection& connection)
293
495
      : connection_(connection), dispatcher_(connection_.dispatcher()) {
294
495
    connection_.addConnectionCallbacks(*this);
295
495
  }
296
297
  // Network::ConnectionCallbacks
298
495
  void onEvent(Network::ConnectionEvent event) override {
299
    // Throughout this entire function, we know that the connection_ cannot disappear, since this
300
    // callback is invoked prior to connection_ deferred delete. By locking below, we also know
301
    // that anywhere else lock_ is held, the connection cannot disappear inside the
302
    // locked scope.
303
495
    absl::MutexLock lock(&lock_);
304
495
    if (event == Network::ConnectionEvent::RemoteClose ||
305
495
        event == Network::ConnectionEvent::LocalClose) {
306
495
      if (connection_.detectedCloseType() == Network::DetectedCloseType::RemoteReset ||
307
495
          connection_.detectedCloseType() == Network::DetectedCloseType::LocalReset) {
308
19
        rst_disconnected_ = true;
309
19
      }
310
495
      disconnected_ = true;
311
495
    }
312
495
  }
313
314
0
  void onAboveWriteBufferHighWatermark() override {}
315
0
  void onBelowWriteBufferLowWatermark() override {}
316
317
679
  Event::Dispatcher& dispatcher() { return dispatcher_; }
318
319
10.6k
  bool connected() {
320
10.6k
    absl::MutexLock lock(&lock_);
321
10.6k
    return connectedLockHeld();
322
10.6k
  }
323
324
10.7k
  bool connectedLockHeld() {
325
10.7k
    lock_.AssertReaderHeld(); // TODO(mattklein123): This can't be annotated because the lock
326
                              // is acquired via the base connection reference. Fix this to
327
                              // remove the reference.
328
10.7k
    return !disconnected_;
329
10.7k
  }
330
331
0
  bool rstDisconnected() {
332
0
    lock_.AssertReaderHeld();
333
0
    return rst_disconnected_;
334
0
  }
335
336
  // This provides direct access to the underlying connection, but only to const methods.
337
  // Stateful connection related methods should happen on the connection's dispatcher via
338
  // executeOnDispatcher.
339
  // Doing so avoids thread safety violations between the test thread and FakeUpstream thread.
340
1.08k
  Network::Connection& connection() const { return connection_; }
341
342
  // Execute some function on the connection's dispatcher. This involves a cross-thread post and
343
  // wait-for-completion. If the connection is disconnected, either prior to post or when the
344
  // dispatcher schedules the callback, we silently ignore.
345
  ABSL_MUST_USE_RESULT
346
  testing::AssertionResult
347
  executeOnDispatcher(std::function<void(Network::Connection&)> f,
348
                      std::chrono::milliseconds timeout = TestUtility::DefaultTimeout,
349
3.75k
                      bool allow_disconnects = true) {
350
3.75k
    absl::MutexLock lock(&lock_);
351
3.75k
    if (disconnected_) {
352
89
      return testing::AssertionSuccess();
353
89
    }
354
    // Sanity check: detect if the post and wait is attempted from the dispatcher thread; fail
355
    // immediately instead of deadlocking.
356
3.67k
    ASSERT(!connection_.dispatcher().isThreadSafe(),
357
3.67k
           "deadlock: executeOnDispatcher called from dispatcher thread.");
358
3.67k
    bool callback_ready_event = false;
359
3.67k
    bool unexpected_disconnect = false;
360
3.67k
    connection_.dispatcher().post(
361
3.67k
        [this, f, &lock = lock_, &callback_ready_event, &unexpected_disconnect]() -> void {
362
          // The use of connected() here, vs. !disconnected_, is because we want to use the lock_
363
          // acquisition to briefly serialize. This avoids us entering this completion and issuing
364
          // a notifyOne() until the wait() is ready to receive it below.
365
3.67k
          if (connected()) {
366
3.62k
            f(connection_);
367
3.62k
          } else {
368
44
            unexpected_disconnect = true;
369
44
          }
370
3.67k
          absl::MutexLock lock_guard(&lock);
371
3.67k
          callback_ready_event = true;
372
3.67k
        });
373
3.67k
    Event::TestTimeSystem& time_system =
374
3.67k
        dynamic_cast<Event::TestTimeSystem&>(connection_.dispatcher().timeSource());
375
3.67k
    if (!time_system.waitFor(lock_, absl::Condition(&callback_ready_event), timeout)) {
376
0
      return testing::AssertionFailure() << "Timed out while executing on dispatcher.";
377
0
    }
378
3.67k
    if (unexpected_disconnect && !allow_disconnects) {
379
0
      ENVOY_LOG_MISC(warn, "executeOnDispatcher failed due to disconnect\n");
380
0
    }
381
3.67k
    return testing::AssertionSuccess();
382
3.67k
  }
383
384
493
  absl::Mutex& lock() { return lock_; }
385
386
100
  void setParented() {
387
100
    absl::MutexLock lock(&lock_);
388
100
    ASSERT(!parented_);
389
100
    parented_ = true;
390
100
  }
391
392
private:
393
  Network::Connection& connection_;
394
  Event::Dispatcher& dispatcher_;
395
  absl::Mutex lock_;
396
  bool parented_ ABSL_GUARDED_BY(lock_){};
397
  bool disconnected_ ABSL_GUARDED_BY(lock_){};
398
  bool rst_disconnected_ ABSL_GUARDED_BY(lock_){};
399
};
400
401
using SharedConnectionWrapperPtr = std::unique_ptr<SharedConnectionWrapper>;
402
403
/**
404
 * Base class for both fake raw connections and fake HTTP connections.
405
 */
406
class FakeConnectionBase : public Logger::Loggable<Logger::Id::testing> {
407
public:
408
493
  virtual ~FakeConnectionBase() {
409
493
    absl::MutexLock lock(&lock_);
410
493
    ASSERT(initialized_);
411
493
    ASSERT(pending_cbs_ == 0);
412
493
  }
413
414
  ABSL_MUST_USE_RESULT
415
  testing::AssertionResult close(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
416
417
  ABSL_MUST_USE_RESULT
418
  testing::AssertionResult close(Network::ConnectionCloseType close_type,
419
                                 std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
420
421
  ABSL_MUST_USE_RESULT
422
  testing::AssertionResult
423
  readDisable(bool disable, std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
424
425
  ABSL_MUST_USE_RESULT
426
  testing::AssertionResult
427
  waitForDisconnect(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
428
429
  ABSL_MUST_USE_RESULT
430
  testing::AssertionResult
431
  waitForRstDisconnect(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
432
433
  ABSL_MUST_USE_RESULT
434
  testing::AssertionResult
435
  waitForHalfClose(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
436
437
493
  virtual void initialize() {
438
493
    absl::MutexLock lock(&lock_);
439
493
    initialized_ = true;
440
493
  }
441
442
  // Some upstream connections are supposed to be alive forever.
443
  ABSL_MUST_USE_RESULT
444
  testing::AssertionResult virtual waitForNoPost(
445
      std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
446
447
  // The same caveats apply here as in SharedConnectionWrapper::connection().
448
86
  Network::Connection& connection() const { return shared_connection_.connection(); }
449
6.67k
  bool connected() const { return shared_connection_.connected(); }
450
451
  void postToConnectionThread(std::function<void()> cb);
452
0
  SharedConnectionWrapper& sharedConnection() { return shared_connection_; }
453
454
protected:
455
  FakeConnectionBase(SharedConnectionWrapper& shared_connection, Event::TestTimeSystem& time_system)
456
      : shared_connection_(shared_connection), lock_(shared_connection.lock()),
457
493
        dispatcher_(shared_connection_.dispatcher()), time_system_(time_system) {}
458
459
  SharedConnectionWrapper& shared_connection_;
460
  absl::Mutex& lock_; // TODO(mattklein123): Use the shared connection lock and figure out better
461
                      // guarded by annotations.
462
  Event::Dispatcher& dispatcher_;
463
  bool initialized_ ABSL_GUARDED_BY(lock_){};
464
  bool half_closed_ ABSL_GUARDED_BY(lock_){};
465
  std::atomic<uint64_t> pending_cbs_{};
466
  Event::TestTimeSystem& time_system_;
467
};
468
469
/**
470
 * Provides a fake HTTP connection for integration testing.
471
 */
472
class FakeHttpConnection : public Http::ServerConnectionCallbacks, public FakeConnectionBase {
473
public:
474
  // This is a legacy alias.
475
  using Type = Envoy::Http::CodecType;
476
0
  static absl::string_view typeToString(Http::CodecType type) {
477
0
    switch (type) {
478
0
    case Http::CodecType::HTTP1:
479
0
      return "http1";
480
0
    case Http::CodecType::HTTP2:
481
0
      return "http2";
482
0
    case Http::CodecType::HTTP3:
483
0
      return "http3";
484
0
    }
485
0
    return "invalid";
486
0
  }
487
488
  FakeHttpConnection(FakeUpstream& fake_upstream, SharedConnectionWrapper& shared_connection,
489
                     Http::CodecType type, Event::TestTimeSystem& time_system,
490
                     uint32_t max_request_headers_kb, uint32_t max_request_headers_count,
491
                     envoy::config::core::v3::HttpProtocolOptions::HeadersWithUnderscoresAction
492
                         headers_with_underscores_action);
493
494
  ABSL_MUST_USE_RESULT
495
  testing::AssertionResult
496
  waitForNewStream(Event::Dispatcher& client_dispatcher, FakeStreamPtr& stream,
497
                   std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
498
499
  // Http::ServerConnectionCallbacks
500
  Http::RequestDecoder& newStream(Http::ResponseEncoder& response_encoder, bool) override;
501
  // Should only be called for HTTP2 or above
502
  void onGoAway(Http::GoAwayErrorCode code) override;
503
504
  // Should only be called for HTTP2 or above, sends a GOAWAY frame with NO_ERROR.
505
  void encodeGoAway();
506
507
  // Should only be called for HTTP2 or above, sends a GOAWAY frame with ENHANCE_YOUR_CALM.
508
  void encodeProtocolError();
509
510
  // Update the maximum number of concurrent streams.
511
  void updateConcurrentStreams(uint64_t max_streams);
512
513
  ABSL_MUST_USE_RESULT
514
  testing::AssertionResult
515
  waitForInexactRawData(const char* data, std::string* out = nullptr,
516
                        std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
517
518
  void writeRawData(absl::string_view data);
519
  ABSL_MUST_USE_RESULT AssertionResult postWriteRawData(std::string data);
520
521
  Http::ServerHeaderValidatorPtr makeHeaderValidator();
522
0
  Http::CodecType type() const { return type_; }
523
524
private:
525
  struct ReadFilter : public Network::ReadFilterBaseImpl {
526
407
    ReadFilter(FakeHttpConnection& parent) : parent_(parent) {}
527
528
    // Network::ReadFilter
529
1.37k
    Network::FilterStatus onData(Buffer::Instance& data, bool) override {
530
1.37k
      Http::Status status = parent_.codec_->dispatch(data);
531
532
1.37k
      if (Http::isCodecProtocolError(status)) {
533
0
        ENVOY_LOG(debug, "FakeUpstream dispatch error: {}", status.message());
534
        // We don't do a full stream shutdown like HCM, but just shutdown the
535
        // connection for now.
536
0
        read_filter_callbacks_->connection().close(
537
0
            Network::ConnectionCloseType::FlushWriteAndDelay);
538
0
      }
539
1.37k
      return Network::FilterStatus::StopIteration;
540
1.37k
    }
541
542
    void
543
407
    initializeReadFilterCallbacks(Network::ReadFilterCallbacks& read_filter_callbacks) override {
544
407
      read_filter_callbacks_ = &read_filter_callbacks;
545
407
    }
546
547
    Network::ReadFilterCallbacks* read_filter_callbacks_{};
548
    FakeHttpConnection& parent_;
549
  };
550
551
  const Http::CodecType type_;
552
  Http::ServerConnectionPtr codec_;
553
  std::list<FakeStreamPtr> new_streams_ ABSL_GUARDED_BY(lock_);
554
  testing::NiceMock<Server::MockOverloadManager> overload_manager_;
555
  testing::NiceMock<Random::MockRandomGenerator> random_;
556
  testing::NiceMock<Http::MockHeaderValidatorStats> header_validator_stats_;
557
  Http::HeaderValidatorFactoryPtr header_validator_factory_;
558
};
559
560
// Uniquely-owning pointer to a FakeHttpConnection.
using FakeHttpConnectionPtr = std::unique_ptr<FakeHttpConnection>;
561
562
/**
563
 * Fake raw connection for integration testing.
564
 */
565
class FakeRawConnection : public FakeConnectionBase {
566
public:
567
  FakeRawConnection(SharedConnectionWrapper& shared_connection, Event::TestTimeSystem& time_system)
568
86
      : FakeConnectionBase(shared_connection, time_system) {}
569
  using ValidatorFunction = const std::function<bool(const std::string&)>;
570
  ~FakeRawConnection() override;
571
572
  // Writes to data. If data is nullptr, discards the received data.
573
  ABSL_MUST_USE_RESULT
574
  testing::AssertionResult
575
  waitForData(uint64_t num_bytes, std::string* data = nullptr,
576
              std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
577
578
  // Wait until data_validator returns true.
579
  // example usage:
580
  // std::string data;
581
  // ASSERT_TRUE(waitForData(FakeRawConnection::waitForInexactMatch("foo"), &data));
582
  // EXPECT_EQ(data, "foobar");
583
  ABSL_MUST_USE_RESULT
584
  testing::AssertionResult
585
  waitForData(const ValidatorFunction& data_validator, std::string* data = nullptr,
586
              std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
587
588
  ABSL_MUST_USE_RESULT
589
  testing::AssertionResult write(const std::string& data, bool end_stream = false,
590
                                 std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
591
592
  void initialize() override;
593
594
  // Creates a ValidatorFunction which returns true when data_to_wait_for is
595
  // contained in the incoming data string. Unlike many of Envoy waitFor functions,
596
  // it does not expect an exact match, simply the presence of data_to_wait_for.
597
0
  static ValidatorFunction waitForInexactMatch(const char* data_to_wait_for) {
598
0
    return [data_to_wait_for](const std::string& data) -> bool {
599
0
      return data.find(data_to_wait_for) != std::string::npos;
600
0
    };
601
0
  }
602
603
  // Creates a ValidatorFunction which returns true when data_to_wait_for
604
  // equals the incoming data string.
605
0
  static ValidatorFunction waitForMatch(const char* data_to_wait_for) {
606
0
    return [data_to_wait_for](const std::string& data) -> bool { return data == data_to_wait_for; };
607
0
  }
608
609
  // Creates a ValidatorFunction which returns true when data_to_wait_for is
610
  // contains at least bytes_read bytes.
611
0
  static ValidatorFunction waitForAtLeastBytes(uint32_t bytes) {
612
0
    return [bytes](const std::string& data) -> bool { return data.size() >= bytes; };
613
0
  }
614
615
0
  void clearData() {
616
0
    absl::MutexLock lock(&lock_);
617
0
    data_.clear();
618
0
  }
619
620
private:
621
  struct ReadFilter : public Network::ReadFilterBaseImpl {
622
86
    ReadFilter(FakeRawConnection& parent) : parent_(parent) {}
623
624
    // Network::ReadFilter
625
    Network::FilterStatus onData(Buffer::Instance& data, bool) override;
626
627
    FakeRawConnection& parent_;
628
  };
629
630
  std::string data_ ABSL_GUARDED_BY(lock_);
631
  std::shared_ptr<Network::ReadFilter> read_filter_;
632
};
633
634
// Uniquely-owning pointer to a FakeRawConnection.
using FakeRawConnectionPtr = std::unique_ptr<FakeRawConnection>;
635
636
struct FakeUpstreamConfig {
637
  struct UdpConfig {
638
    absl::optional<uint64_t> max_rx_datagram_size_;
639
  };
640
641
1.97k
  FakeUpstreamConfig(Event::TestTimeSystem& time_system) : time_system_(time_system) {
642
1.97k
    http2_options_ = ::Envoy::Http2::Utility::initializeAndValidateOptions(http2_options_).value();
643
    // Legacy options which are always set.
644
1.97k
    http2_options_.set_allow_connect(true);
645
1.97k
    http2_options_.set_allow_metadata(true);
646
1.97k
    http3_options_.set_allow_extended_connect(true);
647
1.97k
    http3_options_.set_allow_metadata(true);
648
1.97k
  }
649
650
  Event::TestTimeSystem& time_system_;
651
  Http::CodecType upstream_protocol_{Http::CodecType::HTTP1};
652
  bool enable_half_close_{};
653
  absl::optional<UdpConfig> udp_fake_upstream_;
654
  envoy::config::core::v3::Http2ProtocolOptions http2_options_;
655
  envoy::config::core::v3::Http3ProtocolOptions http3_options_;
656
  envoy::config::listener::v3::QuicProtocolOptions quic_options_;
657
  uint32_t max_request_headers_kb_ = Http::DEFAULT_MAX_REQUEST_HEADERS_KB;
658
  uint32_t max_request_headers_count_ = Http::DEFAULT_MAX_HEADERS_COUNT;
659
  envoy::config::core::v3::HttpProtocolOptions::HeadersWithUnderscoresAction
660
      headers_with_underscores_action_ = envoy::config::core::v3::HttpProtocolOptions::ALLOW;
661
};
662
663
/**
664
 * Provides a fake upstream server for integration testing.
665
 */
666
class FakeUpstream : Logger::Loggable<Logger::Id::testing>,
667
                     public Network::FilterChainManager,
668
                     public Network::FilterChainFactory {
669
public:
670
  // Creates a fake upstream bound to the specified unix domain socket path.
671
  FakeUpstream(Network::DownstreamTransportSocketFactoryPtr&& transport_socket_factory,
672
               const std::string& uds_path, const FakeUpstreamConfig& config);
673
674
  // Creates a fake upstream bound to the specified |address|.
675
  FakeUpstream(Network::DownstreamTransportSocketFactoryPtr&& transport_socket_factory,
676
               const Network::Address::InstanceConstSharedPtr& address,
677
               const FakeUpstreamConfig& config);
678
679
  // Creates a fake upstream bound to INADDR_ANY and the specified `port`.
680
  // Set `defer_initialization` to true if you want the FakeUpstream to not immediately listen for
681
  // incoming connections, and instead want to control when the FakeUpstream is available for
682
  // listening. If `defer_initialization` is set to true, call initializeServer() before invoking
683
  // any other functions in this class.
684
  FakeUpstream(uint32_t port, Network::Address::IpVersion version, const FakeUpstreamConfig& config,
685
               bool defer_initialization = false);
686
687
  FakeUpstream(Network::DownstreamTransportSocketFactoryPtr&& transport_socket_factory,
688
               uint32_t port, Network::Address::IpVersion version,
689
               const FakeUpstreamConfig& config);
690
  ~FakeUpstream() override;
691
692
  // Initializes the FakeUpstream's server.
693
  void initializeServer();
694
695
  // Returns true if the server has been initialized, i.e. that initializeServer() executed
696
  // successfully. Returns false otherwise.
697
0
  bool isInitialized() { return initialized_; }
698
699
0
  // Returns the HTTP codec type this upstream was configured with (http_type_).
  // Const-qualified: this only reads a const member.
  Http::CodecType httpType() const { return http_type_; }
700
701
  // Returns the new connection via the connection argument.
702
  ABSL_MUST_USE_RESULT
703
  testing::AssertionResult
704
  waitForHttpConnection(Event::Dispatcher& client_dispatcher, FakeHttpConnectionPtr& connection,
705
                        std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
706
707
  ABSL_MUST_USE_RESULT
708
  testing::AssertionResult assertPendingConnectionsEmpty();
709
710
  ABSL_MUST_USE_RESULT
711
  testing::AssertionResult
712
  waitForRawConnection(FakeRawConnectionPtr& connection,
713
                       std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
714
4.00k
  // Returns the local address reported by the socket's connection info provider.
  Network::Address::InstanceConstSharedPtr localAddress() const {
    const auto& info_provider = socket_->connectionInfoProvider();
    return info_provider.localAddress();
  }
717
718
  void convertFromRawToHttp(FakeRawConnectionPtr& raw_connection,
719
                            FakeHttpConnectionPtr& connection);
720
721
  virtual std::unique_ptr<FakeRawConnection>
722
  makeRawConnection(SharedConnectionWrapper& shared_connection,
723
86
                    Event::TestTimeSystem& time_system) {
724
86
    return std::make_unique<FakeRawConnection>(shared_connection, time_system);
725
86
  }
726
727
  // Wait for one of the upstreams to receive a connection
728
  static absl::StatusOr<int>
729
  waitForHttpConnection(Event::Dispatcher& client_dispatcher,
730
                        std::vector<std::unique_ptr<FakeUpstream>>& upstreams,
731
                        FakeHttpConnectionPtr& connection,
732
                        std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
733
734
  // Waits for 1 UDP datagram to be received.
735
  ABSL_MUST_USE_RESULT
736
  testing::AssertionResult
737
  waitForUdpDatagram(Network::UdpRecvData& data_to_fill,
738
                     std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
739
740
  // Send a UDP datagram on the fake upstream thread.
741
  void sendUdpDatagram(const std::string& buffer,
742
                       const Network::Address::InstanceConstSharedPtr& peer);
743
744
  // Network::FilterChainManager
745
  const Network::FilterChain* findFilterChain(const Network::ConnectionSocket&,
746
495
                                              const StreamInfo::StreamInfo&) const override {
747
495
    return filter_chain_.get();
748
495
  }
749
750
  // Network::FilterChainFactory
751
  bool
752
  createNetworkFilterChain(Network::Connection& connection,
753
                           const Filter::NetworkFilterFactoriesList& filter_factories) override;
754
  bool createListenerFilterChain(Network::ListenerFilterManager& listener) override;
755
  void createUdpListenerFilterChain(Network::UdpListenerFilterManager& udp_listener,
756
                                    Network::UdpReadFilterCallbacks& callbacks) override;
757
  bool createQuicListenerFilterChain(Network::QuicListenerFilterManager& listener) override;
758
759
0
  // Overwrites read_disable_on_new_connection_ with `value`.
  void setReadDisableOnNewConnection(bool value) {
    read_disable_on_new_connection_ = value;
  }
760
0
  // Overwrites disable_and_do_not_enable_ with `value`.
  void setDisableAllAndDoNotEnable(bool value) {
    disable_and_do_not_enable_ = value;
  }
761
1.26k
  // Returns the test time system this upstream holds a reference to.
  Event::TestTimeSystem& timeSystem() {
    return time_system_;
  }
762
763
  // Stops the dispatcher loop and joins the listening thread.
764
  void cleanUp();
765
766
0
  // Returns the HTTP/1 codec stats obtained through CodecStats::atomicGet() from
  // http1_codec_stats_ and this upstream's stats scope.
  Http::Http1::CodecStats& http1CodecStats() {
    Stats::Scope& scope = *stats_scope_;
    return Http::Http1::CodecStats::atomicGet(http1_codec_stats_, scope);
  }
769
770
407
  // Returns the HTTP/2 codec stats obtained through CodecStats::atomicGet() from
  // http2_codec_stats_ and this upstream's stats scope.
  Http::Http2::CodecStats& http2CodecStats() {
    Stats::Scope& scope = *stats_scope_;
    return Http::Http2::CodecStats::atomicGet(http2_codec_stats_, scope);
  }
773
774
0
  // Returns the HTTP/3 codec stats obtained through CodecStats::atomicGet() from
  // http3_codec_stats_ and this upstream's stats scope.
  Http::Http3::CodecStats& http3CodecStats() {
    Stats::Scope& scope = *stats_scope_;
    return Http::Http3::CodecStats::atomicGet(http3_codec_stats_, scope);
  }
777
778
  // Write into the outbound buffer of the network connection at the specified index.
779
  // Note that this write bypasses any processing by the upstream codec.
780
  ABSL_MUST_USE_RESULT
781
  testing::AssertionResult
782
  rawWriteConnection(uint32_t index, const std::string& data, bool end_stream = false,
783
                     std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
784
785
407
  // Returns the HTTP/2 protocol options held by this upstream.
  // Const-qualified: this only exposes a const member by const reference.
  const envoy::config::core::v3::Http2ProtocolOptions& http2Options() const {
    return http2_options_;
  }
786
0
  // Returns the HTTP/3 protocol options held by this upstream.
  // Const-qualified: this only exposes a const member by const reference.
  const envoy::config::core::v3::Http3ProtocolOptions& http3Options() const {
    return http3_options_;
  }
787
788
0
  // Exposes the owning dispatcher pointer by mutable reference.
  Event::DispatcherPtr& dispatcher() {
    return dispatcher_;
  }
789
0
  // Exposes lock_, the mutex guarding state shared between the upstream thread and the main
  // test thread.
  absl::Mutex& lock() {
    return lock_;
  }
790
791
  void runOnDispatcherThread(std::function<void()> cb);
792
793
protected:
794
786
  // Read-only access to the configuration copy stored in config_.
  const FakeUpstreamConfig& config() const {
    return config_;
  }
795
796
  Stats::IsolatedStoreImpl stats_store_;
797
  const Http::CodecType http_type_;
798
799
private:
800
  FakeUpstream(Network::DownstreamTransportSocketFactoryPtr&& transport_socket_factory,
801
               Network::SocketPtr&& connection, const FakeUpstreamConfig& config,
802
               bool defer_initialization = false);
803
804
  class FakeListenSocketFactory : public Network::ListenSocketFactory {
805
  public:
806
1.99k
    FakeListenSocketFactory(Network::SocketSharedPtr socket) : socket_(socket) {}
807
808
    // Network::ListenSocketFactory
809
1.99k
    Network::Socket::Type socketType() const override { return socket_->socketType(); }
810
1.99k
    const Network::Address::InstanceConstSharedPtr& localAddress() const override {
811
1.99k
      return socket_->connectionInfoProvider().localAddress();
812
1.99k
    }
813
1.99k
    Network::SocketSharedPtr getListenSocket(uint32_t) override { return socket_; }
814
0
    Network::ListenSocketFactoryPtr clone() const override { return nullptr; }
815
0
    void closeAllSockets() override {}
816
    absl::Status doFinalPreWorkerInit() override;
817
818
  private:
819
    Network::SocketSharedPtr socket_;
820
  };
821
822
  class FakeUdpFilter : public Network::UdpListenerReadFilter {
823
  public:
824
    FakeUdpFilter(FakeUpstream& parent, Network::UdpReadFilterCallbacks& callbacks)
825
0
        : UdpListenerReadFilter(callbacks), parent_(parent) {}
826
827
    // Network::UdpListenerReadFilter
828
0
    Network::FilterStatus onData(Network::UdpRecvData& data) override {
829
0
      return parent_.onRecvDatagram(data);
830
0
    }
831
0
    Network::FilterStatus onReceiveError(Api::IoError::IoErrorCode) override {
832
0
      PANIC("not implemented");
833
0
    }
834
835
  private:
836
    FakeUpstream& parent_;
837
  };
838
839
  class FakeListener : public Network::ListenerConfig {
840
  public:
841
    struct UdpListenerConfigImpl : public Network::UdpListenerConfig {
842
      UdpListenerConfigImpl()
843
          : writer_factory_(std::make_unique<Network::UdpDefaultWriterFactory>()),
844
1.99k
            listener_worker_router_(1) {}
845
846
      // Network::UdpListenerConfig
847
0
      Network::ActiveUdpListenerFactory& listenerFactory() override { return *listener_factory_; }
848
0
      Network::UdpPacketWriterFactory& packetWriterFactory() override { return *writer_factory_; }
849
      Network::UdpListenerWorkerRouter&
850
0
      listenerWorkerRouter(const Network::Address::Instance&) override {
851
0
        return listener_worker_router_;
852
0
      }
853
0
      const envoy::config::listener::v3::UdpListenerConfig& config() override { return config_; }
854
855
      envoy::config::listener::v3::UdpListenerConfig config_;
856
      std::unique_ptr<Network::ActiveUdpListenerFactory> listener_factory_;
857
      std::unique_ptr<Network::UdpPacketWriterFactory> writer_factory_;
858
      Network::UdpListenerWorkerRouterImpl listener_worker_router_;
859
    };
860
861
    FakeListener(FakeUpstream& parent, bool is_quic = false)
862
        : parent_(parent), name_("fake_upstream"), init_manager_(nullptr),
863
1.99k
          listener_info_(std::make_shared<testing::NiceMock<Network::MockListenerInfo>>()) {
864
1.99k
      if (is_quic) {
865
0
#if defined(ENVOY_ENABLE_QUIC)
866
0
        if (context_ == nullptr) {
867
          // Only initialize this when needed to avoid slowing down non-QUIC integration tests.
868
0
          context_ = std::make_unique<
869
0
              testing::NiceMock<Server::Configuration::MockServerFactoryContext>>();
870
0
        }
871
0
        udp_listener_config_.listener_factory_ = std::make_unique<Quic::ActiveQuicListenerFactory>(
872
0
            parent_.quic_options_, 1, parent_.quic_stat_names_, parent_.validation_visitor_,
873
0
            *context_);
874
        // Initialize QUICHE flags.
875
0
        quiche::FlagRegistry::getInstance();
876
#else
877
        ASSERT(false, "Running a test that requires QUIC without compiling QUIC");
878
#endif
879
1.99k
      } else {
880
1.99k
        udp_listener_config_.listener_factory_ =
881
1.99k
            std::make_unique<Server::ActiveRawUdpListenerFactory>(1);
882
1.99k
      }
883
1.99k
    }
884
885
    UdpListenerConfigImpl udp_listener_config_;
886
887
  private:
888
    // Network::ListenerConfig
889
495
    Network::FilterChainManager& filterChainManager() override { return parent_; }
890
990
    Network::FilterChainFactory& filterChainFactory() override { return parent_; }
891
3.98k
    std::vector<Network::ListenSocketFactoryPtr>& listenSocketFactories() override {
892
3.98k
      return parent_.socket_factories_;
893
3.98k
    }
894
1.99k
    bool bindToPort() const override { return true; }
895
495
    bool handOffRestoredDestinationConnections() const override { return false; }
896
495
    uint32_t perConnectionBufferLimitBytes() const override { return 0; }
897
1.99k
    std::chrono::milliseconds listenerFiltersTimeout() const override { return {}; }
898
1.99k
    bool continueOnListenerFiltersTimeout() const override { return false; }
899
31.8k
    Stats::Scope& listenerScope() override { return *parent_.stats_store_.rootScope(); }
900
5.97k
    uint64_t listenerTag() const override { return 0; }
901
0
    const std::string& name() const override { return name_; }
902
0
    Network::UdpListenerConfigOptRef udpListenerConfig() override { return udp_listener_config_; }
903
1.99k
    Network::InternalListenerConfigOptRef internalListenerConfig() override { return {}; }
904
1.99k
    Network::ConnectionBalancer& connectionBalancer(const Network::Address::Instance&) override {
905
1.99k
      return connection_balancer_;
906
1.99k
    }
907
5.97k
    bool shouldBypassOverloadManager() const override { return false; }
908
495
    const std::vector<AccessLog::InstanceSharedPtr>& accessLogs() const override {
909
495
      return empty_access_logs_;
910
495
    }
911
495
    const Network::ListenerInfoConstSharedPtr& listenerInfo() const override {
912
495
      return listener_info_;
913
495
    }
914
1.48k
    ResourceLimit& openConnections() override { return connection_resource_; }
915
0
    uint32_t tcpBacklogSize() const override { return ENVOY_TCP_BACKLOG_SIZE; }
916
1.99k
    uint32_t maxConnectionsToAcceptPerSocketEvent() const override {
917
1.99k
      return Network::DefaultMaxConnectionsToAcceptPerSocketEvent;
918
1.99k
    }
919
0
    Init::Manager& initManager() override { return *init_manager_; }
920
1.99k
    bool ignoreGlobalConnLimit() const override { return false; }
921
922
0
    void setMaxConnections(const uint32_t num_connections) {
923
0
      connection_resource_.setMax(num_connections);
924
0
    }
925
0
    void clearMaxConnections() { connection_resource_.resetMax(); }
926
927
    FakeUpstream& parent_;
928
    const std::string name_;
929
    Network::NopConnectionBalancerImpl connection_balancer_;
930
    BasicResourceLimitImpl connection_resource_;
931
    const std::vector<AccessLog::InstanceSharedPtr> empty_access_logs_;
932
    std::unique_ptr<Init::Manager> init_manager_;
933
    const Network::ListenerInfoConstSharedPtr listener_info_;
934
    std::unique_ptr<Server::Configuration::MockServerFactoryContext> context_;
935
  };
936
937
  void threadRoutine();
938
  SharedConnectionWrapper& consumeConnection() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_);
939
  Network::FilterStatus onRecvDatagram(Network::UdpRecvData& data);
940
  AssertionResult
941
  runOnDispatcherThreadAndWait(std::function<AssertionResult()> cb,
942
                               std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
943
944
  const envoy::config::core::v3::Http2ProtocolOptions http2_options_;
945
  const envoy::config::core::v3::Http3ProtocolOptions http3_options_;
946
  envoy::config::listener::v3::QuicProtocolOptions quic_options_;
947
  Network::SocketSharedPtr socket_;
948
  std::vector<Network::ListenSocketFactoryPtr> socket_factories_;
949
  ConditionalInitializer server_initialized_;
950
  // Guards any objects which can be altered both in the upstream thread and the
951
  // main test thread.
952
  absl::Mutex lock_;
953
  Thread::ThreadPtr thread_;
954
  Api::ApiPtr api_;
955
  Event::TestTimeSystem& time_system_;
956
  Event::DispatcherPtr dispatcher_;
957
  Network::ConnectionHandlerPtr handler_;
958
  std::list<SharedConnectionWrapperPtr> new_connections_ ABSL_GUARDED_BY(lock_);
959
  testing::NiceMock<Runtime::MockLoader> runtime_;
960
  testing::NiceMock<Random::MockRandomGenerator> random_;
961
962
  // When a QueuedConnectionWrapper is popped from new_connections_, ownership is transferred to
963
  // consumed_connections_. This lets the Connection be destroyed later (when the FakeUpstream is
964
  // deleted) on the same thread that allocated the connection.
965
  std::list<SharedConnectionWrapperPtr> consumed_connections_ ABSL_GUARDED_BY(lock_);
966
  std::list<FakeHttpConnectionPtr> quic_connections_ ABSL_GUARDED_BY(lock_);
967
  const FakeUpstreamConfig config_;
968
  // Normally connections are read disabled until a fake raw or http connection
969
  // is created, and are then read enabled. Setting these true skips both these.
970
  bool read_disable_on_new_connection_;
971
  // Setting this true disables all events and does not re-enable as the above does.
972
  bool disable_and_do_not_enable_{};
973
  const bool enable_half_close_;
974
  testing::NiceMock<ProtobufMessage::MockValidationVisitor> validation_visitor_;
975
  FakeListener listener_;
976
  const Network::FilterChainSharedPtr filter_chain_;
977
  std::list<Network::UdpRecvData> received_datagrams_ ABSL_GUARDED_BY(lock_);
978
  Stats::ScopeSharedPtr stats_scope_;
979
  Http::Http1::CodecStats::AtomicPtr http1_codec_stats_;
980
  Http::Http2::CodecStats::AtomicPtr http2_codec_stats_;
981
  Http::Http3::CodecStats::AtomicPtr http3_codec_stats_;
982
#ifdef ENVOY_ENABLE_QUIC
983
  Quic::QuicStatNames quic_stat_names_ = Quic::QuicStatNames(stats_store_.symbolTable());
984
#endif
985
  bool initialized_ = false;
986
};
987
988
// Uniquely-owning pointer to a FakeUpstream.
using FakeUpstreamPtr = std::unique_ptr<FakeUpstream>;
989
990
} // namespace Envoy