/proc/self/cwd/test/integration/fake_upstream.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "test/integration/fake_upstream.h" |
2 | | |
3 | | #include <chrono> |
4 | | #include <cstdint> |
5 | | #include <memory> |
6 | | #include <string> |
7 | | |
8 | | #include "source/common/buffer/buffer_impl.h" |
9 | | #include "source/common/config/utility.h" |
10 | | #include "source/common/http/header_map_impl.h" |
11 | | #include "source/common/http/http1/codec_impl.h" |
12 | | #include "source/common/http/http2/codec_impl.h" |
13 | | #include "source/common/network/address_impl.h" |
14 | | #include "source/common/network/connection_impl.h" |
15 | | #include "source/common/network/listen_socket_impl.h" |
16 | | #include "source/common/network/socket_option_factory.h" |
17 | | #include "source/common/network/utility.h" |
18 | | #include "source/common/runtime/runtime_features.h" |
19 | | |
20 | | #ifdef ENVOY_ENABLE_QUIC |
21 | | #include "source/common/quic/server_codec_impl.h" |
22 | | #include "quiche/quic/test_tools/quic_session_peer.h" |
23 | | #endif |
24 | | |
25 | | #include "source/extensions/listener_managers/listener_manager/connection_handler_impl.h" |
26 | | |
27 | | #include "test/integration/utility.h" |
28 | | #include "test/test_common/network_utility.h" |
29 | | #include "test/test_common/utility.h" |
30 | | |
31 | | #include "absl/strings/str_cat.h" |
32 | | #include "absl/synchronization/notification.h" |
33 | | |
34 | | using namespace std::chrono_literals; |
35 | | |
36 | | using std::chrono::milliseconds; |
37 | | using testing::AssertionFailure; |
38 | | using testing::AssertionResult; |
39 | | using testing::AssertionSuccess; |
40 | | |
41 | | namespace Envoy { |
42 | | |
// Constructs a fake server-side stream wrapping the codec's response encoder.
// Registers itself as a stream callback so resets are observed (onResetStream
// sets saw_reset_, which pending encode callbacks check before touching the
// encoder). The header validator may be null (makeHeaderValidator can return
// nullptr when no factory is configured).
FakeStream::FakeStream(FakeHttpConnection& parent, Http::ResponseEncoder& encoder,
                       Event::TestTimeSystem& time_system)
    : parent_(parent), encoder_(encoder), time_system_(time_system),
      header_validator_(parent.makeHeaderValidator()) {
  encoder.getStream().addCallbacks(*this);
}
49 | | |
50 | 1.09k | void FakeStream::decodeHeaders(Http::RequestHeaderMapSharedPtr&& headers, bool end_stream) { |
51 | 1.09k | absl::MutexLock lock(&lock_); |
52 | 1.09k | headers_ = std::move(headers); |
53 | 1.09k | if (header_validator_) { |
54 | 0 | header_validator_->transformRequestHeaders(*headers_); |
55 | 0 | } |
56 | 1.09k | setEndStream(end_stream); |
57 | 1.09k | } |
58 | | |
59 | 1.15k | void FakeStream::decodeData(Buffer::Instance& data, bool end_stream) { |
60 | 1.15k | received_data_ = true; |
61 | 1.15k | absl::MutexLock lock(&lock_); |
62 | 1.15k | body_.add(data); |
63 | 1.15k | setEndStream(end_stream); |
64 | 1.15k | } |
65 | | |
66 | 0 | void FakeStream::decodeTrailers(Http::RequestTrailerMapPtr&& trailers) { |
67 | 0 | absl::MutexLock lock(&lock_); |
68 | 0 | setEndStream(true); |
69 | 0 | trailers_ = std::move(trailers); |
70 | 0 | } |
71 | | |
72 | 0 | void FakeStream::decodeMetadata(Http::MetadataMapPtr&& metadata_map_ptr) { |
73 | 0 | for (const auto& metadata : *metadata_map_ptr) { |
74 | 0 | duplicated_metadata_key_count_[metadata.first]++; |
75 | 0 | metadata_map_.insert(metadata); |
76 | 0 | } |
77 | 0 | } |
78 | | |
79 | 3.41k | void FakeStream::postToConnectionThread(std::function<void()> cb) { |
80 | 3.41k | parent_.postToConnectionThread(cb); |
81 | 3.41k | } |
82 | | |
83 | 0 | void FakeStream::encode1xxHeaders(const Http::ResponseHeaderMap& headers) { |
84 | 0 | std::shared_ptr<Http::ResponseHeaderMap> headers_copy( |
85 | 0 | Http::createHeaderMap<Http::ResponseHeaderMapImpl>(headers)); |
86 | 0 | postToConnectionThread([this, headers_copy]() -> void { |
87 | 0 | { |
88 | 0 | absl::MutexLock lock(&lock_); |
89 | 0 | if (!parent_.connected() || saw_reset_) { |
90 | | // Encoded already deleted. |
91 | 0 | return; |
92 | 0 | } |
93 | 0 | } |
94 | 0 | encoder_.encode1xxHeaders(*headers_copy); |
95 | 0 | }); |
96 | 0 | } |
97 | | |
98 | 1.09k | void FakeStream::encodeHeaders(const Http::HeaderMap& headers, bool end_stream) { |
99 | 1.09k | std::shared_ptr<Http::ResponseHeaderMap> headers_copy( |
100 | 1.09k | Http::createHeaderMap<Http::ResponseHeaderMapImpl>(headers)); |
101 | 1.09k | if (add_served_by_header_) { |
102 | 0 | headers_copy->addCopy(Http::LowerCaseString("x-served-by"), |
103 | 0 | parent_.connection().connectionInfoProvider().localAddress()->asString()); |
104 | 0 | } |
105 | | |
106 | 1.09k | if (header_validator_) { |
107 | | // Ignore validation results |
108 | 0 | auto result = header_validator_->transformResponseHeaders(*headers_copy); |
109 | 0 | if (result.new_headers) { |
110 | 0 | headers_copy = std::move(result.new_headers); |
111 | 0 | } |
112 | 0 | } |
113 | | |
114 | 1.09k | postToConnectionThread([this, headers_copy, end_stream]() -> void { |
115 | 1.09k | { |
116 | 1.09k | absl::MutexLock lock(&lock_); |
117 | 1.09k | if (!parent_.connected() || saw_reset_) { |
118 | | // Encoded already deleted. |
119 | 0 | return; |
120 | 0 | } |
121 | 1.09k | } |
122 | 1.09k | encoder_.encodeHeaders(*headers_copy, end_stream); |
123 | 1.09k | }); |
124 | 1.09k | } |
125 | | |
126 | 0 | void FakeStream::encodeData(std::string data, bool end_stream) { |
127 | 0 | postToConnectionThread([this, data, end_stream]() -> void { |
128 | 0 | { |
129 | 0 | absl::MutexLock lock(&lock_); |
130 | 0 | if (!parent_.connected() || saw_reset_) { |
131 | | // Encoded already deleted. |
132 | 0 | return; |
133 | 0 | } |
134 | 0 | } |
135 | 0 | Buffer::OwnedImpl fake_data(data.data(), data.size()); |
136 | 0 | encoder_.encodeData(fake_data, end_stream); |
137 | 0 | }); |
138 | 0 | } |
139 | | |
140 | 1.07k | void FakeStream::encodeData(uint64_t size, bool end_stream) { |
141 | 1.07k | postToConnectionThread([this, size, end_stream]() -> void { |
142 | 1.07k | { |
143 | 1.07k | absl::MutexLock lock(&lock_); |
144 | 1.07k | if (!parent_.connected() || saw_reset_) { |
145 | | // Encoded already deleted. |
146 | 0 | return; |
147 | 0 | } |
148 | 1.07k | } |
149 | 1.07k | Buffer::OwnedImpl data(std::string(size, 'a')); |
150 | 1.07k | encoder_.encodeData(data, end_stream); |
151 | 1.07k | }); |
152 | 1.07k | } |
153 | | |
154 | 172 | void FakeStream::encodeData(Buffer::Instance& data, bool end_stream) { |
155 | 172 | std::shared_ptr<Buffer::Instance> data_copy = std::make_shared<Buffer::OwnedImpl>(data); |
156 | 172 | postToConnectionThread([this, data_copy, end_stream]() -> void { |
157 | 172 | { |
158 | 172 | absl::MutexLock lock(&lock_); |
159 | 172 | if (!parent_.connected() || saw_reset_) { |
160 | | // Encoded already deleted. |
161 | 0 | return; |
162 | 0 | } |
163 | 172 | } |
164 | 172 | encoder_.encodeData(*data_copy, end_stream); |
165 | 172 | }); |
166 | 172 | } |
167 | | |
168 | 1.07k | void FakeStream::encodeTrailers(const Http::HeaderMap& trailers) { |
169 | 1.07k | std::shared_ptr<Http::ResponseTrailerMap> trailers_copy( |
170 | 1.07k | Http::createHeaderMap<Http::ResponseTrailerMapImpl>(trailers)); |
171 | 1.07k | postToConnectionThread([this, trailers_copy]() -> void { |
172 | 1.07k | { |
173 | 1.07k | absl::MutexLock lock(&lock_); |
174 | 1.07k | if (!parent_.connected() || saw_reset_) { |
175 | | // Encoded already deleted. |
176 | 0 | return; |
177 | 0 | } |
178 | 1.07k | } |
179 | 1.07k | encoder_.encodeTrailers(*trailers_copy); |
180 | 1.07k | }); |
181 | 1.07k | } |
182 | | |
183 | 0 | void FakeStream::encodeResetStream() { |
184 | 0 | postToConnectionThread([this]() -> void { |
185 | 0 | { |
186 | 0 | absl::MutexLock lock(&lock_); |
187 | 0 | if (!parent_.connected() || saw_reset_) { |
188 | | // Encoded already deleted. |
189 | 0 | return; |
190 | 0 | } |
191 | 0 | } |
192 | 0 | encoder_.getStream().resetStream(Http::StreamResetReason::LocalReset); |
193 | 0 | }); |
194 | 0 | } |
195 | | |
196 | 0 | void FakeStream::encodeMetadata(const Http::MetadataMapVector& metadata_map_vector) { |
197 | 0 | postToConnectionThread([this, &metadata_map_vector]() -> void { |
198 | 0 | { |
199 | 0 | absl::MutexLock lock(&lock_); |
200 | 0 | if (!parent_.connected() || saw_reset_) { |
201 | | // Encoded already deleted. |
202 | 0 | return; |
203 | 0 | } |
204 | 0 | } |
205 | 0 | encoder_.encodeMetadata(metadata_map_vector); |
206 | 0 | }); |
207 | 0 | } |
208 | | |
209 | 0 | void FakeStream::readDisable(bool disable) { |
210 | 0 | postToConnectionThread([this, disable]() -> void { |
211 | 0 | { |
212 | 0 | absl::MutexLock lock(&lock_); |
213 | 0 | if (!parent_.connected() || saw_reset_) { |
214 | | // Encoded already deleted. |
215 | 0 | return; |
216 | 0 | } |
217 | 0 | } |
218 | 0 | encoder_.getStream().readDisable(disable); |
219 | 0 | }); |
220 | 0 | } |
221 | | |
// Stream callback: records that this stream was reset. Pending encode
// callbacks check saw_reset_ under lock_ and become no-ops afterwards; tests
// observe it via waitForReset().
void FakeStream::onResetStream(Http::StreamResetReason, absl::string_view) {
  absl::MutexLock lock(&lock_);
  saw_reset_ = true;
}
226 | | |
227 | 0 | AssertionResult FakeStream::waitForHeadersComplete(milliseconds timeout) { |
228 | 0 | absl::MutexLock lock(&lock_); |
229 | 0 | const auto reached = [this]() |
230 | 0 | ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return headers_ != nullptr; }; |
231 | 0 | if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) { |
232 | 0 | return AssertionFailure() << "Timed out waiting for headers."; |
233 | 0 | } |
234 | 0 | return AssertionSuccess(); |
235 | 0 | } |
236 | | |
namespace {
// Perform a wait on a condition while still allowing for periodic client dispatcher runs that
// occur on the current thread.
//
// Returns true once `condition` holds, false if `timeout` elapses first. The
// caller must already hold `lock`; each waitFor releases it while blocked and
// re-acquires it before evaluating the condition, and the dispatcher run
// happens with the lock held.
bool waitForWithDispatcherRun(Event::TestTimeSystem& time_system, absl::Mutex& lock,
                              const std::function<bool()>& condition,
                              Event::Dispatcher& client_dispatcher, milliseconds timeout)
    ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock) {
  Event::TestTimeSystem::RealTimeBound bound(timeout);
  while (bound.withinBound()) {
    // Wake up periodically to run the client dispatcher. The 5ms slice is
    // scaled up under TSAN, where everything runs slower.
    if (time_system.waitFor(lock, absl::Condition(&condition), 5ms * TSAN_TIMEOUT_FACTOR)) {
      return true;
    }

    // Run the client dispatcher since we may need to process window updates, etc.
    client_dispatcher.run(Event::Dispatcher::RunType::NonBlock);
  }
  return false;
}
} // namespace
257 | | |
258 | | AssertionResult FakeStream::waitForData(Event::Dispatcher& client_dispatcher, uint64_t body_length, |
259 | 167 | milliseconds timeout) { |
260 | 167 | absl::MutexLock lock(&lock_); |
261 | 167 | if (!waitForWithDispatcherRun( |
262 | 167 | time_system_, lock_, |
263 | 167 | [this, body_length]() |
264 | 599 | ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return (body_.length() >= body_length); }, |
265 | 167 | client_dispatcher, timeout)) { |
266 | 0 | return AssertionFailure() << "Timed out waiting for data."; |
267 | 0 | } |
268 | 167 | return AssertionSuccess(); |
269 | 167 | } |
270 | | |
271 | | AssertionResult FakeStream::waitForData(Event::Dispatcher& client_dispatcher, |
272 | 0 | absl::string_view data, milliseconds timeout) { |
273 | 0 | auto succeeded = waitForData(client_dispatcher, data.length(), timeout); |
274 | 0 | if (succeeded) { |
275 | 0 | Buffer::OwnedImpl buffer(data.data(), data.length()); |
276 | 0 | if (!TestUtility::buffersEqual(body(), buffer)) { |
277 | 0 | return AssertionFailure() << body().toString() << " not equal to " << data; |
278 | 0 | } |
279 | 0 | } |
280 | 0 | return succeeded; |
281 | 0 | } |
282 | | |
283 | | AssertionResult FakeStream::waitForData(Event::Dispatcher& client_dispatcher, |
284 | | const FakeStream::ValidatorFunction& data_validator, |
285 | 0 | std::chrono::milliseconds timeout) { |
286 | 0 | absl::MutexLock lock(&lock_); |
287 | 0 | if (!waitForWithDispatcherRun( |
288 | 0 | time_system_, lock_, |
289 | 0 | [this, data_validator]() |
290 | 0 | ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return data_validator(body_.toString()); }, |
291 | 0 | client_dispatcher, timeout)) { |
292 | 0 | return AssertionFailure() << "Timed out waiting for data."; |
293 | 0 | } |
294 | 0 | return AssertionSuccess(); |
295 | 0 | } |
296 | | |
297 | | AssertionResult FakeStream::waitForEndStream(Event::Dispatcher& client_dispatcher, |
298 | 0 | milliseconds timeout) { |
299 | 0 | absl::MutexLock lock(&lock_); |
300 | 0 | if (!waitForWithDispatcherRun( |
301 | 0 | time_system_, lock_, |
302 | 0 | [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return end_stream_; }, client_dispatcher, |
303 | 0 | timeout)) { |
304 | 0 | return AssertionFailure() << "Timed out waiting for end of stream."; |
305 | 0 | } |
306 | 0 | return AssertionSuccess(); |
307 | 0 | } |
308 | | |
309 | 0 | AssertionResult FakeStream::waitForReset(milliseconds timeout) { |
310 | 0 | absl::MutexLock lock(&lock_); |
311 | 0 | if (!time_system_.waitFor(lock_, absl::Condition(&saw_reset_), timeout)) { |
312 | 0 | return AssertionFailure() << "Timed out waiting for reset."; |
313 | 0 | } |
314 | 0 | return AssertionSuccess(); |
315 | 0 | } |
316 | | |
317 | 14 | void FakeStream::startGrpcStream(bool send_headers) { |
318 | 14 | ASSERT(!grpc_stream_started_, "gRPC stream should not be started more than once"); |
319 | 14 | grpc_stream_started_ = true; |
320 | 14 | if (send_headers) { |
321 | 14 | encodeHeaders(Http::TestResponseHeaderMapImpl{{":status", "200"}}, false); |
322 | 14 | } |
323 | 14 | } |
324 | | |
325 | 0 | void FakeStream::finishGrpcStream(Grpc::Status::GrpcStatus status) { |
326 | 0 | encodeTrailers(Http::TestResponseTrailerMapImpl{ |
327 | 0 | {"grpc-status", std::to_string(static_cast<uint32_t>(status))}}); |
328 | 0 | } |
329 | | |
// Thin test wrapper around the HTTP/1 server codec; inherits all constructors
// and adds no behavior (exists so the fake upstream has a named test codec
// type, symmetric with TestHttp2ServerConnectionImpl).
class TestHttp1ServerConnectionImpl : public Http::Http1::ServerConnectionImpl {
public:
  using Http::Http1::ServerConnectionImpl::ServerConnectionImpl;
};
334 | | |
// Test wrapper around the HTTP/2 server codec that exposes a hook for
// re-advertising SETTINGS_MAX_CONCURRENT_STREAMS mid-connection.
class TestHttp2ServerConnectionImpl : public Http::Http2::ServerConnectionImpl {
public:
  TestHttp2ServerConnectionImpl(
      Network::Connection& connection, Http::ServerConnectionCallbacks& callbacks,
      Http::Http2::CodecStats& stats, Random::RandomGenerator& random_generator,
      const envoy::config::core::v3::Http2ProtocolOptions& http2_options,
      const uint32_t max_request_headers_kb, const uint32_t max_request_headers_count,
      envoy::config::core::v3::HttpProtocolOptions::HeadersWithUnderscoresAction
          headers_with_underscores_action,
      Server::OverloadManager& overload_manager)
      : ServerConnectionImpl(connection, callbacks, stats, random_generator, http2_options,
                             max_request_headers_kb, max_request_headers_count,
                             headers_with_underscores_action, overload_manager) {}

  // Sends a SETTINGS frame updating MAX_CONCURRENT_STREAMS to `max_streams`.
  // Must be called on the connection's dispatcher thread (adapter_ is not
  // thread-safe); Send() flushes the frame and must succeed.
  void updateConcurrentStreams(uint32_t max_streams) {
    absl::InlinedVector<http2::adapter::Http2Setting, 1> settings;
    settings.push_back({http2::adapter::MAX_CONCURRENT_STREAMS, max_streams});
    adapter_->SubmitSettings(settings);
    const int rc = adapter_->Send();
    ASSERT(rc == 0);
  }
};
357 | | |
358 | | namespace { |
359 | | // Fake upstream codec will not do path normalization, so the tests can observe |
360 | | // the path forwarded by Envoy. |
361 | | ::envoy::extensions::http::header_validators::envoy_default::v3::HeaderValidatorConfig |
362 | 591 | fakeUpstreamHeaderValidatorConfig() { |
363 | 591 | ::envoy::extensions::http::header_validators::envoy_default::v3::HeaderValidatorConfig config; |
364 | 591 | config.mutable_uri_path_normalization_options()->set_skip_path_normalization(true); |
365 | 591 | config.mutable_uri_path_normalization_options()->set_skip_merging_slashes(true); |
366 | 591 | config.mutable_uri_path_normalization_options()->set_path_with_escaped_slashes_action( |
367 | 591 | ::envoy::extensions::http::header_validators::envoy_default::v3::HeaderValidatorConfig:: |
368 | 591 | UriPathNormalizationOptions::KEEP_UNCHANGED); |
369 | 591 | return config; |
370 | 591 | } |
371 | | } // namespace |
372 | | |
// Wires a server codec (HTTP/1, /2, or /3 per `type`) onto the shared
// connection and installs a ReadFilter that feeds incoming bytes into that
// codec. Header validation uses the fake-upstream config above (path
// normalization disabled so tests see the path Envoy forwarded).
FakeHttpConnection::FakeHttpConnection(
    FakeUpstream& fake_upstream, SharedConnectionWrapper& shared_connection, Http::CodecType type,
    Event::TestTimeSystem& time_system, uint32_t max_request_headers_kb,
    uint32_t max_request_headers_count,
    envoy::config::core::v3::HttpProtocolOptions::HeadersWithUnderscoresAction
        headers_with_underscores_action)
    : FakeConnectionBase(shared_connection, time_system), type_(type),
      header_validator_factory_(
          IntegrationUtil::makeHeaderValidationFactory(fakeUpstreamHeaderValidatorConfig())) {
  ASSERT(max_request_headers_count != 0);
  if (type == Http::CodecType::HTTP1) {
    Http::Http1Settings http1_settings;
    http1_settings.use_balsa_parser_ =
        Runtime::runtimeFeatureEnabled("envoy.reloadable_features.http1_use_balsa_parser");
    // For the purpose of testing, we always have the upstream encode the trailers if any
    http1_settings.enable_trailers_ = true;
    Http::Http1::CodecStats& stats = fake_upstream.http1CodecStats();
    codec_ = std::make_unique<TestHttp1ServerConnectionImpl>(
        shared_connection_.connection(), stats, *this, http1_settings, max_request_headers_kb,
        max_request_headers_count, headers_with_underscores_action, overload_manager_);
  } else if (type == Http::CodecType::HTTP2) {
    envoy::config::core::v3::Http2ProtocolOptions http2_options = fake_upstream.http2Options();
    Http::Http2::CodecStats& stats = fake_upstream.http2CodecStats();
    codec_ = std::make_unique<TestHttp2ServerConnectionImpl>(
        shared_connection_.connection(), *this, stats, random_, http2_options,
        max_request_headers_kb, max_request_headers_count, headers_with_underscores_action,
        overload_manager_);
  } else {
    ASSERT(type == Http::CodecType::HTTP3);
#ifdef ENVOY_ENABLE_QUIC
    // HTTP/3 requires the shared connection to actually be a QUIC server
    // session; the dynamic_cast enforces that at runtime.
    Http::Http3::CodecStats& stats = fake_upstream.http3CodecStats();
    codec_ = std::make_unique<Quic::QuicHttpServerConnectionImpl>(
        dynamic_cast<Quic::EnvoyQuicServerSession&>(shared_connection_.connection()), *this, stats,
        fake_upstream.http3Options(), max_request_headers_kb, max_request_headers_count,
        headers_with_underscores_action);
#else
    ASSERT(false, "running a QUIC integration test without compiling QUIC");
#endif
  }
  shared_connection_.connection().addReadFilter(
      Network::ReadFilterSharedPtr{new ReadFilter(*this)});
}
415 | | |
416 | 78 | AssertionResult FakeConnectionBase::close(std::chrono::milliseconds timeout) { |
417 | 78 | ENVOY_LOG(trace, "FakeConnectionBase close"); |
418 | 78 | if (!shared_connection_.connected()) { |
419 | 0 | return AssertionSuccess(); |
420 | 0 | } |
421 | 78 | return shared_connection_.executeOnDispatcher( |
422 | 78 | [](Network::Connection& connection) { |
423 | 74 | connection.close(Network::ConnectionCloseType::FlushWrite); |
424 | 74 | }, |
425 | 78 | timeout); |
426 | 78 | } |
427 | | |
428 | | AssertionResult FakeConnectionBase::close(Network::ConnectionCloseType close_type, |
429 | 0 | std::chrono::milliseconds timeout) { |
430 | 0 | ENVOY_LOG(trace, "FakeConnectionBase close type={}", static_cast<int>(close_type)); |
431 | 0 | if (!shared_connection_.connected()) { |
432 | 0 | return AssertionSuccess(); |
433 | 0 | } |
434 | 0 | return shared_connection_.executeOnDispatcher( |
435 | 0 | [&close_type](Network::Connection& connection) { connection.close(close_type); }, timeout); |
436 | 0 | } |
437 | | |
438 | 0 | AssertionResult FakeConnectionBase::readDisable(bool disable, std::chrono::milliseconds timeout) { |
439 | 0 | return shared_connection_.executeOnDispatcher( |
440 | 0 | [disable](Network::Connection& connection) { connection.readDisable(disable); }, timeout); |
441 | 0 | } |
442 | | |
namespace {
// Maps a codec type to its wire protocol; used when creating a server header
// validator, which is protocol-specific. PANICs on a corrupt enum value.
Http::Protocol codeTypeToProtocol(Http::CodecType codec_type) {
  switch (codec_type) {
  case Http::CodecType::HTTP1:
    return Http::Protocol::Http11;
  case Http::CodecType::HTTP2:
    return Http::Protocol::Http2;
  case Http::CodecType::HTTP3:
    return Http::Protocol::Http3;
  }
  PANIC_DUE_TO_CORRUPT_ENUM;
}
} // namespace
456 | | |
457 | 1.09k | Http::ServerHeaderValidatorPtr FakeHttpConnection::makeHeaderValidator() { |
458 | 1.09k | return header_validator_factory_ ? header_validator_factory_->createServerHeaderValidator( |
459 | 0 | codeTypeToProtocol(type_), header_validator_stats_) |
460 | 1.09k | : nullptr; |
461 | 1.09k | } |
462 | | |
463 | 14 | Http::RequestDecoder& FakeHttpConnection::newStream(Http::ResponseEncoder& encoder, bool) { |
464 | 14 | absl::MutexLock lock(&lock_); |
465 | 14 | new_streams_.emplace_back(new FakeStream(*this, encoder, time_system_)); |
466 | 14 | return *new_streams_.back(); |
467 | 14 | } |
468 | | |
469 | 0 | void FakeHttpConnection::onGoAway(Http::GoAwayErrorCode code) { |
470 | 0 | ASSERT(type_ != Http::CodecType::HTTP1); |
471 | | // Usually indicates connection level errors, no operations are needed since |
472 | | // the connection will be closed soon. |
473 | 0 | ENVOY_LOG(info, "FakeHttpConnection receives GOAWAY: ", static_cast<int>(code)); |
474 | 0 | } |
475 | | |
// Sends a GOAWAY frame (HTTP/2 and /3 only). The codec is not thread-safe, so
// the call is posted to the connection's dispatcher thread.
void FakeHttpConnection::encodeGoAway() {
  ASSERT(type_ != Http::CodecType::HTTP1);

  postToConnectionThread([this]() { codec_->goAway(); });
}
481 | | |
// Re-advertises the maximum concurrent stream limit mid-connection (HTTP/2 via
// a SETTINGS frame, HTTP/3 via MAX_STREAMS). All codec access is posted to the
// connection's dispatcher thread.
void FakeHttpConnection::updateConcurrentStreams(uint64_t max_streams) {
  ASSERT(type_ != Http::CodecType::HTTP1);

  if (type_ == Http::CodecType::HTTP2) {
    postToConnectionThread([this, max_streams]() {
      // The codec was created as TestHttp2ServerConnectionImpl in the ctor.
      auto codec = dynamic_cast<TestHttp2ServerConnectionImpl*>(codec_.get());
      codec->updateConcurrentStreams(max_streams);
    });
  } else {
#ifdef ENVOY_ENABLE_QUIC
    postToConnectionThread([this, max_streams]() {
      auto codec = dynamic_cast<Quic::QuicHttpServerConnectionImpl*>(codec_.get());
      // Uses the quiche test peer to raise the session limit, then advertises
      // it to the peer with a MAX_STREAMS frame.
      quic::test::QuicSessionPeer::SetMaxOpenIncomingBidirectionalStreams(
          &codec->quicServerSession(), max_streams);
      codec->quicServerSession().SendMaxStreams(1, false);
    });
#else
    UNREFERENCED_PARAMETER(max_streams);
#endif
  }
}
503 | | |
504 | 0 | void FakeHttpConnection::encodeProtocolError() { |
505 | 0 | ASSERT(type_ != Http::CodecType::HTTP1); |
506 | | |
507 | 0 | Http::Http2::ServerConnectionImpl* codec = |
508 | 0 | dynamic_cast<Http::Http2::ServerConnectionImpl*>(codec_.get()); |
509 | 0 | ASSERT(codec != nullptr); |
510 | 0 | postToConnectionThread([codec]() { |
511 | 0 | Http::Status status = codec->protocolErrorForTest(); |
512 | 0 | ASSERT(Http::getStatusCode(status) == Http::StatusCode::CodecProtocolError); |
513 | 0 | }); |
514 | 0 | } |
515 | | |
516 | 115 | AssertionResult FakeConnectionBase::waitForDisconnect(milliseconds timeout) { |
517 | 115 | ENVOY_LOG(trace, "FakeConnectionBase waiting for disconnect"); |
518 | 115 | absl::MutexLock lock(&lock_); |
519 | 117 | const auto reached = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { |
520 | 117 | return !shared_connection_.connectedLockHeld(); |
521 | 117 | }; |
522 | | |
523 | 115 | if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) { |
524 | 0 | if (timeout == TestUtility::DefaultTimeout) { |
525 | 0 | ADD_FAILURE() << "Please don't waitForDisconnect with a 5s timeout if failure is expected\n"; |
526 | 0 | } |
527 | 0 | return AssertionFailure() << "Timed out waiting for disconnect."; |
528 | 0 | } |
529 | 115 | ENVOY_LOG(trace, "FakeConnectionBase done waiting for disconnect"); |
530 | 115 | return AssertionSuccess(); |
531 | 115 | } |
532 | | |
533 | 0 | AssertionResult FakeConnectionBase::waitForRstDisconnect(std::chrono::milliseconds timeout) { |
534 | 0 | ENVOY_LOG(trace, "FakeConnectionBase waiting for RST disconnect"); |
535 | 0 | absl::MutexLock lock(&lock_); |
536 | 0 | const auto reached = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { |
537 | 0 | return shared_connection_.rstDisconnected(); |
538 | 0 | }; |
539 | |
|
540 | 0 | if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) { |
541 | 0 | if (timeout == TestUtility::DefaultTimeout) { |
542 | 0 | ADD_FAILURE() |
543 | 0 | << "Please don't waitForRstDisconnect with a 5s timeout if failure is expected\n"; |
544 | 0 | } |
545 | 0 | return AssertionFailure() << "Timed out waiting for RST disconnect."; |
546 | 0 | } |
547 | 0 | ENVOY_LOG(trace, "FakeConnectionBase done waiting for RST disconnect"); |
548 | 0 | return AssertionSuccess(); |
549 | 0 | } |
550 | | |
551 | 0 | AssertionResult FakeConnectionBase::waitForHalfClose(milliseconds timeout) { |
552 | 0 | absl::MutexLock lock(&lock_); |
553 | 0 | if (!time_system_.waitFor(lock_, absl::Condition(&half_closed_), timeout)) { |
554 | 0 | return AssertionFailure() << "Timed out waiting for half close."; |
555 | 0 | } |
556 | 0 | return AssertionSuccess(); |
557 | 0 | } |
558 | | |
559 | 0 | AssertionResult FakeConnectionBase::waitForNoPost(milliseconds timeout) { |
560 | 0 | absl::MutexLock lock(&lock_); |
561 | 0 | if (!time_system_.waitFor( |
562 | 0 | lock_, |
563 | 0 | absl::Condition( |
564 | 0 | [](void* fake_connection) -> bool { |
565 | 0 | return static_cast<FakeConnectionBase*>(fake_connection)->pending_cbs_ == 0; |
566 | 0 | }, |
567 | 0 | this), |
568 | 0 | timeout)) { |
569 | 0 | return AssertionFailure() << "Timed out waiting for ops on this connection"; |
570 | 0 | } |
571 | 0 | return AssertionSuccess(); |
572 | 0 | } |
573 | | |
574 | 3.41k | void FakeConnectionBase::postToConnectionThread(std::function<void()> cb) { |
575 | 3.41k | ++pending_cbs_; |
576 | 3.41k | dispatcher_.post([this, cb]() { |
577 | 3.41k | cb(); |
578 | 3.41k | --pending_cbs_; |
579 | 3.41k | }); |
580 | 3.41k | } |
581 | | |
582 | | AssertionResult FakeHttpConnection::waitForNewStream(Event::Dispatcher& client_dispatcher, |
583 | | FakeStreamPtr& stream, |
584 | 14 | std::chrono::milliseconds timeout) { |
585 | 14 | absl::MutexLock lock(&lock_); |
586 | 14 | if (!waitForWithDispatcherRun( |
587 | 14 | time_system_, lock_, |
588 | 20 | [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return !new_streams_.empty(); }, |
589 | 14 | client_dispatcher, timeout)) { |
590 | 0 | return AssertionFailure() << "Timed out waiting for new stream."; |
591 | 0 | } |
592 | 14 | stream = std::move(new_streams_.front()); |
593 | 14 | new_streams_.pop_front(); |
594 | 14 | return AssertionSuccess(); |
595 | 14 | } |
596 | | |
// Constructs a fake upstream listening on a Unix domain socket at `uds_path`,
// delegating to the listen-socket-based constructor.
FakeUpstream::FakeUpstream(Network::DownstreamTransportSocketFactoryPtr&& transport_socket_factory,
                           const std::string& uds_path, const FakeUpstreamConfig& config)
    : FakeUpstream(std::move(transport_socket_factory),
                   Network::SocketPtr{new Network::UdsListenSocket(
                       std::make_shared<Network::Address::PipeInstance>(uds_path))},
                   config) {}
603 | | |
604 | | static Network::SocketPtr |
605 | 2.66k | makeTcpListenSocket(const Network::Address::InstanceConstSharedPtr& address) { |
606 | 2.66k | return std::make_unique<Network::TcpListenSocket>(address, nullptr, true); |
607 | 2.66k | } |
608 | | |
609 | | static Network::Address::InstanceConstSharedPtr makeAddress(uint32_t port, |
610 | 14 | Network::Address::IpVersion version) { |
611 | 14 | return Network::Utility::parseInternetAddress(Network::Test::getLoopbackAddressString(version), |
612 | 14 | port); |
613 | 14 | } |
614 | | |
// Creates a UDP listen socket bound to `address`, with the packet-info and
// RX-queue-overflow options that fake UDP upstreams rely on.
static Network::SocketPtr
makeUdpListenSocket(const Network::Address::InstanceConstSharedPtr& address) {
  auto socket = std::make_unique<Network::UdpListenSocket>(address, nullptr, true);
  // TODO(mattklein123): These options are set in multiple locations. We should centralize them for
  // UDP listeners.
  socket->addOptions(Network::SocketOptionFactory::buildIpPacketInfoOptions());
  socket->addOptions(Network::SocketOptionFactory::buildRxQueueOverFlowOptions());
  return socket;
}
624 | | |
625 | | static Network::SocketPtr |
626 | | makeListenSocket(const FakeUpstreamConfig& config, |
627 | 2.66k | const Network::Address::InstanceConstSharedPtr& address) { |
628 | 2.66k | return (config.udp_fake_upstream_.has_value() ? makeUdpListenSocket(address) |
629 | 2.66k | : makeTcpListenSocket(address)); |
630 | 2.66k | } |
631 | | |
// Convenience constructor: raw-buffer (plaintext) transport on a loopback
// address of the given IP version. Port 0 selects an ephemeral port.
FakeUpstream::FakeUpstream(uint32_t port, Network::Address::IpVersion version,
                           const FakeUpstreamConfig& config, const bool defer_initialization)
    : FakeUpstream(Network::Test::createRawBufferDownstreamSocketFactory(),
                   makeListenSocket(config, makeAddress(port, version)), config,
                   defer_initialization) {}
637 | | |
// Constructor for a caller-supplied transport socket factory bound to a
// specific address; delegates to the listen-socket-based constructor.
FakeUpstream::FakeUpstream(Network::DownstreamTransportSocketFactoryPtr&& transport_socket_factory,
                           const Network::Address::InstanceConstSharedPtr& address,
                           const FakeUpstreamConfig& config)
    : FakeUpstream(std::move(transport_socket_factory), makeListenSocket(config, address), config) {
}
643 | | |
// Constructor for a caller-supplied transport socket factory on a loopback
// address of the given IP version; delegates to the listen-socket-based
// constructor.
FakeUpstream::FakeUpstream(Network::DownstreamTransportSocketFactoryPtr&& transport_socket_factory,
                           uint32_t port, Network::Address::IpVersion version,
                           const FakeUpstreamConfig& config)
    : FakeUpstream(std::move(transport_socket_factory),
                   makeListenSocket(config, makeAddress(port, version)), config) {}
649 | | |
// Main constructor: takes ownership of the listen socket, creates a dedicated
// API/dispatcher/connection-handler for this upstream, and (unless deferred)
// starts the server thread via initializeServer().
FakeUpstream::FakeUpstream(Network::DownstreamTransportSocketFactoryPtr&& transport_socket_factory,
                           Network::SocketPtr&& listen_socket, const FakeUpstreamConfig& config,
                           const bool defer_initialization)
    : http_type_(config.upstream_protocol_), http2_options_(config.http2_options_),
      http3_options_(config.http3_options_), quic_options_(config.quic_options_),
      socket_(Network::SocketSharedPtr(listen_socket.release())),
      api_(Api::createApiForTest(stats_store_)), time_system_(config.time_system_),
      dispatcher_(api_->allocateDispatcher("fake_upstream")),
      handler_(new Server::ConnectionHandlerImpl(*dispatcher_, 0)), config_(config),
      read_disable_on_new_connection_(true), enable_half_close_(config.enable_half_close_),
      // The listener needs to know up front whether it serves QUIC (HTTP/3).
      listener_(*this, http_type_ == Http::CodecType::HTTP3),
      filter_chain_(Network::Test::createEmptyFilterChain(std::move(transport_socket_factory))),
      stats_scope_(stats_store_.createScope("test_server_scope")) {
  socket_factories_.emplace_back(std::make_unique<FakeListenSocketFactory>(socket_));
  ENVOY_LOG(info, "starting fake server at {}. UDP={} codec={}", localAddress()->asString(),
            config.udp_fake_upstream_.has_value(), FakeHttpConnection::typeToString(http_type_));
  // Propagate the configured max RX datagram size to the UDP listener config,
  // when a UDP fake upstream with that override was requested.
  if (config.udp_fake_upstream_.has_value() &&
      config.udp_fake_upstream_->max_rx_datagram_size_.has_value()) {
    listener_.udp_listener_config_.config_.mutable_downstream_socket_config()
        ->mutable_max_rx_datagram_size()
        ->set_value(config.udp_fake_upstream_->max_rx_datagram_size_.value());
  }

  if (!defer_initialization) {
    initializeServer();
  }
}
677 | | |
678 | 2.66k | FakeUpstream::~FakeUpstream() { cleanUp(); }; |
679 | | |
// Idempotently starts the fake upstream's server thread. The listener is
// registered from the dispatcher thread (via post) and this call blocks until
// that registration completes, so the upstream is accepting connections on
// return.
void FakeUpstream::initializeServer() {
  if (initialized_) {
    // Already initialized.
    return;
  }

  // Queue listener setup; it runs once the thread below enters the dispatcher
  // loop.
  dispatcher_->post([this]() -> void {
    socket_factories_[0]->doFinalPreWorkerInit();
    handler_->addListener(absl::nullopt, listener_, runtime_);
    server_initialized_.setReady();
  });
  thread_ = api_->threadFactory().createThread([this]() -> void { threadRoutine(); });
  server_initialized_.waitReady();
  initialized_ = true;
}
695 | | |
696 | 3.25k | void FakeUpstream::cleanUp() { |
697 | 3.25k | if (thread_.get()) { |
698 | 2.66k | dispatcher_->exit(); |
699 | 2.66k | thread_->join(); |
700 | 2.66k | thread_.reset(); |
701 | 2.66k | } |
702 | 3.25k | } |
703 | | |
// Called on the dispatcher thread for each newly accepted connection. Non-QUIC
// connections are read-disabled until a test claims them via
// consumeConnection(); HTTP/3 connections are wrapped eagerly because QUIC
// must process packets as they arrive.
bool FakeUpstream::createNetworkFilterChain(Network::Connection& connection,
                                            const Filter::NetworkFilterFactoriesList&) {
  absl::MutexLock lock(&lock_);
  if (read_disable_on_new_connection_ && http_type_ != Http::CodecType::HTTP3) {
    // Disable early close detection to avoid closing the network connection before full
    // initialization is complete.
    connection.detectEarlyCloseWhenReadDisabled(false);
    connection.readDisable(true);
    if (disable_and_do_not_enable_) {
      // Additionally turn off all file events so no reads are ever delivered.
      dynamic_cast<Network::ConnectionImpl*>(&connection)->ioHandle().enableFileEvents(0);
    }
  }
  auto connection_wrapper = std::make_unique<SharedConnectionWrapper>(connection);

  LinkedList::moveIntoListBack(std::move(connection_wrapper), new_connections_);

  // Normally we don't associate a logical network connection with a FakeHttpConnection until
  // waitForHttpConnection is called, but QUIC needs to be set up as packets come in, so we do
  // not lazily create for HTTP/3
  if (http_type_ == Http::CodecType::HTTP3) {
    quic_connections_.push_back(std::make_unique<FakeHttpConnection>(
        *this, consumeConnection(), http_type_, time_system_, config_.max_request_headers_kb_,
        config_.max_request_headers_count_, config_.headers_with_underscores_action_));
    quic_connections_.back()->initialize();
  }
  return true;
}
731 | | |
732 | 144 | bool FakeUpstream::createListenerFilterChain(Network::ListenerFilterManager&) { return true; } |
733 | | |
734 | | void FakeUpstream::createUdpListenerFilterChain(Network::UdpListenerFilterManager& udp_listener, |
735 | 0 | Network::UdpReadFilterCallbacks& callbacks) { |
736 | 0 | udp_listener.addReadFilter(std::make_unique<FakeUdpFilter>(*this, callbacks)); |
737 | 0 | } |
738 | | |
739 | 0 | bool FakeUpstream::createQuicListenerFilterChain(Network::QuicListenerFilterManager&) { |
740 | 0 | return true; |
741 | 0 | } |
742 | | |
// Body of the server thread: runs the dispatcher loop until cleanUp() calls
// exit(), then tears down the connection handler and all connection state.
// NOTE(review): handler_ is reset here so it is destroyed on its own
// dispatcher thread; the connection lists are cleared under the lock after
// that -- presumably so no listener callbacks can race the clears; confirm.
void FakeUpstream::threadRoutine() {
  dispatcher_->run(Event::Dispatcher::RunType::Block);
  handler_.reset();
  {
    absl::MutexLock lock(&lock_);
    new_connections_.clear();
    quic_connections_.clear();
    consumed_connections_.clear();
  }
}
753 | | |
// Waits for a new HTTP connection to this upstream and wraps it into
// `connection`. HTTP/3 connections were created eagerly in
// createNetworkFilterChain() and are popped from quic_connections_; all other
// codecs consume a raw connection and construct the wrapper on the dispatcher
// thread.
AssertionResult FakeUpstream::waitForHttpConnection(Event::Dispatcher& client_dispatcher,
                                                    FakeHttpConnectionPtr& connection,
                                                    milliseconds timeout) {
  if (!initialized_) {
    return AssertionFailure()
           << "Must initialize the FakeUpstream first by calling initializeServer().";
  }

  {
    absl::MutexLock lock(&lock_);

    // As noted in createNetworkFilterChain, HTTP3 FakeHttpConnections are not
    // lazily created, so HTTP3 needs a different wait path here.
    if (http_type_ == Http::CodecType::HTTP3) {
      if (quic_connections_.empty() &&
          !waitForWithDispatcherRun(
              time_system_, lock_,
              [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return !quic_connections_.empty(); },
              client_dispatcher, timeout)) {
        return AssertionFailure() << "Timed out waiting for new quic connection.";
      }
      if (!quic_connections_.empty()) {
        connection = std::move(quic_connections_.front());
        quic_connections_.pop_front();
        return AssertionSuccess();
      }
    }

    if (!waitForWithDispatcherRun(
            time_system_, lock_,
            [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return !new_connections_.empty(); },
            client_dispatcher, timeout)) {
      if (timeout == TestUtility::DefaultTimeout) {
        ADD_FAILURE()
            << "Please don't waitForHttpConnection with a 5s timeout if failure is expected\n";
      }
      return AssertionFailure() << "Timed out waiting for new connection.";
    }
  }
  // The wrapper is constructed on the dispatcher thread; consumeConnection()
  // asserts thread safety when the connection is still alive.
  return runOnDispatcherThreadAndWait([&]() {
    absl::MutexLock lock(&lock_);
    connection = std::make_unique<FakeHttpConnection>(
        *this, consumeConnection(), http_type_, time_system_, config_.max_request_headers_kb_,
        config_.max_request_headers_count_, config_.headers_with_underscores_action_);
    connection->initialize();
    return AssertionSuccess();
  });
}
802 | | |
// Round-robins over `upstreams`, polling each for 5ms at a time, until one of
// them accepts an HTTP connection or the overall real-time `timeout` is
// exhausted. On success `connection` wraps the accepted connection.
AssertionResult
FakeUpstream::waitForHttpConnection(Event::Dispatcher& client_dispatcher,
                                    std::vector<std::unique_ptr<FakeUpstream>>& upstreams,
                                    FakeHttpConnectionPtr& connection, milliseconds timeout) {
  if (upstreams.empty()) {
    return AssertionFailure() << "No upstreams configured.";
  }
  Event::TestTimeSystem::RealTimeBound bound(timeout);
  while (bound.withinBound()) {
    for (auto& it : upstreams) {
      FakeUpstream& upstream = *it;
      {
        absl::MutexLock lock(&upstream.lock_);
        if (!upstream.isInitialized()) {
          return AssertionFailure()
                 << "Must initialize the FakeUpstream first by calling initializeServer().";
        }
        // Short per-upstream wait so the other upstreams are polled promptly.
        if (!waitForWithDispatcherRun(
                upstream.time_system_, upstream.lock_,
                [&upstream]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(upstream.lock_) {
                  return !upstream.new_connections_.empty();
                },
                client_dispatcher, 5ms)) {
          continue;
        }
      }

      // NOTE(review): this overload uses the default header limits rather than
      // the upstream's config_ values, unlike the single-upstream overload --
      // confirm this is intentional.
      return upstream.runOnDispatcherThreadAndWait([&]() {
        absl::MutexLock lock(&upstream.lock_);
        connection = std::make_unique<FakeHttpConnection>(
            upstream, upstream.consumeConnection(), upstream.http_type_, upstream.timeSystem(),
            Http::DEFAULT_MAX_REQUEST_HEADERS_KB, Http::DEFAULT_MAX_HEADERS_COUNT,
            envoy::config::core::v3::HttpProtocolOptions::ALLOW);
        connection->initialize();
        return AssertionSuccess();
      });
    }
  }
  return AssertionFailure() << "Timed out waiting for HTTP connection.";
}
843 | | |
844 | | ABSL_MUST_USE_RESULT |
845 | 0 | AssertionResult FakeUpstream::assertPendingConnectionsEmpty() { |
846 | 0 | return runOnDispatcherThreadAndWait([&]() { |
847 | 0 | absl::MutexLock lock(&lock_); |
848 | 0 | return new_connections_.empty() ? AssertionSuccess() : AssertionFailure(); |
849 | 0 | }); |
850 | 0 | } |
851 | | |
// Waits for a new raw (non-HTTP) connection and wraps it in a
// FakeRawConnection. The wrapper is created on the dispatcher thread, where
// half-close support is also configured.
AssertionResult FakeUpstream::waitForRawConnection(FakeRawConnectionPtr& connection,
                                                   milliseconds timeout) {
  if (!initialized_) {
    return AssertionFailure()
           << "Must initialize the FakeUpstream first by calling initializeServer().";
  }

  {
    absl::MutexLock lock(&lock_);
    const auto reached = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
      return !new_connections_.empty();
    };

    ENVOY_LOG(debug, "waiting for raw connection");
    if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
      return AssertionFailure() << "Timed out waiting for raw connection";
    }
  }

  return runOnDispatcherThreadAndWait([&]() {
    absl::MutexLock lock(&lock_);
    connection = makeRawConnection(consumeConnection(), timeSystem());
    connection->initialize();
    // Skip enableHalfClose if the connection is already disconnected.
    if (connection->connected()) {
      connection->connection().enableHalfClose(enable_half_close_);
    }
    return AssertionSuccess();
  });
}
882 | | |
// Rebinds an already-consumed raw connection as an HTTP connection, reusing
// the underlying SharedConnectionWrapper.
// NOTE(review): raw_connection.release() deliberately leaks the
// FakeRawConnection object -- running its destructor would touch the shared
// connection now owned by the new FakeHttpConnection. Confirm this test-only
// leak is intended.
void FakeUpstream::convertFromRawToHttp(FakeRawConnectionPtr& raw_connection,
                                        FakeHttpConnectionPtr& connection) {
  absl::MutexLock lock(&lock_);
  SharedConnectionWrapper& shared_connection = raw_connection->sharedConnection();

  connection = std::make_unique<FakeHttpConnection>(
      *this, shared_connection, http_type_, time_system_, config_.max_request_headers_kb_,
      config_.max_request_headers_count_, config_.headers_with_underscores_action_);
  connection->initialize();
  raw_connection.release();
}
894 | | |
// Claims the oldest unclaimed connection: moves it to consumed_connections_
// and re-enables reads/early-close detection that were turned off in
// createNetworkFilterChain(). Requires lock_ held; when the connection is
// still alive it must be called on the dispatcher thread (asserted below).
SharedConnectionWrapper& FakeUpstream::consumeConnection() {
  ASSERT(!new_connections_.empty());
  auto* const connection_wrapper = new_connections_.front().get();
  // Skip the thread safety check if the network connection has already been freed since there's no
  // alternate way to get access to the dispatcher.
  ASSERT(!connection_wrapper->connected() || connection_wrapper->dispatcher().isThreadSafe());
  connection_wrapper->setParented();
  connection_wrapper->moveBetweenLists(new_connections_, consumed_connections_);
  if (read_disable_on_new_connection_ && connection_wrapper->connected() &&
      http_type_ != Http::CodecType::HTTP3 && !disable_and_do_not_enable_) {
    // Re-enable read and early close detection.
    auto& connection = connection_wrapper->connection();
    connection.detectEarlyCloseWhenReadDisabled(true);
    connection.readDisable(false);
  }
  return *connection_wrapper;
}
912 | | |
913 | | AssertionResult FakeUpstream::waitForUdpDatagram(Network::UdpRecvData& data_to_fill, |
914 | 0 | std::chrono::milliseconds timeout) { |
915 | 0 | if (!initialized_) { |
916 | 0 | return AssertionFailure() |
917 | 0 | << "Must initialize the FakeUpstream first by calling initializeServer()."; |
918 | 0 | } |
919 | | |
920 | 0 | absl::MutexLock lock(&lock_); |
921 | 0 | const auto reached = [this]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { |
922 | 0 | return !received_datagrams_.empty(); |
923 | 0 | }; |
924 | |
|
925 | 0 | if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) { |
926 | 0 | return AssertionFailure() << "Timed out waiting for UDP datagram."; |
927 | 0 | } |
928 | | |
929 | 0 | data_to_fill = std::move(received_datagrams_.front()); |
930 | 0 | received_datagrams_.pop_front(); |
931 | 0 | return AssertionSuccess(); |
932 | 0 | } |
933 | | |
934 | 0 | Network::FilterStatus FakeUpstream::onRecvDatagram(Network::UdpRecvData& data) { |
935 | 0 | absl::MutexLock lock(&lock_); |
936 | 0 | received_datagrams_.emplace_back(std::move(data)); |
937 | |
|
938 | 0 | return Network::FilterStatus::StopIteration; |
939 | 0 | } |
940 | | |
// Runs `cb` on the dispatcher thread and blocks the calling (test) thread
// until it finishes, returning cb's result. Must not be called from the
// dispatcher thread itself (asserted), which would deadlock.
AssertionResult FakeUpstream::runOnDispatcherThreadAndWait(std::function<AssertionResult()> cb,
                                                           std::chrono::milliseconds timeout) {
  // NOTE(review): result/done are shared_ptrs but the lambda captures them by
  // reference ([&]); that is only safe because the RELEASE_ASSERT below aborts
  // the process on timeout instead of returning -- confirm this is intended.
  auto result = std::make_shared<AssertionResult>(AssertionSuccess());
  auto done = std::make_shared<absl::Notification>();
  ASSERT(!dispatcher_->isThreadSafe());
  dispatcher_->post([&]() {
    *result = cb();
    done->Notify();
  });
  RELEASE_ASSERT(done->WaitForNotificationWithTimeout(absl::FromChrono(timeout)),
                 "Timed out waiting for cb to run on dispatcher");
  return *result;
}
954 | | |
955 | | void FakeUpstream::sendUdpDatagram(const std::string& buffer, |
956 | 0 | const Network::Address::InstanceConstSharedPtr& peer) { |
957 | 0 | dispatcher_->post([this, buffer, peer] { |
958 | 0 | const auto rc = Network::Utility::writeToSocket(socket_->ioHandle(), Buffer::OwnedImpl(buffer), |
959 | 0 | nullptr, *peer); |
960 | 0 | EXPECT_TRUE(rc.return_value_ == buffer.length()); |
961 | 0 | }); |
962 | 0 | } |
963 | | |
964 | | AssertionResult FakeUpstream::rawWriteConnection(uint32_t index, const std::string& data, |
965 | | bool end_stream, |
966 | 0 | std::chrono::milliseconds timeout) { |
967 | 0 | if (!initialized_) { |
968 | 0 | return AssertionFailure() |
969 | 0 | << "Must initialize the FakeUpstream first by calling initializeServer()."; |
970 | 0 | } |
971 | | |
972 | 0 | absl::MutexLock lock(&lock_); |
973 | 0 | auto iter = consumed_connections_.begin(); |
974 | 0 | std::advance(iter, index); |
975 | 0 | return (*iter)->executeOnDispatcher( |
976 | 0 | [data, end_stream](Network::Connection& connection) { |
977 | 0 | ASSERT(connection.state() == Network::Connection::State::Open); |
978 | 0 | Buffer::OwnedImpl buffer(data); |
979 | 0 | connection.write(buffer, end_stream); |
980 | 0 | }, |
981 | 0 | timeout); |
982 | 0 | } |
983 | | |
// Final pre-worker socket setup, run on the dispatcher thread just before the
// listener is registered: start listening for stream (TCP) sockets, or apply
// the bound-state socket options for datagram (UDP) sockets.
void FakeUpstream::FakeListenSocketFactory::doFinalPreWorkerInit() {
  if (socket_->socketType() == Network::Socket::Type::Stream) {
    ASSERT_EQ(0, socket_->ioHandle().listen(ENVOY_TCP_BACKLOG_SIZE).return_value_);
  } else {
    ASSERT(socket_->socketType() == Network::Socket::Type::Datagram);
    ASSERT_TRUE(Network::Socket::applyOptions(socket_->options(), *socket_,
                                              envoy::config::core::v3::SocketOption::STATE_BOUND));
  }
}
993 | | |
// Detaches this wrapper's read filter from the network connection (on the
// dispatcher thread) before the wrapper's members are destroyed. Moving
// read_filter_ into the lambda transfers ownership to the posted closure.
FakeRawConnection::~FakeRawConnection() {
  // If the filter was already deleted, it means the shared_connection_ was too, so don't try to
  // access it.
  if (read_filter_ != nullptr) {
    EXPECT_TRUE(shared_connection_.executeOnDispatcher(
        [filter = std::move(read_filter_)](Network::Connection& connection) {
          connection.removeReadFilter(filter);
        }));
  }
}
1004 | | |
// Attaches the data-accumulating ReadFilter to the shared connection. Must run
// on the dispatcher thread (asserted below); skips filter installation when
// the peer has already disconnected.
void FakeRawConnection::initialize() {
  FakeConnectionBase::initialize();
  read_filter_ = std::make_shared<ReadFilter>(*this);
  if (!shared_connection_.connected()) {
    ENVOY_LOG(warn, "FakeRawConnection::initialize: network connection is already disconnected");
    return;
  }
  ASSERT(shared_connection_.dispatcher().isThreadSafe());
  shared_connection_.connection().addReadFilter(read_filter_);
}
1015 | | |
// Waits until exactly `num_bytes` have accumulated from the peer, optionally
// copying them to *data. Note the condition is an exact-size match (==), not
// >=: if the peer sends more than num_bytes the wait times out. Use the
// validator overload for "at least"-style waits.
AssertionResult FakeRawConnection::waitForData(uint64_t num_bytes, std::string* data,
                                               milliseconds timeout) {
  absl::MutexLock lock(&lock_);
  const auto reached = [this, num_bytes]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) {
    return data_.size() == num_bytes;
  };
  ENVOY_LOG(debug, "waiting for {} bytes of data", num_bytes);
  if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) {
    return AssertionFailure() << fmt::format(
               "Timed out waiting for data. Got '{}', waiting for {} bytes.", data_, num_bytes);
  }
  if (data != nullptr) {
    *data = data_;
  }
  return AssertionSuccess();
}
1032 | | |
1033 | | AssertionResult |
1034 | | FakeRawConnection::waitForData(const std::function<bool(const std::string&)>& data_validator, |
1035 | 0 | std::string* data, milliseconds timeout) { |
1036 | 0 | absl::MutexLock lock(&lock_); |
1037 | 0 | const auto reached = [this, &data_validator]() |
1038 | 0 | ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { return data_validator(data_); }; |
1039 | 0 | ENVOY_LOG(debug, "waiting for data"); |
1040 | 0 | if (!time_system_.waitFor(lock_, absl::Condition(&reached), timeout)) { |
1041 | 0 | return AssertionFailure() << "Timed out waiting for data."; |
1042 | 0 | } |
1043 | 0 | if (data != nullptr) { |
1044 | 0 | *data = data_; |
1045 | 0 | } |
1046 | 0 | return AssertionSuccess(); |
1047 | 0 | } |
1048 | | |
1049 | | AssertionResult FakeRawConnection::write(const std::string& data, bool end_stream, |
1050 | 2.18k | milliseconds timeout) { |
1051 | 2.18k | return shared_connection_.executeOnDispatcher( |
1052 | 2.18k | [data, end_stream](Network::Connection& connection) { |
1053 | 2.17k | Buffer::OwnedImpl to_write(data); |
1054 | 2.17k | connection.write(to_write, end_stream); |
1055 | 2.17k | }, |
1056 | 2.18k | timeout); |
1057 | 2.18k | } |
1058 | | |
// Read filter callback: appends received bytes to parent_.data_ (under the
// parent's lock so the waitForData variants can observe them), records the
// half-close flag, and fully consumes the buffer.
// NOTE(review): half_closed_ is assigned (not OR-ed) on every call, so a later
// read without end_stream would clear it -- presumably no data follows a
// half-close; confirm.
Network::FilterStatus FakeRawConnection::ReadFilter::onData(Buffer::Instance& data,
                                                            bool end_stream) {
  absl::MutexLock lock(&parent_.lock_);
  ENVOY_LOG(debug, "got {} bytes, end_stream {}", data.length(), end_stream);
  parent_.data_.append(data.toString());
  parent_.half_closed_ = end_stream;
  data.drain(data.length());
  return Network::FilterStatus::StopIteration;
}
1068 | | |
1069 | | ABSL_MUST_USE_RESULT |
1070 | | AssertionResult FakeHttpConnection::waitForInexactRawData(const char* data, std::string* out, |
1071 | 0 | std::chrono::milliseconds timeout) { |
1072 | 0 | absl::MutexLock lock(&lock_); |
1073 | 0 | const auto reached = [this, data, &out]() ABSL_EXCLUSIVE_LOCKS_REQUIRED(lock_) { |
1074 | 0 | char peek_buf[200]; |
1075 | 0 | auto result = dynamic_cast<Network::ConnectionImpl*>(&connection()) |
1076 | 0 | ->ioHandle() |
1077 | 0 | .recv(peek_buf, 200, MSG_PEEK); |
1078 | 0 | ASSERT(result.ok() || result.err_->getErrorCode() == Api::IoError::IoErrorCode::Again); |
1079 | 0 | if (!result.ok()) { |
1080 | 0 | return false; |
1081 | 0 | } |
1082 | 0 | absl::string_view peek_data(peek_buf, result.return_value_); |
1083 | 0 | size_t index = peek_data.find(data); |
1084 | 0 | if (index != absl::string_view::npos) { |
1085 | 0 | Buffer::OwnedImpl buffer; |
1086 | 0 | *out = std::string(peek_data.data(), index + 4); |
1087 | 0 | auto result = dynamic_cast<Network::ConnectionImpl*>(&connection()) |
1088 | 0 | ->ioHandle() |
1089 | 0 | .recv(peek_buf, index + 4, 0); |
1090 | 0 | return true; |
1091 | 0 | } |
1092 | 0 | return false; |
1093 | 0 | }; |
1094 | | // Because the connection must be read disabled to not auto-consume the |
1095 | | // underlying data, waitFor hangs with no events to force the time system to |
1096 | | // continue. Break it up into smaller chunks. |
1097 | 0 | for (int i = 0; i < timeout / 10ms; ++i) { |
1098 | 0 | if (time_system_.waitFor(lock_, absl::Condition(&reached), 10ms)) { |
1099 | 0 | return AssertionSuccess(); |
1100 | 0 | } |
1101 | 0 | } |
1102 | 0 | return AssertionFailure() << "timed out waiting for raw data"; |
1103 | 0 | } |
1104 | | |
// Synchronously writes `data` straight to the underlying IO handle, bypassing
// the connection's write buffer and filter chain. Presumably must run on the
// dispatcher thread -- nothing here asserts it; confirm with callers.
// NOTE(review): only result.ok() is checked, so a short write would go
// unnoticed -- confirm callers only send small payloads.
void FakeHttpConnection::writeRawData(absl::string_view data) {
  Buffer::OwnedImpl buffer(data);
  Api::IoCallUint64Result result =
      dynamic_cast<Network::ConnectionImpl*>(&connection())->ioHandle().write(buffer);
  ASSERT(result.ok());
}
1111 | | |
1112 | 0 | AssertionResult FakeHttpConnection::postWriteRawData(std::string data) { |
1113 | 0 | return shared_connection_.executeOnDispatcher( |
1114 | 0 | [data](Network::Connection& connection) { |
1115 | 0 | Buffer::OwnedImpl to_write(data); |
1116 | 0 | connection.write(to_write, false); |
1117 | 0 | }, |
1118 | 0 | TestUtility::DefaultTimeout); |
1119 | 0 | } |
1120 | | |
1121 | | } // namespace Envoy |