/proc/self/cwd/test/integration/fake_upstream.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "test/integration/fake_upstream.h" |
2 | | |
3 | | #include <chrono> |
4 | | #include <cstdint> |
5 | | #include <memory> |
6 | | #include <string> |
7 | | |
8 | | #include "source/common/buffer/buffer_impl.h" |
9 | | #include "source/common/config/utility.h" |
10 | | #include "source/common/http/header_map_impl.h" |
11 | | #include "source/common/http/http1/codec_impl.h" |
12 | | #include "source/common/http/http2/codec_impl.h" |
13 | | #include "source/common/network/address_impl.h" |
14 | | #include "source/common/network/connection_impl.h" |
15 | | #include "source/common/network/listen_socket_impl.h" |
16 | | #include "source/common/network/socket_option_factory.h" |
17 | | #include "source/common/network/utility.h" |
18 | | #include "source/common/runtime/runtime_features.h" |
19 | | |
20 | | #ifdef ENVOY_ENABLE_QUIC |
21 | | #include "source/common/quic/server_codec_impl.h" |
22 | | #include "quiche/quic/test_tools/quic_session_peer.h" |
23 | | #endif |
24 | | |
25 | | #include "source/common/listener_manager/connection_handler_impl.h" |
26 | | |
27 | | #include "test/integration/utility.h" |
28 | | #include "test/test_common/network_utility.h" |
29 | | #include "test/test_common/utility.h" |
30 | | |
31 | | #include "absl/strings/str_cat.h" |
32 | | #include "absl/synchronization/notification.h" |
33 | | |
34 | | using namespace std::chrono_literals; |
35 | | |
36 | | using std::chrono::milliseconds; |
37 | | using testing::AssertionFailure; |
38 | | using testing::AssertionResult; |
39 | | using testing::AssertionSuccess; |
40 | | |
41 | | namespace Envoy { |
42 | | |
// Binds this fake stream to the codec's response encoder and registers for
// stream callbacks so the test can later observe resets (saw_reset_).
FakeStream::FakeStream(FakeHttpConnection& parent, Http::ResponseEncoder& encoder,
                       Event::TestTimeSystem& time_system)
    : parent_(parent), encoder_(encoder), time_system_(time_system),
      header_validator_(parent.makeHeaderValidator()) {
  encoder.getStream().addCallbacks(*this);
}
49 | | |
// Codec callback: stores the request headers. Runs on the upstream connection's
// dispatcher thread; lock_ guards state that test threads poll via waitFor*().
void FakeStream::decodeHeaders(Http::RequestHeaderMapSharedPtr&& headers, bool end_stream) {
  absl::MutexLock lock(&lock_);
  headers_ = std::move(headers);
  if (header_validator_) {
    // Apply the configured validator's request-header transformation so tests
    // observe post-transform headers (result deliberately ignored here).
    header_validator_->transformRequestHeaders(*headers_);
  }
  setEndStream(end_stream);
}
58 | | |
// Codec callback: appends a body chunk to the accumulated body_.
void FakeStream::decodeData(Buffer::Instance& data, bool end_stream) {
  // NOTE(review): received_data_ is written before lock_ is taken — presumably
  // its readers tolerate this unsynchronized access; confirm against the header.
  received_data_ = true;
  absl::MutexLock lock(&lock_);
  body_.add(data);
  setEndStream(end_stream);
}
65 | | |
// Codec callback: trailers always terminate the request side of the stream.
void FakeStream::decodeTrailers(Http::RequestTrailerMapPtr&& trailers) {
  absl::MutexLock lock(&lock_);
  setEndStream(true);
  trailers_ = std::move(trailers);
}
71 | | |
72 | 0 | void FakeStream::decodeMetadata(Http::MetadataMapPtr&& metadata_map_ptr) { |
73 | 0 | for (const auto& metadata : *metadata_map_ptr) { |
74 | 0 | duplicated_metadata_key_count_[metadata.first]++; |
75 | 0 | metadata_map_.insert(metadata); |
76 | 0 | } |
77 | 0 | } |
78 | | |
79 | 2.54k | void FakeStream::postToConnectionThread(std::function<void()> cb) { |
80 | 2.54k | parent_.postToConnectionThread(cb); |
81 | 2.54k | } |
82 | | |
// Sends a 1xx (informational) response. The header map is deep-copied into a
// shared_ptr so the caller's map need not outlive the cross-thread post.
void FakeStream::encode1xxHeaders(const Http::ResponseHeaderMap& headers) {
  std::shared_ptr<Http::ResponseHeaderMap> headers_copy(
      Http::createHeaderMap<Http::ResponseHeaderMapImpl>(headers));
  postToConnectionThread([this, headers_copy]() -> void {
    {
      absl::MutexLock lock(&lock_);
      if (!parent_.connected() || saw_reset_) {
        // Encoder already deleted; silently drop the write.
        return;
      }
    }
    encoder_.encode1xxHeaders(*headers_copy);
  });
}
97 | | |
// Sends response headers from the connection thread. Optionally stamps an
// x-served-by header with this upstream's local address so tests can tell
// which fake upstream answered.
void FakeStream::encodeHeaders(const Http::HeaderMap& headers, bool end_stream) {
  std::shared_ptr<Http::ResponseHeaderMap> headers_copy(
      Http::createHeaderMap<Http::ResponseHeaderMapImpl>(headers));
  if (add_served_by_header_) {
    headers_copy->addCopy(Http::LowerCaseString("x-served-by"),
                          parent_.connection().connectionInfoProvider().localAddress()->asString());
  }

  if (header_validator_) {
    // Ignore validation results; only adopt the transformed map if one is produced.
    auto result = header_validator_->transformResponseHeaders(*headers_copy);
    if (result.new_headers) {
      headers_copy = std::move(result.new_headers);
    }
  }

  postToConnectionThread([this, headers_copy = std::move(headers_copy), end_stream]() -> void {
    {
      absl::MutexLock lock(&lock_);
      if (!parent_.connected() || saw_reset_) {
        // Encoder already deleted; silently drop the write.
        return;
      }
    }
    encoder_.encodeHeaders(*headers_copy, end_stream);
  });
}
125 | | |
126 | 0 | void FakeStream::encodeData(std::string data, bool end_stream) { |
127 | 0 | postToConnectionThread([this, data, end_stream]() -> void { |
128 | 0 | { |
129 | 0 | absl::MutexLock lock(&lock_); |
130 | 0 | if (!parent_.connected() || saw_reset_) { |
131 | | // Encoded already deleted. |
132 | 0 | return; |
133 | 0 | } |
134 | 0 | } |
135 | 0 | Buffer::OwnedImpl fake_data(data.data(), data.size()); |
136 | 0 | encoder_.encodeData(fake_data, end_stream); |
137 | 0 | }); |
138 | 0 | } |
139 | | |
// Sends `size` bytes of filler ('a') as a body chunk; the payload content is
// irrelevant to callers, only its length matters.
void FakeStream::encodeData(uint64_t size, bool end_stream) {
  postToConnectionThread([this, size, end_stream]() -> void {
    {
      absl::MutexLock lock(&lock_);
      if (!parent_.connected() || saw_reset_) {
        // Encoder already deleted; silently drop the write.
        return;
      }
    }
    Buffer::OwnedImpl data(std::string(size, 'a'));
    encoder_.encodeData(data, end_stream);
  });
}
153 | | |
// Sends a body chunk copied from `data`; the copy lives in a shared_ptr so it
// survives until the posted lambda runs on the connection thread.
void FakeStream::encodeData(Buffer::Instance& data, bool end_stream) {
  std::shared_ptr<Buffer::Instance> data_copy = std::make_shared<Buffer::OwnedImpl>(data);
  postToConnectionThread([this, data_copy, end_stream]() -> void {
    {
      absl::MutexLock lock(&lock_);
      if (!parent_.connected() || saw_reset_) {
        // Encoder already deleted; silently drop the write.
        return;
      }
    }
    encoder_.encodeData(*data_copy, end_stream);
  });
}
167 | | |
// Sends response trailers (implicitly ends the stream at the codec level).
// Trailers are deep-copied for the cross-thread post.
void FakeStream::encodeTrailers(const Http::HeaderMap& trailers) {
  std::shared_ptr<Http::ResponseTrailerMap> trailers_copy(
      Http::createHeaderMap<Http::ResponseTrailerMapImpl>(trailers));
  postToConnectionThread([this, trailers_copy]() -> void {
    {
      absl::MutexLock lock(&lock_);
      if (!parent_.connected() || saw_reset_) {
        // Encoder already deleted; silently drop the write.
        return;
      }
    }
    encoder_.encodeTrailers(*trailers_copy);
  });
}
182 | | |
// Resets the stream from the upstream side. HTTP/1 has no stream-level reset,
// so the whole connection is closed instead (after flushing pending writes).
void FakeStream::encodeResetStream() {
  postToConnectionThread([this]() -> void {
    {
      absl::MutexLock lock(&lock_);
      if (!parent_.connected() || saw_reset_) {
        // Encoder already deleted; nothing to reset.
        return;
      }
    }
    if (parent_.type() == Http::CodecType::HTTP1) {
      parent_.connection().close(Network::ConnectionCloseType::FlushWrite);
    } else {
      encoder_.getStream().resetStream(Http::StreamResetReason::LocalReset);
    }
  });
}
199 | | |
// Sends metadata frames (HTTP/2 extension).
// NOTE(review): metadata_map_vector is captured BY REFERENCE across the
// cross-thread post — the vector holds move-only MetadataMapPtr entries so it
// cannot be copied; callers must keep it alive until the post runs. Confirm
// all call sites honor this.
void FakeStream::encodeMetadata(const Http::MetadataMapVector& metadata_map_vector) {
  postToConnectionThread([this, &metadata_map_vector]() -> void {
    {
      absl::MutexLock lock(&lock_);
      if (!parent_.connected() || saw_reset_) {
        // Encoder already deleted; silently drop the write.
        return;
      }
    }
    encoder_.encodeMetadata(metadata_map_vector);
  });
}
212 | | |
// Toggles read-disable on the underlying stream from the connection thread,
// letting tests exert backpressure on the peer.
void FakeStream::readDisable(bool disable) {
  postToConnectionThread([this, disable]() -> void {
    {
      absl::MutexLock lock(&lock_);
      if (!parent_.connected() || saw_reset_) {
        // Encoder already deleted; nothing to toggle.
        return;
      }
    }
    encoder_.getStream().readDisable(disable);
  });
}
225 | | |
// Http::StreamCallbacks: remember the reset so pending encode posts become
// no-ops and waitForReset() can unblock.
void FakeStream::onResetStream(Http::StreamResetReason, absl::string_view) {
  absl::MutexLock lock(&lock_);
  saw_reset_ = true;
}
230 | | |
// Blocks the calling (test) thread until request headers have been decoded,
// or fails the assertion after `timeout`.
AssertionResult FakeStream::waitForHeadersComplete(milliseconds timeout) {
  absl::MutexLock lock(&lock_);
  const auto reached = [this]()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return headers_ != nullptr; };
  if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
    return AssertionFailure() << "Timed out waiting for headers.";
  }
  return AssertionSuccess();
}
240 | | |
namespace {
// Perform a wait on a condition while still allowing for periodic client dispatcher runs that
// occur on the current thread. The overall timeout is a real-time bound, while
// each inner wait is a short (time-system) wait so the dispatcher can make progress.
bool waitForWithDispatcherRun(Event::TestTimeSystem& time_system, absl::Mutex& lock,
                              const std::function<bool()>& condition,
                              Event::Dispatcher& client_dispatcher, milliseconds timeout)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock) {
  Event::TestTimeSystem::RealTimeBound bound(timeout);
  while (bound.withinBound()) {
    // Wake up periodically to run the client dispatcher.
    if (time_system.waitFor(lock, absl::Condition(&condition), 5ms * TIMEOUT_FACTOR)) {
      return true;
    }

    // Run the client dispatcher since we may need to process window updates, etc.
    client_dispatcher.run(Event::Dispatcher::RunType::NonBlock);
  }
  return false;
}
} // namespace
261 | | |
// Blocks until at least `body_length` bytes of request body have accumulated,
// running the client dispatcher in between waits (window updates etc.).
AssertionResult FakeStream::waitForData(Event::Dispatcher& client_dispatcher, uint64_t body_length,
                                        milliseconds timeout) {
  absl::MutexLock lock(&lock_);
  if (!waitForWithDispatcherRun(
          time_system_, lock_,
          [this, body_length]()
              ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return (body_.length() >= body_length); },
          client_dispatcher, timeout)) {
    return AssertionFailure() << "Timed out waiting for data.";
  }
  return AssertionSuccess();
}
274 | | |
275 | | AssertionResult FakeStream::waitForData(Event::Dispatcher& client_dispatcher, |
276 | 0 | absl::string_view data, milliseconds timeout) { |
277 | 0 | auto succeeded = waitForData(client_dispatcher, data.length(), timeout); |
278 | 0 | if (succeeded) { |
279 | 0 | Buffer::OwnedImpl buffer(data.data(), data.length()); |
280 | 0 | if (!TestUtility::buffersEqual(body(), buffer)) { |
281 | 0 | return AssertionFailure() << body().toString() << " not equal to " << data; |
282 | 0 | } |
283 | 0 | } |
284 | 0 | return succeeded; |
285 | 0 | } |
286 | | |
// Blocks until `data_validator` accepts the accumulated body string.
// Note: body_.toString() is re-materialized on every poll.
AssertionResult FakeStream::waitForData(Event::Dispatcher& client_dispatcher,
                                        const FakeStream::ValidatorFunction& data_validator,
                                        std::chrono::milliseconds timeout) {
  absl::MutexLock lock(&lock_);
  if (!waitForWithDispatcherRun(
          time_system_, lock_,
          [this, data_validator]()
              ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return data_validator(body_.toString()); },
          client_dispatcher, timeout)) {
    return AssertionFailure() << "Timed out waiting for data.";
  }
  return AssertionSuccess();
}
300 | | |
// Blocks until the request side has signaled end-of-stream (headers/data with
// end_stream, or trailers).
AssertionResult FakeStream::waitForEndStream(Event::Dispatcher& client_dispatcher,
                                             milliseconds timeout) {
  absl::MutexLock lock(&lock_);
  if (!waitForWithDispatcherRun(
          time_system_, lock_,
          [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return end_stream_; }, client_dispatcher,
          timeout)) {
    return AssertionFailure() << "Timed out waiting for end of stream.";
  }
  return AssertionSuccess();
}
312 | | |
// Blocks until the stream has been reset (saw_reset_ set by onResetStream).
AssertionResult FakeStream::waitForReset(milliseconds timeout) {
  absl::MutexLock lock(&lock_);
  if (!time_system_.waitFor(lock_, absl::Condition(&saw_reset_), timeout)) {
    return AssertionFailure() << "Timed out waiting for reset.";
  }
  return AssertionSuccess();
}
320 | | |
// Begins a gRPC response: optionally sends the 200 response headers that open
// the gRPC stream. May only be called once per stream.
void FakeStream::startGrpcStream(bool send_headers) {
  ASSERT(!grpc_stream_started_, "gRPC stream should not be started more than once");
  grpc_stream_started_ = true;
  if (send_headers) {
    encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false);
  }
}
328 | | |
329 | 0 | void FakeStream::finishGrpcStream(Grpc::Status::GrpcStatus status) { |
330 | 0 | encodeTrailers(Http::TestResponseTrailerMapImpl{ |
331 | 0 | {"grpc-status", std::to_string(static_cast<uint32_t>(status))}}); |
332 | 0 | } |
333 | | |
// Thin test subclass exposing the HTTP/1 server codec's constructors unchanged.
class TestHttp1ServerConnectionImpl : public Http::Http1::ServerConnectionImpl {
public:
  using Http::Http1::ServerConnectionImpl::ServerConnectionImpl;
};
338 | | |
// Test subclass of the HTTP/2 server codec that exposes a hook for changing
// SETTINGS_MAX_CONCURRENT_STREAMS at runtime.
class TestHttp2ServerConnectionImpl : public Http::Http2::ServerConnectionImpl {
public:
  TestHttp2ServerConnectionImpl(
      Network::Connection& connection, Http::ServerConnectionCallbacks& callbacks,
      Http::Http2::CodecStats& stats, Random::RandomGenerator& random_generator,
      const envoy::config::core::v3::Http2ProtocolOptions& http2_options,
      const uint32_t max_request_headers_kb, const uint32_t max_request_headers_count,
      envoy::config::core::v3::HttpProtocolOptions::HeadersWithUnderscoresAction
          headers_with_underscores_action,
      Server::OverloadManager& overload_manager)
      : ServerConnectionImpl(connection, callbacks, stats, random_generator, http2_options,
                             max_request_headers_kb, max_request_headers_count,
                             headers_with_underscores_action, overload_manager) {}

  // Submits a SETTINGS frame advertising a new MAX_CONCURRENT_STREAMS value
  // and flushes it to the peer immediately.
  void updateConcurrentStreams(uint32_t max_streams) {
    absl::InlinedVector<http2::adapter::Http2Setting, 1> settings;
    settings.push_back({http2::adapter::MAX_CONCURRENT_STREAMS, max_streams});
    adapter_->SubmitSettings(settings);
    const int rc = adapter_->Send();
    ASSERT(rc == 0);
  }
};
361 | | |
namespace {
// Fake upstream codec will not do path normalization, so the tests can observe
// the path forwarded by Envoy.
::envoy::extensions::http::header_validators::envoy_default::v3::HeaderValidatorConfig
fakeUpstreamHeaderValidatorConfig() {
  ::envoy::extensions::http::header_validators::envoy_default::v3::HeaderValidatorConfig config;
  // Disable every path-rewriting feature so the request arrives verbatim.
  config.mutable_uri_path_normalization_options()->set_skip_path_normalization(true);
  config.mutable_uri_path_normalization_options()->set_skip_merging_slashes(true);
  config.mutable_uri_path_normalization_options()->set_path_with_escaped_slashes_action(
      ::envoy::extensions::http::header_validators::envoy_default::v3::HeaderValidatorConfig::
          UriPathNormalizationOptions::KEEP_UNCHANGED);
  return config;
}
} // namespace
376 | | |
// Wires up a server-side HTTP codec (HTTP/1, /2 or /3 per `type`) over the
// shared raw connection, then installs a read filter that feeds wire bytes
// into that codec.
FakeHttpConnection::FakeHttpConnection(
    FakeUpstream& fake_upstream, SharedConnectionWrapper& shared_connection, Http::CodecType type,
    Event::TestTimeSystem& time_system, uint32_t max_request_headers_kb,
    uint32_t max_request_headers_count,
    envoy::config::core::v3::HttpProtocolOptions::HeadersWithUnderscoresAction
        headers_with_underscores_action)
    : FakeConnectionBase(shared_connection, time_system), type_(type),
      header_validator_factory_(
          IntegrationUtil::makeHeaderValidationFactory(fakeUpstreamHeaderValidatorConfig())) {
  ASSERT(max_request_headers_count != 0);
  if (type == Http::CodecType::HTTP1) {
    Http::Http1Settings http1_settings;
    http1_settings.use_balsa_parser_ =
        Runtime::runtimeFeatureEnabled("envoy.reloadable_features.http1_use_balsa_parser");
    // For the purpose of testing, we always have the upstream encode the trailers if any
    http1_settings.enable_trailers_ = true;
    Http::Http1::CodecStats& stats = fake_upstream.http1CodecStats();
    codec_ = std::make_unique<TestHttp1ServerConnectionImpl>(
        shared_connection_.connection(), stats, *this, http1_settings, max_request_headers_kb,
        max_request_headers_count, headers_with_underscores_action, overload_manager_);
  } else if (type == Http::CodecType::HTTP2) {
    envoy::config::core::v3::Http2ProtocolOptions http2_options = fake_upstream.http2Options();
    Http::Http2::CodecStats& stats = fake_upstream.http2CodecStats();
    codec_ = std::make_unique<TestHttp2ServerConnectionImpl>(
        shared_connection_.connection(), *this, stats, random_, http2_options,
        max_request_headers_kb, max_request_headers_count, headers_with_underscores_action,
        overload_manager_);
  } else {
    ASSERT(type == Http::CodecType::HTTP3);
#ifdef ENVOY_ENABLE_QUIC
    Http::Http3::CodecStats& stats = fake_upstream.http3CodecStats();
    codec_ = std::make_unique<Quic::QuicHttpServerConnectionImpl>(
        dynamic_cast<Quic::EnvoyQuicServerSession&>(shared_connection_.connection()), *this, stats,
        fake_upstream.http3Options(), max_request_headers_kb, max_request_headers_count,
        headers_with_underscores_action);
#else
    ASSERT(false, "running a QUIC integration test without compiling QUIC");
#endif
  }
  // Pump incoming bytes from the raw connection into the codec.
  shared_connection_.connection().addReadFilter(
      Network::ReadFilterSharedPtr{new ReadFilter(*this)});
}
419 | | |
// Gracefully closes the fake connection (flushing pending writes) on the
// dispatcher thread; succeeds immediately if it is already disconnected.
AssertionResult FakeConnectionBase::close(std::chrono::milliseconds timeout) {
  ENVOY_LOG(trace, "FakeConnectionBase close");
  if (!shared_connection_.connected()) {
    return AssertionSuccess();
  }
  return shared_connection_.executeOnDispatcher(
      [](Network::Connection& connection) {
        connection.close(Network::ConnectionCloseType::FlushWrite);
      },
      timeout);
}
431 | | |
// Closes the connection with a caller-chosen close semantic (e.g. abortive
// reset vs. flush-and-close).
AssertionResult FakeConnectionBase::close(Network::ConnectionCloseType close_type,
                                          std::chrono::milliseconds timeout) {
  ENVOY_LOG(trace, "FakeConnectionBase close type={}", static_cast<int>(close_type));
  if (!shared_connection_.connected()) {
    return AssertionSuccess();
  }
  return shared_connection_.executeOnDispatcher(
      [&close_type](Network::Connection& connection) { connection.close(close_type); }, timeout);
}
441 | | |
// Toggles read-disable on the raw connection from its dispatcher thread.
AssertionResult FakeConnectionBase::readDisable(bool disable, std::chrono::milliseconds timeout) {
  return shared_connection_.executeOnDispatcher(
      [disable](Network::Connection& connection) { connection.readDisable(disable); }, timeout);
}
446 | | |
namespace {
// Maps the configured codec type onto the HTTP protocol enum used by the
// header validator API.
Http::Protocol codeTypeToProtocol(Http::CodecType codec_type) {
  switch (codec_type) {
  case Http::CodecType::HTTP1:
    return Http::Protocol::Http11;
  case Http::CodecType::HTTP2:
    return Http::Protocol::Http2;
  case Http::CodecType::HTTP3:
    return Http::Protocol::Http3;
  }
  PANIC_DUE_TO_CORRUPT_ENUM;
}
} // namespace
460 | | |
461 | 800 | Http::ServerHeaderValidatorPtr FakeHttpConnection::makeHeaderValidator() { |
462 | 800 | return header_validator_factory_ ? header_validator_factory_->createServerHeaderValidator( |
463 | 0 | codeTypeToProtocol(type_), header_validator_stats_) |
464 | 800 | : nullptr; |
465 | 800 | } |
466 | | |
// Codec callback for a new inbound stream: queues a FakeStream that tests
// retrieve via waitForNewStream().
Http::RequestDecoder& FakeHttpConnection::newStream(Http::ResponseEncoder& encoder, bool) {
  absl::MutexLock lock(&lock_);
  new_streams_.emplace_back(new FakeStream(*this, encoder, time_system_));
  return *new_streams_.back();
}
472 | | |
473 | 0 | void FakeHttpConnection::onGoAway(Http::GoAwayErrorCode code) { |
474 | 0 | ASSERT(type_ != Http::CodecType::HTTP1); |
475 | | // Usually indicates connection level errors, no operations are needed since |
476 | | // the connection will be closed soon. |
477 | 0 | ENVOY_LOG(info, "FakeHttpConnection receives GOAWAY: ", static_cast<int>(code)); |
478 | 0 | } |
479 | | |
// Sends GOAWAY to the peer from the connection thread (HTTP/2 or /3 only).
void FakeHttpConnection::encodeGoAway() {
  ASSERT(type_ != Http::CodecType::HTTP1);

  postToConnectionThread([this]() { codec_->goAway(); });
}
485 | | |
// Advertises a new max-concurrent-streams limit to the peer: via a SETTINGS
// frame for HTTP/2, or via QUIC session stream-limit test hooks for HTTP/3.
void FakeHttpConnection::updateConcurrentStreams(uint64_t max_streams) {
  ASSERT(type_ != Http::CodecType::HTTP1);

  if (type_ == Http::CodecType::HTTP2) {
    postToConnectionThread([this, max_streams]() {
      auto codec = dynamic_cast<TestHttp2ServerConnectionImpl*>(codec_.get());
      codec->updateConcurrentStreams(max_streams);
    });
  } else {
#ifdef ENVOY_ENABLE_QUIC
    postToConnectionThread([this, max_streams]() {
      auto codec = dynamic_cast<Quic::QuicHttpServerConnectionImpl*>(codec_.get());
      // Raise the session's incoming stream cap, then tell the peer via MAX_STREAMS.
      quic::test::QuicSessionPeer::SetMaxOpenIncomingBidirectionalStreams(
          &codec->quicServerSession(), max_streams);
      codec->quicServerSession().SendMaxStreams(1, false);
    });
#else
    UNREFERENCED_PARAMETER(max_streams);
#endif
  }
}
507 | | |
// Forces the HTTP/2 codec to emit a protocol error (test hook), asserting the
// codec reports CodecProtocolError as expected.
void FakeHttpConnection::encodeProtocolError() {
  ASSERT(type_ != Http::CodecType::HTTP1);

  Http::Http2::ServerConnectionImpl* codec =
      dynamic_cast<Http::Http2::ServerConnectionImpl*>(codec_.get());
  ASSERT(codec != nullptr);
  postToConnectionThread([codec]() {
    Http::Status status = codec->protocolErrorForTest();
    ASSERT(Http::getStatusCode(status) == Http::StatusCode::CodecProtocolError);
  });
}
519 | | |
// Blocks until the underlying connection has disconnected. Flags misuse of the
// default 5s timeout when the caller expects the wait to fail.
AssertionResult FakeConnectionBase::waitForDisconnect(milliseconds timeout) {
  ENVOY_LOG(trace, "FakeConnectionBase waiting for disconnect");
  absl::MutexLock lock(&lock_);
  const auto reached = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
    return !shared_connection_.connectedLockHeld();
  };

  if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
    if (timeout == TestUtility::DefaultTimeout) {
      ADD_FAILURE() << "Please don't waitForDisconnect with a 5s timeout if failure is expected\n";
    }
    return AssertionFailure() << "Timed out waiting for disconnect.";
  }
  ENVOY_LOG(trace, "FakeConnectionBase done waiting for disconnect");
  return AssertionSuccess();
}
536 | | |
// Blocks until the connection was torn down specifically by a TCP RST.
AssertionResult FakeConnectionBase::waitForRstDisconnect(std::chrono::milliseconds timeout) {
  ENVOY_LOG(trace, "FakeConnectionBase waiting for RST disconnect");
  absl::MutexLock lock(&lock_);
  const auto reached = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
    return shared_connection_.rstDisconnected();
  };

  if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
    if (timeout == TestUtility::DefaultTimeout) {
      ADD_FAILURE()
          << "Please don't waitForRstDisconnect with a 5s timeout if failure is expected\n";
    }
    return AssertionFailure() << "Timed out waiting for RST disconnect.";
  }
  ENVOY_LOG(trace, "FakeConnectionBase done waiting for RST disconnect");
  return AssertionSuccess();
}
554 | | |
// Blocks until the peer has half-closed (remote end-of-stream on the socket).
AssertionResult FakeConnectionBase::waitForHalfClose(milliseconds timeout) {
  absl::MutexLock lock(&lock_);
  if (!time_system_.waitFor(lock_, absl::Condition(&half_closed_), timeout)) {
    return AssertionFailure() << "Timed out waiting for half close.";
  }
  return AssertionSuccess();
}
562 | | |
// Blocks until all callbacks posted via postToConnectionThread() have drained
// (pending_cbs_ back to zero), so tests can quiesce the connection thread.
AssertionResult FakeConnectionBase::waitForNoPost(milliseconds timeout) {
  absl::MutexLock lock(&lock_);
  if (!time_system_.waitFor(
          lock_,
          absl::Condition(
              [](void* fake_connection) -> bool {
                return static_cast<FakeConnectionBase*>(fake_connection)->pending_cbs_ == 0;
              },
              this),
          timeout)) {
    return AssertionFailure() << "Timed out waiting for ops on this connection";
  }
  return AssertionSuccess();
}
577 | | |
578 | 2.54k | void FakeConnectionBase::postToConnectionThread(std::function<void()> cb) { |
579 | 2.54k | ++pending_cbs_; |
580 | 2.54k | dispatcher_.post([this, cb]() { |
581 | 2.54k | cb(); |
582 | 2.54k | { |
583 | | // Snag this lock not because it's needed but so waitForNoPost doesn't stall |
584 | 2.54k | absl::MutexLock lock(&lock_); |
585 | 2.54k | --pending_cbs_; |
586 | 2.54k | } |
587 | 2.54k | }); |
588 | 2.54k | } |
589 | | |
// Waits for the peer to open a new stream on this connection, pumping `client_dispatcher`
// while blocked so the client side can make progress. On success, ownership of the oldest
// pending stream is moved into `stream`.
AssertionResult FakeHttpConnection::waitForNewStream(Event::Dispatcher& client_dispatcher,
                                                     FakeStreamPtr& stream,
                                                     std::chrono::milliseconds timeout) {
  absl::MutexLock lock(&lock_);
  if (!waitForWithDispatcherRun(
          time_system_, lock_,
          [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return !new_streams_.empty(); },
          client_dispatcher, timeout)) {
    return AssertionFailure() << "Timed out waiting for new stream.";
  }
  stream = std::move(new_streams_.front());
  new_streams_.pop_front();
  return AssertionSuccess();
}
604 | | |
// Fake upstream listening on a Unix domain socket at `uds_path`; delegates to the
// socket-based constructor.
FakeUpstream::FakeUpstream(Network::DownstreamTransportSocketFactoryPtr&& transport_socket_factory,
                           const std::string& uds_path, const FakeUpstreamConfig& config)
    : FakeUpstream(std::move(transport_socket_factory),
                   Network::SocketPtr{new Network::UdsListenSocket(
                       *Network::Address::PipeInstance::create(uds_path))},
                   config) {}
611 | | |
612 | | static Network::SocketPtr |
613 | 1.99k | makeTcpListenSocket(const Network::Address::InstanceConstSharedPtr& address) { |
614 | 1.99k | return std::make_unique<Network::TcpListenSocket>(address, nullptr, true); |
615 | 1.99k | } |
616 | | |
617 | | static Network::Address::InstanceConstSharedPtr makeAddress(uint32_t port, |
618 | 14 | Network::Address::IpVersion version) { |
619 | 14 | return Network::Utility::parseInternetAddressNoThrow( |
620 | 14 | Network::Test::getLoopbackAddressString(version), port); |
621 | 14 | } |
622 | | |
// Builds a UDP listen socket bound to `address` with the packet-info and rx-queue-overflow
// socket options applied.
static Network::SocketPtr
makeUdpListenSocket(const Network::Address::InstanceConstSharedPtr& address) {
  auto socket = std::make_unique<Network::UdpListenSocket>(address, nullptr, true);
  // TODO(mattklein123): These options are set in multiple locations. We should centralize them for
  // UDP listeners.
  socket->addOptions(Network::SocketOptionFactory::buildIpPacketInfoOptions());
  socket->addOptions(Network::SocketOptionFactory::buildRxQueueOverFlowOptions());
  return socket;
}
632 | | |
633 | | static Network::SocketPtr |
634 | | makeListenSocket(const FakeUpstreamConfig& config, |
635 | 1.99k | const Network::Address::InstanceConstSharedPtr& address) { |
636 | 1.99k | return (config.udp_fake_upstream_.has_value() ? makeUdpListenSocket(address) |
637 | 1.99k | : makeTcpListenSocket(address)); |
638 | 1.99k | } |
639 | | |
// Raw-buffer (plaintext) fake upstream on the loopback address for `version` at `port`;
// delegates to the socket-based constructor.
FakeUpstream::FakeUpstream(uint32_t port, Network::Address::IpVersion version,
                           const FakeUpstreamConfig& config, const bool defer_initialization)
    : FakeUpstream(Network::Test::createRawBufferDownstreamSocketFactory(),
                   makeListenSocket(config, makeAddress(port, version)), config,
                   defer_initialization) {}
645 | | |
// Fake upstream bound to an explicit `address` with a caller-provided transport socket
// factory; delegates to the socket-based constructor.
FakeUpstream::FakeUpstream(Network::DownstreamTransportSocketFactoryPtr&& transport_socket_factory,
                           const Network::Address::InstanceConstSharedPtr& address,
                           const FakeUpstreamConfig& config)
    : FakeUpstream(std::move(transport_socket_factory), makeListenSocket(config, address), config) {
}
651 | | |
// Fake upstream on the loopback address for `version` at `port` with a caller-provided
// transport socket factory; delegates to the socket-based constructor.
FakeUpstream::FakeUpstream(Network::DownstreamTransportSocketFactoryPtr&& transport_socket_factory,
                           uint32_t port, Network::Address::IpVersion version,
                           const FakeUpstreamConfig& config)
    : FakeUpstream(std::move(transport_socket_factory),
                   makeListenSocket(config, makeAddress(port, version)), config) {}
657 | | |
// Main constructor: all other FakeUpstream constructors delegate here. Takes ownership of
// `listen_socket`, builds the test dispatcher/connection-handler plumbing, and (unless
// `defer_initialization` is set) starts the server thread via initializeServer().
FakeUpstream::FakeUpstream(Network::DownstreamTransportSocketFactoryPtr&& transport_socket_factory,
                           Network::SocketPtr&& listen_socket, const FakeUpstreamConfig& config,
                           const bool defer_initialization)
    : http_type_(config.upstream_protocol_), http2_options_(config.http2_options_),
      http3_options_(config.http3_options_), quic_options_(config.quic_options_),
      socket_(Network::SocketSharedPtr(listen_socket.release())),
      api_(Api::createApiForTest(stats_store_)), time_system_(config.time_system_),
      dispatcher_(api_->allocateDispatcher("fake_upstream")),
      handler_(new Server::ConnectionHandlerImpl(*dispatcher_, 0)), config_(config),
      read_disable_on_new_connection_(true), enable_half_close_(config.enable_half_close_),
      listener_(*this, http_type_ == Http::CodecType::HTTP3),
      filter_chain_(Network::Test::createEmptyFilterChain(std::move(transport_socket_factory))),
      stats_scope_(stats_store_.createScope("test_server_scope")) {
  socket_factories_.emplace_back(std::make_unique<FakeListenSocketFactory>(socket_));
  ENVOY_LOG(info, "starting fake server at {}. UDP={} codec={}", localAddress()->asString(),
            config.udp_fake_upstream_.has_value(), FakeHttpConnection::typeToString(http_type_));
  // Propagate an explicit max datagram size into the UDP listener config when configured.
  if (config.udp_fake_upstream_.has_value() &&
      config.udp_fake_upstream_->max_rx_datagram_size_.has_value()) {
    listener_.udp_listener_config_.config_.mutable_downstream_socket_config()
        ->mutable_max_rx_datagram_size()
        ->set_value(config.udp_fake_upstream_->max_rx_datagram_size_.value());
  }

  if (!defer_initialization) {
    initializeServer();
  }
}
685 | | |
686 | 1.99k | FakeUpstream::~FakeUpstream() { cleanUp(); }; |
687 | | |
688 | 1.99k | void FakeUpstream::initializeServer() { |
689 | 1.99k | if (initialized_) { |
690 | | // Already initialized. |
691 | 0 | return; |
692 | 0 | } |
693 | | |
694 | 1.99k | dispatcher_->post([this]() -> void { |
695 | 1.99k | EXPECT_TRUE(socket_factories_[0]->doFinalPreWorkerInit().ok()); |
696 | 1.99k | handler_->addListener(absl::nullopt, listener_, runtime_, random_); |
697 | 1.99k | server_initialized_.setReady(); |
698 | 1.99k | }); |
699 | 1.99k | thread_ = api_->threadFactory().createThread([this]() -> void { threadRoutine(); }); |
700 | 1.99k | server_initialized_.waitReady(); |
701 | 1.99k | initialized_ = true; |
702 | 1.99k | } |
703 | | |
704 | 2.39k | void FakeUpstream::cleanUp() { |
705 | 2.39k | if (thread_.get()) { |
706 | 1.99k | dispatcher_->exit(); |
707 | 1.99k | thread_->join(); |
708 | 1.99k | thread_.reset(); |
709 | 1.99k | } |
710 | 2.39k | } |
711 | | |
// Network::FilterChainManager hook invoked on the dispatcher thread for each accepted
// connection. Wraps the connection and queues it in new_connections_ for the waitFor*
// accessors; for HTTP/3 the FakeHttpConnection is created eagerly instead.
bool FakeUpstream::createNetworkFilterChain(Network::Connection& connection,
                                            const Filter::NetworkFilterFactoriesList&) {
  absl::MutexLock lock(&lock_);
  if (read_disable_on_new_connection_ && http_type_ != Http::CodecType::HTTP3) {
    // Disable early close detection to avoid closing the network connection before full
    // initialization is complete.
    connection.detectEarlyCloseWhenReadDisabled(false);
    connection.readDisable(true);
    if (disable_and_do_not_enable_) {
      // Additionally mask all file events so the connection stays fully quiesced.
      dynamic_cast<Network::ConnectionImpl*>(&connection)->ioHandle().enableFileEvents(0);
    }
  }
  auto connection_wrapper = std::make_unique<SharedConnectionWrapper>(connection);

  LinkedList::moveIntoListBack(std::move(connection_wrapper), new_connections_);

  // Normally we don't associate a logical network connection with a FakeHttpConnection until
  // waitForHttpConnection is called, but QUIC needs to be set up as packets come in, so we do
  // not lazily create for HTTP/3
  if (http_type_ == Http::CodecType::HTTP3) {
    quic_connections_.push_back(std::make_unique<FakeHttpConnection>(
        *this, consumeConnection(), http_type_, time_system_, config_.max_request_headers_kb_,
        config_.max_request_headers_count_, config_.headers_with_underscores_action_));
    quic_connections_.back()->initialize();
  }
  return true;
}
739 | | |
740 | 102 | bool FakeUpstream::createListenerFilterChain(Network::ListenerFilterManager&) { return true; } |
741 | | |
742 | | void FakeUpstream::createUdpListenerFilterChain(Network::UdpListenerFilterManager& udp_listener, |
743 | 0 | Network::UdpReadFilterCallbacks& callbacks) { |
744 | 0 | udp_listener.addReadFilter(std::make_unique<FakeUdpFilter>(*this, callbacks)); |
745 | 0 | } |
746 | | |
// Fake upstreams install no QUIC listener filters; always accept the connection as-is.
bool FakeUpstream::createQuicListenerFilterChain(Network::QuicListenerFilterManager&) {
  return true;
}
750 | | |
751 | 1.99k | void FakeUpstream::threadRoutine() { |
752 | 1.99k | dispatcher_->run(Event::Dispatcher::RunType::Block); |
753 | 1.99k | handler_.reset(); |
754 | 1.99k | { |
755 | 1.99k | absl::MutexLock lock(&lock_); |
756 | 1.99k | new_connections_.clear(); |
757 | 1.99k | quic_connections_.clear(); |
758 | 1.99k | consumed_connections_.clear(); |
759 | 1.99k | } |
760 | 1.99k | } |
761 | | |
// Waits for a new inbound connection and wraps it in a FakeHttpConnection, pumping
// `client_dispatcher` while blocked. HTTP/3 connections were already wrapped eagerly in
// createNetworkFilterChain, so they are popped from quic_connections_ instead; all other
// protocols consume a raw connection and build the wrapper on the dispatcher thread.
AssertionResult FakeUpstream::waitForHttpConnection(Event::Dispatcher& client_dispatcher,
                                                    FakeHttpConnectionPtr& connection,
                                                    milliseconds timeout) {
  if (!initialized_) {
    return AssertionFailure()
           << "Must initialize the FakeUpstream first by calling initializeServer().";
  }

  {
    absl::MutexLock lock(&lock_);

    // As noted in createNetworkFilterChain, HTTP3 FakeHttpConnections are not
    // lazily created, so HTTP3 needs a different wait path here.
    if (http_type_ == Http::CodecType::HTTP3) {
      if (quic_connections_.empty() &&
          !waitForWithDispatcherRun(
              time_system_, lock_,
              [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return !quic_connections_.empty(); },
              client_dispatcher, timeout)) {
        return AssertionFailure() << "Timed out waiting for new quic connection.";
      }
      if (!quic_connections_.empty()) {
        connection = std::move(quic_connections_.front());
        quic_connections_.pop_front();
        return AssertionSuccess();
      }
    }

    if (!waitForWithDispatcherRun(
            time_system_, lock_,
            [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return !new_connections_.empty(); },
            client_dispatcher, timeout)) {
      if (timeout == TestUtility::DefaultTimeout) {
        ADD_FAILURE()
            << "Please don't waitForHttpConnection with a 5s timeout if failure is expected\n";
      }
      return AssertionFailure() << "Timed out waiting for new connection.";
    }
  }
  // The wrapper must be constructed on the upstream's dispatcher thread.
  return runOnDispatcherThreadAndWait([&]() {
    absl::MutexLock lock(&lock_);
    connection = std::make_unique<FakeHttpConnection>(
        *this, consumeConnection(), http_type_, time_system_, config_.max_request_headers_kb_,
        config_.max_request_headers_count_, config_.headers_with_underscores_action_);
    connection->initialize();
    return AssertionSuccess();
  });
}
810 | | |
// Static variant: polls a set of upstreams round-robin (5ms wait per upstream per pass)
// until one of them receives a connection, and returns that upstream's index. Overall
// deadline is enforced by the RealTimeBound.
// NOTE(review): `return i;` narrows size_t to the StatusOr<int> payload — fine for test
// fixture counts, but worth confirming if upstream vectors ever get large.
absl::StatusOr<int>
FakeUpstream::waitForHttpConnection(Event::Dispatcher& client_dispatcher,
                                    std::vector<std::unique_ptr<FakeUpstream>>& upstreams,
                                    FakeHttpConnectionPtr& connection, milliseconds timeout) {
  if (upstreams.empty()) {
    return absl::InternalError("No upstreams configured.");
  }
  Event::TestTimeSystem::RealTimeBound bound(timeout);
  while (bound.withinBound()) {
    for (size_t i = 0; i < upstreams.size(); ++i) {
      FakeUpstream& upstream = *upstreams[i];
      {
        absl::MutexLock lock(&upstream.lock_);
        if (!upstream.isInitialized()) {
          return absl::InternalError(
              "Must initialize the FakeUpstream first by calling initializeServer().");
        }
        // Short per-upstream wait so we keep cycling through all upstreams.
        if (!waitForWithDispatcherRun(
                upstream.time_system_, upstream.lock_,
                [&upstream]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(upstream.lock_) {
                  return !upstream.new_connections_.empty();
                },
                client_dispatcher, 5ms)) {
          continue;
        }
      }

      // Wrap the connection on that upstream's dispatcher thread.
      EXPECT_TRUE(upstream.runOnDispatcherThreadAndWait([&]() {
        absl::MutexLock lock(&upstream.lock_);
        connection = std::make_unique<FakeHttpConnection>(
            upstream, upstream.consumeConnection(), upstream.http_type_, upstream.timeSystem(),
            Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT,
            envoy::config::core::v3::HttpProtocolOptions::ALLOW);
        connection->initialize();
        return AssertionSuccess();
      }));
      return i;
    }
  }
  return absl::InternalError("Timed out waiting for HTTP connection.");
}
852 | | |
853 | | ABSL_MUST_USE_RESULT |
854 | 0 | AssertionResult FakeUpstream::assertPendingConnectionsEmpty() { |
855 | 0 | return runOnDispatcherThreadAndWait([&]() { |
856 | 0 | absl::MutexLock lock(&lock_); |
857 | 0 | return new_connections_.empty() ? AssertionSuccess() : AssertionFailure(); |
858 | 0 | }); |
859 | 0 | } |
860 | | |
// Waits for a new inbound connection and wraps it in a FakeRawConnection (no codec). The
// wrapper is constructed and its read filter installed on the dispatcher thread.
AssertionResult FakeUpstream::waitForRawConnection(FakeRawConnectionPtr& connection,
                                                   milliseconds timeout) {
  if (!initialized_) {
    return AssertionFailure()
           << "Must initialize the FakeUpstream first by calling initializeServer().";
  }

  {
    absl::MutexLock lock(&lock_);
    const auto reached = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
      return !new_connections_.empty();
    };

    ENVOY_LOG(debug, "waiting for raw connection");
    if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
      return AssertionFailure() << "Timed out waiting for raw connection";
    }
  }

  return runOnDispatcherThreadAndWait([&]() {
    absl::MutexLock lock(&lock_);
    connection = makeRawConnection(consumeConnection(), timeSystem());
    connection->initialize();
    // Skip enableHalfClose if the connection is already disconnected.
    if (connection->connected()) {
      connection->connection().enableHalfClose(enable_half_close_);
    }
    return AssertionSuccess();
  });
}
891 | | |
// Re-wraps an existing raw connection as a FakeHttpConnection sharing the same underlying
// network connection.
// NOTE(review): `raw_connection.release()` drops ownership without running
// ~FakeRawConnection (which would remove the read filter) — the released object is not
// stored anywhere visible here; presumably intentional for test lifetime, confirm no leak
// concern at the call sites.
void FakeUpstream::convertFromRawToHttp(FakeRawConnectionPtr& raw_connection,
                                        FakeHttpConnectionPtr& connection) {
  absl::MutexLock lock(&lock_);
  SharedConnectionWrapper& shared_connection = raw_connection->sharedConnection();

  connection = std::make_unique<FakeHttpConnection>(
      *this, shared_connection, http_type_, time_system_, config_.max_request_headers_kb_,
      config_.max_request_headers_count_, config_.headers_with_underscores_action_);
  connection->initialize();
  raw_connection.release();
}
903 | | |
// Takes the oldest pending connection out of new_connections_, marks it parented, and moves
// it to consumed_connections_. Re-enables read (disabled in createNetworkFilterChain) for
// non-HTTP/3 connections that are still alive. Must be called with lock_ held on the
// dispatcher thread (see ASSERT below).
SharedConnectionWrapper& FakeUpstream::consumeConnection() {
  ASSERT(!new_connections_.empty());
  auto* const connection_wrapper = new_connections_.front().get();
  // Skip the thread safety check if the network connection has already been freed since there's no
  // alternate way to get access to the dispatcher.
  ASSERT(!connection_wrapper->connected() || connection_wrapper->dispatcher().isThreadSafe());
  connection_wrapper->setParented();
  connection_wrapper->moveBetweenLists(new_connections_, consumed_connections_);
  if (read_disable_on_new_connection_ && connection_wrapper->connected() &&
      http_type_ != Http::CodecType::HTTP3 && !disable_and_do_not_enable_) {
    // Re-enable read and early close detection.
    auto& connection = connection_wrapper->connection();
    connection.detectEarlyCloseWhenReadDisabled(true);
    connection.readDisable(false);
  }
  return *connection_wrapper;
}
921 | | |
// Blocks until a UDP datagram has been captured by onRecvDatagram(), then moves the oldest
// one into `data_to_fill`.
AssertionResult FakeUpstream::waitForUdpDatagram(Network::UdpRecvData& data_to_fill,
                                                 std::chrono::milliseconds timeout) {
  if (!initialized_) {
    return AssertionFailure()
           << "Must initialize the FakeUpstream first by calling initializeServer().";
  }

  absl::MutexLock lock(&lock_);
  const auto reached = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
    return !received_datagrams_.empty();
  };

  if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
    return AssertionFailure() << "Timed out waiting for UDP datagram.";
  }

  data_to_fill = std::move(received_datagrams_.front());
  received_datagrams_.pop_front();
  return AssertionSuccess();
}
942 | | |
943 | 0 | Network::FilterStatus FakeUpstream::onRecvDatagram(Network::UdpRecvData& data) { |
944 | 0 | absl::MutexLock lock(&lock_); |
945 | 0 | received_datagrams_.emplace_back(std::move(data)); |
946 | |
|
947 | 0 | return Network::FilterStatus::StopIteration; |
948 | 0 | } |
949 | | |
// Runs `cb` on the dispatcher thread and blocks until it completes, returning its result.
// The by-reference capture of locals is safe here because RELEASE_ASSERT aborts the process
// on timeout — this frame never unwinds before the posted lambda has run.
AssertionResult FakeUpstream::runOnDispatcherThreadAndWait(std::function<AssertionResult()> cb,
                                                           std::chrono::milliseconds timeout) {
  auto result = std::make_shared<AssertionResult>(AssertionSuccess());
  auto done = std::make_shared<absl::Notification>();
  ASSERT(!dispatcher_->isThreadSafe());
  dispatcher_->post([&]() {
    *result = cb();
    done->Notify();
  });
  RELEASE_ASSERT(done->WaitForNotificationWithTimeout(absl::FromChrono(timeout)),
                 "Timed out waiting for cb to run on dispatcher");
  return *result;
}
963 | | |
964 | 0 | void FakeUpstream::runOnDispatcherThread(std::function<void()> cb) { |
965 | 0 | ASSERT(!dispatcher_->isThreadSafe()); |
966 | 0 | dispatcher_->post([&]() { cb(); }); |
967 | 0 | } |
968 | | |
// Sends `buffer` as a single datagram from the listen socket to `peer`, on the dispatcher
// thread. The lambda copies `buffer` and `peer` by value, so the call is safe to return
// from before the write happens.
void FakeUpstream::sendUdpDatagram(const std::string& buffer,
                                   const Network::Address::InstanceConstSharedPtr& peer) {
  dispatcher_->post([this, buffer, peer] {
    const auto rc = Network::Utility::writeToSocket(socket_->ioHandle(), Buffer::OwnedImpl(buffer),
                                                    nullptr, *peer);
    // Expect a complete, non-fragmented write.
    EXPECT_TRUE(rc.return_value_ == buffer.length());
  });
}
977 | | |
// Writes `data` (optionally with end_stream) on the index-th consumed connection,
// executing the write on that connection's dispatcher. `index` must be within
// consumed_connections_ — std::advance past the end is undefined.
AssertionResult FakeUpstream::rawWriteConnection(uint32_t index, const std::string& data,
                                                 bool end_stream,
                                                 std::chrono::milliseconds timeout) {
  if (!initialized_) {
    return AssertionFailure()
           << "Must initialize the FakeUpstream first by calling initializeServer().";
  }

  absl::MutexLock lock(&lock_);
  auto iter = consumed_connections_.begin();
  std::advance(iter, index);
  return (*iter)->executeOnDispatcher(
      [data, end_stream](Network::Connection& connection) {
        ASSERT(connection.state() == Network::Connection::State::Open);
        Buffer::OwnedImpl buffer(data);
        connection.write(buffer, end_stream);
      },
      timeout);
}
997 | | |
998 | 1.99k | absl::Status FakeUpstream::FakeListenSocketFactory::doFinalPreWorkerInit() { |
999 | 1.99k | if (socket_->socketType() == Network::Socket::Type::Stream) { |
1000 | 1.99k | EXPECT_EQ(0, socket_->ioHandle().listen(ENVOY_TCP_BACKLOG_SIZE).return_value_); |
1001 | 1.99k | } else { |
1002 | 0 | ASSERT(socket_->socketType() == Network::Socket::Type::Datagram); |
1003 | 0 | EXPECT_TRUE(Network::Socket::applyOptions(socket_->options(), *socket_, |
1004 | 0 | envoy::config::core::v3::SocketOption::STATE_BOUND)); |
1005 | 0 | } |
1006 | 1.99k | return absl::OkStatus(); |
1007 | 1.99k | } |
1008 | | |
// Removes the read filter on the connection's dispatcher thread; the filter's ownership is
// moved into the posted lambda so it outlives this destructor.
FakeRawConnection::~FakeRawConnection() {
  // If the filter was already deleted, it means the shared_connection_ was too, so don't try to
  // access it.
  if (read_filter_ != nullptr) {
    EXPECT_TRUE(shared_connection_.executeOnDispatcher(
        [filter = std::move(read_filter_)](Network::Connection& connection) {
          connection.removeReadFilter(filter);
        }));
  }
}
1019 | | |
// Creates and installs the data-capturing read filter. The filter object is created even
// when the connection has already disconnected (so the destructor's null check still
// applies), but it is only installed on a live connection.
void FakeRawConnection::initialize() {
  FakeConnectionBase::initialize();
  read_filter_ = std::make_shared<ReadFilter>(*this);
  if (!shared_connection_.connected()) {
    ENVOY_LOG(warn, "FakeRawConnection::initialize: network connection is already disconnected");
    return;
  }
  ASSERT(shared_connection_.dispatcher().isThreadSafe());
  shared_connection_.connection().addReadFilter(read_filter_);
}
1030 | | |
// Waits until exactly `num_bytes` have accumulated, optionally copying them out to `data`.
// NOTE(review): the condition is `==`, not `>=` — if the peer sends more than `num_bytes`
// before the check fires, this will never match and will time out; presumably intentional
// for exact-framing tests, confirm before relying on it with streamed data.
AssertionResult FakeRawConnection::waitForData(uint64_t num_bytes, std::string* data,
                                               milliseconds timeout) {
  absl::MutexLock lock(&lock_);
  const auto reached = [this, num_bytes]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
    return data_.size() == num_bytes;
  };
  ENVOY_LOG(debug, "waiting for {} bytes of data", num_bytes);
  if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
    return AssertionFailure() << fmt::format(
               "Timed out waiting for data. Got '{}', waiting for {} bytes.", data_, num_bytes);
  }
  if (data != nullptr) {
    *data = data_;
  }
  return AssertionSuccess();
}
1047 | | |
// Waits until `data_validator` accepts the accumulated bytes, optionally copying them out.
// The validator is evaluated under lock_ each time the condition is polled.
AssertionResult
FakeRawConnection::waitForData(const std::function<bool(const std::string&)>& data_validator,
                               std::string* data, milliseconds timeout) {
  absl::MutexLock lock(&lock_);
  const auto reached = [this, &data_validator]()
      ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return data_validator(data_); };
  ENVOY_LOG(debug, "waiting for data");
  if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
    return AssertionFailure() << "Timed out waiting for data.";
  }
  if (data != nullptr) {
    *data = data_;
  }
  return AssertionSuccess();
}
1063 | | |
1064 | | AssertionResult FakeRawConnection::write(const std::string& data, bool end_stream, |
1065 | 3.62k | milliseconds timeout) { |
1066 | 3.62k | return shared_connection_.executeOnDispatcher( |
1067 | 3.62k | [data, end_stream](Network::Connection& connection) { |
1068 | 3.57k | Buffer::OwnedImpl to_write(data); |
1069 | 3.57k | connection.write(to_write, end_stream); |
1070 | 3.57k | }, |
1071 | 3.62k | timeout); |
1072 | 3.62k | } |
1073 | | |
// Read filter for FakeRawConnection: appends all received bytes to parent_.data_ under the
// parent's lock and drains the buffer. half_closed_ is overwritten with the latest
// end_stream value on every call.
Network::FilterStatus FakeRawConnection::ReadFilter::onData(Buffer::Instance& data,
                                                            bool end_stream) {
  absl::MutexLock lock(&parent_.lock_);
  ENVOY_LOG(debug, "got {} bytes, end_stream {}", data.length(), end_stream);
  parent_.data_.append(data.toString());
  parent_.half_closed_ = end_stream;
  data.drain(data.length());
  return Network::FilterStatus::StopIteration;
}
1083 | | |
1084 | | ABSL_MUST_USE_RESULT |
1085 | | AssertionResult FakeHttpConnection::waitForInexactRawData(const char* data, std::string* out, |
1086 | 0 | std::chrono::milliseconds timeout) { |
1087 | 0 | absl::MutexLock lock(&lock_); |
1088 | 0 | const auto reached = [this, data, &out]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { |
1089 | 0 | char peek_buf[200]; |
1090 | 0 | auto result = dynamic_cast<Network::ConnectionImpl*>(&connection()) |
1091 | 0 | ->ioHandle() |
1092 | 0 | .recv(peek_buf, 200, MSG_PEEK); |
1093 | 0 | ASSERT(result.ok() || result.err_->getErrorCode() == Api::IoError::IoErrorCode::Again); |
1094 | 0 | if (!result.ok()) { |
1095 | 0 | return false; |
1096 | 0 | } |
1097 | 0 | absl::string_view peek_data(peek_buf, result.return_value_); |
1098 | 0 | size_t index = peek_data.find(data); |
1099 | 0 | if (index != absl::string_view::npos) { |
1100 | 0 | Buffer::OwnedImpl buffer; |
1101 | 0 | *out = std::string(peek_data.data(), index + 4); |
1102 | 0 | auto result = dynamic_cast<Network::ConnectionImpl*>(&connection()) |
1103 | 0 | ->ioHandle() |
1104 | 0 | .recv(peek_buf, index + 4, 0); |
1105 | 0 | return true; |
1106 | 0 | } |
1107 | 0 | return false; |
1108 | 0 | }; |
1109 | | // Because the connection must be read disabled to not auto-consume the |
1110 | | // underlying data, waitFor hangs with no events to force the time system to |
1111 | | // continue. Break it up into smaller chunks. |
1112 | 0 | for (int i = 0; i < timeout / 10ms; ++i) { |
1113 | 0 | if (time_system_.waitFor(lock_, absl::Condition(&reached), 10ms)) { |
1114 | 0 | return AssertionSuccess(); |
1115 | 0 | } |
1116 | 0 | } |
1117 | 0 | return AssertionFailure() << "timed out waiting for raw data"; |
1118 | 0 | } |
1119 | | |
// Writes `data` directly through the io handle, bypassing the HTTP codec. Must be called
// on the connection's thread (no dispatcher post here).
void FakeHttpConnection::writeRawData(absl::string_view data) {
  Buffer::OwnedImpl buffer(data);
  Api::IoCallUint64Result result =
      dynamic_cast<Network::ConnectionImpl*>(&connection())->ioHandle().write(buffer);
  ASSERT(result.ok());
}
1126 | | |
1127 | 0 | AssertionResult FakeHttpConnection::postWriteRawData(std::string data) { |
1128 | 0 | return shared_connection_.executeOnDispatcher( |
1129 | 0 | [data](Network::Connection& connection) { |
1130 | 0 | Buffer::OwnedImpl to_write(data); |
1131 | 0 | connection.write(to_write, false); |
1132 | 0 | }, |
1133 | 0 | TestUtility::DefaultTimeout); |
1134 | 0 | } |
1135 | | |
1136 | | } // namespace Envoy |