/proc/self/cwd/test/integration/fake_upstream.h
Line | Count | Source (jump to first uncovered line) |
1 | | #pragma once |
2 | | |
3 | | #include <cstdint> |
4 | | #include <list> |
5 | | #include <memory> |
6 | | #include <string> |
7 | | |
8 | | #include "envoy/api/api.h" |
9 | | #include "envoy/config/core/v3/base.pb.h" |
10 | | #include "envoy/config/listener/v3/quic_config.pb.h" |
11 | | #include "envoy/grpc/status.h" |
12 | | #include "envoy/http/codec.h" |
13 | | #include "envoy/network/connection.h" |
14 | | #include "envoy/network/connection_handler.h" |
15 | | #include "envoy/network/filter.h" |
16 | | #include "envoy/network/listener.h" |
17 | | #include "envoy/stats/scope.h" |
18 | | |
19 | | #include "source/common/buffer/buffer_impl.h" |
20 | | #include "source/common/buffer/zero_copy_input_stream_impl.h" |
21 | | #include "source/common/common/basic_resource_impl.h" |
22 | | #include "source/common/common/callback_impl.h" |
23 | | #include "source/common/common/linked_object.h" |
24 | | #include "source/common/common/lock_guard.h" |
25 | | #include "source/common/common/thread.h" |
26 | | #include "source/common/config/utility.h" |
27 | | #include "source/common/grpc/codec.h" |
28 | | #include "source/common/grpc/common.h" |
29 | | #include "source/common/http/http1/codec_impl.h" |
30 | | #include "source/common/http/http2/codec_impl.h" |
31 | | #include "source/common/http/http3/codec_stats.h" |
32 | | #include "source/common/network/connection_balancer_impl.h" |
33 | | #include "source/common/network/filter_impl.h" |
34 | | #include "source/common/network/listen_socket_impl.h" |
35 | | #include "source/common/network/udp_listener_impl.h" |
36 | | #include "source/common/network/udp_packet_writer_handler_impl.h" |
37 | | #include "source/common/stats/isolated_store_impl.h" |
38 | | |
39 | | #include "test/mocks/http/header_validator.h" |
40 | | #include "test/mocks/protobuf/mocks.h" |
41 | | #include "test/mocks/server/instance.h" |
42 | | |
43 | | #if defined(ENVOY_ENABLE_QUIC) |
44 | | #include "source/common/quic/active_quic_listener.h" |
45 | | #include "source/common/quic/quic_stat_names.h" |
46 | | #endif |
47 | | |
48 | | #include "source/common/listener_manager/active_raw_udp_listener_config.h" |
49 | | |
50 | | #include "test/mocks/common.h" |
51 | | #include "test/mocks/runtime/mocks.h" |
52 | | #include "test/mocks/server/overload_manager.h" |
53 | | #include "test/test_common/test_time_system.h" |
54 | | #include "test/test_common/utility.h" |
55 | | |
56 | | // TODO(mattklein123): A lot of code should be moved from this header file into the cc file. |
57 | | |
58 | | namespace Envoy { |
59 | | |
60 | | class FakeHttpConnection; |
61 | | class FakeUpstream; |
62 | | |
63 | | /** |
64 | | * Provides a fake HTTP stream for integration testing. |
65 | | */ |
66 | | class FakeStream : public Http::RequestDecoder, |
67 | | public Http::StreamCallbacks, |
68 | | Logger::Loggable<Logger::Id::testing> { |
69 | | public: |
70 | | FakeStream(FakeHttpConnection& parent, Http::ResponseEncoder& encoder, |
71 | | Event::TestTimeSystem& time_system); |
72 | | |
73 | 0 | uint64_t bodyLength() { |
74 | 0 | absl::MutexLock lock(&lock_); |
75 | 0 | return body_.length(); |
76 | 0 | } |
77 | 0 | Buffer::Instance& body() { |
78 | 0 | absl::MutexLock lock(&lock_); |
79 | 0 | return body_; |
80 | 0 | } |
81 | 0 | bool complete() { |
82 | 0 | absl::MutexLock lock(&lock_); |
83 | 0 | return end_stream_; |
84 | 0 | } |
85 | | |
86 | | // Execute a callback using the dispatcher associated with the FakeStream's connection. This |
87 | | // allows execution of non-interrupted sequences of operations on the fake stream which may run |
88 | | // into trouble if client-side events are interleaved. |
89 | | void postToConnectionThread(std::function<void()> cb); |
90 | | void encode1xxHeaders(const Http::ResponseHeaderMap& headers); |
91 | | void encodeHeaders(const Http::HeaderMap& headers, bool end_stream); |
92 | | void encodeData(uint64_t size, bool end_stream); |
93 | | void encodeData(Buffer::Instance& data, bool end_stream); |
94 | | void encodeData(std::string data, bool end_stream); |
95 | | void encodeTrailers(const Http::HeaderMap& trailers); |
96 | | void encodeResetStream(); |
97 | | void encodeMetadata(const Http::MetadataMapVector& metadata_map_vector); |
98 | | void readDisable(bool disable); |
99 | 0 | const Http::RequestHeaderMap& headers() { |
100 | 0 | absl::MutexLock lock(&lock_); |
101 | 0 | return *headers_; |
102 | 0 | } |
103 | 0 | void setAddServedByHeader(bool add_header) { add_served_by_header_ = add_header; } |
104 | 0 | const Http::RequestTrailerMapPtr& trailers() { return trailers_; } |
105 | 0 | bool receivedData() { return received_data_; } |
106 | 0 | Http::Http1StreamEncoderOptionsOptRef http1StreamEncoderOptions() { |
107 | 0 | return encoder_.http1StreamEncoderOptions(); |
108 | 0 | } |
109 | | void |
110 | | sendLocalReply(Http::Code code, absl::string_view body, |
111 | | const std::function<void(Http::ResponseHeaderMap& headers)>& /*modify_headers*/, |
112 | | const absl::optional<Grpc::Status::GrpcStatus> grpc_status, |
113 | 0 | absl::string_view /*details*/) override { |
114 | 0 | bool is_head_request; |
115 | 0 | { |
116 | 0 | absl::MutexLock lock(&lock_); |
117 | 0 | is_head_request = headers_ != nullptr && |
118 | 0 | headers_->getMethodValue() == Http::Headers::get().MethodValues.Head; |
119 | 0 | } |
120 | 0 | Http::Utility::sendLocalReply( |
121 | 0 | false, |
122 | 0 | Http::Utility::EncodeFunctions( |
123 | 0 | {nullptr, nullptr, |
124 | 0 | [&](Http::ResponseHeaderMapPtr&& headers, bool end_stream) -> void { |
125 | 0 | encoder_.encodeHeaders(*headers, end_stream); |
126 | 0 | }, |
127 | 0 | [&](Buffer::Instance& data, bool end_stream) -> void { |
128 | 0 | encoder_.encodeData(data, end_stream); |
129 | 0 | }}), |
130 | 0 | Http::Utility::LocalReplyData({false, code, body, grpc_status, is_head_request})); |
131 | 0 | } |
132 | | |
133 | | ABSL_MUST_USE_RESULT |
134 | | testing::AssertionResult |
135 | | waitForHeadersComplete(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); |
136 | | |
137 | | ABSL_MUST_USE_RESULT |
138 | | testing::AssertionResult |
139 | | waitForData(Event::Dispatcher& client_dispatcher, uint64_t body_length, |
140 | | std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); |
141 | | |
142 | | ABSL_MUST_USE_RESULT |
143 | | testing::AssertionResult |
144 | | waitForData(Event::Dispatcher& client_dispatcher, absl::string_view body, |
145 | | std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); |
146 | | |
147 | | using ValidatorFunction = const std::function<bool(const std::string&)>; |
148 | | ABSL_MUST_USE_RESULT |
149 | | testing::AssertionResult |
150 | | waitForData(Event::Dispatcher& client_dispatcher, const ValidatorFunction& data_validator, |
151 | | std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); |
152 | | |
153 | | ABSL_MUST_USE_RESULT |
154 | | testing::AssertionResult |
155 | | waitForEndStream(Event::Dispatcher& client_dispatcher, |
156 | | std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); |
157 | | |
158 | | ABSL_MUST_USE_RESULT |
159 | | testing::AssertionResult |
160 | | waitForReset(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); |
161 | | |
162 | | // gRPC convenience methods. |
163 | | void startGrpcStream(bool send_headers = true); |
164 | | void finishGrpcStream(Grpc::Status::GrpcStatus status); |
165 | 172 | template <class T> void sendGrpcMessage(const T& message) { |
166 | 172 | ASSERT(grpc_stream_started_, |
167 | 172 | "start gRPC stream by calling startGrpcStream before sending a message"); |
168 | 172 | auto serialized_response = Grpc::Common::serializeToGrpcFrame(message); |
169 | 172 | encodeData(*serialized_response, false); |
170 | 172 | ENVOY_LOG(debug, "Sent gRPC message: {}", message.DebugString()); |
171 | 172 | } void Envoy::FakeStream::sendGrpcMessage<envoy::service::discovery::v3::DeltaDiscoveryResponse>(envoy::service::discovery::v3::DeltaDiscoveryResponse const&) Line | Count | Source | 165 | 38 | template <class T> void sendGrpcMessage(const T& message) { | 166 | 38 | ASSERT(grpc_stream_started_, | 167 | 38 | "start gRPC stream by calling startGrpcStream before sending a message"); | 168 | 38 | auto serialized_response = Grpc::Common::serializeToGrpcFrame(message); | 169 | 38 | encodeData(*serialized_response, false); | 170 | 38 | ENVOY_LOG(debug, "Sent gRPC message: {}", message.DebugString()); | 171 | 38 | } |
void Envoy::FakeStream::sendGrpcMessage<envoy::service::discovery::v3::DiscoveryResponse>(envoy::service::discovery::v3::DiscoveryResponse const&) Line | Count | Source | 165 | 134 | template <class T> void sendGrpcMessage(const T& message) { | 166 | 134 | ASSERT(grpc_stream_started_, | 167 | 134 | "start gRPC stream by calling startGrpcStream before sending a message"); | 168 | 134 | auto serialized_response = Grpc::Common::serializeToGrpcFrame(message); | 169 | 134 | encodeData(*serialized_response, false); | 170 | 134 | ENVOY_LOG(debug, "Sent gRPC message: {}", message.DebugString()); | 171 | 134 | } |
|
172 | 278 | template <class T> void decodeGrpcFrame(T& message) { |
173 | 278 | EXPECT_GE(decoded_grpc_frames_.size(), 1); |
174 | 278 | if (decoded_grpc_frames_[0].length_ == 0) { |
175 | 0 | decoded_grpc_frames_.erase(decoded_grpc_frames_.begin()); |
176 | 0 | return; |
177 | 0 | } |
178 | 278 | Buffer::ZeroCopyInputStreamImpl stream(std::move(decoded_grpc_frames_[0].data_)); |
179 | 278 | EXPECT_TRUE(decoded_grpc_frames_[0].flags_ == Grpc::GRPC_FH_DEFAULT); |
180 | 278 | EXPECT_TRUE(message.ParseFromZeroCopyStream(&stream)); |
181 | 278 | ENVOY_LOG(debug, "Received gRPC message: {}", message.DebugString()); |
182 | 278 | decoded_grpc_frames_.erase(decoded_grpc_frames_.begin()); |
183 | 278 | } void Envoy::FakeStream::decodeGrpcFrame<envoy::service::discovery::v3::DiscoveryRequest>(envoy::service::discovery::v3::DiscoveryRequest&) Line | Count | Source | 172 | 216 | template <class T> void decodeGrpcFrame(T& message) { | 173 | 216 | EXPECT_GE(decoded_grpc_frames_.size(), 1); | 174 | 216 | if (decoded_grpc_frames_[0].length_ == 0) { | 175 | 0 | decoded_grpc_frames_.erase(decoded_grpc_frames_.begin()); | 176 | 0 | return; | 177 | 0 | } | 178 | 216 | Buffer::ZeroCopyInputStreamImpl stream(std::move(decoded_grpc_frames_[0].data_)); | 179 | 216 | EXPECT_TRUE(decoded_grpc_frames_[0].flags_ == Grpc::GRPC_FH_DEFAULT); | 180 | 216 | EXPECT_TRUE(message.ParseFromZeroCopyStream(&stream)); | 181 | 216 | ENVOY_LOG(debug, "Received gRPC message: {}", message.DebugString()); | 182 | 216 | decoded_grpc_frames_.erase(decoded_grpc_frames_.begin()); | 183 | 216 | } |
void Envoy::FakeStream::decodeGrpcFrame<envoy::service::discovery::v3::DeltaDiscoveryRequest>(envoy::service::discovery::v3::DeltaDiscoveryRequest&) Line | Count | Source | 172 | 62 | template <class T> void decodeGrpcFrame(T& message) { | 173 | 62 | EXPECT_GE(decoded_grpc_frames_.size(), 1); | 174 | 62 | if (decoded_grpc_frames_[0].length_ == 0) { | 175 | 0 | decoded_grpc_frames_.erase(decoded_grpc_frames_.begin()); | 176 | 0 | return; | 177 | 0 | } | 178 | 62 | Buffer::ZeroCopyInputStreamImpl stream(std::move(decoded_grpc_frames_[0].data_)); | 179 | 62 | EXPECT_TRUE(decoded_grpc_frames_[0].flags_ == Grpc::GRPC_FH_DEFAULT); | 180 | 62 | EXPECT_TRUE(message.ParseFromZeroCopyStream(&stream)); | 181 | 62 | ENVOY_LOG(debug, "Received gRPC message: {}", message.DebugString()); | 182 | 62 | decoded_grpc_frames_.erase(decoded_grpc_frames_.begin()); | 183 | 62 | } |
|
184 | | template <class T> |
185 | | ABSL_MUST_USE_RESULT testing::AssertionResult |
186 | | waitForGrpcMessage(Event::Dispatcher& client_dispatcher, T& message, |
187 | 278 | std::chrono::milliseconds timeout = TestUtility::DefaultTimeout) { |
188 | 278 | Event::TestTimeSystem::RealTimeBound bound(timeout); |
189 | 278 | ENVOY_LOG(debug, "Waiting for gRPC message..."); |
190 | 278 | if (!decoded_grpc_frames_.empty()) { |
191 | 101 | decodeGrpcFrame(message); |
192 | 101 | return AssertionSuccess(); |
193 | 101 | } |
194 | 177 | if (!waitForData(client_dispatcher, 5, timeout)) { |
195 | 0 | return testing::AssertionFailure() << "Timed out waiting for start of gRPC message."; |
196 | 0 | } |
197 | 177 | int last_body_size = 0; |
198 | 177 | { |
199 | 177 | absl::MutexLock lock(&lock_); |
200 | 177 | last_body_size = body_.length(); |
201 | 177 | if (!grpc_decoder_.decode(body_, decoded_grpc_frames_).ok()) { |
202 | 0 | return testing::AssertionFailure() |
203 | 0 | << "Couldn't decode gRPC data frame: " << body_.toString(); |
204 | 0 | } |
205 | 177 | } |
206 | 177 | if (decoded_grpc_frames_.empty()) { |
207 | 0 | if (!waitForData(client_dispatcher, grpc_decoder_.length() - last_body_size, |
208 | 0 | bound.timeLeft())) { |
209 | 0 | return testing::AssertionFailure() << "Timed out waiting for end of gRPC message."; |
210 | 0 | } |
211 | 0 | { |
212 | 0 | absl::MutexLock lock(&lock_); |
213 | 0 | if (!grpc_decoder_.decode(body_, decoded_grpc_frames_).ok()) { |
214 | 0 | return testing::AssertionFailure() |
215 | 0 | << "Couldn't decode gRPC data frame: " << body_.toString(); |
216 | 0 | } |
217 | 0 | } |
218 | 0 | } |
219 | 177 | decodeGrpcFrame(message); |
220 | 177 | return AssertionSuccess(); |
221 | 177 | } testing::AssertionResult Envoy::FakeStream::waitForGrpcMessage<envoy::service::discovery::v3::DiscoveryRequest>(Envoy::Event::Dispatcher&, envoy::service::discovery::v3::DiscoveryRequest&, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000l> >) Line | Count | Source | 187 | 216 | std::chrono::milliseconds timeout = TestUtility::DefaultTimeout) { | 188 | 216 | Event::TestTimeSystem::RealTimeBound bound(timeout); | 189 | 216 | ENVOY_LOG(debug, "Waiting for gRPC message..."); | 190 | 216 | if (!decoded_grpc_frames_.empty()) { | 191 | 72 | decodeGrpcFrame(message); | 192 | 72 | return AssertionSuccess(); | 193 | 72 | } | 194 | 144 | if (!waitForData(client_dispatcher, 5, timeout)) { | 195 | 0 | return testing::AssertionFailure() << "Timed out waiting for start of gRPC message."; | 196 | 0 | } | 197 | 144 | int last_body_size = 0; | 198 | 144 | { | 199 | 144 | absl::MutexLock lock(&lock_); | 200 | 144 | last_body_size = body_.length(); | 201 | 144 | if (!grpc_decoder_.decode(body_, decoded_grpc_frames_).ok()) { | 202 | 0 | return testing::AssertionFailure() | 203 | 0 | << "Couldn't decode gRPC data frame: " << body_.toString(); | 204 | 0 | } | 205 | 144 | } | 206 | 144 | if (decoded_grpc_frames_.empty()) { | 207 | 0 | if (!waitForData(client_dispatcher, grpc_decoder_.length() - last_body_size, | 208 | 0 | bound.timeLeft())) { | 209 | 0 | return testing::AssertionFailure() << "Timed out waiting for end of gRPC message."; | 210 | 0 | } | 211 | 0 | { | 212 | 0 | absl::MutexLock lock(&lock_); | 213 | 0 | if (!grpc_decoder_.decode(body_, decoded_grpc_frames_).ok()) { | 214 | 0 | return testing::AssertionFailure() | 215 | 0 | << "Couldn't decode gRPC data frame: " << body_.toString(); | 216 | 0 | } | 217 | 0 | } | 218 | 0 | } | 219 | 144 | decodeGrpcFrame(message); | 220 | 144 | return AssertionSuccess(); | 221 | 144 | } |
testing::AssertionResult Envoy::FakeStream::waitForGrpcMessage<envoy::service::discovery::v3::DeltaDiscoveryRequest>(Envoy::Event::Dispatcher&, envoy::service::discovery::v3::DeltaDiscoveryRequest&, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000l> >) Line | Count | Source | 187 | 62 | std::chrono::milliseconds timeout = TestUtility::DefaultTimeout) { | 188 | 62 | Event::TestTimeSystem::RealTimeBound bound(timeout); | 189 | 62 | ENVOY_LOG(debug, "Waiting for gRPC message..."); | 190 | 62 | if (!decoded_grpc_frames_.empty()) { | 191 | 29 | decodeGrpcFrame(message); | 192 | 29 | return AssertionSuccess(); | 193 | 29 | } | 194 | 33 | if (!waitForData(client_dispatcher, 5, timeout)) { | 195 | 0 | return testing::AssertionFailure() << "Timed out waiting for start of gRPC message."; | 196 | 0 | } | 197 | 33 | int last_body_size = 0; | 198 | 33 | { | 199 | 33 | absl::MutexLock lock(&lock_); | 200 | 33 | last_body_size = body_.length(); | 201 | 33 | if (!grpc_decoder_.decode(body_, decoded_grpc_frames_).ok()) { | 202 | 0 | return testing::AssertionFailure() | 203 | 0 | << "Couldn't decode gRPC data frame: " << body_.toString(); | 204 | 0 | } | 205 | 33 | } | 206 | 33 | if (decoded_grpc_frames_.empty()) { | 207 | 0 | if (!waitForData(client_dispatcher, grpc_decoder_.length() - last_body_size, | 208 | 0 | bound.timeLeft())) { | 209 | 0 | return testing::AssertionFailure() << "Timed out waiting for end of gRPC message."; | 210 | 0 | } | 211 | 0 | { | 212 | 0 | absl::MutexLock lock(&lock_); | 213 | 0 | if (!grpc_decoder_.decode(body_, decoded_grpc_frames_).ok()) { | 214 | 0 | return testing::AssertionFailure() | 215 | 0 | << "Couldn't decode gRPC data frame: " << body_.toString(); | 216 | 0 | } | 217 | 0 | } | 218 | 0 | } | 219 | 33 | decodeGrpcFrame(message); | 220 | 33 | return AssertionSuccess(); | 221 | 33 | } |
|
222 | | |
223 | | // Http::StreamDecoder |
224 | | void decodeData(Buffer::Instance& data, bool end_stream) override; |
225 | | void decodeMetadata(Http::MetadataMapPtr&& metadata_map_ptr) override; |
226 | | |
227 | | // Http::RequestDecoder |
228 | | void decodeHeaders(Http::RequestHeaderMapSharedPtr&& headers, bool end_stream) override; |
229 | | void decodeTrailers(Http::RequestTrailerMapPtr&& trailers) override; |
230 | 0 | StreamInfo::StreamInfo& streamInfo() override { |
231 | 0 | RELEASE_ASSERT(false, "initialize if this is needed"); |
232 | 0 | return *stream_info_; |
233 | 0 | } |
234 | 0 | std::list<AccessLog::InstanceSharedPtr> accessLogHandlers() override { |
235 | 0 | return access_log_handlers_; |
236 | 0 | } |
237 | | |
238 | | // Http::StreamCallbacks |
239 | | void onResetStream(Http::StreamResetReason reason, |
240 | | absl::string_view transport_failure_reason) override; |
241 | 0 | void onAboveWriteBufferHighWatermark() override {} |
242 | 0 | void onBelowWriteBufferLowWatermark() override {} |
243 | | |
244 | 1.44k | virtual void setEndStream(bool end) ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { end_stream_ = end; } |
245 | | |
246 | 0 | Event::TestTimeSystem& timeSystem() { return time_system_; } |
247 | | |
248 | 0 | Http::MetadataMap& metadataMap() { return metadata_map_; } |
249 | 0 | absl::node_hash_map<std::string, uint64_t>& duplicatedMetadataKeyCount() { |
250 | 0 | return duplicated_metadata_key_count_; |
251 | 0 | } |
252 | | |
253 | | protected: |
254 | | absl::Mutex lock_; |
255 | | Http::RequestHeaderMapSharedPtr headers_ ABSL_GUARDED_BY(lock_); |
256 | | Buffer::OwnedImpl body_ ABSL_GUARDED_BY(lock_); |
257 | | FakeHttpConnection& parent_; |
258 | | |
259 | | private: |
260 | | Http::ResponseEncoder& encoder_; |
261 | | Http::RequestTrailerMapPtr trailers_ ABSL_GUARDED_BY(lock_); |
262 | | bool end_stream_ ABSL_GUARDED_BY(lock_){}; |
263 | | bool saw_reset_ ABSL_GUARDED_BY(lock_){}; |
264 | | Grpc::Decoder grpc_decoder_; |
265 | | std::vector<Grpc::Frame> decoded_grpc_frames_; |
266 | | bool add_served_by_header_{}; |
267 | | Event::TestTimeSystem& time_system_; |
268 | | Http::MetadataMap metadata_map_; |
269 | | absl::node_hash_map<std::string, uint64_t> duplicated_metadata_key_count_; |
270 | | std::shared_ptr<StreamInfo::StreamInfo> stream_info_; |
271 | | std::list<AccessLog::InstanceSharedPtr> access_log_handlers_; |
272 | | bool received_data_{false}; |
273 | | bool grpc_stream_started_{false}; |
274 | | Http::ServerHeaderValidatorPtr header_validator_; |
275 | | }; |
276 | | |
277 | | using FakeStreamPtr = std::unique_ptr<FakeStream>; |
278 | | |
279 | | // Encapsulates various state and functionality related to sharing a Connection object across |
280 | | // threads. With FakeUpstream fabricated objects, we have a Connection that is associated with a |
281 | | // dispatcher on a thread managed by FakeUpstream. We want to be able to safely invoke methods on |
282 | | // this object from other threads (e.g. the main test thread) and be able to track connection state |
283 | | // (e.g. are we disconnected and the Connection is now possibly deleted). We manage this via a |
284 | | // SharedConnectionWrapper that lives from when the Connection is added to the accepted connection |
285 | | // queue and then through the lifetime of the Fake{Raw,Http}Connection that manages the Connection |
286 | | // through active use. |
287 | | class SharedConnectionWrapper : public Network::ConnectionCallbacks, |
288 | | public LinkedObject<SharedConnectionWrapper> { |
289 | | public: |
290 | | using DisconnectCallback = std::function<void()>; |
291 | | |
292 | | SharedConnectionWrapper(Network::Connection& connection) |
293 | 495 | : connection_(connection), dispatcher_(connection_.dispatcher()) { |
294 | 495 | connection_.addConnectionCallbacks(*this); |
295 | 495 | } |
296 | | |
297 | | // Network::ConnectionCallbacks |
298 | 495 | void onEvent(Network::ConnectionEvent event) override { |
299 | | // Throughout this entire function, we know that the connection_ cannot disappear, since this |
300 | | // callback is invoked prior to connection_ deferred delete. We also know by locking below, |
301 | | // that elsewhere where we also hold lock_, that the connection cannot disappear inside the |
302 | | // locked scope. |
303 | 495 | absl::MutexLock lock(&lock_); |
304 | 495 | if (event == Network::ConnectionEvent::RemoteClose || |
305 | 495 | event == Network::ConnectionEvent::LocalClose) { |
306 | 495 | if (connection_.detectedCloseType() == Network::DetectedCloseType::RemoteReset || |
307 | 495 | connection_.detectedCloseType() == Network::DetectedCloseType::LocalReset) { |
308 | 19 | rst_disconnected_ = true; |
309 | 19 | } |
310 | 495 | disconnected_ = true; |
311 | 495 | } |
312 | 495 | } |
313 | | |
314 | 0 | void onAboveWriteBufferHighWatermark() override {} |
315 | 0 | void onBelowWriteBufferLowWatermark() override {} |
316 | | |
317 | 679 | Event::Dispatcher& dispatcher() { return dispatcher_; } |
318 | | |
319 | 10.6k | bool connected() { |
320 | 10.6k | absl::MutexLock lock(&lock_); |
321 | 10.6k | return connectedLockHeld(); |
322 | 10.6k | } |
323 | | |
324 | 10.7k | bool connectedLockHeld() { |
325 | 10.7k | lock_.AssertReaderHeld(); // TODO(mattklein123): This can't be annotated because the lock |
326 | | // is acquired via the base connection reference. Fix this to |
327 | | // remove the reference. |
328 | 10.7k | return !disconnected_; |
329 | 10.7k | } |
330 | | |
331 | 0 | bool rstDisconnected() { |
332 | 0 | lock_.AssertReaderHeld(); |
333 | 0 | return rst_disconnected_; |
334 | 0 | } |
335 | | |
336 | | // This provides direct access to the underlying connection, but only to const methods. |
337 | | // Stateful connection related methods should happen on the connection's dispatcher via |
338 | | // executeOnDispatcher. |
339 | | // thread safety violations when crossing between the test thread and FakeUpstream thread. |
340 | 1.08k | Network::Connection& connection() const { return connection_; } |
341 | | |
342 | | // Execute some function on the connection's dispatcher. This involves a cross-thread post and |
343 | | // wait-for-completion. If the connection is disconnected, either prior to post or when the |
344 | | // dispatcher schedules the callback, we silently ignore. |
345 | | ABSL_MUST_USE_RESULT |
346 | | testing::AssertionResult |
347 | | executeOnDispatcher(std::function<void(Network::Connection&)> f, |
348 | | std::chrono::milliseconds timeout = TestUtility::DefaultTimeout, |
349 | 3.75k | bool allow_disconnects = true) { |
350 | 3.75k | absl::MutexLock lock(&lock_); |
351 | 3.75k | if (disconnected_) { |
352 | 89 | return testing::AssertionSuccess(); |
353 | 89 | } |
354 | | // Sanity check: detect if the post and wait is attempted from the dispatcher thread; fail |
355 | | // immediately instead of deadlocking. |
356 | 3.67k | ASSERT(!connection_.dispatcher().isThreadSafe(), |
357 | 3.67k | "deadlock: executeOnDispatcher called from dispatcher thread."); |
358 | 3.67k | bool callback_ready_event = false; |
359 | 3.67k | bool unexpected_disconnect = false; |
360 | 3.67k | connection_.dispatcher().post( |
361 | 3.67k | [this, f, &lock = lock_, &callback_ready_event, &unexpected_disconnect]() -> void { |
362 | | // The use of connected() here, vs. !disconnected_, is because we want to use the lock_ |
363 | | // acquisition to briefly serialize. This avoids us entering this completion and issuing |
364 | | // a notifyOne() until the wait() is ready to receive it below. |
365 | 3.67k | if (connected()) { |
366 | 3.62k | f(connection_); |
367 | 3.62k | } else { |
368 | 44 | unexpected_disconnect = true; |
369 | 44 | } |
370 | 3.67k | absl::MutexLock lock_guard(&lock); |
371 | 3.67k | callback_ready_event = true; |
372 | 3.67k | }); |
373 | 3.67k | Event::TestTimeSystem& time_system = |
374 | 3.67k | dynamic_cast<Event::TestTimeSystem&>(connection_.dispatcher().timeSource()); |
375 | 3.67k | if (!time_system.waitFor(lock_, absl::Condition(&callback_ready_event), timeout)) { |
376 | 0 | return testing::AssertionFailure() << "Timed out while executing on dispatcher."; |
377 | 0 | } |
378 | 3.67k | if (unexpected_disconnect && !allow_disconnects) { |
379 | 0 | ENVOY_LOG_MISC(warn, "executeOnDispatcher failed due to disconnect\n"); |
380 | 0 | } |
381 | 3.67k | return testing::AssertionSuccess(); |
382 | 3.67k | } |
383 | | |
384 | 493 | absl::Mutex& lock() { return lock_; } |
385 | | |
386 | 100 | void setParented() { |
387 | 100 | absl::MutexLock lock(&lock_); |
388 | 100 | ASSERT(!parented_); |
389 | 100 | parented_ = true; |
390 | 100 | } |
391 | | |
392 | | private: |
393 | | Network::Connection& connection_; |
394 | | Event::Dispatcher& dispatcher_; |
395 | | absl::Mutex lock_; |
396 | | bool parented_ ABSL_GUARDED_BY(lock_){}; |
397 | | bool disconnected_ ABSL_GUARDED_BY(lock_){}; |
398 | | bool rst_disconnected_ ABSL_GUARDED_BY(lock_){}; |
399 | | }; |
400 | | |
401 | | using SharedConnectionWrapperPtr = std::unique_ptr<SharedConnectionWrapper>; |
402 | | |
403 | | /** |
404 | | * Base class for both fake raw connections and fake HTTP connections. |
405 | | */ |
406 | | class FakeConnectionBase : public Logger::Loggable<Logger::Id::testing> { |
407 | | public: |
408 | 493 | virtual ~FakeConnectionBase() { |
409 | 493 | absl::MutexLock lock(&lock_); |
410 | 493 | ASSERT(initialized_); |
411 | 493 | ASSERT(pending_cbs_ == 0); |
412 | 493 | } |
413 | | |
414 | | ABSL_MUST_USE_RESULT |
415 | | testing::AssertionResult close(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); |
416 | | |
417 | | ABSL_MUST_USE_RESULT |
418 | | testing::AssertionResult close(Network::ConnectionCloseType close_type, |
419 | | std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); |
420 | | |
421 | | ABSL_MUST_USE_RESULT |
422 | | testing::AssertionResult |
423 | | readDisable(bool disable, std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); |
424 | | |
425 | | ABSL_MUST_USE_RESULT |
426 | | testing::AssertionResult |
427 | | waitForDisconnect(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); |
428 | | |
429 | | ABSL_MUST_USE_RESULT |
430 | | testing::AssertionResult |
431 | | waitForRstDisconnect(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); |
432 | | |
433 | | ABSL_MUST_USE_RESULT |
434 | | testing::AssertionResult |
435 | | waitForHalfClose(std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); |
436 | | |
437 | 493 | virtual void initialize() { |
438 | 493 | absl::MutexLock lock(&lock_); |
439 | 493 | initialized_ = true; |
440 | 493 | } |
441 | | |
442 | | // Some upstream connection are supposed to be alive forever. |
443 | | ABSL_MUST_USE_RESULT |
444 | | testing::AssertionResult virtual waitForNoPost( |
445 | | std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); |
446 | | |
447 | | // The same caveats apply here as in SharedConnectionWrapper::connection(). |
448 | 86 | Network::Connection& connection() const { return shared_connection_.connection(); } |
449 | 6.67k | bool connected() const { return shared_connection_.connected(); } |
450 | | |
451 | | void postToConnectionThread(std::function<void()> cb); |
452 | 0 | SharedConnectionWrapper& sharedConnection() { return shared_connection_; } |
453 | | |
454 | | protected: |
455 | | FakeConnectionBase(SharedConnectionWrapper& shared_connection, Event::TestTimeSystem& time_system) |
456 | | : shared_connection_(shared_connection), lock_(shared_connection.lock()), |
457 | 493 | dispatcher_(shared_connection_.dispatcher()), time_system_(time_system) {} |
458 | | |
459 | | SharedConnectionWrapper& shared_connection_; |
460 | | absl::Mutex& lock_; // TODO(mattklein123): Use the shared connection lock and figure out better |
461 | | // guarded by annotations. |
462 | | Event::Dispatcher& dispatcher_; |
463 | | bool initialized_ ABSL_GUARDED_BY(lock_){}; |
464 | | bool half_closed_ ABSL_GUARDED_BY(lock_){}; |
465 | | std::atomic<uint64_t> pending_cbs_{}; |
466 | | Event::TestTimeSystem& time_system_; |
467 | | }; |
468 | | |
469 | | /** |
470 | | * Provides a fake HTTP connection for integration testing. |
471 | | */ |
472 | | class FakeHttpConnection : public Http::ServerConnectionCallbacks, public FakeConnectionBase { |
473 | | public: |
474 | | // This is a legacy alias. |
475 | | using Type = Envoy::Http::CodecType; |
476 | 0 | static absl::string_view typeToString(Http::CodecType type) { |
477 | 0 | switch (type) { |
478 | 0 | case Http::CodecType::HTTP1: |
479 | 0 | return "http1"; |
480 | 0 | case Http::CodecType::HTTP2: |
481 | 0 | return "http2"; |
482 | 0 | case Http::CodecType::HTTP3: |
483 | 0 | return "http3"; |
484 | 0 | } |
485 | 0 | return "invalid"; |
486 | 0 | } |
487 | | |
488 | | FakeHttpConnection(FakeUpstream& fake_upstream, SharedConnectionWrapper& shared_connection, |
489 | | Http::CodecType type, Event::TestTimeSystem& time_system, |
490 | | uint32_t max_request_headers_kb, uint32_t max_request_headers_count, |
491 | | envoy::config::core::v3::HttpProtocolOptions::HeadersWithUnderscoresAction |
492 | | headers_with_underscores_action); |
493 | | |
494 | | ABSL_MUST_USE_RESULT |
495 | | testing::AssertionResult |
496 | | waitForNewStream(Event::Dispatcher& client_dispatcher, FakeStreamPtr& stream, |
497 | | std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); |
498 | | |
499 | | // Http::ServerConnectionCallbacks |
500 | | Http::RequestDecoder& newStream(Http::ResponseEncoder& response_encoder, bool) override; |
501 | | // Should only be called for HTTP2 or above |
502 | | void onGoAway(Http::GoAwayErrorCode code) override; |
503 | | |
504 | | // Should only be called for HTTP2 or above, sends a GOAWAY frame with NO_ERROR. |
505 | | void encodeGoAway(); |
506 | | |
507 | | // Should only be called for HTTP2 or above, sends a GOAWAY frame with ENHANCE_YOUR_CALM. |
508 | | void encodeProtocolError(); |
509 | | |
510 | | // Update the maximum number of concurrent streams. |
511 | | void updateConcurrentStreams(uint64_t max_streams); |
512 | | |
513 | | ABSL_MUST_USE_RESULT |
514 | | testing::AssertionResult |
515 | | waitForInexactRawData(const char* data, std::string* out = nullptr, |
516 | | std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); |
517 | | |
518 | | void writeRawData(absl::string_view data); |
519 | | ABSL_MUST_USE_RESULT AssertionResult postWriteRawData(std::string data); |
520 | | |
521 | | Http::ServerHeaderValidatorPtr makeHeaderValidator(); |
522 | 0 | Http::CodecType type() const { return type_; } |
523 | | |
524 | | private: |
525 | | struct ReadFilter : public Network::ReadFilterBaseImpl { |
526 | 407 | ReadFilter(FakeHttpConnection& parent) : parent_(parent) {} |
527 | | |
528 | | // Network::ReadFilter |
529 | 1.37k | Network::FilterStatus onData(Buffer::Instance& data, bool) override { |
530 | 1.37k | Http::Status status = parent_.codec_->dispatch(data); |
531 | | |
532 | 1.37k | if (Http::isCodecProtocolError(status)) { |
533 | 0 | ENVOY_LOG(debug, "FakeUpstream dispatch error: {}", status.message()); |
534 | | // We don't do a full stream shutdown like HCM, but just shutdown the |
535 | | // connection for now. |
536 | 0 | read_filter_callbacks_->connection().close( |
537 | 0 | Network::ConnectionCloseType::FlushWriteAndDelay); |
538 | 0 | } |
539 | 1.37k | return Network::FilterStatus::StopIteration; |
540 | 1.37k | } |
541 | | |
542 | | void |
543 | 407 | initializeReadFilterCallbacks(Network::ReadFilterCallbacks& read_filter_callbacks) override { |
544 | 407 | read_filter_callbacks_ = &read_filter_callbacks; |
545 | 407 | } |
546 | | |
547 | | Network::ReadFilterCallbacks* read_filter_callbacks_{}; |
548 | | FakeHttpConnection& parent_; |
549 | | }; |
550 | | |
551 | | const Http::CodecType type_; |
552 | | Http::ServerConnectionPtr codec_; |
553 | | std::list<FakeStreamPtr> new_streams_ ABSL_GUARDED_BY(lock_); |
554 | | testing::NiceMock<Server::MockOverloadManager> overload_manager_; |
555 | | testing::NiceMock<Random::MockRandomGenerator> random_; |
556 | | testing::NiceMock<Http::MockHeaderValidatorStats> header_validator_stats_; |
557 | | Http::HeaderValidatorFactoryPtr header_validator_factory_; |
558 | | }; |
559 | | |
560 | | using FakeHttpConnectionPtr = std::unique_ptr<FakeHttpConnection>; |
561 | | |
562 | | /** |
563 | | * Fake raw connection for integration testing. |
564 | | */ |
565 | | class FakeRawConnection : public FakeConnectionBase { |
566 | | public: |
567 | | FakeRawConnection(SharedConnectionWrapper& shared_connection, Event::TestTimeSystem& time_system) |
568 | 86 | : FakeConnectionBase(shared_connection, time_system) {} |
569 | | using ValidatorFunction = const std::function<bool(const std::string&)>; |
570 | | ~FakeRawConnection() override; |
571 | | |
572 | | // Writes to data. If data is nullptr, discards the received data. |
573 | | ABSL_MUST_USE_RESULT |
574 | | testing::AssertionResult |
575 | | waitForData(uint64_t num_bytes, std::string* data = nullptr, |
576 | | std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); |
577 | | |
578 | | // Wait until data_validator returns true. |
579 | | // example usage: |
580 | | // std::string data; |
581 | | // ASSERT_TRUE(waitForData(FakeRawConnection::waitForInexactMatch("foo"), &data)); |
582 | | // EXPECT_EQ(data, "foobar"); |
583 | | ABSL_MUST_USE_RESULT |
584 | | testing::AssertionResult |
585 | | waitForData(const ValidatorFunction& data_validator, std::string* data = nullptr, |
586 | | std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); |
587 | | |
588 | | ABSL_MUST_USE_RESULT |
589 | | testing::AssertionResult write(const std::string& data, bool end_stream = false, |
590 | | std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); |
591 | | |
592 | | void initialize() override; |
593 | | |
594 | | // Creates a ValidatorFunction which returns true when data_to_wait_for is |
595 | | // contained in the incoming data string. Unlike many of Envoy waitFor functions, |
596 | | // it does not expect an exact match, simply the presence of data_to_wait_for. |
597 | 0 | static ValidatorFunction waitForInexactMatch(const char* data_to_wait_for) { |
598 | 0 | return [data_to_wait_for](const std::string& data) -> bool { |
599 | 0 | return data.find(data_to_wait_for) != std::string::npos; |
600 | 0 | }; |
601 | 0 | } |
602 | | |
603 | | // Creates a ValidatorFunction which returns true when data_to_wait_for |
604 | | // equals the incoming data string. |
605 | 0 | static ValidatorFunction waitForMatch(const char* data_to_wait_for) { |
606 | 0 | return [data_to_wait_for](const std::string& data) -> bool { return data == data_to_wait_for; }; |
607 | 0 | } |
608 | | |
609 | | // Creates a ValidatorFunction which returns true when data_to_wait_for is |
610 | | // contains at least bytes_read bytes. |
611 | 0 | static ValidatorFunction waitForAtLeastBytes(uint32_t bytes) { |
612 | 0 | return [bytes](const std::string& data) -> bool { return data.size() >= bytes; }; |
613 | 0 | } |
614 | | |
615 | 0 | void clearData() { |
616 | 0 | absl::MutexLock lock(&lock_); |
617 | 0 | data_.clear(); |
618 | 0 | } |
619 | | |
620 | | private: |
621 | | struct ReadFilter : public Network::ReadFilterBaseImpl { |
622 | 86 | ReadFilter(FakeRawConnection& parent) : parent_(parent) {} |
623 | | |
624 | | // Network::ReadFilter |
625 | | Network::FilterStatus onData(Buffer::Instance& data, bool) override; |
626 | | |
627 | | FakeRawConnection& parent_; |
628 | | }; |
629 | | |
630 | | std::string data_ ABSL_GUARDED_BY(lock_); |
631 | | std::shared_ptr<Network::ReadFilter> read_filter_; |
632 | | }; |
633 | | |
634 | | using FakeRawConnectionPtr = std::unique_ptr<FakeRawConnection>; |
635 | | |
636 | | struct FakeUpstreamConfig { |
637 | | struct UdpConfig { |
638 | | absl::optional<uint64_t> max_rx_datagram_size_; |
639 | | }; |
640 | | |
641 | 1.97k | FakeUpstreamConfig(Event::TestTimeSystem& time_system) : time_system_(time_system) { |
642 | 1.97k | http2_options_ = ::Envoy::Http2::Utility::initializeAndValidateOptions(http2_options_).value(); |
643 | | // Legacy options which are always set. |
644 | 1.97k | http2_options_.set_allow_connect(true); |
645 | 1.97k | http2_options_.set_allow_metadata(true); |
646 | 1.97k | http3_options_.set_allow_extended_connect(true); |
647 | 1.97k | http3_options_.set_allow_metadata(true); |
648 | 1.97k | } |
649 | | |
650 | | Event::TestTimeSystem& time_system_; |
651 | | Http::CodecType upstream_protocol_{Http::CodecType::HTTP1}; |
652 | | bool enable_half_close_{}; |
653 | | absl::optional<UdpConfig> udp_fake_upstream_; |
654 | | envoy::config::core::v3::Http2ProtocolOptions http2_options_; |
655 | | envoy::config::core::v3::Http3ProtocolOptions http3_options_; |
656 | | envoy::config::listener::v3::QuicProtocolOptions quic_options_; |
657 | | uint32_t max_request_headers_kb_ = Http::DEFAULT_MAX_REQUEST_HEADERS_KB; |
658 | | uint32_t max_request_headers_count_ = Http::DEFAULT_MAX_HEADERS_COUNT; |
659 | | envoy::config::core::v3::HttpProtocolOptions::HeadersWithUnderscoresAction |
660 | | headers_with_underscores_action_ = envoy::config::core::v3::HttpProtocolOptions::ALLOW; |
661 | | }; |
662 | | |
663 | | /** |
664 | | * Provides a fake upstream server for integration testing. |
665 | | */ |
666 | | class FakeUpstream : Logger::Loggable<Logger::Id::testing>, |
667 | | public Network::FilterChainManager, |
668 | | public Network::FilterChainFactory { |
669 | | public: |
670 | | // Creates a fake upstream bound to the specified unix domain socket path. |
671 | | FakeUpstream(Network::DownstreamTransportSocketFactoryPtr&& transport_socket_factory, |
672 | | const std::string& uds_path, const FakeUpstreamConfig& config); |
673 | | |
674 | | // Creates a fake upstream bound to the specified |address|. |
675 | | FakeUpstream(Network::DownstreamTransportSocketFactoryPtr&& transport_socket_factory, |
676 | | const Network::Address::InstanceConstSharedPtr& address, |
677 | | const FakeUpstreamConfig& config); |
678 | | |
679 | | // Creates a fake upstream bound to INADDR_ANY and the specified `port`. |
680 | | // Set `defer_initialization` to true if you want the FakeUpstream to not immediately listen for |
681 | | // incoming connections, and instead want to control when the FakeUpstream is available for |
682 | | // listening. If `defer_initialization` is set to true, call initializeServer() before invoking |
683 | | // any other functions in this class. |
684 | | FakeUpstream(uint32_t port, Network::Address::IpVersion version, const FakeUpstreamConfig& config, |
685 | | bool defer_initialization = false); |
686 | | |
687 | | FakeUpstream(Network::DownstreamTransportSocketFactoryPtr&& transport_socket_factory, |
688 | | uint32_t port, Network::Address::IpVersion version, |
689 | | const FakeUpstreamConfig& config); |
690 | | ~FakeUpstream() override; |
691 | | |
692 | | // Initializes the FakeUpstream's server. |
693 | | void initializeServer(); |
694 | | |
695 | | // Returns true if the server has been initialized, i.e. that initializeServer() executed |
696 | | // successfully. Returns false otherwise. |
697 | 0 | bool isInitialized() { return initialized_; } |
698 | | |
699 | 0 | Http::CodecType httpType() { return http_type_; } |
700 | | |
701 | | // Returns the new connection via the connection argument. |
702 | | ABSL_MUST_USE_RESULT |
703 | | testing::AssertionResult |
704 | | waitForHttpConnection(Event::Dispatcher& client_dispatcher, FakeHttpConnectionPtr& connection, |
705 | | std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); |
706 | | |
707 | | ABSL_MUST_USE_RESULT |
708 | | testing::AssertionResult assertPendingConnectionsEmpty(); |
709 | | |
710 | | ABSL_MUST_USE_RESULT |
711 | | testing::AssertionResult |
712 | | waitForRawConnection(FakeRawConnectionPtr& connection, |
713 | | std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); |
714 | 4.00k | Network::Address::InstanceConstSharedPtr localAddress() const { |
715 | 4.00k | return socket_->connectionInfoProvider().localAddress(); |
716 | 4.00k | } |
717 | | |
718 | | void convertFromRawToHttp(FakeRawConnectionPtr& raw_connection, |
719 | | FakeHttpConnectionPtr& connection); |
720 | | |
721 | | virtual std::unique_ptr<FakeRawConnection> |
722 | | makeRawConnection(SharedConnectionWrapper& shared_connection, |
723 | 86 | Event::TestTimeSystem& time_system) { |
724 | 86 | return std::make_unique<FakeRawConnection>(shared_connection, time_system); |
725 | 86 | } |
726 | | |
727 | | // Wait for one of the upstreams to receive a connection |
728 | | static absl::StatusOr<int> |
729 | | waitForHttpConnection(Event::Dispatcher& client_dispatcher, |
730 | | std::vector<std::unique_ptr<FakeUpstream>>& upstreams, |
731 | | FakeHttpConnectionPtr& connection, |
732 | | std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); |
733 | | |
734 | | // Waits for 1 UDP datagram to be received. |
735 | | ABSL_MUST_USE_RESULT |
736 | | testing::AssertionResult |
737 | | waitForUdpDatagram(Network::UdpRecvData& data_to_fill, |
738 | | std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); |
739 | | |
740 | | // Send a UDP datagram on the fake upstream thread. |
741 | | void sendUdpDatagram(const std::string& buffer, |
742 | | const Network::Address::InstanceConstSharedPtr& peer); |
743 | | |
744 | | // Network::FilterChainManager |
745 | | const Network::FilterChain* findFilterChain(const Network::ConnectionSocket&, |
746 | 495 | const StreamInfo::StreamInfo&) const override { |
747 | 495 | return filter_chain_.get(); |
748 | 495 | } |
749 | | |
750 | | // Network::FilterChainFactory |
751 | | bool |
752 | | createNetworkFilterChain(Network::Connection& connection, |
753 | | const Filter::NetworkFilterFactoriesList& filter_factories) override; |
754 | | bool createListenerFilterChain(Network::ListenerFilterManager& listener) override; |
755 | | void createUdpListenerFilterChain(Network::UdpListenerFilterManager& udp_listener, |
756 | | Network::UdpReadFilterCallbacks& callbacks) override; |
757 | | bool createQuicListenerFilterChain(Network::QuicListenerFilterManager& listener) override; |
758 | | |
759 | 0 | void setReadDisableOnNewConnection(bool value) { read_disable_on_new_connection_ = value; } |
760 | 0 | void setDisableAllAndDoNotEnable(bool value) { disable_and_do_not_enable_ = value; } |
761 | 1.26k | Event::TestTimeSystem& timeSystem() { return time_system_; } |
762 | | |
763 | | // Stops the dispatcher loop and joins the listening thread. |
764 | | void cleanUp(); |
765 | | |
766 | 0 | Http::Http1::CodecStats& http1CodecStats() { |
767 | 0 | return Http::Http1::CodecStats::atomicGet(http1_codec_stats_, *stats_scope_); |
768 | 0 | } |
769 | | |
770 | 407 | Http::Http2::CodecStats& http2CodecStats() { |
771 | 407 | return Http::Http2::CodecStats::atomicGet(http2_codec_stats_, *stats_scope_); |
772 | 407 | } |
773 | | |
774 | 0 | Http::Http3::CodecStats& http3CodecStats() { |
775 | 0 | return Http::Http3::CodecStats::atomicGet(http3_codec_stats_, *stats_scope_); |
776 | 0 | } |
777 | | |
778 | | // Write into the outbound buffer of the network connection at the specified index. |
779 | | // Note: that this write bypasses any processing by the upstream codec. |
780 | | ABSL_MUST_USE_RESULT |
781 | | testing::AssertionResult |
782 | | rawWriteConnection(uint32_t index, const std::string& data, bool end_stream = false, |
783 | | std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); |
784 | | |
785 | 407 | const envoy::config::core::v3::Http2ProtocolOptions& http2Options() { return http2_options_; } |
786 | 0 | const envoy::config::core::v3::Http3ProtocolOptions& http3Options() { return http3_options_; } |
787 | | |
788 | 0 | Event::DispatcherPtr& dispatcher() { return dispatcher_; } |
789 | 0 | absl::Mutex& lock() { return lock_; } |
790 | | |
791 | | void runOnDispatcherThread(std::function<void()> cb); |
792 | | |
793 | | protected: |
794 | 786 | const FakeUpstreamConfig& config() const { return config_; } |
795 | | |
796 | | Stats::IsolatedStoreImpl stats_store_; |
797 | | const Http::CodecType http_type_; |
798 | | |
799 | | private: |
800 | | FakeUpstream(Network::DownstreamTransportSocketFactoryPtr&& transport_socket_factory, |
801 | | Network::SocketPtr&& connection, const FakeUpstreamConfig& config, |
802 | | bool defer_initialization = false); |
803 | | |
804 | | class FakeListenSocketFactory : public Network::ListenSocketFactory { |
805 | | public: |
806 | 1.99k | FakeListenSocketFactory(Network::SocketSharedPtr socket) : socket_(socket) {} |
807 | | |
808 | | // Network::ListenSocketFactory |
809 | 1.99k | Network::Socket::Type socketType() const override { return socket_->socketType(); } |
810 | 1.99k | const Network::Address::InstanceConstSharedPtr& localAddress() const override { |
811 | 1.99k | return socket_->connectionInfoProvider().localAddress(); |
812 | 1.99k | } |
813 | 1.99k | Network::SocketSharedPtr getListenSocket(uint32_t) override { return socket_; } |
814 | 0 | Network::ListenSocketFactoryPtr clone() const override { return nullptr; } |
815 | 0 | void closeAllSockets() override {} |
816 | | absl::Status doFinalPreWorkerInit() override; |
817 | | |
818 | | private: |
819 | | Network::SocketSharedPtr socket_; |
820 | | }; |
821 | | |
822 | | class FakeUdpFilter : public Network::UdpListenerReadFilter { |
823 | | public: |
824 | | FakeUdpFilter(FakeUpstream& parent, Network::UdpReadFilterCallbacks& callbacks) |
825 | 0 | : UdpListenerReadFilter(callbacks), parent_(parent) {} |
826 | | |
827 | | // Network::UdpListenerReadFilter |
828 | 0 | Network::FilterStatus onData(Network::UdpRecvData& data) override { |
829 | 0 | return parent_.onRecvDatagram(data); |
830 | 0 | } |
831 | 0 | Network::FilterStatus onReceiveError(Api::IoError::IoErrorCode) override { |
832 | 0 | PANIC("not implemented"); |
833 | 0 | } |
834 | | |
835 | | private: |
836 | | FakeUpstream& parent_; |
837 | | }; |
838 | | |
839 | | class FakeListener : public Network::ListenerConfig { |
840 | | public: |
841 | | struct UdpListenerConfigImpl : public Network::UdpListenerConfig { |
842 | | UdpListenerConfigImpl() |
843 | | : writer_factory_(std::make_unique<Network::UdpDefaultWriterFactory>()), |
844 | 1.99k | listener_worker_router_(1) {} |
845 | | |
846 | | // Network::UdpListenerConfig |
847 | 0 | Network::ActiveUdpListenerFactory& listenerFactory() override { return *listener_factory_; } |
848 | 0 | Network::UdpPacketWriterFactory& packetWriterFactory() override { return *writer_factory_; } |
849 | | Network::UdpListenerWorkerRouter& |
850 | 0 | listenerWorkerRouter(const Network::Address::Instance&) override { |
851 | 0 | return listener_worker_router_; |
852 | 0 | } |
853 | 0 | const envoy::config::listener::v3::UdpListenerConfig& config() override { return config_; } |
854 | | |
855 | | envoy::config::listener::v3::UdpListenerConfig config_; |
856 | | std::unique_ptr<Network::ActiveUdpListenerFactory> listener_factory_; |
857 | | std::unique_ptr<Network::UdpPacketWriterFactory> writer_factory_; |
858 | | Network::UdpListenerWorkerRouterImpl listener_worker_router_; |
859 | | }; |
860 | | |
861 | | FakeListener(FakeUpstream& parent, bool is_quic = false) |
862 | | : parent_(parent), name_("fake_upstream"), init_manager_(nullptr), |
863 | 1.99k | listener_info_(std::make_shared<testing::NiceMock<Network::MockListenerInfo>>()) { |
864 | 1.99k | if (is_quic) { |
865 | 0 | #if defined(ENVOY_ENABLE_QUIC) |
866 | 0 | if (context_ == nullptr) { |
867 | | // Only initialize this when needed to avoid slowing down non-QUIC integration tests. |
868 | 0 | context_ = std::make_unique< |
869 | 0 | testing::NiceMock<Server::Configuration::MockServerFactoryContext>>(); |
870 | 0 | } |
871 | 0 | udp_listener_config_.listener_factory_ = std::make_unique<Quic::ActiveQuicListenerFactory>( |
872 | 0 | parent_.quic_options_, 1, parent_.quic_stat_names_, parent_.validation_visitor_, |
873 | 0 | *context_); |
874 | | // Initialize QUICHE flags. |
875 | 0 | quiche::FlagRegistry::getInstance(); |
876 | | #else |
877 | | ASSERT(false, "Running a test that requires QUIC without compiling QUIC"); |
878 | | #endif |
879 | 1.99k | } else { |
880 | 1.99k | udp_listener_config_.listener_factory_ = |
881 | 1.99k | std::make_unique<Server::ActiveRawUdpListenerFactory>(1); |
882 | 1.99k | } |
883 | 1.99k | } |
884 | | |
885 | | UdpListenerConfigImpl udp_listener_config_; |
886 | | |
887 | | private: |
888 | | // Network::ListenerConfig |
889 | 495 | Network::FilterChainManager& filterChainManager() override { return parent_; } |
890 | 990 | Network::FilterChainFactory& filterChainFactory() override { return parent_; } |
891 | 3.98k | std::vector<Network::ListenSocketFactoryPtr>& listenSocketFactories() override { |
892 | 3.98k | return parent_.socket_factories_; |
893 | 3.98k | } |
894 | 1.99k | bool bindToPort() const override { return true; } |
895 | 495 | bool handOffRestoredDestinationConnections() const override { return false; } |
896 | 495 | uint32_t perConnectionBufferLimitBytes() const override { return 0; } |
897 | 1.99k | std::chrono::milliseconds listenerFiltersTimeout() const override { return {}; } |
898 | 1.99k | bool continueOnListenerFiltersTimeout() const override { return false; } |
899 | 31.8k | Stats::Scope& listenerScope() override { return *parent_.stats_store_.rootScope(); } |
900 | 5.97k | uint64_t listenerTag() const override { return 0; } |
901 | 0 | const std::string& name() const override { return name_; } |
902 | 0 | Network::UdpListenerConfigOptRef udpListenerConfig() override { return udp_listener_config_; } |
903 | 1.99k | Network::InternalListenerConfigOptRef internalListenerConfig() override { return {}; } |
904 | 1.99k | Network::ConnectionBalancer& connectionBalancer(const Network::Address::Instance&) override { |
905 | 1.99k | return connection_balancer_; |
906 | 1.99k | } |
907 | 5.97k | bool shouldBypassOverloadManager() const override { return false; } |
908 | 495 | const std::vector<AccessLog::InstanceSharedPtr>& accessLogs() const override { |
909 | 495 | return empty_access_logs_; |
910 | 495 | } |
911 | 495 | const Network::ListenerInfoConstSharedPtr& listenerInfo() const override { |
912 | 495 | return listener_info_; |
913 | 495 | } |
914 | 1.48k | ResourceLimit& openConnections() override { return connection_resource_; } |
915 | 0 | uint32_t tcpBacklogSize() const override { return ENVOY_TCP_BACKLOG_SIZE; } |
916 | 1.99k | uint32_t maxConnectionsToAcceptPerSocketEvent() const override { |
917 | 1.99k | return Network::DefaultMaxConnectionsToAcceptPerSocketEvent; |
918 | 1.99k | } |
919 | 0 | Init::Manager& initManager() override { return *init_manager_; } |
920 | 1.99k | bool ignoreGlobalConnLimit() const override { return false; } |
921 | | |
922 | 0 | void setMaxConnections(const uint32_t num_connections) { |
923 | 0 | connection_resource_.setMax(num_connections); |
924 | 0 | } |
925 | 0 | void clearMaxConnections() { connection_resource_.resetMax(); } |
926 | | |
927 | | FakeUpstream& parent_; |
928 | | const std::string name_; |
929 | | Network::NopConnectionBalancerImpl connection_balancer_; |
930 | | BasicResourceLimitImpl connection_resource_; |
931 | | const std::vector<AccessLog::InstanceSharedPtr> empty_access_logs_; |
932 | | std::unique_ptr<Init::Manager> init_manager_; |
933 | | const Network::ListenerInfoConstSharedPtr listener_info_; |
934 | | std::unique_ptr<Server::Configuration::MockServerFactoryContext> context_; |
935 | | }; |
936 | | |
937 | | void threadRoutine(); |
938 | | SharedConnectionWrapper& consumeConnection() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_); |
939 | | Network::FilterStatus onRecvDatagram(Network::UdpRecvData& data); |
940 | | AssertionResult |
941 | | runOnDispatcherThreadAndWait(std::function<AssertionResult()> cb, |
942 | | std::chrono::milliseconds timeout = TestUtility::DefaultTimeout); |
943 | | |
944 | | const envoy::config::core::v3::Http2ProtocolOptions http2_options_; |
945 | | const envoy::config::core::v3::Http3ProtocolOptions http3_options_; |
946 | | envoy::config::listener::v3::QuicProtocolOptions quic_options_; |
947 | | Network::SocketSharedPtr socket_; |
948 | | std::vector<Network::ListenSocketFactoryPtr> socket_factories_; |
949 | | ConditionalInitializer server_initialized_; |
950 | | // Guards any objects which can be altered both in the upstream thread and the |
951 | | // main test thread. |
952 | | absl::Mutex lock_; |
953 | | Thread::ThreadPtr thread_; |
954 | | Api::ApiPtr api_; |
955 | | Event::TestTimeSystem& time_system_; |
956 | | Event::DispatcherPtr dispatcher_; |
957 | | Network::ConnectionHandlerPtr handler_; |
958 | | std::list<SharedConnectionWrapperPtr> new_connections_ ABSL_GUARDED_BY(lock_); |
959 | | testing::NiceMock<Runtime::MockLoader> runtime_; |
960 | | testing::NiceMock<Random::MockRandomGenerator> random_; |
961 | | |
962 | | // When a QueuedConnectionWrapper is popped from new_connections_, ownership is transferred to |
963 | | // consumed_connections_. This allows later the Connection destruction (when the FakeUpstream is |
964 | | // deleted) on the same thread that allocated the connection. |
965 | | std::list<SharedConnectionWrapperPtr> consumed_connections_ ABSL_GUARDED_BY(lock_); |
966 | | std::list<FakeHttpConnectionPtr> quic_connections_ ABSL_GUARDED_BY(lock_); |
967 | | const FakeUpstreamConfig config_; |
968 | | // Normally connections are read disabled until a fake raw or http connection |
969 | | // is created, and are then read enabled. Setting these true skips both these. |
970 | | bool read_disable_on_new_connection_; |
971 | | // Setting this true disables all events and does not re-enable as the above does. |
972 | | bool disable_and_do_not_enable_{}; |
973 | | const bool enable_half_close_; |
974 | | testing::NiceMock<ProtobufMessage::MockValidationVisitor> validation_visitor_; |
975 | | FakeListener listener_; |
976 | | const Network::FilterChainSharedPtr filter_chain_; |
977 | | std::list<Network::UdpRecvData> received_datagrams_ ABSL_GUARDED_BY(lock_); |
978 | | Stats::ScopeSharedPtr stats_scope_; |
979 | | Http::Http1::CodecStats::AtomicPtr http1_codec_stats_; |
980 | | Http::Http2::CodecStats::AtomicPtr http2_codec_stats_; |
981 | | Http::Http3::CodecStats::AtomicPtr http3_codec_stats_; |
982 | | #ifdef ENVOY_ENABLE_QUIC |
983 | | Quic::QuicStatNames quic_stat_names_ = Quic::QuicStatNames(stats_store_.symbolTable()); |
984 | | #endif |
985 | | bool initialized_ = false; |
986 | | }; |
987 | | |
988 | | using FakeUpstreamPtr = std::unique_ptr<FakeUpstream>; |
989 | | |
990 | | } // namespace Envoy |