/proc/self/cwd/test/integration/utility.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "utility.h" |
2 | | |
3 | | #include <chrono> |
4 | | #include <cstdint> |
5 | | #include <memory> |
6 | | #include <string> |
7 | | |
8 | | #include "envoy/config/bootstrap/v3/bootstrap.pb.h" |
9 | | #include "envoy/event/dispatcher.h" |
10 | | #include "envoy/extensions/transport_sockets/quic/v3/quic_transport.pb.h" |
11 | | #include "envoy/network/connection.h" |
12 | | |
13 | | #include "source/common/api/api_impl.h" |
14 | | #include "source/common/buffer/buffer_impl.h" |
15 | | #include "source/common/common/assert.h" |
16 | | #include "source/common/common/fmt.h" |
17 | | #include "source/common/config/utility.h" |
18 | | #include "source/common/http/header_map_impl.h" |
19 | | #include "source/common/http/headers.h" |
20 | | #include "source/common/network/address_impl.h" |
21 | | #include "source/common/network/utility.h" |
22 | | #include "source/common/quic/quic_stat_names.h" |
23 | | #include "source/common/upstream/upstream_impl.h" |
24 | | #include "source/extensions/http/header_validators/envoy_default/http1_header_validator.h" |
25 | | #include "source/extensions/http/header_validators/envoy_default/http2_header_validator.h" |
26 | | |
27 | | #ifdef ENVOY_ENABLE_QUIC |
28 | | #include "source/common/quic/client_connection_factory_impl.h" |
29 | | #include "source/common/quic/quic_client_transport_socket_factory.h" |
30 | | #include "quiche/quic/core/deterministic_connection_id_generator.h" |
31 | | #endif |
32 | | |
33 | | #ifdef ENVOY_ENABLE_YAML |
34 | | #include "test/common/upstream/utility.h" |
35 | | #include "test/integration/ssl_utility.h" |
36 | | #endif |
37 | | #include "test/mocks/common.h" |
38 | | #include "test/mocks/server/instance.h" |
39 | | #include "test/mocks/server/transport_socket_factory_context.h" |
40 | | #include "test/mocks/stats/mocks.h" |
41 | | #include "test/mocks/upstream/cluster_info.h" |
42 | | #include "test/test_common/environment.h" |
43 | | #include "test/test_common/network_utility.h" |
44 | | #include "test/test_common/printers.h" |
45 | | #include "test/test_common/utility.h" |
46 | | |
47 | | #include "absl/strings/match.h" |
48 | | |
49 | | namespace Envoy { |
50 | | |
51 | | using ::envoy::extensions::http::header_validators::envoy_default::v3::HeaderValidatorConfig; |
52 | | using ::Envoy::Extensions::Http::HeaderValidators::EnvoyDefault::ClientHttp1HeaderValidator; |
53 | | using ::Envoy::Extensions::Http::HeaderValidators::EnvoyDefault::ClientHttp2HeaderValidator; |
54 | | using ::Envoy::Extensions::Http::HeaderValidators::EnvoyDefault::ConfigOverrides; |
55 | | using ::Envoy::Extensions::Http::HeaderValidators::EnvoyDefault::ServerHttp1HeaderValidator; |
56 | | using ::Envoy::Extensions::Http::HeaderValidators::EnvoyDefault::ServerHttp2HeaderValidator; |
57 | | |
58 | | namespace { |
59 | | |
60 | 0 | RawConnectionDriver::DoWriteCallback writeBufferCallback(Buffer::Instance& data) { |
61 | 0 | auto shared_data = std::make_shared<Buffer::OwnedImpl>(); |
62 | 0 | shared_data->move(data); |
63 | 0 | return [shared_data](Buffer::Instance& dest) { |
64 | 0 | if (shared_data->length() > 0) { |
65 | 0 | dest.add(*shared_data); |
66 | 0 | shared_data->drain(shared_data->length()); |
67 | 0 | } |
68 | 0 | return false; |
69 | 0 | }; |
70 | 0 | } |
71 | | |
72 | | } // namespace |
73 | | |
74 | 0 | void BufferingStreamDecoder::decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) { |
75 | 0 | ASSERT(!complete_); |
76 | 0 | complete_ = end_stream; |
77 | 0 | headers_ = std::move(headers); |
78 | 0 | if (complete_) { |
79 | 0 | onComplete(); |
80 | 0 | } |
81 | 0 | } |
82 | | |
83 | 0 | void BufferingStreamDecoder::decodeData(Buffer::Instance& data, bool end_stream) { |
84 | 0 | ASSERT(!complete_); |
85 | 0 | complete_ = end_stream; |
86 | 0 | body_.append(data.toString()); |
87 | 0 | if (complete_) { |
88 | 0 | onComplete(); |
89 | 0 | } |
90 | 0 | } |
91 | | |
92 | 0 | void BufferingStreamDecoder::decodeTrailers(Http::ResponseTrailerMapPtr&&) { |
93 | 0 | PANIC("not implemented"); |
94 | 0 | } |
95 | | |
96 | 0 | void BufferingStreamDecoder::onComplete() { |
97 | 0 | ASSERT(complete_); |
98 | 0 | on_complete_cb_(); |
99 | 0 | } |
100 | | |
101 | 0 | void BufferingStreamDecoder::onResetStream(Http::StreamResetReason, absl::string_view) { |
102 | 0 | ADD_FAILURE(); |
103 | 0 | } |
104 | | |
105 | | // A callback for a QUIC client connection to unblock the test after handshake succeeds. QUIC |
106 | | // network connection initiates handshake and raises Connected event when it's done. Tests should |
107 | | // proceed with sending requests afterwards. |
108 | | class TestConnectionCallbacks : public Network::ConnectionCallbacks { |
109 | | public: |
110 | 0 | TestConnectionCallbacks(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {} |
111 | | |
112 | | // Network::ConnectionCallbacks |
113 | 0 | void onEvent(Network::ConnectionEvent event) override { |
114 | 0 | if (event == Network::ConnectionEvent::Connected) { |
115 | | // Handshake finished, unblock the test to continue. This is needed because we call |
116 | | // Dispatcher::run() with Block to wait for the handshake to finish before proceeding. |
117 | | // TODO(danzh) find an alternative approach with behaviors more in parallel with SSL. |
118 | 0 | connected_ = true; |
119 | 0 | dispatcher_.exit(); |
120 | 0 | } else if (event == Network::ConnectionEvent::RemoteClose) { |
121 | | // If the peer closes the connection, no need to wait anymore. |
122 | 0 | dispatcher_.exit(); |
123 | 0 | } else { |
124 | 0 | if (!connected_) { |
125 | | // Before handshake gets established, any connection failure should exit the loop. I.e. a |
126 | | // QUIC connection may fail of INVALID_VERSION if both this client doesn't support any of |
127 | | // the versions the server advertised before handshake established. In this case the |
128 | | // connection is closed locally and this is in a blocking event loop. |
129 | 0 | dispatcher_.exit(); |
130 | 0 | } |
131 | 0 | } |
132 | 0 | } |
133 | | |
134 | 0 | void onAboveWriteBufferHighWatermark() override {} |
135 | 0 | void onBelowWriteBufferLowWatermark() override {} |
136 | | |
137 | | private: |
138 | | Event::Dispatcher& dispatcher_; |
139 | | bool connected_{false}; |
140 | | }; |
141 | | |
142 | | Network::UpstreamTransportSocketFactoryPtr |
143 | | IntegrationUtil::createQuicUpstreamTransportSocketFactory(Api::Api& api, Stats::Store& store, |
144 | | Ssl::ContextManager& context_manager, |
145 | | ThreadLocal::Instance& threadlocal, |
146 | | const std::string& san_to_match, |
147 | 0 | bool connect_to_upstreams) { |
148 | 0 | NiceMock<Server::Configuration::MockTransportSocketFactoryContext> context; |
149 | 0 | ON_CALL(context.server_context_, api()).WillByDefault(testing::ReturnRef(api)); |
150 | 0 | ON_CALL(context, statsScope()).WillByDefault(testing::ReturnRef(*store.rootScope())); |
151 | 0 | ON_CALL(context, sslContextManager()).WillByDefault(testing::ReturnRef(context_manager)); |
152 | 0 | ON_CALL(context.server_context_, threadLocal()).WillByDefault(testing::ReturnRef(threadlocal)); |
153 | 0 | envoy::extensions::transport_sockets::quic::v3::QuicUpstreamTransport |
154 | 0 | quic_transport_socket_config; |
155 | 0 | auto* tls_context = quic_transport_socket_config.mutable_upstream_tls_context(); |
156 | 0 | #ifdef ENVOY_ENABLE_YAML |
157 | 0 | initializeUpstreamTlsContextConfig( |
158 | 0 | Ssl::ClientSslTransportOptions().setAlpn(true).setSan(san_to_match).setSni("lyft.com"), |
159 | 0 | *tls_context, connect_to_upstreams); |
160 | | #else |
161 | | UNREFERENCED_PARAMETER(tls_context); |
162 | | UNREFERENCED_PARAMETER(san_to_match); |
163 | | UNREFERENCED_PARAMETER(connect_to_upstreams); |
164 | | RELEASE_ASSERT(0, "unsupported"); |
165 | | #endif // ENVOY_ENABLE_YAML |
166 | |
|
167 | 0 | envoy::config::core::v3::TransportSocket message; |
168 | 0 | message.mutable_typed_config()->PackFrom(quic_transport_socket_config); |
169 | 0 | auto& config_factory = Config::Utility::getAndCheckFactory< |
170 | 0 | Server::Configuration::UpstreamTransportSocketConfigFactory>(message); |
171 | 0 | return config_factory.createTransportSocketFactory(quic_transport_socket_config, context).value(); |
172 | 0 | } |
173 | | |
174 | | BufferingStreamDecoderPtr |
175 | | sendRequestAndWaitForResponse(Event::Dispatcher& dispatcher, const std::string& method, |
176 | | const std::string& url, const std::string& body, |
177 | | const std::string& host, const std::string& content_type, |
178 | 0 | Http::CodecClientProd& client) { |
179 | 0 | BufferingStreamDecoderPtr response(new BufferingStreamDecoder([&]() -> void { |
180 | 0 | client.close(); |
181 | 0 | dispatcher.exit(); |
182 | 0 | })); |
183 | 0 | Http::RequestEncoder& encoder = client.newStream(*response); |
184 | 0 | encoder.getStream().addCallbacks(*response); |
185 | |
|
186 | 0 | Http::TestRequestHeaderMapImpl headers; |
187 | 0 | headers.setMethod(method); |
188 | 0 | headers.setPath(url); |
189 | 0 | headers.setHost(host); |
190 | 0 | headers.setReferenceScheme(Http::Headers::get().SchemeValues.Http); |
191 | 0 | if (!content_type.empty()) { |
192 | 0 | headers.setContentType(content_type); |
193 | 0 | } |
194 | 0 | const auto status = encoder.encodeHeaders(headers, body.empty()); |
195 | 0 | ASSERT(status.ok()); |
196 | 0 | if (!body.empty()) { |
197 | 0 | Buffer::OwnedImpl body_buffer(body); |
198 | 0 | encoder.encodeData(body_buffer, true); |
199 | 0 | } |
200 | |
|
201 | 0 | dispatcher.run(Event::Dispatcher::RunType::Block); |
202 | 0 | return response; |
203 | 0 | } |
204 | | |
205 | | BufferingStreamDecoderPtr |
206 | | IntegrationUtil::makeSingleRequest(const Network::Address::InstanceConstSharedPtr& addr, |
207 | | const std::string& method, const std::string& url, |
208 | | const std::string& body, Http::CodecType type, |
209 | 0 | const std::string& host, const std::string& content_type) { |
210 | 0 | NiceMock<Stats::MockIsolatedStatsStore> mock_stats_store; |
211 | 0 | Quic::QuicStatNames quic_stat_names(mock_stats_store.symbolTable()); |
212 | 0 | NiceMock<Random::MockRandomGenerator> random; |
213 | 0 | Event::GlobalTimeSystem time_system; |
214 | 0 | NiceMock<Random::MockRandomGenerator> random_generator; |
215 | 0 | envoy::config::bootstrap::v3::Bootstrap bootstrap; |
216 | 0 | Api::Impl api(Thread::threadFactoryForTest(), mock_stats_store, time_system, |
217 | 0 | Filesystem::fileSystemForTest(), random_generator, bootstrap); |
218 | 0 | Event::DispatcherPtr dispatcher(api.allocateDispatcher("test_thread")); |
219 | 0 | TestConnectionCallbacks connection_callbacks(*dispatcher); |
220 | 0 | Network::TransportSocketOptionsConstSharedPtr options; |
221 | |
|
222 | 0 | std::shared_ptr<Upstream::MockClusterInfo> cluster{new NiceMock<Upstream::MockClusterInfo>()}; |
223 | 0 | Upstream::HostDescriptionConstSharedPtr host_description = |
224 | 0 | std::make_shared<Upstream::HostDescriptionImpl>( |
225 | 0 | cluster, "", |
226 | 0 | *Network::Utility::resolveUrl( |
227 | 0 | fmt::format("{}://127.0.0.1:80", (type == Http::CodecType::HTTP3 ? "udp" : "tcp"))), |
228 | 0 | nullptr, nullptr, envoy::config::core::v3::Locality().default_instance(), |
229 | 0 | envoy::config::endpoint::v3::Endpoint::HealthCheckConfig::default_instance(), 0, |
230 | 0 | time_system); |
231 | |
|
232 | 0 | if (type <= Http::CodecType::HTTP2) { |
233 | 0 | Http::CodecClientProd client(type, |
234 | 0 | dispatcher->createClientConnection( |
235 | 0 | addr, Network::Address::InstanceConstSharedPtr(), |
236 | 0 | Network::Test::createRawBufferSocket(), nullptr, nullptr), |
237 | 0 | host_description, *dispatcher, random, options); |
238 | 0 | return sendRequestAndWaitForResponse(*dispatcher, method, url, body, host, content_type, |
239 | 0 | client); |
240 | 0 | } |
241 | | |
242 | 0 | #ifdef ENVOY_ENABLE_QUIC |
243 | 0 | testing::NiceMock<ThreadLocal::MockInstance> threadlocal; |
244 | 0 | NiceMock<Server::Configuration::MockServerFactoryContext> server_factory_context; |
245 | 0 | Extensions::TransportSockets::Tls::ContextManagerImpl manager(server_factory_context); |
246 | 0 | Network::UpstreamTransportSocketFactoryPtr transport_socket_factory = |
247 | 0 | createQuicUpstreamTransportSocketFactory(api, mock_stats_store, manager, threadlocal, |
248 | 0 | "spiffe://lyft.com/backend-team"); |
249 | 0 | auto& quic_transport_socket_factory = |
250 | 0 | dynamic_cast<Quic::QuicClientTransportSocketFactory&>(*transport_socket_factory); |
251 | 0 | auto persistent_info = std::make_unique<Quic::PersistentQuicInfoImpl>(*dispatcher, 0); |
252 | |
|
253 | 0 | Network::Address::InstanceConstSharedPtr local_address; |
254 | 0 | if (addr->ip()->version() == Network::Address::IpVersion::v4) { |
255 | 0 | local_address = Network::Utility::getLocalAddress(Network::Address::IpVersion::v4); |
256 | 0 | } else { |
257 | | // Docker only works with loopback v6 address. |
258 | 0 | local_address = std::make_shared<Network::Address::Ipv6Instance>("::1"); |
259 | 0 | } |
260 | 0 | quic::DeterministicConnectionIdGenerator generator(quic::kQuicDefaultConnectionIdLength); |
261 | 0 | Network::ClientConnectionPtr connection = Quic::createQuicNetworkConnection( |
262 | 0 | *persistent_info, quic_transport_socket_factory.getCryptoConfig(), |
263 | 0 | quic::QuicServerId( |
264 | 0 | quic_transport_socket_factory.clientContextConfig()->serverNameIndication(), |
265 | 0 | static_cast<uint16_t>(addr->ip()->port())), |
266 | 0 | *dispatcher, addr, local_address, quic_stat_names, {}, *mock_stats_store.rootScope(), nullptr, |
267 | 0 | nullptr, generator, quic_transport_socket_factory); |
268 | 0 | connection->addConnectionCallbacks(connection_callbacks); |
269 | 0 | Http::CodecClientProd client(type, std::move(connection), host_description, *dispatcher, random, |
270 | 0 | options); |
271 | | // Quic connection needs to finish handshake. |
272 | 0 | dispatcher->run(Event::Dispatcher::RunType::Block); |
273 | 0 | return sendRequestAndWaitForResponse(*dispatcher, method, url, body, host, content_type, client); |
274 | | #else |
275 | | ASSERT(false, "running a QUIC integration test without compiling QUIC"); |
276 | | return nullptr; |
277 | | #endif |
278 | 0 | } |
279 | | |
280 | | BufferingStreamDecoderPtr |
281 | | IntegrationUtil::makeSingleRequest(uint32_t port, const std::string& method, const std::string& url, |
282 | | const std::string& body, Http::CodecType type, |
283 | | Network::Address::IpVersion ip_version, const std::string& host, |
284 | 0 | const std::string& content_type) { |
285 | 0 | auto addr = *Network::Utility::resolveUrl( |
286 | 0 | fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(ip_version), port)); |
287 | 0 | return makeSingleRequest(addr, method, url, body, type, host, content_type); |
288 | 0 | } |
289 | | |
290 | | RawConnectionDriver::RawConnectionDriver(uint32_t port, Buffer::Instance& request_data, |
291 | | ReadCallback response_data_callback, |
292 | | Network::Address::IpVersion version, |
293 | | Event::Dispatcher& dispatcher, |
294 | | Network::TransportSocketPtr transport_socket) |
295 | | : RawConnectionDriver(port, writeBufferCallback(request_data), response_data_callback, version, |
296 | 0 | dispatcher, std::move(transport_socket)) {} |
297 | | |
298 | | RawConnectionDriver::RawConnectionDriver(uint32_t port, DoWriteCallback write_request_callback, |
299 | | ReadCallback response_data_callback, |
300 | | Network::Address::IpVersion version, |
301 | | Event::Dispatcher& dispatcher, |
302 | | Network::TransportSocketPtr transport_socket) |
303 | 0 | : dispatcher_(dispatcher), remaining_bytes_to_send_(0) { |
304 | 0 | api_ = Api::createApiForTest(stats_store_); |
305 | 0 | Event::GlobalTimeSystem time_system; |
306 | 0 | callbacks_ = std::make_unique<ConnectionCallbacks>( |
307 | 0 | [this, write_request_callback]() { |
308 | 0 | Buffer::OwnedImpl buffer; |
309 | 0 | const bool close_after = write_request_callback(buffer); |
310 | 0 | remaining_bytes_to_send_ += buffer.length(); |
311 | 0 | client_->write(buffer, close_after); |
312 | 0 | }, |
313 | 0 | dispatcher); |
314 | |
|
315 | 0 | if (transport_socket == nullptr) { |
316 | 0 | transport_socket = Network::Test::createRawBufferSocket(); |
317 | 0 | } |
318 | |
|
319 | 0 | client_ = dispatcher_.createClientConnection( |
320 | 0 | *Network::Utility::resolveUrl( |
321 | 0 | fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(version), port)), |
322 | 0 | Network::Address::InstanceConstSharedPtr(), std::move(transport_socket), nullptr, nullptr); |
323 | | // ConnectionCallbacks will call write_request_callback from the connect and low-watermark |
324 | | // callbacks. Set a small buffer limit so high-watermark is triggered after every write and |
325 | | // low-watermark is triggered every time the buffer is drained. |
326 | 0 | client_->setBufferLimits(1); |
327 | 0 | client_->addConnectionCallbacks(*callbacks_); |
328 | 0 | client_->addReadFilter( |
329 | 0 | Network::ReadFilterSharedPtr{new ForwardingFilter(*this, response_data_callback)}); |
330 | 0 | client_->addBytesSentCallback([&](uint64_t bytes) { |
331 | 0 | remaining_bytes_to_send_ -= bytes; |
332 | 0 | return true; |
333 | 0 | }); |
334 | 0 | client_->connect(); |
335 | 0 | } |
336 | | |
337 | | // This factory is needed to avoid dealing with the ServerFactoryContext, which is |
338 | | // problematic in dedicated threads for fake upstreams or test client. |
339 | | // UHV is needed in fake upstreams or test client for translation |
340 | | // of extended CONNECT to upgrade, which is done by the codecs. |
341 | | class FakeHeaderValidatorFactory : public Http::HeaderValidatorFactory { |
342 | | public: |
343 | 0 | FakeHeaderValidatorFactory(const HeaderValidatorConfig& config) : config_(config) {} |
344 | | |
345 | | Http::ServerHeaderValidatorPtr |
346 | 0 | createServerHeaderValidator(Http::Protocol protocol, Http::HeaderValidatorStats& stats) override { |
347 | 0 | ConfigOverrides config_overrides; |
348 | 0 |
|
349 | 0 | switch (protocol) { |
350 | 0 | case Http::Protocol::Http3: |
351 | 0 | case Http::Protocol::Http2: |
352 | 0 | return std::make_unique<ServerHttp2HeaderValidator>(config_, protocol, stats, |
353 | 0 | config_overrides); |
354 | 0 | case Http::Protocol::Http11: |
355 | 0 | case Http::Protocol::Http10: |
356 | 0 | return std::make_unique<ServerHttp1HeaderValidator>(config_, protocol, stats, |
357 | 0 | config_overrides); |
358 | 0 | } |
359 | 0 | PANIC_DUE_TO_CORRUPT_ENUM; |
360 | 0 | } |
361 | | |
362 | | Http::ClientHeaderValidatorPtr |
363 | 0 | createClientHeaderValidator(Http::Protocol protocol, Http::HeaderValidatorStats& stats) override { |
364 | 0 | ConfigOverrides config_overrides; |
365 | 0 |
|
366 | 0 | switch (protocol) { |
367 | 0 | case Http::Protocol::Http3: |
368 | 0 | case Http::Protocol::Http2: |
369 | 0 | return std::make_unique<ClientHttp2HeaderValidator>(config_, protocol, stats, |
370 | 0 | config_overrides); |
371 | 0 | case Http::Protocol::Http11: |
372 | 0 | case Http::Protocol::Http10: |
373 | 0 | return std::make_unique<ClientHttp1HeaderValidator>(config_, protocol, stats, |
374 | 0 | config_overrides); |
375 | 0 | } |
376 | 0 | PANIC_DUE_TO_CORRUPT_ENUM; |
377 | 0 | } |
378 | | |
379 | | private: |
380 | | const HeaderValidatorConfig config_; |
381 | | }; |
382 | | |
383 | | Http::HeaderValidatorFactoryPtr |
384 | 1.22k | IntegrationUtil::makeHeaderValidationFactory([[maybe_unused]] const HeaderValidatorConfig& config) { |
385 | | #ifdef ENVOY_ENABLE_UHV |
386 | | return std::make_unique<FakeHeaderValidatorFactory>(config); |
387 | | #else |
388 | 1.22k | return nullptr; |
389 | 1.22k | #endif |
390 | 1.22k | } |
391 | | |
392 | 0 | RawConnectionDriver::~RawConnectionDriver() = default; |
393 | | |
394 | 0 | testing::AssertionResult RawConnectionDriver::waitForConnection() { |
395 | | // TODO(mattklein123): Add a timeout and switch to events and waitFor(). |
396 | 0 | while (!callbacks_->connected() && !callbacks_->closed()) { |
397 | 0 | Event::GlobalTimeSystem().timeSystem().realSleepDoNotUseWithoutScrutiny( |
398 | 0 | std::chrono::milliseconds(10)); |
399 | 0 | dispatcher_.run(Event::Dispatcher::RunType::NonBlock); |
400 | 0 | } |
401 | 0 | if (!callbacks_->connected()) { |
402 | 0 | return testing::AssertionFailure(); |
403 | 0 | } |
404 | 0 | return testing::AssertionSuccess(); |
405 | 0 | } |
406 | | |
407 | | testing::AssertionResult RawConnectionDriver::run(Event::Dispatcher::RunType run_type, |
408 | 0 | std::chrono::milliseconds timeout) { |
409 | 0 | Event::TimerPtr timeout_timer = dispatcher_.createTimer([this]() -> void { dispatcher_.exit(); }); |
410 | 0 | timeout_timer->enableTimer(timeout); |
411 | |
|
412 | 0 | dispatcher_.run(run_type); |
413 | |
|
414 | 0 | if (timeout_timer->enabled()) { |
415 | 0 | timeout_timer->disableTimer(); |
416 | 0 | return testing::AssertionSuccess(); |
417 | 0 | } |
418 | 0 | return testing::AssertionFailure(); |
419 | 0 | } |
420 | | |
421 | 0 | void RawConnectionDriver::close() { client_->close(Network::ConnectionCloseType::FlushWrite); } |
422 | | |
423 | 0 | bool RawConnectionDriver::allBytesSent() const { return remaining_bytes_to_send_ == 0; } |
424 | | |
425 | | WaitForPayloadReader::WaitForPayloadReader(Event::Dispatcher& dispatcher) |
426 | 2.11k | : dispatcher_(dispatcher) {} |
427 | | |
428 | 130 | Network::FilterStatus WaitForPayloadReader::onData(Buffer::Instance& data, bool end_stream) { |
429 | 130 | data_.append(data.toString()); |
430 | 130 | data.drain(data.length()); |
431 | 130 | read_end_stream_ = end_stream; |
432 | 130 | if ((!data_to_wait_for_.empty() && absl::StartsWith(data_, data_to_wait_for_)) || |
433 | 130 | (exact_match_ == false && data_.find(data_to_wait_for_) != std::string::npos) || end_stream) { |
434 | 0 | data_to_wait_for_.clear(); |
435 | 0 | dispatcher_.exit(); |
436 | 0 | } |
437 | | |
438 | 130 | if (wait_for_length_ && data_.size() >= length_to_wait_for_) { |
439 | 0 | wait_for_length_ = false; |
440 | 0 | dispatcher_.exit(); |
441 | 0 | } |
442 | | |
443 | 130 | return Network::FilterStatus::StopIteration; |
444 | 130 | } |
445 | | |
446 | | } // namespace Envoy |