Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/test/integration/fake_upstream.h
Line
Count
Source (jump to first uncovered line)
1
#pragma once
2
3
#include <cstdint>
4
#include <list>
5
#include <memory>
6
#include <string>
7
8
#include "envoy/api/api.h"
9
#include "envoy/config/core/v3/base.pb.h"
10
#include "envoy/config/listener/v3/quic_config.pb.h"
11
#include "envoy/grpc/status.h"
12
#include "envoy/http/codec.h"
13
#include "envoy/network/connection.h"
14
#include "envoy/network/connection_handler.h"
15
#include "envoy/network/filter.h"
16
#include "envoy/network/listener.h"
17
#include "envoy/stats/scope.h"
18
19
#include "source/common/buffer/buffer_impl.h"
20
#include "source/common/buffer/zero_copy_input_stream_impl.h"
21
#include "source/common/common/basic_resource_impl.h"
22
#include "source/common/common/callback_impl.h"
23
#include "source/common/common/linked_object.h"
24
#include "source/common/common/lock_guard.h"
25
#include "source/common/common/thread.h"
26
#include "source/common/config/utility.h"
27
#include "source/common/grpc/codec.h"
28
#include "source/common/grpc/common.h"
29
#include "source/common/http/http1/codec_impl.h"
30
#include "source/common/http/http2/codec_impl.h"
31
#include "source/common/http/http3/codec_stats.h"
32
#include "source/common/network/connection_balancer_impl.h"
33
#include "source/common/network/filter_impl.h"
34
#include "source/common/network/listen_socket_impl.h"
35
#include "source/common/network/udp_listener_impl.h"
36
#include "source/common/network/udp_packet_writer_handler_impl.h"
37
#include "source/common/stats/isolated_store_impl.h"
38
39
#include "test/mocks/http/header_validator.h"
40
#include "test/mocks/protobuf/mocks.h"
41
#include "test/mocks/server/instance.h"
42
43
#if defined(ENVOY_ENABLE_QUIC)
44
#include "source/common/quic/active_quic_listener.h"
45
#include "source/common/quic/quic_stat_names.h"
46
#endif
47
48
#include "source/extensions/listener_managers/listener_manager/active_raw_udp_listener_config.h"
49
50
#include "test/mocks/common.h"
51
#include "test/mocks/runtime/mocks.h"
52
#include "test/mocks/server/overload_manager.h"
53
#include "test/test_common/test_time_system.h"
54
#include "test/test_common/utility.h"
55
56
// TODO(mattklein123): A lot of code should be moved from this header file into the cc file.
57
58
namespace Envoy {
59
60
class FakeHttpConnection;
61
class FakeUpstream;
62
63
/**
64
 * Provides a fake HTTP stream for integration testing.
65
 */
66
class FakeStream : public Http::RequestDecoder,
67
                   public Http::StreamCallbacks,
68
                   Logger::Loggable<Logger::Id::testing> {
69
public:
70
  FakeStream(FakeHttpConnection& parent, Http::ResponseEncoder& encoder,
71
             Event::TestTimeSystem& time_system);
72
73
0
  uint64_t bodyLength() {
74
0
    absl::MutexLock lock(&lock_);
75
0
    return body_.length();
76
0
  }
77
0
  Buffer::Instance& body() {
78
0
    absl::MutexLock lock(&lock_);
79
0
    return body_;
80
0
  }
81
0
  bool complete() {
82
0
    absl::MutexLock lock(&lock_);
83
0
    return end_stream_;
84
0
  }
85
86
  // Execute a callback using the dispatcher associated with the FakeStream's connection. This
87
  // allows execution of non-interrupted sequences of operations on the fake stream which may run
88
  // into trouble if client-side events are interleaved.
89
  void postToConnectionThread(std::function<void()> cb);
90
  void encode1xxHeaders(const Http::ResponseHeaderMap& headers);
91
  void encodeHeaders(const Http::HeaderMap& headers, bool end_stream);
92
  void encodeData(uint64_t size, bool end_stream);
93
  void encodeData(Buffer::Instance& data, bool end_stream);
94
  void encodeData(std::string data, bool end_stream);
95
  void encodeTrailers(const Http::HeaderMap& trailers);
96
  void encodeResetStream();
97
  void encodeMetadata(const Http::MetadataMapVector& metadata_map_vector);
98
  void readDisable(bool disable);
99
0
  const Http::RequestHeaderMap& headers() {
100
0
    absl::MutexLock lock(&lock_);
101
0
    return *headers_;
102
0
  }
103
0
  void setAddServedByHeader(bool add_header) { add_served_by_header_ = add_header; }
104
0
  const Http::RequestTrailerMapPtr& trailers() { return trailers_; }
105
0
  bool receivedData() { return received_data_; }
106
0
  Http::Http1StreamEncoderOptionsOptRef http1StreamEncoderOptions() {
107
0
    return encoder_.http1StreamEncoderOptions();
108
0
  }
109
  void
110
  sendLocalReply(Http::Code code, absl::string_view body,
111
                 const std::function<void(Http::ResponseHeaderMap& headers)>& /*modify_headers*/,
112
                 const absl::optional<Grpc::Status::GrpcStatus> grpc_status,
113
0
                 absl::string_view /*details*/) override {
114
0
    bool is_head_request;
115
0
    {
116
0
      absl::MutexLock lock(&lock_);
117
0
      is_head_request = headers_ != nullptr &&
118
0
                        headers_->getMethodValue() == Http::Headers::get().MethodValues.Head;
119
0
    }
120
0
    Http::Utility::sendLocalReply(
121
0
        false,
122
0
        Http::Utility::EncodeFunctions(
123
0
            {nullptr, nullptr,
124
0
             [&](Http::ResponseHeaderMapPtr&& headers, bool end_stream) -> void {
125
0
               encoder_.encodeHeaders(*headers, end_stream);
126
0
             },
127
0
             [&](Buffer::Instance& data, bool end_stream) -> void {
128
0
               encoder_.encodeData(data, end_stream);
129
0
             }}),
130
0
        Http::Utility::LocalReplyData({false, code, body, grpc_status, is_head_request}));
131
0
  }
132
133
  ABSL_MUST_USE_RESULT
134
  testing::AssertionResult
135
  waitForHeadersComplete(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
136
137
  ABSL_MUST_USE_RESULT
138
  testing::AssertionResult
139
  waitForData(Event::Dispatcher& client_dispatcher, uint64_t body_length,
140
              std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
141
142
  ABSL_MUST_USE_RESULT
143
  testing::AssertionResult
144
  waitForData(Event::Dispatcher& client_dispatcher, absl::string_view body,
145
              std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
146
147
  using ValidatorFunction = const std::function<bool(const std::string&)>;
148
  ABSL_MUST_USE_RESULT
149
  testing::AssertionResult
150
  waitForData(Event::Dispatcher& client_dispatcher, const ValidatorFunction& data_validator,
151
              std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
152
153
  ABSL_MUST_USE_RESULT
154
  testing::AssertionResult
155
  waitForEndStream(Event::Dispatcher& client_dispatcher,
156
                   std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
157
158
  ABSL_MUST_USE_RESULT
159
  testing::AssertionResult
160
  waitForReset(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
161
162
  // gRPC convenience methods.
163
  void startGrpcStream(bool send_headers = true);
164
  void finishGrpcStream(Grpc::Status::GrpcStatus status);
165
172
  template <class T> void sendGrpcMessage(const T& message) {
166
172
    ASSERT(grpc_stream_started_,
167
172
           "start gRPC stream by calling startGrpcStream before sending a message");
168
172
    auto serialized_response = Grpc::Common::serializeToGrpcFrame(message);
169
172
    encodeData(*serialized_response, false);
170
172
    ENVOY_LOG(debug, "Sent gRPC message: {}", message.DebugString());
171
172
  }
void Envoy::FakeStream::sendGrpcMessage<envoy::service::discovery::v3::DeltaDiscoveryResponse>(envoy::service::discovery::v3::DeltaDiscoveryResponse const&)
Line
Count
Source
165
38
  template <class T> void sendGrpcMessage(const T& message) {
166
38
    ASSERT(grpc_stream_started_,
167
38
           "start gRPC stream by calling startGrpcStream before sending a message");
168
38
    auto serialized_response = Grpc::Common::serializeToGrpcFrame(message);
169
38
    encodeData(*serialized_response, false);
170
38
    ENVOY_LOG(debug, "Sent gRPC message: {}", message.DebugString());
171
38
  }
void Envoy::FakeStream::sendGrpcMessage<envoy::service::discovery::v3::DiscoveryResponse>(envoy::service::discovery::v3::DiscoveryResponse const&)
Line
Count
Source
165
134
  template <class T> void sendGrpcMessage(const T& message) {
166
134
    ASSERT(grpc_stream_started_,
167
134
           "start gRPC stream by calling startGrpcStream before sending a message");
168
134
    auto serialized_response = Grpc::Common::serializeToGrpcFrame(message);
169
134
    encodeData(*serialized_response, false);
170
134
    ENVOY_LOG(debug, "Sent gRPC message: {}", message.DebugString());
171
134
  }
172
278
  template <class T> void decodeGrpcFrame(T& message) {
173
278
    EXPECT_GE(decoded_grpc_frames_.size(), 1);
174
278
    if (decoded_grpc_frames_[0].length_ == 0) {
175
0
      decoded_grpc_frames_.erase(decoded_grpc_frames_.begin());
176
0
      return;
177
0
    }
178
278
    Buffer::ZeroCopyInputStreamImpl stream(std::move(decoded_grpc_frames_[0].data_));
179
278
    EXPECT_TRUE(decoded_grpc_frames_[0].flags_ == Grpc::GRPC_FH_DEFAULT);
180
278
    EXPECT_TRUE(message.ParseFromZeroCopyStream(&stream));
181
278
    ENVOY_LOG(debug, "Received gRPC message: {}", message.DebugString());
182
278
    decoded_grpc_frames_.erase(decoded_grpc_frames_.begin());
183
278
  }
void Envoy::FakeStream::decodeGrpcFrame<envoy::service::discovery::v3::DiscoveryRequest>(envoy::service::discovery::v3::DiscoveryRequest&)
Line
Count
Source
172
216
  template <class T> void decodeGrpcFrame(T& message) {
173
216
    EXPECT_GE(decoded_grpc_frames_.size(), 1);
174
216
    if (decoded_grpc_frames_[0].length_ == 0) {
175
0
      decoded_grpc_frames_.erase(decoded_grpc_frames_.begin());
176
0
      return;
177
0
    }
178
216
    Buffer::ZeroCopyInputStreamImpl stream(std::move(decoded_grpc_frames_[0].data_));
179
216
    EXPECT_TRUE(decoded_grpc_frames_[0].flags_ == Grpc::GRPC_FH_DEFAULT);
180
216
    EXPECT_TRUE(message.ParseFromZeroCopyStream(&stream));
181
216
    ENVOY_LOG(debug, "Received gRPC message: {}", message.DebugString());
182
216
    decoded_grpc_frames_.erase(decoded_grpc_frames_.begin());
183
216
  }
void Envoy::FakeStream::decodeGrpcFrame<envoy::service::discovery::v3::DeltaDiscoveryRequest>(envoy::service::discovery::v3::DeltaDiscoveryRequest&)
Line
Count
Source
172
62
  template <class T> void decodeGrpcFrame(T& message) {
173
62
    EXPECT_GE(decoded_grpc_frames_.size(), 1);
174
62
    if (decoded_grpc_frames_[0].length_ == 0) {
175
0
      decoded_grpc_frames_.erase(decoded_grpc_frames_.begin());
176
0
      return;
177
0
    }
178
62
    Buffer::ZeroCopyInputStreamImpl stream(std::move(decoded_grpc_frames_[0].data_));
179
62
    EXPECT_TRUE(decoded_grpc_frames_[0].flags_ == Grpc::GRPC_FH_DEFAULT);
180
62
    EXPECT_TRUE(message.ParseFromZeroCopyStream(&stream));
181
62
    ENVOY_LOG(debug, "Received gRPC message: {}", message.DebugString());
182
62
    decoded_grpc_frames_.erase(decoded_grpc_frames_.begin());
183
62
  }
184
  template <class T>
185
  ABSL_MUST_USE_RESULT testing::AssertionResult
186
  waitForGrpcMessage(Event::Dispatcher& client_dispatcher, T& message,
187
278
                     std::chrono::milliseconds timeout = TestUtility::DefaultTimeout) {
188
278
    Event::TestTimeSystem::RealTimeBound bound(timeout);
189
278
    ENVOY_LOG(debug, "Waiting for gRPC message...");
190
278
    if (!decoded_grpc_frames_.empty()) {
191
111
      decodeGrpcFrame(message);
192
111
      return AssertionSuccess();
193
111
    }
194
167
    if (!waitForData(client_dispatcher, 5, timeout)) {
195
0
      return testing::AssertionFailure() << "Timed out waiting for start of gRPC message.";
196
0
    }
197
167
    int last_body_size = 0;
198
167
    {
199
167
      absl::MutexLock lock(&lock_);
200
167
      last_body_size = body_.length();
201
167
      if (!grpc_decoder_.decode(body_, decoded_grpc_frames_)) {
202
0
        return testing::AssertionFailure()
203
0
               << "Couldn't decode gRPC data frame: " << body_.toString();
204
0
      }
205
167
    }
206
167
    if (decoded_grpc_frames_.empty()) {
207
0
      if (!waitForData(client_dispatcher, grpc_decoder_.length() - last_body_size,
208
0
                       bound.timeLeft())) {
209
0
        return testing::AssertionFailure() << "Timed out waiting for end of gRPC message.";
210
0
      }
211
0
      {
212
0
        absl::MutexLock lock(&lock_);
213
0
        if (!grpc_decoder_.decode(body_, decoded_grpc_frames_)) {
214
0
          return testing::AssertionFailure()
215
0
                 << "Couldn't decode gRPC data frame: " << body_.toString();
216
0
        }
217
0
      }
218
0
    }
219
167
    decodeGrpcFrame(message);
220
167
    return AssertionSuccess();
221
167
  }
testing::AssertionResult Envoy::FakeStream::waitForGrpcMessage<envoy::service::discovery::v3::DiscoveryRequest>(Envoy::Event::Dispatcher&, envoy::service::discovery::v3::DiscoveryRequest&, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000l> >)
Line
Count
Source
187
216
                     std::chrono::milliseconds timeout = TestUtility::DefaultTimeout) {
188
216
    Event::TestTimeSystem::RealTimeBound bound(timeout);
189
216
    ENVOY_LOG(debug, "Waiting for gRPC message...");
190
216
    if (!decoded_grpc_frames_.empty()) {
191
78
      decodeGrpcFrame(message);
192
78
      return AssertionSuccess();
193
78
    }
194
138
    if (!waitForData(client_dispatcher, 5, timeout)) {
195
0
      return testing::AssertionFailure() << "Timed out waiting for start of gRPC message.";
196
0
    }
197
138
    int last_body_size = 0;
198
138
    {
199
138
      absl::MutexLock lock(&lock_);
200
138
      last_body_size = body_.length();
201
138
      if (!grpc_decoder_.decode(body_, decoded_grpc_frames_)) {
202
0
        return testing::AssertionFailure()
203
0
               << "Couldn't decode gRPC data frame: " << body_.toString();
204
0
      }
205
138
    }
206
138
    if (decoded_grpc_frames_.empty()) {
207
0
      if (!waitForData(client_dispatcher, grpc_decoder_.length() - last_body_size,
208
0
                       bound.timeLeft())) {
209
0
        return testing::AssertionFailure() << "Timed out waiting for end of gRPC message.";
210
0
      }
211
0
      {
212
0
        absl::MutexLock lock(&lock_);
213
0
        if (!grpc_decoder_.decode(body_, decoded_grpc_frames_)) {
214
0
          return testing::AssertionFailure()
215
0
                 << "Couldn't decode gRPC data frame: " << body_.toString();
216
0
        }
217
0
      }
218
0
    }
219
138
    decodeGrpcFrame(message);
220
138
    return AssertionSuccess();
221
138
  }
testing::AssertionResult Envoy::FakeStream::waitForGrpcMessage<envoy::service::discovery::v3::DeltaDiscoveryRequest>(Envoy::Event::Dispatcher&, envoy::service::discovery::v3::DeltaDiscoveryRequest&, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000l> >)
Line
Count
Source
187
62
                     std::chrono::milliseconds timeout = TestUtility::DefaultTimeout) {
188
62
    Event::TestTimeSystem::RealTimeBound bound(timeout);
189
62
    ENVOY_LOG(debug, "Waiting for gRPC message...");
190
62
    if (!decoded_grpc_frames_.empty()) {
191
33
      decodeGrpcFrame(message);
192
33
      return AssertionSuccess();
193
33
    }
194
29
    if (!waitForData(client_dispatcher, 5, timeout)) {
195
0
      return testing::AssertionFailure() << "Timed out waiting for start of gRPC message.";
196
0
    }
197
29
    int last_body_size = 0;
198
29
    {
199
29
      absl::MutexLock lock(&lock_);
200
29
      last_body_size = body_.length();
201
29
      if (!grpc_decoder_.decode(body_, decoded_grpc_frames_)) {
202
0
        return testing::AssertionFailure()
203
0
               << "Couldn't decode gRPC data frame: " << body_.toString();
204
0
      }
205
29
    }
206
29
    if (decoded_grpc_frames_.empty()) {
207
0
      if (!waitForData(client_dispatcher, grpc_decoder_.length() - last_body_size,
208
0
                       bound.timeLeft())) {
209
0
        return testing::AssertionFailure() << "Timed out waiting for end of gRPC message.";
210
0
      }
211
0
      {
212
0
        absl::MutexLock lock(&lock_);
213
0
        if (!grpc_decoder_.decode(body_, decoded_grpc_frames_)) {
214
0
          return testing::AssertionFailure()
215
0
                 << "Couldn't decode gRPC data frame: " << body_.toString();
216
0
        }
217
0
      }
218
0
    }
219
29
    decodeGrpcFrame(message);
220
29
    return AssertionSuccess();
221
29
  }
222
223
  // Http::StreamDecoder
224
  void decodeData(Buffer::Instance& data, bool end_stream) override;
225
  void decodeMetadata(Http::MetadataMapPtr&& metadata_map_ptr) override;
226
227
  // Http::RequestDecoder
228
  void decodeHeaders(Http::RequestHeaderMapSharedPtr&& headers, bool end_stream) override;
229
  void decodeTrailers(Http::RequestTrailerMapPtr&& trailers) override;
230
0
  StreamInfo::StreamInfo& streamInfo() override {
231
0
    RELEASE_ASSERT(false, "initialize if this is needed");
232
0
    return *stream_info_;
233
0
  }
234
0
  std::list<AccessLog::InstanceSharedPtr> accessLogHandlers() override {
235
0
    return access_log_handlers_;
236
0
  }
237
238
  // Http::StreamCallbacks
239
  void onResetStream(Http::StreamResetReason reason,
240
                     absl::string_view transport_failure_reason) override;
241
0
  void onAboveWriteBufferHighWatermark() override {}
242
0
  void onBelowWriteBufferLowWatermark() override {}
243
244
2.24k
  virtual void setEndStream(bool end) ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { end_stream_ = end; }
245
246
0
  Event::TestTimeSystem& timeSystem() { return time_system_; }
247
248
0
  Http::MetadataMap& metadataMap() { return metadata_map_; }
249
0
  absl::node_hash_map<std::string, uint64_t>& duplicatedMetadataKeyCount() {
250
0
    return duplicated_metadata_key_count_;
251
0
  }
252
253
protected:
254
  absl::Mutex lock_;
255
  Http::RequestHeaderMapSharedPtr headers_ ABSL_GUARDED_BY(lock_);
256
  Buffer::OwnedImpl body_ ABSL_GUARDED_BY(lock_);
257
  FakeHttpConnection& parent_;
258
259
private:
260
  Http::ResponseEncoder& encoder_;
261
  Http::RequestTrailerMapPtr trailers_ ABSL_GUARDED_BY(lock_);
262
  bool end_stream_ ABSL_GUARDED_BY(lock_){};
263
  bool saw_reset_ ABSL_GUARDED_BY(lock_){};
264
  Grpc::Decoder grpc_decoder_;
265
  std::vector<Grpc::Frame> decoded_grpc_frames_;
266
  bool add_served_by_header_{};
267
  Event::TestTimeSystem& time_system_;
268
  Http::MetadataMap metadata_map_;
269
  absl::node_hash_map<std::string, uint64_t> duplicated_metadata_key_count_;
270
  std::shared_ptr<StreamInfo::StreamInfo> stream_info_;
271
  std::list<AccessLog::InstanceSharedPtr> access_log_handlers_;
272
  bool received_data_{false};
273
  bool grpc_stream_started_{false};
274
  Http::ServerHeaderValidatorPtr header_validator_;
275
};
276
277
using FakeStreamPtr = std::unique_ptr<FakeStream>;
278
279
// Encapsulates various state and functionality related to sharing a Connection object across
280
// threads. With FakeUpstream fabricated objects, we have a Connection that is associated with a
281
// dispatcher on a thread managed by FakeUpstream. We want to be able to safely invoke methods on
282
// this object from other threads (e.g. the main test thread) and be able to track connection state
283
// (e.g. are we disconnected and the Connection is now possibly deleted). We manage this via a
284
// SharedConnectionWrapper that lives from when the Connection is added to the accepted connection
285
// queue and then through the lifetime of the Fake{Raw,Http}Connection that manages the Connection
286
// through active use.
287
class SharedConnectionWrapper : public Network::ConnectionCallbacks,
288
                                public LinkedObject<SharedConnectionWrapper> {
289
public:
290
  using DisconnectCallback = std::function<void()>;
291
292
  SharedConnectionWrapper(Network::Connection& connection)
293
721
      : connection_(connection), dispatcher_(connection_.dispatcher()) {
294
721
    connection_.addConnectionCallbacks(*this);
295
721
  }
296
297
  // Network::ConnectionCallbacks
298
721
  void onEvent(Network::ConnectionEvent event) override {
299
    // Throughout this entire function, we know that the connection_ cannot disappear, since this
300
    // callback is invoked prior to connection_ deferred delete. We also know by locking below,
301
    // that elsewhere, where we also hold lock_, the connection cannot disappear inside the
302
    // locked scope.
303
721
    absl::MutexLock lock(&lock_);
304
721
    if (event == Network::ConnectionEvent::RemoteClose ||
305
721
        event == Network::ConnectionEvent::LocalClose) {
306
721
      if (connection_.detectedCloseType() == Network::DetectedCloseType::RemoteReset ||
307
721
          connection_.detectedCloseType() == Network::DetectedCloseType::LocalReset) {
308
9
        rst_disconnected_ = true;
309
9
      }
310
721
      disconnected_ = true;
311
721
    }
312
721
  }
313
314
0
  void onAboveWriteBufferHighWatermark() override {}
315
0
  void onBelowWriteBufferLowWatermark() override {}
316
317
941
  Event::Dispatcher& dispatcher() { return dispatcher_; }
318
319
8.69k
  bool connected() {
320
8.69k
    absl::MutexLock lock(&lock_);
321
8.69k
    return connectedLockHeld();
322
8.69k
  }
323
324
8.81k
  bool connectedLockHeld() {
325
8.81k
    lock_.AssertReaderHeld(); // TODO(mattklein123): This can't be annotated because the lock
326
                              // is acquired via the base connection reference. Fix this to
327
                              // remove the reference.
328
8.81k
    return !disconnected_;
329
8.81k
  }
330
331
0
  bool rstDisconnected() {
332
0
    lock_.AssertReaderHeld();
333
0
    return rst_disconnected_;
334
0
  }
335
336
  // This provides direct access to the underlying connection, but only to const methods.
337
  // Stateful connection related methods should happen on the connection's dispatcher via
338
  // executeOnDispatcher.
339
  // This avoids thread safety violations when crossing between the test and FakeUpstream threads.
340
1.53k
  Network::Connection& connection() const { return connection_; }
341
342
  // Execute some function on the connection's dispatcher. This involves a cross-thread post and
343
  // wait-for-completion. If the connection is disconnected, either prior to post or when the
344
  // dispatcher schedules the callback, we silently ignore.
345
  ABSL_MUST_USE_RESULT
346
  testing::AssertionResult
347
  executeOnDispatcher(std::function<void(Network::Connection&)> f,
348
                      std::chrono::milliseconds timeout = TestUtility::DefaultTimeout,
349
2.37k
                      bool allow_disconnects = true) {
350
2.37k
    absl::MutexLock lock(&lock_);
351
2.37k
    if (disconnected_) {
352
114
      return testing::AssertionSuccess();
353
114
    }
354
    // Sanity check: detect if the post and wait is attempted from the dispatcher thread; fail
355
    // immediately instead of deadlocking.
356
2.25k
    ASSERT(!connection_.dispatcher().isThreadSafe(),
357
2.25k
           "deadlock: executeOnDispatcher called from dispatcher thread.");
358
2.25k
    bool callback_ready_event = false;
359
2.25k
    bool unexpected_disconnect = false;
360
2.25k
    connection_.dispatcher().post(
361
2.25k
        [this, f, &lock = lock_, &callback_ready_event, &unexpected_disconnect]() -> void {
362
          // The use of connected() here, vs. !disconnected_, is because we want to use the lock_
363
          // acquisition to briefly serialize. This avoids us entering this completion and issuing
364
          // a notifyOne() until the wait() is ready to receive it below.
365
2.25k
          if (connected()) {
366
2.24k
            f(connection_);
367
2.24k
          } else {
368
11
            unexpected_disconnect = true;
369
11
          }
370
2.25k
          absl::MutexLock lock_guard(&lock);
371
2.25k
          callback_ready_event = true;
372
2.25k
        });
373
2.25k
    Event::TestTimeSystem& time_system =
374
2.25k
        dynamic_cast<Event::TestTimeSystem&>(connection_.dispatcher().timeSource());
375
2.25k
    if (!time_system.waitFor(lock_, absl::Condition(&callback_ready_event), timeout)) {
376
0
      return testing::AssertionFailure() << "Timed out while executing on dispatcher.";
377
0
    }
378
2.25k
    if (unexpected_disconnect && !allow_disconnects) {
379
0
      ENVOY_LOG_MISC(warn, "executeOnDispatcher failed due to disconnect\n");
380
0
    }
381
2.25k
    return testing::AssertionSuccess();
382
2.25k
  }
383
384
703
  absl::Mutex& lock() { return lock_; }
385
386
126
  void setParented() {
387
126
    absl::MutexLock lock(&lock_);
388
126
    ASSERT(!parented_);
389
126
    parented_ = true;
390
126
  }
391
392
private:
393
  Network::Connection& connection_;
394
  Event::Dispatcher& dispatcher_;
395
  absl::Mutex lock_;
396
  bool parented_ ABSL_GUARDED_BY(lock_){};
397
  bool disconnected_ ABSL_GUARDED_BY(lock_){};
398
  bool rst_disconnected_ ABSL_GUARDED_BY(lock_){};
399
};
400
401
using SharedConnectionWrapperPtr = std::unique_ptr<SharedConnectionWrapper>;
402
403
/**
404
 * Base class for both fake raw connections and fake HTTP connections.
405
 */
406
class FakeConnectionBase : public Logger::Loggable<Logger::Id::testing> {
407
public:
408
703
  virtual ~FakeConnectionBase() {
409
703
    absl::MutexLock lock(&lock_);
410
703
    ASSERT(initialized_);
411
703
    ASSERT(pending_cbs_ == 0);
412
703
  }
413
414
  ABSL_MUST_USE_RESULT
415
  testing::AssertionResult close(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
416
417
  ABSL_MUST_USE_RESULT
418
  testing::AssertionResult close(Network::ConnectionCloseType close_type,
419
                                 std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
420
421
  ABSL_MUST_USE_RESULT
422
  testing::AssertionResult
423
  readDisable(bool disable, std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
424
425
  ABSL_MUST_USE_RESULT
426
  testing::AssertionResult
427
  waitForDisconnect(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
428
429
  ABSL_MUST_USE_RESULT
430
  testing::AssertionResult
431
  waitForRstDisconnect(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
432
433
  ABSL_MUST_USE_RESULT
434
  testing::AssertionResult
435
  waitForHalfClose(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
436
437
703
  virtual void initialize() {
438
703
    absl::MutexLock lock(&lock_);
439
703
    initialized_ = true;
440
703
  }
441
442
  // Some upstream connections are supposed to be alive forever.
443
  ABSL_MUST_USE_RESULT
444
  testing::AssertionResult virtual waitForNoPost(
445
      std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
446
447
  // The same caveats apply here as in SharedConnectionWrapper::connection().
448
112
  Network::Connection& connection() const { return shared_connection_.connection(); }
449
5.99k
  bool connected() const { return shared_connection_.connected(); }
450
451
  void postToConnectionThread(std::function<void()> cb);
452
0
  SharedConnectionWrapper& sharedConnection() { return shared_connection_; }
453
454
protected:
455
  FakeConnectionBase(SharedConnectionWrapper& shared_connection, Event::TestTimeSystem& time_system)
456
      : shared_connection_(shared_connection), lock_(shared_connection.lock()),
457
703
        dispatcher_(shared_connection_.dispatcher()), time_system_(time_system) {}
458
459
  SharedConnectionWrapper& shared_connection_;
460
  absl::Mutex& lock_; // TODO(mattklein123): Use the shared connection lock and figure out better
461
                      // guarded by annotations.
462
  Event::Dispatcher& dispatcher_;
463
  bool initialized_ ABSL_GUARDED_BY(lock_){};
464
  bool half_closed_ ABSL_GUARDED_BY(lock_){};
465
  std::atomic<uint64_t> pending_cbs_{};
466
  Event::TestTimeSystem& time_system_;
467
};
468
469
/**
470
 * Provides a fake HTTP connection for integration testing.
471
 */
472
class FakeHttpConnection : public Http::ServerConnectionCallbacks, public FakeConnectionBase {
473
public:
474
  // This is a legacy alias.
475
  using Type = Envoy::Http::CodecType;
476
0
  static absl::string_view typeToString(Http::CodecType type) {
477
0
    switch (type) {
478
0
    case Http::CodecType::HTTP1:
479
0
      return "http1";
480
0
    case Http::CodecType::HTTP2:
481
0
      return "http2";
482
0
    case Http::CodecType::HTTP3:
483
0
      return "http3";
484
0
    }
485
0
    return "invalid";
486
0
  }
487
488
  FakeHttpConnection(FakeUpstream& fake_upstream, SharedConnectionWrapper& shared_connection,
489
                     Http::CodecType type, Event::TestTimeSystem& time_system,
490
                     uint32_t max_request_headers_kb, uint32_t max_request_headers_count,
491
                     envoy::config::core::v3::HttpProtocolOptions::HeadersWithUnderscoresAction
492
                         headers_with_underscores_action);
493
494
  ABSL_MUST_USE_RESULT
495
  testing::AssertionResult
496
  waitForNewStream(Event::Dispatcher& client_dispatcher, FakeStreamPtr& stream,
497
                   std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
498
499
  // Http::ServerConnectionCallbacks
500
  Http::RequestDecoder& newStream(Http::ResponseEncoder& response_encoder, bool) override;
501
  // Should only be called for HTTP2 or above
502
  void onGoAway(Http::GoAwayErrorCode code) override;
503
504
  // Should only be called for HTTP2 or above, sends a GOAWAY frame with NO_ERROR.
505
  void encodeGoAway();
506
507
  // Should only be called for HTTP2 or above, sends a GOAWAY frame with ENHANCE_YOUR_CALM.
508
  void encodeProtocolError();
509
510
  // Update the maximum number of concurrent streams.
511
  void updateConcurrentStreams(uint64_t max_streams);
512
513
  ABSL_MUST_USE_RESULT
514
  testing::AssertionResult
515
  waitForInexactRawData(const char* data, std::string* out = nullptr,
516
                        std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
517
518
  void writeRawData(absl::string_view data);
519
  ABSL_MUST_USE_RESULT AssertionResult postWriteRawData(std::string data);
520
521
  Http::ServerHeaderValidatorPtr makeHeaderValidator();
522
523
private:
524
  struct ReadFilter : public Network::ReadFilterBaseImpl {
525
591
    ReadFilter(FakeHttpConnection& parent) : parent_(parent) {}
526
527
    // Network::ReadFilter
528
1.97k
    Network::FilterStatus onData(Buffer::Instance& data, bool) override {
529
1.97k
      Http::Status status = parent_.codec_->dispatch(data);
530
531
1.97k
      if (Http::isCodecProtocolError(status)) {
532
0
        ENVOY_LOG(debug, "FakeUpstream dispatch error: {}", status.message());
533
        // We don't do a full stream shutdown like HCM, but just shutdown the
534
        // connection for now.
535
0
        read_filter_callbacks_->connection().close(
536
0
            Network::ConnectionCloseType::FlushWriteAndDelay);
537
0
      }
538
1.97k
      return Network::FilterStatus::StopIteration;
539
1.97k
    }
540
541
    void
542
591
    initializeReadFilterCallbacks(Network::ReadFilterCallbacks& read_filter_callbacks) override {
543
591
      read_filter_callbacks_ = &read_filter_callbacks;
544
591
    }
545
546
    Network::ReadFilterCallbacks* read_filter_callbacks_{};
547
    FakeHttpConnection& parent_;
548
  };
549
550
  const Http::CodecType type_;
551
  Http::ServerConnectionPtr codec_;
552
  std::list<FakeStreamPtr> new_streams_ ABSL_GUARDED_BY(lock_);
553
  testing::NiceMock<Server::MockOverloadManager> overload_manager_;
554
  testing::NiceMock<Random::MockRandomGenerator> random_;
555
  testing::NiceMock<Http::MockHeaderValidatorStats> header_validator_stats_;
556
  Http::HeaderValidatorFactoryPtr header_validator_factory_;
557
};
558
559
using FakeHttpConnectionPtr = std::unique_ptr<FakeHttpConnection>;
560
561
/**
562
 * Fake raw connection for integration testing.
563
 */
564
class FakeRawConnection : public FakeConnectionBase {
565
public:
566
  // Forwards both arguments to FakeConnectionBase.
  FakeRawConnection(SharedConnectionWrapper& shared_connection, Event::TestTimeSystem& time_system)
567
112
      : FakeConnectionBase(shared_connection, time_system) {}
568
  // Predicate applied to the received data string; see the waitFor*() factories below.
  // NOTE(review): the alias is const-qualified, so the factories return a const object by value.
  using ValidatorFunction = const std::function<bool(const std::string&)>;
569
  ~FakeRawConnection() override;
570
571
  // Writes to data. If data is nullptr, discards the received data.
572
  ABSL_MUST_USE_RESULT
573
  testing::AssertionResult
574
  waitForData(uint64_t num_bytes, std::string* data = nullptr,
575
              std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
576
577
  // Wait until data_validator returns true.
578
  // example usage:
579
  // std::string data;
580
  // ASSERT_TRUE(waitForData(FakeRawConnection::waitForInexactMatch("foo"), &data));
581
  // EXPECT_EQ(data, "foobar");
582
  ABSL_MUST_USE_RESULT
583
  testing::AssertionResult
584
  waitForData(const ValidatorFunction& data_validator, std::string* data = nullptr,
585
              std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
586
587
  ABSL_MUST_USE_RESULT
588
  testing::AssertionResult write(const std::string& data, bool end_stream = false,
589
                                 std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
590
591
  void initialize() override;
592
593
  // Creates a ValidatorFunction which returns true when data_to_wait_for is
594
  // contained in the incoming data string. Unlike many of Envoy waitFor functions,
595
  // it does not expect an exact match, simply the presence of data_to_wait_for.
596
0
  // data_to_wait_for must be a non-null, NUL-terminated string. It is copied into the returned
  // validator, so the validator stays valid after the caller's buffer goes away.
  static ValidatorFunction waitForInexactMatch(const char* data_to_wait_for) {
597
0
    return [needle = std::string(data_to_wait_for)](const std::string& data) -> bool {
598
0
      return data.find(needle) != std::string::npos;
599
0
    };
600
0
  }
601
602
  // Creates a ValidatorFunction which returns true when data_to_wait_for
603
  // equals the incoming data string.
604
0
  // data_to_wait_for must be a non-null, NUL-terminated string. It is copied into the returned
  // validator, so the validator stays valid after the caller's buffer goes away.
  static ValidatorFunction waitForMatch(const char* data_to_wait_for) {
605
0
    return [expected = std::string(data_to_wait_for)](const std::string& data) -> bool {
      return data == expected;
    };
606
0
  }
607
608
  // Creates a ValidatorFunction which returns true when the incoming data string
609
  // contains at least `bytes` bytes.
610
0
  static ValidatorFunction waitForAtLeastBytes(uint32_t bytes) {
611
0
    return [bytes](const std::string& data) -> bool { return data.size() >= bytes; };
612
0
  }
613
614
0
  // Empties data_ while holding lock_.
  void clearData() {
615
0
    absl::MutexLock lock(&lock_);
616
0
    data_.clear();
617
0
  }
618
619
private:
620
  // Read filter bound to this connection; onData() is defined out of line.
  struct ReadFilter : public Network::ReadFilterBaseImpl {
621
112
    ReadFilter(FakeRawConnection& parent) : parent_(parent) {}
622
623
    // Network::ReadFilter
624
    Network::FilterStatus onData(Buffer::Instance& data, bool) override;
625
626
    FakeRawConnection& parent_;
627
  };
628
629
  // Received data buffer; access requires lock_.
  std::string data_ ABSL_GUARDED_BY(lock_);
630
  std::shared_ptr<Network::ReadFilter> read_filter_;
631
};
632
633
using FakeRawConnectionPtr = std::unique_ptr<FakeRawConnection>;
634
635
// Bundle of settings consumed by the FakeUpstream constructors.
struct FakeUpstreamConfig {
636
  // UDP-specific settings; only meaningful when udp_fake_upstream_ below is set.
  struct UdpConfig {
637
    absl::optional<uint64_t> max_rx_datagram_size_;
638
  };
639
640
2.64k
  // Stores the time system, passes the default HTTP/2 options through
  // initializeAndValidateOptions(), then force-enables CONNECT and metadata on HTTP/2 and
  // extended CONNECT on HTTP/3.
  FakeUpstreamConfig(Event::TestTimeSystem& time_system) : time_system_(time_system) {
641
2.64k
    http2_options_ = ::Envoy::Http2::Utility::initializeAndValidateOptions(http2_options_);
642
    // Legacy options which are always set.
643
2.64k
    http2_options_.set_allow_connect(true);
644
2.64k
    http2_options_.set_allow_metadata(true);
645
2.64k
    http3_options_.set_allow_extended_connect(true);
646
2.64k
  }
647
648
  Event::TestTimeSystem& time_system_;
649
  // Defaults to HTTP/1.
  Http::CodecType upstream_protocol_{Http::CodecType::HTTP1};
650
  // Value-initialized, i.e. false unless the test sets it.
  bool enable_half_close_{};
651
  // Unset by default.
  absl::optional<UdpConfig> udp_fake_upstream_;
652
  envoy::config::core::v3::Http2ProtocolOptions http2_options_;
653
  envoy::config::core::v3::Http3ProtocolOptions http3_options_;
654
  envoy::config::listener::v3::QuicProtocolOptions quic_options_;
655
  uint32_t max_request_headers_kb_ = Http::DEFAULT_MAX_REQUEST_HEADERS_KB;
656
  uint32_t max_request_headers_count_ = Http::DEFAULT_MAX_HEADERS_COUNT;
657
  // Defaults to ALLOW.
  envoy::config::core::v3::HttpProtocolOptions::HeadersWithUnderscoresAction
658
      headers_with_underscores_action_ = envoy::config::core::v3::HttpProtocolOptions::ALLOW;
659
};
660
661
/**
662
 * Provides a fake upstream server for integration testing.
663
 */
664
class FakeUpstream : Logger::Loggable<Logger::Id::testing>,
665
                     public Network::FilterChainManager,
666
                     public Network::FilterChainFactory {
667
public:
668
  // Creates a fake upstream bound to the specified unix domain socket path.
669
  FakeUpstream(Network::DownstreamTransportSocketFactoryPtr&& transport_socket_factory,
670
               const std::string& uds_path, const FakeUpstreamConfig& config);
671
672
  // Creates a fake upstream bound to the specified |address|.
673
  FakeUpstream(Network::DownstreamTransportSocketFactoryPtr&& transport_socket_factory,
674
               const Network::Address::InstanceConstSharedPtr& address,
675
               const FakeUpstreamConfig& config);
676
677
  // Creates a fake upstream bound to INADDR_ANY and the specified `port`.
678
  // Set `defer_initialization` to true if you want the FakeUpstream to not immediately listen for
679
  // incoming connections, and instead want to control when the FakeUpstream is available for
680
  // listening. If `defer_initialization` is set to true, call initializeServer() before invoking
681
  // any other functions in this class.
682
  FakeUpstream(uint32_t port, Network::Address::IpVersion version, const FakeUpstreamConfig& config,
683
               bool defer_initialization = false);
684
685
  FakeUpstream(Network::DownstreamTransportSocketFactoryPtr&& transport_socket_factory,
686
               uint32_t port, Network::Address::IpVersion version,
687
               const FakeUpstreamConfig& config);
688
  ~FakeUpstream() override;
689
690
  // Initializes the FakeUpstream's server.
691
  void initializeServer();
692
693
  // Returns true if the server has been initialized, i.e. that initializeServer() executed
694
  // successfully. Returns false otherwise.
695
0
  bool isInitialized() { return initialized_; }
696
697
0
  // Returns the codec type stored in http_type_; const so it is callable through a const
  // FakeUpstream.
  Http::CodecType httpType() const { return http_type_; }
698
699
  // Returns the new connection via the connection argument.
700
  ABSL_MUST_USE_RESULT
701
  testing::AssertionResult
702
  waitForHttpConnection(Event::Dispatcher& client_dispatcher, FakeHttpConnectionPtr& connection,
703
                        std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
704
705
  ABSL_MUST_USE_RESULT
706
  testing::AssertionResult assertPendingConnectionsEmpty();
707
708
  ABSL_MUST_USE_RESULT
709
  testing::AssertionResult
710
  waitForRawConnection(FakeRawConnectionPtr& connection,
711
                       std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
712
5.35k
  // Returns the local address reported by socket_'s connection info provider.
  Network::Address::InstanceConstSharedPtr localAddress() const {
713
5.35k
    return socket_->connectionInfoProvider().localAddress();
714
5.35k
  }
715
716
  void convertFromRawToHttp(FakeRawConnectionPtr& raw_connection,
717
                            FakeHttpConnectionPtr& connection);
718
719
  // Factory for the raw connection object. Virtual, so a subclass can substitute its own
  // FakeRawConnection-derived type; the default builds a plain FakeRawConnection.
  virtual std::unique_ptr<FakeRawConnection>
720
  makeRawConnection(SharedConnectionWrapper& shared_connection,
721
112
                    Event::TestTimeSystem& time_system) {
722
112
    return std::make_unique<FakeRawConnection>(shared_connection, time_system);
723
112
  }
724
725
  // Wait for one of the upstreams to receive a connection
726
  ABSL_MUST_USE_RESULT
727
  static testing::AssertionResult
728
  waitForHttpConnection(Event::Dispatcher& client_dispatcher,
729
                        std::vector<std::unique_ptr<FakeUpstream>>& upstreams,
730
                        FakeHttpConnectionPtr& connection,
731
                        std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
732
733
  // Waits for 1 UDP datagram to be received.
734
  ABSL_MUST_USE_RESULT
735
  testing::AssertionResult
736
  waitForUdpDatagram(Network::UdpRecvData& data_to_fill,
737
                     std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
738
739
  // Send a UDP datagram on the fake upstream thread.
740
  void sendUdpDatagram(const std::string& buffer,
741
                       const Network::Address::InstanceConstSharedPtr& peer);
742
743
  // Network::FilterChainManager
744
  // Ignores both arguments and always returns the single filter chain held in filter_chain_.
  const Network::FilterChain* findFilterChain(const Network::ConnectionSocket&,
745
721
                                              const StreamInfo::StreamInfo&) const override {
746
721
    return filter_chain_.get();
747
721
  }
748
749
  // Network::FilterChainFactory
750
  bool
751
  createNetworkFilterChain(Network::Connection& connection,
752
                           const Filter::NetworkFilterFactoriesList& filter_factories) override;
753
  bool createListenerFilterChain(Network::ListenerFilterManager& listener) override;
754
  void createUdpListenerFilterChain(Network::UdpListenerFilterManager& udp_listener,
755
                                    Network::UdpReadFilterCallbacks& callbacks) override;
756
  bool createQuicListenerFilterChain(Network::QuicListenerFilterManager& listener) override;
757
758
0
  // Overwrites read_disable_on_new_connection_ with `value`.
  void setReadDisableOnNewConnection(bool value) { read_disable_on_new_connection_ = value; }
759
0
  // Overwrites disable_and_do_not_enable_ with `value`.
  void setDisableAllAndDoNotEnable(bool value) { disable_and_do_not_enable_ = value; }
760
1.76k
  // Returns the time system reference held in time_system_.
  Event::TestTimeSystem& timeSystem() { return time_system_; }
761
762
  // Stops the dispatcher loop and joins the listening thread.
763
  void cleanUp();
764
765
0
  // Returns the HTTP/1 codec stats obtained from CodecStats::atomicGet() using
  // http1_codec_stats_ and stats_scope_.
  // NOTE(review): presumably atomicGet() creates the stats on first use — confirm.
  Http::Http1::CodecStats& http1CodecStats() {
766
0
    return Http::Http1::CodecStats::atomicGet(http1_codec_stats_, *stats_scope_);
767
0
  }
768
769
591
  // Returns the HTTP/2 codec stats obtained from CodecStats::atomicGet() using
  // http2_codec_stats_ and stats_scope_.
  // NOTE(review): presumably atomicGet() creates the stats on first use — confirm.
  Http::Http2::CodecStats& http2CodecStats() {
770
591
    return Http::Http2::CodecStats::atomicGet(http2_codec_stats_, *stats_scope_);
771
591
  }
772
773
0
  // Returns the HTTP/3 codec stats obtained from CodecStats::atomicGet() using
  // http3_codec_stats_ and stats_scope_.
  // NOTE(review): presumably atomicGet() creates the stats on first use — confirm.
  Http::Http3::CodecStats& http3CodecStats() {
774
0
    return Http::Http3::CodecStats::atomicGet(http3_codec_stats_, *stats_scope_);
775
0
  }
776
777
  // Write into the outbound buffer of the network connection at the specified index.
778
  // Note that this write bypasses any processing by the upstream codec.
779
  ABSL_MUST_USE_RESULT
780
  testing::AssertionResult
781
  rawWriteConnection(uint32_t index, const std::string& data, bool end_stream = false,
782
                     std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
783
784
591
  // Read-only view of the HTTP/2 protocol options; const so it is callable through a const
  // FakeUpstream.
  const envoy::config::core::v3::Http2ProtocolOptions& http2Options() const {
    return http2_options_;
  }
785
0
  // Read-only view of the HTTP/3 protocol options; const so it is callable through a const
  // FakeUpstream.
  const envoy::config::core::v3::Http3ProtocolOptions& http3Options() const {
    return http3_options_;
  }
786
787
0
  // Exposes the dispatcher_ smart pointer itself by mutable reference.
  Event::DispatcherPtr& dispatcher() { return dispatcher_; }
788
0
  // Exposes lock_, the mutex named by this class's ABSL_GUARDED_BY annotations.
  absl::Mutex& lock() { return lock_; }
789
790
protected:
791
1.15k
  // Read-only access to the configuration copy held in config_.
  const FakeUpstreamConfig& config() const { return config_; }
792
793
  Stats::IsolatedStoreImpl stats_store_;
794
  const Http::CodecType http_type_;
795
796
private:
797
  FakeUpstream(Network::DownstreamTransportSocketFactoryPtr&& transport_socket_factory,
798
               Network::SocketPtr&& connection, const FakeUpstreamConfig& config,
799
               bool defer_initialization = false);
800
801
  // ListenSocketFactory that wraps one already-created socket and hands that same socket out
  // on every request.
  class FakeListenSocketFactory : public Network::ListenSocketFactory {
802
  public:
803
2.66k
    // Shares ownership of `socket`.
    FakeListenSocketFactory(Network::SocketSharedPtr socket) : socket_(socket) {}
804
805
    // Network::ListenSocketFactory
806
2.66k
    Network::Socket::Type socketType() const override { return socket_->socketType(); }
807
2.66k
    const Network::Address::InstanceConstSharedPtr& localAddress() const override {
808
2.66k
      return socket_->connectionInfoProvider().localAddress();
809
2.66k
    }
810
2.66k
    // The index argument is ignored; the single wrapped socket is always returned.
    Network::SocketSharedPtr getListenSocket(uint32_t) override { return socket_; }
811
0
    // Cloning is not supported: returns nullptr.
    Network::ListenSocketFactoryPtr clone() const override { return nullptr; }
812
0
    // Intentionally a no-op.
    void closeAllSockets() override {}
813
    void doFinalPreWorkerInit() override;
814
815
  private:
816
    Network::SocketSharedPtr socket_;
817
  };
818
819
  // UDP listener read filter that forwards every received datagram to the owning FakeUpstream.
  class FakeUdpFilter : public Network::UdpListenerReadFilter {
820
  public:
821
    FakeUdpFilter(FakeUpstream& parent, Network::UdpReadFilterCallbacks& callbacks)
822
0
        : UdpListenerReadFilter(callbacks), parent_(parent) {}
823
824
    // Network::UdpListenerReadFilter
825
0
    // Delegates to FakeUpstream::onRecvDatagram() and returns its status.
    Network::FilterStatus onData(Network::UdpRecvData& data) override {
826
0
      return parent_.onRecvDatagram(data);
827
0
    }
828
0
    // Receive errors are not handled: reaching this panics.
    Network::FilterStatus onReceiveError(Api::IoError::IoErrorCode) override {
829
0
      PANIC("not implemented");
830
0
    }
831
832
  private:
833
    FakeUpstream& parent_;
834
  };
835
836
  // ListenerConfig implementation for the fake upstream. Most answers are fixed values; filter
  // chain lookups, socket factories and the stats scope are served from the owning FakeUpstream.
  class FakeListener : public Network::ListenerConfig {
837
  public:
838
    // UDP listener configuration holding the listener factory, packet writer factory and
    // worker router returned by the accessors below.
    struct UdpListenerConfigImpl : public Network::UdpListenerConfig {
839
      // Installs the default packet writer factory. listener_factory_ is left null here and is
      // assigned by the FakeListener constructor.
      UdpListenerConfigImpl()
840
          : writer_factory_(std::make_unique<Network::UdpDefaultWriterFactory>()),
841
2.66k
            listener_worker_router_(1) {}
842
843
      // Network::UdpListenerConfig
844
0
      Network::ActiveUdpListenerFactory& listenerFactory() override { return *listener_factory_; }
845
0
      Network::UdpPacketWriterFactory& packetWriterFactory() override { return *writer_factory_; }
846
      // The address argument is ignored; there is a single router.
      Network::UdpListenerWorkerRouter&
847
0
      listenerWorkerRouter(const Network::Address::Instance&) override {
848
0
        return listener_worker_router_;
849
0
      }
850
0
      const envoy::config::listener::v3::UdpListenerConfig& config() override { return config_; }
851
852
      envoy::config::listener::v3::UdpListenerConfig config_;
853
      std::unique_ptr<Network::ActiveUdpListenerFactory> listener_factory_;
854
      std::unique_ptr<Network::UdpPacketWriterFactory> writer_factory_;
855
      Network::UdpListenerWorkerRouterImpl listener_worker_router_;
856
    };
857
858
    // Chooses the UDP listener factory: the QUIC factory when `is_quic` is true (only available
    // when built with ENVOY_ENABLE_QUIC; otherwise an ASSERT fires), else the raw UDP factory.
    // init_manager_ is deliberately left null.
    FakeListener(FakeUpstream& parent, bool is_quic = false)
859
2.66k
        : parent_(parent), name_("fake_upstream"), init_manager_(nullptr) {
860
2.66k
      if (is_quic) {
861
0
#if defined(ENVOY_ENABLE_QUIC)
862
0
        udp_listener_config_.listener_factory_ = std::make_unique<Quic::ActiveQuicListenerFactory>(
863
0
            parent_.quic_options_, 1, parent_.quic_stat_names_, parent_.validation_visitor_,
864
0
            absl::nullopt);
865
        // Initialize QUICHE flags.
866
0
        quiche::FlagRegistry::getInstance();
867
#else
868
        ASSERT(false, "Running a test that requires QUIC without compiling QUIC");
869
#endif
870
2.66k
      } else {
871
2.66k
        udp_listener_config_.listener_factory_ =
872
2.66k
            std::make_unique<Server::ActiveRawUdpListenerFactory>(1);
873
2.66k
      }
874
2.66k
    }
875
876
    UdpListenerConfigImpl udp_listener_config_;
877
878
  private:
879
    // Network::ListenerConfig
880
721
    // The owning FakeUpstream serves as both filter chain manager and filter chain factory.
    Network::FilterChainManager& filterChainManager() override { return parent_; }
881
1.44k
    Network::FilterChainFactory& filterChainFactory() override { return parent_; }
882
5.32k
    std::vector<Network::ListenSocketFactoryPtr>& listenSocketFactories() override {
883
5.32k
      return parent_.socket_factories_;
884
5.32k
    }
885
2.66k
    bool bindToPort() const override { return true; }
886
721
    bool handOffRestoredDestinationConnections() const override { return false; }
887
721
    uint32_t perConnectionBufferLimitBytes() const override { return 0; }
888
2.66k
    // Returns a value-initialized (zero) duration.
    std::chrono::milliseconds listenerFiltersTimeout() const override { return {}; }
889
2.66k
    bool continueOnListenerFiltersTimeout() const override { return false; }
890
42.5k
    // Listener stats go to the root scope of the parent's stats store.
    Stats::Scope& listenerScope() override { return *parent_.stats_store_.rootScope(); }
891
7.98k
    uint64_t listenerTag() const override { return 0; }
892
0
    const std::string& name() const override { return name_; }
893
0
    Network::UdpListenerConfigOptRef udpListenerConfig() override { return udp_listener_config_; }
894
2.66k
    // No internal listener configuration: returns an empty optional reference.
    Network::InternalListenerConfigOptRef internalListenerConfig() override { return {}; }
895
2.66k
    // The address argument is ignored; the no-op balancer is always returned.
    Network::ConnectionBalancer& connectionBalancer(const Network::Address::Instance&) override {
896
2.66k
      return connection_balancer_;
897
2.66k
    }
898
0
    envoy::config::core::v3::TrafficDirection direction() const override {
899
0
      return envoy::config::core::v3::UNSPECIFIED;
900
0
    }
901
721
    // Always an empty list: this listener has no access loggers.
    const std::vector<AccessLog::InstanceSharedPtr>& accessLogs() const override {
902
721
      return empty_access_logs_;
903
721
    }
904
2.16k
    ResourceLimit& openConnections() override { return connection_resource_; }
905
0
    uint32_t tcpBacklogSize() const override { return ENVOY_TCP_BACKLOG_SIZE; }
906
2.66k
    uint32_t maxConnectionsToAcceptPerSocketEvent() const override {
907
2.66k
      return Network::DefaultMaxConnectionsToAcceptPerSocketEvent;
908
2.66k
    }
909
0
    // NOTE(review): init_manager_ is set to nullptr in the constructor and never assigned in
    // this class, so calling this dereferences a null pointer — confirm it is never reached.
    Init::Manager& initManager() override { return *init_manager_; }
910
2.66k
    bool ignoreGlobalConnLimit() const override { return false; }
911
912
0
    // NOTE(review): these two helpers sit in the private section and are not overrides, so
    // they are only reachable from inside FakeListener — confirm whether that is intended.
    void setMaxConnections(const uint32_t num_connections) {
913
0
      connection_resource_.setMax(num_connections);
914
0
    }
915
0
    void clearMaxConnections() { connection_resource_.resetMax(); }
916
917
    FakeUpstream& parent_;
918
    const std::string name_;
919
    Network::NopConnectionBalancerImpl connection_balancer_;
920
    BasicResourceLimitImpl connection_resource_;
921
    const std::vector<AccessLog::InstanceSharedPtr> empty_access_logs_;
922
    std::unique_ptr<Init::Manager> init_manager_;
923
  };
924
925
  void threadRoutine();
926
  SharedConnectionWrapper& consumeConnection() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_);
927
  Network::FilterStatus onRecvDatagram(Network::UdpRecvData& data);
928
  AssertionResult
929
  runOnDispatcherThreadAndWait(std::function<AssertionResult()> cb,
930
                               std::chrono::milliseconds timeout = TestUtility::DefaultTimeout);
931
932
  const envoy::config::core::v3::Http2ProtocolOptions http2_options_;
933
  const envoy::config::core::v3::Http3ProtocolOptions http3_options_;
934
  envoy::config::listener::v3::QuicProtocolOptions quic_options_;
935
  Network::SocketSharedPtr socket_;
936
  std::vector<Network::ListenSocketFactoryPtr> socket_factories_;
937
  ConditionalInitializer server_initialized_;
938
  // Guards any objects which can be altered both in the upstream thread and the
939
  // main test thread.
940
  absl::Mutex lock_;
941
  Thread::ThreadPtr thread_;
942
  Api::ApiPtr api_;
943
  Event::TestTimeSystem& time_system_;
944
  Event::DispatcherPtr dispatcher_;
945
  Network::ConnectionHandlerPtr handler_;
946
  std::list<SharedConnectionWrapperPtr> new_connections_ ABSL_GUARDED_BY(lock_);
947
  testing::NiceMock<Runtime::MockLoader> runtime_;
948
949
  // When a SharedConnectionWrapper is popped from new_connections_, ownership is transferred to
950
  // consumed_connections_. This allows the Connection to be destroyed later (when the FakeUpstream
951
  // is deleted) on the same thread that allocated the connection.
952
  std::list<SharedConnectionWrapperPtr> consumed_connections_ ABSL_GUARDED_BY(lock_);
953
  std::list<FakeHttpConnectionPtr> quic_connections_ ABSL_GUARDED_BY(lock_);
954
  const FakeUpstreamConfig config_;
955
  // Normally connections are read disabled until a fake raw or http connection
956
  // is created, and are then read enabled. Setting this to true skips both of these steps.
957
  bool read_disable_on_new_connection_;
958
  // Setting this true disables all events and does not re-enable as the above does.
959
  bool disable_and_do_not_enable_{};
960
  const bool enable_half_close_;
961
  FakeListener listener_;
962
  const Network::FilterChainSharedPtr filter_chain_;
963
  std::list<Network::UdpRecvData> received_datagrams_ ABSL_GUARDED_BY(lock_);
964
  Stats::ScopeSharedPtr stats_scope_;
965
  Http::Http1::CodecStats::AtomicPtr http1_codec_stats_;
966
  Http::Http2::CodecStats::AtomicPtr http2_codec_stats_;
967
  Http::Http3::CodecStats::AtomicPtr http3_codec_stats_;
968
  testing::NiceMock<ProtobufMessage::MockValidationVisitor> validation_visitor_;
969
#ifdef ENVOY_ENABLE_QUIC
970
  Quic::QuicStatNames quic_stat_names_ = Quic::QuicStatNames(stats_store_.symbolTable());
971
#endif
972
  bool initialized_ = false;
973
};
974
975
using FakeUpstreamPtr = std::unique_ptr<FakeUpstream>;
976
977
} // namespace Envoy