Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/test/integration/integration_tcp_client.cc
Line
Count
Source (jump to first uncovered line)
1
#include "test/integration/integration_tcp_client.h"
2
3
#include <chrono>
4
#include <cstdint>
5
#include <functional>
6
#include <memory>
7
#include <string>
8
#include <type_traits>
9
10
#include "envoy/buffer/buffer.h"
11
#include "envoy/common/time.h"
12
#include "envoy/event/dispatcher.h"
13
#include "envoy/event/timer.h"
14
15
#include "source/common/buffer/buffer_impl.h"
16
#include "source/common/common/fmt.h"
17
#include "source/common/network/utility.h"
18
19
#include "test/integration/utility.h"
20
#include "test/mocks/buffer/mocks.h"
21
#include "test/test_common/network_utility.h"
22
#include "test/test_common/test_time_system.h"
23
24
#include "gmock/gmock.h"
25
#include "gtest/gtest.h"
26
27
namespace Envoy {
28
using ::testing::_;
29
using ::testing::AnyNumber;
30
using ::testing::AssertionFailure;
31
using ::testing::AssertionResult;
32
using ::testing::AssertionSuccess;
33
using ::testing::AtLeast;
34
using ::testing::Invoke;
35
using ::testing::NiceMock;
36
37
IntegrationTcpClient::IntegrationTcpClient(
38
    Event::Dispatcher& dispatcher, MockBufferFactory& factory, uint32_t port,
39
    Network::Address::IpVersion version, bool enable_half_close,
40
    const Network::ConnectionSocket::OptionsSharedPtr& options,
41
    Network::Address::InstanceConstSharedPtr source_address, absl::string_view destination_address)
42
    : payload_reader_(new WaitForPayloadReader(dispatcher)),
43
2.11k
      callbacks_(new ConnectionCallbacks(*this)) {
44
2.11k
  EXPECT_CALL(factory, createBuffer_(_, _, _))
45
2.11k
      .Times(AtLeast(1))
46
2.11k
      .WillOnce(Invoke([&](std::function<void()> below_low, std::function<void()> above_high,
47
2.11k
                           std::function<void()> above_overflow) -> Buffer::Instance* {
48
2.11k
        client_write_buffer_ =
49
2.11k
            new NiceMock<MockWatermarkBuffer>(below_low, above_high, above_overflow);
50
2.11k
        return client_write_buffer_;
51
2.11k
      }))
52
2.11k
      .WillRepeatedly(Invoke([](std::function<void()> below_low, std::function<void()> above_high,
53
2.11k
                                std::function<void()> above_overflow) -> Buffer::Instance* {
54
2.11k
        return new Buffer::WatermarkBuffer(below_low, above_high, above_overflow);
55
2.11k
      }));
56
57
2.11k
  connection_ = dispatcher.createClientConnection(
58
2.11k
      *Network::Utility::resolveUrl(fmt::format(
59
2.11k
          "tcp://{}:{}",
60
2.11k
          destination_address.empty() ? Network::Test::getLoopbackAddressUrlString(version)
61
2.11k
                                      : destination_address,
62
2.11k
          port)),
63
2.11k
      source_address, Network::Test::createRawBufferSocket(), options, nullptr);
64
65
2.11k
  ON_CALL(*client_write_buffer_, drain(_))
66
2.11k
      .WillByDefault(Invoke(client_write_buffer_, &MockWatermarkBuffer::trackDrains));
67
2.11k
  EXPECT_CALL(*client_write_buffer_, drain(_)).Times(AnyNumber());
68
69
2.11k
  connection_->enableHalfClose(enable_half_close);
70
2.11k
  connection_->addConnectionCallbacks(*callbacks_);
71
2.11k
  connection_->addReadFilter(payload_reader_);
72
2.11k
  connection_->connect();
73
2.11k
}
74
75
2.25k
void IntegrationTcpClient::close() { connection_->close(Network::ConnectionCloseType::NoFlush); }
76
77
0
void IntegrationTcpClient::close(Network::ConnectionCloseType close_type) {
78
0
  connection_->close(close_type);
79
0
}
80
81
0
void IntegrationTcpClient::waitForData(const std::string& data, bool exact_match) {
82
0
  auto found = payload_reader_->data().find(data);
83
0
  if (found == 0 || (!exact_match && found != std::string::npos)) {
84
0
    return;
85
0
  }
86
87
0
  payload_reader_->setDataToWaitFor(data, exact_match);
88
0
  connection_->dispatcher().run(Event::Dispatcher::RunType::Block);
89
0
}
90
91
AssertionResult IntegrationTcpClient::waitForData(size_t length,
92
0
                                                  std::chrono::milliseconds timeout) {
93
0
  if (payload_reader_->data().size() >= length) {
94
0
    return AssertionSuccess();
95
0
  }
96
97
0
  return payload_reader_->waitForLength(length, timeout);
98
0
}
99
100
0
void IntegrationTcpClient::waitForDisconnect(bool ignore_spurious_events) {
101
0
  if (disconnected_) {
102
0
    return;
103
0
  }
104
0
  Event::TimerPtr timeout_timer =
105
0
      connection_->dispatcher().createTimer([this]() -> void { connection_->dispatcher().exit(); });
106
0
  timeout_timer->enableTimer(TestUtility::DefaultTimeout);
107
108
0
  if (ignore_spurious_events) {
109
0
    while (!disconnected_ && timeout_timer->enabled()) {
110
0
      connection_->dispatcher().run(Event::Dispatcher::RunType::Block);
111
0
    }
112
0
  } else {
113
0
    connection_->dispatcher().run(Event::Dispatcher::RunType::Block);
114
0
  }
115
0
  EXPECT_TRUE(disconnected_);
116
0
}
117
118
0
void IntegrationTcpClient::waitForHalfClose(bool ignore_spurious_events) {
119
0
  waitForHalfClose(TestUtility::DefaultTimeout, ignore_spurious_events);
120
0
}
121
122
void IntegrationTcpClient::waitForHalfClose(std::chrono::milliseconds timeout,
123
0
                                            bool ignore_spurious_events) {
124
0
  if (payload_reader_->readLastByte()) {
125
0
    return;
126
0
  }
127
0
  Event::TimerPtr timeout_timer =
128
0
      connection_->dispatcher().createTimer([this]() -> void { connection_->dispatcher().exit(); });
129
0
  timeout_timer->enableTimer(timeout);
130
131
0
  if (ignore_spurious_events) {
132
0
    while (!payload_reader_->readLastByte() && timeout_timer->enabled()) {
133
0
      connection_->dispatcher().run(Event::Dispatcher::RunType::Block);
134
0
    }
135
0
  } else {
136
0
    connection_->dispatcher().run(Event::Dispatcher::RunType::Block);
137
0
  }
138
139
0
  EXPECT_TRUE(payload_reader_->readLastByte());
140
0
}
141
142
0
void IntegrationTcpClient::readDisable(bool disabled) { connection_->readDisable(disabled); }
143
144
AssertionResult IntegrationTcpClient::write(const std::string& data, bool end_stream, bool verify,
145
5.07k
                                            std::chrono::milliseconds timeout) {
146
5.07k
  Event::TestTimeSystem::RealTimeBound bound(timeout);
147
5.07k
  Buffer::OwnedImpl buffer(data);
148
5.07k
  if (verify) {
149
0
    EXPECT_CALL(*client_write_buffer_, move(_));
150
0
    if (!data.empty()) {
151
0
      EXPECT_CALL(*client_write_buffer_, drain(_)).Times(AtLeast(1));
152
0
    }
153
0
  }
154
155
5.07k
  uint64_t bytes_expected = client_write_buffer_->bytesDrained() + data.size();
156
157
5.07k
  connection_->write(buffer, end_stream);
158
5.07k
  do {
159
5.07k
    connection_->dispatcher().run(Event::Dispatcher::RunType::NonBlock);
160
5.07k
    if (client_write_buffer_->bytesDrained() == bytes_expected || disconnected_) {
161
5.07k
      break;
162
5.07k
    }
163
5.07k
  } while (bound.withinBound());
164
165
5.07k
  if (!bound.withinBound()) {
166
0
    return AssertionFailure() << "Timed out completing write";
167
5.07k
  } else if (verify && (disconnected_ || client_write_buffer_->bytesDrained() != bytes_expected)) {
168
0
    return AssertionFailure()
169
0
           << "Failed to complete write or unexpected disconnect. disconnected_: " << disconnected_
170
0
           << " bytes_drained: " << client_write_buffer_->bytesDrained()
171
0
           << " bytes_expected: " << bytes_expected;
172
0
  }
173
174
5.07k
  return AssertionSuccess();
175
5.07k
}
176
177
2.83k
void IntegrationTcpClient::ConnectionCallbacks::onEvent(Network::ConnectionEvent event) {
178
2.83k
  if (event == Network::ConnectionEvent::RemoteClose) {
179
4
    parent_.disconnected_ = true;
180
4
    parent_.connection_->dispatcher().exit();
181
4
  }
182
2.83k
}
183
184
} // namespace Envoy