/proc/self/cwd/test/integration/utility.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "utility.h" |
2 | | |
3 | | #include <chrono> |
4 | | #include <cstdint> |
5 | | #include <memory> |
6 | | #include <string> |
7 | | |
8 | | #include "envoy/config/bootstrap/v3/bootstrap.pb.h" |
9 | | #include "envoy/event/dispatcher.h" |
10 | | #include "envoy/extensions/transport_sockets/quic/v3/quic_transport.pb.h" |
11 | | #include "envoy/network/connection.h" |
12 | | |
13 | | #include "source/common/api/api_impl.h" |
14 | | #include "source/common/buffer/buffer_impl.h" |
15 | | #include "source/common/common/assert.h" |
16 | | #include "source/common/common/fmt.h" |
17 | | #include "source/common/config/utility.h" |
18 | | #include "source/common/http/header_map_impl.h" |
19 | | #include "source/common/http/headers.h" |
20 | | #include "source/common/network/address_impl.h" |
21 | | #include "source/common/network/utility.h" |
22 | | #include "source/common/quic/quic_stat_names.h" |
23 | | #include "source/common/upstream/upstream_impl.h" |
24 | | #include "source/extensions/http/header_validators/envoy_default/http1_header_validator.h" |
25 | | #include "source/extensions/http/header_validators/envoy_default/http2_header_validator.h" |
26 | | |
27 | | #ifdef ENVOY_ENABLE_QUIC |
28 | | #include "source/common/quic/client_connection_factory_impl.h" |
29 | | #include "source/common/quic/quic_transport_socket_factory.h" |
30 | | #include "quiche/quic/core/deterministic_connection_id_generator.h" |
31 | | #endif |
32 | | |
33 | | #ifdef ENVOY_ENABLE_YAML |
34 | | #include "test/common/upstream/utility.h" |
35 | | #include "test/integration/ssl_utility.h" |
36 | | #endif |
37 | | #include "test/mocks/common.h" |
38 | | #include "test/mocks/server/instance.h" |
39 | | #include "test/mocks/server/transport_socket_factory_context.h" |
40 | | #include "test/mocks/stats/mocks.h" |
41 | | #include "test/mocks/upstream/cluster_info.h" |
42 | | #include "test/test_common/environment.h" |
43 | | #include "test/test_common/network_utility.h" |
44 | | #include "test/test_common/printers.h" |
45 | | #include "test/test_common/utility.h" |
46 | | |
47 | | #include "absl/strings/match.h" |
48 | | |
49 | | namespace Envoy { |
50 | | |
51 | | using ::envoy::extensions::http::header_validators::envoy_default::v3::HeaderValidatorConfig; |
52 | | using ::Envoy::Extensions::Http::HeaderValidators::EnvoyDefault::ClientHttp1HeaderValidator; |
53 | | using ::Envoy::Extensions::Http::HeaderValidators::EnvoyDefault::ClientHttp2HeaderValidator; |
54 | | using ::Envoy::Extensions::Http::HeaderValidators::EnvoyDefault::ConfigOverrides; |
55 | | using ::Envoy::Extensions::Http::HeaderValidators::EnvoyDefault::ServerHttp1HeaderValidator; |
56 | | using ::Envoy::Extensions::Http::HeaderValidators::EnvoyDefault::ServerHttp2HeaderValidator; |
57 | | |
58 | | namespace { |
59 | | |
60 | 0 | RawConnectionDriver::DoWriteCallback writeBufferCallback(Buffer::Instance& data) { |
61 | 0 | auto shared_data = std::make_shared<Buffer::OwnedImpl>(); |
62 | 0 | shared_data->move(data); |
63 | 0 | return [shared_data](Buffer::Instance& dest) { |
64 | 0 | if (shared_data->length() > 0) { |
65 | 0 | dest.add(*shared_data); |
66 | 0 | shared_data->drain(shared_data->length()); |
67 | 0 | } |
68 | 0 | return false; |
69 | 0 | }; |
70 | 0 | } |
71 | | |
72 | | } // namespace |
73 | | |
74 | 2.64k | void BufferingStreamDecoder::decodeHeaders(Http::ResponseHeaderMapPtr&& headers, bool end_stream) { |
75 | 2.64k | ASSERT(!complete_); |
76 | 2.64k | complete_ = end_stream; |
77 | 2.64k | headers_ = std::move(headers); |
78 | 2.64k | if (complete_) { |
79 | 0 | onComplete(); |
80 | 0 | } |
81 | 2.64k | } |
82 | | |
83 | 5.29k | void BufferingStreamDecoder::decodeData(Buffer::Instance& data, bool end_stream) { |
84 | 5.29k | ASSERT(!complete_); |
85 | 5.29k | complete_ = end_stream; |
86 | 5.29k | body_.append(data.toString()); |
87 | 5.29k | if (complete_) { |
88 | 2.64k | onComplete(); |
89 | 2.64k | } |
90 | 5.29k | } |
91 | | |
92 | 0 | void BufferingStreamDecoder::decodeTrailers(Http::ResponseTrailerMapPtr&&) { |
93 | 0 | PANIC("not implemented"); |
94 | 0 | } |
95 | | |
96 | 2.64k | void BufferingStreamDecoder::onComplete() { |
97 | 2.64k | ASSERT(complete_); |
98 | 2.64k | on_complete_cb_(); |
99 | 2.64k | } |
100 | | |
101 | 0 | void BufferingStreamDecoder::onResetStream(Http::StreamResetReason, absl::string_view) { |
102 | 0 | ADD_FAILURE(); |
103 | 0 | } |
104 | | |
105 | | // A callback for a QUIC client connection to unblock the test after handshake succeeds. QUIC |
106 | | // network connection initiates handshake and raises Connected event when it's done. Tests should |
107 | | // proceed with sending requests afterwards. |
108 | | class TestConnectionCallbacks : public Network::ConnectionCallbacks { |
109 | | public: |
110 | 2.64k | TestConnectionCallbacks(Event::Dispatcher& dispatcher) : dispatcher_(dispatcher) {} |
111 | | |
112 | | // Network::ConnectionCallbacks |
113 | 0 | void onEvent(Network::ConnectionEvent event) override { |
114 | 0 | if (event == Network::ConnectionEvent::Connected) { |
115 | | // Handshake finished, unblock the test to continue. This is needed because we call |
116 | | // Dispatcher::run() with Block to wait for the handshake to finish before proceeding. |
117 | | // TODO(danzh) find an alternative approach with behaviors more in parallel with SSL. |
118 | 0 | connected_ = true; |
119 | 0 | dispatcher_.exit(); |
120 | 0 | } else if (event == Network::ConnectionEvent::RemoteClose) { |
121 | | // If the peer closes the connection, no need to wait anymore. |
122 | 0 | dispatcher_.exit(); |
123 | 0 | } else { |
124 | 0 | if (!connected_) { |
125 | | // Before handshake gets established, any connection failure should exit the loop. I.e. a |
126 | | // QUIC connection may fail of INVALID_VERSION if both this client doesn't support any of |
127 | | // the versions the server advertised before handshake established. In this case the |
128 | | // connection is closed locally and this is in a blocking event loop. |
129 | 0 | dispatcher_.exit(); |
130 | 0 | } |
131 | 0 | } |
132 | 0 | } |
133 | | |
134 | 0 | void onAboveWriteBufferHighWatermark() override {} |
135 | 0 | void onBelowWriteBufferLowWatermark() override {} |
136 | | |
137 | | private: |
138 | | Event::Dispatcher& dispatcher_; |
139 | | bool connected_{false}; |
140 | | }; |
141 | | |
142 | | Network::UpstreamTransportSocketFactoryPtr |
143 | | IntegrationUtil::createQuicUpstreamTransportSocketFactory(Api::Api& api, Stats::Store& store, |
144 | | Ssl::ContextManager& context_manager, |
145 | 0 | const std::string& san_to_match) { |
146 | 0 | NiceMock<Server::Configuration::MockTransportSocketFactoryContext> context; |
147 | 0 | ON_CALL(context.server_context_, api()).WillByDefault(testing::ReturnRef(api)); |
148 | 0 | ON_CALL(context, statsScope()).WillByDefault(testing::ReturnRef(*store.rootScope())); |
149 | 0 | ON_CALL(context, sslContextManager()).WillByDefault(testing::ReturnRef(context_manager)); |
150 | 0 | envoy::extensions::transport_sockets::quic::v3::QuicUpstreamTransport |
151 | 0 | quic_transport_socket_config; |
152 | 0 | auto* tls_context = quic_transport_socket_config.mutable_upstream_tls_context(); |
153 | 0 | #ifdef ENVOY_ENABLE_YAML |
154 | 0 | initializeUpstreamTlsContextConfig( |
155 | 0 | Ssl::ClientSslTransportOptions().setAlpn(true).setSan(san_to_match).setSni("lyft.com"), |
156 | 0 | *tls_context); |
157 | | #else |
158 | | UNREFERENCED_PARAMETER(tls_context); |
159 | | UNREFERENCED_PARAMETER(san_to_match); |
160 | | RELEASE_ASSERT(0, "unsupported"); |
161 | | #endif // ENVOY_ENABLE_YAML |
162 | |
|
163 | 0 | envoy::config::core::v3::TransportSocket message; |
164 | 0 | message.mutable_typed_config()->PackFrom(quic_transport_socket_config); |
165 | 0 | auto& config_factory = Config::Utility::getAndCheckFactory< |
166 | 0 | Server::Configuration::UpstreamTransportSocketConfigFactory>(message); |
167 | 0 | return config_factory.createTransportSocketFactory(quic_transport_socket_config, context); |
168 | 0 | } |
169 | | |
170 | | BufferingStreamDecoderPtr |
171 | | sendRequestAndWaitForResponse(Event::Dispatcher& dispatcher, const std::string& method, |
172 | | const std::string& url, const std::string& body, |
173 | | const std::string& host, const std::string& content_type, |
174 | 2.64k | Http::CodecClientProd& client) { |
175 | 2.64k | BufferingStreamDecoderPtr response(new BufferingStreamDecoder([&]() -> void { |
176 | 2.64k | client.close(); |
177 | 2.64k | dispatcher.exit(); |
178 | 2.64k | })); |
179 | 2.64k | Http::RequestEncoder& encoder = client.newStream(*response); |
180 | 2.64k | encoder.getStream().addCallbacks(*response); |
181 | | |
182 | 2.64k | Http::TestRequestHeaderMapImpl headers; |
183 | 2.64k | headers.setMethod(method); |
184 | 2.64k | headers.setPath(url); |
185 | 2.64k | headers.setHost(host); |
186 | 2.64k | headers.setReferenceScheme(Http::Headers::get().SchemeValues.Http); |
187 | 2.64k | if (!content_type.empty()) { |
188 | 0 | headers.setContentType(content_type); |
189 | 0 | } |
190 | 2.64k | const auto status = encoder.encodeHeaders(headers, body.empty()); |
191 | 2.64k | ASSERT(status.ok()); |
192 | 2.64k | if (!body.empty()) { |
193 | 0 | Buffer::OwnedImpl body_buffer(body); |
194 | 0 | encoder.encodeData(body_buffer, true); |
195 | 0 | } |
196 | | |
197 | 2.64k | dispatcher.run(Event::Dispatcher::RunType::Block); |
198 | 2.64k | return response; |
199 | 2.64k | } |
200 | | |
201 | | BufferingStreamDecoderPtr |
202 | | IntegrationUtil::makeSingleRequest(const Network::Address::InstanceConstSharedPtr& addr, |
203 | | const std::string& method, const std::string& url, |
204 | | const std::string& body, Http::CodecType type, |
205 | 2.64k | const std::string& host, const std::string& content_type) { |
206 | 2.64k | NiceMock<Stats::MockIsolatedStatsStore> mock_stats_store; |
207 | 2.64k | Quic::QuicStatNames quic_stat_names(mock_stats_store.symbolTable()); |
208 | 2.64k | NiceMock<Random::MockRandomGenerator> random; |
209 | 2.64k | Event::GlobalTimeSystem time_system; |
210 | 2.64k | NiceMock<Random::MockRandomGenerator> random_generator; |
211 | 2.64k | envoy::config::bootstrap::v3::Bootstrap bootstrap; |
212 | 2.64k | Api::Impl api(Thread::threadFactoryForTest(), mock_stats_store, time_system, |
213 | 2.64k | Filesystem::fileSystemForTest(), random_generator, bootstrap); |
214 | 2.64k | Event::DispatcherPtr dispatcher(api.allocateDispatcher("test_thread")); |
215 | 2.64k | TestConnectionCallbacks connection_callbacks(*dispatcher); |
216 | 2.64k | Network::TransportSocketOptionsConstSharedPtr options; |
217 | | |
218 | 2.64k | std::shared_ptr<Upstream::MockClusterInfo> cluster{new NiceMock<Upstream::MockClusterInfo>()}; |
219 | 2.64k | Upstream::HostDescriptionConstSharedPtr host_description = |
220 | 2.64k | std::make_shared<Upstream::HostDescriptionImpl>( |
221 | 2.64k | cluster, "", |
222 | 2.64k | Network::Utility::resolveUrl( |
223 | 2.64k | fmt::format("{}://127.0.0.1:80", (type == Http::CodecType::HTTP3 ? "udp" : "tcp"))), |
224 | 2.64k | nullptr, envoy::config::core::v3::Locality().default_instance(), |
225 | 2.64k | envoy::config::endpoint::v3::Endpoint::HealthCheckConfig::default_instance(), 0, |
226 | 2.64k | time_system); |
227 | | |
228 | 2.64k | if (type <= Http::CodecType::HTTP2) { |
229 | 2.64k | Http::CodecClientProd client(type, |
230 | 2.64k | dispatcher->createClientConnection( |
231 | 2.64k | addr, Network::Address::InstanceConstSharedPtr(), |
232 | 2.64k | Network::Test::createRawBufferSocket(), nullptr, nullptr), |
233 | 2.64k | host_description, *dispatcher, random, options); |
234 | 2.64k | return sendRequestAndWaitForResponse(*dispatcher, method, url, body, host, content_type, |
235 | 2.64k | client); |
236 | 2.64k | } |
237 | | |
238 | 0 | #ifdef ENVOY_ENABLE_QUIC |
239 | 0 | Extensions::TransportSockets::Tls::ContextManagerImpl manager(time_system); |
240 | 0 | Network::UpstreamTransportSocketFactoryPtr transport_socket_factory = |
241 | 0 | createQuicUpstreamTransportSocketFactory(api, mock_stats_store, manager, |
242 | 0 | "spiffe://lyft.com/backend-team"); |
243 | 0 | auto& quic_transport_socket_factory = |
244 | 0 | dynamic_cast<Quic::QuicClientTransportSocketFactory&>(*transport_socket_factory); |
245 | 0 | auto persistent_info = std::make_unique<Quic::PersistentQuicInfoImpl>(*dispatcher, 0); |
246 | |
|
247 | 0 | Network::Address::InstanceConstSharedPtr local_address; |
248 | 0 | if (addr->ip()->version() == Network::Address::IpVersion::v4) { |
249 | 0 | local_address = Network::Utility::getLocalAddress(Network::Address::IpVersion::v4); |
250 | 0 | } else { |
251 | | // Docker only works with loopback v6 address. |
252 | 0 | local_address = std::make_shared<Network::Address::Ipv6Instance>("::1"); |
253 | 0 | } |
254 | 0 | quic::DeterministicConnectionIdGenerator generator(quic::kQuicDefaultConnectionIdLength); |
255 | 0 | Network::ClientConnectionPtr connection = Quic::createQuicNetworkConnection( |
256 | 0 | *persistent_info, quic_transport_socket_factory.getCryptoConfig(), |
257 | 0 | quic::QuicServerId( |
258 | 0 | quic_transport_socket_factory.clientContextConfig()->serverNameIndication(), |
259 | 0 | static_cast<uint16_t>(addr->ip()->port())), |
260 | 0 | *dispatcher, addr, local_address, quic_stat_names, {}, *mock_stats_store.rootScope(), nullptr, |
261 | 0 | nullptr, generator); |
262 | 0 | connection->addConnectionCallbacks(connection_callbacks); |
263 | 0 | Http::CodecClientProd client(type, std::move(connection), host_description, *dispatcher, random, |
264 | 0 | options); |
265 | | // Quic connection needs to finish handshake. |
266 | 0 | dispatcher->run(Event::Dispatcher::RunType::Block); |
267 | 0 | return sendRequestAndWaitForResponse(*dispatcher, method, url, body, host, content_type, client); |
268 | | #else |
269 | | ASSERT(false, "running a QUIC integration test without compiling QUIC"); |
270 | | return nullptr; |
271 | | #endif |
272 | 2.64k | } |
273 | | |
274 | | BufferingStreamDecoderPtr |
275 | | IntegrationUtil::makeSingleRequest(uint32_t port, const std::string& method, const std::string& url, |
276 | | const std::string& body, Http::CodecType type, |
277 | | Network::Address::IpVersion ip_version, const std::string& host, |
278 | 0 | const std::string& content_type) { |
279 | 0 | auto addr = Network::Utility::resolveUrl( |
280 | 0 | fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(ip_version), port)); |
281 | 0 | return makeSingleRequest(addr, method, url, body, type, host, content_type); |
282 | 0 | } |
283 | | |
284 | | RawConnectionDriver::RawConnectionDriver(uint32_t port, Buffer::Instance& request_data, |
285 | | ReadCallback response_data_callback, |
286 | | Network::Address::IpVersion version, |
287 | | Event::Dispatcher& dispatcher, |
288 | | Network::TransportSocketPtr transport_socket) |
289 | | : RawConnectionDriver(port, writeBufferCallback(request_data), response_data_callback, version, |
290 | 0 | dispatcher, std::move(transport_socket)) {} |
291 | | |
292 | | RawConnectionDriver::RawConnectionDriver(uint32_t port, DoWriteCallback write_request_callback, |
293 | | ReadCallback response_data_callback, |
294 | | Network::Address::IpVersion version, |
295 | | Event::Dispatcher& dispatcher, |
296 | | Network::TransportSocketPtr transport_socket) |
297 | 0 | : dispatcher_(dispatcher), remaining_bytes_to_send_(0) { |
298 | 0 | api_ = Api::createApiForTest(stats_store_); |
299 | 0 | Event::GlobalTimeSystem time_system; |
300 | 0 | callbacks_ = std::make_unique<ConnectionCallbacks>( |
301 | 0 | [this, write_request_callback]() { |
302 | 0 | Buffer::OwnedImpl buffer; |
303 | 0 | const bool close_after = write_request_callback(buffer); |
304 | 0 | remaining_bytes_to_send_ += buffer.length(); |
305 | 0 | client_->write(buffer, close_after); |
306 | 0 | }, |
307 | 0 | dispatcher); |
308 | |
|
309 | 0 | if (transport_socket == nullptr) { |
310 | 0 | transport_socket = Network::Test::createRawBufferSocket(); |
311 | 0 | } |
312 | |
|
313 | 0 | client_ = dispatcher_.createClientConnection( |
314 | 0 | Network::Utility::resolveUrl( |
315 | 0 | fmt::format("tcp://{}:{}", Network::Test::getLoopbackAddressUrlString(version), port)), |
316 | 0 | Network::Address::InstanceConstSharedPtr(), std::move(transport_socket), nullptr, nullptr); |
317 | | // ConnectionCallbacks will call write_request_callback from the connect and low-watermark |
318 | | // callbacks. Set a small buffer limit so high-watermark is triggered after every write and |
319 | | // low-watermark is triggered every time the buffer is drained. |
320 | 0 | client_->setBufferLimits(1); |
321 | 0 | client_->addConnectionCallbacks(*callbacks_); |
322 | 0 | client_->addReadFilter( |
323 | 0 | Network::ReadFilterSharedPtr{new ForwardingFilter(*this, response_data_callback)}); |
324 | 0 | client_->addBytesSentCallback([&](uint64_t bytes) { |
325 | 0 | remaining_bytes_to_send_ -= bytes; |
326 | 0 | return true; |
327 | 0 | }); |
328 | 0 | client_->connect(); |
329 | 0 | } |
330 | | |
331 | | // This factory is needed to avoid dealing with the ServerFactoryContext, which is |
332 | | // problematic in dedicated threads for fake upstreams or test client. |
333 | | // UHV is needed in fake upstreams or test client for translation |
334 | | // of extended CONNECT to upgrade, which is done by the codecs. |
335 | | class FakeHeaderValidatorFactory : public Http::HeaderValidatorFactory { |
336 | | public: |
337 | 0 | FakeHeaderValidatorFactory(const HeaderValidatorConfig& config) : config_(config) {} |
338 | | |
339 | | Http::ServerHeaderValidatorPtr |
340 | 0 | createServerHeaderValidator(Http::Protocol protocol, Http::HeaderValidatorStats& stats) override { |
341 | 0 | ConfigOverrides config_overrides; |
342 | 0 |
|
343 | 0 | switch (protocol) { |
344 | 0 | case Http::Protocol::Http3: |
345 | 0 | case Http::Protocol::Http2: |
346 | 0 | return std::make_unique<ServerHttp2HeaderValidator>(config_, protocol, stats, |
347 | 0 | config_overrides); |
348 | 0 | case Http::Protocol::Http11: |
349 | 0 | case Http::Protocol::Http10: |
350 | 0 | return std::make_unique<ServerHttp1HeaderValidator>(config_, protocol, stats, |
351 | 0 | config_overrides); |
352 | 0 | } |
353 | 0 | PANIC_DUE_TO_CORRUPT_ENUM; |
354 | 0 | } |
355 | | |
356 | | Http::ClientHeaderValidatorPtr |
357 | 0 | createClientHeaderValidator(Http::Protocol protocol, Http::HeaderValidatorStats& stats) override { |
358 | 0 | ConfigOverrides config_overrides; |
359 | 0 |
|
360 | 0 | switch (protocol) { |
361 | 0 | case Http::Protocol::Http3: |
362 | 0 | case Http::Protocol::Http2: |
363 | 0 | return std::make_unique<ClientHttp2HeaderValidator>(config_, protocol, stats, |
364 | 0 | config_overrides); |
365 | 0 | case Http::Protocol::Http11: |
366 | 0 | case Http::Protocol::Http10: |
367 | 0 | return std::make_unique<ClientHttp1HeaderValidator>(config_, protocol, stats, |
368 | 0 | config_overrides); |
369 | 0 | } |
370 | 0 | PANIC_DUE_TO_CORRUPT_ENUM; |
371 | 0 | } |
372 | | |
373 | | private: |
374 | | const HeaderValidatorConfig config_; |
375 | | }; |
376 | | |
377 | | Http::HeaderValidatorFactoryPtr |
378 | 1.69k | IntegrationUtil::makeHeaderValidationFactory([[maybe_unused]] const HeaderValidatorConfig& config) { |
379 | | #ifdef ENVOY_ENABLE_UHV |
380 | | return std::make_unique<FakeHeaderValidatorFactory>(config); |
381 | | #else |
382 | 1.69k | return nullptr; |
383 | 1.69k | #endif |
384 | 1.69k | } |
385 | | |
386 | 0 | RawConnectionDriver::~RawConnectionDriver() = default; |
387 | | |
388 | 0 | testing::AssertionResult RawConnectionDriver::waitForConnection() { |
389 | | // TODO(mattklein123): Add a timeout and switch to events and waitFor(). |
390 | 0 | while (!callbacks_->connected() && !callbacks_->closed()) { |
391 | 0 | Event::GlobalTimeSystem().timeSystem().realSleepDoNotUseWithoutScrutiny( |
392 | 0 | std::chrono::milliseconds(10)); |
393 | 0 | dispatcher_.run(Event::Dispatcher::RunType::NonBlock); |
394 | 0 | } |
395 | 0 | if (!callbacks_->connected()) { |
396 | 0 | return testing::AssertionFailure(); |
397 | 0 | } |
398 | 0 | return testing::AssertionSuccess(); |
399 | 0 | } |
400 | | |
401 | | testing::AssertionResult RawConnectionDriver::run(Event::Dispatcher::RunType run_type, |
402 | 0 | std::chrono::milliseconds timeout) { |
403 | 0 | Event::TimerPtr timeout_timer = dispatcher_.createTimer([this]() -> void { dispatcher_.exit(); }); |
404 | 0 | timeout_timer->enableTimer(timeout); |
405 | |
|
406 | 0 | dispatcher_.run(run_type); |
407 | |
|
408 | 0 | if (timeout_timer->enabled()) { |
409 | 0 | timeout_timer->disableTimer(); |
410 | 0 | return testing::AssertionSuccess(); |
411 | 0 | } |
412 | 0 | return testing::AssertionFailure(); |
413 | 0 | } |
414 | | |
415 | 0 | void RawConnectionDriver::close() { client_->close(Network::ConnectionCloseType::FlushWrite); } |
416 | | |
417 | 0 | bool RawConnectionDriver::allBytesSent() const { return remaining_bytes_to_send_ == 0; } |
418 | | |
419 | | WaitForPayloadReader::WaitForPayloadReader(Event::Dispatcher& dispatcher) |
420 | 2.52k | : dispatcher_(dispatcher) {} |
421 | | |
422 | 407 | Network::FilterStatus WaitForPayloadReader::onData(Buffer::Instance& data, bool end_stream) { |
423 | 407 | data_.append(data.toString()); |
424 | 407 | data.drain(data.length()); |
425 | 407 | read_end_stream_ = end_stream; |
426 | 407 | if ((!data_to_wait_for_.empty() && absl::StartsWith(data_, data_to_wait_for_)) || |
427 | 407 | (exact_match_ == false && data_.find(data_to_wait_for_) != std::string::npos) || end_stream) { |
428 | 0 | data_to_wait_for_.clear(); |
429 | 0 | dispatcher_.exit(); |
430 | 0 | } |
431 | | |
432 | 407 | if (wait_for_length_ && data_.size() >= length_to_wait_for_) { |
433 | 0 | wait_for_length_ = false; |
434 | 0 | dispatcher_.exit(); |
435 | 0 | } |
436 | | |
437 | 407 | return Network::FilterStatus::StopIteration; |
438 | 407 | } |
439 | | |
440 | | } // namespace Envoy |