Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/test/integration/integration_tcp_client.cc
Line
Count
Source (jump to first uncovered line)
1
#include "test/integration/integration_tcp_client.h"
2
3
#include <chrono>
4
#include <cstdint>
5
#include <functional>
6
#include <memory>
7
#include <string>
8
#include <type_traits>
9
10
#include "envoy/buffer/buffer.h"
11
#include "envoy/common/time.h"
12
#include "envoy/event/dispatcher.h"
13
#include "envoy/event/timer.h"
14
15
#include "source/common/buffer/buffer_impl.h"
16
#include "source/common/common/fmt.h"
17
#include "source/common/network/utility.h"
18
19
#include "test/integration/utility.h"
20
#include "test/mocks/buffer/mocks.h"
21
#include "test/test_common/network_utility.h"
22
#include "test/test_common/test_time_system.h"
23
24
#include "gmock/gmock.h"
25
#include "gtest/gtest.h"
26
27
namespace Envoy {
28
using ::testing::_;
29
using ::testing::AnyNumber;
30
using ::testing::AssertionFailure;
31
using ::testing::AssertionResult;
32
using ::testing::AssertionSuccess;
33
using ::testing::AtLeast;
34
using ::testing::Invoke;
35
using ::testing::NiceMock;
36
37
IntegrationTcpClient::IntegrationTcpClient(
38
    Event::Dispatcher& dispatcher, MockBufferFactory& factory, uint32_t port,
39
    Network::Address::IpVersion version, bool enable_half_close,
40
    const Network::ConnectionSocket::OptionsSharedPtr& options,
41
    Network::Address::InstanceConstSharedPtr source_address, absl::string_view destination_address)
42
    : payload_reader_(new WaitForPayloadReader(dispatcher)),
43
2.52k
      callbacks_(new ConnectionCallbacks(*this)) {
44
2.52k
  EXPECT_CALL(factory, createBuffer_(_, _, _))
45
2.52k
      .Times(AtLeast(1))
46
2.52k
      .WillOnce(Invoke([&](std::function<void()> below_low, std::function<void()> above_high,
47
2.52k
                           std::function<void()> above_overflow) -> Buffer::Instance* {
48
2.52k
        client_write_buffer_ =
49
2.52k
            new NiceMock<MockWatermarkBuffer>(below_low, above_high, above_overflow);
50
2.52k
        return client_write_buffer_;
51
2.52k
      }))
52
2.52k
      .WillRepeatedly(Invoke([](std::function<void()> below_low, std::function<void()> above_high,
53
2.52k
                                std::function<void()> above_overflow) -> Buffer::Instance* {
54
2.52k
        return new Buffer::WatermarkBuffer(below_low, above_high, above_overflow);
55
2.52k
      }));
56
57
2.52k
  connection_ = dispatcher.createClientConnection(
58
2.52k
      Network::Utility::resolveUrl(fmt::format(
59
2.52k
          "tcp://{}:{}",
60
2.52k
          destination_address.empty() ? Network::Test::getLoopbackAddressUrlString(version)
61
2.52k
                                      : destination_address,
62
2.52k
          port)),
63
2.52k
      source_address, Network::Test::createRawBufferSocket(), options, nullptr);
64
65
2.52k
  ON_CALL(*client_write_buffer_, drain(_))
66
2.52k
      .WillByDefault(Invoke(client_write_buffer_, &MockWatermarkBuffer::trackDrains));
67
2.52k
  EXPECT_CALL(*client_write_buffer_, drain(_)).Times(AnyNumber());
68
69
2.52k
  connection_->enableHalfClose(enable_half_close);
70
2.52k
  connection_->addConnectionCallbacks(*callbacks_);
71
2.52k
  connection_->addReadFilter(payload_reader_);
72
2.52k
  connection_->connect();
73
2.52k
}
74
75
2.59k
void IntegrationTcpClient::close() { connection_->close(Network::ConnectionCloseType::NoFlush); }
76
77
0
void IntegrationTcpClient::close(Network::ConnectionCloseType close_type) {
78
0
  connection_->close(close_type);
79
0
}
80
81
0
void IntegrationTcpClient::waitForData(const std::string& data, bool exact_match) {
82
0
  auto found = payload_reader_->data().find(data);
83
0
  if (found == 0 || (!exact_match && found != std::string::npos)) {
84
0
    return;
85
0
  }
86
87
0
  payload_reader_->setDataToWaitFor(data, exact_match);
88
0
  connection_->dispatcher().run(Event::Dispatcher::RunType::Block);
89
0
}
90
91
AssertionResult IntegrationTcpClient::waitForData(size_t length,
92
0
                                                  std::chrono::milliseconds timeout) {
93
0
  if (payload_reader_->data().size() >= length) {
94
0
    return AssertionSuccess();
95
0
  }
96
97
0
  return payload_reader_->waitForLength(length, timeout);
98
0
}
99
100
0
void IntegrationTcpClient::waitForDisconnect(bool ignore_spurious_events) {
101
0
  if (disconnected_) {
102
0
    return;
103
0
  }
104
0
  Event::TimerPtr timeout_timer =
105
0
      connection_->dispatcher().createTimer([this]() -> void { connection_->dispatcher().exit(); });
106
0
  timeout_timer->enableTimer(TestUtility::DefaultTimeout);
107
108
0
  if (ignore_spurious_events) {
109
0
    while (!disconnected_ && timeout_timer->enabled()) {
110
0
      connection_->dispatcher().run(Event::Dispatcher::RunType::Block);
111
0
    }
112
0
  } else {
113
0
    connection_->dispatcher().run(Event::Dispatcher::RunType::Block);
114
0
  }
115
0
  EXPECT_TRUE(disconnected_);
116
0
}
117
118
0
void IntegrationTcpClient::waitForHalfClose(bool ignore_spurious_events) {
119
0
  waitForHalfClose(TestUtility::DefaultTimeout, ignore_spurious_events);
120
0
}
121
122
void IntegrationTcpClient::waitForHalfClose(std::chrono::milliseconds timeout,
123
0
                                            bool ignore_spurious_events) {
124
0
  if (payload_reader_->readLastByte()) {
125
0
    return;
126
0
  }
127
0
  Event::TimerPtr timeout_timer =
128
0
      connection_->dispatcher().createTimer([this]() -> void { connection_->dispatcher().exit(); });
129
0
  timeout_timer->enableTimer(timeout);
130
131
0
  if (ignore_spurious_events) {
132
0
    while (!payload_reader_->readLastByte() && timeout_timer->enabled()) {
133
0
      connection_->dispatcher().run(Event::Dispatcher::RunType::Block);
134
0
    }
135
0
  } else {
136
0
    connection_->dispatcher().run(Event::Dispatcher::RunType::Block);
137
0
  }
138
139
0
  EXPECT_TRUE(payload_reader_->readLastByte());
140
0
}
141
142
0
void IntegrationTcpClient::readDisable(bool disabled) { connection_->readDisable(disabled); }
143
144
AssertionResult IntegrationTcpClient::write(const std::string& data, bool end_stream, bool verify,
145
6.05k
                                            std::chrono::milliseconds timeout) {
146
6.05k
  Event::TestTimeSystem::RealTimeBound bound(timeout);
147
6.05k
  Buffer::OwnedImpl buffer(data);
148
6.05k
  if (verify) {
149
0
    EXPECT_CALL(*client_write_buffer_, move(_));
150
0
    if (!data.empty()) {
151
0
      EXPECT_CALL(*client_write_buffer_, drain(_)).Times(AtLeast(1));
152
0
    }
153
0
  }
154
155
6.05k
  uint64_t bytes_expected = client_write_buffer_->bytesDrained() + data.size();
156
157
6.05k
  connection_->write(buffer, end_stream);
158
10.3k
  do {
159
10.3k
    connection_->dispatcher().run(Event::Dispatcher::RunType::NonBlock);
160
10.3k
    if (client_write_buffer_->bytesDrained() == bytes_expected || disconnected_) {
161
6.05k
      break;
162
6.05k
    }
163
10.3k
  } while (bound.withinBound());
164
165
6.05k
  if (!bound.withinBound()) {
166
0
    return AssertionFailure() << "Timed out completing write";
167
6.05k
  } else if (verify && (disconnected_ || client_write_buffer_->bytesDrained() != bytes_expected)) {
168
0
    return AssertionFailure()
169
0
           << "Failed to complete write or unexpected disconnect. disconnected_: " << disconnected_
170
0
           << " bytes_drained: " << client_write_buffer_->bytesDrained()
171
0
           << " bytes_expected: " << bytes_expected;
172
0
  }
173
174
6.05k
  return AssertionSuccess();
175
6.05k
}
176
177
3.23k
void IntegrationTcpClient::ConnectionCallbacks::onEvent(Network::ConnectionEvent event) {
178
3.23k
  if (event == Network::ConnectionEvent::RemoteClose) {
179
12
    parent_.disconnected_ = true;
180
12
    parent_.connection_->dispatcher().exit();
181
12
  }
182
3.23k
}
183
184
} // namespace Envoy