1
#include "source/common/tcp/conn_pool.h"
2

            
3
#include <memory>
4

            
5
#include "envoy/event/dispatcher.h"
6
#include "envoy/upstream/upstream.h"
7

            
8
#include "source/common/stats/timespan_impl.h"
9
#include "source/common/upstream/upstream_impl.h"
10

            
11
namespace Envoy {
12
namespace Tcp {
13

            
14
// Constructs a pooled upstream TCP client: creates the upstream connection,
// wires up stats/callbacks, initiates the connect, and optionally arms a
// per-client idle timeout.
ActiveTcpClient::ActiveTcpClient(Envoy::ConnectionPool::ConnPoolImplBase& parent,
                                 const Upstream::HostConstSharedPtr& host,
                                 uint32_t concurrent_stream_limit,
                                 absl::optional<std::chrono::milliseconds> idle_timeout)
    : Envoy::ConnectionPool::ActiveClient(parent, host->cluster().maxRequestsPerConnection(),
                                          concurrent_stream_limit),
      parent_(parent), idle_timeout_(idle_timeout) {
  // Create the upstream connection on the pool's dispatcher, carrying over the
  // pool's socket and transport-socket options.
  Upstream::Host::CreateConnectionData data = host->createConnection(
      parent_.dispatcher(), parent_.socketOptions(), parent_.transportSocketOptions());
  real_host_description_ = data.host_description_;
  connection_ = std::move(data.connection_);
  // Register connection callbacks and the read filter before calling connect()
  // so no connection events or early upstream data can be missed.
  connection_->addConnectionCallbacks(*this);
  read_filter_handle_ = std::make_shared<ConnReadFilter>(*this);
  connection_->addReadFilter(read_filter_handle_);
  Upstream::ClusterTrafficStats& cluster_traffic_stats = *host->cluster().trafficStats();
  connection_->setConnectionStats({cluster_traffic_stats.upstream_cx_rx_bytes_total_,
                                   cluster_traffic_stats.upstream_cx_rx_bytes_buffered_,
                                   cluster_traffic_stats.upstream_cx_tx_bytes_total_,
                                   cluster_traffic_stats.upstream_cx_tx_bytes_buffered_,
                                   &cluster_traffic_stats.bind_errors_, nullptr});
  connection_->noDelay(true);
  connection_->connect();

  // If the pool configured an idle timeout, create the timer on the
  // connection's dispatcher and arm it immediately (the client starts idle).
  if (idle_timeout_.has_value()) {
    idle_timer_ = connection_->dispatcher().createTimer([this]() -> void { onIdleTimeout(); });
    setIdleTimer();
  }
}
42

            
43
2167
ActiveTcpClient::~ActiveTcpClient() {
  // Handle the case where deferred delete results in the ActiveClient being destroyed before
  // TcpConnectionData. Make sure the TcpConnectionData will not refer to this ActiveTcpClient
  // and handle clean up normally done in clearCallbacks()
  if (tcp_connection_data_) {
    // A live TcpConnectionData at destruction time is only expected once the
    // client has already transitioned to Closed.
    ASSERT(state() == ActiveClient::State::Closed);
    // Detach the TcpConnectionData so it cannot dereference this object, then
    // perform the stream-closed bookkeeping that clearCallbacks() would have done.
    tcp_connection_data_->release();
    parent_.onStreamClosed(*this, true);
    parent_.checkForIdleAndCloseIdleConnsIfDraining();
  }
}
54

            
55
// Undo the readDisable done in onEvent(Connected) - now that there is an associated connection,
56
// drain any data.
57
2042
void ActiveTcpClient::readEnableIfNew() {
58
  // It is expected for Envoy use of ActiveTcpClient this function only be
59
  // called once. Other users of the TcpConnPool may recycle Tcp connections,
60
  // and this safeguards them against read-enabling too many times.
61
2042
  if (!associated_before_) {
62
2037
    associated_before_ = true;
63
2037
    connection_->readDisable(false);
64
    // Also while we're at it, make sure the connection will proxy all TCP
65
    // data before picking up a FIN.
66
2037
    connection_->detectEarlyCloseWhenReadDisabled(false);
67
2037
  }
68
2042
}
69

            
70
24
// Closes the underlying upstream connection with the given close type, tagging
// the close with `details` for observability.
void ActiveTcpClient::close(Network::ConnectionCloseType type, absl::string_view details) {
  connection_->close(type, details);
}
73

            
74
2033
// Detaches the downstream consumer from this client and returns the connection
// to the pool's bookkeeping (stream closed, idle timer re-armed).
void ActiveTcpClient::clearCallbacks() {
  // If this busy client is being released while streams are still queued, the
  // freed capacity can serve one of them; schedule the pool's upstream-ready
  // processing.
  if (state() == Envoy::ConnectionPool::ActiveClient::State::Busy && parent_.hasPendingStreams()) {
    auto* pool = &parent_;
    pool->scheduleOnUpstreamReady();
  }
  callbacks_ = nullptr;
  tcp_connection_data_ = nullptr;
  parent_.onStreamClosed(*this, true);
  // The connection is idle again; start counting idle time from now.
  setIdleTimer();
  // NOTE(review): this may close the connection (and delete this client) if the
  // pool is draining — nothing after this call should touch member state.
  parent_.checkForIdleAndCloseIdleConnsIfDraining();
}
85

            
86
4246
// Network::ConnectionCallbacks implementation: translates raw connection
// events into pool bookkeeping and forwards close events downstream.
void ActiveTcpClient::onEvent(Network::ConnectionEvent event) {
  // If this is a newly established TCP connection, readDisable. This is to handle a race condition
  // for TCP for protocols like MySQL where the upstream writes first, and the data needs to be
  // preserved until a downstream connection is associated.
  // This is also necessary for prefetch to be used with such protocols.
  if (event == Network::ConnectionEvent::Connected) {
    connection_->readDisable(true);
  }
  // Plain TCP has no early-data handshake; 0-RTT here would indicate a bug in
  // the transport layer.
  ENVOY_BUG(event != Network::ConnectionEvent::ConnectedZeroRtt,
            "Unexpected 0-RTT event from the underlying TCP connection.");
  // Let the pool update its state first (e.g. move this client between lists,
  // possibly attach a pending stream on Connected).
  parent_.onConnectionEvent(*this, connection_->transportFailureReason(), event);

  if (event == Network::ConnectionEvent::LocalClose ||
      event == Network::ConnectionEvent::RemoteClose) {
    disableIdleTimer();

    // Do not pass the Connected event to any session which registered during onEvent above.
    // Consumers of connection pool connections assume they are receiving already connected
    // connections.
    if (callbacks_) {
      // Only count an active-request destroy if a stream was actually attached.
      if (tcp_connection_data_) {
        Envoy::Upstream::reportUpstreamCxDestroyActiveRequest(parent_.host(), event);
      }
      callbacks_->onEvent(event);
      // After receiving a disconnect event, the owner of callbacks_ will likely self-destruct.
      // Clear the pointer to avoid using it again.
      callbacks_ = nullptr;
    }
  }
}
116

            
117
3
// Fired by idle_timer_ when the connection has been idle for idle_timeout_;
// records the idle-timeout stat and closes the connection.
void ActiveTcpClient::onIdleTimeout() {
  ENVOY_CONN_LOG(debug, "per client idle timeout", *connection_);
  parent_.host()->cluster().trafficStats()->upstream_cx_idle_timeout_.inc();
  close();
}
122

            
123
4150
void ActiveTcpClient::disableIdleTimer() {
124
4150
  if (idle_timer_ != nullptr) {
125
4083
    idle_timer_->disableTimer();
126
4083
  }
127
4150
}
128

            
129
4100
void ActiveTcpClient::setIdleTimer() {
130
4100
  if (idle_timer_ != nullptr) {
131
4077
    ASSERT(idle_timeout_.has_value());
132

            
133
4077
    idle_timer_->enableTimer(idle_timeout_.value());
134
4077
  }
135
4100
}
136

            
137
7
// Drains the pool per `drain_behavior`, then (for non-delete draining) applies
// the TCP pool's legacy semantics to clients that are still connecting.
void ConnPoolImpl::drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) {
  drainConnectionsImpl(drain_behavior);
  if (drain_behavior == Envoy::ConnectionPool::DrainBehavior::DrainAndDelete) {
    return;
  }
  // Legacy behavior for the TCP connection pool marks all connecting clients
  // as draining.
  for (auto& connecting_client : connecting_clients_) {
    if (connecting_client->remaining_streams_ > 1) {
      uint64_t old_limit = connecting_client->effectiveConcurrentStreamLimit();
      // Cap the client at one more stream so it drains after serving it.
      connecting_client->remaining_streams_ = 1;
      // If capping lowered the effective capacity, reflect the difference in
      // the pool's connecting/connected capacity accounting.
      if (connecting_client->effectiveConcurrentStreamLimit() < old_limit) {
        decrConnectingAndConnectedStreamCapacity(
            old_limit - connecting_client->effectiveConcurrentStreamLimit(), *connecting_client);
      }
    }
  }
}
155

            
156
1
// Forcibly closes every client in the pool, regardless of state.
void ConnPoolImpl::closeConnections() {
  for (auto* list : {&ready_clients_, &busy_clients_, &connecting_clients_}) {
    // close() triggers a close event that removes the client from its list
    // (presumably synchronously — this loop relies on that to terminate), so
    // keep closing the front until the list is empty.
    while (!list->empty()) {
      list->front()->close();
    }
  }
}
163
// Requests a connection from the pool for `callbacks`. Returns a cancellation
// handle if the request was queued, per the base newStreamImpl contract.
ConnectionPool::Cancellable*
ConnPoolImpl::newConnection(Tcp::ConnectionPool::Callbacks& callbacks) {
  TcpAttachContext context(&callbacks);
  // TLS early data over TCP is not supported yet.
  return newStreamImpl(context, /*can_send_early_data=*/false);
}
169

            
170
// Queues a pending TCP stream for this context; the base class returns a
// handle the caller can use to cancel the pending request.
ConnectionPool::Cancellable*
ConnPoolImpl::newPendingStream(Envoy::ConnectionPool::AttachContext& context,
                               bool can_send_early_data) {
  return addPendingStream(std::make_unique<TcpPendingStream>(
      *this, can_send_early_data, typedContext<TcpAttachContext>(context)));
}
177

            
178
2067
// Factory for the pool's client type. The concurrent stream limit is fixed at
// 1: a raw TCP connection carries exactly one proxied stream at a time.
Envoy::ConnectionPool::ActiveClientPtr ConnPoolImpl::instantiateActiveClient() {
  return std::make_unique<ActiveTcpClient>(*this, Envoy::ConnectionPool::ConnPoolImplBase::host(),
                                           1, idle_timeout_);
}
182

            
183
// Hands a ready connection to the waiting caller by wrapping it in a
// TcpConnectionData and invoking the caller's onPoolReady.
void ConnPoolImpl::onPoolReady(Envoy::ConnectionPool::ActiveClient& client,
                               Envoy::ConnectionPool::AttachContext& context) {
  ActiveTcpClient* tcp_client = static_cast<ActiveTcpClient*>(&client);
  // Re-enable reads now that a consumer is attached (undoes the readDisable
  // done on Connected).
  tcp_client->readEnableIfNew();
  auto* callbacks = typedContext<TcpAttachContext>(context).callbacks_;
  std::unique_ptr<Envoy::Tcp::ConnectionPool::ConnectionData> connection_data =
      std::make_unique<ActiveTcpClient::TcpConnectionData>(*tcp_client, *tcp_client->connection_);
  // NOTE(review): the callee presumably receives the pointer by rvalue
  // reference and may decline to take it; the null check below distinguishes
  // the two outcomes — confirm against Tcp::ConnectionPool::Callbacks.
  callbacks->onPoolReady(std::move(connection_data), tcp_client->real_host_description_);

  // The tcp client is taken over. Stop the idle timer.
  if (!connection_data) {
    tcp_client->disableIdleTimer();
  }
}
197

            
198
// Reports a connection/pool failure to the caller that requested the stream,
// forwarding the failure reason and the host (if any) that was attempted.
void ConnPoolImpl::onPoolFailure(const Upstream::HostDescriptionConstSharedPtr& host_description,
                                 absl::string_view failure_reason,
                                 ConnectionPool::PoolFailureReason reason,
                                 Envoy::ConnectionPool::AttachContext& context) {
  auto* callbacks = typedContext<TcpAttachContext>(context).callbacks_;
  callbacks->onPoolFailure(reason, failure_reason, host_description);
}
205

            
206
} // namespace Tcp
207
} // namespace Envoy