Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/common/tcp/conn_pool.h
Line
Count
Source (jump to first uncovered line)
1
#pragma once
2
3
#include <list>
4
#include <memory>
5
6
#include "envoy/event/deferred_deletable.h"
7
#include "envoy/event/timer.h"
8
#include "envoy/network/connection.h"
9
#include "envoy/network/filter.h"
10
#include "envoy/stats/timespan.h"
11
#include "envoy/tcp/conn_pool.h"
12
#include "envoy/upstream/upstream.h"
13
14
#include "source/common/common/linked_object.h"
15
#include "source/common/common/logger.h"
16
#include "source/common/http/conn_pool_base.h"
17
#include "source/common/network/filter_impl.h"
18
#include "source/common/runtime/runtime_features.h"
19
20
namespace Envoy {
21
namespace Tcp {
22
23
class ConnPoolImpl;
24
25
// Attach context for the TCP connection pool. It carries the pointer to the
// Tcp::ConnectionPool::Callbacks that should be notified about the outcome of a
// connection request.
struct TcpAttachContext : public Envoy::ConnectionPool::AttachContext {
26
0
  // Stores the given callbacks pointer as-is; no null check is performed here.
  TcpAttachContext(Tcp::ConnectionPool::Callbacks* callbacks) : callbacks_(callbacks) {}
27
  // Raw pointer to the requester's callbacks; this struct never deletes it.
  Tcp::ConnectionPool::Callbacks* callbacks_;
28
};
29
30
// Pending stream for the TCP connection pool. It keeps its own copy of the
// TcpAttachContext it was created with and exposes it through context().
class TcpPendingStream : public Envoy::ConnectionPool::PendingStream {
31
public:
32
  // Forwards parent and can_send_early_data to the base PendingStream and
  // copy-constructs context_ from the supplied context.
  TcpPendingStream(Envoy::ConnectionPool::ConnPoolImplBase& parent, bool can_send_early_data,
33
                   TcpAttachContext& context)
34
0
      : Envoy::ConnectionPool::PendingStream(parent, can_send_early_data), context_(context) {}
35
0
  // Returns a reference to the copy held by this pending stream.
  Envoy::ConnectionPool::AttachContext& context() override { return context_; }
36
37
  // Held by value, so it does not depend on the lifetime of the object passed to the constructor.
  TcpAttachContext context_;
38
};
39
40
// Connection pool client for a single upstream TCP connection (connection_).
// It relays upstream data and write-buffer watermark events to the currently
// attached ConnectionPool::UpstreamCallbacks (callbacks_), and holds the
// per-connection state and the idle-timer members declared at the bottom of the class.
class ActiveTcpClient : public Envoy::ConnectionPool::ActiveClient {
41
public:
42
  // Read filter that hands every chunk of received data to the owning ActiveTcpClient.
  struct ConnReadFilter : public Network::ReadFilterBaseImpl {
43
0
    ConnReadFilter(ActiveTcpClient& parent) : parent_(parent) {}
44
45
    // Network::ReadFilter
46
0
    // Passes the data to parent_.onUpstreamData() and always returns StopIteration.
    Network::FilterStatus onData(Buffer::Instance& data, bool end_stream) override {
47
0
      parent_.onUpstreamData(data, end_stream);
48
0
      return Network::FilterStatus::StopIteration;
49
0
    }
50
    ActiveTcpClient& parent_;
51
  };
52
53
  // This acts as the bridge between the ActiveTcpClient and an individual TCP connection.
54
  class TcpConnectionData : public Envoy::Tcp::ConnectionPool::ConnectionData {
55
  public:
56
    // Records the parent and connection, and registers this object with the
    // parent through parent.tcp_connection_data_.
    TcpConnectionData(ActiveTcpClient& parent, Network::ClientConnection& connection)
57
0
        : parent_(&parent), connection_(connection) {
58
0
      parent_->tcp_connection_data_ = this;
59
0
    }
60
0
    ~TcpConnectionData() override {
61
      // Generally it is the case that TcpConnectionData will be destroyed before the
62
      // ActiveTcpClient. Because ordering on the deferred delete list is not guaranteed in the
63
      // case of a disconnect, make sure parent_ is valid before doing clean-up.
64
0
      if (parent_) {
65
0
        parent_->clearCallbacks();
66
0
      }
67
0
    }
68
69
0
    Network::ClientConnection& connection() override { return connection_; }
70
0
    // Stores the state on the parent client (connection_state_), not on this object.
    // NOTE(review): parent_ is dereferenced without a null check here and in
    // addUpstreamCallbacks()/connectionState(); presumably these are never called
    // after release() - confirm against callers.
    void setConnectionState(ConnectionPool::ConnectionStatePtr&& state) override {
71
0
      parent_->connection_state_ = std::move(state);
72
0
    }
73
74
0
    // Replaces the parent's callbacks_ pointer with the address of the given callbacks.
    void addUpstreamCallbacks(ConnectionPool::UpstreamCallbacks& callbacks) override {
75
0
      parent_->callbacks_ = &callbacks;
76
0
    }
77
0
    // Detaches from the parent: after this the destructor skips clearCallbacks().
    void release() { parent_ = nullptr; }
78
79
  protected:
80
0
    // Returns the raw pointer to the state stored on the parent (may be null if none was set).
    ConnectionPool::ConnectionState* connectionState() override {
81
0
      return parent_->connection_state_.get();
82
0
    }
83
84
  private:
85
    // Null once release() has been called.
    ActiveTcpClient* parent_;
86
    Network::ClientConnection& connection_;
87
  };
88
89
  ActiveTcpClient(Envoy::ConnectionPool::ConnPoolImplBase& parent,
90
                  const Upstream::HostConstSharedPtr& host, uint64_t concurrent_stream_limit,
91
                  absl::optional<std::chrono::milliseconds> idle_timeout);
92
  ~ActiveTcpClient() override;
93
94
  // Override the defaults of Envoy::ConnectionPool::ActiveClient for class-specific functions.
95
  // Network::ConnectionCallbacks
96
  void onEvent(Network::ConnectionEvent event) override;
97
0
  // Both watermark handlers forward straight to callbacks_.
  // NOTE(review): unlike onUpstreamData(), callbacks_ is not null-checked here;
  // presumably watermark events only fire while callbacks are attached - confirm.
  void onAboveWriteBufferHighWatermark() override { callbacks_->onAboveWriteBufferHighWatermark(); }
98
0
  void onBelowWriteBufferLowWatermark() override { callbacks_->onBelowWriteBufferLowWatermark(); }
99
100
  // Undo the readDisable done in onEvent(Connected) - now that there is an associated connection,
101
  // drain any data.
102
0
  void readEnableIfNew() {
103
    // It is expected for Envoy use of ActiveTcpClient this function only be
104
    // called once. Other users of the TcpConnPool may recycle Tcp connections,
105
    // and this safeguards them against read-enabling too many times.
106
0
    if (!associated_before_) {
107
0
      associated_before_ = true;
108
0
      connection_->readDisable(false);
109
      // Also while we're at it, make sure the connection will proxy all TCP
110
      // data before picking up a FIN.
111
0
      connection_->detectEarlyCloseWhenReadDisabled(false);
112
0
    }
113
0
  }
114
115
0
  void initializeReadFilters() override { connection_->initializeReadFilters(); }
116
0
  // Always returns an empty optional.
  absl::optional<Http::Protocol> protocol() const override { return {}; }
117
  void close() override;
118
0
  // Reports 1 while upstream callbacks are attached and 0 otherwise.
  uint32_t numActiveStreams() const override { return callbacks_ ? 1 : 0; }
119
0
  bool closingWithIncompleteStream() const override { return false; }
120
0
  // The client id is the id of the underlying connection.
  uint64_t id() const override { return connection_->id(); }
121
122
0
  // Delivers upstream data to the attached callbacks. If no callbacks are
  // attached, the data is not delivered and the client is closed instead.
  void onUpstreamData(Buffer::Instance& data, bool end_stream) {
123
0
    if (callbacks_) {
124
0
      callbacks_->onUpstreamData(data, end_stream);
125
0
    } else {
126
0
      close();
127
0
    }
128
0
  }
129
  // Invoked from ~TcpConnectionData() when that object still has a parent; defined out of line.
  virtual void clearCallbacks();
130
131
  // Called if the underlying connection is idle over the cluster's tcpPoolIdleTimeout()
132
  void onIdleTimeout();
133
  void disableIdleTimer();
134
  void setIdleTimer();
135
136
  std::shared_ptr<ConnReadFilter> read_filter_handle_;
137
  Envoy::ConnectionPool::ConnPoolImplBase& parent_;
138
  // Currently attached upstream callbacks; null when none are attached.
  ConnectionPool::UpstreamCallbacks* callbacks_{};
139
  Network::ClientConnectionPtr connection_;
140
  ConnectionPool::ConnectionStatePtr connection_state_;
141
  // Set by the TcpConnectionData constructor; raw pointer, initially null.
  TcpConnectionData* tcp_connection_data_{};
142
  // Set to true by the first readEnableIfNew() call so read-enabling happens only once.
  bool associated_before_{};
143
  absl::optional<std::chrono::milliseconds> idle_timeout_;
144
  Event::TimerPtr idle_timer_;
145
};
146
147
// TCP connection pool. It implements Tcp::ConnectionPool::Instance by
// delegating to the *Impl helpers it calls on
// Envoy::ConnectionPool::ConnPoolImplBase, and supplies the TCP-specific pieces:
// ActiveTcpClient creation, TcpPendingStream creation, and translation of the
// pool-ready/pool-failure notifications to Tcp::ConnectionPool::Callbacks.
class ConnPoolImpl : public Envoy::ConnectionPool::ConnPoolImplBase,
148
                     public Tcp::ConnectionPool::Instance {
149
public:
150
  // Forwards everything except idle_timeout to the base pool; idle_timeout is
  // kept in idle_timeout_ and handed to each ActiveTcpClient created later.
  ConnPoolImpl(Event::Dispatcher& dispatcher, Upstream::HostConstSharedPtr host,
151
               Upstream::ResourcePriority priority,
152
               const Network::ConnectionSocket::OptionsSharedPtr& options,
153
               Network::TransportSocketOptionsConstSharedPtr transport_socket_options,
154
               Upstream::ClusterConnectivityState& state,
155
               absl::optional<std::chrono::milliseconds> idle_timeout)
156
      : Envoy::ConnectionPool::ConnPoolImplBase(host, priority, dispatcher, options,
157
                                                transport_socket_options, state),
158
0
        idle_timeout_(idle_timeout) {}
159
0
  ~ConnPoolImpl() override { destructAllConnections(); }
160
161
  // Event::DeferredDeletable
162
0
  void deleteIsPending() override { deleteIsPendingImpl(); }
163
164
0
  void addIdleCallback(IdleCb cb) override { addIdleCallbackImpl(cb); }
165
0
  bool isIdle() const override { return isIdleImpl(); }
166
0
  // Runs the base drain logic first. Unless the behavior is DrainAndDelete, it
  // then caps every connecting client at one remaining stream and gives back
  // any stream capacity that the cap removed.
  void drainConnections(Envoy::ConnectionPool::DrainBehavior drain_behavior) override {
167
0
    drainConnectionsImpl(drain_behavior);
168
0
    if (drain_behavior == Envoy::ConnectionPool::DrainBehavior::DrainAndDelete) {
169
0
      return;
170
0
    }
171
    // Legacy behavior for the TCP connection pool marks all connecting clients
172
    // as draining.
173
0
    for (auto& connecting_client : connecting_clients_) {
174
0
      if (connecting_client->remaining_streams_ > 1) {
175
0
        // Capture the limit before lowering remaining_streams_ so the difference can be released.
        uint64_t old_limit = connecting_client->effectiveConcurrentStreamLimit();
176
0
        connecting_client->remaining_streams_ = 1;
177
0
        if (connecting_client->effectiveConcurrentStreamLimit() < old_limit) {
178
0
          decrConnectingAndConnectedStreamCapacity(
179
0
              old_limit - connecting_client->effectiveConcurrentStreamLimit(), *connecting_client);
180
0
        }
181
0
      }
182
0
    }
183
0
  }
184
185
0
  // Closes every client in the ready, busy and connecting lists.
  // NOTE(review): the inner loop only terminates if close() removes the client
  // from its list - presumably it does; confirm in ActiveTcpClient::close().
  void closeConnections() override {
186
0
    for (auto* list : {&ready_clients_, &busy_clients_, &connecting_clients_}) {
187
0
      while (!list->empty()) {
188
0
        list->front()->close();
189
0
      }
190
0
    }
191
0
  }
192
0
  // Wraps the callbacks in a stack-allocated TcpAttachContext and asks the base
  // pool for a new stream; returns whatever newStreamImpl() returns.
  ConnectionPool::Cancellable* newConnection(Tcp::ConnectionPool::Callbacks& callbacks) override {
193
0
    TcpAttachContext context(&callbacks);
194
    // TLS early data over TCP is not supported yet.
195
0
    return newStreamImpl(context, /*can_send_early_data=*/false);
196
0
  }
197
0
  bool maybePreconnect(float preconnect_ratio) override {
198
0
    return maybePreconnectImpl(preconnect_ratio);
199
0
  }
200
201
  // Creates a TcpPendingStream for the given context (the pending stream keeps
  // its own copy of it) and registers it via addPendingStream().
  ConnectionPool::Cancellable* newPendingStream(Envoy::ConnectionPool::AttachContext& context,
202
0
                                                bool can_send_early_data) override {
203
0
    Envoy::ConnectionPool::PendingStreamPtr pending_stream = std::make_unique<TcpPendingStream>(
204
0
        *this, can_send_early_data, typedContext<TcpAttachContext>(context));
205
0
    return addPendingStream(std::move(pending_stream));
206
0
  }
207
208
0
  // Both bases declare host(); this override forwards to the ConnPoolImplBase version.
  Upstream::HostDescriptionConstSharedPtr host() const override {
209
0
    return Envoy::ConnectionPool::ConnPoolImplBase::host();
210
0
  }
211
212
0
  // Creates an ActiveTcpClient with a concurrent stream limit of 1 and this pool's idle timeout.
  Envoy::ConnectionPool::ActiveClientPtr instantiateActiveClient() override {
213
0
    return std::make_unique<ActiveTcpClient>(*this, Envoy::ConnectionPool::ConnPoolImplBase::host(),
214
0
                                             1, idle_timeout_);
215
0
  }
216
217
  // Hands a ready client to the requester: read-enables the connection if this
  // is its first use, wraps it in a TcpConnectionData and passes that to the
  // callbacks stored in the attach context.
  // NOTE(review): client is downcast with static_cast, so it is assumed to
  // always be an ActiveTcpClient - consistent with instantiateActiveClient().
  void onPoolReady(Envoy::ConnectionPool::ActiveClient& client,
218
0
                   Envoy::ConnectionPool::AttachContext& context) override {
219
0
    ActiveTcpClient* tcp_client = static_cast<ActiveTcpClient*>(&client);
220
0
    tcp_client->readEnableIfNew();
221
0
    auto* callbacks = typedContext<TcpAttachContext>(context).callbacks_;
222
0
    std::unique_ptr<Envoy::Tcp::ConnectionPool::ConnectionData> connection_data =
223
0
        std::make_unique<ActiveTcpClient::TcpConnectionData>(*tcp_client, *tcp_client->connection_);
224
0
    callbacks->onPoolReady(std::move(connection_data), tcp_client->real_host_description_);
225
226
    // The tcp client is taken over. Stop the idle timer.
227
0
    // NOTE(review): this inspects connection_data after std::move. Presumably
    // onPoolReady() takes the pointer by rvalue reference, so it is null here
    // only when the callee actually took ownership - confirm against the
    // Tcp::ConnectionPool::Callbacks interface.
    if (!connection_data) {
228
0
      tcp_client->disableIdleTimer();
229
0
    }
230
0
  }
231
232
  // Reports a failure to the callbacks stored in the attach context. Note the
  // argument order differs from this function's parameter order.
  void onPoolFailure(const Upstream::HostDescriptionConstSharedPtr& host_description,
233
                     absl::string_view failure_reason, ConnectionPool::PoolFailureReason reason,
234
0
                     Envoy::ConnectionPool::AttachContext& context) override {
235
0
    auto* callbacks = typedContext<TcpAttachContext>(context).callbacks_;
236
0
    callbacks->onPoolFailure(reason, failure_reason, host_description);
237
0
  }
238
239
0
  bool enforceMaxRequests() const override { return false; }
240
  // These two functions exist for testing parity between old and new Tcp Connection Pools.
241
0
  virtual void onConnReleased(Envoy::ConnectionPool::ActiveClient&) {}
242
0
  virtual void onConnDestroyed() {}
243
244
  // Idle timeout passed to every ActiveTcpClient created by instantiateActiveClient().
  absl::optional<std::chrono::milliseconds> idle_timeout_;
245
};
246
247
} // namespace Tcp
248
} // namespace Envoy