/proc/self/cwd/test/integration/integration_tcp_client.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "test/integration/integration_tcp_client.h" |
2 | | |
3 | | #include <chrono> |
4 | | #include <cstdint> |
5 | | #include <functional> |
6 | | #include <memory> |
7 | | #include <string> |
8 | | #include <type_traits> |
9 | | |
10 | | #include "envoy/buffer/buffer.h" |
11 | | #include "envoy/common/time.h" |
12 | | #include "envoy/event/dispatcher.h" |
13 | | #include "envoy/event/timer.h" |
14 | | |
15 | | #include "source/common/buffer/buffer_impl.h" |
16 | | #include "source/common/common/fmt.h" |
17 | | #include "source/common/network/utility.h" |
18 | | |
19 | | #include "test/integration/utility.h" |
20 | | #include "test/mocks/buffer/mocks.h" |
21 | | #include "test/test_common/network_utility.h" |
22 | | #include "test/test_common/test_time_system.h" |
23 | | |
24 | | #include "gmock/gmock.h" |
25 | | #include "gtest/gtest.h" |
26 | | |
27 | | namespace Envoy { |
28 | | using ::testing::_; |
29 | | using ::testing::AnyNumber; |
30 | | using ::testing::AssertionFailure; |
31 | | using ::testing::AssertionResult; |
32 | | using ::testing::AssertionSuccess; |
33 | | using ::testing::AtLeast; |
34 | | using ::testing::Invoke; |
35 | | using ::testing::NiceMock; |
36 | | |
37 | | IntegrationTcpClient::IntegrationTcpClient( |
38 | | Event::Dispatcher& dispatcher, MockBufferFactory& factory, uint32_t port, |
39 | | Network::Address::IpVersion version, bool enable_half_close, |
40 | | const Network::ConnectionSocket::OptionsSharedPtr& options, |
41 | | Network::Address::InstanceConstSharedPtr source_address, absl::string_view destination_address) |
42 | | : payload_reader_(new WaitForPayloadReader(dispatcher)), |
43 | 2.11k | callbacks_(new ConnectionCallbacks(*this)) { |
44 | 2.11k | EXPECT_CALL(factory, createBuffer_(_, _, _)) |
45 | 2.11k | .Times(AtLeast(1)) |
46 | 2.11k | .WillOnce(Invoke([&](std::function<void()> below_low, std::function<void()> above_high, |
47 | 2.11k | std::function<void()> above_overflow) -> Buffer::Instance* { |
48 | 2.11k | client_write_buffer_ = |
49 | 2.11k | new NiceMock<MockWatermarkBuffer>(below_low, above_high, above_overflow); |
50 | 2.11k | return client_write_buffer_; |
51 | 2.11k | })) |
52 | 2.11k | .WillRepeatedly(Invoke([](std::function<void()> below_low, std::function<void()> above_high, |
53 | 2.11k | std::function<void()> above_overflow) -> Buffer::Instance* { |
54 | 2.11k | return new Buffer::WatermarkBuffer(below_low, above_high, above_overflow); |
55 | 2.11k | })); |
56 | | |
57 | 2.11k | connection_ = dispatcher.createClientConnection( |
58 | 2.11k | *Network::Utility::resolveUrl(fmt::format( |
59 | 2.11k | "tcp://{}:{}", |
60 | 2.11k | destination_address.empty() ? Network::Test::getLoopbackAddressUrlString(version) |
61 | 2.11k | : destination_address, |
62 | 2.11k | port)), |
63 | 2.11k | source_address, Network::Test::createRawBufferSocket(), options, nullptr); |
64 | | |
65 | 2.11k | ON_CALL(*client_write_buffer_, drain(_)) |
66 | 2.11k | .WillByDefault(Invoke(client_write_buffer_, &MockWatermarkBuffer::trackDrains)); |
67 | 2.11k | EXPECT_CALL(*client_write_buffer_, drain(_)).Times(AnyNumber()); |
68 | | |
69 | 2.11k | connection_->enableHalfClose(enable_half_close); |
70 | 2.11k | connection_->addConnectionCallbacks(*callbacks_); |
71 | 2.11k | connection_->addReadFilter(payload_reader_); |
72 | 2.11k | connection_->connect(); |
73 | 2.11k | } |
74 | | |
75 | 2.25k | void IntegrationTcpClient::close() { connection_->close(Network::ConnectionCloseType::NoFlush); } |
76 | | |
77 | 0 | void IntegrationTcpClient::close(Network::ConnectionCloseType close_type) { |
78 | 0 | connection_->close(close_type); |
79 | 0 | } |
80 | | |
81 | 0 | void IntegrationTcpClient::waitForData(const std::string& data, bool exact_match) { |
82 | 0 | auto found = payload_reader_->data().find(data); |
83 | 0 | if (found == 0 || (!exact_match && found != std::string::npos)) { |
84 | 0 | return; |
85 | 0 | } |
86 | | |
87 | 0 | payload_reader_->setDataToWaitFor(data, exact_match); |
88 | 0 | connection_->dispatcher().run(Event::Dispatcher::RunType::Block); |
89 | 0 | } |
90 | | |
91 | | AssertionResult IntegrationTcpClient::waitForData(size_t length, |
92 | 0 | std::chrono::milliseconds timeout) { |
93 | 0 | if (payload_reader_->data().size() >= length) { |
94 | 0 | return AssertionSuccess(); |
95 | 0 | } |
96 | | |
97 | 0 | return payload_reader_->waitForLength(length, timeout); |
98 | 0 | } |
99 | | |
100 | 0 | void IntegrationTcpClient::waitForDisconnect(bool ignore_spurious_events) { |
101 | 0 | if (disconnected_) { |
102 | 0 | return; |
103 | 0 | } |
104 | 0 | Event::TimerPtr timeout_timer = |
105 | 0 | connection_->dispatcher().createTimer([this]() -> void { connection_->dispatcher().exit(); }); |
106 | 0 | timeout_timer->enableTimer(TestUtility::DefaultTimeout); |
107 | |
|
108 | 0 | if (ignore_spurious_events) { |
109 | 0 | while (!disconnected_ && timeout_timer->enabled()) { |
110 | 0 | connection_->dispatcher().run(Event::Dispatcher::RunType::Block); |
111 | 0 | } |
112 | 0 | } else { |
113 | 0 | connection_->dispatcher().run(Event::Dispatcher::RunType::Block); |
114 | 0 | } |
115 | 0 | EXPECT_TRUE(disconnected_); |
116 | 0 | } |
117 | | |
118 | 0 | void IntegrationTcpClient::waitForHalfClose(bool ignore_spurious_events) { |
119 | 0 | waitForHalfClose(TestUtility::DefaultTimeout, ignore_spurious_events); |
120 | 0 | } |
121 | | |
122 | | void IntegrationTcpClient::waitForHalfClose(std::chrono::milliseconds timeout, |
123 | 0 | bool ignore_spurious_events) { |
124 | 0 | if (payload_reader_->readLastByte()) { |
125 | 0 | return; |
126 | 0 | } |
127 | 0 | Event::TimerPtr timeout_timer = |
128 | 0 | connection_->dispatcher().createTimer([this]() -> void { connection_->dispatcher().exit(); }); |
129 | 0 | timeout_timer->enableTimer(timeout); |
130 | |
|
131 | 0 | if (ignore_spurious_events) { |
132 | 0 | while (!payload_reader_->readLastByte() && timeout_timer->enabled()) { |
133 | 0 | connection_->dispatcher().run(Event::Dispatcher::RunType::Block); |
134 | 0 | } |
135 | 0 | } else { |
136 | 0 | connection_->dispatcher().run(Event::Dispatcher::RunType::Block); |
137 | 0 | } |
138 | |
|
139 | 0 | EXPECT_TRUE(payload_reader_->readLastByte()); |
140 | 0 | } |
141 | | |
142 | 0 | void IntegrationTcpClient::readDisable(bool disabled) { connection_->readDisable(disabled); } |
143 | | |
144 | | AssertionResult IntegrationTcpClient::write(const std::string& data, bool end_stream, bool verify, |
145 | 5.07k | std::chrono::milliseconds timeout) { |
146 | 5.07k | Event::TestTimeSystem::RealTimeBound bound(timeout); |
147 | 5.07k | Buffer::OwnedImpl buffer(data); |
148 | 5.07k | if (verify) { |
149 | 0 | EXPECT_CALL(*client_write_buffer_, move(_)); |
150 | 0 | if (!data.empty()) { |
151 | 0 | EXPECT_CALL(*client_write_buffer_, drain(_)).Times(AtLeast(1)); |
152 | 0 | } |
153 | 0 | } |
154 | | |
155 | 5.07k | uint64_t bytes_expected = client_write_buffer_->bytesDrained() + data.size(); |
156 | | |
157 | 5.07k | connection_->write(buffer, end_stream); |
158 | 5.07k | do { |
159 | 5.07k | connection_->dispatcher().run(Event::Dispatcher::RunType::NonBlock); |
160 | 5.07k | if (client_write_buffer_->bytesDrained() == bytes_expected || disconnected_) { |
161 | 5.07k | break; |
162 | 5.07k | } |
163 | 5.07k | } while (bound.withinBound()); |
164 | | |
165 | 5.07k | if (!bound.withinBound()) { |
166 | 0 | return AssertionFailure() << "Timed out completing write"; |
167 | 5.07k | } else if (verify && (disconnected_ || client_write_buffer_->bytesDrained() != bytes_expected)) { |
168 | 0 | return AssertionFailure() |
169 | 0 | << "Failed to complete write or unexpected disconnect. disconnected_: " << disconnected_ |
170 | 0 | << " bytes_drained: " << client_write_buffer_->bytesDrained() |
171 | 0 | << " bytes_expected: " << bytes_expected; |
172 | 0 | } |
173 | | |
174 | 5.07k | return AssertionSuccess(); |
175 | 5.07k | } |
176 | | |
177 | 2.83k | void IntegrationTcpClient::ConnectionCallbacks::onEvent(Network::ConnectionEvent event) { |
178 | 2.83k | if (event == Network::ConnectionEvent::RemoteClose) { |
179 | 4 | parent_.disconnected_ = true; |
180 | 4 | parent_.connection_->dispatcher().exit(); |
181 | 4 | } |
182 | 2.83k | } |
183 | | |
184 | | } // namespace Envoy |