Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/source/common/tcp/conn_pool.cc
Every instrumented line in the listing below was recorded with an execution count of 0; the file is entirely uncovered. The source is reproduced here without the per-line Line/Count columns.

#include "source/common/tcp/conn_pool.h"

#include <memory>

#include "envoy/event/dispatcher.h"
#include "envoy/upstream/upstream.h"

#include "source/common/stats/timespan_impl.h"
#include "source/common/upstream/upstream_impl.h"

namespace Envoy {
namespace Tcp {

ActiveTcpClient::ActiveTcpClient(Envoy::ConnectionPool::ConnPoolImplBase& parent,
                                 const Upstream::HostConstSharedPtr& host,
                                 uint64_t concurrent_stream_limit,
                                 absl::optional<std::chrono::milliseconds> idle_timeout)
    : Envoy::ConnectionPool::ActiveClient(parent, host->cluster().maxRequestsPerConnection(),
                                          concurrent_stream_limit),
      parent_(parent), idle_timeout_(idle_timeout) {
  Upstream::Host::CreateConnectionData data = host->createConnection(
      parent_.dispatcher(), parent_.socketOptions(), parent_.transportSocketOptions());
  real_host_description_ = data.host_description_;
  connection_ = std::move(data.connection_);
  connection_->addConnectionCallbacks(*this);
  read_filter_handle_ = std::make_shared<ConnReadFilter>(*this);
  connection_->addReadFilter(read_filter_handle_);
  Upstream::ClusterTrafficStats& cluster_traffic_stats = *host->cluster().trafficStats();
  connection_->setConnectionStats({cluster_traffic_stats.upstream_cx_rx_bytes_total_,
                                   cluster_traffic_stats.upstream_cx_rx_bytes_buffered_,
                                   cluster_traffic_stats.upstream_cx_tx_bytes_total_,
                                   cluster_traffic_stats.upstream_cx_tx_bytes_buffered_,
                                   &cluster_traffic_stats.bind_errors_, nullptr});
  connection_->noDelay(true);
  connection_->connect();

  if (idle_timeout_.has_value()) {
    idle_timer_ = connection_->dispatcher().createTimer([this]() -> void { onIdleTimeout(); });
    setIdleTimer();
  }
}

ActiveTcpClient::~ActiveTcpClient() {
  // Handle the case where deferred delete results in the ActiveClient being destroyed before
  // TcpConnectionData. Make sure the TcpConnectionData will not refer to this ActiveTcpClient
  // and handle the cleanup normally done in clearCallbacks().
  if (tcp_connection_data_) {
    ASSERT(state() == ActiveClient::State::Closed);
    tcp_connection_data_->release();
    parent_.onStreamClosed(*this, true);
    parent_.checkForIdleAndCloseIdleConnsIfDraining();
  }
}

// Undo the readDisable done in onEvent(Connected) - now that there is an associated connection,
// drain any data.
void ActiveTcpClient::readEnableIfNew() {
  // For Envoy's use of ActiveTcpClient this function is expected to be called only once. Other
  // users of the TcpConnPool may recycle TCP connections, and this safeguards them against
  // read-enabling too many times.
  if (!associated_before_) {
    associated_before_ = true;
    connection_->readDisable(false);
    // Also, while we're at it, make sure the connection will proxy all TCP
    // data before picking up a FIN.
    connection_->detectEarlyCloseWhenReadDisabled(false);
  }
}

void ActiveTcpClient::close() { connection_->close(Network::ConnectionCloseType::NoFlush); }

void ActiveTcpClient::clearCallbacks() {
  if (state() == Envoy::ConnectionPool::ActiveClient::State::Busy && parent_.hasPendingStreams()) {
    auto* pool = &parent_;
    pool->scheduleOnUpstreamReady();
  }
  callbacks_ = nullptr;
  tcp_connection_data_ = nullptr;
  parent_.onStreamClosed(*this, true);
  setIdleTimer();
  parent_.checkForIdleAndCloseIdleConnsIfDraining();
}

void ActiveTcpClient::onEvent(Network::ConnectionEvent event) {
  // If this is a newly established TCP connection, readDisable. This handles a race condition
  // for protocols like MySQL where the upstream writes first, and the data needs to be
  // preserved until a downstream connection is associated.
  // This is also necessary for prefetch to be used with such protocols.
  if (event == Network::ConnectionEvent::Connected) {
    connection_->readDisable(true);
  }
  ENVOY_BUG(event != Network::ConnectionEvent::ConnectedZeroRtt,
            "Unexpected 0-RTT event from the underlying TCP connection.");
  parent_.onConnectionEvent(*this, connection_->transportFailureReason(), event);

  if (event == Network::ConnectionEvent::LocalClose ||
      event == Network::ConnectionEvent::RemoteClose) {
    disableIdleTimer();

    // Do not pass the Connected event to any session which registered during onEvent above.
    // Consumers of connection pool connections assume they are receiving already connected
    // connections.
    if (callbacks_) {
      if (tcp_connection_data_) {
        Envoy::Upstream::reportUpstreamCxDestroyActiveRequest(parent_.host(), event);
      }
      callbacks_->onEvent(event);
      // After receiving a disconnect event, the owner of callbacks_ will likely self-destruct.
      // Clear the pointer to avoid using it again.
      callbacks_ = nullptr;
    }
  }
}
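
As an aside (not part of the covered file): the readDisable(true) on Connected above, paired with readDisable(false) in readEnableIfNew(), is what protects server-speaks-first protocols such as MySQL. Below is a minimal, self-contained sketch of that pattern; the Conn type is hypothetical and stands in for Envoy's Network::ClientConnection.

#include <iostream>
#include <string>

// Hypothetical stand-in for a connection that can pause reads; not Envoy's API.
struct Conn {
  bool reads_enabled = true;
  std::string buffered;

  void readDisable(bool disable) { reads_enabled = !disable; }

  void onUpstreamData(const std::string& data) {
    // While reads are paused the bytes are held, mirroring how the real
    // connection buffers data until a downstream is associated.
    buffered += data;
    if (reads_enabled) {
      deliver();
    }
  }

  void deliver() {
    if (!buffered.empty()) {
      std::cout << "delivered downstream: " << buffered << "\n";
      buffered.clear();
    }
  }
};

int main() {
  Conn conn;
  conn.readDisable(true);                 // onEvent(Connected): pause reads
  conn.onUpstreamData("server-greeting"); // upstream speaks first; bytes are buffered
  conn.readDisable(false);                // readEnableIfNew(): downstream is now associated
  conn.deliver();                         // the buffered greeting is drained, nothing is lost
  return 0;
}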

void ActiveTcpClient::onIdleTimeout() {
  ENVOY_CONN_LOG(debug, "per client idle timeout", *connection_);
  parent_.host()->cluster().trafficStats()->upstream_cx_idle_timeout_.inc();
  close();
}

void ActiveTcpClient::disableIdleTimer() {
  if (idle_timer_ != nullptr) {
    idle_timer_->disableTimer();
  }
}

void ActiveTcpClient::setIdleTimer() {
  if (idle_timer_ != nullptr) {
    ASSERT(idle_timeout_.has_value());

    idle_timer_->enableTimer(idle_timeout_.value());
  }
}
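
As an aside (not part of the covered file): taken together, these helpers give each pooled connection a simple idle lifecycle. The timer is armed while the connection sits unused (the constructor and clearCallbacks() call setIdleTimer()), disarmed when a caller takes the connection over or it closes (onPoolReady() and onEvent() call disableIdleTimer()), and on expiry onIdleTimeout() closes the connection. A toy model of that arm/disarm state machine, with a hypothetical timer type:

#include <iostream>

// Hypothetical timer that only tracks whether it is armed; not Envoy's Event::Timer.
struct FakeIdleTimer {
  bool armed = false;
  void enable() { armed = true; }
  void disable() { armed = false; }
};

int main() {
  FakeIdleTimer idle_timer;

  idle_timer.enable();  // connection created and idle: arm the timer
  idle_timer.disable(); // a caller takes the connection over: disarm
  idle_timer.enable();  // stream finished, connection idle again: re-arm
  idle_timer.disable(); // connection closed: disarm

  std::cout << "armed: " << std::boolalpha << idle_timer.armed << "\n";
  return 0;
}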

void ConnPoolImpl::drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) {
  drainConnectionsImpl(drain_behavior);
  if (drain_behavior == Envoy::ConnectionPool::DrainBehavior::DrainAndDelete) {
    return;
  }
  // Legacy behavior for the TCP connection pool marks all connecting clients
  // as draining.
  for (auto& connecting_client : connecting_clients_) {
    if (connecting_client->remaining_streams_ > 1) {
      uint64_t old_limit = connecting_client->effectiveConcurrentStreamLimit();
      connecting_client->remaining_streams_ = 1;
      if (connecting_client->effectiveConcurrentStreamLimit() < old_limit) {
        decrConnectingAndConnectedStreamCapacity(
            old_limit - connecting_client->effectiveConcurrentStreamLimit(), *connecting_client);
      }
    }
  }
}
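
As an aside (not part of the covered file): the loop above caps remaining_streams_ at 1, and whenever that lowers a connecting client's effective stream limit, the pool has to give back the difference in stream capacity. A rough standalone sketch of that arithmetic, with hypothetical names in place of Envoy's types:

#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical, simplified client: its effective limit is the smaller of the
// streams it may still serve and its concurrent-stream limit.
struct FakeClient {
  uint64_t remaining_streams_;
  uint64_t concurrent_stream_limit_;
  uint64_t effectiveConcurrentStreamLimit() const {
    return std::min(remaining_streams_, concurrent_stream_limit_);
  }
};

int main() {
  uint64_t pool_capacity = 10; // streams the pool believes it can still place
  FakeClient client{/*remaining_streams_=*/5, /*concurrent_stream_limit_=*/3};

  // Mirror of the drain loop: clamp remaining streams to 1 and return any
  // capacity the client can no longer provide.
  uint64_t old_limit = client.effectiveConcurrentStreamLimit(); // 3
  client.remaining_streams_ = 1;
  if (client.effectiveConcurrentStreamLimit() < old_limit) {
    pool_capacity -= old_limit - client.effectiveConcurrentStreamLimit(); // gives back 3 - 1 = 2
  }
  assert(pool_capacity == 8);
  return 0;
}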

void ConnPoolImpl::closeConnections() {
  for (auto* list : {&ready_clients_, &busy_clients_, &connecting_clients_}) {
    while (!list->empty()) {
      list->front()->close();
    }
  }
}
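
As an aside (not part of the covered file): closeConnections() drains each list with "while (!list->empty()) list->front()->close()" rather than an iterator loop, presumably because closing a client ends up removing it from its owning list, which would invalidate an iterator. A generic sketch of that drain-by-front pattern, with std::list standing in for the pool's client lists:

#include <iostream>
#include <list>

int main() {
  std::list<int> clients{1, 2, 3};

  // A "close" that erases the element it belongs to, the way an active client
  // is removed from its owning list when its connection closes.
  auto close_front = [&clients]() {
    std::cout << "closing client " << clients.front() << "\n";
    clients.pop_front();
  };

  // Safe drain loop: re-check emptiness each iteration instead of holding an
  // iterator that the close would invalidate.
  while (!clients.empty()) {
    close_front();
  }
  return 0;
}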

ConnectionPool::Cancellable*
ConnPoolImpl::newConnection(Tcp::ConnectionPool::Callbacks& callbacks) {
  TcpAttachContext context(&callbacks);
  // TLS early data over TCP is not supported yet.
  return newStreamImpl(context, /*can_send_early_data=*/false);
}

ConnectionPool::Cancellable*
ConnPoolImpl::newPendingStream(Envoy::ConnectionPool::AttachContext& context,
                               bool can_send_early_data) {
  Envoy::ConnectionPool::PendingStreamPtr pending_stream = std::make_unique<TcpPendingStream>(
      *this, can_send_early_data, typedContext<TcpAttachContext>(context));
  return addPendingStream(std::move(pending_stream));
}

Envoy::ConnectionPool::ActiveClientPtr ConnPoolImpl::instantiateActiveClient() {
  return std::make_unique<ActiveTcpClient>(*this, Envoy::ConnectionPool::ConnPoolImplBase::host(),
                                           1, idle_timeout_);
}

void ConnPoolImpl::onPoolReady(Envoy::ConnectionPool::ActiveClient& client,
                               Envoy::ConnectionPool::AttachContext& context) {
  ActiveTcpClient* tcp_client = static_cast<ActiveTcpClient*>(&client);
  tcp_client->readEnableIfNew();
  auto* callbacks = typedContext<TcpAttachContext>(context).callbacks_;
  std::unique_ptr<Envoy::Tcp::ConnectionPool::ConnectionData> connection_data =
      std::make_unique<ActiveTcpClient::TcpConnectionData>(*tcp_client, *tcp_client->connection_);
  callbacks->onPoolReady(std::move(connection_data), tcp_client->real_host_description_);

  // The tcp client is taken over. Stop the idle timer.
  if (!connection_data) {
    tcp_client->disableIdleTimer();
  }
}
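
As an aside (not part of the covered file): the "if (!connection_data)" check above relies on the callback receiving the unique_ptr by rvalue reference. std::move by itself is only a cast, so the caller's pointer is emptied only if the callee actually moves out of it, which is how this code detects that the TCP client was taken over. A small standalone illustration of that ownership-probe pattern:

#include <cassert>
#include <memory>

struct Data {};

// Callee that takes ownership: it moves out of the rvalue reference.
void takesOwnership(std::unique_ptr<Data>&& d) {
  auto mine = std::move(d);
  (void)mine;
}

// Callee that only looks at the data and leaves ownership with the caller.
void leavesOwnership(std::unique_ptr<Data>&& d) { (void)d; }

int main() {
  auto a = std::make_unique<Data>();
  takesOwnership(std::move(a));
  assert(a == nullptr); // the callee moved from it; the caller's pointer is now empty

  auto b = std::make_unique<Data>();
  leavesOwnership(std::move(b));
  assert(b != nullptr); // std::move alone transfers nothing; b still owns the Data
  return 0;
}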

void ConnPoolImpl::onPoolFailure(const Upstream::HostDescriptionConstSharedPtr& host_description,
                                 absl::string_view failure_reason,
                                 ConnectionPool::PoolFailureReason reason,
                                 Envoy::ConnectionPool::AttachContext& context) {
  auto* callbacks = typedContext<TcpAttachContext>(context).callbacks_;
  callbacks->onPoolFailure(reason, failure_reason, host_description);
}

} // namespace Tcp
} // namespace Envoy